in cli/src/klio_cli/commands/job/create.py [0:0]
def _get_default_streaming_job_context(self, kwargs):
    """Build the default I/O and dependency context for a streaming job.

    Values supplied by the caller in ``kwargs`` are used as-is; any that
    are missing or falsy are replaced by defaults derived from
    ``self.BASE_TOPIC_TPL`` and the ``gcp_project`` / ``job_name`` entries.

    Args:
        kwargs (dict): job creation options. ``gcp_project`` and
            ``job_name`` are always required (the default subscription
            path is built from them), along with whatever fields
            ``self.BASE_TOPIC_TPL`` references. Optional overrides:
            ``input_topic``, ``output_topic``, ``input_data_location``,
            ``output_data_location`` and ``dependencies``.

    Returns:
        dict: a mapping with three keys: ``inputs`` (a one-item list with
        ``topic``, ``subscription`` and ``data_location``), ``outputs``
        (a one-item list with ``topic`` and ``data_location``) and
        ``dependencies``.

    Raises:
        KeyError: if a field required by one of the templates is missing
            from ``kwargs``.
        ValueError: if the input topic does not match ``TOPIC_REGEX``, so
            no topic name can be extracted for the default subscription.
    """
    base_topic = self.BASE_TOPIC_TPL.format(**kwargs)

    # `or` short-circuits, so a default template is only formatted when the
    # caller gave no (or an empty) value for that option.
    output_topic = kwargs.get("output_topic") or base_topic + "-output"
    output_location = kwargs.get(
        "output_data_location"
    ) or "gs://{gcp_project}-output/{job_name}".format(**kwargs)

    input_topic = kwargs.get("input_topic") or base_topic + "-input"
    input_data_location = kwargs.get(
        "input_data_location"
    ) or "gs://{gcp_project}-input/{job_name}".format(**kwargs)

    dependencies = kwargs.get("dependencies", [])

    # A caller-supplied topic may not follow the expected pattern; fail with
    # a message naming the offending value instead of an AttributeError on
    # `None.group(...)`.
    match = TOPIC_REGEX.match(input_topic)
    if match is None:
        raise ValueError(
            "Unable to derive a default subscription: input topic '{}' "
            "does not match the expected topic format.".format(input_topic)
        )
    topic_name = match.group("topic")

    default_sub = (
        "projects/{gcp_project}/subscriptions/{topic_name}-{job_name}"
    ).format(topic_name=topic_name, **kwargs)

    inputs = [
        {
            "topic": input_topic,
            "subscription": default_sub,
            "data_location": input_data_location,
        }
    ]
    outputs = [{"topic": output_topic, "data_location": output_location}]

    return {
        "inputs": inputs,
        "outputs": outputs,
        "dependencies": dependencies,
    }