def _filter_intended_recipients()

in exec/src/klio_exec/commands/run.py [0:0]


    def _filter_intended_recipients(self, in_pcol, label_pfx=None):
        """Split incoming messages into ones to process and ones to drop.

        Messages are first tagged by version (v1 / v2). Each version is
        then run through its own recipient check, which exposes a
        ``process`` output and a ``drop`` output. The ``drop`` outputs of
        both versions are flattened together and handed to
        ``helpers.KlioDrop``; the ``process`` outputs are flattened
        together and returned.

        Args:
            in_pcol: the collection the version-tagging transform is
                applied to.
            label_pfx: optional value rendered as ``"[<label_pfx>] "`` in
                front of every step label created here. When ``None``,
                labels carry no prefix.

        Returns:
            The flattened ``process`` outputs of the v1 and v2 recipient
            checks.
        """
        prefix = "" if label_pfx is None else "[{}] ".format(label_pfx)

        def step_name(text):
            # All step labels in this method share the same optional prefix.
            return prefix + text

        # TODO: this "tagging by version then processing each version
        # differently" should only be temporary and removed once v2
        # migration is done
        by_version = in_pcol | (
            step_name("Tag Message Versions")
            >> helpers._KlioTagMessageVersion()
        )

        # Each check tags a message as 'process' or 'drop' depending on
        # whether this job should actually be handling it.
        v1_routed = by_version.v1 | (
            step_name("Should Process v1 Message")
            >> helpers._KlioV1CheckRecipients()
        )
        v2_routed = by_version.v2 | (
            step_name("Should Process v2 Message")
            >> helpers.KlioCheckRecipients()
        )

        # Merge the rejected messages of both versions into one collection.
        rejected = (v1_routed.drop, v2_routed.drop) | (
            step_name("Flatten to Drop Messages to Ignore") >> beam.Flatten()
        )

        # TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
        #       direct_on_gke_runner_clean is merged
        on_direct_gke = (
            self.config.pipeline_options.runner == "DirectGKERunner"
        )
        if on_direct_gke:
            # Only on this runner are rejected messages additionally passed
            # through the input-message ack step.
            _ = rejected | (
                step_name("Ack Dropped Input Message")
                >> beam.ParDo(helpers.KlioAckInputMessage())
            )

        _ = rejected | (
            step_name("Drop Messages to Ignore") >> helpers.KlioDrop()
        )

        # Merge the accepted messages of both versions and hand them back.
        return (v1_routed.process, v2_routed.process) | (
            step_name("Flatten to Process Intended Messages")
            >> beam.Flatten()
        )