in scio-core/src/main/scala/com/spotify/scio/coders/CoderMaterializer.scala [54:116]
/**
 * Builds the [[CoderOptions]] for the given pipeline options, memoizing the result in `cache`
 * under the identity string of `o` (so the same options instance is only processed once).
 *
 * The `zstdDictionary` entries read from `ScioOptions` are validated before use:
 *   - each entry must match `ZstdArgRegex` (`className:path`);
 *   - the class name must not start with any prefix in `ZstdPackageBlacklist`;
 *   - the class must be loadable via `Class.forName`;
 *   - a class may be mapped to at most one distinct dictionary path.
 *
 * Each accepted dictionary is downloaded through `RemoteFileUtil`, read fully into memory and the
 * downloaded file is then deleted.
 *
 * @param o
 *   pipeline options, viewed as `ScioOptions` to read the coder settings
 * @return
 *   coder options carrying the nullable-coder flag, Kryo options and the per-class Zstd dictionaries
 * @throws IllegalArgumentException
 *   if any `zstdDictionary` argument is malformed, refers to a blacklisted or unknown class, or if
 *   any class is given more than one distinct dictionary path
 */
final def apply(o: PipelineOptions): CoderOptions = {
  cache.computeIfAbsent(
    ObjectUtils.identityToString(o),
    { _ =>
      val scioOpts = o.as(classOf[com.spotify.scio.options.ScioOptions])
      val nullableCoder = scioOpts.getNullableCoders

      // Left = human-readable validation error, Right = (normalized class name -> dictionary path).
      val (errors, classPathMapping) = Option(scioOpts.getZstdDictionary)
        .map(_.asScala.toList)
        .getOrElse(List.empty)
        .partitionMap {
          case s @ ZstdArgRegex(className, path) =>
            Option
              .when(ZstdPackageBlacklist.exists(className.startsWith))(
                s"zstdDictionary command-line arguments may not be used for class $className. " +
                  s"Provide Zstd coders manually instead."
              )
              .orElse {
                Try(Class.forName(className)).failed.toOption
                  .map(_ => s"Class for zstdDictionary argument ${s} not found.")
              }
              // Nested-class separators `$` are rewritten to `.` in the mapping key.
              .toLeft(className.replaceAll("\\$", ".") -> path)
          case s =>
            Left(
              "zstdDictionary arguments must be in a colon-separated format. " +
                s"Example: `com.spotify.ClassName:gs://path`. Found: $s"
            )
        }
      if (errors.nonEmpty) {
        throw new IllegalArgumentException(
          errors.mkString("Bad zstdDictionary arguments:\n\t", "\n\t", "\n")
        )
      }

      // Collapse repeated identical (class, path) pairs; only distinct paths count as conflicts.
      val zstdDictPaths = classPathMapping
        .groupBy(_._1)
        .map { case (className, values) => className -> values.map(_._2).toSet }
      val dupes = zstdDictPaths
        .collect {
          case (className, values) if values.size > 1 =>
            s"Class $className -> [${values.mkString(", ")}]"
        }
      // Reject as soon as a single class has conflicting dictionaries; otherwise one of the
      // paths would be picked arbitrarily below.
      if (dupes.nonEmpty) {
        throw new IllegalArgumentException(
          dupes.mkString("Found multiple Zstd dictionaries for:\n\t", "\n\t", "\n")
        )
      }

      val zstdDictMapping = zstdDictPaths.map { case (clazz, dictUriSet) =>
        // The duplicate check above guarantees exactly one path per class here.
        val dictUri = dictUriSet.head
        val dictPath = RemoteFileUtil.create(o).download(new URI(dictUri))
        // Delete the downloaded copy even when reading it fails.
        val dictBytes =
          try Files.readAllBytes(dictPath)
          finally Files.delete(dictPath)
        clazz -> dictBytes
      }
      new CoderOptions(nullableCoder, KryoOptions(o), zstdDictMapping)
    }
  )
}