in luigi/contrib/external_program.py [0:0]
def run(self):
args = list(map(str, self.program_args()))
logger.info('Running command: %s', ' '.join(args))
env = self.program_environment()
kwargs = {'env': env}
tmp_stdout, tmp_stderr = None, None
if self.capture_output:
tmp_stdout, tmp_stderr = tempfile.TemporaryFile(), tempfile.TemporaryFile()
kwargs.update({'stdout': tmp_stdout, 'stderr': tmp_stderr})
try:
if self.stream_for_searching_tracking_url != 'none' and self.tracking_url_pattern is not None:
with self._proc_with_tracking_url_context(proc_args=args, proc_kwargs=kwargs) as proc:
proc.wait()
else:
proc = subprocess.Popen(args, **kwargs)
with ExternalProgramRunContext(proc):
proc.wait()
success = proc.returncode == 0
if self.capture_output:
stdout = self._clean_output_file(tmp_stdout)
stderr = self._clean_output_file(tmp_stderr)
if stdout:
logger.info('Program stdout:\n{}'.format(stdout))
if stderr:
if self.always_log_stderr or not success:
logger.info('Program stderr:\n{}'.format(stderr))
else:
stdout, stderr = None, None
if not success:
raise ExternalProgramRunError(
'Program failed with return code={}:'.format(proc.returncode),
args, env=env, stdout=stdout, stderr=stderr)
finally:
if self.capture_output:
tmp_stderr.close()
tmp_stdout.close()