cli/src/klio_cli/utils/cli_utils.py (87 lines of code) (raw):

# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import subprocess

from klio_core import variables as var


def get_git_sha(cwd=None, image_tag=None):
    """Return the abbreviated git SHA describing the current checkout.

    Runs ``git describe`` with a pattern that never matches a tag, so the
    output is always the 8-character commit hash, suffixed with ``-dirty``
    when the working tree has uncommitted changes.

    Args:
        cwd: directory in which to run git; ``None`` uses the current
            working directory of the process.
        image_tag: user-provided image tag. When truthy, a git failure is
            tolerated instead of aborting.

    Returns:
        The stripped output of the git command, or ``None`` when git fails
        and ``image_tag`` is truthy.

    Raises:
        SystemExit: when git fails and no ``image_tag`` was provided.
    """
    cmd = "git describe --match=NeVeRmAtCh --always --abbrev=8 --dirty"
    try:
        output = subprocess.check_output(
            cmd.split(),
            cwd=cwd,
            # pipe to devnull to suppress the error msgs from git itself
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        if not image_tag:
            logging.error(
                "The directory from which you are running this is not a git "
                "directory, or has no commits yet. The latest commit is used "
                "to tag the Docker image that is built by this command. "
                "Consider overriding this value using the --image-tag flag "
                "until such a time as commits are available."
            )
            raise SystemExit(1)
        # An explicit image tag was supplied, so there is no SHA to report.
        return None
    return output.decode().strip()


# TODO: Move this to KlioConfig validation
# once overriding & templates are done
def validate_dataflow_runner_config(klio_config):
    """Exit unless every pipeline option mandatory for Dataflow is set.

    Args:
        klio_config: config object whose ``pipeline_options.as_dict()``
            yields the pipeline options to check.

    Raises:
        SystemExit: when any of ``project``, ``staging_location``,
            ``temp_location`` or ``region`` is missing or ``None``.
    """
    pipeline_opts = klio_config.pipeline_options.as_dict()
    mandatory_gcp_keys = (
        "project",
        "staging_location",
        "temp_location",
        "region",
    )
    has_all_keys = all(
        pipeline_opts.get(key) is not None for key in mandatory_gcp_keys
    )
    if has_all_keys:
        return

    logging.error(
        "Unable to verify the mandatory configuration fields for"
        " DataflowRunner. Please fix job configuration or run via direct"
        " runner."
    )
    raise SystemExit(1)


def is_direct_runner(klio_config, direct_runner_flag):
    """Report whether the job should run with the direct runner.

    The ``direct_runner_flag`` takes precedence over the configured runner.
    When the configured runner is the Dataflow runner, its configuration is
    validated as a side effect (which may exit the process).

    Args:
        klio_config: config object exposing ``pipeline_options.runner``.
        direct_runner_flag: truthy when the direct runner was requested
            explicitly.

    Returns:
        ``True`` for the direct runner, ``False`` otherwise.

    Raises:
        SystemExit: when the Dataflow runner is configured but its
            mandatory options are incomplete.
    """
    if direct_runner_flag:
        return True

    runner = klio_config.pipeline_options.runner
    if runner == var.KlioRunner.DIRECT_RUNNER:
        return True

    if runner == var.KlioRunner.DATAFLOW_RUNNER:
        validate_dataflow_runner_config(klio_config)

    return False


def import_gke_commands():
    """Import and return the GKE job commands module.

    Returns:
        The ``klio_cli.commands.job.gke`` module.

    Raises:
        SystemExit: when the module (or one of its dependencies) cannot be
            imported.
    """
    # Importing GKE commands needs to be behind a try/except because the
    # kubernetes dependency is not part of the base install dependencies
    try:
        from klio_cli.commands.job import gke as gke_commands
    except ImportError as e:
        # ``msg`` is None when the ImportError was raised without a message;
        # fall back to an empty string so the membership test cannot fail.
        if "kubernetes" in (e.msg or ""):
            logging.error(
                "Failed to import DirectGKERunner dependencies."
                " Did you install `klio-cli[kubernetes]`?"
            )
        else:
            logging.error(e)
        raise SystemExit(1)

    # the import is only local to this function so we need to return the
    # module
    return gke_commands


def error_stackdriver_logger_metrics(klio_config, direct_runner):
    """Exit with an error if the removed stackdriver_logger is configured.

    Nothing is checked when running with the direct runner or the direct
    GKE runner.

    Args:
        klio_config: config object exposing ``pipeline_options.runner`` and
            ``job_config.metrics``.
        direct_runner: truthy when the job runs with the direct runner.

    Raises:
        SystemExit: when the metrics config is a dict whose
            ``stackdriver_logger`` entry is anything other than ``False``
            or ``None``.
    """
    if direct_runner:
        return

    runner = klio_config.pipeline_options.runner
    if runner == var.KlioRunner.DIRECT_GKE_RUNNER:
        return

    metrics_config = klio_config.job_config.metrics
    if not isinstance(metrics_config, dict):
        return

    stackdriver_conf = metrics_config.get("stackdriver_logger")
    if stackdriver_conf in (False, None):
        return

    msg = (
        "The Stackdriver log-based metric client has been deprecated "
        "since 21.3.0 and removed in 21.12.0, in favor of the Native "
        "metrics client.\n"
        "See docs on how to turn on the native client for Dataflow: "
        "https://docs.klio.io/en/stable/userguide/pipeline/metrics.html"
        "#stackdriver-required-setup\n"
        "And docs how to configure the native client: "
        "https://docs.klio.io/en/stable/userguide/pipeline/metrics.html"
        "#configuration"
    )
    logging.error(msg)
    raise SystemExit(1)