in tfx/dsl/input_resolution/ops/consecutive_spans_op.py [0:0]
def apply(self,
input_list: Sequence[types.Artifact]) -> Sequence[types.Artifact]:
"""Returns the artifacts with the oldest consecutive spans.
The consecutive spans in the range
[first_span, max_span - skip_last_n] inclusive are considered. max_span is
the largest span present in the artifacts. If there is an artifact with a
span corresponding to each integer in that range, then the artifacts with
spans in that range are returned.
For example, if the artifacts have spans [1, 2, 3, 4, 5, 6], first_span=2,
and skip_last_n=2, max_span is calculated as 6, and the range to consider is
[2, 6 - 2] = [2, 4].
If instead the artifacts have spans [1, 2, 3, 4, 6] first_span=2,
and skip_last_n=2, the range to consider is still [2, 4]. However, only
artifacts with spans [1, 2, 3, 4] will be returned because there is no
artifact with span 5, so the the consecutive spans stop at 4.
Args:
input_list: The list of Artifacts to filter.
Returns:
Artifacts with spans in the range [first_span, max_span - skip_last_n].
The artifacts are sorted in ascending order, first by span and then by
version (if keep_all_versions = True).
"""
if self.skip_last_n < 0:
raise ValueError(f'skip_last_n must be >= 0, but was set to '
f'{self.skip_last_n}.')
valid_artifacts = ops_utils.get_valid_artifacts(
input_list, ops_utils.SPAN_AND_VERSION_PROPERTIES)
if not valid_artifacts:
return []
# Override first_span to the smallest span in artifacts if it is < 0.
if self.first_span < 0:
self.first_span = min(a.span for a in valid_artifacts)
# Increment first_span so that it is not in the denylist.
while self.first_span in set(self.denylist):
self.first_span += 1
# Perform initial filtering to get artifacts with the relevant spans.
artifacts = ops_utils.filter_artifacts_by_span(
artifacts=valid_artifacts,
span_descending=False,
n=-1,
min_span=self.first_span,
# Pass 0 for skip_last_n, because in filter_artifacts_by_span it removes
# the largest N span values (based on the index not by the value). We
# instead manually filter the spans later.
skip_last_n=0,
keep_all_versions=self.keep_all_versions,
)
# Only consider spans in the range [first_span, max_span - skip_last_n].
max_span = max(a.span for a in valid_artifacts)
last_span = max_span - self.skip_last_n
# Determine the last valid span accounting for the denylist.
for span in sorted(self.denylist):
if span <= self.first_span:
continue
last_span = min(span - 1, last_span)
break
valid_spans = set(range(self.first_span, last_span + 1))
actual_spans = {a.span for a in artifacts}
# Return the artifacts with consecutive spans.
consecutive_spans = set()
for i, span in enumerate(sorted(valid_spans & actual_spans)):
# Break at the first instance the spans are no longer consecutive.
if span != self.first_span + i:
break
consecutive_spans.add(span)
return [a for a in artifacts if a.span in consecutive_spans]