def execute()

in src/python/pants/backend/jvm/tasks/jvm_compile/execution_graph.py [0:0]


  def execute(self, pool, log):
    """Runs scheduled work, ensuring all dependencies for each element are done before execution.

    :param pool: A WorkerPool to run jobs on
    :param log: logger for logging debug information and progress

    submits all the work without any dependencies to the worker pool
    when a unit of work finishes,
      if it is successful
        calls success callback
        checks for dependees whose dependencies are all successful, and submits them
      if it fails
        calls failure callback
        marks dependees as failed and queues them directly into the finished work queue
    when all work is either successful or failed,
      cleans up the work pool
    if there's an exception on the main thread,
      calls failure callback for unfinished work
      aborts work pool
      re-raises
    """
    log.debug(self.format_dependee_graph())

    status_table = StatusTable(self._job_keys_as_scheduled,
                               {key: len(self._jobs[key].dependencies) for key in self._job_keys_as_scheduled})
    finished_queue = queue.Queue()

    heap = []
    jobs_in_flight = ThreadSafeCounter()

    def put_jobs_into_heap(job_keys):
      for job_key in job_keys:
        status_table.mark_queued(job_key)
        # minus because jobs with larger priority should go first
        heappush(heap, (-self._job_priority[job_key], job_key))

    def try_to_submit_jobs_from_heap():
      def worker(worker_key, work):
        status_table.mark_as(RUNNING, worker_key)
        try:
          work()
          result = (worker_key, SUCCESSFUL, None)
        except BaseException:
          _, exc_value, exc_traceback = sys.exc_info()
          result = (worker_key, FAILED, (exc_value, traceback.format_tb(exc_traceback)))
        finished_queue.put(result)
        jobs_in_flight.decrement()

      while len(heap) > 0 and jobs_in_flight.get() < pool.num_workers:
        priority, job_key = heappop(heap)
        jobs_in_flight.increment()
        pool.submit_async_work(Work(worker, [(job_key, (self._jobs[job_key]))]))

    def submit_jobs(job_keys):
      put_jobs_into_heap(job_keys)
      try_to_submit_jobs_from_heap()

    try:
      submit_jobs(self._job_keys_with_no_dependencies)

      while not status_table.are_all_done():
        try:
          finished_key, result_status, value = finished_queue.get(timeout=10)
        except queue.Empty:
          self.log_progress(log, status_table)

          try_to_submit_jobs_from_heap()
          continue

        finished_job = self._jobs[finished_key]
        direct_dependees = self._dependees[finished_key]
        status_table.mark_as(result_status, finished_key)

        # Queue downstream tasks.
        if result_status is SUCCESSFUL:
          try:
            finished_job.run_success_callback()
          except Exception as e:
            log.debug(traceback.format_exc())
            raise ExecutionFailure("Error in on_success for {}".format(finished_key), e)

          ready_dependees = []
          for dependee in direct_dependees:
            status_table.mark_one_successful_dependency(dependee)
            if status_table.is_ready_to_submit(dependee):
              ready_dependees.append(dependee)

          submit_jobs(ready_dependees)
        else:  # Failed or canceled.
          try:
            finished_job.run_failure_callback()
          except Exception as e:
            log.debug(traceback.format_exc())
            raise ExecutionFailure("Error in on_failure for {}".format(finished_key), e)

          # Propagate failures downstream.
          for dependee in direct_dependees:
            if status_table.is_unstarted(dependee):
              status_table.mark_queued(dependee)
              finished_queue.put((dependee, CANCELED, None))

        # Log success or failure for this job.
        if result_status is FAILED:
          exception, tb = value
          log.error("{} failed: {}".format(finished_key, exception))
          if self._print_stack_trace:
            log.error('Traceback:\n{}'.format('\n'.join(tb)))
        else:
          log.debug("{} finished with status {}".format(finished_key, result_status))
    except ExecutionFailure:
      raise
    except Exception as e:
      # Call failure callbacks for jobs that are unfinished.
      for key, state in status_table.unfinished_items():
        self._jobs[key].run_failure_callback()
      log.debug(traceback.format_exc())
      raise ExecutionFailure("Error running job", e)

    if status_table.has_failures():
      raise ExecutionFailure("Failed jobs: {}".format(', '.join(status_table.failed_keys())))