def track_and_progress()

in luigi/contrib/pig.py [0:0]


    def track_and_progress(self, cmd):
        """
        Run ``cmd`` as a subprocess and relay its progress to the logger while it runs.

        The child's environment is a copy of ``os.environ`` with ``PIG_HOME`` set from
        ``self.pig_home()`` and every entry of ``self.pig_env_vars()`` applied on top.
        Every line read from the child's stdout is also written, as raw bytes, to a
        temporary file that is closed before this method returns.

        :param cmd: passed unchanged to ``subprocess.Popen`` with ``shell=False``
                    (presumably an argument list -- confirm against the caller).
        :raises PigJobError: if the process exits with a non-zero return code; the
                             collected stderr text is passed as ``err``.
        """
        env = os.environ.copy()
        env['PIG_HOME'] = self.pig_home()
        for k, v in self.pig_env_vars().items():
            env[k] = v

        # Lines are lower-cased before matching, so the marker must be lower case too.
        url_marker = 'more information at:'
        # tracking the possible problems with this job
        err_lines = []
        # The context manager guarantees the temporary file is closed even if the
        # loop below raises.
        with tempfile.TemporaryFile('wb') as temp_stdout:
            proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
            stderr_fd = proc.stderr.fileno()
            stdout_fd = proc.stdout.fileno()
            with PigRunContext():
                while proc.poll() is None:
                    ready, _, _ = select.select([stderr_fd, stdout_fd], [], [])
                    for fd in ready:
                        if fd == stderr_fd:
                            line = proc.stderr.readline().decode('utf8')
                            err_lines.append(line)
                        else:
                            line_bytes = proc.stdout.readline()
                            temp_stdout.write(line_bytes)
                            line = line_bytes.decode('utf8')

                        # Inspect every line as it is read; when both pipes are ready
                        # in the same round neither line may be skipped.
                        lowered = line.lower()
                        if url_marker in lowered:
                            logger.info(lowered.split(url_marker)[-1].strip())
                        elif ' - ' in lowered:
                            # Log only the message part that follows the last ' - '.
                            t = lowered.split(' - ')[-1].strip()
                            if t != "":
                                logger.info(t)

        # Read the rest of stderr that was still buffered when the process exited
        err = ''.join(err_lines + [an_err_line.decode('utf8') for an_err_line in proc.stderr])
        if proc.returncode == 0:
            logger.info("Job completed successfully!")
        else:
            logger.error("Error when running script:\n%s", self.pig_script_path())
            logger.error(err)
            raise PigJobError("Pig script failed with return value: %s" % (proc.returncode,), err=err)