private[finagle] def get()

in finagle-thrift/src/main/scala/com/twitter/finagle/thrift/exp/partitioning/PartitioningStrategy.scala [150:503]


    // Delegates to `repMergers.get(methodName)`. `repMergers` is declared outside the
    // visible part of this file; presumably it is keyed by Thrift method name and the
    // result is None when no merger is stored under `methodName` -- TODO confirm.
    private[finagle] def get(methodName: String): Option[ResponseMerger[Any]] =
      repMergers.get(methodName)
  }
}

/**
 * Service partitioning strategy applied to clients so that they can route requests
 * to the right partition.
 *
 * Two families of partitioning strategies are supported,
 * [[HashingPartitioningStrategy]] and [[CustomPartitioningStrategy]]; each one supports
 * configuring both the Finagle Client Stack and the ThriftMux MethodBuilder.
 *
 * With either family, developers need to provide a concrete function that gives each
 * request an indicator of its destination, for example a hashing key or a partition id.
 * Message fan-out is supported by leveraging RequestMerger and ResponseMerger.
 *
 * The trait is `sealed`: all direct implementations are declared in this source file.
 */
sealed trait PartitioningStrategy

/**
 * Common super type of the hashing based [[PartitioningStrategy]] implementations
 * declared in this file ([[ClientHashingStrategy]] and [[MethodBuilderHashingStrategy]]),
 * which describe the destination of a request with hashing keys.
 *
 * The trait declares no members of its own.
 */
sealed trait HashingPartitioningStrategy extends PartitioningStrategy

/**
 * Common super type of the custom [[PartitioningStrategy]] implementations; it is the
 * type returned by the factory methods on [[ClientCustomStrategy]].
 */
sealed trait CustomPartitioningStrategy extends PartitioningStrategy {
  /**
   * Builds the [[PartitionNodeManager]] of this strategy from a client stack and its
   * params. Abstract here: every concrete strategy supplies its own implementation.
   *
   * @param underlying the stack of [[ServiceFactory]] modules given to the node manager
   *                   (NOTE(review): presumably the part of the client stack below the
   *                   partitioning module -- confirm against the caller)
   * @param params     the stack params that accompany `underlying`
   * @tparam Req the request type of `underlying`
   * @tparam Rep the response type of `underlying`
   * @return a node manager whose partitioning function type is
   *         [[ClientCustomStrategy.ToPartitionedMap]]
   */
  private[finagle] def newNodeManager[Req, Rep](
    underlying: Stack[ServiceFactory[Req, Rep]],
    params: Stack.Params
  ): PartitionNodeManager[Req, Rep, _, ClientCustomStrategy.ToPartitionedMap]

  /**
   * The registry through which the client supplies [[ResponseMerger]]s
   * for message fan-out cases. A new, separate registry is created for every
   * strategy instance.
   *
   * @see [[ResponseMerger]]
   */
  val responseMergerRegistry: ResponseMergerRegistry = new ResponseMergerRegistry()
}

/**
 * A [[PartitioningStrategy]] that is neither hashing based nor custom.
 *
 * NOTE(review): judging by its name this stands for "partitioning is turned off"; how it
 * is consumed is not visible in this part of the file -- confirm at the use sites.
 */
private[finagle] object Disabled extends PartitioningStrategy

object ClientHashingStrategy {

  /**
   * The partitioning function type of a [[ClientHashingStrategy]].
   *
   * Input: the original Thrift request.
   * Output: a Map of hashing keys to split requests.
   */
  type ToPartitionedMap = PartialFunction[ThriftStructIface, Map[Any, ThriftStructIface]]

  /**
   * The Java-friendly way to create a [[ClientHashingStrategy]].
   * Scala users should construct a [[ClientHashingStrategy]] directly.
   *
   * The `JMap` returned by `toPartitionedMap` is converted with `asScala.toMap` on every
   * invocation, so the strategy always works on an immutable Scala Map.
   *
   * @param toPartitionedMap the partitioning function: takes a Thrift request and returns
   *        a `JMap` of hashing keys to sub-requests.
   * @return a [[ClientHashingStrategy]] wrapping the converted partitioning function.
   *
   * @note [[com.twitter.util.Function]] may be useful in helping create a [[scala.PartialFunction]].
   */
  def create(
    toPartitionedMap: PartialFunction[
      ThriftStructIface,
      JMap[Any, ThriftStructIface]
    ]
  ): ClientHashingStrategy = {
    val scalaPartitionedMap: ToPartitionedMap =
      toPartitionedMap.andThen(javaMap => javaMap.asScala.toMap)
    new ClientHashingStrategy(scalaPartitionedMap)
  }

