def run_pipeline()

in exec/src/klio_exec/runners/gke_direct.py [0:0]


    def run_pipeline(self, pipeline, options):
        """Execute the entire pipeline and return a DirectPipelineResult.

        Args:
            pipeline: the pipeline to execute. It is visited twice (once
                to detect ``TestStream`` usage, once to track consumers)
                and has the configured PTransform overrides applied.
            options: the pipeline options; used to pick the transform
                overrides, the bundle factory mode, and to build the
                evaluation context.

        Returns:
            A ``direct_runner.DirectPipelineResult`` wrapping the started
            executor and its evaluation context.
        """

        # Klio maintainer note: This code is the exact same logic as
        # direct_runner.BundleBasedDirectRunner.run_pipeline with the
        # following changes:
        # 1. Import statements that were originally inside this method
        #    were moved to the top of this module.
        # 2. Import statements were adjusted to import modules and not
        #    objects, according to the Google style guide.
        # 3. The functionality we needed to override, which is invoking
        #    our own TransformEvaluatorRegistry when instantiating the
        #    Executor class (called out below).

        class TestStreamUsageVisitor(beam_pipeline.PipelineVisitor):
            """Visitor recording whether any transform is a TestStream."""

            def __init__(self):
                self.uses_test_stream = False

            def visit_transform(self, applied_ptransform):
                transform = applied_ptransform.transform
                if isinstance(transform, test_stream.TestStream):
                    self.uses_test_stream = True

        # A pipeline that uses the TestStream I/O gets a mock test clock;
        # every other pipeline gets the real clock.
        test_stream_finder = TestStreamUsageVisitor()
        pipeline.visit(test_stream_finder)
        if test_stream_finder.uses_test_stream:
            clock = beam_clock.TestClock()
        else:
            clock = beam_clock.RealClock()

        # Apply the configured PTransform overrides.
        overrides = direct_runner._get_transform_overrides(options)
        pipeline.replace_all(overrides)

        _LOGGER.info("Running pipeline with Klio's GkeDirectRunner.")
        # The visitor is also stored on the instance, as in the upstream
        # runner; the local alias is only for brevity below.
        tracker = ctpv.ConsumerTrackingPipelineVisitor()
        self.consumer_tracking_visitor = tracker
        pipeline.visit(tracker)

        direct_options = options.view_as(pipeline_options.DirectOptions)
        bndl_factory = bundle_factory.BundleFactory(
            stacked=direct_options.direct_runner_use_stacked_bundle
        )
        evaluation_context = eval_ctx.EvaluationContext(
            options,
            bndl_factory,
            tracker.root_transforms,
            tracker.value_to_consumers,
            tracker.step_names,
            tracker.views,
            clock,
        )

        # Klio maintainer note: this is where the logic differs from the
        # upstream runner: we use our own `KlioTransformEvaluatorRegistry`.
        registry = evaluators.KlioTransformEvaluatorRegistry(
            evaluation_context
        )
        executor = beam_exec.Executor(
            tracker.value_to_consumers, registry, evaluation_context
        )

        # DirectRunner does not support injecting PipelineOptions values
        # at runtime, so the runtime options are set to an empty mapping.
        value_provider.RuntimeValueProvider.set_runtime_options({})

        # Start the executor. This is a non-blocking call: it starts the
        # execution in background threads and returns.
        executor.start(tracker.root_transforms)
        return direct_runner.DirectPipelineResult(
            executor, evaluation_context
        )