in scio-extra/src/main/scala/com/spotify/scio/extra/sparkey/SparkeyIO.scala [72:193]
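/**
 * Writes `data` as a Sparkey index/log pair (or `numShards` shard pairs) rooted at `baseUri`.
 *
 * Each shard is first written to a unique temporary location and then renamed into place,
 * so retried bundles never leave partial output at the final destination. Returns a
 * single-element SCollection containing the final output uri.
 */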
private[sparkey] def writeSparkey[K, V](
baseUri: SparkeyUri,
writable: SparkeyWritable[K, V],
data: SCollection[(K, V)],
maxMemoryUsage: Long,
numShards: Short,
compressionType: CompressionType,
compressionBlockSize: Int
): SCollection[SparkeyUri] = {
require(
!baseUri.isSharded,
s"path to which sparkey will be saved must not include a `*` wildcard."
)
require(numShards > 0, s"numShards must be greater than 0, found $numShards")
if (compressionType != CompressionType.NONE) {
require(
compressionBlockSize > 0,
s"Compression block size must be > 0 for $compressionType"
)
}
val sc = data.context
val isUnsharded = numShards == 1
val rfu = RemoteFileUtil.create(sc.options)
val tempLocation = sc.options.getTempLocation
// verify that we're not writing to a previously-used output dir
List(baseUri, SparkeyUri(s"${baseUri.path}/*")).foreach { uri =>
require(!uri.exists(rfu), s"Sparkey URI ${uri.path} already exists")
}
// root destination to which all _interim_ results are written,
// deleted upon successful completion of the write
val tempPath = s"$tempLocation/sparkey-temp-${UUID.randomUUID}"
val outputUri = if (isUnsharded) baseUri else SparkeyUri(s"${baseUri.path}/*")
logger.info(s"Saving as Sparkey with $numShards shards: ${baseUri.path}")
def resourcesForPattern(pattern: String): mutable.Buffer[ResourceId] =
FileSystems
.`match`(pattern, EmptyMatchTreatment.ALLOW)
.metadata()
.asScala
.map(_.resourceId())
data.transform { collection =>
    // shard by key hash; floorMod keeps shard ids in [0, numShards), even for negative hashes
val shards = collection
.groupBy { case (k, _) => floorMod(writable.shardHash(k), numShards.toInt).toShort }
    // materialize every shard id so that empty shards still produce output files
val allShards = sc
.parallelize(0 until numShards.toInt)
.map(_.toShort -> ())
    // write each shard to a temporary location; the full outer join against allShards
    // guarantees a (possibly empty) write for every shard
val tempShardUris = shards
.hashFullOuterJoin(allShards)
.map { case (shard, (xs, _)) =>
        // write to a unique temp uri so that a retried bundle does not collide with
        // partial output from a failed attempt
val tempUri = SparkeyUri(s"$tempPath/${UUID.randomUUID}")
// perform the write to the temp uri
shard -> writeToSparkey(
tempUri.sparkeyUriForShard(shard, numShards),
rfu,
maxMemoryUsage,
compressionType,
compressionBlockSize,
xs.getOrElse(Iterable.empty),
writable
)
}
// TODO WriteFiles inserts a reshuffle here for unclear reasons
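    // gather all (shard, uri) pairs into a single List element so that the rename
    // and cleanup below execute exactly once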
tempShardUris.reifyAsListInGlobalWindow
.map { seq =>
val items = seq.toList
// accumulate source files and destination files
val (srcPaths, dstPaths) = items
.foldLeft((List.empty[ResourceId], List.empty[ResourceId])) {
case ((srcs, dsts), (shard, uri)) =>
if (isUnsharded && shard != 0)
throw new IllegalArgumentException(s"numShards=1 but got shard=$shard")
            // assumes `paths` always returns files in the same order, so src and dst line up positionally 🙃
val dstUri =
if (isUnsharded) baseUri else baseUri.sparkeyUriForShard(shard, numShards)
val srcResources = srcs ++ uri.paths
val dstResources = dsts ++ dstUri.paths
(srcResources, dstResources)
}
        // rename source files to their final destinations
        logger.info(s"Renaming ${srcPaths.size} files from temp to final destination.")
// per FileBasedSink.java#783 ignore errors as files may have previously been deleted
FileSystems.rename(
srcPaths.asJava,
dstPaths.asJava,
StandardMoveOptions.IGNORE_MISSING_FILES,
StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS
)
// cleanup orphan files per FileBasedSink.removeTemporaryFiles
        val orphanTempFiles = resourcesForPattern(s"$tempPath/*")
orphanTempFiles.foreach { r =>
logger.warn("Will also remove unknown temporary file {}.", r)
}
FileSystems.delete(orphanTempFiles.asJava, StandardMoveOptions.IGNORE_MISSING_FILES)
        // clean up the temp dir itself; per FileBasedSink this may fail, and the failure is ignored
val tempPathResource = resourcesForPattern(tempPath)
try {
FileSystems.delete(tempPathResource.asJava, StandardMoveOptions.IGNORE_MISSING_FILES)
} catch {
case _: Exception =>
logger.warn("Failed to remove temporary directory: [{}].", tempPath)
}
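        // emit the final output uri (a sharded `*` pattern when numShards > 1)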
outputUri
}
}
}