in luigi/contrib/beam_dataflow.py [0:0]
def _get_dataflow_args(self):
    """Build the list of '--key=value' command line arguments passed to the Beam/Dataflow runner."""
    def f(key, value):
        return '--{}={}'.format(key, value)

    output = []
    output.append(f(self.dataflow_params.runner, self._get_runner()))
    if self.project:
        output.append(f(self.dataflow_params.project, self.project))
    if self.zone:
        output.append(f(self.dataflow_params.zone, self.zone))
    if self.region:
        output.append(f(self.dataflow_params.region, self.region))
    if self.staging_location:
        output.append(f(self.dataflow_params.staging_location, self.staging_location))
    if self.temp_location:
        output.append(f(self.dataflow_params.temp_location, self.temp_location))
    if self.gcp_temp_location:
        output.append(f(self.dataflow_params.gcp_temp_location, self.gcp_temp_location))
    if self.num_workers:
        output.append(f(self.dataflow_params.num_workers, self.num_workers))
    if self.autoscaling_algorithm:
        output.append(f(self.dataflow_params.autoscaling_algorithm, self.autoscaling_algorithm))
    if self.max_num_workers:
        output.append(f(self.dataflow_params.max_num_workers, self.max_num_workers))
    if self.disk_size_gb:
        output.append(f(self.dataflow_params.disk_size_gb, self.disk_size_gb))
    if self.worker_machine_type:
        output.append(f(self.dataflow_params.worker_machine_type, self.worker_machine_type))
    if self.worker_disk_type:
        output.append(f(self.dataflow_params.worker_disk_type, self.worker_disk_type))
    if self.network:
        output.append(f(self.dataflow_params.network, self.network))
    if self.subnetwork:
        output.append(f(self.dataflow_params.subnetwork, self.subnetwork))
    if self.job_name:
        output.append(f(self.dataflow_params.job_name, self.job_name))
    if self.service_account:
        output.append(f(self.dataflow_params.service_account, self.service_account))
    if self.labels:
        # labels is a dict, so serialize it to JSON for the command line
        output.append(f(self.dataflow_params.labels, json.dumps(self.labels)))
    return output
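For context, a minimal, self-contained sketch (not the luigi API) of the flag-building pattern above: it mimics how the method combines a param-key object (here a hypothetical ExampleParamKeys with assumed flag names) with task attribute values to produce '--key=value' strings; the real flag names come from the task's dataflow_params object, which this excerpt does not show.

# Hypothetical illustration only; names and values are assumptions.
import json


class ExampleParamKeys:
    # Assumed flag names; in luigi these come from the task's dataflow_params.
    runner = "runner"
    project = "project"
    region = "region"
    labels = "labels"


def build_args(keys, runner, project=None, region=None, labels=None):
    def f(key, value):
        return '--{}={}'.format(key, value)

    output = [f(keys.runner, runner)]
    if project:
        output.append(f(keys.project, project))
    if region:
        output.append(f(keys.region, region))
    if labels:
        # dict values are serialized to JSON, mirroring the method above
        output.append(f(keys.labels, json.dumps(labels)))
    return output


print(build_args(ExampleParamKeys(), "DataflowRunner",
                 project="my-gcp-project", region="europe-west1",
                 labels={"team": "data"}))
# ['--runner=DataflowRunner', '--project=my-gcp-project',
#  '--region=europe-west1', '--labels={"team": "data"}']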