in lib/src/klio/transforms/helpers.py [0:0]
def _should_process(self, klio_message):
    """Decide whether the current job should process ``klio_message``.

    The decision is driven by the ``recipients`` oneof of
    ``klio_message.metadata.intended_recipients``:

    * not set    -> a warning is logged and the message is dropped.
    * "anyone"   -> the message is processed.
    * "limited"  -> the message is processed only when the current job
      (parsed from ``self._klio.job``) is listed in
      ``limited.recipients``.

    Side effect: when the current job is also the job named in
    ``limited.trigger_children_of``, the message's intended recipients
    are switched in place to "anyone".

    Args:
        klio_message: message exposing ``metadata.intended_recipients``
            and ``data.element``.

    Returns:
        bool: True if the message should be processed, False if it
        should be dropped.
    """
    routing = klio_message.metadata.intended_recipients
    # WhichOneof gives back "anyone", "limited", or None when unset
    audience = routing.WhichOneof("recipients")

    if audience == "anyone":
        return True

    if audience is None:
        # NOTE(review): open question carried over from the original
        # author - if this is not set in a v2 message, is it safe to
        # assume it should be top-down? Possibly the case for batch.
        warn = self._klio.logger.warning
        warn(
            "Dropping KlioMessage - No 'intended_recipients' set in "
            "metadata of KlioMessage with element '{}'.".format(
                klio_message.data.element
            )
        )
        return False

    # From here on the oneof can only be "limited".
    this_job = klio_pb2.KlioJob()
    this_job.ParseFromString(self._klio.job)
    limited = routing.limited

    # Messages not addressed to this job are skipped.
    addressed_to_us = _helpers._job_in_jobs(this_job, limited.recipients)
    if not addressed_to_us:
        return False

    # Being an intended recipient _and_ the job in trigger_children_of
    # means the message was originally in top-down mode but was missing
    # dependencies; its intended recipients therefore go back to
    # "anyone", signifying top-down.
    triggering_jobs = [limited.trigger_children_of]
    if _helpers._job_in_jobs(this_job, triggering_jobs):
        # FYI: 'anyone' is essentially an empty message, so it cannot
        # simply be assigned; SetInParent() is how it gets selected.
        # https://stackoverflow.com/a/29651069
        routing.anyone.SetInParent()

    return True