in sdks/python/apache_beam/io/gcp/bigquery_tools.py [0:0]
def get_or_create_table(
    self,
    project_id,
    dataset_id,
    table_id,
    schema,
    create_disposition,
    write_disposition,
    additional_create_parameters=None):
  """Gets or creates a table based on create and write dispositions.

  The function mimics the behavior of BigQuery import jobs when using the
  same create and write dispositions.

  Args:
    project_id: The project id owning the table.
    dataset_id: The dataset id owning the table.
    table_id: The table id.
    schema: A bigquery.TableSchema instance or None. If None, the schema is
      reused from an already-existing table; it is an error for both the
      schema and the table to be absent.
    create_disposition: CREATE_NEVER or CREATE_IF_NEEDED.
    write_disposition: WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.
    additional_create_parameters: Optional extra parameters forwarded
      unchanged to ``_create_table`` when the table has to be (re)created.

  Returns:
    A bigquery.Table instance if table was found or created.

  Raises:
    `RuntimeError`: For various mismatches between the state of the table and
    the create/write dispositions passed in. For example if the table is not
    empty and WRITE_EMPTY was specified then an error will be raised since
    the table was expected to be empty.
  """
  # Imported locally to avoid a circular import with apache_beam.io.gcp.bigquery.
  from apache_beam.io.gcp.bigquery import BigQueryDisposition
  found_table = None
  try:
    found_table = self.get_table(project_id, dataset_id, table_id)
  except HttpError as exn:
    if exn.status_code == 404:
      if create_disposition == BigQueryDisposition.CREATE_NEVER:
        raise RuntimeError(
            'Table %s:%s.%s not found but create disposition is CREATE_NEVER.'
            % (project_id, dataset_id, table_id))
      # 404 with CREATE_IF_NEEDED: deliberately swallow the error and fall
      # through with found_table = None so the table is created below.
    else:
      raise
  # If table exists already then handle the semantics for WRITE_EMPTY and
  # WRITE_TRUNCATE write dispositions.
  if found_table and write_disposition in (
      BigQueryDisposition.WRITE_EMPTY, BigQueryDisposition.WRITE_TRUNCATE):
    # Delete the table and recreate it (later) if WRITE_TRUNCATE was
    # specified.
    if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
      self._delete_table(project_id, dataset_id, table_id)
    elif (write_disposition == BigQueryDisposition.WRITE_EMPTY and
          not self._is_table_empty(project_id, dataset_id, table_id)):
      raise RuntimeError(
          'Table %s:%s.%s is not empty but write disposition is WRITE_EMPTY.'
          % (project_id, dataset_id, table_id))
  # Create a new table potentially reusing the schema from a previously
  # found table in case the schema was not specified.
  if schema is None and found_table is None:
    raise RuntimeError(
        'Table %s:%s.%s requires a schema. None can be inferred because the '
        'table does not exist.' % (project_id, dataset_id, table_id))
  if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
    # Existing table can be reused as-is (WRITE_APPEND, or WRITE_EMPTY on a
    # verified-empty table).
    return found_table
  else:
    created_table = None
    try:
      created_table = self._create_table(
          project_id=project_id,
          dataset_id=dataset_id,
          table_id=table_id,
          # found_table.schema is safe here: the check above guarantees
          # found_table is not None whenever schema is None.
          schema=schema or found_table.schema,
          additional_parameters=additional_create_parameters)
    except HttpError as exn:
      if exn.status_code == 409:
        # 409 Conflict: another worker created the table concurrently.
        # Treat the existing table as the result instead of failing.
        _LOGGER.debug(
            'Skipping Creation. Table %s:%s.%s already exists.' %
            (project_id, dataset_id, table_id))
        created_table = self.get_table(project_id, dataset_id, table_id)
      else:
        raise
    _LOGGER.info(
        'Created table %s.%s.%s with schema %s. '
        'Result: %s.',
        project_id,
        dataset_id,
        table_id,
        schema or found_table.schema,
        created_table)
    # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
    # the table before this point.
    if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
      # BigQuery can route data to the old table for 2 mins max so wait
      # that much time before creating the table and writing it
      _LOGGER.warning(
          'Sleeping for 150 seconds before the write as ' +
          'BigQuery inserts can be routed to deleted table ' +
          'for 2 mins after the delete and create.')
      # TODO(BEAM-2673): Remove this sleep by migrating to load api
      time.sleep(150)
      return created_table
    else:
      return created_table