  /**
   * The fallback for Thrift requests that do not specify hashing keys: the request is
   * returned unsplit, as the single entry of a Map whose key is `None`.
   *
   * This allows a Thrift/ThriftMux partition aware client to serve a part of endpoints
   * of a service. Un-specified endpoints should not be called from this client,
   * otherwise, throw
   * [[com.twitter.finagle.partitioning.ConsistentHashPartitioningService.NoPartitioningKeys]].
   */
  private[finagle] val defaultHashingKeyAndRequest: ThriftStructIface => Map[
    Any,
    ThriftStructIface
  ] = { request =>
    Map(None -> request)
  }
}

/**
 * An API to set a consistent hashing partitioning strategy for a Thrift/ThriftMux Client.
 * For a Java-friendly way to do the same thing, see `ClientHashingStrategy.create`.
 *
 * Every instance owns its own [[RequestMergerRegistry]] and [[ResponseMergerRegistry]],
 * both created together with the strategy.
 *
 * @param getHashingKeyAndRequest A PartialFunction implemented by client that
 *        provides the partitioning logic on a request. It takes a Thrift object
 *        request, and returns a Map of hashing keys to sub-requests. If we
 *        don't need to fan-out, it should return one element: hashing key to
 *        the original request.  This PartialFunction can take multiple Thrift
 *        request types of one Thrift service (different method endpoints of one
 *        service).
 */
final class ClientHashingStrategy(
  val getHashingKeyAndRequest: ClientHashingStrategy.ToPartitionedMap)
    extends HashingPartitioningStrategy {

  /**
   * The registry through which the client supplies [[RequestMerger]]s
   * for message fan-out cases.
   * @see [[RequestMerger]]
   */
  val requestMergerRegistry: RequestMergerRegistry = new RequestMergerRegistry()

  /**
   * The registry through which the client supplies [[ResponseMerger]]s
   * for message fan-out cases.
   * @see [[ResponseMerger]]
   */
  val responseMergerRegistry: ResponseMergerRegistry = new ResponseMergerRegistry()
}

object MethodBuilderHashingStrategy {
  // The partitioning function type of a MethodBuilderHashingStrategy. Unlike
  // ClientHashingStrategy.ToPartitionedMap it is a total function over one request type.
  // input: original thrift request
  // output: a Map of hashing keys to split requests
  type ToPartitionedMap[Req] = Req => Map[Any, Req]
}

/**
 * An API to set a hashing partitioning strategy for a client MethodBuilder.
 *
 * NOTE(review): this comment used to point Java users at
 * `MethodBuilderHashingStrategy.create`, but the companion object in this file declares
 * only the `ToPartitionedMap` type alias. Until such a factory exists, use the
 * constructors of this class directly.
 *
 * @param getHashingKeyAndRequest A function for the partitioning logic. MethodBuilder is
 *                                customized per-method so that this method only takes one
 *                                Thrift request type.
 * @param requestMerger           Supplies a [[RequestMerger]] for messaging fan-out.
 *                                Pass [[None]] when requests are not fanned out; that is
 *                                what the single-argument constructor does.
 * @param responseMerger          Supplies a [[ResponseMerger]] for messaging fan-out.
 *                                Pass [[None]] when requests are not fanned out; that is
 *                                what the single-argument constructor does.
 * @tparam Req the Thrift request type of the method
 * @tparam Rep the response type of the method
 */
final class MethodBuilderHashingStrategy[Req <: ThriftStructIface, Rep](
  val getHashingKeyAndRequest: MethodBuilderHashingStrategy.ToPartitionedMap[Req],
  val requestMerger: Option[RequestMerger[Req]],
  val responseMerger: Option[ResponseMerger[Rep]])
    extends HashingPartitioningStrategy {

  /**
   * Creates a strategy without mergers: both `requestMerger` and `responseMerger`
   * are set to `None`.
   *
   * @param getHashingKeyAndRequest A function for the partitioning logic.
   */
  def this(getHashingKeyAndRequest: MethodBuilderHashingStrategy.ToPartitionedMap[Req]) =
    this(getHashingKeyAndRequest, None, None)
}

