def _get_dataflow_args()

in luigi/contrib/beam_dataflow.py [0:0]


    def _get_dataflow_args(self):
        """
        Assemble the ``--<flag>=<value>`` command line arguments for the job.

        The runner argument always comes first and uses the value returned by
        ``self._get_runner()``. Every other option is emitted only when the
        attribute of the same name on this task is truthy. The flag name for
        each option is read from the attribute of the same name on
        ``self.dataflow_params``; ``labels`` is JSON-encoded before formatting.

        :return: list of argument strings, in the fixed order listed below.
        """
        # Attribute names are checked in exactly this order, which is also
        # the order of the arguments in the returned list.
        optional_names = (
            'project',
            'zone',
            'region',
            'staging_location',
            'temp_location',
            'gcp_temp_location',
            'num_workers',
            'autoscaling_algorithm',
            'max_num_workers',
            'disk_size_gb',
            'worker_machine_type',
            'worker_disk_type',
            'network',
            'subnetwork',
            'job_name',
            'service_account',
            'labels',
        )
        template = '--{}={}'

        args = [template.format(self.dataflow_params.runner, self._get_runner())]

        for name in optional_names:
            setting = getattr(self, name)
            if not setting:
                # Unset / empty / zero options are left out entirely.
                continue
            flag = getattr(self.dataflow_params, name)
            if name == 'labels':
                # Labels are passed as a single JSON document.
                setting = json.dumps(setting)
            args.append(template.format(flag, setting))

        return args