in src/main/scala/com/twitter/iago/server/ParrotUdpTransport.scala [91:135]
/**
 * Sends the raw line of `request` to `victim` over a newly connected channel and returns a
 * future for the reply.
 *
 * The reply promise is registered in `channelFutureMap` under the channel id and removed again
 * when the channel closes. Presumably a response handler elsewhere in this class looks the
 * promise up by channel id and completes it -- that code is not visible here; confirm.
 *
 * @param request the request whose `rawLine` is written to the channel
 * @return a future that fails with a [[RequestTimeoutException]] if the connect does not
 *         finish within `connectTimeout` or (when `requestTimeout` is defined) no reply
 *         arrives in time; fails with the connect error if the connect finishes
 *         unsuccessfully; otherwise completes with whatever the reply promise is given
 */
override def sendRequest(request: ParrotRequest): Future[Rep] = {
  val data = request.rawLine
  log.debug("sending request: %s to %s", data, victim.toString)
  allRequests.incrementAndGet()
  val connFuture = bootstrap.connect(victim)
  if (!connFuture.awaitUninterruptibly(connectTimeout.inMilliseconds)) {
    // The connect attempt is being abandoned; close its channel so it is not leaked.
    connFuture.getChannel.close()
    Future.exception[Rep](new RequestTimeoutException(connectTimeout, "connecting"))
  } else if (!connFuture.isSuccess) {
    // awaitUninterruptibly only reports completion, not success: a connect that finished
    // with an error must not fall through to a write on an unconnected channel.
    connFuture.getChannel.close()
    // getCause is null when the connect was cancelled rather than failed.
    val cause: Throwable = Option(connFuture.getCause).getOrElse(
      new java.net.ConnectException("connecting to " + victim + " did not succeed")
    )
    Future.exception[Rep](cause)
  } else {
    val channel = connFuture.getChannel
    val reply = new Promise[Rep]
    channelFutureMap.put(channel.getId, reply)
    val start = Time.now
    channel.write(data)
    // Drop the map entry once the channel is closed so the map does not grow without bound.
    channel.getCloseFuture().addListener(new ChannelFutureListener() {
      override def operationComplete(closed: ChannelFuture): Unit = {
        channelFutureMap.remove(closed.getChannel.getId)
      }
    })
    val timerTask = requestTimeout.map { timeout =>
      timer.schedule(start + timeout) {
        val timedOut = reply.updateIfEmpty(
          Throw[Rep](new RequestTimeoutException(timeout, "waiting for UDP response"))
        )
        channel.close()
        // Count a timeout only if this task actually failed the promise; if a result
        // arrived concurrently the request did not time out.
        if (timedOut) udpRequestTimeout.incr()
      }
    }
    reply.ensure {
      // Cancel on any completion (success or failure) so a stale task cannot fire later.
      timerTask.foreach(_.cancel())
    }.onSuccess { _ =>
      val usec = (Time.now - start).inMicroseconds.toInt max 0
      udpRequestLatencyUsec.add(usec)
    }
  }
}