object ClientCustomStrategy {
  // The partitioning function type used by the custom strategies built in this object.
  // input: original thrift request
  // output: Future of a Map from partition ids (Int) to split requests
  // It is a PartialFunction, so it may be defined for only some request types.
  type ToPartitionedMap = PartialFunction[ThriftStructIface, Future[Map[Int, ThriftStructIface]]]

  /**
   * Constructs a [[ClientCustomStrategy]] that does not reshard and in which every
   * instance is its own partition.
   *
   * This is appropriate for static partitioning backend topologies. It delegates to the
   * two-argument `noResharding`, passing a logical partition function that maps each
   * id `a` to `Seq(a)`.
   *
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
   *
   * @param getPartitionIdAndRequest A PartialFunction implemented by client that
   *        provides the partitioning logic on a request. It takes a Thrift object
   *        request, and returns Future Map of partition ids to sub-requests. If
   *        we don't need to fan-out, it should return one element: partition id
   *        to the original request.  This PartialFunction can take multiple
   *        Thrift request types of one Thrift service (different method endpoints
   *        of one service).  In this context, the returned partition id is also
   *        the shard id.
   */
  def noResharding(
    getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap
  ): CustomPartitioningStrategy = {
    // Every shard id is the one and only logical partition of its host.
    val shardIsPartition: Int => Seq[Int] = shardId => Seq(shardId)
    noResharding(getPartitionIdAndRequest, shardIsPartition)
  }

  /**
   * Constructs a [[ClientCustomStrategy]] that does not reshard, with a user supplied
   * mapping from hosts to logical partitions.
   *
   * This is appropriate for static partitioning backend topologies. The strategy is built
   * over a `Unit` state held in a constant [[Activity]]; both functions are used as given,
   * regardless of that state.
   *
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
   *
   * @param getPartitionIdAndRequest A PartialFunction implemented by client that
   *        provides the partitioning logic on a request. It takes a Thrift object
   *        request, and returns Future Map of partition ids to sub-requests. If
   *        we don't need to fan-out, it should return one element: partition id
   *        to the original request.  This PartialFunction can take multiple
   *        Thrift request types of one Thrift service (different method endpoints
   *        of one service).
   * @param getLogicalPartitionId Gets the logical partition identifiers from a host
   *        identifier; host identifiers are derived from [[ZkMetadata]] shardId.
   *        It indicates which logical partitions a physical host belongs to.
   *        Multiple hosts can belong to the same partition, and one host can belong
   *        to multiple partitions, for example:
   *        {{{
   *          {
   *            case a if Range(0, 10).contains(a) => Seq(0, 1)
   *            case b if Range(10, 20).contains(b) => Seq(1)
   *            case c if Range(20, 30).contains(c) => Seq(2)
   *            case _ => throw ...
   *          }
   *        }}}
   */
  def noResharding(
    getPartitionIdAndRequest: ClientCustomStrategy.ToPartitionedMap,
    getLogicalPartitionId: Int => Seq[Int]
  ): CustomPartitioningStrategy = {
    // There is no state to observe, so both state-dependent functions ignore their
    // argument and always hand back what the caller supplied.
    val fixedPartitioner: Unit => ClientCustomStrategy.ToPartitionedMap =
      _ => getPartitionIdAndRequest
    val fixedLogicalPartitions: Unit => Int => Seq[Int] =
      _ => getLogicalPartitionId
    val unitState: Activity[Unit] = Activity.value(())
    new ClientCustomStrategy[Unit](fixedPartitioner, fixedLogicalPartitions, unitState)
  }
  /**
   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state
   * and in which every instance is its own partition.
   *
   * This is appropriate for simple custom strategies where you only need to
   * know information about the remote cluster in order to reshard. For example,
   * if you want to be able to add or remove capacity safely.
   *
   * It delegates to the two-argument `clusterResharding`, passing a logical partition
   * function that, for any cluster state, maps each id `a` to `Seq(a)`.
   *
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
   *
   * @param getPartitionIdAndRequestFn A function that given the current state of
   *        the remote cluster, returns a PartialFunction implemented by client
   *        that provides the partitioning logic on a request. It takes a Thrift
   *        object request, and returns Future Map of partition ids to
   *        sub-requests. If we don't need to fan-out, it should return one
   *        element: partition id to the original request.  This PartialFunction
   *        can take multiple Thrift request types of one Thrift service
   *        (different method endpoints of one service).  In this context, the
   *        returned partition id is also the shard id.
   *        Note that this function must be pure (ie referentially transparent).
   *        It cannot change based on anything other than the state of the
   *        remote cluster it is provided with, or else it will malfunction.
   */
  def clusterResharding(
    getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap
  ): CustomPartitioningStrategy = {
    // Whatever the cluster looks like, every shard id is its own logical partition.
    val shardIsPartitionFn: Set[Address] => Int => Seq[Int] =
      _ => shardId => Seq(shardId)
    clusterResharding(getPartitionIdAndRequestFn, shardIsPartitionFn)
  }

