public PCollection expand()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [1003:1197]


    public PCollection<T> expand(PBegin input) {
      ValueProvider<TableReference> table = getTableProvider();

      if (table != null) {
        checkArgument(getQuery() == null, "from() and fromQuery() are exclusive");
        checkArgument(
            getQueryPriority() == null,
            "withQueryPriority() can only be specified when using fromQuery()");
        checkArgument(
            getFlattenResults() == null,
            "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
                + " preference, which only applies to queries");
        checkArgument(
            getUseLegacySql() == null,
            "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
                + " preference, which only applies to queries");
        checkArgument(
            getQueryTempDataset() == null,
            "Invalid BigQueryIO.Read: Specifies a temp dataset, which can"
                + " only be specified when using fromQuery()");
        if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
          LOG.info(
              "Project of {} not set. The value of {}.getProject() at execution time will be used.",
              TableReference.class.getSimpleName(),
              BigQueryOptions.class.getSimpleName());
        }
      } else {
        checkArgument(getQuery() != null, "Either from() or fromQuery() is required");
        checkArgument(
            getFlattenResults() != null, "flattenResults should not be null if query is set");
        checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
      }
      checkArgument(getParseFn() != null, "A parseFn is required");

      // if both toRowFn and fromRowFn values are set, enable Beam schema support
      boolean beamSchemaEnabled = false;
      if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) {
        beamSchemaEnabled = true;
      }

      Pipeline p = input.getPipeline();
      final Coder<T> coder = inferCoder(p.getCoderRegistry());

      if (getMethod() == Method.DIRECT_READ) {
        return expandForDirectRead(input, coder);
      }

      checkArgument(
          getSelectedFields() == null,
          "Invalid BigQueryIO.Read: Specifies selected fields, "
              + "which only applies when using Method.DIRECT_READ");

      checkArgument(
          getRowRestriction() == null,
          "Invalid BigQueryIO.Read: Specifies row restriction, "
              + "which only applies when using Method.DIRECT_READ");

      final BigQuerySourceDef sourceDef = createSourceDef();
      final PCollectionView<String> jobIdTokenView;
      PCollection<String> jobIdTokenCollection;
      PCollection<T> rows;
      if (!getWithTemplateCompatibility()) {
        // Create a singleton job ID token at construction time.
        final String staticJobUuid = BigQueryHelpers.randomUUIDString();
        jobIdTokenView =
            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
                .apply("ViewId", View.asSingleton());
        // Apply the traditional Source model.
        rows =
            p.apply(
                org.apache.beam.sdk.io.Read.from(
                    sourceDef.toSource(
                        staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
      } else {
        // Create a singleton job ID token at execution time.
        jobIdTokenCollection =
            p.apply("TriggerIdCreation", Create.of("ignored"))
                .apply(
                    "CreateJobId",
                    MapElements.via(
                        new SimpleFunction<String, String>() {
                          @Override
                          public String apply(String input) {
                            return BigQueryHelpers.randomUUIDString();
                          }
                        }));
        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());

        final TupleTag<String> filesTag = new TupleTag<>();
        final TupleTag<String> tableSchemaTag = new TupleTag<>();
        PCollectionTuple tuple =
            jobIdTokenCollection.apply(
                "RunCreateJob",
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) throws Exception {
                            String jobUuid = c.element();
                            BigQuerySourceBase<T> source =
                                sourceDef.toSource(
                                    jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
                            BigQueryOptions options =
                                c.getPipelineOptions().as(BigQueryOptions.class);
                            ExtractResult res = source.extractFiles(options);
                            LOG.info("Extract job produced {} files", res.extractedFiles.size());
                            source.cleanupTempResource(options);
                            for (ResourceId file : res.extractedFiles) {
                              c.output(file.toString());
                            }
                            c.output(tableSchemaTag, BigQueryHelpers.toJsonString(res.schema));
                          }
                        })
                    .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag)));
        tuple.get(filesTag).setCoder(StringUtf8Coder.of());
        tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
        final PCollectionView<String> schemaView =
            tuple.get(tableSchemaTag).apply(View.asSingleton());
        rows =
            tuple
                .get(filesTag)
                .apply(Reshuffle.viaRandomKey())
                .apply(
                    "ReadFiles",
                    ParDo.of(
                            new DoFn<String, T>() {
                              @ProcessElement
                              public void processElement(ProcessContext c) throws Exception {
                                TableSchema schema =
                                    BigQueryHelpers.fromJsonString(
                                        c.sideInput(schemaView), TableSchema.class);
                                String jobUuid = c.sideInput(jobIdTokenView);
                                BigQuerySourceBase<T> source =
                                    sourceDef.toSource(
                                        jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
                                List<BoundedSource<T>> sources =
                                    source.createSources(
                                        ImmutableList.of(
                                            FileSystems.matchNewResource(
                                                c.element(), false /* is directory */)),
                                        schema,
                                        null);
                                checkArgument(sources.size() == 1, "Expected exactly one source.");
                                BoundedSource<T> avroSource = sources.get(0);
                                BoundedSource.BoundedReader<T> reader =
                                    avroSource.createReader(c.getPipelineOptions());
                                for (boolean more = reader.start(); more; more = reader.advance()) {
                                  c.output(reader.getCurrent());
                                }
                              }
                            })
                        .withSideInputs(schemaView, jobIdTokenView))
                .setCoder(coder);
      }
      PassThroughThenCleanup.CleanupOperation cleanupOperation =
          new PassThroughThenCleanup.CleanupOperation() {
            @Override
            void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
              PipelineOptions options = c.getPipelineOptions();
              BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
              String jobUuid = c.getJobId();
              final String extractDestinationDir =
                  resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", jobUuid);
              final String executingProject = bqOptions.getProject();
              JobReference jobRef =
                  new JobReference()
                      .setProjectId(executingProject)
                      .setJobId(
                          BigQueryResourceNaming.createJobIdPrefix(
                              bqOptions.getJobName(), jobUuid, JobType.EXPORT));

              Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);

              if (extractJob != null) {
                List<ResourceId> extractFiles =
                    getExtractFilePaths(extractDestinationDir, extractJob);
                if (extractFiles != null && !extractFiles.isEmpty()) {
                  FileSystems.delete(
                      extractFiles, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
                }
              }
            }
          };

      rows = rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));

      if (beamSchemaEnabled) {
        BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
        Schema beamSchema = sourceDef.getBeamSchema(bqOptions);
        SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
        SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);

        rows.setSchema(beamSchema, getTypeDescriptor(), toBeamRow, fromBeamRow);
      }
      return rows;
    }