protected def newListeningServer()

in finagle-core/src/main/scala/com/twitter/finagle/server/ListeningStackServer.scala [42:154]


  /**
   * Creates the underlying [[ListeningServer]] for this stack server.
   *
   * Declared without a body here; `serve` invokes it on the fully
   * re-parameterized server instance and uses the result's `boundAddress`
   * and `close(deadline)`.
   *
   * @param serviceFactory the factory `serve` materializes from the final
   *                       stack and params
   * @param addr           the address that was passed to `serve`
   * @param trackSession   callback accepting a [[ClientConnection]]. `serve`
   *                       supplies one that registers the session for
   *                       bookkeeping and unregisters it when the session
   *                       closes. NOTE(review): presumably implementations
   *                       call this once per accepted session -- confirm
   *                       against the concrete implementations.
   * @return the listener that `serve` wraps
   */
  protected def newListeningServer(
    serviceFactory: ServiceFactory[Req, Rep],
    addr: SocketAddress
  )(
    trackSession: ClientConnection => Unit
  ): ListeningServer

  /**
   * Builds a [[ListeningServer]] for `addr` that is backed by `factory`.
   *
   * The steps, in initialization order, are:
   *  1. resolve the server label and derive the stats receiver from it;
   *  2. finalize the params (label, stats, monitor) and run them through the
   *     default injectors;
   *  3. append `factory` to the stack as the `Endpoint` leaf, optionally add
   *     the request-logging transformer, and apply the default transformers;
   *  4. materialize the service factory and hand it to `newListeningServer`;
   *  5. register the resulting server with the `ServerRegistry`.
   *
   * The members below are initialized top to bottom and later ones read
   * earlier ones, so their declaration order is significant.
   *
   * @param addr    the address handed to `newListeningServer`; also used to
   *                look up a registry-assigned name and the connection registry
   * @param factory the service factory placed at the bottom of the stack
   * @return a [[ListeningServer]] whose `boundAddress` is that of the
   *         underlying listener and whose close is routed through `closeServer`
   */
  def serve(addr: SocketAddress, factory: ServiceFactory[Req, Rep]): ListeningServer =
    new ListeningServer with CloseAwaitably {
      // Ensure that we have performed global initialization.
      com.twitter.finagle.Init()

      // Snapshot the configured values that the rest of the setup derives from.
      private[this] val monitor = params[Monitor].monitor
      private[this] val reporter = params[Reporter].reporter
      private[this] val stats = params[Stats].statsReceiver
      private[this] val label = params[Label].label
      private[this] val registry = ServerRegistry.connectionRegistry(addr)
      // For historical reasons, we have to respect the ServerRegistry
      // for naming addresses (i.e. label=addr). Until we deprecate
      // its usage, it takes precedence for identifying a server as
      // it is the most recently set label.
      private[this] val serverLabel = ServerRegistry.nameOf(addr).getOrElse(label)

      // With an empty label the configured receiver is only wrapped with the
      // server role; otherwise it is first rooted at `serverLabel` and the
      // label is also passed along to the role-configured wrapper.
      private[this] val statsReceiver =
        if (serverLabel.isEmpty) RoleConfiguredStatsReceiver(stats, SourceRole.Server)
        else
          RoleConfiguredStatsReceiver(
            RelativeNameMarkingStatsReceiver(
              new RootFinagleStatsReceiver(stats, serverLabel, DimensionalServerScopes)
            ),
            SourceRole.Server,
            Some(serverLabel))

      // Override label, stats and monitor on top of the configured params, then
      // thread the result through each default injector in order.
      // NOTE(review): the reporter is built with the configured `label`, not the
      // resolved `serverLabel` used everywhere else -- confirm this is intended.
      private[this] val serverParams = StackServer.DefaultInjectors.injectors.foldLeft(
        params +
          Label(serverLabel) +
          Stats(statsReceiver) +
          Monitor(reporter(label, None).andThen(monitor))) {
        case (prms, injector) =>
          injector(prms)
      }

      // We re-parameterize in case `newListeningServer` needs to access the
      // finalized parameters.
      private[this] val server: This = {
        // `factory` becomes the leaf of the stack, under the `Endpoint` role.
        val withEndpoint = withStack(stack ++ Stack.leaf(Endpoint, factory))
        // The request-logging transformer is applied only when enabled. Note
        // that this reads the original `params`, not `serverParams`.
        val transformed =
          params[RequestLogger.Param] match {
            case RequestLogger.Param.Enabled =>
              withEndpoint.transformed(RequestLogger.newStackTransformer(serverLabel))
            case RequestLogger.Param.Disabled =>
              withEndpoint
          }
        // Install the finalized params first, then apply every default
        // transformer in order.
        StackServer.DefaultTransformer.transformers.foldLeft(
          transformed.withParams(serverParams)
        )((srv, transformer) => srv.transformed(transformer))
      }

      // Materialize the final stack with the final params.
      private[this] val serviceFactory = server.stack.make(serverParams)

      // Session bookkeeping used to explicitly manage
      // session resources per ListeningServer. Note, draining
      // in-flight requests is expected to be managed by the session,
      // so we can simply `close` all sessions here.
      private[this] val sessions = new Closables

      // The callback records each session in both the connection registry and
      // the local `sessions` set, and removes it from both once the session's
      // `onClose` is satisfied.
      private[this] val underlying = server.newListeningServer(serviceFactory, addr) { session =>
        registry.register(session)
        sessions.register(session)
        session.onClose.ensure {
          sessions.unregister(session)
          registry.unregister(session)
        }
      }

      // Publish this server under its bound address; undone in `closeServer`.
      ServerRegistry.register(underlying.boundAddress.toString, server.stack, server.params)

      // Closes the listener, the tracked sessions and the service factory, all
      // with the same `deadline`. The returned future is the join of the
      // session and factory closes, sequenced before the listener's close
      // future.
      protected def closeServer(deadline: Time): Future[Unit] = closeAwaitably {
        ServerRegistry.unregister(underlying.boundAddress.toString, server.stack, server.params)
        // Here be dragons
        // We want to do four things here in this order:
        // 1. close the listening socket
        // 2. close the factory (not sure if ordering matters for this step)
        // 3. drain pending requests for existing sessions
        // 4. close those connections when their requests complete
        //
        // Because we care about the order here, it's important that the closes
        // are done synchronously.  This means that we must be careful not to
        // schedule work for the future, as might happen if we transform or
        // respond to a future.

        // closing `underlying` eventually calls Netty3Listener.close which has an
        // interesting side-effect of synchronously closing #1
        val ulClosed = underlying.close(deadline)

        // However we don't want to wait on the above because it will only complete
        // when #4 is finished.  So we ignore it and close everything else.  Note that
        // closing the connections here will do #2 and drain them via the Dispatcher.
        val closingSessions = sessions.close(deadline)
        val closingFactory = serviceFactory.close(deadline)

        // and once they're drained we can then wait on the listener physically closing them
        Future.join(Seq(closingSessions, closingFactory)).before(ulClosed)
      }

      // Delegates to the underlying listener.
      def boundAddress: SocketAddress = underlying.boundAddress

      // Renders as `ListeningServer(<protocol>, <label>, <boundAddress>)`, using
      // the literal "<unlabeled>" when the server label is empty.
      override def toString: String = {
        val protocol = params[ProtocolLibrary].name
        val label = if (serverLabel.isEmpty) "<unlabeled>" else serverLabel
        s"ListeningServer($protocol, $label, $boundAddress)"
      }
    }