def _get_streaming_user_input()

in cli/src/klio_cli/commands/job/create.py [0:0]


    def _get_streaming_user_input(self, kwargs):
        base_topic = self.BASE_TOPIC_TPL.format(**kwargs)
        default_input_topic = base_topic + "-input"
        default_output_topic = base_topic + "-output"

        input_topic = kwargs.get("input_topic") or click.prompt(
            "Input topic (usually your dependency's output topic)",
            default=default_input_topic,
        )

        output_topic = kwargs.get("output_topic") or click.prompt(
            "Output topic", default=default_output_topic
        )

        default_input_loc = "gs://{gcp_project}-input/{job_name}".format(
            **kwargs
        )
        input_location = kwargs.get("input_data_location") or click.prompt(
            (
                "Location of job's input data (usually the location of your "
                "dependency's output data)"
            ),
            default=default_input_loc,
        )

        default_output_loc = "gs://{gcp_project}-output/{job_name}".format(
            **kwargs
        )
        output_location = kwargs.get("output_data_location") or click.prompt(
            "Location of job's output", default=default_output_loc
        )

        match = TOPIC_REGEX.match(input_topic)
        topic_name = match.group("topic")
        default_sub = (
            "projects/{gcp_project}/subscriptions/{topic_name}-{job_name}"
        )
        default_sub = default_sub.format(topic_name=topic_name, **kwargs)
        inputs = [
            {
                "topic": input_topic,
                "subscription": default_sub,
                "data_location": input_location,
            }
        ]
        outputs = [{"topic": output_topic, "data_location": output_location}]

        job_context = {
            "inputs": inputs,
            "outputs": outputs,
            "dependencies": [],
        }

        dependencies = kwargs.get("dependencies")
        if not dependencies:
            if click.confirm("Does your job have dependencies on other jobs?"):
                dependencies = self._get_dependencies_from_user_inputs()

        if dependencies is not None:
            job_context["dependencies"] = dependencies

        return job_context