in luigi/contrib/lsf.py [0:0]
def _run_job(self):
    """
    Submit ``lsf_runner.py`` to LSF via ``bsub`` and record the job id.

    The ``bsub`` command line is assembled from the task's ``*_flag``
    attributes and ``extra_bsub_args``, then executed with ``self.tmp_dir``
    as the working directory.  The job id is parsed from the submission
    output and stored in ``self.job_id`` before ``self._track_job()`` is
    called.  When ``self.save_job_info`` is set, ``self.tmp_dir`` is moved
    into the directory of the task output; ``self._finish()`` is always
    called last.

    :raises IndexError: if the ``bsub`` output contains no ``<``.
    :raises ValueError: if the text in the first ``<...>`` is not an integer.
    """
    # bsub's own log files go into the directory that holds the task's
    # (first) output target.
    outputs = self.output()
    first_output = outputs[0] if isinstance(outputs, list) else outputs
    output_dir = os.path.dirname(first_output.path)

    args = ["bsub", "-q", self.queue_flag]
    args += ["-n", str(self.n_cpu_flag)]
    args += ["-M", str(self.memory_flag)]
    args += ["-R", "rusage[%s]" % self.resource_flag]
    args += ["-W", str(self.runtime_flag)]
    if self.job_name_flag:
        args += ["-J", str(self.job_name_flag)]
    args += ["-o", os.path.join(output_dir, "job.out")]
    args += ["-e", os.path.join(output_dir, "job.err")]
    if self.extra_bsub_args:
        args += self.extra_bsub_args.split()

    # Find where the runner file is; it is invoked with the temporary
    # directory as its only argument.
    runner_path = os.path.abspath(lsf_runner.__file__)
    args += [runner_path, self.tmp_dir]
    args = [str(a) for a in args]

    # That should do it. Let the world know what we're doing.
    LOGGER.info("### LSF SUBMISSION ARGS: %s", " ".join(args))

    # Submit the job
    run_job_proc = subprocess.Popen(
        args,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, cwd=self.tmp_dir)
    output = run_job_proc.communicate()[0]
    # The pipe is opened in binary mode, so Python 3 hands back bytes;
    # decode so the text parsing below works on every interpreter.
    if isinstance(output, bytes):
        output = output.decode("utf-8", "replace")
    LOGGER.info("### JOB SUBMISSION OUTPUT: %s", output)

    # ASSUMPTION
    # The result will be of the format
    #   Job <123> is submitted to queue <myqueue>
    # So get the number in those first brackets.
    # I cannot think of a better workaround that leaves logic on the Task
    # side of things.
    try:
        self.job_id = int(output.split("<")[1].split(">")[0])
    except (IndexError, ValueError):
        # Same exception as before, but with enough context to diagnose a
        # rejected or failed submission.
        LOGGER.error(
            "Could not parse an LSF job id from bsub output "
            "(exit code %s): %r",
            run_job_proc.returncode, output)
        raise

    # The job name is optional; only add it (plus separator) when present.
    name_part = "%s " % self.job_name_flag if self.job_name_flag else ""
    LOGGER.info(
        "Job %ssubmitted as job %s",
        name_part,
        str(self.job_id)
    )

    self._track_job()

    # If we want to save the job temporaries, then do so.
    # We'll move them to be next to the job output.
    if self.save_job_info:
        LOGGER.info("Saving up temporary bits")
        shutil.move(self.tmp_dir, output_dir)

    # Now delete the temporaries, if they're there.
    self._finish()