kong/spec/02-integration/06-invalidations/01-cluster_events_spec.lua (256 lines of code) (raw):

_G.ngx.config.debug = true local helpers = require "spec.helpers" local kong_cluster_events = require "kong.cluster_events" for _, strategy in helpers.each_strategy() do describe("cluster_events with db [#" .. strategy .. "]", function() local db lazy_setup(function() local _ _, db = helpers.get_db_utils(strategy, {}) end) lazy_teardown(function() local cluster_events = assert(kong_cluster_events.new { db = db }) cluster_events.strategy:truncate_events() end) before_each(function() ngx.shared.kong:flush_all() ngx.shared.kong:flush_expired() ngx.shared.kong_cluster_events:flush_all() ngx.shared.kong_cluster_events:flush_expired() local cluster_events = assert(kong_cluster_events.new { db = db }) cluster_events.strategy:truncate_events() end) describe("new()", function() it("creates an instance", function() local cluster_events, err = kong_cluster_events.new { db = db } assert.is_nil(err) assert.is_table(cluster_events) end) it("instantiates only once (singleton)", function() finally(function() _G.ngx.config.debug = true package.loaded["kong.cluster_events"] = nil kong_cluster_events = require "kong.cluster_events" end) _G.ngx.config.debug = false package.loaded["kong.cluster_events"] = nil kong_cluster_events = require "kong.cluster_events" assert(kong_cluster_events.new { db = db }) assert.has_error(function() assert(kong_cluster_events.new { db = db }) end, "kong.cluster_events was already instantiated", nil, true) end) it("generates an identical node_id for all instances on a node", function() local cluster_events_1 = assert(kong_cluster_events.new { db = db, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, }) assert.is_string(cluster_events_1.node_id) assert.equal(cluster_events_1.node_id, cluster_events_2.node_id) end) it("instantiates but does not start polling", function() local cluster_events = assert(kong_cluster_events.new { db = db }) assert.is_false(cluster_events.polling) end) end) describe("pub/sub", function() local spy_func local uuid_1 = "a1e04ff0-3416-11e7-ba48-784f437104fa" local uuid_2 = "bbbd53dc-3416-11e7-aea6-784f437104fa" local cb = function(...) spy_func(...) end before_each(function() spy_func = spy.new(function() end) end) it("broadcasts on a given channel", function() -- nodes must not have the same node_id, to mimic 2 different Kong nodes -- on a cluster local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1 }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2 }) assert(cluster_events_1:subscribe("my_channel", cb, false)) assert(cluster_events_1:subscribe("my_other_channel", cb, false)) assert(cluster_events_2:broadcast("my_channel", "hello world")) assert.spy(spy_func).was_not_called() assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(1) assert(cluster_events_2:broadcast("my_channel", "hello world")) assert.spy(spy_func).was_called(1) assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(2) assert(cluster_events_2:broadcast("my_other_channel", "hello world")) assert.spy(spy_func).was_called(2) assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(3) end) it("broadcasts data to subscribers", function() -- nodes must not have the same node_id, to mimic 2 different Kong nodes -- on a cluster local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2, }) assert(cluster_events_1:subscribe("my_channel", cb, false)) assert(cluster_events_2:broadcast("my_channel", "hello world")) assert.spy(spy_func).was_not_called() assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(1) assert.spy(spy_func).was_called_with("hello world") end) it("does not broadcast events on the same node", function() -- same node_id local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, }) assert(cluster_events_1:subscribe("my_channel", cb, false)) assert(cluster_events_2:broadcast("my_channel", "hello world")) assert.spy(spy_func).was_not_called() assert(cluster_events_1:poll()) assert.spy(spy_func).was_not_called() end) it("starts interval polling when subscribing", function() local cluster_events_1 = assert(kong_cluster_events.new { db = db, poll_interval = 0.3, node_id = uuid_1 }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2 }) finally(function() cluster_events_1.polling = false ngx.sleep(0.4) end) local called = 0 assert(cluster_events_1:subscribe("my_channel", function() called = called + 1 end)) assert(cluster_events_2:broadcast("my_channel", "hello world")) assert.equal(0, called) helpers.wait_until(function() return called == 1 end, 10) assert(cluster_events_2:broadcast("my_channel", "hello world")) assert.equal(1, called) helpers.wait_until(function() return called == 2 end, 10) end) it("applies a poll_offset to lookback potentially missed events", function() local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, poll_offset = 2, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2, poll_offset = 2, }) assert(cluster_events_1:subscribe("grace_period_channel", cb, false)) assert(cluster_events_2:broadcast("grace_period_channel", "hello world")) assert.spy(spy_func).was_not_called() assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(1) -- only called once assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(1) -- reset shm storing ran events cluster_events_1.events_shm:flush_all() cluster_events_1.events_shm:flush_expired() ngx.sleep(1) -- ran again because of the lookback assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(2) -- was effectively called again ngx.sleep(1.001) -- 2.001 > poll_offset (2) assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(2) -- not called again this time end) it("handles more than <PAGE_SIZE> events at once", function() local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2, }) assert(cluster_events_1:subscribe("busy_channel", cb, false)) -- default page size is 100 for i = 1, 201 do assert(cluster_events_2:broadcast("busy_channel", "hello world")) end assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(201) end) it("runs callbacks in protected mode", function() local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2, }) assert(cluster_events_1:subscribe("errors_channel", function() error("foo") end, false)) -- false to not start auto polling assert(cluster_events_2:broadcast("errors_channel", "hello world")) assert.has_no_error(function() cluster_events_1:poll() end) end) it("broadcasts an event with a delay", function() local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2, }) assert(cluster_events_1:subscribe("nbf_channel", cb, false)) -- false to not start auto polling local delay = 1 assert(cluster_events_2:broadcast("nbf_channel", "hello world", delay)) assert(cluster_events_1:poll()) assert.spy(spy_func).was_not_called() -- not called yet ngx.sleep(0.001) -- still yield in case our timer is set to 0 assert(cluster_events_1:poll()) assert.spy(spy_func).was_not_called() -- still not called ngx.sleep(delay) -- go past our desired `nbf` delay assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(1) -- called end) it("broadcasts an event with a polling delay for subscribers", function() local delay = 1 local cluster_events_1 = assert(kong_cluster_events.new { db = db, node_id = uuid_1, poll_delay = delay, }) local cluster_events_2 = assert(kong_cluster_events.new { db = db, node_id = uuid_2, poll_delay = delay, }) assert(cluster_events_1:subscribe("nbf_channel", cb, false)) -- false to not start auto polling assert(cluster_events_2:broadcast("nbf_channel", "hello world")) assert(cluster_events_1:poll()) assert.spy(spy_func).was_not_called() -- not called yet ngx.sleep(0.001) -- still yield in case our timer is set to 0 assert(cluster_events_1:poll()) assert.spy(spy_func).was_not_called() -- still not called ngx.sleep(delay) -- go past our desired `nbf` delay assert(cluster_events_1:poll()) assert.spy(spy_func).was_called(1) -- called end) end) end) end