in scio-core/src/main/scala/com/spotify/scio/coders/CoderMaterializer.scala [149:212]
/**
 * Materializes a scio `Coder[T]` into a Beam `BCoder[T]`.
 *
 * The coder is first translated structurally by matching on its variant and recursing into any
 * nested coders. The translated coder is then decorated, in this order:
 *   1. with `NullableCoder` when `isNullableCoder(o, coder)` holds,
 *   1. with `ZstdCoder` when `coder` is a `TypeName` whose `typeName` has an entry in
 *      `o.zstdDictMapping`,
 *   1. with `MaterializedCoder` when `isWrappableCoder(topLevel, coder)` holds.
 *
 * @param o
 *   options read during materialization (`o.kryo` for the fallback coder, `o.zstdDictMapping`
 *   for dictionary lookup; also handed to `isNullableCoder`)
 * @param coder
 *   the scio coder to translate
 * @param refs
 *   registry of the `Ref` coders met so far; it is mutated here. A `Ref` seen for the first
 *   time is registered before its body is translated, so a later lookup of the same `Ref`
 *   resolves to a `LazyCoder` over the registered `RefCoder` instead of translating the body
 *   again.
 * @param topLevel
 *   forwarded unchanged to the element coder of an `AggregateCoder` and to the key and value
 *   coders of a `KVCoder`; every other nested call uses the default (`false`). Also passed to
 *   `isWrappableCoder`.
 * @return
 *   the Beam coder, including whichever of the decorations above applied
 */
final private[scio] def beamImpl[T](
  o: CoderOptions,
  coder: Coder[T],
  refs: TrieMap[Ref[_], RefCoder[_]],
  topLevel: Boolean = false
): BCoder[T] = {
  // Step 1: structural translation of the coder tree.
  val translated: BCoder[T] = coder match {
    case RawBeam(beam) => beam
    case Beam(beam)    => beam
    case Fallback(_)   => new KryoAtomicCoder[T](o.kryo)
    case CoderTransform(_, inner, toCoder) =>
      // Translate the inner coder, derive a new scio coder from it, then translate that one.
      val innerBeam = beamImpl(o, inner, refs)
      beamImpl(o, toCoder(innerBeam), refs)
    case Transform(typeName, inner, to, from) =>
      new TransformCoder(typeName, beamImpl(o, inner, refs), to, from)
    case Singleton(typeName, supply) =>
      new SingletonCoder(typeName, supply)
    case Record(typeName, fields, construct, destruct) =>
      val fieldCoders = fields.map { case (name, fieldCoder) =>
        (name, beamImpl(o, fieldCoder, refs))
      }
      new RecordCoder(typeName, fieldCoders, construct, destruct)
    case Disjunction(typeName, idCoder, variants, id) =>
      // The id coder is translated before the variants, same order as the constructor arguments.
      val idBeam = beamImpl(o, idCoder, refs)
      val variantCoders = variants.map { case (key, variant) =>
        (key, beamImpl(o, variant, refs))
      }
      new DisjunctionCoder(typeName, idBeam, variantCoders, id)
    case AggregateCoder(element) =>
      IterableCoder.of(beamImpl(o, element, refs, topLevel = topLevel))
    case KVCoder(koder, voder) =>
      // propagate topLevel to both the key and the value coder
      val keyBeam = beamImpl(o, koder, refs, topLevel = topLevel)
      val valueBeam = beamImpl(o, voder, refs, topLevel = topLevel)
      KvCoder.of(keyBeam, valueBeam)
    case ref @ Ref(typeName, body) =>
      refs.get(ref) match {
        case None =>
          // First encounter: register the holder BEFORE translating the body so that a nested
          // occurrence of the same Ref finds it in `refs`.
          val holder = new RefCoder[T]()
          refs.update(ref, holder)
          holder.bcoder = beamImpl(o, body, refs)
          holder
        case Some(known) =>
          // `refs` stores RefCoder[_], so the element type has to be cast back to T.
          // NOTE(review): `known.bcoder` may not be assigned yet when we are still inside the
          // Ref's own body; presumably LazyCoder takes this argument by name - confirm.
          new LazyCoder(typeName, known.bcoder.asInstanceOf[BCoder[T]])
      }
  }

  // Step 2: optional null handling.
  val nullSafe: BCoder[T] =
    if (isNullableCoder(o, coder)) NullableCoder.of(translated) else translated

  // Step 3: optional zstd wrapping, only for coders that carry a type name with a mapped dictionary.
  val compressed: BCoder[T] = coder match {
    case named: TypeName =>
      o.zstdDictMapping.get(named.typeName) match {
        case Some(dict) => ZstdCoder.of(nullSafe, dict)
        case None       => nullSafe
      }
    case _ => nullSafe
  }

  // Step 4: optional MaterializedCoder wrapper.
  if (isWrappableCoder(topLevel, coder)) new MaterializedCoder(compressed) else compressed
}