private[this] def runLoad()

in src/main/scala/com/twitter/iago/feeder/ParrotFeeder.scala [158:193]


  /**
   * Main feeding loop: hands batches to the running parrots for as long as the
   * feeder's `state` is `FeederState.RUNNING`.
   *
   * Calls `skipForward(config.linesToSkipF())` once up front. Each pass of the
   * loop then takes at most `config.numInstancesF()` of `cluster.runningParrots`,
   * initializes any parrot that is not yet initialized, and queues
   * `readBatch(linesToRead)` on it. After each queued batch the state is moved
   * to `FeederState.EOF` when `requestsRead` has reached `config.maxRequestsF()`,
   * or when `lines` has no more input and `config.reuseFileF()` is false; when
   * reuse is enabled the reader is reset instead and feeding continues.
   *
   * Once the state is no longer RUNNING, the remaining parrots of the current
   * pass are skipped, so nothing more is queued and the exit reason is logged
   * only once.
   */
  private[this] def runLoad(): Unit = {

    skipForward(config.linesToSkipF())

    while (state == FeederState.RUNNING) {
      // limit the number of parrots we use to what we were configured to use
      val parrots = cluster.runningParrots.slice(0, config.numInstancesF())

      parrots foreach { parrot =>
        // A previous parrot in this pass (or, presumably, another thread) may
        // already have moved the feeder out of RUNNING; do not keep feeding.
        if (state == FeederState.RUNNING) {
          if (!initialized(parrot)) {
            initialize(parrot)
          }

          parrot.queueBatch {
            readBatch(linesToRead)
          }

          // Read each value once so the logged numbers are exactly the ones
          // the exit decision was based on.
          val maxRequests = config.maxRequestsF()
          val requestsSoFar = requestsRead.get

          if (maxRequests - requestsSoFar <= 0) {
            log.info(
              "ParrotFeeder.runLoad: exiting because config.maxRequests = %d and requestsRead.get = %d",
              maxRequests,
              requestsSoFar
            )
            state = FeederState.EOF
          } else if (!lines.hasNext) {
            if (config.reuseFileF()) {
              log.trace("ParrotFeeder.runLoad: inputLog is exhausted, restarting reader.")
              lines.reset()
            } else {
              log.info("ParrotFeeder.runLoad: exiting because the log is exhausted")
              state = FeederState.EOF
            }
          }
        }
      }
    }
  }