in src/python/pants/backend/jvm/tasks/jvm_compile/execution_graph.py [0:0]
def execute(self, pool, log):
    """Runs scheduled work, ensuring all dependencies for each element are done before execution.

    :param pool: A WorkerPool to run jobs on; this method uses its `num_workers` attribute and
      its `submit_async_work` method.
    :param log: logger for logging debug information and progress
    :raises ExecutionFailure: if a success or failure callback raises, if any other exception
      escapes the scheduling loop on the main thread, or if the status table reports failed
      jobs once all work is done.

    submits all the work without any dependencies to the worker pool, highest priority first,
    never keeping more than `pool.num_workers` jobs in flight
    when a unit of work finishes,
      if it is successful
        calls success callback
        checks for dependees whose dependencies are all successful, and submits them
      if it fails (or was canceled)
        calls failure callback
        marks unstarted dependees as canceled and queues them directly into the finished
        work queue
        submits queued work into the worker slot that was freed
    if a callback raises on the main thread,
      raises ExecutionFailure immediately
    if there's any other exception on the main thread,
      calls failure callback for unfinished work
      raises ExecutionFailure wrapping the original exception
    """
    log.debug(self.format_dependee_graph())

    # Tracks the state of every job and how many dependencies each one is still waiting for.
    status_table = StatusTable(
        self._job_keys_as_scheduled,
        {key: len(self._jobs[key].dependencies) for key in self._job_keys_as_scheduled})
    # Worker threads report (job_key, status, value) tuples here; the main loop consumes them.
    finished_queue = queue.Queue()

    # Jobs that are ready to run but have not been handed to the pool yet.
    heap = []
    jobs_in_flight = ThreadSafeCounter()

    def put_jobs_into_heap(job_keys):
        for job_key in job_keys:
            status_table.mark_queued(job_key)
            # minus because jobs with larger priority should go first
            heappush(heap, (-self._job_priority[job_key], job_key))

    def try_to_submit_jobs_from_heap():
        def worker(worker_key, work):
            status_table.mark_as(RUNNING, worker_key)
            try:
                work()
                result = (worker_key, SUCCESSFUL, None)
            except BaseException:
                # Capture everything (including non-Exception errors) so that the main loop
                # always receives a result for a job it submitted.
                _, exc_value, exc_traceback = sys.exc_info()
                result = (worker_key, FAILED, (exc_value, traceback.format_tb(exc_traceback)))
            # Release the slot *before* publishing the result. Otherwise the main thread can
            # consume the result, still see the pool as full, submit nothing, and then sit in
            # the poll timeout below even though a worker is idle.
            jobs_in_flight.decrement()
            finished_queue.put(result)

        while heap and jobs_in_flight.get() < pool.num_workers:
            _, job_key = heappop(heap)
            jobs_in_flight.increment()
            # NOTE(review): Work presumably calls worker(job_key, job) from the args tuple --
            # confirm against the Work/WorkerPool implementation.
            pool.submit_async_work(Work(worker, [(job_key, (self._jobs[job_key]))]))

    def submit_jobs(job_keys):
        put_jobs_into_heap(job_keys)
        try_to_submit_jobs_from_heap()

    try:
        submit_jobs(self._job_keys_with_no_dependencies)

        while not status_table.are_all_done():
            try:
                finished_key, result_status, value = finished_queue.get(timeout=10)
            except queue.Empty:
                # Nothing finished within the timeout: report progress and retry submission
                # in case worker slots have become available.
                self.log_progress(log, status_table)
                try_to_submit_jobs_from_heap()
                continue

            finished_job = self._jobs[finished_key]
            direct_dependees = self._dependees[finished_key]
            status_table.mark_as(result_status, finished_key)

            # Queue downstream tasks.
            if result_status is SUCCESSFUL:
                try:
                    finished_job.run_success_callback()
                except Exception as e:
                    log.debug(traceback.format_exc())
                    raise ExecutionFailure("Error in on_success for {}".format(finished_key), e)

                ready_dependees = []
                for dependee in direct_dependees:
                    status_table.mark_one_successful_dependency(dependee)
                    if status_table.is_ready_to_submit(dependee):
                        ready_dependees.append(dependee)

                # Also called when nothing became ready: it still fills the freed slot
                # from the heap.
                submit_jobs(ready_dependees)
            else:  # Failed or canceled.
                try:
                    finished_job.run_failure_callback()
                except Exception as e:
                    log.debug(traceback.format_exc())
                    raise ExecutionFailure("Error in on_failure for {}".format(finished_key), e)

                # Propagate failures downstream. Canceled dependees never run; they are fed
                # straight back into the finished queue so their own dependees get canceled
                # on a later iteration.
                for dependee in direct_dependees:
                    if status_table.is_unstarted(dependee):
                        status_table.mark_queued(dependee)
                        finished_queue.put((dependee, CANCELED, None))

                # A failed job frees a worker slot just like a successful one; hand it to
                # any queued work now instead of waiting for the next success or timeout.
                try_to_submit_jobs_from_heap()

            # Log success or failure for this job.
            if result_status is FAILED:
                exception, tb = value
                log.error("{} failed: {}".format(finished_key, exception))
                if self._print_stack_trace:
                    log.error('Traceback:\n{}'.format('\n'.join(tb)))
            else:
                log.debug("{} finished with status {}".format(finished_key, result_status))
    except ExecutionFailure:
        # Callback errors were already logged and wrapped above; pass them through as-is.
        raise
    except Exception as e:
        # Call failure callbacks for jobs that are unfinished.
        for key, _ in status_table.unfinished_items():
            self._jobs[key].run_failure_callback()
        log.debug(traceback.format_exc())
        raise ExecutionFailure("Error running job", e)

    if status_table.has_failures():
        raise ExecutionFailure("Failed jobs: {}".format(', '.join(status_table.failed_keys())))