def run_tests()

in src/python/pants/backend/jvm/tasks/junit_run.py [0:0]


  def run_tests(self, fail_fast, test_targets, output_dir, coverage):
    """Run the tests collected from the given targets, one runner subprocess per batch.

    Batches come from `self._iter_batches`; each one is spawned via `self.spawn_and_wait` with
    `JUnit.RUNNER_MAIN` as the main class. The absolute values of the subprocess exit codes are
    summed across batches and a non-zero sum is reported as a failure.

    :param fail_fast: Forwarded to `self._args`; when truthy, no further batches are run once the
      accumulated result is non-zero.
    :param test_targets: The targets handed to `self._collect_test_targets` to build the registry
      of tests to run.
    :param output_dir: Base output directory. It is passed to the coverage engine, the runner
      arguments and the result parsers; when `self._batched` is set each batch uses its own
      `batch-<id>` subdirectory of it.
    :param coverage: Coverage engine; `instrument(output_dir)` is called once up front and
      `run_modifications(batch_output_dir)` supplies the extra jvm options and the classpath
      entries to prepend for each batch.
    :returns: `TestResult.successful` if the registry is empty or every batch exited zero,
      otherwise a `TestResult` built from the failure message, the summed exit code and the
      sorted failed targets.
    """
    test_registry = self._collect_test_targets(test_targets)
    if test_registry.empty:
      return TestResult.successful

    coverage.instrument(output_dir)

    def parse_error_handler(parse_error):
      # Just log and move on since the result is only used to characterize failures, and raising
      # an error here would just distract from the underlying test failures.
      self.context.log.error('Error parsing test result file {path}: {cause}'
                             .format(path=parse_error.xml_path, cause=parse_error.cause))

    # The 'instrument_classpath' product below will be `None` if not set, and we'll default
    # back to runtime_classpath
    classpath_product = self.context.products.get_data('instrument_classpath')

    # Running total of the absolute exit codes of all batches run so far.
    result = 0
    for batch_id, (properties, batch) in enumerate(self._iter_batches(test_registry)):
      (workdir, platform, target_jvm_options, target_env_vars, concurrency, threads) = properties

      batch_output_dir = output_dir
      if self._batched:
        batch_output_dir = os.path.join(batch_output_dir, 'batch-{}'.format(batch_id))

      run_modifications = coverage.run_modifications(batch_output_dir)

      extra_jvm_options = run_modifications.extra_jvm_options

      # Batches of test classes will likely exist within the same targets: dedupe them.
      relevant_targets = {test_registry.get_owning_target(t) for t in batch}

      # Insertion order matters here: coverage entries first, then the runner, then the targets.
      complete_classpath = OrderedSet()
      complete_classpath.update(run_modifications.classpath_prepend)
      complete_classpath.update(JUnit.global_instance().runner_classpath(self.context))
      complete_classpath.update(self.classpath(relevant_targets,
                                               classpath_product=classpath_product))

      distribution = JvmPlatform.preferred_jvm_distribution([platform], self._strict_jvm_version)

      # Override cmdline args with values from junit_test() target that specify concurrency:
      args = self._args(fail_fast, batch_output_dir) + [u'-xmlreport']

      if concurrency is not None:
        args = remove_arg(args, '-default-parallel')
        if concurrency == JUnitTests.CONCURRENCY_SERIAL:
          args = ensure_arg(args, '-default-concurrency', param='SERIAL')
        elif concurrency == JUnitTests.CONCURRENCY_PARALLEL_CLASSES:
          args = ensure_arg(args, '-default-concurrency', param='PARALLEL_CLASSES')
        elif concurrency == JUnitTests.CONCURRENCY_PARALLEL_METHODS:
          args = ensure_arg(args, '-default-concurrency', param='PARALLEL_METHODS')
        elif concurrency == JUnitTests.CONCURRENCY_PARALLEL_CLASSES_AND_METHODS:
          args = ensure_arg(args, '-default-concurrency', param='PARALLEL_CLASSES_AND_METHODS')

      if threads is not None:
        # Replace any existing '-parallel-threads <n>' pair with the target's own value.
        args = remove_arg(args, '-parallel-threads', has_param=True)
        args += ['-parallel-threads', str(threads)]

      batch_test_specs = [test.render_test_spec() for test in batch]
      with argfile.safe_args(batch_test_specs, self.get_options()) as batch_tests:
        with self.chroot(relevant_targets, workdir) as chroot:
          self.context.log.debug('CWD = {}'.format(chroot))
          self.context.log.debug('platform = {}'.format(platform))
          with environment_as(**dict(target_env_vars)):
            subprocess_result = self.spawn_and_wait(
              executor=SubprocessExecutor(distribution),
              distribution=distribution,
              classpath=complete_classpath,
              main=JUnit.RUNNER_MAIN,
              jvm_options=self.jvm_options + extra_jvm_options + list(target_jvm_options),
              args=args + batch_tests,
              workunit_factory=self.context.new_workunit,
              workunit_name='run',
              workunit_labels=[WorkUnitLabel.TEST],
              cwd=chroot,
              synthetic_jar_dir=batch_output_dir,
              create_synthetic_jar=self.synthetic_classpath,
            )
            self.context.log.debug('JUnit subprocess exited with result ({})'
                                   .format(subprocess_result))
            # abs() keeps a negative exit code from cancelling out a positive one in the total.
            result += abs(subprocess_result)

        # Report the per-test info for this batch, whether or not the batch passed.
        tests_info = self.parse_test_info(batch_output_dir, parse_error_handler, ['classname'])
        for test_name, test_info in tests_info.items():
          test_item = Test(test_info['classname'], test_name)
          test_target = test_registry.get_owning_target(test_item)
          self.report_all_info_for_single_test(self.options_scope, test_target,
                                               test_name, test_info)

        if result != 0 and fail_fast:
          break

    if result == 0:
      return TestResult.successful

    target_to_failed_test = parse_failed_targets(test_registry, output_dir, parse_error_handler)

    def sort_owning_target(t):
      # Failed tests with no known owner are handled below as a falsy target. Key them with the
      # empty string rather than `None`: they still sort ahead of every real spec, but the sort
      # no longer compares `None` against `str`, which raises TypeError on Python 3.
      return t.address.spec if t else ''

    failed_targets = sorted(target_to_failed_test, key=sort_owning_target)
    error_message_lines = []
    if self._failure_summary:
      def render_owning_target(t):
        return t.address.reference() if t else '<Unknown Target>'

      # One indented line per failed target, followed by its failed tests indented further.
      for target in failed_targets:
        error_message_lines.append('\n{indent}{owner}'.format(indent=' ' * 4,
                                                              owner=render_owning_target(target)))
        for test in sorted(target_to_failed_test[target]):
          error_message_lines.append('{indent}{classname}#{methodname}'
                                     .format(indent=' ' * 8,
                                             classname=test.classname,
                                             methodname=test.methodname))
    error_message_lines.append(
      '\njava {main} ... exited non-zero ({code}); {failed} failed {targets}.'
        .format(main=JUnit.RUNNER_MAIN, code=result, failed=len(failed_targets),
                targets=pluralize(len(failed_targets), 'target'))
    )
    return TestResult(msg='\n'.join(error_message_lines), rc=result, failed_targets=failed_targets)