def create_table()

in luigi/contrib/sqla.py [0:0]


    def create_table(self, engine):
        """
        Bind ``self.table_bound`` to the target table, creating the table if needed.

        Override this method to provide custom table-creation code.

        Unless ``self.reflect`` is set, ``self.columns`` must be non-empty and every
        entry must hold exactly two items: the positional arguments and the keyword
        arguments that are passed on to ``sqlalchemy.Column``. If the table does not
        exist yet it is created from those columns; if it already exists, its
        definition is reflected and bound instead.

        Any exception raised while checking for, creating or reflecting the table is
        logged through ``self._logger`` and is not re-raised.

        An overriding implementation should use the provided ``engine`` to set up the
        table; the intent is for table creation and data insertion to share a
        transaction.

        :param engine: The sqlalchemy engine instance
        :type engine: object
        :raises NotImplementedError: if ``self.reflect`` is false and ``self.columns``
            is empty or contains an entry that does not have exactly two items
        """
        if not self.reflect:
            # Without reflection the table can only be built from complete
            # (args, kwargs) column definitions; bare column names are not enough.
            if len(self.columns) == 0 or not all([len(spec) == 2 for spec in self.columns]):
                raise NotImplementedError("create_table() not implemented for %r and columns types not specified" % self.table)

        with engine.begin() as con:
            # Only attach a schema to the metadata when one is actually configured.
            metadata = sqlalchemy.MetaData(schema=self.schema) if self.schema else sqlalchemy.MetaData()

            try:
                if con.dialect.has_table(con, self.table, self.schema or None):
                    # Existing table: reflected tables are keyed by their
                    # schema-qualified name when a schema is in use.
                    qualified_name = '.'.join([self.schema, self.table]) if self.schema else self.table
                    metadata.reflect(only=[self.table], bind=engine)
                    self.table_bound = metadata.tables[qualified_name]
                else:
                    # New table: each column spec is (positional args, keyword args).
                    column_objects = [sqlalchemy.Column(*spec[0], **spec[1]) for spec in self.columns]
                    self.table_bound = sqlalchemy.Table(self.table, metadata, *column_objects)
                    metadata.create_all(engine)
            except Exception as e:
                # Failures are logged (with traceback) and deliberately not propagated.
                self._logger.exception(self.table + str(e))