in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingDataflowWorker.java [240:293]
/**
 * Creates a streaming worker: initializes its bookkeeping maps and executors, wraps the given
 * Windmill server stub, registers every supplied map task via {@code addComputation}, and
 * finally records the job and worker ids in the logging MDC.
 *
 * @param mapTasks map tasks, each passed to {@code addComputation} in iteration order
 * @param server Windmill server stub; stored as-is and also wrapped in a
 *     {@code MetricTrackingWindmillServerStub}
 * @param options harness options; used to pick the work-unit thread count and to read the job
 *     id and worker id
 */
public StreamingDataflowWorker(
    List<MapTask> mapTasks, WindmillServerStub server, DataflowWorkerHarnessOptions options) {
  this.options = options;
  this.windmillServer = server;
  this.clientId = new Random().nextLong();
  this.running = new AtomicBoolean();
  this.lastException = new AtomicReference<>();

  // Every lookup table starts out as an empty concurrent map.
  this.instructionMap = new ConcurrentHashMap<>();
  this.outputMap = new ConcurrentHashMap<>();
  this.mapTaskExecutors = new ConcurrentHashMap<>();
  this.activeWorkMap = new ConcurrentHashMap<>();
  this.readerCache = new ConcurrentHashMap<>();
  this.commitCallbacks = new ConcurrentHashMap<>();
  this.stateNameMap = new ConcurrentHashMap<>();
  this.systemNameToComputationIdMap = new ConcurrentHashMap<>();

  // Factory handed to the work-unit executor; it produces plain daemon threads.
  this.threadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable task) {
      Thread worker = new Thread(task);
      worker.setDaemon(true);
      return worker;
    }
  };
  int maxWorkThreads = chooseMaximumNumberOfThreads(options);
  this.workUnitExecutor = new BoundedQueueExecutor(
      maxWorkThreads,
      THREAD_EXPIRATION_TIME_SEC,
      TimeUnit.SECONDS,
      MAX_WORK_UNITS_QUEUED,
      threadFactory);

  // Commit threads are daemon threads running at maximum priority under a fixed name.
  ThreadFactory commitThreadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable task) {
      Thread committer = new Thread(task);
      committer.setDaemon(true);
      committer.setPriority(Thread.MAX_PRIORITY);
      committer.setName("CommitThread");
      return committer;
    }
  };
  // Core and maximum pool size are both 1, so at most one commit thread exists. The queue
  // holds at most two pending runnables; anything submitted beyond that is silently dropped
  // by DiscardPolicy rather than raising RejectedExecutionException.
  this.commitExecutor = new ThreadPoolExecutor(
      1,
      1,
      Long.MAX_VALUE,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(2),
      commitThreadFactory,
      new ThreadPoolExecutor.DiscardPolicy());

  // NOTE(review): memoryMonitor is not assigned in this constructor; presumably it is
  // initialized at its declaration -- confirm.
  this.metricTrackingWindmillServer = new MetricTrackingWindmillServerStub(server, memoryMonitor);
  this.stateFetcher = new StateFetcher(metricTrackingWindmillServer);

  // Register the computations only after all of the fields above have been populated.
  for (MapTask task : mapTasks) {
    addComputation(task);
  }

  DataflowWorkerLoggingMDC.setJobId(options.getJobId());
  DataflowWorkerLoggingMDC.setWorkerId(options.getWorkerId());
}