in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [1003:1197]
public PCollection<T> expand(PBegin input) {
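// Validate the two mutually exclusive configuration modes up front: reading a
// table directly via from(), or running a query via fromQuery(). Query-only
// options (priority, result flattening, SQL dialect, temp dataset) are rejected
// in the table case.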
ValueProvider<TableReference> table = getTableProvider();
if (table != null) {
checkArgument(getQuery() == null, "from() and fromQuery() are mutually exclusive");
checkArgument(
getQueryPriority() == null,
"withQueryPriority() can only be specified when using fromQuery()");
checkArgument(
getFlattenResults() == null,
"Invalid BigQueryIO.Read: Specifies a table with a result flattening"
+ " preference, which only applies to queries");
checkArgument(
getUseLegacySql() == null,
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+ " preference, which only applies to queries");
checkArgument(
getQueryTempDataset() == null,
"Invalid BigQueryIO.Read: Specifies a temp dataset, which can"
+ " only be specified when using fromQuery()");
if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
LOG.info(
"Project of {} not set. The value of {}.getProject() at execution time will be used.",
TableReference.class.getSimpleName(),
BigQueryOptions.class.getSimpleName());
}
} else {
checkArgument(getQuery() != null, "Either from() or fromQuery() is required");
checkArgument(
getFlattenResults() != null, "flattenResults should not be null if query is set");
checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
}
checkArgument(getParseFn() != null, "A parseFn is required");
// If a type descriptor and both Row conversion functions are set, enable Beam
// schema support; the schema itself is attached to the output below.
boolean beamSchemaEnabled =
getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null;
Pipeline p = input.getPipeline();
final Coder<T> coder = inferCoder(p.getCoderRegistry());
if (getMethod() == Method.DIRECT_READ) {
return expandForDirectRead(input, coder);
}
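// Method.DIRECT_READ is handled above via the BigQuery Storage Read API; the
// rest of this method implements the export-based read, which runs a BigQuery
// extract job and then reads the exported files. Selected fields and row
// restrictions are Storage API features, so they are rejected here.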
checkArgument(
getSelectedFields() == null,
"Invalid BigQueryIO.Read: Specifies selected fields, "
+ "which only applies when using Method.DIRECT_READ");
checkArgument(
getRowRestriction() == null,
"Invalid BigQueryIO.Read: Specifies row restriction, "
+ "which only applies when using Method.DIRECT_READ");
final BigQuerySourceDef sourceDef = createSourceDef();
final PCollectionView<String> jobIdTokenView;
PCollection<String> jobIdTokenCollection;
PCollection<T> rows;
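// Without template compatibility, the extract job ID is fixed at pipeline
// construction time; with it, the ID is generated at execution time so that a
// re-launched template does not collide with the BigQuery jobs of earlier runs.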
if (!getWithTemplateCompatibility()) {
// Create a singleton job ID token at construction time.
final String staticJobUuid = BigQueryHelpers.randomUUIDString();
jobIdTokenView =
p.apply("TriggerIdCreation", Create.of(staticJobUuid))
.apply("ViewId", View.asSingleton());
// Apply the traditional Source model.
rows =
p.apply(
org.apache.beam.sdk.io.Read.from(
sourceDef.toSource(
staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
} else {
// Create a singleton job ID token at execution time.
jobIdTokenCollection =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
"CreateJobId",
MapElements.via(
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
return BigQueryHelpers.randomUUIDString();
}
}));
jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
final TupleTag<String> filesTag = new TupleTag<>();
final TupleTag<String> tableSchemaTag = new TupleTag<>();
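// Run the extract job once per pipeline execution: the main output (filesTag)
// carries the paths of the exported files, and the side output (tableSchemaTag)
// carries the table schema as a JSON string.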
PCollectionTuple tuple =
jobIdTokenCollection.apply(
"RunCreateJob",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String jobUuid = c.element();
BigQuerySourceBase<T> source =
sourceDef.toSource(
jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
ExtractResult res = source.extractFiles(options);
LOG.info("Extract job produced {} files", res.extractedFiles.size());
source.cleanupTempResource(options);
for (ResourceId file : res.extractedFiles) {
c.output(file.toString());
}
c.output(tableSchemaTag, BigQueryHelpers.toJsonString(res.schema));
}
})
.withOutputTags(filesTag, TupleTagList.of(tableSchemaTag)));
tuple.get(filesTag).setCoder(StringUtf8Coder.of());
tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
final PCollectionView<String> schemaView =
tuple.get(tableSchemaTag).apply(View.asSingleton());
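// Reshuffle redistributes the file paths across workers (and breaks fusion with
// the single-element extract step) so the exported files are read in parallel.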
rows =
tuple
.get(filesTag)
.apply(Reshuffle.viaRandomKey())
.apply(
"ReadFiles",
ParDo.of(
new DoFn<String, T>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableSchema schema =
BigQueryHelpers.fromJsonString(
c.sideInput(schemaView), TableSchema.class);
String jobUuid = c.sideInput(jobIdTokenView);
BigQuerySourceBase<T> source =
sourceDef.toSource(
jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
List<BoundedSource<T>> sources =
source.createSources(
ImmutableList.of(
FileSystems.matchNewResource(
c.element(), false /* is directory */)),
schema,
null);
checkArgument(sources.size() == 1, "Expected exactly one source.");
BoundedSource<T> avroSource = sources.get(0);
// Close the reader when the file is exhausted so its resources are released.
try (BoundedSource.BoundedReader<T> reader =
avroSource.createReader(c.getPipelineOptions())) {
for (boolean more = reader.start(); more; more = reader.advance()) {
c.output(reader.getCurrent());
}
}
}
})
.withSideInputs(schemaView, jobIdTokenView))
.setCoder(coder);
}
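// PassThroughThenCleanup (applied below) forwards rows unchanged and, once they
// have been consumed, runs this cleanup to delete the extract job's temporary
// files from the temp location.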
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@Override
void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
PipelineOptions options = c.getPipelineOptions();
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
String jobUuid = c.getJobId();
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", jobUuid);
final String executingProject = bqOptions.getProject();
JobReference jobRef =
new JobReference()
.setProjectId(executingProject)
.setJobId(
BigQueryResourceNaming.createJobIdPrefix(
bqOptions.getJobName(), jobUuid, JobType.EXPORT));
Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
if (extractJob != null) {
List<ResourceId> extractFiles =
getExtractFilePaths(extractDestinationDir, extractJob);
if (extractFiles != null && !extractFiles.isEmpty()) {
FileSystems.delete(
extractFiles, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}
}
}
};
rows = rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
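// If the caller registered Row conversions, attach a Beam schema to the output
// so downstream transforms can treat the elements as schema'd Rows.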
if (beamSchemaEnabled) {
BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
Schema beamSchema = sourceDef.getBeamSchema(bqOptions);
SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);
rows.setSchema(beamSchema, getTypeDescriptor(), toBeamRow, fromBeamRow);
}
return rows;
}
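// Usage sketch (illustrative only; the table name below is a hypothetical
// placeholder, and readTableRows() is one of several entry points that reach
// this expansion):
//
//   PCollection<TableRow> rows =
//       pipeline.apply(
//           BigQueryIO.readTableRows()
//               .from("my-project:my_dataset.my_table")
//               .withTemplateCompatibility());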