in luigi/tools/range.py [0:0]
def requires(self):
    """
    Return the tasks to require for the datetimes that are missing in the range.

    The effective range is bounded below by the later of ``self.start`` and
    ``self.moving_start(now)`` and above by the earlier of ``self.stop`` and
    ``self.moving_stop(now)``; an unset (``None``) ``start``/``stop`` leaves
    only the moving bound. ``now`` is derived from ``self.now`` (a POSIX
    timestamp) or, when that is ``None``, from the current time.

    At most ``self.task_limit`` missing datetimes are turned into tasks: the
    earliest ones normally, or the latest ones (latest first) when
    ``self.reverse`` is set. A ``task_limit`` of 0 yields no tasks in either
    direction.

    The result is stored in ``self._cached_requires`` and returned as-is on
    subsequent calls.

    :return: list of objects produced by ``self._instantiate_task_cls``.
    :raises ParameterException: if neither ``start`` nor ``stop`` is given,
        if ``start`` is missing while ``reverse`` is false, or if
        ``start > stop``.
    """
    # cache because we anticipate a fair amount of computation
    if hasattr(self, '_cached_requires'):
        return self._cached_requires

    if not self.start and not self.stop:
        raise ParameterException("At least one of start and stop needs to be specified")
    if not self.start and not self.reverse:
        raise ParameterException("Either start needs to be specified or reverse needs to be True")
    if self.start and self.stop and self.start > self.stop:
        raise ParameterException("Can't have start > stop")
    # TODO check overridden complete() and exists()

    now = datetime.utcfromtimestamp(time.time() if self.now is None else self.now)

    # Clamp the user-supplied bounds to the moving window around `now`.
    moving_start = self.moving_start(now)
    finite_start = moving_start if self.start is None else max(self.parameter_to_datetime(self.start), moving_start)
    moving_stop = self.moving_stop(now)
    finite_stop = moving_stop if self.stop is None else min(self.parameter_to_datetime(self.stop), moving_stop)

    # An inverted window means there is nothing to check at all.
    datetimes = self.finite_datetimes(finite_start, finite_stop) if finite_start <= finite_stop else []

    if datetimes:
        logger.debug('Actually checking if range %s of %s is complete',
                     self._format_range(datetimes), self.of_cls.task_family)
        missing_datetimes = sorted(self._missing_datetimes(datetimes))
        logger.debug('Range %s lacked %d of expected %d %s instances',
                     self._format_range(datetimes), len(missing_datetimes), len(datetimes), self.of_cls.task_family)
    else:
        missing_datetimes = []
        logger.debug('Empty range. No %s instances expected', self.of_cls.task_family)

    # Metrics are emitted for the full set of missing datetimes, before the limit is applied.
    self._emit_metrics(missing_datetimes, finite_start, finite_stop)

    if self.reverse:
        # -0 == 0, so a plain [-task_limit:] slice would select *every* missing
        # datetime when task_limit is 0; return an empty batch instead, which
        # matches what the forward slice [:0] gives.
        required_datetimes = missing_datetimes[-self.task_limit:] if self.task_limit != 0 else []
    else:
        required_datetimes = missing_datetimes[:self.task_limit]
    if required_datetimes:
        logger.debug('Requiring %d missing %s instances in range %s',
                     len(required_datetimes), self.of_cls.task_family, self._format_range(required_datetimes))
    if self.reverse:
        required_datetimes.reverse()  # TODO priorities, so that within the batch tasks are ordered too

    self._cached_requires = [self._instantiate_task_cls(self.datetime_to_parameter(d)) for d in required_datetimes]
    return self._cached_requires