in finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala [73:236]
def bindWithBridge(bridge: ChannelInboundHandler, addr: SocketAddress): ListeningServer =
new ListeningServer with CloseAwaitably {
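// A local (in-process) address gets its own single-threaded event loop
// group; network addresses share the process-wide boss loop.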
private[this] val bossLoop: EventLoopGroup =
if (ListeningServerBuilder.isLocal(addr)) new DefaultEventLoopGroup(1)
else BossEventLoop.Global
private[this] val bootstrap = new ServerBootstrap()
if (ListeningServerBuilder.isLocal(addr)) {
bootstrap.channel(classOf[LocalServerChannel])
} else {
if (useNativeEpoll() && Epoll.isAvailable) {
bootstrap.channel(classOf[EpollServerSocketChannel])
bootstrap.option[JBool](UnixChannelOption.SO_REUSEPORT, reusePort)
} else {
bootstrap.channel(classOf[NioServerSocketChannel])
}
// Trying to set SO_REUSEADDR and TCP_NODELAY produces 'Unknown channel option'
// warnings when used with `LocalServerChannel`, so we only set them for
// network channels.
bootstrap.option[JBool](ChannelOption.SO_REUSEADDR, reuseAddr)
bootstrap.childOption[JBool](ChannelOption.TCP_NODELAY, noDelay)
// Turning off AUTO_READ causes SSL/TLS errors in Finagle when using
// `LocalChannel`, so we likewise only set it for network channels.
bootstrap.childOption[JBool](ChannelOption.AUTO_READ, !backPressureEnabled)
}
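// The boss loop accepts incoming connections; accepted child channels are
// serviced by the shared worker pool.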
bootstrap.group(bossLoop, params[param.WorkerPool].eventLoopGroup)
bootstrap.option(ChannelOption.ALLOCATOR, allocator)
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator)
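// The socket options below are optional and applied only when configured.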
backlog.foreach(bootstrap.option[JInt](ChannelOption.SO_BACKLOG, _))
sendBufSize.foreach(bootstrap.childOption[JInt](ChannelOption.SO_SNDBUF, _))
recvBufSize.foreach(bootstrap.childOption[JInt](ChannelOption.SO_RCVBUF, _))
keepAlive.foreach(bootstrap.childOption[JBool](ChannelOption.SO_KEEPALIVE, _))
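// The configured IP traffic class, if any, is applied to both the listening
// socket and each accepted connection.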
params[Listener.TrafficClass].value.foreach { tc =>
bootstrap.option[JInt](Netty4Listener.TrafficClass, tc)
bootstrap.childOption[JInt](Netty4Listener.TrafficClass, tc)
}
private[this] val rawInitializer = new Netty4RawServerChannelInitializer(params)
private[this] val framedInitializer = new Netty4FramedServerChannelInitializer(params)
// our netty pipeline is divided into four chunks:
// raw => marshalling => framed => bridge
// `pipelineInit` sets up the marshalling handlers
// `rawInitializer` adds the raw handlers to the beginning
// `framedInitializer` adds the framed handlers to the end
// `bridge` adds the bridging handler to the end.
//
// This order is necessary because the bridge must be at the end, raw must
// be before marshalling, and marshalling must be before framed. This
// creates an ordering:
//
// raw => marshalling
// marshalling => framed
// raw => bridge
// marshalling => bridge
// framed => bridge
//
// The only way to satisfy this ordering is
//
// raw => marshalling => framed => bridge.
bootstrap.childHandler(new ChannelInitializer[Channel] {
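// Track live connections so we can enforce `maxConnections` in
// initChannel below.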
private[this] val activeConnections = new AtomicLong()
private[this] val closeListener = new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
activeConnections.decrementAndGet()
}
}
def initChannel(ch: Channel): Unit = {
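// Reject connections over `maxConnections`: undo the increment and
// close the channel immediately.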
if (activeConnections.incrementAndGet() > maxConnections) {
activeConnections.decrementAndGet()
ch.close()
} else {
ch.closeFuture().addListener(closeListener)
// pipelineInit comes first so that implementors can put whatever they
// want in pipelineInit, without having to worry about clobbering any
// of the other handlers.
pipelineInit(ch.pipeline)
ch.pipeline.addLast(rawInitializer)
// we use `setupMarshalling` to support protocols where the
// connection is multiplexed over child channels in the
// netty layer
ch.pipeline.addLast(
"marshalling",
setupMarshalling(new ChannelInitializer[Channel] {
def initChannel(ch: Channel): Unit = {
ch.pipeline.addLast("framedInitializer", framedInitializer)
// The bridge handler must be last in the pipeline to ensure
// that the bridging code sees all encoding and transformations
// of inbound messages.
ch.pipeline.addLast("finagleBridge", bridge)
}
})
)
}
}
})
// Block until the listening socket is bound. A `ListeningServer` represents
// a bound server, so without blocking here there would be a race between
// #listen returning and #boundAddress becoming available.
private[this] val bound = bootstrap.bind(addr).awaitUninterruptibly()
if (!bound.isSuccess)
throw new java.net.BindException(
s"Failed to bind to ${addr.toString}: ${bound.cause().getMessage}"
)
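// The channel backing the bound listening socket.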
private[this] val ch = bound.channel()
/**
* Immediately closes the listening socket, then shuts down the netty
* boss threadpool, giving existing tasks until `deadline` to finish.
*
* @return a [[Future]] representing the shutdown of the boss threadpool.
*/
def closeServer(deadline: Time): Future[Unit] = closeAwaitably {
// note: this ultimately calls close(2) on
// a non-blocking socket so it should not block.
ch.close().awaitUninterruptibly()
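// Remaining time budget before `deadline`; clamped to zero below so we
// never pass a negative timeout to netty.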
val timeout = deadline - Time.now
val timeoutMs = timeout.inMillis
// We only need to shut down bossLoop when the bind address is local;
// otherwise bossLoop is shared across the whole process.
if (!ListeningServerBuilder.isLocal(addr)) Future.Done
else {
val p = new Promise[Unit]
// The boss loop immediately starts refusing new work. Existing tasks
// have up to `timeoutMs` milliseconds to finish executing.
bossLoop
.shutdownGracefully(0 /* quietPeriod */, timeoutMs.max(0), TimeUnit.MILLISECONDS)
.addListener(new FutureListener[Any] {
def operationComplete(future: NettyFuture[Any]): Unit = p.setDone()
})
// Don't rely solely on netty to satisfy the promise (hence `raiseWithin`),
// and transform all results to success: we don't want the non-deterministic
// lifecycle of external resources to affect application success.
p.raiseWithin(timeout)(timer).transform { _ => Future.Done }
}
}
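// The address the channel actually bound to; when binding to an ephemeral
// port (port 0), this reflects the concrete port the OS assigned.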
def boundAddress: SocketAddress = ch.localAddress()
private[this] val workerPoolTrackingSettings =
params[param.TrackWorkerPool]
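// When enabled, `EventLoopGroupTracker.track` periodically samples the worker
// pool's event loops, reporting under the `delayTracking` stats scope and
// dumping threads whose delay exceeds `threadDumpThreshold`.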
if (workerPoolTrackingSettings.enableTracking) {
EventLoopGroupTracker.track(
params[param.WorkerPool].eventLoopGroup,
workerPoolTrackingSettings.trackingTaskPeriod,
workerPoolTrackingSettings.threadDumpThreshold,
params[Stats].statsReceiver,
s"finagle/netty-4/delayTracking/${boundAddress}",
Logger.get("com.twitter.finagle.netty4.Netty4Listener.threadDelay")
)
}
}