in finagle-core/src/main/scala/com/twitter/finagle/transport/Transport.scala [107:276]
// Delegates to `self.write`, passing the request through unchanged.
override def write(req: In): Future[Unit] = self.write(req)
// Delegates to `self.read()`.
override def read(): Future[Out] = self.read()
// Reports `newCtx` as this transport's context.
override def context: Context = newCtx
}
}
/**
* The control panel for the Transport.
*/
def context: Context
}
/**
* A collection of [[com.twitter.finagle.Stack.Param]]s useful for configuring
* a [[com.twitter.finagle.transport.Transport]].
*
* @define param a [[com.twitter.finagle.Stack.Param]] used to configure
*/
object Transport {
/**
 * Key for the [[SslSessionInfo]] entry in
 * [[com.twitter.finagle.context.Contexts.local]]. `sslSessionInfo` looks
 * this key up.
 */
private[finagle] val sslSessionInfoCtx = new Contexts.local.Key[SslSessionInfo]
/**
* Retrieve the [[Transport]]'s [[SslSessionInfo]] from
* [[com.twitter.finagle.context.Contexts.local]] if available. If none exists,
* a [[NullSslSessionInfo]] is returned instead.
*/
def sslSessionInfo: SslSessionInfo =
  // Fall back to the null object when nothing is bound under the key.
  Contexts.local.get(sslSessionInfoCtx).getOrElse(NullSslSessionInfo)
/**
* Retrieve the peer certificate of the [[Transport]], if
* one exists.
*/
def peerCertificate: Option[Certificate] = {
  // First entry of the session's peer certificates; `None` when there are none.
  val certs = sslSessionInfo.peerCertificates
  certs.headOption
}
/**
* $param the buffer sizes of a `Transport`.
*
* @param send An option indicating the size of the send buffer.
* If None, the implementation default is used.
*
* @param recv An option indicating the size of the receive buffer.
* If None, the implementation default is used.
*/
case class BufferSizes(send: Option[Int], recv: Option[Int]) {

  /** Pairs this value with the implicit [[Stack.Param]] from the companion object. */
  def mk(): (BufferSizes, Stack.Param[BufferSizes]) =
    (this, BufferSizes.param)
}
object BufferSizes {

  /**
   * The [[Stack.Param]] for [[BufferSizes]]; its default leaves both `send`
   * and `recv` as `None`.
   */
  // Explicit type: implicit definitions should not rely on inference
  // (consistent with `Options.param`).
  implicit val param: Stack.Param[BufferSizes] = Stack.Param(BufferSizes(None, None))
}
/**
* $param the liveness of a `Transport`. These properties dictate the
* lifecycle of a `Transport` and ensure that it remains relevant.
*
* @param readTimeout A maximum duration a listener is allowed
* to read a request.
*
* @param writeTimeout A maximum duration a listener is allowed to
* write a response.
*
* @param keepAlive An option indicating if the keepAlive is on or off.
* If None, the implementation default is used.
*/
case class Liveness(readTimeout: Duration, writeTimeout: Duration, keepAlive: Option[Boolean]) {

  /** Pairs this value with the implicit [[Stack.Param]] from the companion object. */
  def mk(): (Liveness, Stack.Param[Liveness]) =
    (this, Liveness.param)
}
object Liveness {

  /**
   * The [[Stack.Param]] for [[Liveness]]; its default uses `Duration.Top` for
   * both timeouts and `None` for `keepAlive`.
   */
  // Explicit type: implicit definitions should not rely on inference
  // (consistent with `Options.param`).
  implicit val param: Stack.Param[Liveness] =
    Stack.Param(Liveness(Duration.Top, Duration.Top, None))
}
/**
* $param the verbosity of a `Transport`. Transport activity is
* written to [[com.twitter.finagle.param.Logger]].
*/
case class Verbose(enabled: Boolean) {

  /** Pairs this value with the implicit [[Stack.Param]] from the companion object. */
  def mk(): (Verbose, Stack.Param[Verbose]) =
    (this, Verbose.param)
}
object Verbose {

