in luigi/contrib/postgres.py [0:0]
def run(self):
    """
    Inserts data generated by rows() into target table.

    Every row is passed through map_column, joined with column_separator and
    spooled to a temporary file, which is then handed to self.copy for import.
    If the target table doesn't exist, self.create_table will be called to
    attempt to create the table and the copy is retried once.

    The output target is touched and the transaction committed only after a
    successful copy. The connection and the temporary file are released even
    when a step fails.

    Normally you don't want to override this.

    :raises Exception: if ``table`` or ``columns`` is not specified.
    :raises dbapi.DatabaseError: if the copy fails for any reason other than
        a missing table on the first attempt.
    """
    if not (self.table and self.columns):
        raise Exception("table and columns need to be specified")

    connection = self.output().connect()
    try:
        # transform all data generated by rows() using map_column and write data
        # to a temporary file for import using postgres COPY
        tmp_dir = luigi.configuration.get_config().get('postgres', 'local-tmp-dir', None)
        with tempfile.TemporaryFile(dir=tmp_dir) as tmp_file:
            for n, row in enumerate(self.rows(), 1):
                if n % 100000 == 0:
                    logger.info("Wrote %d lines", n)
                rowstr = self.column_separator.join(self.map_column(val) for val in row) + "\n"
                tmp_file.write(rowstr.encode('utf-8'))

            logger.info("Done writing, importing at %s", datetime.datetime.now())

            # attempt to copy the data into postgres
            # if it fails because the target table doesn't exist
            # try to create it by running self.create_table
            for attempt in range(2):
                # rewind before every attempt so a retry always reads the
                # spooled data from the beginning
                tmp_file.seek(0)
                try:
                    cursor = connection.cursor()
                    self.init_copy(connection)
                    self.copy(cursor, tmp_file)
                    self.post_copy(connection)
                    if self.enable_metadata_columns:
                        self.post_copy_metacolumns(cursor)
                except dbapi.DatabaseError as e:
                    if db_error_code(e) == ERROR_UNDEFINED_TABLE and attempt == 0:
                        # if first attempt fails with "relation not found", try creating table
                        logger.info("Creating table %s", self.table)
                        # reset() is a psycopg2-specific method
                        if hasattr(connection, 'reset'):
                            connection.reset()
                        else:
                            _pg8000_connection_reset(connection)
                        self.create_table(connection)
                    else:
                        raise
                else:
                    break

            # mark as complete in same transaction
            self.output().touch(connection)

            # commit while the connection is still open; cleanup follows
            connection.commit()
    finally:
        # always release the connection, including on failure paths where
        # nothing was committed
        connection.close()