in finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/ZkSession.scala [350:415]
def retrying(
backoff: RetryStream,
newZkSession: () => ZkSession
)(
implicit timer: Timer
): Var[ZkSession] = {
val v = Var(ZkSession.nil)
@volatile var closing = false
@volatile var zkSession: ZkSession = ZkSession.nil
def reconnect(): Unit = {
if (closing) return
logger.info(s"Closing zk session ${zkSession.sessionIdAsHex}")
zkSession.close()
zkSession = newZkSession()
logger.info(s"Starting new zk session ${zkSession.sessionId}")
// Upon initial connection, send auth info, then update `u`.
zkSession.state.changes
.filter {
_ == WatchState.SessionState(SessionState.SyncConnected)
}
.toFuture()
.unit.before {
zkSession.addAuthInfo("digest", Buf.Utf8(authInfo))
}
.respond {
case Return(_) =>
logger.info(s"New ZKSession is connected. Session ID: ${zkSession.sessionIdAsHex}")
v() = zkSession
backoff.reset()
case _ =>
}
// Kick off a delayed reconnection if the new session failed to initialize _or_ the current session expired.
zkSession.state.changes
.filter(needsReconnect)
.toFuture()
.flatMap { state =>
val jitter = backoff.next()
state match {
case WatchState.FailedToInitialize(exc) =>
logger.error(
s"Zookeeper session failed to initialize with exception: $exc. Retrying in $jitter"
)
case WatchState.SessionState(SessionState.Expired) =>
logger.error(
s"Zookeeper session ${zkSession.sessionIdAsHex} has expired. Reconnecting in $jitter"
)
case _ => throw new MatchError
}
Future.sleep(jitter)
}
.ensure(reconnect())
}
reconnect()
Closable.make { deadline =>
closing = true
zkSession.close()
}
v
}