in cli/src/klio_cli/commands/job/create.py [0:0]
def _get_streaming_user_input(self, kwargs):
base_topic = self.BASE_TOPIC_TPL.format(**kwargs)
default_input_topic = base_topic + "-input"
default_output_topic = base_topic + "-output"
input_topic = kwargs.get("input_topic") or click.prompt(
"Input topic (usually your dependency's output topic)",
default=default_input_topic,
)
output_topic = kwargs.get("output_topic") or click.prompt(
"Output topic", default=default_output_topic
)
default_input_loc = "gs://{gcp_project}-input/{job_name}".format(
**kwargs
)
input_location = kwargs.get("input_data_location") or click.prompt(
(
"Location of job's input data (usually the location of your "
"dependency's output data)"
),
default=default_input_loc,
)
default_output_loc = "gs://{gcp_project}-output/{job_name}".format(
**kwargs
)
output_location = kwargs.get("output_data_location") or click.prompt(
"Location of job's output", default=default_output_loc
)
match = TOPIC_REGEX.match(input_topic)
topic_name = match.group("topic")
default_sub = (
"projects/{gcp_project}/subscriptions/{topic_name}-{job_name}"
)
default_sub = default_sub.format(topic_name=topic_name, **kwargs)
inputs = [
{
"topic": input_topic,
"subscription": default_sub,
"data_location": input_location,
}
]
outputs = [{"topic": output_topic, "data_location": output_location}]
job_context = {
"inputs": inputs,
"outputs": outputs,
"dependencies": [],
}
dependencies = kwargs.get("dependencies")
if not dependencies:
if click.confirm("Does your job have dependencies on other jobs?"):
dependencies = self._get_dependencies_from_user_inputs()
if dependencies is not None:
job_context["dependencies"] = dependencies
return job_context