function _mt:insert()

in kong/kong/db/strategies/cassandra/init.lua [835:987]


function _mt:insert(entity, options)
  local schema = self.schema
  local args = new_tab(#schema.fields, 0)
  local ttl = schema.ttl and options and options.ttl
  local has_composite_cache_key = schema.cache_key and #schema.cache_key > 1

  local has_ws_id, ws_id, err = check_workspace(self, options, false)
  if err then
    return nil, err
  end

  local cql_batch
  local batch_mode

  local mode = 'insert'

  local primary_key
  if schema.fields.tags then
    primary_key = schema:extract_pk_values(entity)
    local err_t
    cql_batch, err_t = build_tags_cql(primary_key, schema, entity["tags"], ttl)
    if err_t then
      return nil, err_t
    end

    if cql_batch then
      
      
      mode = 'insert_no_transaction'
      batch_mode = true
    end
  end

  local cql, err
  if ttl then
    cql, err = get_query(self, mode .. "_ttl")
    if err then
      return nil, err
    end

    cql = fmt(cql, ttl)

  else
    cql, err = get_query(self, mode)
    if err then
      return nil, err
    end
  end

  

  for field_name, field in schema:each_field() do
    if field.type == "foreign" then
      local foreign_pk = entity[field_name]

      if foreign_pk ~= null then
        
        local exists, err_t = foreign_pk_exists(self, field_name, field, foreign_pk, ws_id)
        if not exists then
          return nil, err_t
        end
      end

      local db_columns = self.foreign_keys_db_columns[field_name]
      serialize_foreign_pk(db_columns, args, nil, foreign_pk, ws_id)

    else
      if field.unique
        and entity[field_name] ~= null
        and entity[field_name] ~= nil
      then
        
        
        
        
        primary_key = primary_key or schema:extract_pk_values(entity)
        local _, err_t = check_unique(self, primary_key, entity, field_name, ws_id)
        if err_t then
          return nil, err_t
        end
      end

      insert(args, serialize_arg(field, entity[field_name], ws_id))
    end
  end

  if has_composite_cache_key then
    primary_key = primary_key or schema:extract_pk_values(entity)
    local _, err_t = check_unique(self, primary_key, entity, "cache_key", ws_id)
    if err_t then
      return nil, err_t
    end

    insert(args, serialize_arg(cache_key_field, entity["cache_key"], ws_id))
  end

  if has_ws_id then
    insert(args, serialize_arg(ws_id_field, ws_id, ws_id))
  end

  

  local res, err
  if batch_mode then
    
    insert(cql_batch, 1, {cql, args})
    res, err = self.connector:batch(cql_batch, nil, "write", true)
  else
    res, err = self.connector:query(cql, args, nil, "write")
  end

  if not res then
    return nil, self.errors:database_error("could not execute insertion query: "
                                           .. err)
  end

  
  
  if not batch_mode and res[1][APPLIED_COLUMN] == false then
    
    
    primary_key = primary_key or schema:extract_pk_values(entity)

    return nil, self.errors:primary_key_violation(primary_key)
  end

  
  
  

  clear_tab(res)

  for field_name, field in schema:each_field() do
    local value = entity[field_name]

    if field.type == "foreign" then
      if value ~= null and value ~= nil then
        value = field.schema:extract_pk_values(value)

      else
        value = null
      end
    end

    res[field_name] = value
  end

  if has_ws_id then
    res.ws_id = ws_id
  end

  return res
end