kong/spec/01-unit/17-concurrency_spec.lua (279 lines of code) (raw):

local mocker = require("spec.fixtures.mocker") -- avoid requiring spec.helpers to avoid complicating necessary mocks local function unindent(s) return s:gsub("\n%s*", "\n"):gsub("^%s*", ""):gsub("\n$", "") end local function setup_it_block() -- keep track of created semaphores local semaphores = {} mocker.setup(finally, { ngx = { log = function() -- avoid stdout output during test end, }, kong = { log = { err = function() end, warn = function() end, }, response = { exit = function() end, }, }, modules = { { "ngx.semaphore", { _semaphores = semaphores, new = function() local s = { value = 0, wait = function(self, timeout) local t = 0 while self.value == 0 do coroutine.yield() t = t + 1 if t == timeout then return nil, "timeout" end end self.value = self.value - 1 return true end, post = function(self, n) n = n or 1 self.value = self.value + n return true end, } table.insert(semaphores, s) return s end, }}, { "kong.concurrency", {} }, } }) end describe("kong.concurrency", function() describe("with_coroutine_mutex", function() it("baseline demo without locking", function() setup_it_block() local output = {} local co = {} for c = 1, 2 do co[c] = coroutine.create(function() table.insert(output, "hello " .. c) coroutine.yield() table.insert(output, "inside " .. c) coroutine.yield() table.insert(output, "releasing " .. c) table.insert(output, "goodbye " .. c) end) end -- mock a round-robin coroutine scheduler for i = 1, 10 do coroutine.resume(co[1]) coroutine.resume(co[2]) end assert.same(unindent([[ hello 1 hello 2 inside 1 inside 2 releasing 1 goodbye 1 releasing 2 goodbye 2 ]]), table.concat(output, "\n")) end) it("locks coroutines with a mutex", function() setup_it_block() local output = {} local co = {} for c = 1, 2 do co[c] = coroutine.create(function() local concurrency = require("kong.concurrency") table.insert(output, "hello " .. c) coroutine.yield() concurrency.with_coroutine_mutex({ name = "test" }, function() table.insert(output, "inside " .. c) coroutine.yield() table.insert(output, "releasing " .. c) end) table.insert(output, "goodbye " .. c) end) end -- mock a round-robin coroutine scheduler for i = 1, 10 do coroutine.resume(co[1]) coroutine.resume(co[2]) end assert.same(unindent([[ hello 1 hello 2 inside 1 releasing 1 goodbye 1 inside 2 releasing 2 goodbye 2 ]]), table.concat(output, "\n")) end) it("has option on_timeout = 'run_unlocked'", function() setup_it_block() local output = {} local co = {} for c = 1, 2 do co[c] = coroutine.create(function() local concurrency = require("kong.concurrency") table.insert(output, "hello " .. c) coroutine.yield() local opts = { name = "test", timeout = 5, on_timeout = "run_unlocked", } concurrency.with_coroutine_mutex(opts, function() for i = 1, 10 do table.insert(output, "taking a while (" .. i .. ") " .. c) coroutine.yield() end table.insert(output, "releasing " .. c) end) table.insert(output, "goodbye " .. c) end) end -- mock a round-robin coroutine scheduler for i = 1, 20 do coroutine.resume(co[1]) coroutine.resume(co[2]) end assert.same(unindent([[ hello 1 hello 2 taking a while (1) 1 taking a while (2) 1 taking a while (3) 1 taking a while (4) 1 taking a while (5) 1 taking a while (6) 1 taking a while (1) 2 taking a while (7) 1 taking a while (2) 2 taking a while (8) 1 taking a while (3) 2 taking a while (9) 1 taking a while (4) 2 taking a while (10) 1 taking a while (5) 2 releasing 1 goodbye 1 taking a while (6) 2 taking a while (7) 2 taking a while (8) 2 taking a while (9) 2 taking a while (10) 2 releasing 2 goodbye 2 ]]), table.concat(output, "\n")) end) it("has option on_timeout = 'return_true'", function() setup_it_block() local output = {} local co = {} for c = 1, 2 do co[c] = coroutine.create(function() local concurrency = require("kong.concurrency") table.insert(output, "hello " .. c) coroutine.yield() local opts = { name = "test", timeout = 5, on_timeout = "return_true", } local ok, err = concurrency.with_coroutine_mutex(opts, function() for i = 1, 10 do table.insert(output, "taking a while (" .. i .. ") " .. c) coroutine.yield() end table.insert(output, "releasing " .. c) end) if c == 2 then assert.truthy(ok) assert.is_nil(err) end table.insert(output, "goodbye " .. c) end) end -- mock a round-robin coroutine scheduler for i = 1, 20 do coroutine.resume(co[1]) coroutine.resume(co[2]) end assert.same(unindent([[ hello 1 hello 2 taking a while (1) 1 taking a while (2) 1 taking a while (3) 1 taking a while (4) 1 taking a while (5) 1 taking a while (6) 1 goodbye 2 taking a while (7) 1 taking a while (8) 1 taking a while (9) 1 taking a while (10) 1 releasing 1 goodbye 1 ]]), table.concat(output, "\n")) end) it("supports multiple locks", function() setup_it_block() local output = {} local co = {} -- coroutines 1 and 2 share a lock for c = 1, 2 do co[c] = coroutine.create(function() local concurrency = require("kong.concurrency") table.insert(output, "hello " .. c) coroutine.yield() concurrency.with_coroutine_mutex({ name = "test1" }, function() table.insert(output, "inside " .. c) coroutine.yield() table.insert(output, "releasing " .. c) end) table.insert(output, "goodbye " .. c) end) end -- coroutines 3 and 4 share a different lock for c = 3, 4 do co[c] = coroutine.create(function() local concurrency = require("kong.concurrency") table.insert(output, "hello " .. c) coroutine.yield() concurrency.with_coroutine_mutex({ name = "test2" }, function() table.insert(output, "inside " .. c) coroutine.yield() table.insert(output, "releasing " .. c) end) table.insert(output, "goodbye " .. c) end) end -- mock a round-robin coroutine scheduler for i = 1, 10 do coroutine.resume(co[1]) coroutine.resume(co[2]) coroutine.resume(co[3]) coroutine.resume(co[4]) end assert.same(unindent([[ hello 1 hello 2 hello 3 hello 4 inside 1 inside 3 releasing 1 goodbye 1 inside 2 releasing 3 goodbye 3 inside 4 releasing 2 goodbye 2 releasing 4 goodbye 4 ]]), table.concat(output, "\n")) end) end) end)