in scio-core/src/main/scala/com/spotify/scio/util/Functions.scala [149:234]
/**
 * Builds a `BCombineFn` out of the three classic combiner functions.
 *
 * The accumulator is a pair `(partial, buffer)`: `partial` is the combined value computed so
 * far (if any) and `buffer` holds inputs that have been received but not yet combined. Inputs
 * are appended to the buffer and only folded into `partial` once the buffer holds at least
 * `BufferSize` elements, or when the accumulator is merged or its output is extracted.
 *
 * @param sc             context used to build the `CombineContext` of the returned function
 * @param createCombiner turns the first buffered input into an initial combined value
 * @param mergeValue     folds one more input into an existing combined value
 * @param mergeCombiners merges two combined values
 * @tparam T input element type
 * @tparam C combined (output) type
 * @return a combine function with accumulator type `(Option[C], JList[T])`
 */
def combineFn[T: Coder, C: Coder](
  sc: ScioContext,
  createCombiner: T => C,
  mergeValue: (C, T) => C,
  mergeCombiners: (C, C) => C
): BCombineFn[T, (Option[C], JList[T]), C] =
  new CombineFn[T, (Option[C], JList[T]), C] {
    override val vacoder = Coder[(Option[C], JList[T])]
    override val vocoder = Coder[C]
    override val context: CombineContext = CombineContext(sc)

    // defeat closure: the user functions go through ClosureCleaner before being used below
    private[this] val cc = ClosureCleaner.clean(createCombiner)
    private[this] val mv = ClosureCleaner.clean(mergeValue)
    private[this] val mc = ClosureCleaner.clean(mergeCombiners)

    /**
     * Collapses an accumulator into a single optional combined value.
     *
     * Yields `None` only when there is neither a partial value nor any buffered input.
     */
    private def foldOption(accumulator: (Option[C], JList[T])): Option[C] =
      accumulator match {
        case (Some(partial), pending) =>
          // Already have a combined value: fold the buffered inputs into it.
          Some(Fns.fold(partial, pending)(mv))
        case (None, pending) if pending.isEmpty =>
          None
        case (None, pending) =>
          // No combined value yet: seed it from the first buffered input, then fold the rest.
          val it = pending.iterator()
          var combined = cc(it.next())
          while (it.hasNext) {
            combined = mv(combined, it.next())
          }
          Some(combined)
      }

    override def createAccumulator(): (Option[C], JList[T]) =
      (None, new JArrayList[T]())

    override def addInput(accumulator: (Option[C], JList[T]), input: T): (Option[C], JList[T]) = {
      val pending = accumulator._2
      pending.add(input)
      if (pending.size() < BufferSize) {
        accumulator
      } else {
        // Buffer is full: collapse it into the partial value and start a fresh buffer.
        (foldOption(accumulator), new JArrayList[T]())
      }
    }

    override def extractOutput(accumulator: (Option[C], JList[T])): C = {
      val folded = foldOption(accumulator)
      assert(
        folded.isDefined,
        "Empty output in combine*/sum* transform. " +
          "Use aggregate* or fold* instead to fallback to a default value."
      )
      folded.get
    }

    override def mergeAccumulators(
      accumulators: JIterable[(Option[C], JList[T])]
    ): (Option[C], JList[T]) = {
      // Every accumulator produced by a merge step is fully folded, so its buffer is empty.
      val noPending = new JArrayList[T]()
      if (accumulators.iterator().hasNext) {
        Fns.reduce(accumulators) { (left, right) =>
          val merged: Option[C] = (foldOption(left), foldOption(right)) match {
            case (Some(x), Some(y)) => Some(mc(x, y))
            // At most one side is defined: keep whichever exists (None if neither does).
            case (x, y) => x.orElse(y)
          }
          (merged, noPending)
        }
      } else {
        (None, noPending)
      }
    }
  }