in cli/src/klio_cli/commands/job/create.py [0:0]
def _get_context_from_defaults(self, kwargs):
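    # Prefer a user-supplied worker image; only generate a Dockerfile
    # when we fall back to the templated default image.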
    def _get_worker_image():
        create_dockerfile = True
        image = self.WORKER_IMAGE_TPL.format(**kwargs)
        if kwargs.get("worker_image"):
            create_dockerfile = False
            image = kwargs["worker_image"]
        return image, create_dockerfile

    # TODO: increase abstraction - we generate some defaults more than once
    #       throughout this `job create` flow. We can probably abstract
    #       it better and write more DRY code (@lynn)
    tmp_default = "gs://{gcp_project}-dataflow-tmp/{job_name}".format(
        **kwargs
    )
    staging_default = tmp_default + "/staging"
    temp_default = tmp_default + "/temp"
    worker_image, create_dockerfile = _get_worker_image()
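
    # `use_fnapi` may arrive as a prompt/option string ("y", "yes",
    # "true", ...), as a bare flag (key present, value falsy), or not
    # at all; normalize all three cases to a bool.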
if "use_fnapi" in kwargs:
use_fnapi = kwargs.get("use_fnapi")
if use_fnapi:
use_fnapi = str(use_fnapi).lower() in ("y", "true", "yes")
else: # then it was used as a flag
use_fnapi = True
else:
use_fnapi = DEFAULTS["use_fnapi"]

    create_resources = self._get_create_resources(kwargs)
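
    # Experiments default to the comma-separated DEFAULTS entry, minus
    # `beam_fn_api` when FnAPI is disabled; an explicit user-provided
    # value overrides the defaults entirely.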
    default_exps = DEFAULTS["experiments"].split(",")
    if not use_fnapi:
        default_exps = [e for e in default_exps if e != "beam_fn_api"]
    experiments = kwargs.get("experiments")
    if not experiments:
        experiments = default_exps
    else:
        experiments = experiments.split(",")
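
    # Dataflow pipeline options; each value falls back to its
    # module-level default when the caller did not supply one.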
    pipeline_context = {
        "worker_harness_container_image": worker_image,
        "experiments": experiments,
        "project": kwargs["gcp_project"],
        "region": kwargs.get("region", DEFAULTS["region"]),
        "staging_location": kwargs.get("staging_location", staging_default),
        "temp_location": kwargs.get("temp_location", temp_default),
        "num_workers": kwargs.get("num_workers", DEFAULTS["num_workers"]),
        "max_num_workers": kwargs.get(
            "max_num_workers", DEFAULTS["max_num_workers"]
        ),
        "autoscaling_algorithm": kwargs.get(
            "autoscaling_algorithm", DEFAULTS["autoscaling_algorithm"]
        ),
        "disk_size_gb": kwargs.get("disk_size_gb", DEFAULTS["disk_size_gb"]),
        "worker_machine_type": kwargs.get(
            "worker_machine_type", DEFAULTS["worker_machine_type"]
        ),
    }
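
    # Job-level context differs by job type; anything other than
    # "batch" is treated as streaming.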
    job_type = kwargs.get("job_type", DEFAULTS["job_type"])
    if job_type == "batch":
        job_context = self._get_default_batch_job_context(kwargs)
    else:
        job_context = self._get_default_streaming_job_context(kwargs)

    python_version = kwargs.get("python_version", DEFAULTS["python_version"])
    python_version = self._parse_python_version(python_version)
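
    # Bundle the assembled defaults into the final context;
    # `create_dockerfile` tells the caller whether to also generate a
    # Dockerfile (False when a prebuilt worker image was supplied).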
    context = {
        "pipeline_options": pipeline_context,
        "job_options": job_context,
        "python_version": python_version,
        "use_fnapi": use_fnapi,
        "create_resources": create_resources,
        "job_type": job_type,
    }
    return context, create_dockerfile
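
The `use_fnapi` handling above is the subtlest branch: the same key can carry a string answer from a prompt, act as a bare flag, or be absent entirely. Below is a minimal, self-contained sketch of that normalization; the `DEFAULTS` mapping and the `normalize_use_fnapi` name here are illustrative assumptions, not klio's actual definitions.

# Standalone sketch of the `use_fnapi` normalization; DEFAULTS here is
# illustrative only, not the real mapping from klio's source.
DEFAULTS = {"use_fnapi": False}

def normalize_use_fnapi(kwargs):
    if "use_fnapi" in kwargs:
        value = kwargs["use_fnapi"]
        if value:
            # Prompt/option answers arrive as strings.
            return str(value).lower() in ("y", "true", "yes")
        # Key present but value falsy: the option was a bare flag.
        return True
    return DEFAULTS["use_fnapi"]

assert normalize_use_fnapi({"use_fnapi": "Yes"}) is True
assert normalize_use_fnapi({"use_fnapi": "n"}) is False
assert normalize_use_fnapi({"use_fnapi": None}) is True   # bare flag
assert normalize_use_fnapi({}) is False                   # DEFAULTS fallback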