def _compute_value()

in src/python/pants/option/parser.py [0:0]


  def _compute_value(self, dest, kwargs, flag_val_strs):
    """Compute the value to use for an option.

    The source of the default value is chosen according to the ranking in RankedValue.
    """
    # Helper function to convert a string to a value of the option's type.
    def to_value_type(val_str):
      if val_str is None:
        return None
      elif kwargs.get('type') == bool:
        return self._ensure_bool(val_str)
      else:
        type_arg = kwargs.get('type', str)
        try:
          return self._wrap_type(type_arg)(val_str)
        except TypeError as e:
          raise ParseError(
            "Error applying type '{}' to option value '{}', for option '--{}' in {}: {}"
            .format(type_arg.__name__, val_str, dest, self._scope_str(), e))

    # Helper function to expand a fromfile=True value string, if needed.
    # May return a string or a dict/list decoded from a json/yaml file.
    def expand(val_or_str):
      if (kwargs.get('fromfile', True) and val_or_str and
          isinstance(val_or_str, str) and val_or_str.startswith('@')):
        if val_or_str.startswith('@@'):   # Support a literal @ for fromfile values via @@.
          return val_or_str[1:]
        else:
          fromfile = val_or_str[1:]
          try:
            with open(fromfile, 'r') as fp:
              s = fp.read().strip()
              if fromfile.endswith('.json'):
                return json.loads(s)
              elif fromfile.endswith('.yml') or fromfile.endswith('.yaml'):
                return yaml.safe_load(s)
              else:
                return s
          except (IOError, ValueError, yaml.YAMLError) as e:
            raise self.FromfileError('Failed to read {} in {} from file {}: {}'.format(
                dest, self._scope_str(), fromfile, e))
      else:
        return val_or_str

    # Get value from config files, and capture details about its derivation.
    config_details = None
    config_section = GLOBAL_SCOPE_CONFIG_SECTION if self._scope == GLOBAL_SCOPE else self._scope
    config_default_val_or_str = expand(self._config.get(Config.DEFAULT_SECTION, dest, default=None))
    config_val_or_str = expand(self._config.get(config_section, dest, default=None))
    config_source_file = (self._config.get_source_for_option(config_section, dest) or
        self._config.get_source_for_option(Config.DEFAULT_SECTION, dest))
    if config_source_file is not None:
      config_source_file = os.path.relpath(config_source_file)
      config_details = 'in {}'.format(config_source_file)

    # Get value from environment, and capture details about its derivation.
    udest = dest.upper()
    if self._scope == GLOBAL_SCOPE:
      # For convenience, we allow three forms of env var for global scope options.
      # The fully-specified env var is PANTS_GLOBAL_FOO, which is uniform with PANTS_<SCOPE>_FOO
      # for all the other scopes.  However we also allow simply PANTS_FOO. And if the option name
      # itself starts with 'pants-' then we also allow simply FOO. E.g., PANTS_WORKDIR instead of
      # PANTS_PANTS_WORKDIR or PANTS_GLOBAL_PANTS_WORKDIR. We take the first specified value we
      # find, in this order: PANTS_GLOBAL_FOO, PANTS_FOO, FOO.
      env_vars = ['PANTS_GLOBAL_{0}'.format(udest), 'PANTS_{0}'.format(udest)]
      if udest.startswith('PANTS_'):
        env_vars.append(udest)
    else:
      sanitized_env_var_scope = self._ENV_SANITIZER_RE.sub('_', self._scope.upper())
      env_vars = ['PANTS_{0}_{1}'.format(sanitized_env_var_scope, udest)]

    env_val_or_str = None
    env_details = None
    if self._env:
      for env_var in env_vars:
        if env_var in self._env:
          env_val_or_str = expand(self._env.get(env_var))
          env_details = 'from env var {}'.format(env_var)
          break

    # Get value from cmd-line flags.
    flag_vals = [to_value_type(expand(x)) for x in flag_val_strs]
    if is_list_option(kwargs):
      # Note: It's important to set flag_val to None if no flags were specified, so we can
      # distinguish between no flags set vs. explicit setting of the value to [].
      flag_val = ListValueComponent.merge(flag_vals) if flag_vals else None
    elif is_dict_option(kwargs):
      # Note: It's important to set flag_val to None if no flags were specified, so we can
      # distinguish between no flags set vs. explicit setting of the value to {}.
      flag_val = DictValueComponent.merge(flag_vals) if flag_vals else None
    elif len(flag_vals) > 1:
      raise ParseError('Multiple cmd line flags specified for option {} in {}'.format(
          dest, self._scope_str()))
    elif len(flag_vals) == 1:
      flag_val = flag_vals[0]
    else:
      flag_val = None

    # Rank all available values.
    # Note that some of these values may already be of the value type, but type conversion
    # is idempotent, so this is OK.

    values_to_rank = [to_value_type(x) for x in
                      [flag_val, env_val_or_str, config_val_or_str,
                       config_default_val_or_str, kwargs.get('default'), None]]
    # Note that ranked_vals will always have at least one element, and all elements will be
    # instances of RankedValue (so none will be None, although they may wrap a None value).
    ranked_vals = list(reversed(list(RankedValue.prioritized_iter(*values_to_rank))))

    def record_option(value, rank, option_details=None):
      deprecation_version = kwargs.get('removal_version')
      self._option_tracker.record_option(scope=self._scope,
                                         option=dest,
                                         value=value,
                                         rank=rank,
                                         deprecation_version=deprecation_version,
                                         details=option_details)

    # Record info about the derivation of each of the contributing values.
    detail_history = []
    for ranked_val in ranked_vals:
      if ranked_val.rank in (RankedValue.CONFIG, RankedValue.CONFIG_DEFAULT):
        details = config_details
      elif ranked_val.rank == RankedValue.ENVIRONMENT:
        details = env_details
      else:
        details = None
      if details:
        detail_history.append(details)
      record_option(value=ranked_val.value, rank=ranked_val.rank, option_details=details)

    # Helper function to check various validity constraints on final option values.
    def check(val):
      if val is not None:
        choices = kwargs.get('choices')
        # If the `type` argument has an `all_variants` attribute, use that as `choices` if not
        # already set. Using an attribute instead of checking a subclass allows `type` arguments
        # which are functions to have an implicit fallback `choices` set as well.
        if choices is None and 'type' in kwargs:
          type_arg = kwargs.get('type')
          if hasattr(type_arg, 'all_variants'):
            choices = list(type_arg.all_variants)
        # TODO: convert this into an enum() pattern match!
        if choices is not None and val not in choices:
          raise ParseError('`{}` is not an allowed value for option {} in {}. '
                           'Must be one of: {}'.format(val, dest, self._scope_str(), choices))
        elif kwargs.get('type') == dir_option and not os.path.isdir(val):
          raise ParseError('Directory value `{}` for option {} in {} does not exist.'.format(
              val, dest, self._scope_str()))
        elif kwargs.get('type') == file_option and not os.path.isfile(val):
          raise ParseError('File value `{}` for option {} in {} does not exist.'.format(
              val, dest, self._scope_str()))

    # Generate the final value from all available values, and check that it (or its members,
    # if a list) are in the set of allowed choices.
    if is_list_option(kwargs):
      merged_rank = ranked_vals[-1].rank
      merged_val = ListValueComponent.merge(
          [rv.value for rv in ranked_vals if rv.value is not None]).val
      # TODO: run `check()` for all elements of a list option too!!!
      merged_val = [self._convert_member_type(kwargs.get('member_type', str), x)
                    for x in merged_val]
      for val in merged_val:
        check(val)
      ret = RankedValue(merged_rank, merged_val)
    elif is_dict_option(kwargs):
      # TODO: convert `member_type` for dict values too!
      merged_rank = ranked_vals[-1].rank
      merged_val = DictValueComponent.merge(
          [rv.value for rv in ranked_vals if rv.value is not None]).val
      for val in merged_val:
        check(val)
      ret = RankedValue(merged_rank, merged_val)
    else:
      ret = ranked_vals[-1]
      check(ret.value)

    # Record info about the derivation of the final value.
    merged_details = ', '.join(detail_history) if detail_history else None
    record_option(value=ret.value, rank=ret.rank, option_details=merged_details)

    # All done!
    return ret