kong/spec/fixtures/balancer_utils.lua (523 lines of code) (raw):
local cjson = require "cjson"
local declarative = require "kong.db.declarative"
local helpers = require "spec.helpers"
local utils = require "kong.tools.utils"
local https_server = require "spec.fixtures.https_server"
local CONSISTENCY_FREQ = 0.1
local FIRST_PORT = 20000
local HEALTHCHECK_INTERVAL = 0.01
local SLOTS = 10
local TEST_LOG = false -- extra verbose logging
local healthchecks_defaults = {
active = {
timeout = 1,
concurrency = 10,
http_path = "/",
healthy = {
interval = 0, -- 0 = disabled by default
http_statuses = { 200, 302 },
successes = 2,
},
unhealthy = {
interval = 0, -- 0 = disabled by default
http_statuses = { 429, 404,
500, 501, 502, 503, 504, 505 },
tcp_failures = 2,
timeouts = 3,
http_failures = 5,
},
},
passive = {
healthy = {
http_statuses = { 200, 201, 202, 203, 204, 205, 206, 207, 208, 226,
300, 301, 302, 303, 304, 305, 306, 307, 308 },
successes = 5,
},
unhealthy = {
http_statuses = { 429, 500, 503 },
tcp_failures = 2,
timeouts = 7,
http_failures = 5,
},
},
}
local prefix = ""
local function healthchecks_config(config)
return utils.deep_merge(healthchecks_defaults, config)
end
local function direct_request(host, port, path, protocol, host_header)
local pok, client = pcall(helpers.http_client, {
host = host,
port = port,
scheme = protocol,
})
if not pok then
return nil, "pcall: " .. client .. " : " .. host ..":"..port
end
if not client then
return nil, "client"
end
local res, err = client:send {
method = "GET",
path = path,
headers = { ["Host"] = host_header or host }
}
local body = res and res:read_body()
client:close()
if err then
return nil, err
end
return body
end
local function post_target_endpoint(upstream_id, host, port, endpoint)
if host == "[::1]" then
host = "[0000:0000:0000:0000:0000:0000:0000:0001]"
end
local path = "/upstreams/" .. upstream_id
.. "/targets/"
.. utils.format_host(host, port)
.. "/" .. endpoint
local api_client = helpers.admin_client()
local res, err = assert(api_client:post(prefix .. path, {
headers = {
["Content-Type"] = "application/json",
},
body = {},
}))
api_client:close()
return res, err
end
local function client_requests(n, host_or_headers, proxy_host, proxy_port, protocol, uri)
local oks, fails = 0, 0
local last_status
for _ = 1, n do
local client
if proxy_host and proxy_port then
client = helpers.http_client({
host = proxy_host,
port = proxy_port,
scheme = protocol,
})
else
if protocol == "https" then
client = helpers.proxy_ssl_client()
else
client = helpers.proxy_client()
end
end
local res = client:send {
method = "GET",
path = uri or "/",
headers = type(host_or_headers) == "string"
and { ["Host"] = host_or_headers }
or host_or_headers
or {}
}
if not res then
fails = fails + 1
if TEST_LOG then
print("FAIL (no body)")
end
elseif res.status == 200 then
oks = oks + 1
if TEST_LOG then
print("OK ", res.status, res:read_body())
end
elseif res.status > 399 then
fails = fails + 1
if TEST_LOG then
print("FAIL ", res.status, res:read_body())
end
end
last_status = res and res.status
client:close()
end
return oks, fails, last_status
end
local add_upstream
local remove_upstream
local patch_upstream
local get_upstream
local get_upstream_health
local get_balancer_health
local put_target_address_health
local get_router_version
local add_target
local update_target
local add_api
local patch_api
local gen_port
local gen_multi_host
local invalidate_router
do
local gen_sym
do
local sym = 0
gen_sym = function(name)
sym = sym + 1
return name .. "_" .. sym
end
end
local function api_send(method, path, body, forced_port)
local api_client = helpers.admin_client(nil, forced_port)
local res, err = api_client:send({
method = method,
path = prefix .. path,
headers = {
["Content-Type"] = "application/json"
},
body = body,
})
if not res then
api_client:close()
return nil, err
end
local res_body = res.status ~= 204 and cjson.decode((res:read_body()))
api_client:close()
return res.status, res_body
end
add_upstream = function(bp, data)
local upstream_id = utils.uuid()
local req = utils.deep_copy(data) or {}
local upstream_name = req.name or gen_sym("upstream")
req.name = upstream_name
req.slots = req.slots or SLOTS
req.id = upstream_id
bp.upstreams:insert(req)
return upstream_name, upstream_id
end
remove_upstream = function(bp, upstream_id)
bp.upstreams:remove({ id = upstream_id })
end
patch_upstream = function(upstream_id, data)
local res = api_send("PATCH", "/upstreams/" .. upstream_id, data)
assert(res == 200)
end
get_upstream = function(upstream_id, forced_port)
local path = "/upstreams/" .. upstream_id
local status, body = api_send("GET", path, nil, forced_port)
if status == 200 then
return body
end
end
get_upstream_health = function(upstream_id, forced_port)
local path = "/upstreams/" .. upstream_id .."/health"
local status, body = api_send("GET", path, nil, forced_port)
if status == 200 then
return body
end
end
get_balancer_health = function(upstream_id, forced_port)
local path = "/upstreams/" .. upstream_id .."/health?balancer_health=1"
local status, body = api_send("GET", path, nil, forced_port)
if status == 200 then
return body
end
end
put_target_address_health = function(upstream_id, target_id, address, mode, forced_port)
local path = "/upstreams/" .. upstream_id .. "/targets/" .. target_id .. "/" .. address .. "/" .. mode
return api_send("PUT", path, {}, forced_port)
end
get_router_version = function(forced_port)
local path = "/cache/router:version"
local status, body = api_send("GET", path, nil, forced_port)
if status == 200 then
return body.message
end
end
invalidate_router = function(forced_port)
local path = "/cache/router:version"
local status, body = api_send("DELETE", path, nil, forced_port)
if status == 204 then
return true
end
return nil, body
end
do
local os_name
do
local pd = io.popen("uname -s")
os_name = pd:read("*l")
pd:close()
end
local function port_in_use(port)
if os_name ~= "Linux" then
return false
end
return os.execute("netstat -n | grep -q -w " .. port)
end
local port = FIRST_PORT
gen_port = function()
repeat
port = port + 1
until not port_in_use(port)
return port
end
end
do
local host_num = 0
gen_multi_host = function()
host_num = host_num + 1
return "multiple-hosts-" .. tostring(host_num) .. ".test"
end
end
add_target = function(bp, upstream_id, host, port, data)
port = port or gen_port()
local req = utils.deep_copy(data) or {}
if host == "[::1]" then
host = "[0000:0000:0000:0000:0000:0000:0000:0001]"
end
req.target = req.target or utils.format_host(host, port)
req.weight = req.weight or 10
req.upstream = { id = upstream_id }
local new_target = bp.targets:insert(req)
return port, new_target
end
update_target = function(bp, upstream_id, host, port, data)
local req = utils.deep_copy(data) or {}
if host == "[::1]" then
host = "[0000:0000:0000:0000:0000:0000:0000:0001]"
end
req.target = req.target or utils.format_host(host, port)
req.weight = req.weight or 10
req.upstream = { id = upstream_id }
bp.targets:update(req.id or req.target, req)
end
add_api = function(bp, upstream_name, opts)
opts = opts or {}
local route_id = utils.uuid()
local service_id = utils.uuid()
local route_host = gen_sym("host")
local sproto = opts.service_protocol or opts.route_protocol or "http"
local rproto = opts.route_protocol or "http"
local rpaths = {
"/",
"~/(?<namespace>[^/]+)/(?<id>[0-9]+)/?", -- uri capture hash value
}
bp.services:insert({
id = service_id,
url = sproto .. "://" .. upstream_name .. ":" .. (rproto == "tcp" and 9100 or 80),
read_timeout = opts.read_timeout,
write_timeout = opts.write_timeout,
connect_timeout = opts.connect_timeout,
retries = opts.retries,
protocol = sproto,
})
bp.routes:insert({
id = route_id,
service = { id = service_id },
protocols = { rproto },
hosts = rproto ~= "tcp" and { route_host } or nil,
destinations = (rproto == "tcp") and {{ port = 9100 }} or nil,
paths = rproto ~= "tcp" and rpaths or nil,
})
bp.plugins:insert({
name = "post-function",
service = { id = service_id },
config = {
header_filter = {[[
local value = ngx.ctx and
ngx.ctx.balancer_data and
ngx.ctx.balancer_data.hash_value
if value == "" or value == nil then
value = "NONE"
end
ngx.header["x-balancer-hash-value"] = value
ngx.header["x-uri"] = ngx.var.request_uri
]]},
},
})
return route_host, service_id, route_id
end
patch_api = function(bp, service_id, new_upstream, read_timeout)
bp.services:update(service_id, {
url = new_upstream,
read_timeout = read_timeout,
})
end
end
local poll_wait_health
local poll_wait_address_health
do
local function poll_wait(upstream_id, host, port, admin_port, fn)
if host == "[::1]" then
host = "[0000:0000:0000:0000:0000:0000:0000:0001]"
end
local hard_timeout = ngx.now() + 70
while ngx.now() < hard_timeout do
local health = get_upstream_health(upstream_id, admin_port)
if health then
for _, d in ipairs(health.data) do
if d.target == host .. ":" .. port and fn(d) then
return true
end
end
end
ngx.sleep(0.1) -- poll-wait
end
return false
end
poll_wait_health = function(upstream_id, host, port, value, admin_port)
local ok = poll_wait(upstream_id, host, port, admin_port, function(d)
return d.health == value
end)
if ok then
return true
end
assert(false, "timed out waiting for " .. host .. ":" .. port .. " in " ..
upstream_id .. " to become " .. value)
end
poll_wait_address_health = function(upstream_id, host, port, address_host, address_port, value)
local ok = poll_wait(upstream_id, host, port, nil, function(d)
for _, ad in ipairs(d.data.addresses) do
if ad.ip == address_host
and ad.port == address_port
and ad.health == value then
return true
end
end
end)
if ok then
return true
end
assert(false, "timed out waiting for " .. address_host .. ":" .. address_port .. " in " ..
upstream_id .. " to become " .. value)
end
end
local function wait_for_router_update(bp, old_rv, localhost, proxy_port, admin_port)
-- add dummy upstream just to rebuild router
local dummy_upstream_name, dummy_upstream_id = add_upstream(bp)
local dummy_port = add_target(bp, dummy_upstream_id, localhost)
local dummy_api_host = add_api(bp, dummy_upstream_name)
local dummy_server = https_server.new(dummy_port, localhost)
dummy_server:start()
-- forces the router to be rebuild, reduces the flakiness of the test suite
-- TODO: find out what's wrong with router invalidation in the particular
-- test setup causing the flakiness
assert(invalidate_router(admin_port))
helpers.wait_until(function()
client_requests(1, dummy_api_host, "127.0.0.1", proxy_port)
local rv = get_router_version(admin_port)
return rv ~= old_rv
end, 5)
dummy_server:shutdown()
end
local function tcp_client_requests(nreqs, host, port)
local fails, ok1, ok2 = 0, 0, 0
for _ = 1, nreqs do
local sock = ngx.socket.tcp()
assert(sock:connect(host, port))
assert(sock:send("hello\n"))
local response, err = sock:receive()
if err then
fails = fails + 1
elseif response:match("^1 ") then
ok1 = ok1 + 1
elseif response:match("^2 ") then
ok2 = ok2 + 1
end
end
return ok1, ok2, fails
end
local function begin_testcase_setup(strategy, bp)
if strategy == "off" then
bp.done()
end
end
local function begin_testcase_setup_update(strategy, bp)
if strategy == "off" then
bp.reset_back()
end
end
local function end_testcase_setup(strategy, bp, consistency)
if strategy == "off" then
local cfg = bp.done()
local yaml = declarative.to_yaml_string(cfg)
local admin_client = helpers.admin_client()
local res = assert(admin_client:send {
method = "POST",
path = "/config",
body = {
config = yaml,
},
headers = {
["Content-Type"] = "multipart/form-data",
}
})
assert(res ~= nil)
assert(res.status == 201)
admin_client:close()
end
if consistency == "eventual" then
ngx.sleep(CONSISTENCY_FREQ*2) -- wait for proxy state consistency timer
end
end
local function get_db_utils_for_dc_and_admin_api(strategy, tables)
local bp = assert(helpers.get_db_utils(strategy, tables))
if strategy ~= "off" then
bp = require("spec.fixtures.admin_api")
end
return bp
end
local function setup_prefix(p)
prefix = p
local bp = require("spec.fixtures.admin_api")
bp.set_prefix(prefix)
end
local function teardown_prefix()
prefix = ""
local bp = require("spec.fixtures.admin_api")
bp.set_prefix(prefix)
end
local function test_with_prefixes(itt, strategy, prefixes)
return function(description, fn)
if strategy == "off" then
itt(description, fn)
return
end
for _, name in ipairs(prefixes) do
itt(name .. ": " .. description, function()
setup_prefix("/" .. name)
local ok = fn()
teardown_prefix()
return ok
end)
end
end
end
local localhosts = {
ipv4 = "127.0.0.1",
ipv6 = "[::1]",
hostname = "localhost",
}
local consistencies = {"strict", "eventual"}
local balancer_utils = {}
--balancer_utils.
balancer_utils.add_api = add_api
balancer_utils.add_target = add_target
balancer_utils.update_target = update_target
balancer_utils.add_upstream = add_upstream
balancer_utils.remove_upstream = remove_upstream
balancer_utils.begin_testcase_setup = begin_testcase_setup
balancer_utils.begin_testcase_setup_update = begin_testcase_setup_update
balancer_utils.client_requests = client_requests
balancer_utils.consistencies = consistencies
balancer_utils.CONSISTENCY_FREQ = CONSISTENCY_FREQ
balancer_utils.direct_request = direct_request
balancer_utils.end_testcase_setup = end_testcase_setup
balancer_utils.gen_multi_host = gen_multi_host
balancer_utils.gen_port = gen_port
balancer_utils.get_balancer_health = get_balancer_health
balancer_utils.get_db_utils_for_dc_and_admin_api = get_db_utils_for_dc_and_admin_api
balancer_utils.get_router_version = get_router_version
balancer_utils.get_upstream = get_upstream
balancer_utils.get_upstream_health = get_upstream_health
balancer_utils.healthchecks_config = healthchecks_config
balancer_utils.HEALTHCHECK_INTERVAL = HEALTHCHECK_INTERVAL
balancer_utils.localhosts = localhosts
balancer_utils.patch_api = patch_api
balancer_utils.patch_upstream = patch_upstream
balancer_utils.poll_wait_address_health = poll_wait_address_health
balancer_utils.poll_wait_health = poll_wait_health
balancer_utils.put_target_address_health = put_target_address_health
balancer_utils.post_target_endpoint = post_target_endpoint
balancer_utils.SLOTS = SLOTS
balancer_utils.tcp_client_requests = tcp_client_requests
balancer_utils.wait_for_router_update = wait_for_router_update
balancer_utils.test_with_prefixes = test_with_prefixes
return balancer_utils