def bindWithBridge()

in finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala [73:236]


  /**
   * Binds a Netty `ServerBootstrap` to `addr` and returns the resulting
   * `ListeningServer`. Construction blocks until the bind attempt has completed.
   *
   * Addresses for which `ListeningServerBuilder.isLocal` is true use a
   * `LocalServerChannel` and a dedicated single-threaded boss loop; all other
   * addresses use the process-wide `BossEventLoop.Global` with an epoll or NIO
   * server socket channel.
   *
   * @param bridge the handler added last (as "finagleBridge") to each accepted
   *               channel's pipeline. NOTE(review): the same instance is added to
   *               every pipeline, so it presumably needs to be sharable — confirm.
   * @param addr   the address to bind to.
   * @return a `ListeningServer` whose `boundAddress` is the bound channel's local address.
   * @throws java.net.BindException if the bind attempt does not succeed; the
   *                                underlying failure is attached as the cause.
   */
  def bindWithBridge(bridge: ChannelInboundHandler, addr: SocketAddress): ListeningServer =
    new ListeningServer with CloseAwaitably {
      // Local addresses get their own boss loop (shut down in `closeServer`);
      // everything else shares the global one.
      private[this] val bossLoop: EventLoopGroup =
        if (ListeningServerBuilder.isLocal(addr)) new DefaultEventLoopGroup(1)
        else BossEventLoop.Global

      private[this] val bootstrap = new ServerBootstrap()

      if (ListeningServerBuilder.isLocal(addr)) {
        bootstrap.channel(classOf[LocalServerChannel])
      } else {
        if (useNativeEpoll() && Epoll.isAvailable) {
          bootstrap.channel(classOf[EpollServerSocketChannel])
          bootstrap.option[JBool](UnixChannelOption.SO_REUSEPORT, reusePort)
        } else {
          bootstrap.channel(classOf[NioServerSocketChannel])
        }
        // SO_REUSEADDR and TCP_NODELAY are only set on this non-local branch:
        // trying to set them gives 'Unknown channel option' warnings when used
        // with `LocalServerChannel`.
        bootstrap.option[JBool](ChannelOption.SO_REUSEADDR, reuseAddr)
        bootstrap.childOption[JBool](ChannelOption.TCP_NODELAY, noDelay)

        // AUTO_READ is likewise only set on this non-local branch: turning it off
        // causes SSL/TLS errors in Finagle when using `LocalChannel`.
        bootstrap.childOption[JBool](ChannelOption.AUTO_READ, !backPressureEnabled)
      }

      bootstrap.group(bossLoop, params[param.WorkerPool].eventLoopGroup)
      bootstrap.option(ChannelOption.ALLOCATOR, allocator)
      bootstrap.childOption(ChannelOption.ALLOCATOR, allocator)

      // Optional socket settings are applied only when a value is configured.
      backlog.foreach(bootstrap.option[JInt](ChannelOption.SO_BACKLOG, _))
      sendBufSize.foreach(bootstrap.childOption[JInt](ChannelOption.SO_SNDBUF, _))
      recvBufSize.foreach(bootstrap.childOption[JInt](ChannelOption.SO_RCVBUF, _))
      keepAlive.foreach(bootstrap.childOption[JBool](ChannelOption.SO_KEEPALIVE, _))
      params[Listener.TrafficClass].value.foreach { tc =>
        bootstrap.option[JInt](Netty4Listener.TrafficClass, tc)
        bootstrap.childOption[JInt](Netty4Listener.TrafficClass, tc)
      }

      private[this] val rawInitializer = new Netty4RawServerChannelInitializer(params)
      private[this] val framedInitializer = new Netty4FramedServerChannelInitializer(params)

      // our netty pipeline is divided into four chunks:
      // raw => marshalling => framed => bridge
      // `pipelineInit` sets up the marshalling handlers
      // `rawInitializer` adds the raw handlers to the beginning
      // `framedInitializer` adds the framed handlers to the end
      // `bridge` adds the bridging handler to the end.
      //
      // This order is necessary because the bridge must be at the end, raw must
      // be before marshalling, and marshalling must be before framed.  This
      // creates an ordering:
      //
      // raw => marshalling
      // marshalling => framed
      // raw => bridge
      // marshalling => bridge
      // framed => bridge
      //
      // The only way to satisfy this ordering is
      //
      // raw => marshalling => framed => bridge.
      bootstrap.childHandler(new ChannelInitializer[Channel] {
        // Number of currently open accepted channels, used to enforce `maxConnections`.
        private[this] val activeConnections = new AtomicLong()
        private[this] val closeListener = new ChannelFutureListener {
          def operationComplete(future: ChannelFuture): Unit = {
            activeConnections.decrementAndGet()
          }
        }

        def initChannel(ch: Channel): Unit = {
          if (activeConnections.incrementAndGet() > maxConnections) {
            // Over the limit: undo the increment and drop the connection. No close
            // listener is registered for this channel, so the count stays balanced.
            activeConnections.decrementAndGet()
            ch.close()
          } else {
            ch.closeFuture().addListener(closeListener)
            // pipelineInit comes first so that implementors can put whatever they
            // want in pipelineInit, without having to worry about clobbering any
            // of the other handlers.
            pipelineInit(ch.pipeline)
            ch.pipeline.addLast(rawInitializer)

            // we use `setupMarshalling` to support protocols where the
            // connection is multiplexed over child channels in the
            // netty layer
            ch.pipeline.addLast(
              "marshalling",
              setupMarshalling(new ChannelInitializer[Channel] {
                def initChannel(ch: Channel): Unit = {
                  ch.pipeline.addLast("framedInitializer", framedInitializer)

                  // The bridge handler must be last in the pipeline to ensure
                  // that the bridging code sees all encoding and transformations
                  // of inbound messages.
                  ch.pipeline.addLast("finagleBridge", bridge)
                }
              })
            )
          }
        }
      })

      // Block until listening socket is bound. `ListeningServer`
      // represents a bound server and if we don't block here there's
      // a race between #listen and #boundAddress being available.
      private[this] val bound = bootstrap.bind(addr).awaitUninterruptibly()
      if (!bound.isSuccess) {
        // A failed bind aborts construction, so `closeServer` can never run for
        // this instance. Release the dedicated local boss loop here; the shared
        // global boss loop must be left alone.
        if (ListeningServerBuilder.isLocal(addr)) {
          bossLoop.shutdownGracefully()
        }

        val bindFailure = new java.net.BindException(
          s"Failed to bind to ${addr.toString}: ${bound.cause().getMessage}"
        )
        // `BindException` has no cause-accepting constructor; attach the
        // underlying failure so its stack trace is not lost.
        bindFailure.initCause(bound.cause())
        throw bindFailure
      }

      private[this] val ch = bound.channel()

      /**
       * Immediately close the listening socket then shutdown the netty
       * boss threadpool with ``deadline`` timeout for existing tasks.
       *
       * For non-local bind addresses the boss loop is shared and is not shut
       * down, so the result is already complete once the socket is closed.
       *
       * @param deadline the time by which boss loop shutdown should have finished.
       * @return a [[Future]] representing the shutdown of the boss threadpool.
       */
      def closeServer(deadline: Time): Future[Unit] = closeAwaitably {
        // note: this ultimately calls close(2) on
        // a non-blocking socket so it should not block.
        ch.close().awaitUninterruptibly()

        val timeout = deadline - Time.now
        val timeoutMs = timeout.inMillis

        // We only need to shutdown bossLoop if the bind address is a local address.
        // Otherwise, the bossLoop is shared across the process.
        if (!ListeningServerBuilder.isLocal(addr)) Future.Done
        else {
          val p = new Promise[Unit]

          // The boss loop immediately starts refusing new work.
          // Existing tasks have ``timeoutMs`` time to finish executing.
          // A deadline already in the past is clamped to a zero timeout.
          bossLoop
            .shutdownGracefully(0 /* quietPeriod */, timeoutMs.max(0), TimeUnit.MILLISECONDS)
            .addListener(new FutureListener[Any] {
              def operationComplete(future: NettyFuture[Any]): Unit = p.setDone()
            })

          // Don't rely on netty to satisfy the promise and transform all results to
          // success because we don't want the non-deterministic lifecycle of external
          // resources to affect application success.
          p.raiseWithin(timeout)(timer).transform { _ => Future.Done }
        }
      }

      def boundAddress: SocketAddress = ch.localAddress()

      // Optional worker pool tracking, registered only after a successful bind
      // because the tracker's name includes `boundAddress`.
      private[this] val workerPoolTrackingSettings =
        params[param.TrackWorkerPool]
      if (workerPoolTrackingSettings.enableTracking) {
        EventLoopGroupTracker.track(
          params[param.WorkerPool].eventLoopGroup,
          workerPoolTrackingSettings.trackingTaskPeriod,
          workerPoolTrackingSettings.threadDumpThreshold,
          params[Stats].statsReceiver,
          s"finagle/netty-4/delayTracking/${boundAddress}",
          Logger.get("com.twitter.finagle.netty4.Netty4Listener.threadDelay")
        )
      }
    }