public StreamingDataflowWorker()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java [240:293]


  /**
   * Creates a streaming worker for the given map tasks, talking to the supplied Windmill server.
   *
   * <p>The constructor initializes the worker's concurrent lookup maps, builds the work-unit and
   * commit executors, wraps {@code server} in a {@link MetricTrackingWindmillServerStub}, and
   * registers every entry of {@code mapTasks} through {@code addComputation}. Finally it records
   * the job id and worker id from {@code options} in {@link DataflowWorkerLoggingMDC}.
   *
   * @param mapTasks the map tasks to register; each one is passed to {@code addComputation}
   * @param server the Windmill server stub, stored as-is and also wrapped for metric tracking
   * @param options the harness options, stored and used to size the work-unit executor and to
   *     obtain the job id and worker id
   */
  public StreamingDataflowWorker(
      List<MapTask> mapTasks, WindmillServerStub server, DataflowWorkerHarnessOptions options) {
    this.options = options;
    this.windmillServer = server;

    // Simple state holders; none of these depend on anything else built below.
    this.running = new AtomicBoolean();
    this.lastException = new AtomicReference<>();
    this.clientId = new Random().nextLong();

    // Lookup tables, all starting out empty.
    this.instructionMap = new ConcurrentHashMap<>();
    this.outputMap = new ConcurrentHashMap<>();
    this.mapTaskExecutors = new ConcurrentHashMap<>();
    this.activeWorkMap = new ConcurrentHashMap<>();
    this.readerCache = new ConcurrentHashMap<>();
    this.commitCallbacks = new ConcurrentHashMap<>();
    this.stateNameMap = new ConcurrentHashMap<>();
    this.systemNameToComputationIdMap = new ConcurrentHashMap<>();

    // Work-unit threads are daemons. The factory must be assigned before the executor below,
    // which receives it as an argument.
    this.threadFactory =
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable runnable) {
            Thread workerThread = new Thread(runnable);
            workerThread.setDaemon(true);
            return workerThread;
          }
        };
    this.workUnitExecutor =
        new BoundedQueueExecutor(
            chooseMaximumNumberOfThreads(options),
            THREAD_EXPIRATION_TIME_SEC,
            TimeUnit.SECONDS,
            MAX_WORK_UNITS_QUEUED,
            threadFactory);

    // The commit executor runs on one max-priority daemon thread named "CommitThread".
    ThreadFactory commitThreadFactory =
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable runnable) {
            Thread commitThread = new Thread(runnable);
            commitThread.setDaemon(true);
            commitThread.setPriority(Thread.MAX_PRIORITY);
            commitThread.setName("CommitThread");
            return commitThread;
          }
        };
    // Core and maximum pool size are both 1 and the queue holds at most 2 tasks; anything
    // submitted beyond that is silently dropped by DiscardPolicy.
    this.commitExecutor =
        new ThreadPoolExecutor(
            1,
            1,
            Long.MAX_VALUE,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2),
            commitThreadFactory,
            new ThreadPoolExecutor.DiscardPolicy());

    // The state fetcher talks to the metric-tracking wrapper rather than the raw server stub.
    this.metricTrackingWindmillServer = new MetricTrackingWindmillServerStub(server, memoryMonitor);
    this.stateFetcher = new StateFetcher(metricTrackingWindmillServer);

    // Register the computations only after every field above has been initialized.
    for (MapTask task : mapTasks) {
      addComputation(task);
    }

    DataflowWorkerLoggingMDC.setJobId(options.getJobId());
    DataflowWorkerLoggingMDC.setWorkerId(options.getWorkerId());
  }