  /**
   * Constructs a [[ClientCustomStrategy]] that reshards based on the remote cluster state,
   * with a user supplied mapping from hosts to logical partitions.
   *
   * This is appropriate for simple custom strategies where you only need to
   * know information about the remote cluster in order to reshard. For example,
   * if you want to be able to add or remove capacity safely.
   *
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
   *
   * @param getPartitionIdAndRequestFn A function that given the current state of
   *        the remote cluster, returns a PartialFunction implemented by client
   *        that provides the partitioning logic on a request. It takes a Thrift
   *        object request, and returns Future Map of partition ids to
   *        sub-requests. If we don't need to fan-out, it should return one
   *        element: partition id to the original request.  This PartialFunction
   *        can take multiple Thrift request types of one Thrift service
   *        (different method endpoints of one service).  Note that this function
   *        must be pure (ie referentially transparent).  It cannot change
   *        based on anything other than the state of the remote cluster it is
   *        provided with, or else it will malfunction.
   * @param getLogicalPartitionIdFn A function that given the current state of the
   *        remote cluster, returns a function that gets the logical partition
   *        identifiers from a host identifier; host identifiers are derived from
   *        [[ZkMetadata]] shardId. It indicates which logical partitions a physical
   *        host belongs to. Multiple hosts can belong to the same partition,
   *        and one host can belong to multiple partitions, for example:
   *        {{{
   *          {
   *            case a if Range(0, 10).contains(a) => Seq(0, 1)
   *            case b if Range(10, 20).contains(b) => Seq(1)
   *            case c if Range(20, 30).contains(c) => Seq(2)
   *            case _ => throw ...
   *          }
   *        }}}
   *        Note that this function must be pure (ie referentially transparent).
   *        It cannot change based on anything other than the state of the
   *        remote cluster it is provided with, or else it will malfunction.
   */
  def clusterResharding(
    getPartitionIdAndRequestFn: Set[Address] => ClientCustomStrategy.ToPartitionedMap,
    getLogicalPartitionIdFn: Set[Address] => Int => Seq[Int]
  ): CustomPartitioningStrategy = {
    val clusterStrategy: CustomPartitioningStrategy =
      new ClientClusterStrategy(getPartitionIdAndRequestFn, getLogicalPartitionIdFn)
    clusterStrategy
  }

