in luigi/contrib/beam_dataflow.py
def _format_input_args(self):
"""
Parses the result(s) of self.input() into a string-serialized
key-value list passed to the Dataflow job. Valid inputs include:
return FooTarget()
return {"input1": FooTarget(), "input2": FooTarget2())
return ("input", FooTarget())
return [("input1", FooTarget()), ("input2": FooTarget2())]
return [FooTarget(), FooTarget2()]
        Unlabeled inputs are passed in under the default key "input".
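
        For example (illustrative), if file_pattern() returns {} and a
        FooTarget resolves to "gs://bucket/foo", then returning
        {"input1": FooTarget()} yields the argument
        "--input1=gs://bucket/foo/part-*".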
"""
job_input = self.input()
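        # Normalize the supported shapes of self.input() (single Target,
        # (name, Target) tuple, list, or dict) into a dict of {name: target(s)}.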
if isinstance(job_input, luigi.Target):
job_input = {"input": job_input}
elif isinstance(job_input, tuple):
job_input = {job_input[0]: job_input[1]}
elif isinstance(job_input, list):
if all(isinstance(item, tuple) for item in job_input):
job_input = dict(job_input)
else:
job_input = {"input": job_input}
elif not isinstance(job_input, dict):
raise ValueError("Invalid job input requires(). Supported types: ["
"Target, tuple of (name, Target), "
"dict of (name: Target), list of Targets]")
if not isinstance(self.file_pattern(), dict):
raise ValueError('file_pattern() must return a dict type')
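        # Build one "--<name>=<path>[,<path>...]" argument per input name.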
input_args = []
for (name, targets) in job_input.items():
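            # Resolve every target for this input (flattened, in case of
            # nested collections) to its URI/path via get_target_path().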
uris = [
self.get_target_path(uri_target) for uri_target in luigi.task.flatten(targets)
]
if isinstance(targets, dict):
"""
If targets is a dict that means it had multiple outputs.
Make the input args in that case "<input key>-<task output key>"
"""
names = ["%s-%s" % (name, key) for key in targets.keys()]
else:
names = [name] * len(uris)
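            # Group the URIs by argument name, appending this input's
            # configured file pattern (default "part-*") to each URI.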
input_dict = {}
for (arg_name, uri) in zip(names, uris):
pattern = self.file_pattern().get(name, 'part-*')
input_value = input_dict.get(arg_name, [])
input_value.append(uri.rstrip('/') + '/' + pattern)
input_dict[arg_name] = input_value
for (key, paths) in input_dict.items():
input_args.append("--%s=%s" % (key, ','.join(paths)))
return input_args