def _get_context_from_user_inputs()

in cli/src/klio_cli/commands/job/create.py [0:0]


    def _get_context_from_user_inputs(self, kwargs):
        job_type = kwargs.get("job_type") or click.prompt(
            "Job Type",
            type=click.Choice(["batch", "streaming"]),
            default=DEFAULTS["job_type"],
        )
        region = kwargs.get("region") or click.prompt(
            "Desired GCP Region", default=DEFAULTS["region"]
        )
        self._validate_region(region)

        if "use_fnapi" in kwargs:
            use_fnapi = kwargs.get("use_fnapi")
            if use_fnapi:
                use_fnapi = str(use_fnapi).lower() in ("y", "true", "yes")
            else:  # then it was used as a flag
                use_fnapi = True

        else:
            use_fnapi = click.prompt(
                "Use Apache Beam's FnAPI (experimental) [Y/n]",
                type=click.Choice(["y", "Y", "n", "N"]),
                default="y" if DEFAULTS["use_fnapi"] is True else "n",
                show_choices=False,
                show_default=False,  # shown in prompt
            )
            use_fnapi = use_fnapi.lower() == "y"

        create_resources = self._get_create_resources(kwargs, user_input=True)

        # TODO should this even be an option? run-job will break if so.
        # TODO: figure out if we should expose `experiments` to the user, or
        #       if it's okay to always assume `beam_fn_api` is the only
        #       experiment someone would ever use.
        default_exps = DEFAULTS["experiments"].split(",")
        if not use_fnapi:
            default_exps = [e for e in default_exps if e != "beam_fn_api"]

        experiments = kwargs.get("experiments")
        if experiments:
            experiments = experiments.split(",")
        else:
            experiments = click.prompt(
                "Beam experiments to enable", default=default_exps,
            )
        num_workers = kwargs.get("num_workers") or click.prompt(
            "Number of workers to run",
            type=int,
            default=DEFAULTS["num_workers"],
        )
        max_workers = kwargs.get("max_num_workers") or click.prompt(
            "Maximum number of workers to run",
            type=int,
            default=DEFAULTS["max_num_workers"],
        )
        autoscaling_algorithm = kwargs.get(
            "autoscaling_algorithm"
        ) or click.prompt(
            "Autoscaling algorithm to use. "
            "Can be NONE (default) or THROUGHPUT_BASED",
            type=str,
            default=DEFAULTS["autoscaling_algorithm"],
        )
        disk_size = kwargs.get("disk_size_gb") or click.prompt(
            "Size of a worker disk (GB)",
            type=int,
            default=DEFAULTS["disk_size_gb"],
        )
        machine_type = kwargs.get("machine_type") or click.prompt(
            "Type of GCP instance for the worker machine(s)",
            default=DEFAULTS["worker_machine_type"],
        )

        # TODO: remove support for providing a dockerfile and docker image
        #       specifically for a job. But be able to provide the ability to
        #       choose between py2 or py3 base fnapi image. (@lynn)
        create_dockerfile = False
        worker_image = kwargs.get("worker_image")
        python_version = DEFAULTS["python_version"]
        if not worker_image:
            worker_image = click.prompt(
                (
                    "Docker image to use for the worker."
                    "If none, a Dockerfile will"
                    " be created for you"
                ),
                default="",
            )
            # FYI: for some reason, coverage doesn't detect that this branch
            #      is indeed covered; force skipping for now
            if not worker_image:  # pragma: no cover
                worker_image = self.WORKER_IMAGE_TPL.format(**kwargs)
                python_version = kwargs.get("python_version") or click.prompt(
                    "Python major version ({})".format(
                        ", ".join(VALID_BEAM_PY_VERSIONS)
                    ),
                    default=DEFAULTS["python_version"],
                )
                create_dockerfile = True

        python_version = self._parse_python_version(python_version)
        self._validate_worker_image(worker_image)

        tmp_default = "gs://{gcp_project}-dataflow-tmp/{job_name}".format(
            **kwargs
        )
        staging_default = tmp_default + "/staging"
        temp_default = tmp_default + "/temp"
        staging = kwargs.get("staging_location") or click.prompt(
            "Staging environment location", default=staging_default
        )
        temp = kwargs.get("temp_location") or click.prompt(
            "Temporary environment location", default=temp_default
        )

        pipeline_context = {
            "worker_harness_container_image": worker_image,
            "experiments": experiments,
            "region": region,
            "staging_location": staging,
            "temp_location": temp,
            "num_workers": num_workers,
            "max_num_workers": max_workers,
            "autoscaling_algorithm": autoscaling_algorithm,
            "disk_size_gb": disk_size,
            "worker_machine_type": machine_type,
        }

        if job_type == "batch":
            job_context = self._get_batch_user_input_job_context(kwargs)
        else:
            job_context = self._get_streaming_user_input(kwargs)

        context = {
            "pipeline_options": pipeline_context,
            "job_options": job_context,
            "python_version": python_version,
            "use_fnapi": use_fnapi,
            "create_resources": create_resources,
            "job_type": job_type,
        }
        return context, create_dockerfile