in cli/src/klio_cli/commands/job/create.py [0:0]
def _get_context_from_user_inputs(self, kwargs):
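# Prompt for any job options that were not supplied on the command line,
# validate what we can, and return the assembled context along with a
# flag indicating whether a Dockerfile should be generated for the worker.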
job_type = kwargs.get("job_type") or click.prompt(
"Job Type",
type=click.Choice(["batch", "streaming"]),
default=DEFAULTS["job_type"],
)
region = kwargs.get("region") or click.prompt(
"Desired GCP Region", default=DEFAULTS["region"]
)
self._validate_region(region)
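# `use_fnapi` may arrive with an explicit value ("y"/"yes"/"true"), as a
# bare flag, or not at all; only prompt when it is absent entirely.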
if "use_fnapi" in kwargs:
use_fnapi = kwargs.get("use_fnapi")
if use_fnapi:
use_fnapi = str(use_fnapi).lower() in ("y", "true", "yes")
else:  # the option was passed as a bare flag (no value)
use_fnapi = True
else:
use_fnapi = click.prompt(
"Use Apache Beam's FnAPI (experimental) [Y/n]",
type=click.Choice(["y", "Y", "n", "N"]),
default="y" if DEFAULTS["use_fnapi"] is True else "n",
show_choices=False,
show_default=False, # shown in prompt
)
use_fnapi = use_fnapi.lower() == "y"
create_resources = self._get_create_resources(kwargs, user_input=True)
# TODO: should this even be an option? `run-job` will break if so.
# TODO: figure out if we should expose `experiments` to the user, or
# if it's okay to always assume `beam_fn_api` is the only
# experiment someone would ever use.
default_exps = DEFAULTS["experiments"].split(",")
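# Only keep `beam_fn_api` in the default experiments when the FnAPI is enabled.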
if not use_fnapi:
default_exps = [e for e in default_exps if e != "beam_fn_api"]
experiments = kwargs.get("experiments")
if experiments:
experiments = experiments.split(",")
else:
experiments = click.prompt(
"Beam experiments to enable", default=default_exps,
)
num_workers = kwargs.get("num_workers") or click.prompt(
"Number of workers to run",
type=int,
default=DEFAULTS["num_workers"],
)
max_workers = kwargs.get("max_num_workers") or click.prompt(
"Maximum number of workers to run",
type=int,
default=DEFAULTS["max_num_workers"],
)
autoscaling_algorithm = kwargs.get(
"autoscaling_algorithm"
) or click.prompt(
"Autoscaling algorithm to use. "
"Can be NONE (default) or THROUGHPUT_BASED",
type=str,
default=DEFAULTS["autoscaling_algorithm"],
)
disk_size = kwargs.get("disk_size_gb") or click.prompt(
"Size of a worker disk (GB)",
type=int,
default=DEFAULTS["disk_size_gb"],
)
machine_type = kwargs.get("machine_type") or click.prompt(
"Type of GCP instance for the worker machine(s)",
default=DEFAULTS["worker_machine_type"],
)
# TODO: remove support for providing a dockerfile and docker image
# specifically for a job. But be able to provide the ability to
# choose between py2 or py3 base fnapi image. (@lynn)
create_dockerfile = False
worker_image = kwargs.get("worker_image")
python_version = DEFAULTS["python_version"]
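# If the user does not supply an image, fall back to the templated default
# worker image and prompt for the Python version so a matching Dockerfile
# can be generated.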
if not worker_image:
worker_image = click.prompt(
(
"Docker image to use for the worker."
"If none, a Dockerfile will"
" be created for you"
),
default="",
)
# FYI: for some reason, coverage doesn't detect that this branch
# is indeed covered; force skipping for now
if not worker_image: # pragma: no cover
worker_image = self.WORKER_IMAGE_TPL.format(**kwargs)
python_version = kwargs.get("python_version") or click.prompt(
"Python major version ({})".format(
", ".join(VALID_BEAM_PY_VERSIONS)
),
default=DEFAULTS["python_version"],
)
create_dockerfile = True
python_version = self._parse_python_version(python_version)
self._validate_worker_image(worker_image)
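# Staging/temp default to a per-project "<project>-dataflow-tmp" GCS bucket,
# namespaced by the job name.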
tmp_default = "gs://{gcp_project}-dataflow-tmp/{job_name}".format(
**kwargs
)
staging_default = tmp_default + "/staging"
temp_default = tmp_default + "/temp"
staging = kwargs.get("staging_location") or click.prompt(
"Staging environment location", default=staging_default
)
temp = kwargs.get("temp_location") or click.prompt(
"Temporary environment location", default=temp_default
)
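# Collect everything gathered above into the pipeline options for the job.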
pipeline_context = {
"worker_harness_container_image": worker_image,
"experiments": experiments,
"region": region,
"staging_location": staging,
"temp_location": temp,
"num_workers": num_workers,
"max_num_workers": max_workers,
"autoscaling_algorithm": autoscaling_algorithm,
"disk_size_gb": disk_size,
"worker_machine_type": machine_type,
}
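# Job-level options differ between batch and streaming jobs.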
if job_type == "batch":
job_context = self._get_batch_user_input_job_context(kwargs)
else:
job_context = self._get_streaming_user_input(kwargs)
context = {
"pipeline_options": pipeline_context,
"job_options": job_context,
"python_version": python_version,
"use_fnapi": use_fnapi,
"create_resources": create_resources,
"job_type": job_type,
}
return context, create_dockerfile
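# A minimal sketch of a call, with hypothetical kwargs (the real caller
# passes the parsed `klio job create` options straight through):
#
#   context, create_dockerfile = self._get_context_from_user_inputs(
#       {"job_name": "my-job", "gcp_project": "my-project"}
#   )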