in flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java [417:613]
/**
 * Runs the fetcher on the calling thread until it is stopped.
 *
 * <p>The method proceeds in phases:
 * <ol>
 *   <li>returns immediately if the fetcher is not {@code running};</li>
 *   <li>verifies that at least one subscribed stream has a discovered shard;</li>
 *   <li>submits a shard consumer for every shard state already registered in
 *       {@code subscribedShardsState} that has not been read to its end;</li>
 *   <li>if a periodic watermark assigner is configured, starts the periodic watermark emitter and,
 *       when a watermark tracker is present, the watermark sync callback and the record emitter thread;</li>
 *   <li>loops while {@code running}, discovering new shards and submitting a consumer for each,
 *       sleeping for the configured discovery interval between rounds;</li>
 *   <li>waits for termination and rethrows any error recorded in {@code error}.</li>
 * </ol>
 *
 * @throws RuntimeException if none of the subscribed streams has a discovered shard
 * @throws Exception the error recorded in {@code error}, if any; a non-{@code Exception},
 *     non-{@code Error} throwable is wrapped in a new {@code Exception}
 */
public void runFetcher() throws Exception {
    // check that we are running before proceeding
    if (!running) {
        return;
    }

    // remember the calling thread in the mainThread field
    this.mainThread = Thread.currentThread();

    // ------------------------------------------------------------------------
    //  Procedures before starting the infinite while loop:
    // ------------------------------------------------------------------------

    // 1. check that there is at least one shard in the subscribed streams to consume from (can be done by
    //    checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
    boolean hasShards = false;
    StringBuilder streamsWithNoShardsFound = new StringBuilder();
    for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
        if (streamToLastDiscoveredShardEntry.getValue() != null) {
            hasShards = true;
        } else {
            streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
        }
    }

    if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
        // every appended name is followed by ", "; drop the dangling separator after the last name
        streamsWithNoShardsFound.setLength(streamsWithNoShardsFound.length() - 2);
        LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
            indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
    }

    if (!hasShards) {
        throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
    }

    // 2. start consuming any shard state we already have in the subscribedShardState up to this point; the
    //    subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
    //    consumer using a restored state checkpoint
    for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
        KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);

        // only start a consuming thread if the seeded subscribed shard has not been completely read already
        if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
            // must be effectively final: it is captured by the metric-group lambda below
            StreamShardHandle streamShardHandle = seededShardState.getStreamShardHandle();

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
                    indexOfThisConsumerSubtask, streamShardHandle.toString(),
                    seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
            }

            // each shard consumer is given its own schema obtained from getClonedDeserializationSchema()
            KinesisDeserializationSchema<T> shardDeserializationSchema = getClonedDeserializationSchema();
            shardDeserializationSchema.open(() -> consumerMetricGroup
                .addGroup("subtaskId", String.valueOf(indexOfThisConsumerSubtask))
                .addGroup("shardId", streamShardHandle.getShard().getShardId())
                .addGroup("user"));

            shardConsumersExecutor.submit(
                createShardConsumer(
                    seededStateIndex,
                    streamShardHandle,
                    seededShardState.getLastProcessedSequenceNum(),
                    registerShardMetrics(consumerMetricGroup, seededShardState),
                    shardDeserializationSchema));
        }
    }

    // start periodic watermark emitter, if a watermark assigner was configured
    if (periodicWatermarkAssigner != null) {
        long periodicWatermarkIntervalMillis = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
        if (periodicWatermarkIntervalMillis > 0) {
            ProcessingTimeService timerService = ((StreamingRuntimeContext) runtimeContext).getProcessingTimeService();
            LOG.info("Starting periodic watermark emitter with interval {}", periodicWatermarkIntervalMillis);
            new PeriodicWatermarkEmitter(timerService, periodicWatermarkIntervalMillis).start();

            if (watermarkTracker != null) {
                // setup global watermark tracking
                long watermarkSyncMillis = Long.parseLong(
                    getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS,
                        Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS)));
                watermarkTracker.setUpdateTimeoutMillis(watermarkSyncMillis * 3); // synchronization latency
                watermarkTracker.open(runtimeContext);
                new WatermarkSyncCallback(timerService, watermarkSyncMillis).start();

                // emit records ahead of watermark to offset synchronization latency
                long lookaheadMillis = Long.parseLong(
                    getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS,
                        Long.toString(0)));
                recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3));

                // record emitter depends on periodic watermark
                // it runs in a separate thread since main thread is used for discovery
                Runnable recordEmitterRunnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            recordEmitter.run();
                        } catch (Throwable error) {
                            // report the error that terminated the emitter loop to source thread
                            stopWithError(error);
                        }
                    }
                };

                Thread thread = new Thread(recordEmitterRunnable);
                thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
                thread.setDaemon(true);
                thread.start();
            }
        }

        // note: the idle interval is only read when a periodic watermark assigner is configured
        this.shardIdleIntervalMillis = Long.parseLong(
            getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
                Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
    }

    // ------------------------------------------------------------------------

    // finally, start the infinite shard discovery and consumer launching loop;
    // we will escape from this loop only when shutdownFetcher() or stopWithError() is called
    // TODO: have this thread emit the records for tracking backpressure

    // parseLong yields the primitive directly, consistent with the other interval lookups above
    final long discoveryIntervalMillis = Long.parseLong(
        configProps.getProperty(
            ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
            Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

    if (this.numberOfActiveShards.get() == 0) {
        LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
            indexOfThisConsumerSubtask);
        sourceContext.markAsTemporarilyIdle();
    }

    while (running) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
                indexOfThisConsumerSubtask);
        }
        List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();

        for (StreamShardHandle shard : newShardsDueToResharding) {
            // since there may be delay in discovering a new shard, all new shards due to
            // resharding should be read starting from the earliest record possible
            KinesisStreamShardState newShardState =
                new KinesisStreamShardState(convertToStreamShardMetadata(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
            int newStateIndex = registerNewSubscribedShardState(newShardState);

            // must be effectively final: it is captured by the metric-group lambda below
            StreamShardHandle streamShardHandle = newShardState.getStreamShardHandle();

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
                        "the shard from sequence number {} with ShardConsumer {}",
                    indexOfThisConsumerSubtask, streamShardHandle.toString(),
                    newShardState.getLastProcessedSequenceNum(), newStateIndex);
            }

            KinesisDeserializationSchema<T> shardDeserializationSchema = getClonedDeserializationSchema();
            shardDeserializationSchema.open(() -> consumerMetricGroup
                .addGroup("subtaskId", String.valueOf(indexOfThisConsumerSubtask))
                .addGroup("shardId", streamShardHandle.getShard().getShardId())
                .addGroup("user"));

            shardConsumersExecutor.submit(
                createShardConsumer(
                    newStateIndex,
                    streamShardHandle,
                    newShardState.getLastProcessedSequenceNum(),
                    registerShardMetrics(consumerMetricGroup, newShardState),
                    shardDeserializationSchema));
        }

        // we also check if we are running here so that we won't start the discovery sleep
        // interval if the running flag was set to false during the middle of the while loop
        if (running && discoveryIntervalMillis != 0) {
            try {
                Thread.sleep(discoveryIntervalMillis);
            } catch (InterruptedException ignored) {
                // the sleep may be interrupted by shutdownFetcher(); intentionally not re-interrupting
                // here, since the loop condition decides whether to continue and a pending interrupt
                // would make the awaitTermination() call below fail immediately
            }
        }
    }

    // make sure all resources have been terminated before leaving
    try {
        awaitTermination();
    } catch (InterruptedException ie) {
        // If there is an original exception, preserve it, since that's more important/useful.
        this.error.compareAndSet(null, ie);
    }

    // any error thrown in the shard consumer threads will be thrown to the main thread
    Throwable throwable = this.error.get();
    if (throwable != null) {
        if (throwable instanceof Exception) {
            throw (Exception) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        } else {
            throw new Exception(throwable);
        }
    }
}