in cli/src/klio_cli/cli.py [0:0]
def deploy_job(klio_config, config_meta, **kwargs):
    """Build the runtime/job configuration for a deploy and run the pipeline.

    Unless the ``update`` option is set, the job named in ``klio_config`` is
    first stopped with the ``"cancel"`` strategy. The pipeline is then run
    either through the GKE commands (when the direct runner is not in use
    and the configured runner is ``DIRECT_GKE_RUNNER``) or through the
    regular ``RunPipeline`` command. The process exits with the return code
    of the run command.

    Args:
        klio_config: job configuration; ``job_name`` and
            ``pipeline_options`` (``project``, ``region``, ``runner``) are
            read from it.
        config_meta: configuration metadata; ``job_dir`` and
            ``config_file`` are read from it.
        **kwargs: CLI options. ``direct_runner`` and ``update`` are
            required (they are popped); ``image_tag`` and ``force_build``
            are optional.
    """
    use_direct_runner = cli_utils.is_direct_runner(
        klio_config, kwargs.pop("direct_runner")
    )
    cli_utils.error_stackdriver_logger_metrics(klio_config, use_direct_runner)

    sha = cli_utils.get_git_sha(config_meta.job_dir, kwargs.get("image_tag"))

    # An explicitly requested tag wins; otherwise fall back to the git SHA.
    tag = kwargs.get("image_tag") or sha
    if config_meta.config_file:
        # Suffix the tag with the config file's base name.
        tag = "{}-{}".format(tag, os.path.basename(config_meta.config_file))

    docker_config = DockerRuntimeConfig(
        image_tag=tag,
        force_build=kwargs.get("force_build"),
        config_file_override=config_meta.config_file,
    )
    job_config = RunJobConfig(
        direct_runner=use_direct_runner,
        update=kwargs.pop("update"),
        git_sha=sha,
    )

    # TODO: make this a click option once draining is supported @lynn
    if not job_config.update:
        # Not an update: stop the job (strategy "cancel") before running.
        stop_args = (
            klio_config.job_name,
            klio_config.pipeline_options.project,
            klio_config.pipeline_options.region,
            "cancel",
        )
        job_commands.stop.StopJob().stop(*stop_args)

    runs_on_gke = (
        not use_direct_runner
        and klio_config.pipeline_options.runner
        == var.KlioRunner.DIRECT_GKE_RUNNER
    )
    if runs_on_gke:
        # GKE commands are imported only when this branch is taken.
        pipeline_cls = cli_utils.import_gke_commands().RunPipelineGKE
    else:
        pipeline_cls = job_commands.run.RunPipeline

    pipeline = pipeline_cls(
        config_meta.job_dir, klio_config, docker_config, job_config
    )
    sys.exit(pipeline.run())