sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java [130:181]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Reads batches of records from a single shard and puts them into the shared records queue for
   * as long as the pool stays open.
   *
   * <p>The loop ends when the pool is closed, when the shard is reported as closed (after which
   * reading of the successive shards is started), or when the thread is interrupted. Throttling,
   * transient failures and any other unexpected {@code Throwable} are logged or reported to the
   * rate limiter and the loop carries on with the next iteration.
   *
   * @param shardRecordsIterator iterator providing consecutive record batches of the shard
   * @param rateLimiter policy notified about every read batch and about throttling
   */
  private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
    while (poolOpened.get()) {
      try {
        try {
          List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
          // Set when enqueueing was aborted by an InterruptedException. A blocking call that
          // throws InterruptedException clears the thread's interrupted status, so the status
          // flag alone cannot be used to detect this path inside the finally block.
          boolean enqueueInterrupted = false;
          try {
            for (KinesisRecord kinesisRecord : kinesisRecords) {
              recordsQueue.put(kinesisRecord);
              numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
            }
          } catch (InterruptedException e) {
            enqueueInterrupted = true;
            // Rethrown so that the outer handler restores the interrupted status and ends the
            // loop.
            throw e;
          } finally {
            // The batch was read, so the rate limiter is notified even if enqueueing failed for
            // a reason other than interruption. It is skipped when the thread was interrupted,
            // either while putting a record (enqueueInterrupted) or without an exception being
            // thrown yet (interrupted status still set).
            if (!enqueueInterrupted && !Thread.currentThread().isInterrupted()) {
              rateLimiter.onSuccess(kinesisRecords);
            }
          }
        } catch (KinesisShardClosedException e) {
          LOG.info(
              "Shard iterator for {} shard is closed, finishing the read loop",
              shardRecordsIterator.getShardId(),
              e);
          // Wait until all records from already closed shard are taken from the buffer and only
          // then start reading successive shards. This guarantees that checkpoints will contain
          // either parent or child shard and never both. Such approach allows for more
          // straightforward checkpoint restoration than in a case when new shards are read
          // immediately.
          waitUntilAllShardRecordsRead(shardRecordsIterator);
          readFromSuccessiveShards(shardRecordsIterator);
          break;
        }
      } catch (KinesisClientThrottledException e) {
        try {
          // Let the policy react to throttling before the next read attempt.
          rateLimiter.onThrottle(e);
        } catch (InterruptedException ex) {
          LOG.warn("Thread was interrupted, finishing the read loop", ex);
          Thread.currentThread().interrupt();
          break;
        }
      } catch (TransientKinesisException e) {
        // Transient failures are only logged; the next loop iteration retries the read.
        LOG.warn("Transient exception occurred.", e);
      } catch (InterruptedException e) {
        LOG.warn("Thread was interrupted, finishing the read loop", e);
        // Restore the interrupted status for code further up the call stack.
        Thread.currentThread().interrupt();
        break;
      } catch (Throwable e) {
        // Deliberate best effort: keep the reader running regardless of the failure type.
        LOG.error("Unexpected exception occurred", e);
      }
    }
    LOG.info("Kinesis Shard read loop has finished");
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java [130:181]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Reads batches of records from a single shard and puts them into the shared records queue for
   * as long as the pool stays open.
   *
   * <p>The loop ends when the pool is closed, when the shard is reported as closed (after which
   * reading of the successive shards is started), or when the thread is interrupted. Throttling,
   * transient failures and any other unexpected {@code Throwable} are logged or reported to the
   * rate limiter and the loop carries on with the next iteration.
   *
   * @param shardRecordsIterator iterator providing consecutive record batches of the shard
   * @param rateLimiter policy notified about every read batch and about throttling
   */
  private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
    while (poolOpened.get()) {
      try {
        try {
          List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
          // Set when enqueueing was aborted by an InterruptedException. A blocking call that
          // throws InterruptedException clears the thread's interrupted status, so the status
          // flag alone cannot be used to detect this path inside the finally block.
          boolean enqueueInterrupted = false;
          try {
            for (KinesisRecord kinesisRecord : kinesisRecords) {
              recordsQueue.put(kinesisRecord);
              numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
            }
          } catch (InterruptedException e) {
            enqueueInterrupted = true;
            // Rethrown so that the outer handler restores the interrupted status and ends the
            // loop.
            throw e;
          } finally {
            // The batch was read, so the rate limiter is notified even if enqueueing failed for
            // a reason other than interruption. It is skipped when the thread was interrupted,
            // either while putting a record (enqueueInterrupted) or without an exception being
            // thrown yet (interrupted status still set).
            if (!enqueueInterrupted && !Thread.currentThread().isInterrupted()) {
              rateLimiter.onSuccess(kinesisRecords);
            }
          }
        } catch (KinesisShardClosedException e) {
          LOG.info(
              "Shard iterator for {} shard is closed, finishing the read loop",
              shardRecordsIterator.getShardId(),
              e);
          // Wait until all records from already closed shard are taken from the buffer and only
          // then start reading successive shards. This guarantees that checkpoints will contain
          // either parent or child shard and never both. Such approach allows for more
          // straightforward checkpoint restoration than in a case when new shards are read
          // immediately.
          waitUntilAllShardRecordsRead(shardRecordsIterator);
          readFromSuccessiveShards(shardRecordsIterator);
          break;
        }
      } catch (KinesisClientThrottledException e) {
        try {
          // Let the policy react to throttling before the next read attempt.
          rateLimiter.onThrottle(e);
        } catch (InterruptedException ex) {
          LOG.warn("Thread was interrupted, finishing the read loop", ex);
          Thread.currentThread().interrupt();
          break;
        }
      } catch (TransientKinesisException e) {
        // Transient failures are only logged; the next loop iteration retries the read.
        LOG.warn("Transient exception occurred.", e);
      } catch (InterruptedException e) {
        LOG.warn("Thread was interrupted, finishing the read loop", e);
        // Restore the interrupted status for code further up the call stack.
        Thread.currentThread().interrupt();
        break;
      } catch (Throwable e) {
        // Deliberate best effort: keep the reader running regardless of the failure type.
        LOG.error("Unexpected exception occurred", e);
      }
    }
    LOG.info("Kinesis Shard read loop has finished");
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