  /**
   * Constructs a [[ClientCustomStrategy]] that reshards based on the user provided state
   * and in which every instance is its own partition.
   *
   * This lets the client be aware of the backend dynamic resharding by providing the
   * fully described state of resharding. The partitioning schema needs to be configured
   * to react to each state, and it needs to be a pure function (see param below).
   * Once the state has been updated successfully, the partitioning strategy moves
   * to the new schema. See [[clusterResharding]] if only the backend cluster information
   * needs to be observed in order to reshard.
   *
   * It delegates to the three-argument `resharding`, passing a logical partition
   * function that, for any state, maps each id `a` to `Seq(a)`.
   *
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
   *
   * @param getPartitionIdAndRequestFn A function that given the current state of
   *        `observable`, returns a PartialFunction implemented by client that
   *        provides the partitioning logic on a request. It takes a Thrift
   *        object request, and returns Future Map of partition ids to
   *        sub-requests. If we don't need to fan-out, it should return one
   *        element: partition id to the original request.  This PartialFunction
   *        can take multiple Thrift request types of one Thrift service
   *        (different method endpoints of one service).  Note that this
   *        function must be pure (ie referentially transparent).  It cannot
   *        change based on anything other than the state of `observable`, or
   *        else it will malfunction.
   * @param observable The state that is used for deciding how to reshard the
   *        cluster.
   * @tparam A the type of the state carried by `observable`.
   */
  def resharding[A](
    getPartitionIdAndRequestFn: A => ClientCustomStrategy.ToPartitionedMap,
    observable: Activity[A]
  ): CustomPartitioningStrategy = {
    // Whatever the observed state is, every shard id is its own logical partition.
    val shardIsPartitionFn: A => Int => Seq[Int] =
      _ => shardId => Seq(shardId)
    resharding[A](getPartitionIdAndRequestFn, shardIsPartitionFn, observable)
  }

  /**
   * Constructs a [[ClientCustomStrategy]] that reshards based on the user provided state,
   * with a user supplied mapping from hosts to logical partitions.
   *
   * This lets the client be aware of the backend dynamic resharding by providing the
   * fully described state of resharding. The partitioning schema needs to be configured
   * to react to each state, and it needs to be a pure function (see params below).
   * Once the state has been updated successfully, the partitioning strategy moves
   * to the new schema. See [[clusterResharding]] if only the backend cluster information
   * needs to be observed in order to reshard.
   *
   * Java users should see [[ClientCustomStrategies$]] for an easier to use API.
   *
   * @param getPartitionIdAndRequestFn A function that given the current state of
   *        `observable`, returns a PartialFunction implemented by client that
   *        provides the partitioning logic on a request. It takes a Thrift
   *        object request, and returns Future Map of partition ids to
   *        sub-requests. If we don't need to fan-out, it should return one
   *        element: partition id to the original request.  This PartialFunction
   *        can take multiple Thrift request types of one Thrift service
   *        (different method endpoints of one service).  Note that this
   *        function must be pure (ie referentially transparent).  It cannot
   *        change based on anything other than the state of `observable`, or
   *        else it will malfunction.
   * @param getLogicalPartitionIdFn A function that given the current state of
   *        `observable`, returns a function that gets the logical partition
   *        identifiers from a host identifier; host identifiers are derived from
   *        [[ZkMetadata]] shardId. It indicates which logical partitions a physical
   *        host belongs to. Multiple hosts can belong to the same partition,
   *        and one host can belong to multiple partitions, for example:
   *        {{{
   *          {
   *            case a if Range(0, 10).contains(a) => Seq(0, 1)
   *            case b if Range(10, 20).contains(b) => Seq(1)
   *            case c if Range(20, 30).contains(c) => Seq(2)
   *            case _ => throw ...
   *          }
   *        }}}
   *        Note that this function must be pure (ie referentially transparent).
   *        It cannot change based on anything other than the state of
   *        `observable`, or else it will malfunction.
   * @param observable The state that is used for deciding how to reshard the
   *        cluster.
   * @tparam A the type of the state carried by `observable`.
   */
  def resharding[A](
    getPartitionIdAndRequestFn: A => ClientCustomStrategy.ToPartitionedMap,
    getLogicalPartitionIdFn: A => Int => Seq[Int],
    observable: Activity[A]
  ): CustomPartitioningStrategy = {
    val stateDrivenStrategy: CustomPartitioningStrategy =
      new ClientCustomStrategy[A](getPartitionIdAndRequestFn, getLogicalPartitionIdFn, observable)
    stateDrivenStrategy
  }

  /**
   * Thrift requests not specifying partition ids will fall in here. This allows a
   * Thrift/ThriftMux partition aware client to serve a part of endpoints of a service.
   * Un-specified endpoints should not be called from this client, otherwise, throw
   * [[com.twitter.finagle.thrift.exp.partitioning.ThriftPartitioningService.PartitioningStrategyException]].
   */
  private[finagle] val defaultPartitionIdAndRequest: ThriftStructIface => Future[
    Map[Int, ThriftStructIface]
  ] = { _ =>