kong/spec/02-integration/05-proxy/28-stream_plugins_triggering_spec.lua (233 lines of code) (raw):

local helpers = require "spec.helpers" local pl_file = require "pl.file" local cjson = require "cjson" local TEST_CONF = helpers.test_conf local MESSAGE = "echo, ping, pong. echo, ping, pong. echo, ping, pong.\n" local function find_in_file(pat, cnt) local f = assert(io.open(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log, "r")) local line = f:read("*l") local count = 0 while line do if line:match(pat) then count = count + 1 end line = f:read("*l") end return cnt == -1 and count >= 1 or count == cnt end local function wait() -- wait for the second log phase to finish, otherwise it might not appear -- in the logs when executing this helpers.wait_until(function() local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log) local _, count = logs:gsub("%[logger%] log phase", "") return count >= 1 end, 10) end -- Phases and counters for unary stream requests local phases = { ["%[logger%] init_worker phase"] = 1, ["%[logger%] preread phase"] = 1, ["%[logger%] log phase"] = 1, } local phases_2 = { ["%[logger%] init_worker phase"] = 1, ["%[logger%] preread phase"] = 0, ["%[logger%] log phase"] = 1, } local phases_tls = { ["%[logger%] init_worker phase"] = 1, ["%[logger%] certificate phase"] = 1, ["%[logger%] preread phase"] = 1, ["%[logger%] log phase"] = 1, } local phases_tls_2 = { ["%[logger%] init_worker phase"] = 1, ["%[logger%] certificate phase"] = 1, ["%[logger%] preread phase"] = 0, ["%[logger%] log phase"] = 1, } local function assert_phases(phrases) for phase, count in pairs(phrases) do assert(find_in_file(phase, count)) end end for _, strategy in helpers.each_strategy() do describe("#stream Proxying [#" .. strategy .. "]", function() local bp before_each(function() bp = helpers.get_db_utils(strategy, { "routes", "services", "plugins", }, { "logger", }) local tcp_srv = bp.services:insert({ name = "tcp", host = helpers.mock_upstream_host, port = helpers.mock_upstream_stream_port, protocol = "tcp" }) bp.routes:insert { destinations = { { port = 19000, }, }, protocols = { "tcp", }, service = tcp_srv, } local tls_srv = bp.services:insert({ name = "tls", host = helpers.mock_upstream_host, port = helpers.mock_upstream_stream_ssl_port, protocol = "tls" }) bp.routes:insert { destinations = { { port = 19443, }, }, protocols = { "tls", }, service = tls_srv, } bp.plugins:insert { name = "logger", } assert(helpers.start_kong({ database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", plugins = "logger", proxy_listen = "off", admin_listen = "off", stream_listen = helpers.get_proxy_ip(false) .. ":19000," .. helpers.get_proxy_ip(false) .. ":19443 ssl" })) end) after_each(function() helpers.stop_kong() end) it("tcp", function() local tcp = ngx.socket.tcp() assert(tcp:connect(helpers.get_proxy_ip(false), 19000)) assert(tcp:send(MESSAGE)) local body = assert(tcp:receive("*a")) assert.equal(MESSAGE, body) tcp:close() wait() assert_phases(phases) end) it("tls", function() local tcp = ngx.socket.tcp() assert(tcp:connect(helpers.get_proxy_ip(true), 19443)) assert(tcp:sslhandshake(nil, nil, false)) assert(tcp:send(MESSAGE)) local body = assert(tcp:receive("*a")) assert.equal(MESSAGE, body) tcp:close() wait() assert_phases(phases_tls) end) end) describe("#stream Proxying [#" .. strategy .. "]", function() local bp before_each(function() bp = helpers.get_db_utils(strategy, { "routes", "services", "plugins", }, { "logger", "short-circuit", }) local tcp_srv = bp.services:insert({ name = "tcp", host = helpers.mock_upstream_host, port = helpers.mock_upstream_stream_port, protocol = "tcp" }) bp.routes:insert { destinations = { { port = 19000, }, }, protocols = { "tcp", }, service = tcp_srv, } local tls_srv = bp.services:insert({ name = "tls", host = helpers.mock_upstream_host, port = helpers.mock_upstream_stream_ssl_port, protocol = "tls" }) bp.routes:insert { destinations = { { port = 19443, }, }, protocols = { "tls", }, service = tls_srv, } bp.plugins:insert { name = "logger", } bp.plugins:insert { name = "short-circuit", config = { status = 200, message = "plugin executed" }, } assert(helpers.start_kong({ database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", plugins = "logger,short-circuit", proxy_listen = "off", admin_listen = "off", stream_listen = helpers.get_proxy_ip(false) .. ":19000," .. helpers.get_proxy_ip(false) .. ":19443 ssl" })) end) after_each(function() helpers.stop_kong() end) it("tcp (short-circuited)", function() local tcp = ngx.socket.tcp() assert(tcp:connect(helpers.get_proxy_ip(false), 19000)) local body = assert(tcp:receive("*a")) tcp:close() local json = cjson.decode(body) assert.same({ init_worker_called = true, message = "plugin executed", status = 200 }, json) wait() assert_phases(phases_2) end) it("tls (short-circuited)", function() local tcp = ngx.socket.tcp() assert(tcp:connect(helpers.get_proxy_ip(true), 19443)) assert(tcp:sslhandshake(nil, nil, false)) local body = assert(tcp:receive("*a")) tcp:close() local json = assert(cjson.decode(body)) assert.same({ init_worker_called = true, message = "plugin executed", status = 200 }, json) wait() assert_phases(phases_tls_2) end) end) end