Reader create()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ReaderFactory.java [47:179]


  /**
   * Creates a {@link Reader} from the given cloud source specification.
   *
   * @param cloudSourceSpec the source specification to instantiate a reader for; {@link Registry}
   *     dispatches on its class name
   * @param coder the coder taken from the source specification, or {@code null} if none was given
   * @param options the pipeline options, or {@code null}
   * @param executionContext the execution context, or {@code null}
   * @param addCounterMutator a counter mutator for the reader (presumably used to register counters
   *     — confirm against implementations), or {@code null}
   * @param operationName the name of the operation the reader belongs to, or {@code null}
   * @return the newly created reader
   * @throws Exception if the reader cannot be created
   */
  Reader<?> create(
      CloudObject cloudSourceSpec,
      @Nullable Coder<?> coder,
      @Nullable PipelineOptions options,
      @Nullable ExecutionContext executionContext,
      @Nullable CounterSet.AddCounterMutator addCounterMutator,
      @Nullable String operationName)
          throws Exception;

  /**
   * An immutable registry from {@link String} identifiers (provided to the worker by the Dataflow
   * service) to appropriate {@link ReaderFactory} instances.
   *
   * <p>The identifier {@link ConcatReader#SOURCE_NAME} is always resolvable: unless a factory has
   * been explicitly registered for it via {@link #register}, it resolves to a
   * {@code ConcatReaderFactory} bound to this very registry, so that a {@link ConcatReader} can
   * create its sub-readers through it.
   */
  public class Registry implements ReaderFactory {

    /**
     * Factories explicitly bound to identifiers, by {@link #defaultRegistry} or {@link #register}.
     * Owned exclusively by this instance and never mutated after construction.
     */
    private final Map<String, ReaderFactory> factories;

    /**
     * Fallback factory for {@link ConcatReader#SOURCE_NAME}, bound to this registry because
     * ConcatReader calls back into the registry to create its sub-readers lazily.
     *
     * <p>It is deliberately kept out of {@link #factories}. This means {@link #register} never copies
     * a Concat factory bound to a stale registry, and an explicit registration for
     * {@link ConcatReader#SOURCE_NAME} takes precedence instead of being silently overwritten.
     */
    private final ReaderFactory concatReaderFactory;

    /**
     * A {@link Registry} with each {@link ReaderFactory} known to the Dataflow worker already
     * registered.
     */
    public static Registry defaultRegistry() {
      Map<String, ReaderFactory> factories = Maps.newHashMap();

      factories.put("TextSource", TextReaderFactory.getInstance());
      factories.put("AvroSource", new AvroReaderFactory());
      factories.put("UngroupedShuffleSource", new UngroupedShuffleReaderFactory());
      factories.put("PartitioningShuffleSource", new PartitioningShuffleReaderFactory());
      factories.put("GroupingShuffleSource", new GroupingShuffleReaderFactory());
      factories.put("InMemorySource", new InMemoryReaderFactory());
      factories.put("BigQuerySource", new BigQueryReaderFactory());

      // Aliases for WindowingWindmillReader
      factories.put("WindowingWindmillReader", new WindowingWindmillReader.Factory());
      factories.put("com.google.cloud.dataflow.sdk.runners.worker.WindowingWindmillReader",
          new WindowingWindmillReader.Factory());
      factories.put("com.google.cloud.dataflow.sdk.runners.worker.BucketingWindmillSource",
          new WindowingWindmillReader.Factory());

      // Aliases for UngroupedWindmillReader
      factories.put("UngroupedWindmillReader", new UngroupedWindmillReader.Factory());
      factories.put("com.google.cloud.dataflow.sdk.runners.worker.UngroupedWindmillSource",
          new UngroupedWindmillReader.Factory());
      factories.put("com.google.cloud.dataflow.sdk.runners.worker.UngroupedWindmillReader",
          new UngroupedWindmillReader.Factory());

      // Aliases for PubsubReader
      factories.put("PubsubReader", new PubsubReader.Factory());
      factories.put("com.google.cloud.dataflow.sdk.runners.worker.PubsubSource",
          new PubsubReader.Factory());

      // Custom sources
      factories.put(
          "com.google.cloud.dataflow.sdk.runners.dataflow.CustomSources",
          new CustomSources.Factory());

      return new Registry(factories);
    }

    /**
     * Builds a new {@link Registry} that takes ownership of the provided map of mappings.
     *
     * <p>The caller must not retain or mutate the map afterwards. This constructor should only be
     * called by methods in this class that are aware of this requirement.
     */
    private Registry(Map<String, ReaderFactory> factories) {
      this.factories = factories;
      // ConcatReader requires special treatment: it needs recursive access to the registry, since it
      // calls back to create its sub-readers lazily. "this" escapes the constructor here on purpose;
      // every other field is already assigned at this point.
      this.concatReaderFactory = ConcatReaderFactory.withRegistry(this);
    }

    /**
     * Returns a new {@link Registry} with the provided identifier associated with the provided
     * {@link ReaderFactory}, overriding any existing binding for that identifier (including
     * {@link ConcatReader#SOURCE_NAME}). This registry is left unchanged.
     */
    public Registry register(String readerSpecType, ReaderFactory factory) {
      Map<String, ReaderFactory> newFactories = Maps.newHashMap(factories);
      newFactories.put(readerSpecType, factory);
      return new Registry(newFactories);
    }

    /**
     * Creates a {@link Reader} according to the provided {@code sourceSpec}, by dispatching on
     * the type of {@link CloudObject} to instantiate.
     *
     * @throws IllegalArgumentException if no {@link ReaderFactory} is registered for the class
     *     name of {@code sourceSpec}
     * @throws Exception if the selected {@link ReaderFactory} fails to create the reader
     */
    @Override
    public Reader<?> create(
        CloudObject sourceSpec,
        @Nullable Coder<?> coder,
        @Nullable PipelineOptions options,
        @Nullable ExecutionContext executionContext,
        @Nullable CounterSet.AddCounterMutator addCounterMutator,
        @Nullable String operationName)
            throws Exception {

      String objClassName = sourceSpec.getClassName();
      ReaderFactory readerFactory = lookup(objClassName);
      if (readerFactory == null) {
        throw new IllegalArgumentException(String.format(
            "Unable to create a Reader: Unknown Reader type in Source specification: %s",
            objClassName));
      }
      return readerFactory.create(
          sourceSpec, coder, options, executionContext, addCounterMutator, operationName);
    }

    /**
     * Creates a {@link Reader} from a Dataflow API {@link Source} specification, using the
     * {@link Coder} contained in the {@link Source} specification.
     *
     * <p>Base specs are flattened into the spec first. If the {@link Source} carries no codec, the
     * reader is created with a {@code null} coder.
     *
     * @throws IllegalArgumentException if no {@link ReaderFactory} is registered for the spec's
     *     class name
     * @throws Exception if flattening, deserializing the coder, or creating the reader fails
     */
    public Reader<?> create(
        Source cloudSource,
        @Nullable PipelineOptions options,
        @Nullable ExecutionContext executionContext,
        @Nullable CounterSet.AddCounterMutator addCounterMutator,
        @Nullable String operationName)
            throws Exception {

      cloudSource = CloudSourceUtils.flattenBaseSpecs(cloudSource);
      CloudObject sourceSpec = CloudObject.fromSpec(cloudSource.getSpec());
      Coder<?> coder = null;
      if (cloudSource.getCodec() != null) {
        coder = Serializer.deserialize(cloudSource.getCodec(), Coder.class);
      }
      return create(sourceSpec, coder, options, executionContext, addCounterMutator, operationName);
    }

    /**
     * Returns the factory bound to {@code readerSpecType}, or {@code null} if there is none.
     * Falls back to this registry's own Concat factory for {@link ConcatReader#SOURCE_NAME} when
     * no explicit binding exists.
     */
    @Nullable
    private ReaderFactory lookup(@Nullable String readerSpecType) {
      ReaderFactory factory = factories.get(readerSpecType);
      if (factory == null && ConcatReader.SOURCE_NAME.equals(readerSpecType)) {
        factory = concatReaderFactory;
      }
      return factory;
    }
  }