in scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/ExtractOps.scala [64:150]
def asCsv(
  sourceTable: String,
  destinationUris: List[String],
  compression: CsvCompression.Compression = CsvCompression.NoCompression(),
  fieldDelimiter: Option[String] = None,
  printHeader: Option[Boolean] = None
): Unit = {
  // CSV is the only export here that forwards a field delimiter and a header flag;
  // exportTable applies each of them to the extract configuration only when defined.
  val codec = compression.name
  exportTable(sourceTable, destinationUris, "CSV", codec, fieldDelimiter, printHeader)
}
/**
 * Export a table as newline-delimited JSON to the given destination URIs.
 *
 * `compression` defaults to `JsonCompression.NoCompression()`.
 */
def asJson(
  sourceTable: String,
  destinationUris: List[String],
  compression: JsonCompression.Compression = JsonCompression.NoCompression()
): Unit = {
  val codec = compression.name
  exportTable(sourceTable, destinationUris, "NEWLINE_DELIMITED_JSON", codec)
}
/**
 * Export a table as Avro to the given destination URIs.
 *
 * `compression` defaults to `AvroCompression.NoCompression()`.
 */
def asAvro(
  sourceTable: String,
  destinationUris: List[String],
  compression: AvroCompression.Compression = AvroCompression.NoCompression()
): Unit = {
  val codec = compression.name
  exportTable(sourceTable, destinationUris, "AVRO", codec)
}
/**
 * Export a table as Parquet to the given destination URIs.
 *
 * `compression` defaults to `ParquetCompression.NoCompression()`.
 */
def asParquet(
  sourceTable: String,
  destinationUris: List[String],
  compression: ParquetCompression.Compression = ParquetCompression.NoCompression()
): Unit = {
  val codec = compression.name
  exportTable(sourceTable, destinationUris, "PARQUET", codec)
}
/**
 * Build an extract job for `sourceTable`, insert it via `client`, and wait for it through
 * `jobService.waitForJobs`.
 *
 * `compression`, `fieldDelimiter` and `printHeader` are applied to the extract configuration
 * only when they are defined.
 *
 * @param sourceTable     table spec, parsed with `BigQueryHelpers.parseTableSpec`
 * @param destinationUris URIs set as the extract job's destination
 * @param format          value set as the extract job's destination format
 */
private def exportTable(
  sourceTable: String,
  destinationUris: List[String],
  format: String,
  compression: Option[String],
  fieldDelimiter: Option[String] = None,
  printHeader: Option[Boolean] = None
): Unit = {
  val source = bq.BigQueryHelpers.parseTableSpec(sourceTable)

  // Mandatory extract settings first, then the optional ones.
  val extractConfig = new JobConfigurationExtract()
  extractConfig.setSourceTable(source)
  extractConfig.setDestinationUris(destinationUris.asJava)
  extractConfig.setDestinationFormat(format)

  for (codec <- compression) extractConfig.setCompression(codec)
  for (delimiter <- fieldDelimiter) extractConfig.setFieldDelimiter(delimiter)
  for (header <- printHeader) extractConfig.setPrintHeader(header)

  // The job reference carries a generated job id so the job can be waited on below.
  val jobId = BigQueryUtil.generateJobId(client.project)
  val jobRef = new JobReference().setProjectId(client.project).setJobId(jobId)
  val extractRequest = new Job()
    .setConfiguration(new JobConfiguration().setExtract(extractConfig))
    .setJobReference(jobRef)

  Logger.info(s"Extracting table $sourceTable to ${destinationUris.mkString(", ")}")
  client.execute(bigquery => bigquery.jobs().insert(client.project, extractRequest))

  jobService.waitForJobs(ExtractJob(destinationUris, Some(jobRef), source))
}