in exec/src/klio_exec/commands/run.py [0:0]
def _filter_intended_recipients(self, in_pcol, label_pfx=None):
pfx = ""
if label_pfx is not None:
pfx = "[{}] ".format(label_pfx)
def lbl(label):
return "{}{}".format(pfx, label)
# TODO: this "tagging by version then processing each version
# differently" should only be temporary and removed once v2
# migration is done
version_lbl = lbl("Tag Message Versions")
msg_version = in_pcol | version_lbl >> helpers._KlioTagMessageVersion()
# tag each v1 message as 'process' or to 'drop' depending on if this
# job should actually be handling the received message.
v1_proc_lbl = lbl("Should Process v1 Message")
v1_to_process = (
msg_version.v1 | v1_proc_lbl >> helpers._KlioV1CheckRecipients()
)
v2_proc_lbl = lbl("Should Process v2 Message")
v2_to_process = (
msg_version.v2 | v2_proc_lbl >> helpers.KlioCheckRecipients()
)
flatten_ign_lbl = lbl("Flatten to Drop Messages to Ignore")
to_drop_flatten = (v1_to_process.drop, v2_to_process.drop)
to_drop = to_drop_flatten | flatten_ign_lbl >> beam.Flatten()
# TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
# direct_on_gke_runner_clean is merged
if self.config.pipeline_options.runner == "DirectGKERunner":
ack_inp_lbl = lbl("Ack Dropped Input Message")
_ = to_drop | ack_inp_lbl >> beam.ParDo(
helpers.KlioAckInputMessage()
)
ignore_lbl = lbl("Drop Messages to Ignore")
_ = to_drop | ignore_lbl >> helpers.KlioDrop()
flatten_proc_lbl = lbl("Flatten to Process Intended Messages")
to_process_flatten = (v1_to_process.process, v2_to_process.process)
to_process = to_process_flatten | flatten_proc_lbl >> beam.Flatten()
return to_process