in luigi/contrib/hadoop.py [0:0]
def run_job(self, job, tracking_url_callback=None):
    if tracking_url_callback is not None:
        warnings.warn("tracking_url_callback argument is deprecated, task.set_tracking_url is "
                      "used instead.", DeprecationWarning)
    packages = [luigi] + self.modules + job.extra_modules() + list(_attached_packages)

    # find the module containing the job
    packages.append(__import__(job.__module__, None, None, 'dummy'))

    # find the path to our runner.py
    runner_path = mrrunner.__file__
    # assume source is next to compiled
    if runner_path.endswith("pyc"):
        runner_path = runner_path[:-3] + "py"
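
    # Create a scratch directory where the packages archive, the pickled job
    # instance and any fetched libjars are staged before submission.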
    base_tmp_dir = configuration.get_config().get('core', 'tmp-dir', None)
    if base_tmp_dir:
        warnings.warn("The core.tmp-dir configuration item is"
                      " deprecated, please use the TMPDIR"
                      " environment variable if you wish"
                      " to control where luigi.contrib.hadoop may"
                      " create temporary files and directories.")
        self.tmp_dir = os.path.join(base_tmp_dir, 'hadoop_job_%016x' % random.getrandbits(64))
        os.makedirs(self.tmp_dir)
    else:
        self.tmp_dir = tempfile.mkdtemp()

    logger.debug("Tmp dir: %s", self.tmp_dir)

    # build arguments
    config = configuration.get_config()
    python_executable = config.get('hadoop', 'python-executable', 'python')
    runner_arg = 'mrrunner.pex' if job.package_binary is not None else 'mrrunner.py'
    command = '{0} {1} {{step}}'.format(python_executable, runner_arg)
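    # The {step} placeholder selects which phase (map, combiner or reduce)
    # mrrunner executes on the task node.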
    map_cmd = command.format(step='map')
    cmb_cmd = command.format(step='combiner')
    red_cmd = command.format(step='reduce')

    output_final = job.output().path
    # atomic output: replace output with a temporary work directory
    if self.end_job_with_atomic_move_dir:
        illegal_targets = (
            luigi.contrib.s3.S3FlagTarget, luigi.contrib.gcs.GCSFlagTarget)
        if isinstance(job.output(), illegal_targets):
            raise TypeError("end_job_with_atomic_move_dir is not supported"
                            " for {}".format(illegal_targets))
        output_hadoop = '{output}-temp-{time}'.format(
            output=output_final,
            time=datetime.datetime.now().isoformat().replace(':', '-'))
    else:
        output_hadoop = output_final
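
    # Start assembling the hadoop streaming command line. Generic options
    # (-libjars, -archives, -files, -D) go first, before the streaming-specific
    # arguments.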
    arglist = luigi.contrib.hdfs.load_hadoop_cmd() + ['jar', self.streaming_jar]

    # 'libjars' is a generic option, so place it first
    libjars = [libjar for libjar in self.libjars]

    for libjar in self.libjars_in_hdfs:
        run_cmd = luigi.contrib.hdfs.load_hadoop_cmd() + ['fs', '-get', libjar, self.tmp_dir]
        logger.debug(subprocess.list2cmdline(run_cmd))
        subprocess.call(run_cmd)
        libjars.append(os.path.join(self.tmp_dir, os.path.basename(libjar)))

    if libjars:
        arglist += ['-libjars', ','.join(libjars)]

    # 'archives' is also a generic option
    archives = []
    extra_archives = job.extra_archives()
    if self.archives:
        archives = self.archives
    if extra_archives:
        archives += extra_archives
    if archives:
        arglist += ['-archives', ','.join(archives)]

    # Add static files and directories
    extra_files = get_extra_files(job.extra_files())

    files = []
    for src, dst in extra_files:
        dst_tmp = '%s_%09d' % (dst.replace('/', '_'), random.randint(0, 999999999))
        files += ['%s#%s' % (src, dst_tmp)]
        # -files doesn't support subdirectories, so we need to create the dst_tmp -> dst link manually
        job.add_link(dst_tmp, dst)

    if files:
        arglist += ['-files', ','.join(files)]
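
    # Forward jobconf properties from both the job and the runner as -D options.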
    jobconfs = job.jobconfs()

    for k, v in self.jobconfs.items():
        jobconfs.append('%s=%s' % (k, v))

    for conf in jobconfs:
        arglist += ['-D', conf]

    arglist += self.streaming_args

    # Add additional non-generic per-job streaming args
    extra_streaming_args = job.extra_streaming_arguments()
    for (arg, value) in extra_streaming_args:
        if not arg.startswith('-'):  # safety first
            arg = '-' + arg
        arglist += [arg, value]
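
    # Wire up the streaming commands; the combiner and reducer are optional
    # and only added when the job actually implements them.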
    arglist += ['-mapper', map_cmd]

    if job.combiner != NotImplemented:
        arglist += ['-combiner', cmb_cmd]

    if job.reducer != NotImplemented:
        arglist += ['-reducer', red_cmd]
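
    # Ship the runner script, the packages archive (or pex binary) and the
    # pickled job instance so they are available on every task node.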
    packages_fn = 'mrrunner.pex' if job.package_binary is not None else 'packages.tar'
    files = [
        runner_path if job.package_binary is None else None,
        os.path.join(self.tmp_dir, packages_fn),
        os.path.join(self.tmp_dir, 'job-instance.pickle'),
    ]

    for f in filter(None, files):
        arglist += ['-file', f]
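
    # Optional custom input/output formats for the streaming job.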
    if self.output_format:
        arglist += ['-outputformat', self.output_format]
    if self.input_format:
        arglist += ['-inputformat', self.input_format]
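
    # Validate input and output targets; streaming jobs can only read from
    # and write to HDFS, S3 or GCS.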
    allowed_input_targets = (
        luigi.contrib.hdfs.HdfsTarget,
        luigi.contrib.s3.S3Target,
        luigi.contrib.gcs.GCSTarget)
    for target in luigi.task.flatten(job.input_hadoop()):
        if not isinstance(target, allowed_input_targets):
            raise TypeError('target must be one of: {}'.format(
                allowed_input_targets))
        arglist += ['-input', target.path]

    allowed_output_targets = (
        luigi.contrib.hdfs.HdfsTarget,
        luigi.contrib.s3.S3FlagTarget,
        luigi.contrib.gcs.GCSFlagTarget)
    if not isinstance(job.output(), allowed_output_targets):
        raise TypeError('output must be one of: {}'.format(
            allowed_output_targets))
    arglist += ['-output', output_hadoop]

    # submit job
    if job.package_binary is not None:
        shutil.copy(job.package_binary, os.path.join(self.tmp_dir, 'mrrunner.pex'))
    else:
        create_packages_archive(packages, os.path.join(self.tmp_dir, 'packages.tar'))

    job.dump(self.tmp_dir)

    run_and_track_hadoop_job(arglist, tracking_url_callback=job.set_tracking_url)
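
    # With atomic output enabled, promote the temporary work directory to the
    # final output path only after the job has finished successfully.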
    if self.end_job_with_atomic_move_dir:
        luigi.contrib.hdfs.HdfsTarget(output_hadoop).move_dir(output_final)

    self.finish()
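
A minimal usage sketch (not part of hadoop.py; the WordCount/InputText tasks and
paths below are illustrative assumptions): run_job is normally reached by running
a luigi.contrib.hadoop.JobTask, whose job_runner() hands the task to a
HadoopJobRunner that builds and submits the streaming command shown above.

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputText(luigi.ExternalTask):
    # Hypothetical external dependency; assumes this file already exists on HDFS.
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/tmp/wordcount-input.txt')


class WordCount(luigi.contrib.hadoop.JobTask):
    # Hypothetical word-count job; mapper/reducer run on the cluster via mrrunner.
    def requires(self):
        return InputText()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/tmp/wordcount-output')

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    luigi.run()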