in luigi/contrib/redshift.py [0:0]
def create_table(self, connection):
    """
    Create the target table from the column definitions in ``self.columns``.

    Override to provide custom code for creating the target table.
    If overridden, use the provided connection object for setting up
    the table, so that the table is created and the data inserted
    within the same transaction.

    The shape of the first entry of ``self.columns`` selects the format
    used for every column:

    * ``(name,)`` -- names only; the table cannot be created because
      no column types are known.
    * ``(name, type)`` -- plain column definitions.
    * ``(name, type, encoding)`` -- column definitions with an
      ``ENCODE`` clause. Possible column encodings:
      https://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html

    ``self.table_type``, ``self.table_constraints`` and
    ``self.table_attributes`` are interpolated into the statement as-is.

    :param connection: connection object; the statement is run through
                       ``connection.cursor().execute(query)``.
    :raises NotImplementedError: if only column names (no types) are given.
    :raises ValueError: if ``self.columns`` is empty or its first entry
                        does not have 1, 2 or 3 elements.
    """
    # Guard first: indexing an empty ``columns`` would otherwise fail with
    # an IndexError instead of the descriptive error below.
    if not self.columns:
        raise ValueError("create_table() found no columns for %r"
                         % self.table)

    width = len(self.columns[0])
    if width == 1:
        # only names of columns specified, no types
        raise NotImplementedError("create_table() not implemented "
                                  "for %r and columns types not "
                                  "specified" % self.table)
    elif width == 2:
        # columns specified as (name, type) tuples
        coldefs = ','.join(
            '{name} {type}'.format(name=name, type=col_type)
            for name, col_type in self.columns
        )
        constraint_separator = ', '
    elif width == 3:
        # columns specified as (name, type, encoding) tuples
        coldefs = ','.join(
            '{name} {type} ENCODE {encoding}'.format(
                name=name, type=col_type, encoding=encoding)
            for name, col_type, encoding in self.columns
        )
        # Kept without a trailing space so the emitted SQL text stays
        # identical to what this branch has always produced.
        constraint_separator = ','
    else:
        raise ValueError("create_table() found no columns for %r"
                         % self.table)

    table_constraints = ''
    if self.table_constraints != '':
        table_constraints = constraint_separator + self.table_constraints

    query = ("CREATE {type} TABLE "
             "{table} ({coldefs} {table_constraints}) "
             "{table_attributes}").format(
        type=self.table_type,
        table=self.table,
        coldefs=coldefs,
        table_constraints=table_constraints,
        table_attributes=self.table_attributes)
    connection.cursor().execute(query)