in luigi/contrib/pig.py [0:0]
def track_and_progress(self, cmd):
temp_stdout = tempfile.TemporaryFile('wb')
env = os.environ.copy()
env['PIG_HOME'] = self.pig_home()
for k, v in self.pig_env_vars().items():
env[k] = v
proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
reads = [proc.stderr.fileno(), proc.stdout.fileno()]
# tracking the possible problems with this job
err_lines = []
with PigRunContext():
while proc.poll() is None:
ret = select.select(reads, [], [])
for fd in ret[0]:
if fd == proc.stderr.fileno():
line = proc.stderr.readline().decode('utf8')
err_lines.append(line)
if fd == proc.stdout.fileno():
line_bytes = proc.stdout.readline()
temp_stdout.write(line_bytes)
line = line_bytes.decode('utf8')
err_line = line.lower()
if err_line.find('More information at:') != -1:
logger.info(err_line.split('more information at: ')[-1].strip())
if err_line.find(' - '):
t = err_line.split(' - ')[-1].strip()
if t != "":
logger.info(t)
# Read the rest + stdout
err = ''.join(err_lines + [an_err_line.decode('utf8') for an_err_line in proc.stderr])
if proc.returncode == 0:
logger.info("Job completed successfully!")
else:
logger.error("Error when running script:\n%s", self.pig_script_path())
logger.error(err)
raise PigJobError("Pig script failed with return value: %s" % (proc.returncode,), err=err)