public void runFetcher()

in flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [417:613]


	public void runFetcher() throws Exception {

		// check that we are running before proceeding
		if (!running) {
			return;
		}

		this.mainThread = Thread.currentThread();

		// ------------------------------------------------------------------------
		//  Procedures before starting the infinite while loop:
		// ------------------------------------------------------------------------

		//  1. check that there is at least one shard in the subscribed streams to consume from (can be done by
		//     checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
		boolean hasShards = false;
		StringBuilder streamsWithNoShardsFound = new StringBuilder();
		for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
			if (streamToLastDiscoveredShardEntry.getValue() != null) {
				hasShards = true;
			} else {
				streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
			}
		}

		if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
			LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
				indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
		}

		if (!hasShards) {
			throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
		}

		//  2. start consuming any shard state we already have in the subscribedShardState up to this point; the
		//     subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
		//     consumer using a restored state checkpoint
		for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
			KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);

			// only start a consuming thread if the seeded subscribed shard has not been completely read already
			if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {

				if (LOG.isInfoEnabled()) {
					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
						indexOfThisConsumerSubtask, seededShardState.getStreamShardHandle().toString(),
						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
					}

				StreamShardHandle streamShardHandle = subscribedShardsState.get(seededStateIndex)
					.getStreamShardHandle();
				KinesisDeserializationSchema<T> shardDeserializationSchema = getClonedDeserializationSchema();
				shardDeserializationSchema.open(() -> consumerMetricGroup
					.addGroup("subtaskId", String.valueOf(indexOfThisConsumerSubtask))
					.addGroup("shardId", streamShardHandle.getShard().getShardId())
					.addGroup("user"));
				shardConsumersExecutor.submit(
					createShardConsumer(
						seededStateIndex,
						streamShardHandle,
						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(),
						registerShardMetrics(consumerMetricGroup, subscribedShardsState.get(seededStateIndex)),
						shardDeserializationSchema));
			}
		}

        // start periodic watermark emitter, if a watermark assigner was configured
		if (periodicWatermarkAssigner != null) {
			long periodicWatermarkIntervalMillis = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
			if (periodicWatermarkIntervalMillis > 0) {
				ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
				LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
				new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();
				if (watermarkTracker != null) {
					// setup global watermark tracking
					long watermarkSyncMillis = Long.parseLong(
						getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
							Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS)));
					watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3); // synchronization latency
					watermarkTracker.open(runtimeContext);
					new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();
					// emit records ahead of watermark to offset synchronization latency
					long lookaheadMillis = Long.parseLong(
						getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS,
							Long.toString(0)));
					recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3));

					// record emitter depends on periodic watermark
					// it runs in a separate thread since main thread is used for discovery
					Runnable recordEmitterRunnable = new Runnable() {
						@Override
						public void run() {
							try {
								recordEmitter.run();
							} catch (Throwable error) {
								// report the error that terminated the emitter loop to source thread
								stopWithError(error);
							}
						}
					};

					Thread thread = new Thread(recordEmitterRunnable);
					thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
					thread.setDaemon(true);
					thread.start();
				}
			}
			this.shardIdleIntervalMillis = Long.parseLong(
				getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
					Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));

		}

		// ------------------------------------------------------------------------

		// finally, start the infinite shard discovery and consumer launching loop;
		// we will escape from this loop only when shutdownFetcher() or stopWithError() is called
		// TODO: have this thread emit the records for tracking backpressure

		final long discoveryIntervalMillis = Long.valueOf(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

		if (this.numberOfActiveShards.get() == 0) {
			LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
				indexOfThisConsumerSubtask);
			sourceContext.markAsTemporarilyIdle();
		}

		while (running) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
					indexOfThisConsumerSubtask);
			}
			List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();

			for (StreamShardHandle shard : newShardsDueToResharding) {
				// since there may be delay in discovering a new shard, all new shards due to
				// resharding should be read starting from the earliest record possible
				KinesisStreamShardState newShardState =
					new KinesisStreamShardState(convertToStreamShardMetadata(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
				int newStateIndex = registerNewSubscribedShardState(newShardState);

				if (LOG.isInfoEnabled()) {
					LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
							"the shard from sequence number {} with ShardConsumer {}",
						indexOfThisConsumerSubtask, newShardState.getStreamShardHandle().toString(),
						newShardState.getLastProcessedSequenceNum(), newStateIndex);
				}

				StreamShardHandle streamShardHandle = newShardState.getStreamShardHandle();
				KinesisDeserializationSchema<T> shardDeserializationSchema = getClonedDeserializationSchema();
				shardDeserializationSchema.open(() -> consumerMetricGroup
					.addGroup("subtaskId", String.valueOf(indexOfThisConsumerSubtask))
					.addGroup("shardId", streamShardHandle.getShard().getShardId())
					.addGroup("user"));
				shardConsumersExecutor.submit(
					createShardConsumer(
						newStateIndex,
						newShardState.getStreamShardHandle(),
						newShardState.getLastProcessedSequenceNum(),
						registerShardMetrics(consumerMetricGroup, newShardState),
						shardDeserializationSchema));
			}

			// we also check if we are running here so that we won't start the discovery sleep
			// interval if the running flag was set to false during the middle of the while loop
			if (running && discoveryIntervalMillis != 0) {
				try {
					Thread.sleep(discoveryIntervalMillis);
				} catch (InterruptedException iex) {
					// the sleep may be interrupted by shutdownFetcher()
				}
			}
		}

		// make sure all resources have been terminated before leaving
		try {
			awaitTermination();
		} catch (InterruptedException ie) {
			// If there is an original exception, preserve it, since that's more important/useful.
			this.error.compareAndSet(null, ie);
		}

		// any error thrown in the shard consumer threads will be thrown to the main thread
		Throwable throwable = this.error.get();
		if (throwable != null) {
			if (throwable instanceof Exception) {
				throw (Exception) throwable;
			} else if (throwable instanceof Error) {
				throw (Error) throwable;
			} else {
				throw new Exception(throwable);
			}
		}
	}