def _format_input_args()

in luigi/contrib/beam_dataflow.py [0:0]


    def _format_input_args(self):
        """
            Parses the result(s) of self.input() into a list of
            "--<name>=<path>[,<path>...]" arguments passed to the Dataflow
            job. Valid inputs include:

            return FooTarget()

            return {"input1": FooTarget(), "input2": FooTarget2()}

            return ("input", FooTarget())

            return [("input1", FooTarget()), ("input2", FooTarget2())]

            return [FooTarget(), FooTarget2()]

            Unlabeled inputs are passed in under the default key "input".

            When the value of an input is itself a dict (an upstream task
            with several named outputs), one argument is emitted per output,
            named "<input key>-<task output key>".

            Every path is the target path (trailing "/" stripped) followed by
            "/" and the file pattern registered in self.file_pattern() for
            the input key, or "part-*" when none is registered.

            :return: list of "--name=path1,path2,..." strings
            :raises ValueError: if self.input() is not one of the supported
                shapes, or if self.file_pattern() does not return a dict
        """
        job_input = self.input()

        # Normalize every supported shape to a dict of {name: target(s)}.
        if isinstance(job_input, luigi.Target):
            job_input = {"input": job_input}

        elif isinstance(job_input, tuple):
            job_input = {job_input[0]: job_input[1]}

        elif isinstance(job_input, list):
            if all(isinstance(item, tuple) for item in job_input):
                job_input = dict(job_input)
            else:
                job_input = {"input": job_input}

        elif not isinstance(job_input, dict):
            raise ValueError("Invalid job input requires(). Supported types: ["
                             "Target, tuple of (name, Target), "
                             "dict of (name: Target), list of Targets]")

        # Fetch the patterns once; they do not depend on the loop below.
        file_patterns = self.file_pattern()
        if not isinstance(file_patterns, dict):
            raise ValueError('file_pattern() must return a dict type')

        input_args = []

        for name, targets in job_input.items():
            # The pattern is looked up by the input key, not by the expanded
            # "<input key>-<task output key>" argument name.
            pattern = file_patterns.get(name, 'part-*')

            if isinstance(targets, dict):
                # A dict means the upstream task had multiple outputs. Each
                # output is flattened under its own key so that a value
                # holding several targets (or none) cannot shift the labels
                # of the outputs that follow it.
                labeled_targets = [
                    ("%s-%s" % (name, key), target)
                    for key, value in targets.items()
                    for target in luigi.task.flatten(value)
                ]
            else:
                labeled_targets = [
                    (name, target) for target in luigi.task.flatten(targets)
                ]

            # Group paths by argument name, keeping first-seen order.
            paths_by_arg = {}
            for arg_name, target in labeled_targets:
                uri = self.get_target_path(target)
                paths_by_arg.setdefault(arg_name, []).append(
                    uri.rstrip('/') + '/' + pattern)

            input_args.extend(
                "--%s=%s" % (arg_name, ','.join(paths))
                for arg_name, paths in paths_by_arg.items()
            )

        return input_args