in src/main/scala/com/twitter/iago/feeder/ParrotFeeder.scala [158:193]
/**
 * Main feeding loop: keeps queueing batches on the running parrots for as long as the
 * feeder is in the `RUNNING` state.
 *
 * Before looping it calls `skipForward(config.linesToSkipF())`. On each pass it takes at
 * most `config.numInstancesF()` of `cluster.runningParrots`, initializes any parrot that
 * is not yet initialized, and queues one batch produced by `readBatch(linesToRead)` on it.
 *
 * The state is switched to `EOF` when `requestsRead` has reached `config.maxRequestsF()`,
 * or when `lines` has no more input and `config.reuseFileF()` is false. When reuse is
 * enabled the reader is reset instead and feeding continues.
 */
private[this] def runLoad(): Unit = {
  skipForward(config.linesToSkipF())
  // NOTE(review): if `cluster.runningParrots` is empty this loop spins without pausing --
  // confirm whether a back-off is needed or whether callers guarantee at least one parrot.
  while (state == FeederState.RUNNING) {
    // limit the number of parrots we use to what we were configured to use
    val parrots = cluster.runningParrots.slice(0, config.numInstancesF())
    parrots foreach { parrot =>
      // Re-check the state for every parrot: once an earlier parrot in this pass has
      // pushed us to EOF, the remaining parrots must not be fed another batch (and the
      // exit message must not be logged once per remaining parrot).
      if (state == FeederState.RUNNING) {
        if (!initialized(parrot)) {
          initialize(parrot)
        }
        parrot.queueBatch {
          readBatch(linesToRead)
        }
        // Snapshot both values once so the logged numbers are exactly the ones that
        // triggered the exit; comparing directly also avoids relying on a subtraction.
        val maxRequests = config.maxRequestsF()
        val requestsSoFar = requestsRead.get
        if (requestsSoFar >= maxRequests) {
          log.info(
            "ParrotFeeder.runLoad: exiting because config.maxRequests = %d and requestsRead.get = %d",
            maxRequests,
            requestsSoFar
          )
          state = FeederState.EOF
        } else if (!lines.hasNext) {
          if (config.reuseFileF()) {
            log.trace("ParrotFeeder.runLoad: inputLog is exhausted, restarting reader.")
            lines.reset()
          } else {
            log.info("ParrotFeeder.runLoad: exiting because the log is exhausted")
            state = FeederState.EOF
          }
        }
      }
    }
  }
}