kong/spec/01-unit/09-balancer/02-least_connections_spec.lua (409 lines of code) (raw):
local dns_utils = require "kong.resty.dns.utils"
local mocker = require "spec.fixtures.mocker"
local utils = require "kong.tools.utils"
local ws_id = utils.uuid()
local client, balancers, targets
local helpers = require "spec.helpers.dns"
--local gettime = helpers.gettime
--local sleep = helpers.sleep
local dnsSRV = function(...) return helpers.dnsSRV(client, ...) end
local dnsA = function(...) return helpers.dnsA(client, ...) end
--local dnsAAAA = function(...) return helpers.dnsAAAA(client, ...) end
--local dnsExpire = helpers.dnsExpire
local t_insert = table.insert
local unset_register = {}
local function setup_block(consistency)
local cache_table = {}
local function mock_cache()
return {
safe_set = function(self, k, v)
cache_table[k] = v
return true
end,
get = function(self, k, _, fn, arg)
if cache_table[k] == nil then
cache_table[k] = fn(arg)
end
return cache_table[k]
end,
}
end
local function register_unsettter(f)
table.insert(unset_register, f)
end
mocker.setup(register_unsettter, {
kong = {
configuration = {
worker_consistency = consistency,
worker_state_update_frequency = 0.1,
},
core_cache = mock_cache(cache_table),
},
ngx = {
ctx = {
workspace = ws_id,
}
}
})
end
local function unsetup_block()
for _, f in ipairs(unset_register) do
f()
end
end
local upstream_index = 0
local function new_balancer(targets_list)
upstream_index = upstream_index + 1
local upname="upstream_" .. upstream_index
local hc_defaults = {
active = {
timeout = 1,
concurrency = 10,
http_path = "/",
healthy = {
interval = 0, -- 0 = probing disabled by default
http_statuses = { 200, 302 },
successes = 0, -- 0 = disabled by default
},
unhealthy = {
interval = 0, -- 0 = probing disabled by default
http_statuses = { 429, 404,
500, 501, 502, 503, 504, 505 },
tcp_failures = 0, -- 0 = disabled by default
timeouts = 0, -- 0 = disabled by default
http_failures = 0, -- 0 = disabled by default
},
},
passive = {
healthy = {
http_statuses = { 200, 201, 202, 203, 204, 205, 206, 207, 208, 226,
300, 301, 302, 303, 304, 305, 306, 307, 308 },
successes = 0,
},
unhealthy = {
http_statuses = { 429, 500, 503 },
tcp_failures = 0, -- 0 = circuit-breaker disabled by default
timeouts = 0, -- 0 = circuit-breaker disabled by default
http_failures = 0, -- 0 = circuit-breaker disabled by default
},
},
}
local my_upstream = { id=upname, name=upname, ws_id=ws_id, slots=10, healthchecks=hc_defaults, algorithm="least-connections" }
local b = (balancers.create_balancer(my_upstream, true))
for _, target in ipairs(targets_list) do
local name, port, weight = target, nil, nil
if type(target) == "table" then
name = target.name or target[1]
port = target.port or target[2]
weight = target.weight or target[3]
end
table.insert(b.targets, {
upstream = name or upname,
balancer = b,
name = name,
nameType = dns_utils.hostnameType(name),
addresses = {},
port = port or 8000,
weight = weight or 100,
totalWeight = 0,
unavailableWeight = 0,
})
end
targets.resolve_targets(b.targets)
return b
end
local function validate_lcb(b, debug)
local available, unavailable = 0, 0
local bheap = b.algorithm.binaryHeap
local num_addresses = 0
for _, target in ipairs(b.targets) do
for _, addr in ipairs(target.addresses) do
if bheap:valueByPayload(addr) then
-- it's in the heap
assert(not addr.disabled, "should be enabled when in the heap")
assert(addr.available, "should be available when in the heap")
available = available + 1
assert(bheap:valueByPayload(addr) == (addr.connectionCount+1)/addr.weight)
else
assert(not addr.disabled, "should be enabled when not in the heap")
assert(not addr.available, "should not be available when not in the heap")
unavailable = unavailable + 1
end
num_addresses = num_addresses + 1
end
end
assert(available + unavailable == num_addresses, "mismatch in counts")
return b
end
describe("[least-connections]", function()
local snapshot
setup(function()
_G.package.loaded["kong.resty.dns.client"] = nil -- make sure module is reloaded
_G.package.loaded["kong.runloop.balancer.targets"] = nil -- make sure module is reloaded
client = require "kong.resty.dns.client"
targets = require "kong.runloop.balancer.targets"
balancers = require "kong.runloop.balancer.balancers"
local healthcheckers = require "kong.runloop.balancer.healthcheckers"
healthcheckers.init()
balancers.init()
local kong = {}
_G.kong = kong
kong.worker_events = require "resty.worker.events"
kong.worker_events.configure({
shm = "kong_process_events", -- defined by "lua_shared_dict"
timeout = 5, -- life time of event data in shm
interval = 1, -- poll interval (seconds)
wait_interval = 0.010, -- wait before retry fetching event data
wait_max = 0.5, -- max wait time before discarding event
})
local function empty_each()
return function() end
end
kong.db = {
targets = {
each = empty_each,
select_by_upstream_raw = function()
return {}
end
},
upstreams = {
each = empty_each,
select = function() end,
},
}
kong.core_cache = {
_cache = {},
get = function(self, key, _, loader, arg)
local v = self._cache[key]
if v == nil then
v = loader(arg)
self._cache[key] = v
end
return v
end,
invalidate_local = function(self, key)
self._cache[key] = nil
end
}
end)
before_each(function()
setup_block()
assert(client.init {
hosts = {},
resolvConf = {
"nameserver 8.8.8.8"
},
})
snapshot = assert:snapshot()
end)
after_each(function()
snapshot:revert() -- undo any spying/stubbing etc.
unsetup_block()
collectgarbage()
collectgarbage()
end)
describe("new()", function()
it("inserts provided hosts", function()
dnsA({
{ name = "konghq.com", address = "1.2.3.4" },
})
dnsA({
{ name = "github.com", address = "1.2.3.4" },
})
dnsA({
{ name = "getkong.org", address = "1.2.3.4" },
})
local b = validate_lcb(new_balancer({
"konghq.com", -- name only, as string
{ name = "github.com" }, -- name only, as table
{ name = "getkong.org", port = 80, weight = 25 }, -- fully specified, as table
}))
assert.equal("konghq.com", b.targets[1].name)
assert.equal("github.com", b.targets[2].name)
assert.equal("getkong.org", b.targets[3].name)
end)
end)
describe("getPeer()", function()
it("honours weights", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local counts = {}
local handles = {}
for i = 1,70 do
local ip, _, _, handle = b:getPeer()
counts[ip] = (counts[ip] or 0) + 1
t_insert(handles, handle) -- don't let them get GC'ed
end
validate_lcb(b)
assert.same({
["20.20.20.20"] = 20,
["50.50.50.50"] = 50
}, counts)
end)
it("first returns top weights, on a 0-connection balancer", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local handles = {}
local ip, _, handle
-- first try
ip, _, _, handle= b:getPeer()
t_insert(handles, handle) -- don't let them get GC'ed
validate_lcb(b)
assert.equal("50.50.50.50", ip)
-- second try
ip, _, _, handle= b:getPeer()
t_insert(handles, handle) -- don't let them get GC'ed
validate_lcb(b)
assert.equal("50.50.50.50", ip)
-- third try
ip, _, _, handle= b:getPeer()
t_insert(handles, handle) -- don't let them get GC'ed
validate_lcb(b)
assert.equal("20.20.20.20", ip)
end)
it("doesn't use unavailable addresses", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
-- mark one as unavailable
b:setAddressStatus(b:findAddress("50.50.50.50", 80, "konghq.com"), false)
local counts = {}
local handles = {}
for i = 1,70 do
local ip, _, _, handle = assert(b:getPeer())
counts[ip] = (counts[ip] or 0) + 1
t_insert(handles, handle) -- don't let them get GC'ed
end
validate_lcb(b)
assert.same({
["20.20.20.20"] = 70,
["50.50.50.50"] = nil,
}, counts)
end)
it("uses reenabled (available) addresses again", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
-- mark one as unavailable
b:setAddressStatus(b:findAddress("20.20.20.20", 80, "konghq.com"), false)
local counts = {}
local handles = {}
for i = 1,70 do
local ip, _, _, handle = b:getPeer()
counts[ip] = (counts[ip] or 0) + 1
t_insert(handles, handle) -- don't let them get GC'ed
end
validate_lcb(b)
assert.same({
["20.20.20.20"] = nil,
["50.50.50.50"] = 70,
}, counts)
-- let's do another 70, after resetting
b:setAddressStatus(b:findAddress("20.20.20.20", 80, "konghq.com"), true)
for i = 1,70 do
local ip, _, _, handle = b:getPeer()
counts[ip] = (counts[ip] or 0) + 1
t_insert(handles, handle) -- don't let them get GC'ed
end
validate_lcb(b)
assert.same({
["20.20.20.20"] = 40,
["50.50.50.50"] = 100,
}, counts)
end)
end)
describe("retrying getPeer()", function()
it("does not return already failed addresses", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
{ name = "konghq.com", target = "70.70.70.70", port = 80, weight = 70 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local tried = {}
local ip, _, handle
-- first try
ip, _, _, handle = b:getPeer()
tried[ip] = (tried[ip] or 0) + 1
validate_lcb(b)
-- 1st retry
ip, _, _, handle = b:getPeer(nil, handle)
assert.is_nil(tried[ip])
tried[ip] = (tried[ip] or 0) + 1
validate_lcb(b)
-- 2nd retry
ip, _, _, _ = b:getPeer(nil, handle)
assert.is_nil(tried[ip])
tried[ip] = (tried[ip] or 0) + 1
validate_lcb(b)
assert.same({
["20.20.20.20"] = 1,
["50.50.50.50"] = 1,
["70.70.70.70"] = 1,
}, tried)
end)
it("retries, after all adresses failed, restarts with previously failed ones", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
{ name = "konghq.com", target = "70.70.70.70", port = 80, weight = 70 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local tried = {}
local ip, _, handle
for i = 1,6 do
ip, _, _, handle = b:getPeer(nil, handle)
tried[ip] = (tried[ip] or 0) + 1
validate_lcb(b)
end
assert.same({
["20.20.20.20"] = 2,
["50.50.50.50"] = 2,
["70.70.70.70"] = 2,
}, tried)
end)
it("releases the previous connection", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
{ name = "konghq.com", target = "50.50.50.50", port = 80, weight = 50 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local handle -- define outside loop, so it gets reused and released
for i = 1,70 do
local _
_, _, _, handle = b:getPeer(nil, handle)
end
validate_lcb(b)
local ccount = 0
for _, target in ipairs(b.targets) do
for _, addr in ipairs(target.addresses) do
ccount = ccount + addr.connectionCount
end
end
assert.equal(1, ccount)
end)
end)
describe("release()", function()
it("releases a connection", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local ip, _, _, handle = b:getPeer()
assert.equal("20.20.20.20", ip)
assert.equal(1, b.targets[1].addresses[1].connectionCount)
handle:release()
assert.equal(0, b.targets[1].addresses[1].connectionCount)
end)
it("releases connection of already disabled/removed address", function()
dnsSRV({
{ name = "konghq.com", target = "20.20.20.20", port = 80, weight = 20 },
})
local b = validate_lcb(new_balancer({ "konghq.com" }))
local ip, _, _, handle = b:getPeer()
assert.equal("20.20.20.20", ip)
assert.equal(1, b.targets[1].addresses[1].connectionCount)
-- remove the host and its addresses
table.remove(b.targets)
assert.equal(0, #b.targets)
local addr = handle.address
handle:release()
assert.equal(0, addr.connectionCount)
end)
end)
end)