def _setup_data_io_filters()

in exec/src/klio_exec/commands/run.py [0:0]


    def _setup_data_io_filters(self, in_pcol, label_prefix=None):
        """Attach Klio's data I/O filter steps to ``in_pcol``.

        Depending on the job's data configuration, up to three filtering
        stages are chained onto the incoming collection:

        1. A ping filter (only when a data input is configured). Its
           ``process`` output continues on; its ``pass_thru`` output is set
           aside.
        2. An output existence check (only when a data output is configured
           and its ``skip_klio_existence_check`` is falsy). The ``found``
           output goes through a force filter; the force filter's
           ``process`` output is flattened together with the check's
           ``not_found`` output, while its ``pass_thru`` output joins the
           pass-through side.
        3. An input existence check (only when a data input is configured
           and its ``skip_klio_existence_check`` is falsy). The
           ``not_found`` output is handed to ``KlioDrop`` (and, on the
           ``"DirectGKERunner"`` runner, also to ``KlioAckInputMessage``);
           the ``found`` output is what gets returned for processing.

        Args:
            in_pcol: the collection the filters are applied to.
            label_prefix: optional value rendered as ``"[<prefix>] "`` in
                front of every step label created here.

        Returns:
            A ``(to_process, to_pass_thru)`` tuple. ``to_pass_thru`` is
            ``None`` when neither the ping filter nor the output existence
            check was applied.

        Raises:
            SystemExit: with code 1 (after logging an error) when the job
                declares multiple data inputs or multiple data outputs.
        """
        # Exactly one data input / output is handled below; refuse anything
        # else before any step is added to the pipeline.
        if self._has_multi_data_inputs or self._has_multi_data_outputs:
            logging.error(
                "Klio does not (yet) support multiple data inputs and outputs."
            )
            raise SystemExit(1)

        input_cfg = (
            self.config.job_config.data.inputs[0]
            if self._has_data_inputs
            else None
        )
        output_cfg = (
            self.config.job_config.data.outputs[0]
            if self._has_data_outputs
            else None
        )

        # Label prefixes are required for multiple inputs (to avoid label
        # name collisions in Beam).
        prefix = "" if label_prefix is None else "[{}] ".format(label_prefix)

        def step(name):
            # Build the full step label for a transform created here.
            return prefix + name

        # Stage 1: ping filter.
        candidates = in_pcol
        ping_pass_thru = None
        if input_cfg:
            ping_split = (
                in_pcol | step("Ping Filter") >> helpers.KlioFilterPing()
            )
            candidates = ping_split.process
            ping_pass_thru = ping_split.pass_thru

        # Stage 2: output existence check (plus force filter).
        if not output_cfg or output_cfg.skip_klio_existence_check:
            # No output check: whatever the ping stage produced flows on.
            to_pass_thru = ping_pass_thru
            unchecked = candidates
        else:
            out_check = (
                candidates
                | step("Output Exists Filter")
                >> helpers.KlioGcsCheckOutputExists()
            )
            force_split = (
                out_check.found
                | step("Output Force Filter") >> helpers.KlioFilterForce()
            )

            if ping_pass_thru is None:
                to_pass_thru = force_split.pass_thru
            else:
                # Merge both pass-through sources into one collection.
                to_pass_thru = (
                    ping_pass_thru,
                    force_split.pass_thru,
                ) | step("Flatten to Pass Thru") >> beam.Flatten()

            # Elements without existing output and elements that are being
            # forced both continue toward processing.
            unchecked = (
                out_check.not_found,
                force_split.process,
            ) | step("Flatten to Process") >> beam.Flatten()

        # Stage 3: input existence check.
        if not input_cfg or input_cfg.skip_klio_existence_check:
            return unchecked, to_pass_thru

        in_check = (
            unchecked
            | step("Input Exists Filter")
            >> helpers.KlioGcsCheckInputExists()
        )

        # TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
        #       direct_on_gke_runner_clean is merged
        if self.config.pipeline_options.runner == "DirectGKERunner":
            ack_label = step("Ack Input Message from No Data Input Found")
            _ = in_check.not_found | ack_label >> beam.ParDo(
                helpers.KlioAckInputMessage()
            )
        _ = (
            in_check.not_found
            | step("Drop Not Found Data") >> helpers.KlioDrop()
        )

        return in_check.found, to_pass_thru