in luigi/contrib/sqla.py [0:0]
def create_table(self, engine):
    """
    Bind ``self.table_bound`` to the target table, creating the table first if needed.

    Override to provide custom code for creating the target table.
    When the table is missing it is built from ``self.columns``, where every entry
    is an ``(args, kwargs)`` pair forwarded to ``sqlalchemy.Column``. When the table
    already exists, its definition is reflected from the database and bound instead.

    Errors raised while checking for, creating or reflecting the table are logged
    through ``self._logger`` and are not re-raised.

    :param engine: The sqlalchemy engine instance
    :type engine: object
    :raises NotImplementedError: if ``self.reflect`` is false and ``self.columns`` is
        empty or holds an entry that does not have exactly two elements.
    """
    if self.reflect:
        # With reflection enabled the column specs are not validated at all.
        columns_unusable = False
    else:
        no_columns = len(self.columns) == 0
        # Every entry is measured (list, not generator) before the verdict is taken.
        columns_unusable = no_columns or not all([len(spec) == 2 for spec in self.columns])

    if columns_unusable:
        # Column specs are missing or malformed, so no table can be derived from them.
        raise NotImplementedError("create_table() not implemented for %r and columns types not specified" % self.table)

    with engine.begin() as con:
        metadata = sqlalchemy.MetaData(schema=self.schema) if self.schema else sqlalchemy.MetaData()

        try:
            if con.dialect.has_table(con, self.table, self.schema or None):
                # Existing table: metadata keys it as "schema.table" when a schema is set.
                if self.schema:
                    table_key = '.'.join([self.schema, self.table])
                else:
                    table_key = self.table
                metadata.reflect(only=[self.table], bind=engine)
                self.table_bound = metadata.tables[table_key]
            else:
                # Missing table: build it from the (args, kwargs) column specs.
                column_objects = [sqlalchemy.Column(*spec[0], **spec[1]) for spec in self.columns]
                self.table_bound = sqlalchemy.Table(self.table, metadata, *column_objects)
                metadata.create_all(engine)
        except Exception as err:
            # NOTE(review): the failure is only logged; table_bound is not assigned
            # on this path, so callers relying on it should be checked.
            self._logger.exception(self.table + str(err))