  /** The [[Stack.Param]] for [[Verbose]]; its default has `enabled = false`. */
  // Explicit type: implicit definitions should not rely on inference
  // (consistent with `Options.param`).
  implicit val param: Stack.Param[Verbose] = Stack.Param(Verbose(enabled = false))
}
/**
* $param the SSL/TLS client configuration for a `Transport`.
*/
case class ClientSsl(sslClientConfiguration: Option[SslClientConfiguration]) {

  /** Pairs this value with the implicit [[Stack.Param]] from the companion object. */
  def mk(): (ClientSsl, Stack.Param[ClientSsl]) =
    (this, ClientSsl.param)
}
object ClientSsl {

  /** The [[Stack.Param]] for [[ClientSsl]]; its default carries no configuration (`None`). */
  // Explicit type: implicit definitions should not rely on inference
  // (consistent with `Options.param`).
  implicit val param: Stack.Param[ClientSsl] = Stack.Param(ClientSsl(None))
}
/**
* $param the SSL/TLS server configuration for a `Transport`.
*/
case class ServerSsl(sslServerConfiguration: Option[SslServerConfiguration]) {

  /** Pairs this value with the implicit [[Stack.Param]] from the companion object. */
  def mk(): (ServerSsl, Stack.Param[ServerSsl]) =
    (this, ServerSsl.param)
}
object ServerSsl {

  /** The [[Stack.Param]] for [[ServerSsl]]; its default carries no configuration (`None`). */
  // Explicit type: implicit definitions should not rely on inference
  // (consistent with `Options.param`).
  implicit val param: Stack.Param[ServerSsl] = Stack.Param(ServerSsl(None))
}
/**
* $param the options (i.e., socket options) of a `Transport`.
*
* @param noDelay enables or disables the `TCP_NODELAY` option on a
* transport socket. `noDelay = true` sets `TCP_NODELAY`, which disables
* Nagle's algorithm. Default is `true` (Nagle's algorithm disabled).
*
* @param reuseAddr enables or disables `SO_REUSEADDR` option on a
* transport socket. Default is `true`.
*
* @param reusePort enables or disables `SO_REUSEPORT` option on a
* transport socket (Linux 3.9+ only). This option is only
* available when using finagle-netty4 and native epoll support
* is enabled. Default is `false`.
*/
case class Options(noDelay: Boolean, reuseAddr: Boolean, reusePort: Boolean) {

  /** Two-argument constructor; `reusePort` is fixed to `false`. */
  def this(noDelay: Boolean, reuseAddr: Boolean) =
    this(noDelay, reuseAddr, reusePort = false)

  /** Pairs these options with the implicit [[Stack.Param]] from the companion object. */
  def mk(): (Options, Stack.Param[Options]) = {
    val stackParam = Options.param
    (this, stackParam)
  }
}

object Options {

  /**
   * The [[Stack.Param]] for [[Options]]; its default has `noDelay` and
   * `reuseAddr` set to `true` and `reusePort` set to `false`.
   */
  implicit val param: Stack.Param[Options] =
    Stack.Param(Options(noDelay = true, reuseAddr = true, reusePort = false))

  /** Two-argument factory; `reusePort` is fixed to `false`. */
  def apply(noDelay: Boolean, reuseAddr: Boolean): Options =
    new Options(noDelay, reuseAddr, reusePort = false)
}
/**
* Serializes the object stream from a `Transport` into a
* [[com.twitter.io.Writer]].
*
* The serialization function `f` can return `Future.None` to interrupt the
* stream to facilitate using the transport with multiple writers and vice
* versa.
*
* Both transport and writer are unmanaged, the caller must close when
* done using them.
*
* {{{
* copyToWriter(trans, w)(f).ensure {
* trans.close()
* w.close()
* }
* }}}
*
* @param trans The source Transport.
*
* @param w The destination [[com.twitter.io.Writer]].
*
* @param f A mapping from `A` to `Future[Option[Buf]]`.
*/
private[finagle] def copyToWriter[A](
trans: Transport[_, A],
w: Writer[Buf]
)(
f: A => Future[Option[Buf]]
): Future[Unit] = {
trans.read().flatMap(f).flatMap {