in lib/src/klio/transforms/core.py [0:0]
def _get_metrics_registry(self):
native_metrics_client = native_metrics.NativeMetricsClient(self.config)
clients = [native_metrics_client]
use_logger, use_shumway = None, None
metrics_config = self.config.job_config.metrics
# use_logger/use_shumway could be False (turn off),
# None (use default config), or a dict of configured values
use_logger = metrics_config.get("logger")
use_shumway = metrics_config.get("shumway")
# TODO: set runner in OS environment (via klio-exec), since
# the runner defined in config could be overwritten via
# `--direct-runner`.
# i.e.: runner = os.getenv("BEAM_RUNNER", "").lower()
runner = self.config.pipeline_options.runner
if kvars.KlioRunner.DIRECT_RUNNER != runner:
if use_logger is None:
use_logger = False
# use shumway when running on DirectGKERunner unless it's explicitly
# turned off/set to False. Don't set it to True if it's set to False
# or it's a dictionary (aka has some configuration)
if kvars.KlioRunner.DIRECT_GKE_RUNNER == runner:
if use_shumway is None:
use_shumway = True
# shumway only works on DirectGKERunner, so we explicitly set it
# to False
else:
use_shumway = False
if use_logger is not False:
logger_client = metrics_logger.MetricsLoggerClient(self.config)
clients.append(logger_client)
if use_shumway is not False:
shumway_client = shumway.ShumwayMetricsClient(self.config)
clients.append(shumway_client)
return metrics_client.MetricsRegistry(
clients, transform_name=self._transform_name
)