final private[scio] def beamImpl[T]()

in scio-core/src/main/scala/com/spotify/scio/coders/CoderMaterializer.scala [149:212]


  final private[scio] def beamImpl[T](
    o: CoderOptions,
    coder: Coder[T],
    refs: TrieMap[Ref[_], RefCoder[_]],
    topLevel: Boolean = false
  ): BCoder[T] = {
    val bCoder: BCoder[T] = coder match {
      case RawBeam(c) =>
        c
      case Beam(c) =>
        c
      case Fallback(_) =>
        new KryoAtomicCoder[T](o.kryo)
      case CoderTransform(_, coder, from) =>
        val underlying = beamImpl(o, coder, refs)
        beamImpl(o, from(underlying), refs)
      case Transform(typeName, c, t, f) =>
        new TransformCoder(typeName, beamImpl(o, c, refs), t, f)
      case Singleton(typeName, supply) =>
        new SingletonCoder(typeName, supply)
      case Record(typeName, coders, construct, destruct) =>
        new RecordCoder(
          typeName,
          coders.map { case (n, c) => n -> beamImpl(o, c, refs) },
          construct,
          destruct
        )
      case Disjunction(typeName, idCoder, coders, id) =>
        new DisjunctionCoder(
          typeName,
          beamImpl(o, idCoder, refs),
          coders.map { case (k, u) => k -> beamImpl(o, u, refs) },
          id
        )
      case AggregateCoder(c) =>
        IterableCoder.of(beamImpl(o, c, refs, topLevel))
      case KVCoder(koder, voder) =>
        // propagate topLevel to k & v coders
        val kbc = beamImpl(o, koder, refs, topLevel)
        val vbc = beamImpl(o, voder, refs, topLevel)
        KvCoder.of(kbc, vbc)
      case r @ Ref(t, c) =>
        refs.get(r) match {
          case Some(rc) =>
            new LazyCoder(t, rc.bcoder.asInstanceOf[BCoder[T]])
          case None =>
            val rc = new RefCoder[T]()
            refs += r -> rc
            rc.bcoder = beamImpl(o, c, refs)
            rc
        }
    }

    bCoder
      .pipe(bc => if (isNullableCoder(o, coder)) NullableCoder.of(bc) else bc)
      .pipe { bc =>
        Option(coder)
          .collect { case x: TypeName => x.typeName }
          .flatMap(o.zstdDictMapping.get)
          .map(ZstdCoder.of(bc, _))
          .getOrElse(bc)
      }
      .pipe(bc => if (isWrappableCoder(topLevel, coder)) new MaterializedCoder(bc) else bc)
  }