in exec/src/klio_exec/runners/gke_direct.py [0:0]
def run_pipeline(self, pipeline, options):
    """Execute the entire pipeline and return a DirectPipelineResult.

    Mirrors Beam's ``BundleBasedDirectRunner.run_pipeline`` but builds the
    ``Executor`` with Klio's ``KlioTransformEvaluatorRegistry``.

    Steps, in order:

    1. Visit ``pipeline`` to detect whether any transform is a
       ``TestStream``; if so use a ``TestClock``, otherwise a
       ``RealClock``.
    2. Apply the direct runner's configured PTransform overrides to
       ``pipeline`` via ``replace_all``.
    3. Visit the overridden pipeline with a
       ``ConsumerTrackingPipelineVisitor``, which is stored on
       ``self.consumer_tracking_visitor``.
    4. Build an ``EvaluationContext`` and an ``Executor`` from that
       visitor's results, clear runtime options, and start the executor.

    Args:
        pipeline: the pipeline to execute. It is visited twice and has
            the direct runner's transform overrides applied to it via
            ``replace_all``.
        options: the pipeline options. Its ``DirectOptions`` view's
            ``direct_runner_use_stacked_bundle`` value is passed as
            ``stacked`` to the ``BundleFactory``; the options are also
            passed to the transform overrides and the evaluation context.

    Returns:
        direct_runner.DirectPipelineResult: wraps the started executor and
        its evaluation context. ``executor.start`` is non-blocking (see the
        comment before that call), so the pipeline may still be running
        when this method returns.
    """
    # Klio maintainer note: This code is the exact same logic as in
    # direct_runner.BundleBasedDirectRunner.run_pipeline, with the
    # following changes:
    # 1. Import statements that were originally inside this method
    #    were moved to the top of this module.
    # 2. Import statements were adjusted to import modules and not
    #    objects, according to the Google style guide.
    # 3. The functionality we needed to override: invoking our own
    #    KlioTransformEvaluatorRegistry (instead of Beam's
    #    TransformEvaluatorRegistry) when instantiating the Executor
    #    class (called out below).

    # If the TestStream I/O is used, use a mock test clock.
    class TestStreamUsageVisitor(beam_pipeline.PipelineVisitor):
        """Visitor determining whether a Pipeline uses a TestStream."""

        def __init__(self):
            self.uses_test_stream = False

        def visit_transform(self, applied_ptransform):
            # Flag the pipeline as soon as any transform is a TestStream;
            # the flag is never reset.
            if isinstance(
                applied_ptransform.transform, test_stream.TestStream
            ):
                self.uses_test_stream = True

    visitor = TestStreamUsageVisitor()
    pipeline.visit(visitor)
    clock = (
        beam_clock.TestClock()
        if visitor.uses_test_stream
        else beam_clock.RealClock()
    )

    # Performing configured PTransform overrides.
    pipeline.replace_all(direct_runner._get_transform_overrides(options))

    _LOGGER.info("Running pipeline with Klio's GkeDirectRunner.")
    # This visit runs after replace_all above, so the tracked root
    # transforms, consumers, step names and views reflect the overrides.
    self.consumer_tracking_visitor = ctpv.ConsumerTrackingPipelineVisitor()
    pipeline.visit(self.consumer_tracking_visitor)

    bndl_factory = bundle_factory.BundleFactory(
        stacked=options.view_as(
            pipeline_options.DirectOptions
        ).direct_runner_use_stacked_bundle
    )
    evaluation_context = eval_ctx.EvaluationContext(
        options,
        bndl_factory,
        self.consumer_tracking_visitor.root_transforms,
        self.consumer_tracking_visitor.value_to_consumers,
        self.consumer_tracking_visitor.step_names,
        self.consumer_tracking_visitor.views,
        clock,
    )

    # Klio maintainer note: this is where the change in logic is:
    # using our own `KlioTransformEvaluatorRegistry`.
    executor = beam_exec.Executor(
        self.consumer_tracking_visitor.value_to_consumers,
        evaluators.KlioTransformEvaluatorRegistry(evaluation_context),
        evaluation_context,
    )

    # DirectRunner does not support injecting
    # PipelineOptions values at runtime
    value_provider.RuntimeValueProvider.set_runtime_options({})

    # Start the executor. This is a non-blocking call, it will start the
    # execution in background threads and return.
    executor.start(self.consumer_tracking_visitor.root_transforms)

    result = direct_runner.DirectPipelineResult(
        executor, evaluation_context
    )
    return result