function DB:cluster_mutex()

in kong/kong/db/init.lua [284:416]


  --- Run `cb` while holding a worker-level mutex and a cluster-level lock.
  --
  -- The worker-level mutex is taken through `concurrency.with_worker_mutex`
  -- (named after `key`, with `ttl` as its timeout); inside it, a cluster lock
  -- is inserted through `self.connector:insert_lock(key, ttl, owner)`.
  --
  -- @tparam string key name of the lock (used for both the worker mutex and
  -- the cluster lock)
  -- @tparam[opt] table opts optional settings:
  --
  --   * `ttl` (number): lock TTL / maximum wait; defaults to
  --     `DEFAULT_LOCKS_TTL`. The wait loop compares it against accumulated
  --     `ngx.sleep` durations, so it is effectively treated as seconds.
  --   * `owner` (string): lock owner identifier; defaults to the value
  --     returned by `knode.get_id()`.
  --   * `no_wait` (boolean): when truthy, return `false` immediately if the
  --     cluster lock could not be inserted instead of polling for its release.
  --   * `no_cleanup` (boolean): when truthy, do not remove the cluster lock
  --     after `cb` completes successfully (it is still removed if `cb` throws).
  -- @tparam function cb callback to run (a function, or a value whose
  -- metatable has a function `__call`); it is invoked without arguments
  -- @return the result of `concurrency.with_worker_mutex`. The inner callback
  -- yields `true` after `cb` ran successfully, `false` when `cb` was not run
  -- because the lock was held elsewhere, or `nil, err` on failure/timeout.
  -- NOTE(review): presumably `with_worker_mutex` forwards these values as-is
  -- -- confirm against its implementation.
  -- @raise an error (reported at the caller's level) when `key`, `opts`, any
  -- of the `opts` fields, or `cb` has an invalid type
  function DB:cluster_mutex(key, opts, cb)
    if type(key) ~= "string" then
      error("key must be a string", 2)
    end

    local owner
    local ttl
    local no_wait
    local no_cleanup

    if opts ~= nil then
      if type(opts) ~= "table" then
        error("opts must be a table", 2)
      end

      -- Each option is only type-checked when it is truthy, so `nil` and
      -- `false` are both accepted as "not set".
      if opts.ttl and type(opts.ttl) ~= "number" then
        error("opts.ttl must be a number", 2)
      end

      if opts.owner and type(opts.owner) ~= "string" then
        error("opts.owner must be a string", 2)
      end

      if opts.no_wait and type(opts.no_wait) ~= "boolean" then
        error("opts.no_wait must be a boolean", 2)
      end

      if opts.no_cleanup and type(opts.no_cleanup) ~= "boolean" then
        error("opts.no_cleanup must be a boolean", 2)
      end

      owner = opts.owner
      ttl = opts.ttl
      no_wait = opts.no_wait
      no_cleanup = opts.no_cleanup
    end

    -- Accept plain functions as well as callable objects (metatable with a
    -- function `__call`).
    if type(cb) ~= "function" then
      local mt = getmetatable(cb)

      if not mt or type(mt.__call) ~= "function" then
        error("cb must be a function", 2)
      end
    end

    if not owner then
      -- No explicit owner given: fall back to the identifier provided by
      -- `knode.get_id()`.
      local id, err = knode.get_id()
      if not id then
        -- tostring() guards against a missing/non-string error value, which
        -- would otherwise make the concatenation itself raise.
        return nil, prefix_err(self, "failed to generate lock owner: "
                                     .. tostring(err))
      end

      owner = id
    end

    if not ttl then
      ttl = DEFAULT_LOCKS_TTL
    end

    local mutex_opts = {
      name = key,
      timeout = ttl,
    }
    return concurrency.with_worker_mutex(mutex_opts, function(elapsed)
      if elapsed ~= 0 then
        -- NOTE(review): a non-zero `elapsed` presumably means this worker had
        -- to wait for the worker-level mutex (i.e. another worker already did
        -- the work); `cb` is skipped in that case -- confirm against
        -- `concurrency.with_worker_mutex`.
        return false
      end

      -- Try to take the cluster-level lock through the connector.
      local ok, err = self.connector:insert_lock(key, ttl, owner)
      if err then
        return nil, prefix_err(self, "failed to insert cluster lock: " .. err)
      end

      if not ok then
        -- The cluster lock could not be inserted (presumably held elsewhere).
        if no_wait then
          return false
        end

        -- Poll until the lock disappears or `ttl` worth of sleeping has
        -- accumulated. The sleep step starts at 0.1 and triples after each
        -- poll, capped at MAX_LOCK_WAIT_STEP.
        local step = 0.1
        local cluster_elapsed = 0

        while cluster_elapsed < ttl do
          ngx.sleep(step)
          cluster_elapsed = cluster_elapsed + step

          -- Do not poll once the budget is exhausted; report the timeout.
          if cluster_elapsed >= ttl then
            break
          end

          local locked, read_err = self.connector:read_lock(key)
          if read_err then
            return nil, prefix_err(self, "failed to read cluster lock: "
                                         .. read_err)
          end

          if not locked then
            -- The cluster lock is gone; `cb` is not run by this caller.
            return false
          end

          step = math.min(step * 3, MAX_LOCK_WAIT_STEP)
        end

        return nil, prefix_err(self, "timeout")
      end

      -- Cluster lock inserted: run the callback, capturing a traceback if it
      -- throws.
      local pok, perr = xpcall(cb, debug.traceback)
      if not pok then
        -- The lock is removed on failure regardless of `no_cleanup`; the
        -- result of remove_lock is intentionally ignored (best effort).
        self.connector:remove_lock(key, owner)

        -- `debug.traceback` hands back non-string error values unchanged, so
        -- `perr` may be e.g. a table; tostring() keeps the concatenation from
        -- raising and masking the original error.
        return nil, prefix_err(self, "cluster_mutex callback threw an error: "
                                     .. tostring(perr))
      end

      if not no_cleanup then
        -- Best-effort release; the result of remove_lock is ignored.
        self.connector:remove_lock(key, owner)
      end

      return true
    end)
  end