in kong/kong/db/init.lua [284:416]
--- Runs `cb` while holding a cluster-wide lock named `key`.
--
-- The work is wrapped in `concurrency.with_worker_mutex` (named after `key`,
-- with `ttl` as its timeout). Inside it, a cluster lock is inserted through
-- `self.connector:insert_lock`. If the insert succeeds, `cb` is invoked under
-- `xpcall`; otherwise this call either gives up immediately (`no_wait`) or
-- polls `self.connector:read_lock` with a growing sleep interval until the
-- lock disappears or `ttl` seconds of sleeping have accumulated.
--
-- Argument type violations raise an error attributed to the caller (level 2).
--
-- @tparam string key name of the lock
-- @tparam[opt] table opts optional settings:
--   * `ttl` (number): lock TTL in the connector, worker mutex timeout and
--     maximum accumulated wait; defaults to `DEFAULT_LOCKS_TTL`
--   * `owner` (string): lock owner; defaults to `knode.get_id()`
--   * `no_wait` (boolean): do not wait when the cluster lock is already taken
--   * `no_cleanup` (boolean): keep the cluster lock after `cb` succeeds
--     (it is still removed when `cb` throws)
-- @tparam function cb function (or table with a `__call` metamethod) invoked
--   without arguments once the cluster lock is held
-- @return whatever `concurrency.with_worker_mutex` returns. The function
--   handed to it returns `true` when `cb` ran successfully, `false` when `cb`
--   was not run (worker mutex was waited on, lock held elsewhere), or
--   `nil, err` on failure/timeout.
--   NOTE(review): presumably `with_worker_mutex` forwards these values
--   unchanged -- confirm against its implementation.
function DB:cluster_mutex(key, opts, cb)
  if type(key) ~= "string" then
    error("key must be a string", 2)
  end

  local owner
  local ttl
  local no_wait
  local no_cleanup

  if opts ~= nil then
    if type(opts) ~= "table" then
      error("opts must be a table", 2)
    end

    if opts.ttl and type(opts.ttl) ~= "number" then
      error("opts.ttl must be a number", 2)
    end

    if opts.owner and type(opts.owner) ~= "string" then
      error("opts.owner must be a string", 2)
    end

    if opts.no_wait and type(opts.no_wait) ~= "boolean" then
      error("opts.no_wait must be a boolean", 2)
    end

    if opts.no_cleanup and type(opts.no_cleanup) ~= "boolean" then
      error("opts.no_cleanup must be a boolean", 2)
    end

    owner = opts.owner
    ttl = opts.ttl
    no_wait = opts.no_wait
    no_cleanup = opts.no_cleanup
  end

  -- accept plain functions as well as callable tables/userdata
  if type(cb) ~= "function" then
    local mt = getmetatable(cb)

    if not mt or type(mt.__call) ~= "function" then
      error("cb must be a function", 2)
    end
  end

  if not owner then
    local id, err = knode.get_id()
    if not id then
      -- tostring(): `err` may be nil here, and a raw concatenation would
      -- throw instead of returning the error to the caller
      return nil, prefix_err(self, "failed to generate lock owner: "
                                   .. tostring(err))
    end

    owner = id
  end

  if not ttl then
    ttl = DEFAULT_LOCKS_TTL
  end

  -- Best-effort release of the cluster lock. A failure does not change what
  -- is returned to the caller, but it is logged because the lock would
  -- otherwise linger without any trace.
  -- NOTE(review): assumes `remove_lock` reports failures through a second
  -- return value, like `insert_lock`/`read_lock` are treated below -- confirm.
  local function release_cluster_lock()
    local _, remove_err = self.connector:remove_lock(key, owner)
    if remove_err then
      ngx.log(ngx.WARN, "failed to remove cluster lock '", key, "': ",
                        tostring(remove_err))
    end
  end

  local mutex_opts = {
    name = key,
    timeout = ttl,
  }

  return concurrency.with_worker_mutex(mutex_opts, function(elapsed)
    if elapsed ~= 0 then
      -- NOTE(review): presumably a non-zero `elapsed` means this worker had
      -- to wait for another worker holding the mutex; `cb` is not run then
      return false
    end

    local ok, err = self.connector:insert_lock(key, ttl, owner)
    if err then
      return nil, prefix_err(self, "failed to insert cluster lock: "
                                   .. tostring(err))
    end

    if not ok then
      -- the cluster lock could not be inserted (no error reported)
      if no_wait then
        return false
      end

      -- poll until the lock is gone, tripling the sleep interval each round
      -- up to MAX_LOCK_WAIT_STEP
      local step = 0.1
      local cluster_elapsed = 0

      while cluster_elapsed < ttl do
        ngx.sleep(step)
        cluster_elapsed = cluster_elapsed + step

        if cluster_elapsed >= ttl then
          break
        end

        local locked, read_err = self.connector:read_lock(key)
        if read_err then
          return nil, prefix_err(self, "failed to read cluster lock: "
                                       .. tostring(read_err))
        end

        if not locked then
          -- lock no longer present: stop waiting, `cb` is not run
          return false
        end

        step = math.min(step * 3, MAX_LOCK_WAIT_STEP)
      end

      return nil, prefix_err(self, "timeout")
    end

    -- cluster lock inserted: run the callback, capturing a traceback on error
    local pok, perr = xpcall(cb, debug.traceback)
    if not pok then
      -- always release after a failed callback, regardless of `no_cleanup`
      release_cluster_lock()

      -- tostring(): debug.traceback returns non-string error values
      -- unchanged, which cannot be concatenated directly
      return nil, prefix_err(self, "cluster_mutex callback threw an error: "
                                   .. tostring(perr))
    end

    if not no_cleanup then
      release_cluster_lock()
    end

    return true
  end)
end