def monitor_work_horse()

in redash/tasks/worker.py [0:0]


    def monitor_work_horse(self, job, queue):
        """Supervise the forked work horse until it exits.

        The worker blocks in ``os.waitpid`` on ``self._horse_pid``, bounded by
        a ``UnixSignalDeathPenalty`` of ``self.job_monitoring_interval``.
        Whenever that wait times out (``HorseMonitorTimeoutException``), the
        worker:

        - sends a heartbeat;
        - reloads the job;
        - calls ``stop_executing_job`` if the job is cancelled;
        - calls ``enforce_hard_limit`` if ``soft_limit_exceeded`` reports true.

        The wait is then retried.

        Once the horse has been reaped, a zero wait status means success and
        nothing else is done. Otherwise, a job whose status is not FINISHED or
        FAILED (and not missing) is handed to ``handle_job_failure``.

        :param job: the job the horse is executing.
        :param queue: the queue passed through to ``handle_job_failure``.
        :raises OSError: any ``os.waitpid`` error other than EINTR.
        """
        self.monitor_started = utcnow()
        job.started_at = utcnow()

        while True:
            try:
                # The death penalty interrupts the blocking waitpid after one
                # monitoring interval so the horse can be checked on.
                with UnixSignalDeathPenalty(
                    self.job_monitoring_interval, HorseMonitorTimeoutException
                ):
                    _reaped_pid, wait_status = os.waitpid(self._horse_pid, 0)
            except HorseMonitorTimeoutException:
                # The horse is still running. Heartbeat (monitoring interval
                # plus a margin of 5) so the worker is not considered dead.
                self.heartbeat(self.job_monitoring_interval + 5)

                # Reload the job to observe cancellation requested elsewhere.
                job.refresh()

                if job.is_cancelled:
                    self.stop_executing_job(job)

                if self.soft_limit_exceeded(job):
                    self.enforce_hard_limit(job)
            except OSError as err:
                # EINTR (a SIGINT/SIGTERM arriving during waitpid) just means
                # "wait again"; any other OS error is unexpected and must
                # propagate to the caller.
                if err.errno != errno.EINTR:
                    raise
                # Send a heartbeat to keep the worker alive.
                self.heartbeat()
            else:
                # waitpid returned: the horse has been reaped.
                break

        # A zero wait status means the horse exited normally.
        if wait_status == os.EX_OK:
            return

        status = job.get_status()
        # No stored status: the job completed and its ttl has expired.
        # FINISHED/FAILED: the outcome was already recorded.
        if status is None or status in (JobStatus.FINISHED, JobStatus.FAILED):
            return

        if not job.ended_at:
            job.ended_at = utcnow()

        # Unhandled failure: move the job to the failed registry.
        warning_text = (
            "Moving job to FailedJobRegistry "
            "(work-horse terminated unexpectedly; waitpid returned {})"
        ).format(wait_status)
        self.log.warning(warning_text)

        failure_text = (
            "Work-horse process was terminated unexpectedly "
            "(waitpid returned %s)" % wait_status
        )
        self.handle_job_failure(job, queue=queue, exc_string=failure_text)