in scalding-base/src/main/scala/com/twitter/scalding/typed/LookupJoin.scala [106:191]
def withWindowRightSumming[T: Ordering, K: Ordering, V, JoinedV: Semigroup](
    left: TypedPipe[(T, (K, V))],
    right: TypedPipe[(T, (K, JoinedV))],
    reducers: Option[Int] = None
)(gate: (T, T) => Boolean): TypedPipe[(T, (K, (V, Option[JoinedV])))] = {
  // Time-windowed lookup join in which the right side is summed per key.
  //
  // Each (t, (k, v)) in `left` is a lookup of key k at time t against a simulated
  // store fed by `right`. Per key, events are processed in time order:
  //   - a right value at time t is added (via the JoinedV Semigroup) to the stored
  //     value only if gate(t, tStored) holds, where tStored is the time of the
  //     previous right value for that key; otherwise the stored value restarts from
  //     this right value. Either way the stored time becomes t.
  //   - a left value at time t observes Some(storedValue) only if
  //     gate(t, tStored) holds, otherwise None. A left value that precedes every
  //     right value for its key observes None.
  // When a left and a right event share a timestamp the left (lookup) is processed
  // first, so it does not observe that right value.
  //
  // left     - (time, (key, value)) lookups to enrich.
  // right    - (time, (key, delta)) updates summed into the simulated store.
  // reducers - reducer count for the group-by-key step; None uses the scalding default.
  // gate     - (currentTime, storedTime) => whether the stored value is still usable.
  // returns  - records derived only from `left` elements, each carrying that left
  //            element's time, key, value and the joined value it observed (if any).

  // Ordering on Either that ignores the contained values and puts every Left before
  // every Right. Combined with the time ordering used by `sorted` below, this places
  // lookups (Left) before service writes (Right) that carry the same timestamp, since
  // we assume a lookup takes non-zero time. The type parameters are named L and R so
  // they do not shadow this method's T.
  implicit def eitherOrd[L, R]: Ordering[Either[L, R]] =
    new Ordering[Either[L, R]] {
      def compare(x: Either[L, R], y: Either[L, R]): Int =
        (x, y) match {
          case (Left(_), Right(_))  => -1
          case (Right(_), Left(_))  => 1
          case (Left(_), Left(_))   => 0
          case (Right(_), Right(_)) => 0
        }
    }

  val joined: TypedPipe[(K, (Option[(T, JoinedV)], Option[(T, V, Option[JoinedV])]))] =
    left
      .map { case (t, (k, v)) => (k, (t, Left(v): Either[V, JoinedV])) }
      .++(right.map { case (t, (k, joinedV)) =>
        (k, (t, Right(joinedV): Either[V, JoinedV]))
      })
      .group
      .withReducers(reducers.getOrElse(-1)) // -1 means default in scalding
      .sorted
      // Grouping by K leaves values of (T, Either[V, JoinedV]); `sorted` orders them by
      // time (Left before Right on ties) and scanLeft walks them in that order.
      // The scan state is a pair:
      //   - first:  the simulated store entry for this key, i.e. the time of the most
      //             recent right value and the running JoinedV sum (None until a right
      //             value has been seen);
      //   - second: Some((time, v, joined)) when the current event was a left lookup,
      //             None when it was a right update.
      .scanLeft(
        (Option.empty[(T, JoinedV)], Option.empty[(T, V, Option[JoinedV])])
      ) {
        case ((None, _), (time, Left(v))) =>
          // There was no right value yet for this key, so the lookup finds nothing.
          (None, Some((time, v, None)))
        case ((stored @ Some((storedTime, storedValue)), _), (time, Left(v))) =>
          // Lookup against the simulated store at `time`: the stored value is only
          // visible while the gate accepts it. The store itself is left unchanged.
          val visible = if (gate(time, storedTime)) Some(storedValue) else None
          (stored, Some((time, v, visible)))
        case ((None, _), (time, Right(delta))) =>
          // First right value for this key starts the running sum.
          (Some((time, delta)), None)
        case ((Some((storedTime, storedValue)), _), (time, Right(delta))) =>
          // Sum into the stored value while the gate holds; otherwise the stored
          // value has expired (fallen out of cache) and the sum restarts at `delta`.
          val next =
            if (gate(time, storedTime)) Semigroup.plus(storedValue, delta) else delta
          (Some((time, next)), None)
      }
      .toTypedPipe

  // Drop the scan state and keep only entries produced by left lookups; right
  // updates (and the initial zero state, if scanLeft emits it) carry None here.
  joined.flatMap { case (k, (_, lookup)) =>
    lookup.map { case (t, v, optJoined) =>
      (t, (k, (v, optJoined)))
    }
  }
}