def streamOut()

in finagle-netty4-http/src/main/scala/com/twitter/finagle/netty4/http/Netty4StreamTransport.scala [84:172]


  def streamOut(
    trans: Transport[Any, Any],
    r: Reader[Chunk],
    contentLength: Option[Long]
  ): Future[Unit] = {

    // A helper to ensure that we don't send netty more (or less) data that the
    // content-length header requires.
    def verifyContentLength(chunk: Option[Chunk], written: Long): Unit = contentLength match {
      case None => // nop: we don't have a content-length header so no constraints
      case Some(contentLength) =>
        chunk match {
          // Short write case: the reader doesn't contain as much data is its
          // content-length header advertised which is an illegal message. We
          // handle this by surfacing an exception that will close the channel.
          case None if contentLength != written =>
            r.discard()
            throw new IllegalStateException(
              s"HTTP stream terminated before enough content was written. " +
                s"Provided content length: ${contentLength}, observed: $written.")

          // Attempting to write trailers which don't honor the content-length
          // header, either too little or too much data.
          case Some(chunk) if chunk.isLast && chunk.content.length + written != contentLength =>
            r.discard()
            throw new IllegalStateException(
              s"HTTP stream terminated with incorrect amount of data written. " +
                s"Provided content length: ${contentLength}, observed: ${chunk.content.length + written}.")

          // Attempting to write a chunk that overflows the length
          // dictated by the content-length header
          case Some(chunk) if contentLength < written + chunk.content.length =>
            r.discard()
            throw new IllegalStateException(
              s"HTTP stream attempted to write more data than the content-length header allows. " +
                s"Provided content length: ${contentLength}, observed (so far): ${written + chunk.content.length}")

          case _ => // nop: nothing illegal observed with this chunk.
        }
    }

    def continue(written: Long): Future[Unit] = r.read().flatMap { chunk =>
      verifyContentLength(chunk, written)
      chunk match {
        case None =>
          trans.write(LastHttpContent.EMPTY_LAST_CONTENT)

        case Some(chunk) if chunk.isLast =>
          terminate(chunk)

        case Some(chunk) =>
          trans
            .write(
              new DefaultHttpContent(ByteBufConversion.bufAsByteBuf(chunk.content))
            ).before(continue(written + chunk.content.length))
      }
    }

    // We need to read one more time before writing last chunk to ensure the stream isn't malformed.
    def terminate(last: Chunk): Future[Unit] = r.read().flatMap {
      case None =>
        // TODO (vk): PR against Netty; we need to construct out of given Headers so we avoid
        // copying afterwards.

        if (last.content.isEmpty && last.trailers.isEmpty) {
          trans.write(LastHttpContent.EMPTY_LAST_CONTENT)
        } else {
          val contentAndTrailers = new DefaultLastHttpContent(
            ByteBufConversion.bufAsByteBuf(last.content),
            false /*validateHeaders*/
          )

          if (!last.trailers.isEmpty) {
            Bijections.finagle
              .writeFinagleHeadersToNetty(last.trailers, contentAndTrailers.trailingHeaders())
          }

          trans.write(contentAndTrailers)
        }

      case _ =>
        Future.exception(
          new IllegalStateException("HTTP stream is malformed: only EOS can follow trailers")
        )
    }

    // Begin the loop.
    continue(written = 0L)
  }