def get_or_create_table()

in sdks/python/apache_beam/io/gcp/bigquery_tools.py [0:0]


  def get_or_create_table(
      self,
      project_id,
      dataset_id,
      table_id,
      schema,
      create_disposition,
      write_disposition,
      additional_create_parameters=None):
    """Gets or creates a table based on create and write dispositions.

    The function mimics the behavior of BigQuery import jobs when using the
    same create and write dispositions.

    Args:
      project_id: The project id owning the table.
      dataset_id: The dataset id owning the table.
      table_id: The table id.
      schema: A bigquery.TableSchema instance or None.
      create_disposition: CREATE_NEVER or CREATE_IF_NEEDED.
      write_disposition: WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.
      additional_create_parameters: Extra parameters forwarded to table
        creation (as ``additional_parameters``), or None.

    Returns:
      A bigquery.Table instance if table was found or created.

    Raises:
      `RuntimeError`: For various mismatches between the state of the table and
        the create/write dispositions passed in. For example if the table is not
        empty and WRITE_EMPTY was specified then an error will be raised since
        the table was expected to be empty.
    """
    # Function-level import (likely avoids a circular import with
    # apache_beam.io.gcp.bigquery — confirm before moving to module level).
    from apache_beam.io.gcp.bigquery import BigQueryDisposition

    found_table = None
    try:
      found_table = self.get_table(project_id, dataset_id, table_id)
    except HttpError as exn:
      if exn.status_code == 404:
        # A missing table is fine for CREATE_IF_NEEDED (we create it below),
        # but fatal for CREATE_NEVER.
        if create_disposition == BigQueryDisposition.CREATE_NEVER:
          raise RuntimeError(
              'Table %s:%s.%s not found but create disposition is CREATE_NEVER.'
              % (project_id, dataset_id, table_id))
      else:
        raise

    # If table exists already then handle the semantics for WRITE_EMPTY and
    # WRITE_TRUNCATE write dispositions.
    if found_table and write_disposition in (
        BigQueryDisposition.WRITE_EMPTY, BigQueryDisposition.WRITE_TRUNCATE):
      # Delete the table and recreate it (later) if WRITE_TRUNCATE was
      # specified.
      if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
        self._delete_table(project_id, dataset_id, table_id)
      elif (write_disposition == BigQueryDisposition.WRITE_EMPTY and
            not self._is_table_empty(project_id, dataset_id, table_id)):
        raise RuntimeError(
            'Table %s:%s.%s is not empty but write disposition is WRITE_EMPTY.'
            % (project_id, dataset_id, table_id))

    # Create a new table potentially reusing the schema from a previously
    # found table in case the schema was not specified.
    if schema is None and found_table is None:
      raise RuntimeError(
          'Table %s:%s.%s requires a schema. None can be inferred because the '
          'table does not exist.' % (project_id, dataset_id, table_id))
    if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
      # Existing table is reusable as-is: WRITE_APPEND always, WRITE_EMPTY
      # once the emptiness check above has passed.
      return found_table

    # Either the table never existed, or it was just deleted (WRITE_TRUNCATE)
    # and must be recreated with the given or previously-found schema.
    try:
      created_table = self._create_table(
          project_id=project_id,
          dataset_id=dataset_id,
          table_id=table_id,
          schema=schema or found_table.schema,
          additional_parameters=additional_create_parameters)
    except HttpError as exn:
      if exn.status_code == 409:
        # Another worker won the creation race; use the table it created.
        _LOGGER.debug(
            'Skipping Creation. Table %s:%s.%s already exists.',
            project_id,
            dataset_id,
            table_id)
        created_table = self.get_table(project_id, dataset_id, table_id)
      else:
        raise
    _LOGGER.info(
        'Created table %s.%s.%s with schema %s. '
        'Result: %s.',
        project_id,
        dataset_id,
        table_id,
        schema or found_table.schema,
        created_table)
    if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
      # The old table was deleted and recreated above; BigQuery can keep
      # routing inserts to the deleted table for up to 2 minutes, so wait
      # before returning to let callers write safely.
      _LOGGER.warning(
          'Sleeping for 150 seconds before the write as '
          'BigQuery inserts can be routed to deleted table '
          'for 2 mins after the delete and create.')
      # TODO(BEAM-2673): Remove this sleep by migrating to load api
      time.sleep(150)
    return created_table