in scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala [38:180]
def csv(
  sources: List[String],
  destinationTable: String,
  createDisposition: CreateDisposition = CREATE_IF_NEEDED,
  writeDisposition: WriteDisposition = WRITE_APPEND,
  schema: Option[TableSchema] = None,
  autodetect: Boolean = false,
  allowJaggedRows: Boolean = false,
  allowQuotedNewLines: Boolean = false,
  quote: Option[String] = None,
  maxBadRecords: Int = 0,
  skipLeadingRows: Int = 0,
  fieldDelimiter: Option[String] = None,
  ignoreUnknownValues: Boolean = false,
  encoding: Option[String] = None,
  location: Option[String] = None
): Try[TableReference] =
  execute(
    sources = sources,
    sourceFormat = "CSV",
    destinationTable = destinationTable,
    createDisposition = createDisposition,
    writeDisposition = writeDisposition,
    schema = schema,
    autodetect = Some(autodetect),
    allowJaggedRows = Some(allowJaggedRows),
    allowQuotedNewLines = Some(allowQuotedNewLines),
    quote = quote,
    maxBadRecords = maxBadRecords,
    skipLeadingRows = Some(skipLeadingRows),
    fieldDelimiter = fieldDelimiter,
    ignoreUnknownValues = Some(ignoreUnknownValues),
    encoding = encoding,
    location = location
  )
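
// Usage sketch (illustrative, not part of this file): assuming the scio BigQuery client
// obtained via BigQuery.defaultInstance() exposes this class as `load`, a CSV load with
// schema autodetection might look like:
//
//   val bq = BigQuery.defaultInstance()
//   val result: Try[TableReference] = bq.load.csv(
//     sources = List("gs://my-bucket/events/*.csv"),       // hypothetical GCS paths
//     destinationTable = "my-project:my_dataset.events",   // hypothetical table spec
//     autodetect = true,
//     skipLeadingRows = 1                                   // skip the header row
//   )
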
def json(
  sources: List[String],
  destinationTable: String,
  createDisposition: CreateDisposition = CREATE_IF_NEEDED,
  writeDisposition: WriteDisposition = WRITE_APPEND,
  schema: Option[TableSchema] = None,
  autodetect: Boolean = false,
  maxBadRecords: Int = 0,
  ignoreUnknownValues: Boolean = false,
  encoding: Option[String] = None,
  location: Option[String] = None
): Try[TableReference] =
  execute(
    sources = sources,
    sourceFormat = "NEWLINE_DELIMITED_JSON",
    destinationTable = destinationTable,
    createDisposition = createDisposition,
    writeDisposition = writeDisposition,
    schema = schema,
    autodetect = Some(autodetect),
    maxBadRecords = maxBadRecords,
    ignoreUnknownValues = Some(ignoreUnknownValues),
    encoding = encoding,
    location = location
  )
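
// Usage sketch (illustrative, not part of this file): for newline-delimited JSON an explicit
// schema can be passed instead of autodetection; the TableSchema comes from the
// google-api-services-bigquery model classes (field names below are hypothetical):
//
//   val schema = new TableSchema().setFields(
//     List(
//       new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED"),
//       new TableFieldSchema().setName("payload").setType("STRING").setMode("NULLABLE")
//     ).asJava
//   )
//   bq.load.json(
//     sources = List("gs://my-bucket/events/*.json"),
//     destinationTable = "my-project:my_dataset.events_json",
//     schema = Some(schema),
//     writeDisposition = WRITE_TRUNCATE                     // replace any existing rows
//   )
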
def avro(
  sources: List[String],
  destinationTable: String,
  createDisposition: CreateDisposition = CREATE_IF_NEEDED,
  writeDisposition: WriteDisposition = WRITE_APPEND,
  schema: Option[TableSchema] = None,
  maxBadRecords: Int = 0,
  encoding: Option[String] = None,
  location: Option[String] = None
): Try[TableReference] =
  execute(
    sources = sources,
    sourceFormat = "AVRO",
    destinationTable = destinationTable,
    createDisposition = createDisposition,
    writeDisposition = writeDisposition,
    schema = schema,
    maxBadRecords = maxBadRecords,
    encoding = encoding,
    location = location
  )
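
// Usage sketch (illustrative, not part of this file): Avro files carry their own schema, so
// only sources and destination are required; the returned Try can be pattern matched to
// handle failures:
//
//   bq.load.avro(
//     sources = List("gs://my-bucket/export/part-*.avro"),  // hypothetical GCS paths
//     destinationTable = "my-project:my_dataset.export"
//   ) match {
//     case scala.util.Success(table) => println(s"Loaded into ${table.getTableId}")
//     case scala.util.Failure(e)     => e.printStackTrace()
//   }
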
@nowarn("msg=private default argument in class LoadOps is never used")
private def execute(
  sources: List[String],
  sourceFormat: String,
  destinationTable: String,
  createDisposition: CreateDisposition = CREATE_IF_NEEDED,
  writeDisposition: WriteDisposition = WRITE_APPEND,
  schema: Option[TableSchema] = None,
  autodetect: Option[Boolean] = None,
  allowJaggedRows: Option[Boolean] = None,
  allowQuotedNewLines: Option[Boolean] = None,
  quote: Option[String] = None,
  maxBadRecords: Int = 0,
  skipLeadingRows: Option[Int] = None,
  fieldDelimiter: Option[String] = None,
  ignoreUnknownValues: Option[Boolean] = None,
  encoding: Option[String] = None,
  location: Option[String] = None
): Try[TableReference] = Try {
  val tableRef = bq.BigQueryHelpers.parseTableSpec(destinationTable)

  // Build the load configuration; parameters passed as None are left null and therefore
  // omitted from the request, so BigQuery applies its own defaults for them.
  val jobConfigLoad = new JobConfigurationLoad()
    .setSourceUris(sources.asJava)
    .setSourceFormat(sourceFormat)
    .setDestinationTable(tableRef)
    .setCreateDisposition(createDisposition.toString)
    .setWriteDisposition(writeDisposition.toString)
    .setMaxBadRecords(maxBadRecords)
    .setSchema(schema.orNull)
    .setQuote(quote.orNull)
    .setFieldDelimiter(fieldDelimiter.orNull)
    .setEncoding(encoding.orNull)

  // Format-specific flags are only set when the calling overload supplies them.
  autodetect.foreach(jobConfigLoad.setAutodetect(_))
  allowJaggedRows.foreach(jobConfigLoad.setAllowJaggedRows(_))
  allowQuotedNewLines.foreach(jobConfigLoad.setAllowQuotedNewlines(_))
  skipLeadingRows.foreach(jobConfigLoad.setSkipLeadingRows(_))
  ignoreUnknownValues.foreach(jobConfigLoad.setIgnoreUnknownValues(_))

  val jobConfig = new JobConfiguration()
    .setLoad(jobConfigLoad)

  val fullJobId = BigQueryUtil.generateJobId(client.project)
  val jobReference = new JobReference()
    .setProjectId(client.project)
    .setJobId(fullJobId)
    .setLocation(location.orNull)
  val job = new Job().setConfiguration(jobConfig).setJobReference(jobReference)

  Logger.info(s"Loading data into $destinationTable from ${sources.mkString(", ")}")

  // Submit the load job and wait for it to finish before returning the destination table
  // reference; any exception raised along the way surfaces as a Failure.
  client.execute(_.jobs().insert(client.project, job))

  val loadJob = LoadJob(sources, Some(jobReference), tableRef)
  jobService.waitForJobs(loadJob)

  tableRef
}
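
// Sketch (illustrative, not part of this file): the optional location is attached to the
// JobReference above, which lets the load job run in the same region as a regional dataset,
// e.g.:
//
//   bq.load.csv(
//     sources = List("gs://eu-bucket/data/*.csv"),          // hypothetical GCS path
//     destinationTable = "my-project:eu_dataset.events",    // dataset located in europe-west1
//     autodetect = true,
//     location = Some("europe-west1")
//   )
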