in exec/src/klio_exec/commands/run.py [0:0]
def run(self):
    self._verify_packaging()

    options = self._get_pipeline_options()
    # Pickle the main session so globally-defined objects/imports are
    # available on remote workers.
    options.view_as(pipeline_options.SetupOptions).save_main_session = True

    pipeline = beam.Pipeline(options=options)
    self._setup_pipeline(pipeline)

    try:
        # NOTE: When running with Dataflow, this `result` object has a lot
        #       of information about the job (id, name, project, status,
        #       etc). Could be useful if wanting to report back the status,
        #       URL of the dataflow job, etc. @lynn
        result = pipeline.run()
    except ValueError as e:
        if (
            self.runtime_conf.update
            and "No running job found with name" in str(e)
        ):
            # job is currently not running - should simply deploy without
            # the update flag set
            # TODO: is this possible?
            self.runtime_conf = self.runtime_conf._replace(update=None)
            return self.run()
        logging.error(f"Error running pipeline: {e}")
        raise SystemExit(1)

    # TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
    #       direct_on_gke_runner_clean is merged
    is_direct_gke = (
        self.config.pipeline_options.runner == "DirectGKERunner"
    )
    # Block on the job when running locally (direct runner / DirectGKERunner)
    # or when the blocking flag was requested.
    should_block = any(
        (
            self.runtime_conf.direct_runner,
            self.runtime_conf.blocking,
            is_direct_gke,
        )
    )
    if should_block:
        # the pipeline on direct runner will otherwise get garbage collected
        result.wait_until_finish()

    # If the blocking flag was already passed, don't wait again
    if (
        self.config.job_config.wait_for_pipeline_running
        and self.config.pipeline_options.streaming
        and not should_block
    ):
        self.wait_for_pipeline_running(result)
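
# The NOTE at the top of the try block observes that `result` (a Beam
# PipelineResult) carries job metadata when running on Dataflow. A minimal,
# hypothetical sketch of reporting that back; `_report_job_status` is not
# part of klio, and `job_id()` exists only on Dataflow's PipelineResult:
def _report_job_status(self, result):
    state = result.state  # PipelineResult.state, e.g. RUNNING / DONE
    # Fall back to None for runners whose result has no `job_id()` method.
    job_id = getattr(result, "job_id", lambda: None)()
    if job_id:
        logging.info("Dataflow job %s is in state %s", job_id, state)
    else:
        logging.info("Pipeline is in state %s", state)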