def withWindowRightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup]()

in scalding-base/src/main/scala/com/twitter/scalding/typed/LookupJoin.scala [106:191]


  /**
   * Lookup join of `left` against a running, gated sum of the `right` values seen so far for the same key.
   *
   * Both pipes are tagged (`Left` for lookups, `Right` for store updates), merged, grouped by `K` and
   * scanned in sorted order. While scanning, a single "store" slot per key holds the time of the last
   * update together with the accumulated `JoinedV`:
   *
   *   - on an update at time `t`, the delta is combined onto the stored value with `Semigroup.plus` when
   *     `gate(t, storedTime)` is true; otherwise the delta replaces the stored value. Either way the
   *     stored time becomes `t`.
   *   - on a lookup at time `t`, the stored value is emitted alongside `v` only when
   *     `gate(t, storedTime)` is true; otherwise (or when the store is empty) `None` is emitted.
   *
   * @param left
   *   the lookups: `(time, (key, value))`
   * @param right
   *   the store updates: `(time, (key, delta))`
   * @param reducers
   *   reducer count handed to `withReducers`; `None` is passed on as -1
   * @param gate
   *   called as `gate(currentTime, storedTime)`; true means the stored value is still usable
   * @return
   *   one `(time, (key, (value, lookedUp)))` record for every record of `left`
   */
  def withWindowRightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup](
      left: TypedPipe[(T, (K, V))],
      right: TypedPipe[(T, (K, JoinedV))],
      reducers: Option[Int] = None
  )(gate: (T, T) => Boolean): TypedPipe[(T, (K, (V, Option[JoinedV])))] = {

    /**
     * Ordering on Either that looks only at the side, never at the payload: every Left compares below
     * every Right, and two values on the same side compare as equal. This places lookups before store
     * writes, since we assume a lookup takes non-zero time.
     */
    implicit def eitherOrd[A, B]: Ordering[Either[A, B]] =
      new Ordering[Either[A, B]] {
        def compare(l: Either[A, B], r: Either[A, B]) =
          if (l.isLeft == r.isLeft) 0
          else if (l.isLeft) -1
          else 1
      }

    // Lookups coming from the left pipe, keyed by K and tagged as Left.
    val lookups: TypedPipe[(K, (T, Either[V, JoinedV]))] =
      left.map { case (t, (k, v)) => (k, (t, Left(v): Either[V, JoinedV])) }

    // Store updates coming from the right pipe, keyed by K and tagged as Right.
    val updates: TypedPipe[(K, (T, Either[V, JoinedV]))] =
      right.map { case (t, (k, joinedV)) => (k, (t, Right(joinedV): Either[V, JoinedV])) }

    val joined: TypedPipe[(K, (Option[(T, JoinedV)], Option[(T, V, Option[JoinedV])]))] =
      (lookups ++ updates).group
        .withReducers(reducers.getOrElse(-1)) // -1 means default in scalding
        .sorted
        // Per key we now walk the (T, Either[V, JoinedV]) events in sorted order. The scan state is a
        // pair: the first slot is the simulated store (time of the last update plus the accumulated
        // value, or None before any update); the second slot is Some(joinResult) when the event just
        // processed was a lookup and None when it was a store update.
        .scanLeft((Option.empty[(T, JoinedV)], Option.empty[(T, V, Option[JoinedV])])) {
          case ((store, _), (now, Left(v))) =>
            // A lookup: read the store as of `now`, hiding the stored value when the gate rejects it.
            // The store itself is carried forward untouched.
            val visible = store.flatMap { case (writtenAt, jv) =>
              if (gate(now, writtenAt)) Some(jv) else None
            }
            (store, Some((now, v, visible)))

          case ((store, _), (now, Right(delta))) =>
            // A store update: sum onto the previous value when the gate still accepts it, otherwise
            // (gate rejected it, or nothing was stored yet) start over from the delta alone.
            val updated = store match {
              case Some((writtenAt, acc)) if gate(now, writtenAt) => Semigroup.plus(acc, delta)
              case _                                               => delta
            }
            (Some((now, updated)), None)
        }
        .toTypedPipe

    // Drop the store half of the scan state and every step that was a store update (its second slot is
    // None, including the initial scan value), keeping only the lookup results.
    joined.flatMap { case (key, (_, emitted)) =>
      emitted.map { case (time, v, lookedUp) =>
        (time, (key, (v, lookedUp)))
      }
    }
  }