private PCollection expandForDirectRead()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [1199:1396]


    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
      ValueProvider<TableReference> tableProvider = getTableProvider();
      Pipeline p = input.getPipeline();
      if (tableProvider != null) {
        // No job ID is required. Read directly from BigQuery storage.
        return p.apply(
            org.apache.beam.sdk.io.Read.from(
                BigQueryStorageTableSource.create(
                    tableProvider,
                    getSelectedFields(),
                    getRowRestriction(),
                    getParseFn(),
                    outputCoder,
                    getBigQueryServices())));
      }

      checkArgument(
          getSelectedFields() == null,
          "Invalid BigQueryIO.Read: Specifies selected fields, "
              + "which only applies when reading from a table");

      checkArgument(
          getRowRestriction() == null,
          "Invalid BigQueryIO.Read: Specifies row restriction, "
              + "which only applies when reading from a table");

      //
      // N.B. All of the code below exists because the BigQuery storage API can't (yet) read from
      // all anonymous tables, so we need the job ID to reason about the name of the destination
      // table for the query to read the data and subsequently delete the table and dataset. Once
      // the storage API can handle anonymous tables, the storage source should be modified to use
      // anonymous tables and all of the code related to job ID generation and table and dataset
      // cleanup can be removed. [BEAM-6931]
      //

      PCollectionView<String> jobIdTokenView;
      PCollection<T> rows;

      if (!getWithTemplateCompatibility()) {
        // Create a singleton job ID token at pipeline construction time.
        String staticJobUuid = BigQueryHelpers.randomUUIDString();
        jobIdTokenView =
            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
                .apply("ViewId", View.asSingleton());
        // Apply the traditional Source model.
        rows =
            p.apply(
                org.apache.beam.sdk.io.Read.from(
                    createStorageQuerySource(staticJobUuid, outputCoder)));
      } else {
        // Create a singleton job ID token at pipeline execution time.
        PCollection<String> jobIdTokenCollection =
            p.apply("TriggerIdCreation", Create.of("ignored"))
                .apply(
                    "CreateJobId",
                    MapElements.via(
                        new SimpleFunction<String, String>() {
                          @Override
                          public String apply(String input) {
                            return BigQueryHelpers.randomUUIDString();
                          }
                        }));

        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());

        TupleTag<ReadStream> readStreamsTag = new TupleTag<>();
        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
        TupleTag<String> tableSchemaTag = new TupleTag<>();

        PCollectionTuple tuple =
            jobIdTokenCollection.apply(
                "RunQueryJob",
                ParDo.of(
                        new DoFn<String, ReadStream>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) throws Exception {
                            BigQueryOptions options =
                                c.getPipelineOptions().as(BigQueryOptions.class);
                            String jobUuid = c.element();
                            // Execute the query and get the destination table holding the results.
                            // The getTargetTable call runs a new instance of the query and returns
                            // the destination table created to hold the results.
                            BigQueryStorageQuerySource<T> querySource =
                                createStorageQuerySource(jobUuid, outputCoder);
                            Table queryResultTable = querySource.getTargetTable(options);

                            // Create a read session without specifying a desired stream count and
                            // let the BigQuery storage server pick the number of streams.
                            CreateReadSessionRequest request =
                                CreateReadSessionRequest.newBuilder()
                                    .setParent(
                                        BigQueryHelpers.toProjectResourceName(options.getProject()))
                                    .setReadSession(
                                        ReadSession.newBuilder()
                                            .setTable(
                                                BigQueryHelpers.toTableResourceName(
                                                    queryResultTable.getTableReference()))
                                            .setDataFormat(DataFormat.AVRO))
                                    .setMaxStreamCount(0)
                                    .build();

                            ReadSession readSession;
                            try (StorageClient storageClient =
                                getBigQueryServices().getStorageClient(options)) {
                              readSession = storageClient.createReadSession(request);
                            }

                            for (ReadStream readStream : readSession.getStreamsList()) {
                              c.output(readStream);
                            }

                            c.output(readSessionTag, readSession);
                            c.output(
                                tableSchemaTag,
                                BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
                          }
                        })
                    .withOutputTags(
                        readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));

        tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
        tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
        tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());

        PCollectionView<ReadSession> readSessionView =
            tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());
        PCollectionView<String> tableSchemaView =
            tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());

        rows =
            tuple
                .get(readStreamsTag)
                .apply(Reshuffle.viaRandomKey())
                .apply(
                    ParDo.of(
                            new DoFn<ReadStream, T>() {
                              @ProcessElement
                              public void processElement(ProcessContext c) throws Exception {
                                ReadSession readSession = c.sideInput(readSessionView);
                                TableSchema tableSchema =
                                    BigQueryHelpers.fromJsonString(
                                        c.sideInput(tableSchemaView), TableSchema.class);
                                ReadStream readStream = c.element();

                                BigQueryStorageStreamSource<T> streamSource =
                                    BigQueryStorageStreamSource.create(
                                        readSession,
                                        readStream,
                                        tableSchema,
                                        getParseFn(),
                                        outputCoder,
                                        getBigQueryServices());

                                // Read all of the data from the stream. In the event that this work
                                // item fails and is rescheduled, the same rows will be returned in
                                // the same order.
                                BoundedSource.BoundedReader<T> reader =
                                    streamSource.createReader(c.getPipelineOptions());
                                for (boolean more = reader.start(); more; more = reader.advance()) {
                                  c.output(reader.getCurrent());
                                }
                              }
                            })
                        .withSideInputs(readSessionView, tableSchemaView))
                .setCoder(outputCoder);
      }

      PassThroughThenCleanup.CleanupOperation cleanupOperation =
          new CleanupOperation() {
            @Override
            void cleanup(ContextContainer c) throws Exception {
              BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);
              String jobUuid = c.getJobId();

              Optional<String> queryTempDataset = Optional.ofNullable(getQueryTempDataset());

              TableReference tempTable =
                  createTempTableReference(
                      options.getProject(),
                      BigQueryResourceNaming.createJobIdPrefix(
                          options.getJobName(), jobUuid, JobType.QUERY),
                      queryTempDataset);

              DatasetService datasetService = getBigQueryServices().getDatasetService(options);
              LOG.info("Deleting temporary table with query results {}", tempTable);
              datasetService.deleteTable(tempTable);
              // Delete dataset only if it was created by Beam
              boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
              if (datasetCreatedByBeam) {
                LOG.info(
                    "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
                datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
              }
            }
          };

      return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
    }