in kong/kong/db/strategies/cassandra/init.lua [835:987]
--- Insert a new entity through the Cassandra connector.
--
-- In order, this method: resolves the workspace, prepares the optional tags
-- batch, selects the CQL statement (TTL variant when a TTL applies), checks
-- foreign keys and unique fields while serializing the query arguments,
-- executes the statement, and finally rebuilds the returned row from `entity`.
--
-- @param entity  table of field values keyed by field name
-- @param options optional table; `options.ttl` is read when `schema.ttl` is
--                truthy, and the whole table is handed to `check_workspace`
-- @return the inserted row (table) on success
-- @return nil followed by an error value when the workspace check, query
--         lookup, foreign-key / uniqueness checks, or the query itself fail,
--         or when the non-batch insert reports it was not applied
function _mt:insert(entity, options)
  local schema = self.schema
  local args = new_tab(#schema.fields, 0)
  local ttl = schema.ttl and options and options.ttl
  local has_composite_cache_key = schema.cache_key and #schema.cache_key > 1

  -- `err` is declared once here and reused below instead of being
  -- re-declared (and shadowed) for every later call.
  local has_ws_id, ws_id, err = check_workspace(self, options, false)
  if err then
    return nil, err
  end

  local cql_batch
  local batch_mode
  local mode = 'insert'
  local primary_key

  if schema.fields.tags then
    primary_key = schema:extract_pk_values(entity)

    local err_t
    cql_batch, err_t = build_tags_cql(primary_key, schema, entity["tags"], ttl)
    if err_t then
      return nil, err_t
    end

    if cql_batch then
      -- tag statements were produced: the main insert joins them in a batch
      -- and a different query name is used
      mode = 'insert_no_transaction'
      batch_mode = true
    end
  end

  local cql
  if ttl then
    cql, err = get_query(self, mode .. "_ttl")
    if err then
      return nil, err
    end

    -- the TTL query is a format string taking the TTL value
    cql = fmt(cql, ttl)

  else
    cql, err = get_query(self, mode)
    if err then
      return nil, err
    end
  end

  for field_name, field in schema:each_field() do
    if field.type == "foreign" then
      local foreign_pk = entity[field_name]

      -- Treat a missing (nil) foreign key exactly like an explicit null,
      -- matching how the returned row is built at the end of this method.
      if foreign_pk == nil then
        foreign_pk = null
      end

      if foreign_pk ~= null then
        local exists, err_t = foreign_pk_exists(self, field_name, field,
                                                foreign_pk, ws_id)
        if not exists then
          return nil, err_t
        end
      end

      local db_columns = self.foreign_keys_db_columns[field_name]
      serialize_foreign_pk(db_columns, args, nil, foreign_pk, ws_id)

    else
      if field.unique
        and entity[field_name] ~= null
        and entity[field_name] ~= nil
      then
        primary_key = primary_key or schema:extract_pk_values(entity)

        local _, err_t = check_unique(self, primary_key, entity, field_name,
                                      ws_id)
        if err_t then
          return nil, err_t
        end
      end

      insert(args, serialize_arg(field, entity[field_name], ws_id))
    end
  end

  if has_composite_cache_key then
    primary_key = primary_key or schema:extract_pk_values(entity)

    local _, err_t = check_unique(self, primary_key, entity, "cache_key", ws_id)
    if err_t then
      return nil, err_t
    end

    insert(args, serialize_arg(cache_key_field, entity["cache_key"], ws_id))
  end

  if has_ws_id then
    insert(args, serialize_arg(ws_id_field, ws_id, ws_id))
  end

  local res
  if batch_mode then
    -- the main insert goes first, ahead of the tag statements
    insert(cql_batch, 1, {cql, args})
    res, err = self.connector:batch(cql_batch, nil, "write", true)

  else
    res, err = self.connector:query(cql, args, nil, "write")
  end

  if not res then
    -- tostring() keeps the concatenation from raising when the connector
    -- reports a nil or non-string error value
    return nil, self.errors:database_error("could not execute insertion query: "
                                           .. tostring(err))
  end

  -- the applied column is only inspected for the non-batch insert
  if not batch_mode and res[1][APPLIED_COLUMN] == false then
    primary_key = primary_key or schema:extract_pk_values(entity)
    return nil, self.errors:primary_key_violation(primary_key)
  end

  -- reuse the result table as the returned row
  clear_tab(res)

  for field_name, field in schema:each_field() do
    local value = entity[field_name]

    if field.type == "foreign" then
      if value ~= null and value ~= nil then
        -- only the primary key values of the foreign entity are returned
        value = field.schema:extract_pk_values(value)

      else
        value = null
      end
    end

    res[field_name] = value
  end

  if has_ws_id then
    res.ws_id = ws_id
  end

  return res
end