def run_job()

in luigi/contrib/hadoop_jar.py [0:0]


    def run_job(self, job, tracking_url_callback=None):
        """
        Build the ``hadoop jar`` command line for ``job`` and run it.

        The command is run directly, or wrapped in an ``ssh`` invocation when
        ``job.ssh()`` returns a non-empty configuration.

        :param job: object providing ``jar()``, ``main()``, ``jobconfs()``, ``ssh()`` and
                    ``set_tracking_url``; it is also passed to ``fix_paths``.
        :param tracking_url_callback: deprecated and unused; any value other than ``None``
                                      only triggers a ``DeprecationWarning``.
        :raises HadoopJarJobError: if ``job.jar()`` is empty, if the ssh configuration lacks
                                   ``host``, ``key_file`` or ``username``, or if no ssh
                                   configuration is given and the jar path does not exist.
        """
        if tracking_url_callback is not None:
            # stacklevel=2 attributes the warning to the caller that passed the
            # deprecated argument instead of to this line.
            warnings.warn("tracking_url_callback argument is deprecated, task.set_tracking_url is "
                          "used instead.", DeprecationWarning, stacklevel=2)

        # TODO(jcrobak): libjars, files, etc. Can refactor out of
        # hadoop.HadoopJobRunner
        jar = job.jar()
        if not jar:
            raise HadoopJarJobError("Jar not defined")

        hadoop_arglist = luigi.contrib.hdfs.load_hadoop_cmd() + ['jar', jar]
        main = job.main()
        if main:
            hadoop_arglist.append(main)

        # Each job configuration entry is passed as a "-D<entry>" argument.
        hadoop_arglist.extend('-D' + jc for jc in job.jobconfs())

        tmp_files, job_args = fix_paths(job)
        hadoop_arglist += job_args

        ssh_config = job.ssh()
        if ssh_config:
            host = ssh_config.get("host", None)
            key_file = ssh_config.get("key_file", None)
            username = ssh_config.get("username", None)
            if not (host and key_file and username):
                raise HadoopJarJobError("missing some config for HadoopRemoteJarJobRunner")
            arglist = ['ssh', '-i', key_file,
                       '-o', 'BatchMode=yes']  # no password prompts etc
            if ssh_config.get("no_host_key_check", False):
                arglist += ['-o', 'UserKnownHostsFile=/dev/null',
                            '-o', 'StrictHostKeyChecking=no']
            arglist.append('{}@{}'.format(username, host))
            # The hadoop command is handed to ssh as one string, so every
            # argument is shell-quoted before joining.
            arglist.append(' '.join(shlex.quote(arg) for arg in hadoop_arglist))
        else:
            # Without ssh the jar path is checked on the local filesystem.
            if not os.path.exists(jar):
                logger.error("Can't find jar: %s, full path %s", jar,
                             os.path.abspath(jar))
                raise HadoopJarJobError("job jar does not exist")
            arglist = hadoop_arglist

        luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, job.set_tracking_url)

        # Reached only if the call above did not raise.
        # NOTE(review): presumably moves temporary outputs from fix_paths to
        # their final locations -- confirm against fix_paths.
        for a, b in tmp_files:
            a.move(b)