def _get_context_from_defaults()

in cli/src/klio_cli/commands/job/create.py [0:0]


    def _get_context_from_defaults(self, kwargs):
        def _get_worker_image():
            create_dockerfile = True
            image = self.WORKER_IMAGE_TPL.format(**kwargs)
            if kwargs.get("worker_image"):
                create_dockerfile = False
                image = kwargs["worker_image"]
            return image, create_dockerfile

        # TODO: increase abstraction - we generate some defaults more than once
        #       throughout this `job create` flow. We can probably abstract
        #       it better and write more DRY code (@lynn)
        tmp_default = "gs://{gcp_project}-dataflow-tmp/{job_name}".format(
            **kwargs
        )
        staging_default = tmp_default + "/staging"
        temp_default = tmp_default + "/temp"
        worker_image, create_dockerfile = _get_worker_image()

        if "use_fnapi" in kwargs:
            use_fnapi = kwargs.get("use_fnapi")
            if use_fnapi:
                use_fnapi = str(use_fnapi).lower() in ("y", "true", "yes")
            else:  # then it was used as a flag
                use_fnapi = True
        else:
            use_fnapi = DEFAULTS["use_fnapi"]

        create_resources = self._get_create_resources(kwargs)
        default_exps = DEFAULTS["experiments"].split(",")
        if not use_fnapi:
            default_exps = [e for e in default_exps if e != "beam_fn_api"]
        experiments = kwargs.get("experiments")
        if not experiments:
            experiments = default_exps
        else:
            experiments = experiments.split(",")

        pipeline_context = {
            "worker_harness_container_image": worker_image,
            "experiments": experiments,
            "project": kwargs["gcp_project"],
            "region": kwargs.get("region", DEFAULTS["region"]),
            "staging_location": kwargs.get(
                "staging_location", staging_default
            ),
            "temp_location": kwargs.get("temp_location", temp_default),
            "num_workers": kwargs.get("num_workers", DEFAULTS["num_workers"]),
            "max_num_workers": kwargs.get(
                "max_num_workers", DEFAULTS["max_num_workers"]
            ),
            "autoscaling_algorithm": kwargs.get(
                "autoscaling_algorithm", DEFAULTS["autoscaling_algorithm"]
            ),
            "disk_size_gb": kwargs.get(
                "disk_size_gb", DEFAULTS["disk_size_gb"]
            ),
            "worker_machine_type": kwargs.get(
                "worker_machine_type", DEFAULTS["worker_machine_type"]
            ),
        }

        job_type = kwargs.get("job_type", DEFAULTS["job_type"])
        if job_type == "batch":
            job_context = self._get_default_batch_job_context(kwargs)
        else:
            job_context = self._get_default_streaming_job_context(kwargs)

        python_version = kwargs.get(
            "python_version", DEFAULTS["python_version"]
        )
        python_version = self._parse_python_version(python_version)

        context = {
            "pipeline_options": pipeline_context,
            "job_options": job_context,
            "python_version": python_version,
            "use_fnapi": use_fnapi,
            "create_resources": create_resources,
            "job_type": job_type,
        }

        return context, create_dockerfile