in finagle-netty4-http/src/main/scala/com/twitter/finagle/netty4/http/Netty4StreamTransport.scala [84:172]
/**
 * Reads chunks from `r` and writes them to `trans` as netty HTTP content
 * messages until the reader is exhausted or a last chunk is seen.
 *
 * Every chunk that is not marked `isLast` is written as a `DefaultHttpContent`.
 * The stream is finished with a `LastHttpContent` that carries the content and
 * trailers of the last chunk, if there was one.
 *
 * When `contentLength` is defined, the number of bytes read from `r` must match
 * it exactly. Whenever the stream is found to be illegal (too little data, too
 * much data, or anything other than end-of-stream following the last chunk) the
 * reader is discarded and the failure is an `IllegalStateException`.
 *
 * @param trans transport the netty messages are written to
 * @param r source of the chunks to write
 * @param contentLength value of the content-length header, if one was provided
 * @return a `Future` produced by chaining the reads of `r` and the writes to
 *         `trans`; illegal streams are reported as an `IllegalStateException`
 */
def streamOut(
  trans: Transport[Any, Any],
  r: Reader[Chunk],
  contentLength: Option[Long]
): Future[Unit] = {

  // Single exit for every illegal-stream condition: the reader's remaining
  // output is no longer wanted, so discard it before reporting the failure.
  def malformed(message: String): IllegalStateException = {
    r.discard()
    new IllegalStateException(message)
  }

  // A helper to ensure that we don't send netty more (or less) data than the
  // content-length header requires. It throws on violation; it is only called
  // from within the `r.read().flatMap` callback in `continue`.
  def verifyContentLength(chunk: Option[Chunk], written: Long): Unit = contentLength match {
    case None => // nop: we don't have a content-length header so no constraints
    case Some(expected) =>
      chunk match {
        // Short write case: the reader doesn't contain as much data as its
        // content-length header advertised which is an illegal message. We
        // handle this by surfacing an exception.
        case None =>
          if (expected != written) {
            throw malformed(
              "HTTP stream terminated before enough content was written. " +
                s"Provided content length: ${expected}, observed: $written."
            )
          }

        case Some(next) =>
          // Total number of bytes once this chunk has been written.
          val total: Long = written + next.content.length

          if (next.isLast && total != expected) {
            // Attempting to write a last chunk which doesn't honor the
            // content-length header, either too little or too much data.
            throw malformed(
              "HTTP stream terminated with incorrect amount of data written. " +
                s"Provided content length: ${expected}, observed: ${total}."
            )
          } else if (expected < total) {
            // Attempting to write a chunk that overflows the length
            // dictated by the content-length header.
            throw malformed(
              "HTTP stream attempted to write more data than the content-length header allows. " +
                s"Provided content length: ${expected}, observed (so far): ${total}"
            )
          }
        // Otherwise: nothing illegal observed with this chunk.
      }
  }

  // Reads the next chunk, validates it against the content-length and writes it.
  // `written` is the number of content bytes written before this read.
  def continue(written: Long): Future[Unit] = r.read().flatMap { maybeChunk =>
    verifyContentLength(maybeChunk, written)
    maybeChunk match {
      case None =>
        // The reader ended without an explicit last chunk.
        trans.write(LastHttpContent.EMPTY_LAST_CONTENT)
      case Some(last) if last.isLast =>
        terminate(last)
      case Some(next) =>
        trans
          .write(new DefaultHttpContent(ByteBufConversion.bufAsByteBuf(next.content)))
          .before(continue(written + next.content.length))
    }
  }

  // We need to read one more time before writing last chunk to ensure the stream isn't malformed.
  def terminate(last: Chunk): Future[Unit] = r.read().flatMap {
    case None =>
      // TODO (vk): PR against Netty; we need to construct out of given Headers so we avoid
      // copying afterwards.
      if (last.content.isEmpty && last.trailers.isEmpty) {
        trans.write(LastHttpContent.EMPTY_LAST_CONTENT)
      } else {
        val contentAndTrailers = new DefaultLastHttpContent(
          ByteBufConversion.bufAsByteBuf(last.content),
          false /*validateHeaders*/
        )
        if (!last.trailers.isEmpty) {
          Bijections.finagle
            .writeFinagleHeadersToNetty(last.trailers, contentAndTrailers.trailingHeaders())
        }
        trans.write(contentAndTrailers)
      }
    case _ =>
      // Something other than end-of-stream followed the last chunk. Like the
      // content-length violations above, discard the reader before failing
      // instead of leaving it neither drained nor discarded.
      Future.exception(malformed("HTTP stream is malformed: only EOS can follow trailers"))
  }

  // Begin the loop.
  continue(written = 0L)
}