in luigi/contrib/hdfs/target.py [0:0]
def __init__(self, path=None, format=None, is_tmp=False, fs=None):
    """
    Set up the target's path, its HDFS-compatible format and its filesystem client.

    :param path: location of the target. When ``None``, a path is obtained from
                 ``tmppath()``; this is only accepted together with ``is_tmp``.
    :param format: format object used to build the reader/writer pair. When ``None``,
                   the default luigi format chained with ``hdfs_format.Plain`` is used.
    :param is_tmp: stored unchanged on the instance as ``self.is_tmp``.
    :param fs: filesystem client. When falsy, the client returned by
               ``hdfs_clients.get_autoconfig_client()`` is used instead.
    :raises AssertionError: if ``path`` is ``None`` while ``is_tmp`` is falsy
                            (only when assertions are enabled).
    :raises ValueError: if the path component of ``path`` contains a colon.
    """
    if path is None:
        assert is_tmp, 'path may only be omitted when is_tmp is set'
        path = tmppath()
    super(HdfsTarget, self).__init__(path)

    if format is None:
        format = luigi.format.get_default_format() >> hdfs_format.Plain

    # A format is treated as "old style" when it exposes hdfs_writer and/or
    # hdfs_reader but does not declare an ``output`` attribute.
    old_format = (
        (
            hasattr(format, 'hdfs_writer') or
            hasattr(format, 'hdfs_reader')
        ) and
        not hasattr(format, 'output')
    )

    # New-style formats that do not declare 'hdfs' as their output are chained
    # with the plain HDFS format.
    if not old_format and getattr(format, 'output', '') != 'hdfs':
        format = format >> hdfs_format.Plain

    if old_format:
        # The two literals are concatenated; the trailing space keeps the
        # sentences apart in the emitted message.
        warnings.warn(
            'hdfs_writer and hdfs_reader method for format is deprecated, '
            'specify the property output of your format as \'hdfs\' instead',
            DeprecationWarning,
            stacklevel=2
        )

        # Use the legacy hook when present; otherwise fall back to the pipe
        # writer/reader of the format chained with the plain HDFS format.
        if hasattr(format, 'hdfs_writer'):
            format_writer = format.hdfs_writer
        else:
            w_format = format >> hdfs_format.Plain
            format_writer = w_format.pipe_writer

        if hasattr(format, 'hdfs_reader'):
            format_reader = format.hdfs_reader
        else:
            r_format = format >> hdfs_format.Plain
            format_reader = r_format.pipe_reader

        format = hdfs_format.CompatibleHdfsFormat(
            format_writer,
            format_reader,
        )
    else:
        format = hdfs_format.CompatibleHdfsFormat(
            format.pipe_writer,
            format.pipe_reader,
            getattr(format, 'input', None),
        )

    self.format = format
    self.is_tmp = is_tmp

    # Only the path component is checked, so a colon in the scheme or in the
    # network location (e.g. a port number) is not rejected here.
    path_component = urlparse.urlsplit(path).path
    if ":" in path_component:
        raise ValueError('colon is not allowed in hdfs filenames')

    self._fs = fs or hdfs_clients.get_autoconfig_client()