in src/python/pants/option/parser.py [0:0]
def _compute_value(self, dest, kwargs, flag_val_strs):
    """Compute the value to use for an option.

    Candidate values are gathered from cmd-line flags, environment variables, the scope's
    config section, the config DEFAULT section and the registered `default` kwarg. The
    source of the final value is chosen according to the ranking in RankedValue. Every
    contributing value, and then the final value, is reported to `self._option_tracker`.

    :param dest: The option's dest name. Used as the config key, upper-cased to build the
                 env var names, and quoted in error messages.
    :param kwargs: The option's registration kwargs. The keys read here are `type`,
                   `default`, `fromfile`, `choices`, `member_type` and `removal_version`.
    :param flag_val_strs: The value strings specified for this option on the cmd line
                          (may be empty).
    :returns: A RankedValue holding the final value. For list and dict options this wraps
              the merge of all non-None contributing values, at the rank of the last
              entry in the ranked list.
    :raises ParseError: If a value cannot be converted to the option's type, if multiple
                        flags are given for a scalar option, or if the final value fails
                        the `choices` / `dir_option` / `file_option` checks.
    :raises self.FromfileError: If an `@file` value cannot be read or decoded.
    """
    # Helper function to convert a string to a value of the option's type.
    def to_value_type(val_str):
        if val_str is None:
            return None
        elif kwargs.get('type') == bool:
            return self._ensure_bool(val_str)
        else:
            type_arg = kwargs.get('type', str)
            try:
                return self._wrap_type(type_arg)(val_str)
            except (TypeError, ValueError) as e:
                # A badly-formed string usually fails conversion with ValueError (e.g.
                # int('abc')), not TypeError. Catch both so that the user always gets a
                # ParseError naming the option and scope instead of a bare exception.
                raise ParseError(
                    "Error applying type '{}' to option value '{}', for option '--{}' in {}: {}"
                    .format(type_arg.__name__, val_str, dest, self._scope_str(), e))

    # Helper function to expand a fromfile=True value string, if needed.
    # May return a string or a dict/list decoded from a json/yaml file.
    def expand(val_or_str):
        if (kwargs.get('fromfile', True) and val_or_str and
                isinstance(val_or_str, str) and val_or_str.startswith('@')):
            if val_or_str.startswith('@@'):  # Support a literal @ for fromfile values via @@.
                return val_or_str[1:]
            else:
                fromfile = val_or_str[1:]
                try:
                    with open(fromfile, 'r') as fp:
                        s = fp.read().strip()
                        # The file extension decides how the content is decoded.
                        if fromfile.endswith('.json'):
                            return json.loads(s)
                        elif fromfile.endswith('.yml') or fromfile.endswith('.yaml'):
                            return yaml.safe_load(s)
                        else:
                            return s
                except (IOError, ValueError, yaml.YAMLError) as e:
                    raise self.FromfileError('Failed to read {} in {} from file {}: {}'.format(
                        dest, self._scope_str(), fromfile, e))
        else:
            return val_or_str

    # Get value from config files, and capture details about its derivation.
    config_details = None
    config_section = GLOBAL_SCOPE_CONFIG_SECTION if self._scope == GLOBAL_SCOPE else self._scope
    config_default_val_or_str = expand(self._config.get(Config.DEFAULT_SECTION, dest, default=None))
    config_val_or_str = expand(self._config.get(config_section, dest, default=None))
    # Prefer the source file of the scope's own section, falling back to that of DEFAULT.
    config_source_file = (self._config.get_source_for_option(config_section, dest) or
                          self._config.get_source_for_option(Config.DEFAULT_SECTION, dest))
    if config_source_file is not None:
        config_source_file = os.path.relpath(config_source_file)
        config_details = 'in {}'.format(config_source_file)

    # Get value from environment, and capture details about its derivation.
    udest = dest.upper()
    if self._scope == GLOBAL_SCOPE:
        # For convenience, we allow three forms of env var for global scope options.
        # The fully-specified env var is PANTS_GLOBAL_FOO, which is uniform with PANTS_<SCOPE>_FOO
        # for all the other scopes.  However we also allow simply PANTS_FOO. And if the option name
        # itself starts with 'pants-' then we also allow simply FOO. E.g., PANTS_WORKDIR instead of
        # PANTS_PANTS_WORKDIR or PANTS_GLOBAL_PANTS_WORKDIR. We take the first specified value we
        # find, in this order: PANTS_GLOBAL_FOO, PANTS_FOO, FOO.
        env_vars = ['PANTS_GLOBAL_{0}'.format(udest), 'PANTS_{0}'.format(udest)]
        if udest.startswith('PANTS_'):
            env_vars.append(udest)
    else:
        sanitized_env_var_scope = self._ENV_SANITIZER_RE.sub('_', self._scope.upper())
        env_vars = ['PANTS_{0}_{1}'.format(sanitized_env_var_scope, udest)]

    env_val_or_str = None
    env_details = None
    if self._env:
        # The first env var (in the order built above) that is present wins.
        for env_var in env_vars:
            if env_var in self._env:
                env_val_or_str = expand(self._env.get(env_var))
                env_details = 'from env var {}'.format(env_var)
                break

    # Get value from cmd-line flags.
    flag_vals = [to_value_type(expand(x)) for x in flag_val_strs]
    if is_list_option(kwargs):
        # Note: It's important to set flag_val to None if no flags were specified, so we can
        # distinguish between no flags set vs. explicit setting of the value to [].
        flag_val = ListValueComponent.merge(flag_vals) if flag_vals else None
    elif is_dict_option(kwargs):
        # Note: It's important to set flag_val to None if no flags were specified, so we can
        # distinguish between no flags set vs. explicit setting of the value to {}.
        flag_val = DictValueComponent.merge(flag_vals) if flag_vals else None
    elif len(flag_vals) > 1:
        raise ParseError('Multiple cmd line flags specified for option {} in {}'.format(
            dest, self._scope_str()))
    elif len(flag_vals) == 1:
        flag_val = flag_vals[0]
    else:
        flag_val = None

    # Rank all available values.
    # Note that some of these values may already be of the value type, but type conversion
    # is idempotent, so this is OK.
    values_to_rank = [to_value_type(x) for x in
                      [flag_val, env_val_or_str, config_val_or_str,
                       config_default_val_or_str, kwargs.get('default'), None]]
    # Note that ranked_vals will always have at least one element, and all elements will be
    # instances of RankedValue (so none will be None, although they may wrap a None value).
    # The iteration order is reversed so that the entry used as the final value (and whose
    # rank is used for merged list/dict values) is ranked_vals[-1].
    ranked_vals = list(reversed(list(RankedValue.prioritized_iter(*values_to_rank))))

    # Helper function to report a value and its derivation to the option tracker.
    def record_option(value, rank, option_details=None):
        deprecation_version = kwargs.get('removal_version')
        self._option_tracker.record_option(scope=self._scope,
                                           option=dest,
                                           value=value,
                                           rank=rank,
                                           deprecation_version=deprecation_version,
                                           details=option_details)

    # Record info about the derivation of each of the contributing values.
    detail_history = []
    for ranked_val in ranked_vals:
        if ranked_val.rank in (RankedValue.CONFIG, RankedValue.CONFIG_DEFAULT):
            details = config_details
        elif ranked_val.rank == RankedValue.ENVIRONMENT:
            details = env_details
        else:
            details = None
        if details:
            detail_history.append(details)
        record_option(value=ranked_val.value, rank=ranked_val.rank, option_details=details)

    # Helper function to check various validity constraints on final option values.
    def check(val):
        if val is not None:
            choices = kwargs.get('choices')
            # If the `type` argument has an `all_variants` attribute, use that as `choices` if not
            # already set. Using an attribute instead of checking a subclass allows `type` arguments
            # which are functions to have an implicit fallback `choices` set as well.
            if choices is None and 'type' in kwargs:
                type_arg = kwargs.get('type')
                if hasattr(type_arg, 'all_variants'):
                    choices = list(type_arg.all_variants)
            # TODO: convert this into an enum() pattern match!
            if choices is not None and val not in choices:
                raise ParseError('`{}` is not an allowed value for option {} in {}. '
                                 'Must be one of: {}'.format(val, dest, self._scope_str(), choices))
            elif kwargs.get('type') == dir_option and not os.path.isdir(val):
                raise ParseError('Directory value `{}` for option {} in {} does not exist.'.format(
                    val, dest, self._scope_str()))
            elif kwargs.get('type') == file_option and not os.path.isfile(val):
                raise ParseError('File value `{}` for option {} in {} does not exist.'.format(
                    val, dest, self._scope_str()))

    # Generate the final value from all available values, and check that it (or its members,
    # if a list) are in the set of allowed choices.
    if is_list_option(kwargs):
        merged_rank = ranked_vals[-1].rank
        merged_val = ListValueComponent.merge(
            [rv.value for rv in ranked_vals if rv.value is not None]).val
        # Convert each member to the option's `member_type` (str if unspecified), then
        # validate every converted member individually.
        merged_val = [self._convert_member_type(kwargs.get('member_type', str), x)
                      for x in merged_val]
        for val in merged_val:
            check(val)
        ret = RankedValue(merged_rank, merged_val)
    elif is_dict_option(kwargs):
        # TODO: convert `member_type` for dict values too!
        merged_rank = ranked_vals[-1].rank
        merged_val = DictValueComponent.merge(
            [rv.value for rv in ranked_vals if rv.value is not None]).val
        # NOTE(review): if `.val` is a plain dict, this loop iterates (and so checks) its
        # keys, not its values -- confirm that this is the intended validation.
        for val in merged_val:
            check(val)
        ret = RankedValue(merged_rank, merged_val)
    else:
        ret = ranked_vals[-1]
        check(ret.value)

    # Record info about the derivation of the final value.
    merged_details = ', '.join(detail_history) if detail_history else None
    record_option(value=ret.value, rank=ret.rank, option_details=merged_details)

    # All done!
    return ret