in exec/src/klio_exec/commands/run.py [0:0]
def _setup_data_io_filters(self, in_pcol, label_prefix=None):
# label prefixes are required for multiple inputs (to avoid label
# name collisions in Beam)
if self._has_multi_data_inputs or self._has_multi_data_outputs:
logging.error(
"Klio does not (yet) support multiple data inputs and outputs."
)
raise SystemExit(1)
data_in_config, data_out_config = None, None
if self._has_data_inputs:
data_in_config = self.config.job_config.data.inputs[0]
if self._has_data_outputs:
data_out_config = self.config.job_config.data.outputs[0]
pfx = ""
if label_prefix is not None:
pfx = "[{}] ".format(label_prefix)
def lbl(label):
return "{}{}".format(pfx, label)
to_process_output = in_pcol
pass_thru = None
if data_in_config:
pings = in_pcol | lbl("Ping Filter") >> helpers.KlioFilterPing()
to_process_output = pings.process
pass_thru = pings.pass_thru
if data_out_config and not data_out_config.skip_klio_existence_check:
output_exists = (
to_process_output
| lbl("Output Exists Filter")
>> helpers.KlioGcsCheckOutputExists()
)
output_force = (
output_exists.found
| lbl("Output Force Filter") >> helpers.KlioFilterForce()
)
if pass_thru is not None:
to_pass_thru_tuple = (pass_thru, output_force.pass_thru)
to_pass_thru = (
to_pass_thru_tuple
| lbl("Flatten to Pass Thru") >> beam.Flatten()
)
else:
to_pass_thru = output_force.pass_thru
to_filter_input_tuple = (
output_exists.not_found,
output_force.process,
)
to_filter_input = (
to_filter_input_tuple
| lbl("Flatten to Process") >> beam.Flatten()
)
else:
to_pass_thru = pass_thru
to_filter_input = to_process_output
if data_in_config and not data_in_config.skip_klio_existence_check:
input_exists = (
to_filter_input
| lbl("Input Exists Filter")
>> helpers.KlioGcsCheckInputExists()
)
# TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
# direct_on_gke_runner_clean is merged
if self.config.pipeline_options.runner == "DirectGKERunner":
ack_inp_lbl = lbl("Ack Input Message from No Data Input Found")
_ = input_exists.not_found | ack_inp_lbl >> beam.ParDo(
helpers.KlioAckInputMessage()
)
_ = (
input_exists.not_found
| lbl("Drop Not Found Data") >> helpers.KlioDrop()
)
to_process = input_exists.found
else:
to_process = to_filter_input
return to_process, to_pass_thru