in luigi/scheduler.py [0:0]
def _upstream_status(self, task_id, upstream_status_table):
    """Return the upstream status of ``task_id``, memoised in the table.

    The dependency graph is walked iteratively with an explicit stack. A
    PENDING task that has dependencies is pushed back underneath its
    dependencies, so it is popped a second time after they have been
    processed; on that second visit its entry is set to whichever of its
    dependencies' entries ranks highest under ``UPSTREAM_SEVERITY_KEY``.

    :param task_id: id of the task whose upstream status is requested.
    :param upstream_status_table: dict mapping task id to upstream status.
        It is both read and updated in place, so passing the same dict to
        successive calls reuses earlier results.
    :return: the status stored for ``task_id``; ``''`` when the walk stored
        nothing for it (the task is DONE, or ``get_task`` returned a falsy
        value); ``None`` when the state does not know the task at all.
    """
    if task_id in upstream_status_table:
        return upstream_status_table[task_id]
    if not self._state.has_task(task_id):
        # Unknown task: keep the historical "no result" return value.
        return None

    task_stack = [task_id]
    while task_stack:
        dep_id = task_stack.pop()
        dep = self._state.get_task(dep_id)
        if not dep or dep.status == DONE:
            # Nothing is recorded for missing or finished tasks.
            continue

        if dep_id not in upstream_status_table:
            if dep.status == PENDING and dep.deps:
                # Re-queue the task below its dependencies so that it is
                # revisited once all of them have been handled.
                task_stack.append(dep_id)
                task_stack.extend(dep.deps)
                upstream_status_table[dep_id] = ''  # will be updated postorder
            else:
                upstream_status_table[dep_id] = STATUS_TO_UPSTREAM_MAP.get(dep.status, '')
        elif upstream_status_table[dep_id] == '' and dep.deps:
            # This is the postorder update step when we set the
            # status based on the previously calculated child elements
            upstream_status_table[dep_id] = max(
                (upstream_status_table.get(a_task_id, '') for a_task_id in dep.deps),
                key=UPSTREAM_SEVERITY_KEY,
            )

    # task_id sits at the bottom of the stack, so it is always the last id
    # popped. Looking it up with a default avoids the KeyError that indexing
    # raised when the walk skipped it (DONE or missing task).
    return upstream_status_table.get(task_id, '')