def _setup_pipeline()

in exec/src/klio_exec/commands/run.py [0:0]


    def _setup_pipeline(self, pipeline):
        run_callable = self._get_run_callable()

        to_pass_thru = None
        to_process = pipeline
        # sanity check - I think klio config forces event input
        if self._has_event_inputs:
            if not self._has_multi_event_inputs:
                input_config = self.config.job_config.events.inputs[0]
                to_process, to_pass_thru = self._generate_pcoll(
                    pipeline, input_config
                )
            else:
                to_process, to_pass_thru = self._generate_pcoll_per_input(
                    pipeline
                )

        out_pcol = run_callable(to_process, self.config)

        # TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
        #       direct_on_gke_runner_clean is merged
        if self.config.pipeline_options.runner == "DirectGKERunner":
            if to_pass_thru:
                to_ack_input = (
                    out_pcol,
                    to_pass_thru,
                ) | "Flatten to Ack Input Messages" >> beam.Flatten()
            else:
                to_ack_input = out_pcol

            _ = to_ack_input | "Ack Input Messages" >> beam.ParDo(
                helpers.KlioAckInputMessage()
            )

        if self._has_event_outputs:
            output_config = self.config.job_config.events.outputs[0]
            if not output_config.skip_klio_write:
                transform_cls_out = self._io_mapper.output[output_config.name]
                to_output = out_pcol
                if to_pass_thru:
                    to_output_tuple = (out_pcol, to_pass_thru)
                    to_output = (
                        to_output_tuple | "Flatten to Output" >> beam.Flatten()
                    )

                _ = to_output | transform_cls_out(
                    **output_config.to_io_kwargs()
                )