in finagle-core/src/main/scala/com/twitter/finagle/server/ListeningStackServer.scala [42:154]
// Abstract hook that builds the concrete `ListeningServer` for this stack server.
// Within this file it is called from `serve`, on the re-parameterized server instance.
protected def newListeningServer(
// Factory that `serve` produces from the finalized stack (`server.stack.make(serverParams)`).
serviceFactory: ServiceFactory[Req, Rep],
// Address to listen on; `serve` hands its own `addr` argument through unchanged.
addr: SocketAddress
)(
// Session callback. `serve` supplies one that registers the connection for bookkeeping
// and unregisters it again when the connection's `onClose` is satisfied.
// NOTE(review): presumably invoked by the implementation once per accepted
// session -- confirm against the concrete implementations.
trackSession: ClientConnection => Unit
): ListeningServer
/**
 * Builds and starts a [[ListeningServer]] that serves `factory` on `addr`.
 *
 * Everything happens in the constructor of the anonymous class below, in
 * declaration order: global initialization, resolution of the effective
 * label and stats receiver, finalization of the params, materialization of
 * the stack with `factory` as its `Endpoint` leaf, creation of the
 * underlying listener via `newListeningServer`, and registration with
 * `ServerRegistry`. The order of the `val`s is therefore significant.
 *
 * @param addr    the address handed to `newListeningServer`; also used to look
 *                up the connection registry and any name registered in
 *                `ServerRegistry`
 * @param factory the service factory installed as the `Endpoint` leaf of the stack
 * @return a server whose close unregisters it from `ServerRegistry` and closes
 *         the underlying listener, the tracked sessions, and the materialized
 *         service factory
 */
def serve(addr: SocketAddress, factory: ServiceFactory[Req, Rep]): ListeningServer =
new ListeningServer with CloseAwaitably {
// Ensure that we have performed global initialization.
com.twitter.finagle.Init()
// Snapshot the caller-supplied params before they are re-labelled and re-scoped below.
private[this] val monitor = params[Monitor].monitor
private[this] val reporter = params[Reporter].reporter
private[this] val stats = params[Stats].statsReceiver
private[this] val label = params[Label].label
private[this] val registry = ServerRegistry.connectionRegistry(addr)
// For historical reasons, we have to respect the ServerRegistry
// for naming addresses (i.e. label=addr). Until we deprecate
// its usage, it takes precedence for identifying a server as
// it is the most recently set label.
private[this] val serverLabel = ServerRegistry.nameOf(addr).getOrElse(label)
// An empty label gets only the server role applied to the raw receiver;
// otherwise the receiver is additionally rooted at `serverLabel`.
private[this] val statsReceiver =
if (serverLabel.isEmpty) RoleConfiguredStatsReceiver(stats, SourceRole.Server)
else
RoleConfiguredStatsReceiver(
RelativeNameMarkingStatsReceiver(
new RootFinagleStatsReceiver(stats, serverLabel, DimensionalServerScopes)
),
SourceRole.Server,
Some(serverLabel))
// Finalized params: the originals overridden with the resolved label, the
// scoped stats receiver and a monitor that reports before delegating to the
// configured one, then threaded through every default injector in order.
// NOTE(review): the reporter is built with `label` (the raw Label param), not
// `serverLabel`, so a name coming from ServerRegistry is not used here --
// confirm this is intentional.
private[this] val serverParams = StackServer.DefaultInjectors.injectors.foldLeft(
params +
Label(serverLabel) +
Stats(statsReceiver) +
Monitor(reporter(label, None).andThen(monitor))) {
case (prms, injector) =>
injector(prms)
}
// We re-parameterize in case `newListeningServer` needs to access the
// finalized parameters.
private[this] val server: This = {
// `factory` becomes the terminal (Endpoint) element of the stack.
val withEndpoint = withStack(stack ++ Stack.leaf(Endpoint, factory))
// NOTE(review): the request-logger switch is read from the original `params`,
// not from `serverParams`, so a value changed by an injector would not be
// seen here -- confirm this is intended.
val transformed =
params[RequestLogger.Param] match {
case RequestLogger.Param.Enabled =>
withEndpoint.transformed(RequestLogger.newStackTransformer(serverLabel))
case RequestLogger.Param.Disabled =>
withEndpoint
}
// Apply the finalized params first, then each default transformer in order.
StackServer.DefaultTransformer.transformers.foldLeft(
transformed.withParams(serverParams)
)((srv, transformer) => srv.transformed(transformer))
}
// Materialize the fully transformed stack with the finalized params.
private[this] val serviceFactory = server.stack.make(serverParams)
// Session bookkeeping used to explicitly manage
// session resources per ListeningServer. Note, draining
// in-flight requests is expected to be managed by the session,
// so we can simply `close` all sessions here.
private[this] val sessions = new Closables
// Each tracked session is added to both the connection registry and `sessions`,
// and removed from both once its `onClose` future is satisfied.
private[this] val underlying = server.newListeningServer(serviceFactory, addr) { session =>
registry.register(session)
sessions.register(session)
session.onClose.ensure {
sessions.unregister(session)
registry.unregister(session)
}
}
// Registered under the bound address (not `addr`); `closeServer` unregisters
// with the same three arguments.
ServerRegistry.register(underlying.boundAddress.toString, server.stack, server.params)
// Shuts the server down: unregisters it, then starts closing the listener, the
// tracked sessions and the service factory, all with the same `deadline`.
protected def closeServer(deadline: Time): Future[Unit] = closeAwaitably {
ServerRegistry.unregister(underlying.boundAddress.toString, server.stack, server.params)
// Here be dragons
// We want to do four things here in this order:
// 1. close the listening socket
// 2. close the factory (not sure if ordering matters for this step)
// 3. drain pending requests for existing sessions
// 4. close those connections when their requests complete
//
// Because we care about the order here, it's important that the closes
// are done synchronously. This means that we must be careful not to
// schedule work for the future, as might happen if we transform or
// respond to a future.
// closing `underlying` eventually calls Netty3Listener.close which has an
// interesting side-effect of synchronously closing #1
// NOTE(review): the Netty3Listener reference may be stale -- confirm that the
// current listener implementation still closes the socket synchronously.
val ulClosed = underlying.close(deadline)
// However we don't want to wait on the above because it will only complete
// when #4 is finished. So we ignore it and close everything else. Note that
// closing the connections here will do #2 and drain them via the Dispatcher.
val closingSessions = sessions.close(deadline)
val closingFactory = serviceFactory.close(deadline)
// and once they're drained we can then wait on the listener physically closing them
// (all three closes above have already been started; this only sequences the result).
Future.join(Seq(closingSessions, closingFactory)).before(ulClosed)
}
// The address reported by the underlying listener, which need not equal `addr`.
def boundAddress: SocketAddress = underlying.boundAddress
override def toString: String = {
val protocol = params[ProtocolLibrary].name
// Local `label` shadows the private `label` field; it falls back to a
// placeholder when the server has no label.
val label = if (serverLabel.isEmpty) "<unlabeled>" else serverLabel
s"ListeningServer($protocol, $label, $boundAddress)"
}
}