in exec/src/klio_exec/commands/run.py [0:0]
def _setup_pipeline(self, pipeline):
    """Wire the job's transforms, input acking and event output onto ``pipeline``.

    Steps, in order:

    1. Resolve the user's run callable via ``self._get_run_callable()``.
    2. If the job has event inputs, build the collection to process plus a
       second "pass-through" value with ``self._generate_pcoll`` (single
       event input, using ``inputs[0]``) or
       ``self._generate_pcoll_per_input`` (multiple event inputs). Without
       event inputs, ``pipeline`` itself is handed to the run callable and
       there is no pass-through value.
    3. Apply the run callable to that input together with ``self.config``.
    4. When the configured runner is ``"DirectGKERunner"``, apply a
       ``beam.ParDo(helpers.KlioAckInputMessage())`` step to the run output.
       When a pass-through value exists, the run output is first flattened
       together with it.
    5. When the job has event outputs and the first one does not set
       ``skip_klio_write``, apply the output transform that
       ``self._io_mapper.output`` registers under that output's name. It is
       built from ``to_io_kwargs()``, and the pass-through value is again
       flattened in when present.

    Args:
        pipeline: the object to build on. Presumably an
            ``apache_beam.Pipeline``; TODO confirm against the caller.
    """
    run_callable = self._get_run_callable()

    # Defaults for a job without event inputs: the user's transforms are
    # applied directly to the pipeline and nothing bypasses them.
    to_process, to_pass_thru = pipeline, None
    # NOTE(review): klio config is believed to require at least one event
    # input, which would make this guard always true -- TODO confirm.
    if self._has_event_inputs:
        if self._has_multi_event_inputs:
            to_process, to_pass_thru = self._generate_pcoll_per_input(
                pipeline
            )
        else:
            first_input = self.config.job_config.events.inputs[0]
            to_process, to_pass_thru = self._generate_pcoll(
                pipeline, first_input
            )

    out_pcol = run_callable(to_process, self.config)

    def _merge_pass_thru(label):
        # Combine the run output with the pass-through value (presumably
        # elements that skipped the user's transforms -- confirm in
        # _generate_pcoll) so downstream steps see both; when there is no
        # pass-through value, the run output is used unchanged.
        if not to_pass_thru:
            return out_pcol
        return (out_pcol, to_pass_thru) | label >> beam.Flatten()

    # TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
    # direct_on_gke_runner_clean is merged
    runner = self.config.pipeline_options.runner
    if runner == "DirectGKERunner":
        ack_source = _merge_pass_thru("Flatten to Ack Input Messages")
        _ = ack_source | "Ack Input Messages" >> beam.ParDo(
            helpers.KlioAckInputMessage()
        )

    if self._has_event_outputs:
        # Only the first configured event output is considered here.
        first_output = self.config.job_config.events.outputs[0]
        if not first_output.skip_klio_write:
            # Look the writer up before flattening, matching the order of
            # graph construction in the original implementation.
            write_transform_cls = self._io_mapper.output[first_output.name]
            write_source = _merge_pass_thru("Flatten to Output")
            _ = write_source | write_transform_cls(
                **first_output.to_io_kwargs()
            )