def _get_metrics_registry()

in lib/src/klio/transforms/core.py [0:0]


    def _get_metrics_registry(self):
        """Assemble the metrics registry used by this transform.

        The native metrics client is always registered. The logger and
        shumway clients are registered according to the ``logger`` and
        ``shumway`` entries of the job's metrics config, each of which
        may be ``False`` (turned off), ``None`` (fall back to the
        runner-dependent default), or a dict of configured values:

        * logger: when unset, it is only on for the direct runner.
        * shumway: when unset, it is on for DirectGKERunner; for every
          other runner it is forced off regardless of configuration.

        Returns:
            A ``metrics_client.MetricsRegistry`` holding the selected
            clients, created with ``transform_name`` set to
            ``self._transform_name``.
        """
        registry_clients = [native_metrics.NativeMetricsClient(self.config)]

        job_metrics = self.config.job_config.metrics
        logger_setting = job_metrics.get("logger")
        shumway_setting = job_metrics.get("shumway")

        # TODO: set runner in OS environment (via klio-exec), since
        #       the runner defined in config could be overwritten via
        #       `--direct-runner`.
        #       i.e.: runner = os.getenv("BEAM_RUNNER", "").lower()
        runner = self.config.pipeline_options.runner

        # an unconfigured logger defaults to off everywhere except on
        # the direct runner
        not_direct = kvars.KlioRunner.DIRECT_RUNNER != runner
        if not_direct and logger_setting is None:
            logger_setting = False

        on_direct_gke = kvars.KlioRunner.DIRECT_GKE_RUNNER == runner
        if not on_direct_gke:
            # shumway only works on DirectGKERunner, so any configured
            # value is overridden here
            shumway_setting = False
        elif shumway_setting is None:
            # on DirectGKERunner shumway defaults to on; an explicit
            # False or a dict of configuration is left untouched
            shumway_setting = True

        # anything other than an explicit False (None, True, or a dict)
        # enables the corresponding client
        if logger_setting is not False:
            registry_clients.append(
                metrics_logger.MetricsLoggerClient(self.config)
            )

        if shumway_setting is not False:
            registry_clients.append(
                shumway.ShumwayMetricsClient(self.config)
            )

        return metrics_client.MetricsRegistry(
            registry_clients, transform_name=self._transform_name
        )