def _upgrade_schema()

in luigi/db_task_history.py [0:0]


def _upgrade_schema(engine):
    """
    Ensure the database schema is up to date with the codebase.

    :param engine: SQLAlchemy engine of the underlying database.
    """
    inspector = reflection.Inspector.from_engine(engine)
    with engine.connect() as conn:

        # Upgrade 1.  Add task_id column and index to tasks
        if 'task_id' not in [x['name'] for x in inspector.get_columns('tasks')]:
            logger.warning('Upgrading DbTaskHistory schema: Adding tasks.task_id')
            conn.execute('ALTER TABLE tasks ADD COLUMN task_id VARCHAR(200)')
            conn.execute('CREATE INDEX ix_task_id ON tasks (task_id)')

        # Upgrade 2. Alter value column to be TEXT, note that this is idempotent so no if-guard
        if 'mysql' in engine.dialect.name:
            conn.execute('ALTER TABLE task_parameters MODIFY COLUMN value TEXT')
        elif 'oracle' in engine.dialect.name:
            conn.execute('ALTER TABLE task_parameters MODIFY value TEXT')
        elif 'mssql' in engine.dialect.name:
            conn.execute('ALTER TABLE task_parameters ALTER COLUMN value TEXT')
        elif 'postgresql' in engine.dialect.name:
            if str([x for x in inspector.get_columns('task_parameters')
                    if x['name'] == 'value'][0]['type']) != 'TEXT':
                conn.execute('ALTER TABLE task_parameters ALTER COLUMN value TYPE TEXT')
        elif 'sqlite' in engine.dialect.name:
            # SQLite does not support changing column types. A database file will need
            # to be used to pickup this migration change.
            for i in conn.execute('PRAGMA table_info(task_parameters);').fetchall():
                if i['name'] == 'value' and i['type'] != 'TEXT':
                    logger.warning(
                        'SQLite can not change column types. Please use a new database '
                        'to pickup column type changes.'
                    )
        else:
            logger.warning(
                'SQLAlcheny dialect {} could not be migrated to the TEXT type'.format(
                    engine.dialect
                )
            )