in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [1199:1396]
private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
ValueProvider<TableReference> tableProvider = getTableProvider();
Pipeline p = input.getPipeline();
if (tableProvider != null) {
// No job ID is required. Read directly from BigQuery storage.
return p.apply(
org.apache.beam.sdk.io.Read.from(
BigQueryStorageTableSource.create(
tableProvider,
getSelectedFields(),
getRowRestriction(),
getParseFn(),
outputCoder,
getBigQueryServices())));
}
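    // Query path: validate that options which only apply to direct table reads are unset.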
checkArgument(
getSelectedFields() == null,
"Invalid BigQueryIO.Read: Specifies selected fields, "
+ "which only applies when reading from a table");
checkArgument(
getRowRestriction() == null,
"Invalid BigQueryIO.Read: Specifies row restriction, "
+ "which only applies when reading from a table");
//
    // N.B. All of the code below exists because the BigQuery storage API can't (yet) read
    // from all anonymous tables, so we need the job ID to derive the name of the destination
    // table that holds the query results, both to read the data and to subsequently delete
    // the table and dataset. Once the storage API can handle anonymous tables, the storage
    // source should be modified to use them, and all of the code related to job ID
    // generation and table and dataset cleanup can be removed. [BEAM-6931]
//
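    // Two cases follow: by default the job ID is generated once at pipeline construction
    // time; with template compatibility enabled it is instead generated at execution time,
    // so that a staged template can be run more than once.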
PCollectionView<String> jobIdTokenView;
PCollection<T> rows;
if (!getWithTemplateCompatibility()) {
// Create a singleton job ID token at pipeline construction time.
String staticJobUuid = BigQueryHelpers.randomUUIDString();
jobIdTokenView =
p.apply("TriggerIdCreation", Create.of(staticJobUuid))
.apply("ViewId", View.asSingleton());
// Apply the traditional Source model.
rows =
p.apply(
org.apache.beam.sdk.io.Read.from(
createStorageQuerySource(staticJobUuid, outputCoder)));
} else {
// Create a singleton job ID token at pipeline execution time.
PCollection<String> jobIdTokenCollection =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
"CreateJobId",
MapElements.via(
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
return BigQueryHelpers.randomUUIDString();
}
}));
jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
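      // The query job DoFn below produces three outputs: the read streams on the main
      // output, plus the read session and the serialized table schema as additional outputs.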
TupleTag<ReadStream> readStreamsTag = new TupleTag<>();
TupleTag<ReadSession> readSessionTag = new TupleTag<>();
TupleTag<String> tableSchemaTag = new TupleTag<>();
PCollectionTuple tuple =
jobIdTokenCollection.apply(
"RunQueryJob",
ParDo.of(
new DoFn<String, ReadStream>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
String jobUuid = c.element();
// Execute the query and get the destination table holding the results.
// The getTargetTable call runs a new instance of the query and returns
// the destination table created to hold the results.
BigQueryStorageQuerySource<T> querySource =
createStorageQuerySource(jobUuid, outputCoder);
Table queryResultTable = querySource.getTargetTable(options);
// Create a read session without specifying a desired stream count and
// let the BigQuery storage server pick the number of streams.
CreateReadSessionRequest request =
CreateReadSessionRequest.newBuilder()
.setParent(
BigQueryHelpers.toProjectResourceName(options.getProject()))
.setReadSession(
ReadSession.newBuilder()
.setTable(
BigQueryHelpers.toTableResourceName(
queryResultTable.getTableReference()))
.setDataFormat(DataFormat.AVRO))
.setMaxStreamCount(0)
.build();
ReadSession readSession;
try (StorageClient storageClient =
getBigQueryServices().getStorageClient(options)) {
readSession = storageClient.createReadSession(request);
}
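                          // Emit each stream in the session; the streams are read in
                          // parallel downstream.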
for (ReadStream readStream : readSession.getStreamsList()) {
c.output(readStream);
}
c.output(readSessionTag, readSession);
c.output(
tableSchemaTag,
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
}
})
.withOutputTags(
readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
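      // The TupleTags above were created without anonymous subclasses, so coder inference
      // has no type information to work with; set the coders explicitly.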
tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
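      // Materialize the read session and table schema as singleton side inputs for the
      // stream-reading DoFn below.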
PCollectionView<ReadSession> readSessionView =
tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());
PCollectionView<String> tableSchemaView =
tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());
rows =
tuple
.get(readStreamsTag)
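              // The reshuffle breaks fusion with the query job and spreads the streams
              // across workers so they can be read in parallel.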
.apply(Reshuffle.viaRandomKey())
.apply(
ParDo.of(
new DoFn<ReadStream, T>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
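                              // Rebuild a bounded source for this stream from the session
                              // and schema supplied via side inputs.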
ReadSession readSession = c.sideInput(readSessionView);
TableSchema tableSchema =
BigQueryHelpers.fromJsonString(
c.sideInput(tableSchemaView), TableSchema.class);
ReadStream readStream = c.element();
BigQueryStorageStreamSource<T> streamSource =
BigQueryStorageStreamSource.create(
readSession,
readStream,
tableSchema,
getParseFn(),
outputCoder,
getBigQueryServices());
// Read all of the data from the stream. In the event that this work
// item fails and is rescheduled, the same rows will be returned in
// the same order.
BoundedSource.BoundedReader<T> reader =
streamSource.createReader(c.getPipelineOptions());
for (boolean more = reader.start(); more; more = reader.advance()) {
c.output(reader.getCurrent());
}
}
})
.withSideInputs(readSessionView, tableSchemaView))
.setCoder(outputCoder);
}
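    // After the rows have been passed through, delete the temporary table that held the
    // query results, and the temporary dataset if Beam created it.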
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new CleanupOperation() {
@Override
void cleanup(ContextContainer c) throws Exception {
BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);
String jobUuid = c.getJobId();
Optional<String> queryTempDataset = Optional.ofNullable(getQueryTempDataset());
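            // Reconstruct the temporary table reference from the job ID, matching the
            // naming used when the query job created it.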
TableReference tempTable =
createTempTableReference(
options.getProject(),
BigQueryResourceNaming.createJobIdPrefix(
options.getJobName(), jobUuid, JobType.QUERY),
queryTempDataset);
DatasetService datasetService = getBigQueryServices().getDatasetService(options);
LOG.info("Deleting temporary table with query results {}", tempTable);
datasetService.deleteTable(tempTable);
            // Delete the dataset only if Beam created it, i.e. no user-supplied temporary
            // dataset was configured.
boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
if (datasetCreatedByBeam) {
LOG.info(
"Deleting temporary dataset with query results {}", tempTable.getDatasetId());
datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
}
}
};
return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
}