final def apply()

in scio-core/src/main/scala/com/spotify/scio/coders/CoderMaterializer.scala [54:116]


    /**
     * Returns the [[CoderOptions]] for `o`, building them on first use and caching the result.
     *
     * Entries are cached keyed by `ObjectUtils.identityToString(o)`, so repeated calls with the
     * same options object reuse the previously built value.
     *
     * Each `zstdDictionary` argument must match `ZstdArgRegex` (documented by the error message as
     * `className:dictionaryUri`). For every accepted entry the dictionary is downloaded through
     * `RemoteFileUtil`, read fully into memory, and the downloaded local file is deleted.
     *
     * @param o pipeline options; viewed as `ScioOptions` to read `nullableCoders` and
     *          `zstdDictionary`
     * @return the cached or newly built coder options
     * @throws IllegalArgumentException if any `zstdDictionary` argument is malformed, names a class
     *   whose name starts with an entry of `ZstdPackageBlacklist`, names a class that cannot be
     *   loaded, or if the same class is mapped to more than one distinct dictionary URI
     */
    final def apply(o: PipelineOptions): CoderOptions = {
      cache.computeIfAbsent(
        ObjectUtils.identityToString(o),
        { _ =>
          val scioOpts = o.as(classOf[com.spotify.scio.options.ScioOptions])
          val nullableCoder = scioOpts.getNullableCoders

          // Validate every argument first so that all problems are reported in one exception.
          val (errors, classPathMapping) = Option(scioOpts.getZstdDictionary)
            .map(_.asScala.toList)
            .getOrElse(List.empty)
            .partitionMap {
              case s @ ZstdArgRegex(className, path) =>
                Option
                  .when(ZstdPackageBlacklist.exists(className.startsWith))(
                    s"zstdDictionary command-line arguments may not be used for class $className. " +
                      s"Provide Zstd coders manually instead."
                  )
                  .orElse {
                    Try(Class.forName(className)).failed.toOption
                      .map(_ => s"Class for zstdDictionary argument ${s} not found.")
                  }
                  // Keys use `.` in place of `$` (as appears in binary names of nested classes).
                  .toLeft(className.replaceAll("\\$", ".") -> path)
              case s =>
                Left(
                  "zstdDictionary arguments must be in a colon-separated format. " +
                    s"Example: `com.spotify.ClassName:gs://path`. Found: $s"
                )
            }

          if (errors.nonEmpty) {
            throw new IllegalArgumentException(
              errors.mkString("Bad zstdDictionary arguments:\n\t", "\n\t", "\n")
            )
          }

          // class name -> distinct dictionary URIs supplied for it
          val zstdDictPaths = classPathMapping
            .groupBy(_._1)
            .map { case (className, values) => className -> values.map(_._2).toSet }

          val dupes = zstdDictPaths
            .collect {
              case (className, values) if values.size > 1 =>
                s"Class $className -> [${values.mkString(", ")}]"
            }
          // A single conflicting class is already ambiguous: reject it instead of silently
          // choosing an arbitrary dictionary from the set below.
          if (dupes.nonEmpty) {
            throw new IllegalArgumentException(
              dupes.mkString("Found multiple Zstd dictionaries for:\n\t", "\n\t", "\n")
            )
          }

          val zstdDictMapping = zstdDictPaths.map { case (clazz, dictUriSet) =>
            // groupBy never yields empty groups and the check above rejects sets with more than
            // one URI, so dictUriSet has exactly one element here.
            val dictUri = dictUriSet.head
            val dictPath = RemoteFileUtil.create(o).download(new URI(dictUri))
            // Always remove the downloaded file, even if reading it fails.
            val dictBytes =
              try Files.readAllBytes(dictPath)
              finally Files.delete(dictPath)
            clazz -> dictBytes
          }

          new CoderOptions(nullableCoder, KryoOptions(o), zstdDictMapping)
        }
      )
    }