def retrying()

in finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/ZkSession.scala [350:415]


  def retrying(
    backoff: RetryStream,
    newZkSession: () => ZkSession
  )(
    implicit timer: Timer
  ): Var[ZkSession] = {
    val v = Var(ZkSession.nil)

    @volatile var closing = false
    @volatile var zkSession: ZkSession = ZkSession.nil

    def reconnect(): Unit = {
      if (closing) return

      logger.info(s"Closing zk session ${zkSession.sessionIdAsHex}")
      zkSession.close()
      zkSession = newZkSession()
      logger.info(s"Starting new zk session ${zkSession.sessionId}")

      // Upon initial connection, send auth info, then update `u`.
      zkSession.state.changes
        .filter {
          _ == WatchState.SessionState(SessionState.SyncConnected)
        }
        .toFuture()
        .unit.before {
          zkSession.addAuthInfo("digest", Buf.Utf8(authInfo))
        }
        .respond {
          case Return(_) =>
            logger.info(s"New ZKSession is connected. Session ID: ${zkSession.sessionIdAsHex}")
            v() = zkSession
            backoff.reset()
          case _ =>
        }

      // Kick off a delayed reconnection if the new session failed to initialize _or_ the current session expired.
      zkSession.state.changes
        .filter(needsReconnect)
        .toFuture()
        .flatMap { state =>
          val jitter = backoff.next()
          state match {
            case WatchState.FailedToInitialize(exc) =>
              logger.error(
                s"Zookeeper session failed to initialize with exception: $exc. Retrying in $jitter"
              )
            case WatchState.SessionState(SessionState.Expired) =>
              logger.error(
                s"Zookeeper session ${zkSession.sessionIdAsHex} has expired. Reconnecting in $jitter"
              )
            case _ => throw new MatchError
          }
          Future.sleep(jitter)
        }
        .ensure(reconnect())
    }

    reconnect()

    Closable.make { deadline =>
      closing = true
      zkSession.close()
    }
    v
  }