def run()

in exec/src/klio_exec/commands/run.py


    def run(self):
        self._verify_packaging()
        options = self._get_pipeline_options()
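        # Pickle the main session so module-level imports and globals are
        # available to DoFns executing on remote workers.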
        options.view_as(pipeline_options.SetupOptions).save_main_session = True

        pipeline = beam.Pipeline(options=options)

        self._setup_pipeline(pipeline)

        try:
            # NOTE: When running with Dataflow, this `result` object has a lot
            #       of information about the job (id, name, project, status,
            #       etc). Could be useful if wanting to report back the status,
            #       URL of the dataflow job, etc.  @lynn
            result = pipeline.run()
        except ValueError as e:
            if (
                self.runtime_conf.update
                and "No running job found with name" in str(e)
            ):
                # The job is not currently running - simply deploy it without
                # the `update` flag set.
                # TODO: is this possible?
                self.runtime_conf = self.runtime_conf._replace(update=None)
                return self.run()

            logging.error(f"Error running pipeline: {e}")
            raise SystemExit(1)

        # TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
        #       direct_on_gke_runner_clean is merged
        is_direct_gke = (
            self.config.pipeline_options.runner == "DirectGKERunner"
        )
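        # Block on the pipeline result when running locally (direct runner),
        # when the user explicitly requested a blocking run, or when running
        # on the direct GKE runner.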
        should_block = any(
            (
                self.runtime_conf.direct_runner,
                self.runtime_conf.blocking,
                is_direct_gke,
            )
        )

        if should_block:
            # the pipeline on the direct runner will otherwise get garbage
            # collected before it finishes
            result.wait_until_finish()

        # If the blocking flag was already passed, don't wait again
        if (
            self.config.job_config.wait_for_pipeline_running
            and self.config.pipeline_options.streaming
            and not should_block
        ):
            self.wait_for_pipeline_running(result)
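
The method references several names that are not defined in the excerpt itself. Below is a minimal sketch of the module-level imports it appears to assume (the real run.py may organize them differently), along with a note on the instance attributes expected to be provided by the enclosing class:

    # Assumed module-level imports for the excerpt above (not shown in the
    # extracted snippet); `apache_beam` and its `pipeline_options` module are
    # standard Apache Beam APIs.
    import logging

    import apache_beam as beam
    from apache_beam.options import pipeline_options

    # `self.config`, `self.runtime_conf`, `self._verify_packaging`,
    # `self._get_pipeline_options`, `self._setup_pipeline`, and
    # `self.wait_for_pipeline_running` are attributes/methods of the
    # enclosing class and are assumed to be defined elsewhere in run.py.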