def run_job()

in luigi/contrib/hadoop.py [0:0]


    def run_job(self, job, tracking_url_callback=None):
        """
        Assemble the Hadoop streaming command line for ``job`` and run it.

        The method creates a temporary working directory (``self.tmp_dir``),
        builds the argument list (generic options such as ``-libjars``,
        ``-archives``, ``-files`` and ``-D`` first, then the streaming
        options), writes the package archive (or copies the pex binary) into
        the temporary directory, asks the job to dump itself there, and hands
        the argument list to ``run_and_track_hadoop_job``.  When
        ``self.end_job_with_atomic_move_dir`` is set, Hadoop writes to a
        timestamped temporary output path which is moved to the final output
        path after the job has run.  ``self.finish()`` is called last.

        :param job: the job task to run.  Its modules, archives, files,
                    jobconfs, streaming arguments, inputs and output are
                    queried to build the command line.
        :param tracking_url_callback: deprecated and unused; passing anything
                    other than ``None`` only emits a ``DeprecationWarning``.
                    ``job.set_tracking_url`` is always used instead.
        :raises TypeError: if an input target or the output target is not of a
                    supported type, or if ``end_job_with_atomic_move_dir`` is
                    requested for a flag target output.
        """
        if tracking_url_callback is not None:
            warnings.warn("tracking_url_callback argument is deprecated, task.set_tracking_url is "
                          "used instead.", DeprecationWarning)

        packages = [luigi] + self.modules + job.extra_modules() + list(_attached_packages)

        # find the module containing the job
        packages.append(__import__(job.__module__, None, None, 'dummy'))

        # find the path to our runner.py
        runner_path = mrrunner.__file__
        # assume source is next to compiled
        if runner_path.endswith("pyc"):
            runner_path = runner_path[:-3] + "py"

        config = configuration.get_config()

        base_tmp_dir = config.get('core', 'tmp-dir', None)
        if base_tmp_dir:
            warnings.warn("The core.tmp-dir configuration item is"
                          " deprecated, please use the TMPDIR"
                          " environment variable if you wish"
                          " to control where luigi.contrib.hadoop may"
                          " create temporary files and directories.")
            self.tmp_dir = os.path.join(base_tmp_dir, 'hadoop_job_%016x' % random.getrandbits(64))
            os.makedirs(self.tmp_dir)
        else:
            self.tmp_dir = tempfile.mkdtemp()

        logger.debug("Tmp dir: %s", self.tmp_dir)

        # Read once: decides between shipping a pex binary and shipping
        # mrrunner.py together with a tar archive of the packages.
        package_binary = job.package_binary
        use_binary = package_binary is not None

        # build arguments
        python_executable = config.get('hadoop', 'python-executable', 'python')
        runner_arg = 'mrrunner.pex' if use_binary else 'mrrunner.py'
        command = '{0} {1} {{step}}'.format(python_executable, runner_arg)
        map_cmd = command.format(step='map')
        cmb_cmd = command.format(step='combiner')
        red_cmd = command.format(step='reduce')

        output_final = job.output().path
        # atomic output: replace output with a temporary work directory
        if self.end_job_with_atomic_move_dir:
            illegal_targets = (
                luigi.contrib.s3.S3FlagTarget, luigi.contrib.gcs.GCSFlagTarget)
            if isinstance(job.output(), illegal_targets):
                raise TypeError("end_job_with_atomic_move_dir is not supported"
                                " for {}".format(illegal_targets))
            output_hadoop = '{output}-temp-{time}'.format(
                output=output_final,
                time=datetime.datetime.now().isoformat().replace(':', '-'))
        else:
            output_hadoop = output_final

        arglist = luigi.contrib.hdfs.load_hadoop_cmd() + ['jar', self.streaming_jar]

        # 'libjars' is a generic option, so place it first
        libjars = list(self.libjars)

        for libjar in self.libjars_in_hdfs:
            # Jars living in HDFS are fetched into the temporary directory so
            # they can be referenced by local path.
            run_cmd = luigi.contrib.hdfs.load_hadoop_cmd() + ['fs', '-get', libjar, self.tmp_dir]
            logger.debug(subprocess.list2cmdline(run_cmd))
            returncode = subprocess.call(run_cmd)
            if returncode != 0:
                # Still best-effort (the job is submitted regardless), but the
                # failure is no longer silent.
                logger.warning("Fetching libjar %s exited with code %s", libjar, returncode)
            libjars.append(os.path.join(self.tmp_dir, os.path.basename(libjar)))

        if libjars:
            arglist += ['-libjars', ','.join(libjars)]

        # 'archives' is also a generic option
        # Work on a copy: extending self.archives in place would leak this
        # job's extra archives into every later call on the same runner.
        archives = list(self.archives) if self.archives else []
        extra_archives = job.extra_archives()

        if extra_archives:
            archives.extend(extra_archives)

        if archives:
            arglist += ['-archives', ','.join(archives)]

        # Add static files and directories
        extra_files = get_extra_files(job.extra_files())

        cache_files = []
        for src, dst in extra_files:
            dst_tmp = '%s_%09d' % (dst.replace('/', '_'), random.randint(0, 999999999))
            cache_files.append('%s#%s' % (src, dst_tmp))
            # -files doesn't support subdirectories, so we need to create the dst_tmp -> dst manually
            job.add_link(dst_tmp, dst)

        if cache_files:
            arglist += ['-files', ','.join(cache_files)]

        # Copy so that the runner-level jobconfs are not appended to a list
        # object owned by the job.
        jobconfs = list(job.jobconfs())

        for k, v in self.jobconfs.items():
            jobconfs.append('%s=%s' % (k, v))

        for conf in jobconfs:
            arglist += ['-D', conf]

        arglist += self.streaming_args

        # Add additional non-generic  per-job streaming args
        extra_streaming_args = job.extra_streaming_arguments()
        for (arg, value) in extra_streaming_args:
            if not arg.startswith('-'):  # safety first
                arg = '-' + arg
            arglist += [arg, value]

        arglist += ['-mapper', map_cmd]

        # A step left as NotImplemented on the job is not passed to Hadoop.
        if job.combiner != NotImplemented:
            arglist += ['-combiner', cmb_cmd]
        if job.reducer != NotImplemented:
            arglist += ['-reducer', red_cmd]

        packages_fn = 'mrrunner.pex' if use_binary else 'packages.tar'
        shipped_files = [
            # the plain runner script is only shipped when no binary is used
            None if use_binary else runner_path,
            os.path.join(self.tmp_dir, packages_fn),
            os.path.join(self.tmp_dir, 'job-instance.pickle'),
        ]

        for f in filter(None, shipped_files):
            arglist += ['-file', f]

        if self.output_format:
            arglist += ['-outputformat', self.output_format]
        if self.input_format:
            arglist += ['-inputformat', self.input_format]

        allowed_input_targets = (
            luigi.contrib.hdfs.HdfsTarget,
            luigi.contrib.s3.S3Target,
            luigi.contrib.gcs.GCSTarget)
        for target in luigi.task.flatten(job.input_hadoop()):
            if not isinstance(target, allowed_input_targets):
                raise TypeError('target must be one of: {}'.format(
                    allowed_input_targets))
            arglist += ['-input', target.path]

        allowed_output_targets = (
            luigi.contrib.hdfs.HdfsTarget,
            luigi.contrib.s3.S3FlagTarget,
            luigi.contrib.gcs.GCSFlagTarget)
        if not isinstance(job.output(), allowed_output_targets):
            raise TypeError('output must be one of: {}'.format(
                allowed_output_targets))
        arglist += ['-output', output_hadoop]

        # submit job
        if use_binary:
            shutil.copy(package_binary, os.path.join(self.tmp_dir, 'mrrunner.pex'))
        else:
            create_packages_archive(packages, os.path.join(self.tmp_dir, 'packages.tar'))

        job.dump(self.tmp_dir)

        run_and_track_hadoop_job(arglist, tracking_url_callback=job.set_tracking_url)

        if self.end_job_with_atomic_move_dir:
            # Hadoop wrote to the temporary path; move it to the real output.
            luigi.contrib.hdfs.HdfsTarget(output_hadoop).move_dir(output_final)
        self.finish()