def _run_job()

in luigi/contrib/lsf.py [0:0]


    def _run_job(self):
        """
        Build a bsub command line that runs lsf_runner.py on ``self.tmp_dir``,
        submit it, and record the resulting LSF job id in ``self.job_id``.

        The bsub stdout/stderr log files (``job.out`` / ``job.err``) are placed
        in the directory of the task's output (the first output if
        ``self.output()`` is a list).

        After submission the job is handed to ``self._track_job()``. If
        ``self.save_job_info`` is set, ``self.tmp_dir`` is moved next to the
        task output before ``self._finish()`` is called.

        Raises:
            IndexError, ValueError: if the bsub output does not contain a job
                id in the expected ``Job <123> is submitted ...`` form (the
                raw output and exit code are logged before re-raising).
        """
        # Logs go next to the (first) task output.
        task_output = self.output()
        if isinstance(task_output, list):
            task_output = task_output[0]
        log_dir = os.path.dirname(task_output.path)

        args = ["bsub", "-q", self.queue_flag]
        args += ["-n", str(self.n_cpu_flag)]
        args += ["-M", str(self.memory_flag)]
        args += ["-R", "rusage[%s]" % self.resource_flag]
        args += ["-W", str(self.runtime_flag)]
        if self.job_name_flag:
            args += ["-J", str(self.job_name_flag)]
        args += ["-o", os.path.join(log_dir, "job.out")]
        args += ["-e", os.path.join(log_dir, "job.err")]
        if self.extra_bsub_args:
            args += self.extra_bsub_args.split()

        # Find where the runner file is
        runner_path = os.path.abspath(lsf_runner.__file__)

        args += [runner_path]
        args += [self.tmp_dir]

        # Popen needs strings; stringify once and reuse for logging.
        args = [str(a) for a in args]

        # That should do it. Let the world know what we're doing.
        LOGGER.info("### LSF SUBMISSION ARGS: %s", " ".join(args))

        # Submit the job
        run_job_proc = subprocess.Popen(
            args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, cwd=self.tmp_dir)
        output = run_job_proc.communicate()[0]

        # On Python 3 the pipe yields bytes; decode so the str-based parsing
        # below works and the log line shows text rather than a bytes repr.
        if not isinstance(output, str):
            output = output.decode("utf-8", "replace")

        # ASSUMPTION
        # The result will be of the format
        # Job <123> is submitted to queue <myqueue>
        # So get the number in those first brackets.
        # I cannot think of a better workaround that leaves logic on the Task side of things.
        LOGGER.info("### JOB SUBMISSION OUTPUT: %s", output)
        try:
            self.job_id = int(output.split("<")[1].split(">")[0])
        except (IndexError, ValueError):
            # Keep the original exception types, but leave a trail explaining
            # what bsub actually returned.
            LOGGER.error(
                "Could not parse an LSF job id from bsub output "
                "(exit code %s): %r",
                run_job_proc.returncode,
                output
            )
            raise

        # job_name_flag is optional (see the "-J" guard above), so do not
        # assume it can be concatenated with a string.
        name_prefix = "%s " % self.job_name_flag if self.job_name_flag else ""
        LOGGER.info(
            "Job %ssubmitted as job %s",
            name_prefix,
            str(self.job_id)
        )

        self._track_job()

        # If we want to save the job temporaries, then do so
        # We'll move them to be next to the job output
        if self.save_job_info:
            LOGGER.info("Saving up temporary bits")
            shutil.move(self.tmp_dir, log_dir)

        # Now delete the temporaries, if they're there.
        self._finish()