in tfx/dsl/input_resolution/strategies/span_range_strategy.py [0:0]
def _resolve(self, input_dict: Dict[str, List[types.Artifact]]):
result = {}
for k, artifact_list in input_dict.items():
in_range_artifacts = []
if self._range_config.HasField('static_range'):
start_span_number = self._range_config.static_range.start_span_number
end_span_number = self._range_config.static_range.end_span_number
# Get the artifacts within range.
for artifact in artifact_list:
if not artifact.has_custom_property(_SPAN):
raise RuntimeError(f'Span does not exist for {artifact}')
span = _get_span_custom_property(artifact)
if span >= start_span_number and span <= end_span_number:
in_range_artifacts.append(artifact)
elif self._range_config.HasField('rolling_range'):
start_span_number = self._range_config.rolling_range.start_span_number
num_spans = self._range_config.rolling_range.num_spans
if num_spans <= 0:
raise ValueError('num_spans should be positive number.')
most_recent_span = -1
# Get most recent span number.
for artifact in artifact_list:
if not artifact.has_custom_property(_SPAN):
raise RuntimeError(f'Span does not exist for {artifact}')
span = _get_span_custom_property(artifact)
if span > most_recent_span:
most_recent_span = span
start_span_number = max(start_span_number,
most_recent_span - num_spans + 1)
end_span_number = most_recent_span
# Get the artifacts within range.
for artifact in artifact_list:
span = _get_span_custom_property(artifact)
if span >= start_span_number and span <= end_span_number:
in_range_artifacts.append(artifact)
else:
raise ValueError('RangeConfig type is not supported.')
result[k] = sorted(
in_range_artifacts,
key=_get_span_custom_property,
reverse=True)
return result