in luigi/contrib/mysqldb.py [0:0]
def run(self):
"""
Inserts data generated by rows() into target table.
If the target table doesn't exist, self.create_table will be called to attempt to create the table.
Normally you don't want to override this.
"""
if not (self.table and self.columns):
raise Exception("table and columns need to be specified")
connection = self.output().connect()
# attempt to copy the data into mysql
# if it fails because the target table doesn't exist
# try to create it by running self.create_table
for attempt in range(2):
try:
cursor = connection.cursor()
print("caling init copy...")
self.init_copy(connection)
self.copy(cursor)
self.post_copy(connection)
if self.enable_metadata_columns:
self.post_copy_metacolumns(cursor)
except Error as err:
if err.errno == errorcode.ER_NO_SUCH_TABLE and attempt == 0:
# if first attempt fails with "relation not found", try creating table
# logger.info("Creating table %s", self.table)
connection.reconnect()
self.create_table(connection)
else:
raise
else:
break
# mark as complete in same transaction
self.output().touch(connection)
connection.commit()
connection.close()