def combineFn[T: Coder, C: Coder]()

in scio-core/src/main/scala/com/spotify/scio/util/Functions.scala [149:234]


  def combineFn[T: Coder, C: Coder](
    sc: ScioContext,
    createCombiner: T => C,
    mergeValue: (C, T) => C,
    mergeCombiners: (C, C) => C
  ): BCombineFn[T, (Option[C], JList[T]), C] =
    new CombineFn[T, (Option[C], JList[T]), C] {
      override val vacoder = Coder[(Option[C], JList[T])]
      override val vocoder = Coder[C]
      override val context: CombineContext = CombineContext(sc)

      // defeat closure
      private[this] val cc = ClosureCleaner.clean(createCombiner)
      private[this] val mv = ClosureCleaner.clean(mergeValue)
      private[this] val mc = ClosureCleaner.clean(mergeCombiners)

      private def foldOption(accumulator: (Option[C], JList[T])): Option[C] = {
        val (opt, l) = accumulator
        if (opt.isDefined) {
          Some(Fns.fold(opt.get, l)(mv))
        } else {
          if (opt.isEmpty && l.isEmpty) {
            None
          } else {
            var c = cc(l.get(0))
            var i = 1
            while (i < l.size) {
              c = mv(c, l.get(i))
              i += 1
            }
            Some(c)
          }
        }
      }

      override def createAccumulator(): (Option[C], JList[T]) =
        (None, new JArrayList[T]())

      override def addInput(accumulator: (Option[C], JList[T]), input: T): (Option[C], JList[T]) = {
        val (_, l) = accumulator
        l.add(input)
        if (l.size() >= BufferSize) {
          (foldOption(accumulator), new JArrayList[T]())
        } else {
          accumulator
        }
      }

      override def extractOutput(accumulator: (Option[C], JList[T])): C = {
        val out = foldOption(accumulator)
        assert(
          out.isDefined,
          "Empty output in combine*/sum* transform. " +
            "Use aggregate* or fold* instead to fallback to a default value."
        )
        out.get
      }

      override def mergeAccumulators(
        accumulators: JIterable[(Option[C], JList[T])]
      ): (Option[C], JList[T]) = {
        val iter = accumulators.iterator()
        val empty = new JArrayList[T]()

        if (!iter.hasNext) {
          (None, empty)
        } else {
          Fns.reduce(accumulators) { (a, b) =>
            val aa = foldOption(a)
            val bb = foldOption(b)

            val result = if (aa.isDefined && bb.isDefined) {
              Some(mc(aa.get, bb.get))
            } else {
              if (aa.isDefined || bb.isDefined) {
                aa.orElse(bb)
              } else {
                None
              }
            }

            (result, empty)
          }
        }
      }
    }