in src/main/scala/com/twitter/iago/launcher/LocalMode.scala [51:101]
/** Flag overrides applied to the locally launched feeder: only its `admin.port`. */
def localFeederFlagMap = {
  val adminPort = feederAdminPort()
  Map("admin.port" -> (":" + adminPort))
}
/** Flag overrides applied to the locally launched server: only its `admin.port`. */
def localServerFlagMap = {
  val adminPort = serverAdminPort()
  Map("admin.port" -> (":" + adminPort))
}
/**
 * Starts one server and one feeder as local `java` child processes (through
 * `runBackgroundCmd`) and blocks until the first of the two terminates; the future of the
 * one still running is then cancelled, whose failure hook destroys its process.
 *
 * @param feederFlags flags passed on to the feeder command line; `numInstancesF` and
 *                    `parrotHostsF` are overridden here
 * @param serverFlags flags passed on to the server command line
 */
def localLaunch(feederFlags: Seq[Flag[_]], serverFlags: Seq[Flag[_]]): Unit = {
  // NOTE(review): presumably aborts when a required flag is unset -- confirm in missingFlagCheck
  missingFlagCheck(List(servicePortF, classPath))
  // override any flags here: numInstancesF becomes "1" and parrotHostsF takes the value of
  // servicePortF
  val feederFlagMap = flagsToFlagMap(feederFlags) +
    (numInstancesF.name -> 1.toString) +
    (parrotHostsF.name -> servicePortF())
  val serverFlagMap = flagsToFlagMap(serverFlags)
  val futurePool = FuturePool.interruptibleUnboundedPool

  // launch single local server instance
  val serverCmd =
    List("java", "-cp", classPath(), "com.twitter.iago.server.Main")
  val localServerArgs = flagMapToArgs(localServerFlagMap)
  val serverArgs = flagMapToArgs(serverFlagMap)
  val serverCmdWithArgs = serverCmd ::: localServerArgs ::: serverArgs
  val serverProcess = runBackgroundCmd(serverCmdWithArgs)
  try {
    val serverFuture = futurePool { serverProcess.exitValue() }
      .onSuccess { rc =>
        log.info(s"Server terminated with exit code: $rc")
      }
      .onFailure { e =>
        // Reached both on a genuine failure and when the future is cancelled below.
        log.info(s"Destroying server process: $e")
        serverProcess.destroy()
      }

    // launch single local feeder instance
    val feederCmd =
      List("java", "-cp", classPath(), "com.twitter.iago.feeder.Main")
    val localFeederArgs = flagMapToArgs(localFeederFlagMap)
    val feederArgs = flagMapToArgs(feederFlagMap)
    val feederCmdWithArgs = feederCmd ::: localFeederArgs ::: feederArgs
    val feederProcess = runBackgroundCmd(feederCmdWithArgs)
    try {
      val feederFuture = futurePool { feederProcess.exitValue() }
        .onSuccess { rc =>
          log.info(s"Feeder terminated with exit code: $rc")
        }
        .onFailure { e =>
          // Reached both on a genuine failure and when the future is cancelled below.
          log.info(s"Destroying feeder process: $e")
          feederProcess.destroy()
        }

      // As soon as either child is done, cancel whatever is still pending; the raise
      // triggers the onFailure hook above, which destroys the corresponding process.
      val firstExit = Future.select(List(serverFuture, feederFuture)).onSuccess {
        case (_, running) => running.foreach { _.raise(new CancellationException()) }
      }
      Await.result(firstExit)
    } finally {
      // Safety net so the feeder cannot outlive this call if the wait above throws.
      // NOTE(review): on the normal path the process has already exited or been destroyed;
      // assumes a repeated destroy() is tolerated -- confirm for runBackgroundCmd's type.
      feederProcess.destroy()
    }
  } finally {
    // Safety net so the server cannot outlive this call, e.g. when launching the feeder
    // throws. NOTE(review): same repeated-destroy() assumption as for the feeder.
    serverProcess.destroy()
  }
}