in luigi/contrib/hdfs/hadoopcli_clients.py [0:0]
def listdir(self, path, ignore_directories=False, ignore_files=False,
            include_size=False, include_type=False, include_time=False, recursive=False):
    """
    List the entries under ``path`` by parsing the output of ``hadoop fs -ls``.

    This is a generator, so the hadoop command is only executed once
    iteration starts.

    :param path: path to list; a falsy value is replaced by ``"."``.
    :param ignore_directories: skip output lines whose first character is ``d``.
    :param ignore_files: skip output lines whose first character is ``-``.
    :param include_size: add the size column (as ``int``) to every result.
    :param include_type: add the first character of the output line
                         (``d`` or ``-`` for the lines handled above).
    :param include_time: add the modification time as a naive
                         ``datetime.datetime`` parsed with ``%Y-%m-%dT%H:%M``.
    :param recursive: use ``self.recursive_listdir_cmd`` instead of ``-ls``.
    :return: yields the bare path string when no ``include_*`` flag is set,
             otherwise a tuple ``(path[, size][, type][, time])``.
    :raises ValueError: if the size or time column of a line cannot be parsed.
    :raises IndexError: if a line has too few columns to be parsed at all.
    """
    # Function-scope import: the module's import block is not visible here.
    import re

    if not path:
        path = "."  # default to the current/home directory
    if recursive:
        cmd = load_hadoop_cmd() + ['fs'] + self.recursive_listdir_cmd + [path]
    else:
        cmd = load_hadoop_cmd() + ['fs', '-ls', path]

    # Anchor on the "<size> <yyyy-mm-dd> <hh:mm> <path>" tail of a listing
    # line.  Everything after the time column is the path, so paths that
    # contain spaces are kept intact, and owner/group names containing
    # spaces (which precede the size column) do not shift the columns.
    entry_tail = re.compile(r'\s(\d+)\s+(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2})\s(.+)$')

    # JVM warnings and the "Found %d items" header that "hadoop fs -ls"
    # prints as its first line are not directory entries.
    skipped_prefixes = (
        'OpenJDK 64-Bit Server VM warning',
        'It\'s highly recommended',
        'Found',
    )

    for line in self.call_check(cmd).split('\n'):
        if not line or line.startswith(skipped_prefixes):
            continue

        line_type = line[0]
        if ignore_directories and line_type == 'd':
            continue
        if ignore_files and line_type == '-':
            continue

        match = entry_tail.search(line)
        if match:
            size_field, date_field, time_field, name = match.groups()
        else:
            # Unrecognised layout: fall back to the historical positional
            # parsing so that previously accepted lines keep working.
            fields = line.split(' ')
            name = fields[-1]
            size_field, date_field, time_field = fields[-4], fields[-3], fields[-2]

        # Parsed unconditionally (as before) so malformed lines fail loudly.
        size = int(size_field)

        extra_data = ()
        if include_size:
            extra_data += (size,)
        if include_type:
            extra_data += (line_type,)
        if include_time:
            time_str = '%sT%s' % (date_field, time_field)
            modification_time = datetime.datetime.strptime(time_str,
                                                           '%Y-%m-%dT%H:%M')
            extra_data += (modification_time,)

        if extra_data:
            yield (name,) + extra_data
        else:
            yield name