kong/kong/init.lua (1,172 lines of code) (raw):
-- Kong, the biggest ape in town
--
-- /\ ____
-- <> ( oo )
-- <>_| ^^ |_
-- <> @ \
-- /~~\ . . _ |
-- /~~~~\ | |
-- /~~~~~~\/ _| |
-- |[][][]/ / [m]
-- |[][][[m]
-- |[][][]|
-- |[][][]|
-- |[][][]|
-- |[][][]|
-- |[][][]|
-- |[][][]|
-- |[][][]|
-- |[][][]|
-- |[|--|]|
-- |[| |]|
-- ========
-- ==========
-- |[[ ]]|
-- ==========
-- Bootstrap checks executed when this module is loaded: make the LuaRocks
-- loader available (best effort), verify that lua-resty-core is loaded and
-- that every shared dict listed in `constants.DICTS` is declared, and stop
-- early with a stub module when only the Nginx configuration is being checked.
local pcall = pcall
-- the result of this pcall is ignored, so a missing "luarocks.loader" module
-- does not abort loading
pcall(require, "luarocks.loader")
assert(package.loaded["resty.core"], "lua-resty-core must be loaded; make " ..
"sure 'lua_load_resty_core' is not "..
"disabled.")
local constants = require "kong.constants"
do
-- let's ensure the required shared dictionaries are
-- declared via lua_shared_dict in the Nginx conf
for _, dict in ipairs(constants.DICTS) do
if not ngx.shared[dict] then
return error("missing shared dict '" .. dict .. "' in Nginx " ..
"configuration, are you using a custom template? " ..
"Make sure the 'lua_shared_dict " .. dict .. " [SIZE];' " ..
"directive is defined.")
end
end
-- if we're running `nginx -t` then don't initialize
-- (the module then only exposes a no-op `init` function)
if os.getenv("KONG_NGINX_CONF_CHECK") then
return {
init = function() end,
}
end
end
require("kong.globalpatches")()
local kong_global = require "kong.global"
local PHASES = kong_global.phases
_G.kong = kong_global.new() -- no versioned PDK for plugins for now
local DB = require "kong.db"
local dns = require "kong.tools.dns"
local meta = require "kong.meta"
local lapis = require "lapis"
local runloop = require "kong.runloop.handler"
local stream_api = require "kong.tools.stream_api"
local declarative = require "kong.db.declarative"
local ngx_balancer = require "ngx.balancer"
local kong_resty_ctx = require "kong.resty.ctx"
local certificate = require "kong.runloop.certificate"
local concurrency = require "kong.concurrency"
local cache_warmup = require "kong.cache.warmup"
local balancer = require "kong.runloop.balancer"
local kong_error_handlers = require "kong.error_handlers"
local migrations_utils = require "kong.cmd.utils.migrations"
local plugin_servers = require "kong.runloop.plugin_servers"
local lmdb_txn = require "resty.lmdb.transaction"
local instrumentation = require "kong.tracing.instrumentation"
local tablepool = require "tablepool"
local get_ctx_table = require("resty.core.ctx").get_ctx_table
local kong = kong
local ngx = ngx
local now = ngx.now
local update_time = ngx.update_time
local var = ngx.var
local arg = ngx.arg
local header = ngx.header
local ngx_log = ngx.log
local ngx_ALERT = ngx.ALERT
local ngx_CRIT = ngx.CRIT
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_NOTICE = ngx.NOTICE
local ngx_INFO = ngx.INFO
local ngx_DEBUG = ngx.DEBUG
local is_http_module = ngx.config.subsystem == "http"
local is_stream_module = ngx.config.subsystem == "stream"
local start_time = ngx.req.start_time
local type = type
local error = error
local ipairs = ipairs
local assert = assert
local tostring = tostring
local coroutine = coroutine
local fetch_table = tablepool.fetch
local release_table = tablepool.release
local get_last_failure = ngx_balancer.get_last_failure
local set_current_peer = ngx_balancer.set_current_peer
local set_timeouts = ngx_balancer.set_timeouts
local set_more_tries = ngx_balancer.set_more_tries
local enable_keepalive = ngx_balancer.enable_keepalive
local DECLARATIVE_LOAD_KEY = constants.DECLARATIVE_LOAD_KEY
local CTX_NS = "ctx"
local CTX_NARR = 0
local CTX_NREC = 50 -- normally Kong has ~32 keys in ctx
local declarative_entities
local declarative_meta
local schema_state
local stash_init_worker_error
local log_init_worker_errors
-- Bookkeeping for failures that happen during worker initialization.
--   * `stash_init_worker_error(err)` records the failure and logs it at CRIT.
--   * `log_init_worker_errors(ctx)` logs an ALERT, at most once per `ctx`
--     table, when at least one failure was recorded earlier.
do
  local stashed_errors
  local stashed_errors_str
  -- private, unique key used to mark a context table as already alerted
  local alerted_key = {}

  stash_init_worker_error = function(err)
    if err == nil then
      return
    end

    local message = tostring(err)

    stashed_errors = stashed_errors or {}
    stashed_errors[#stashed_errors + 1] = message
    -- keep the joined form ready so the per-request alert does no extra work
    stashed_errors_str = table.concat(stashed_errors, ", ")

    return ngx_log(ngx_CRIT, "worker initialization error: ", message,
                             "; this node must be restarted")
  end

  log_init_worker_errors = function(ctx)
    if not stashed_errors_str then
      return
    end

    if ctx[alerted_key] then
      return
    end

    ctx[alerted_key] = true

    return ngx_log(ngx_ALERT, "unsafe request processing due to earlier ",
                              "initialization errors; this node must be ",
                              "restarted (", stashed_errors_str, ")")
  end
end
local reset_kong_shm
do
  -- keys of the `kong` shared dict whose values are carried over a reset
  local preserve_keys = {
    "kong:node_id",
    "events:requests",
    "events:requests:http",
    "events:requests:https",
    "events:requests:h2c",
    "events:requests:h2",
    "events:requests:grpc",
    "events:requests:grpcs",
    "events:requests:ws",
    "events:requests:wss",
    "events:requests:go_plugins",
    "events:streams",
    "events:streams:tcp",
    "events:streams:tls",
  }

  --- Flush the `kong` shared dict while carrying over the preserved keys.
  -- When the database is "off" and no declarative config (file or string) is
  -- configured, the DECLARATIVE_LOAD_KEY entry is carried over as well.
  -- @param config table providing `database`, `declarative_config` and
  -- `declarative_config_string`
  reset_kong_shm = function(config)
    local shm = ngx.shared.kong
    local kept = {}

    local declarative_source = config.declarative_config
                               or config.declarative_config_string

    if config.database == "off" and not declarative_source then
      kept[DECLARATIVE_LOAD_KEY] = shm:get(DECLARATIVE_LOAD_KEY)
    end

    for i = 1, #preserve_keys do
      local key = preserve_keys[i]
      kept[key] = shm:get(key) -- lookup errors are deliberately ignored
    end

    shm:flush_all()

    for key, value in pairs(kept) do
      shm:set(key, value)
    end

    shm:flush_expired(0)
  end
end
--- Point the PDK at `plugin` before one of its handlers runs.
-- Flags the context when the handler is marked `_go`, then installs the
-- plugin-named context and the plugin-namespaced logger.
-- @param ctx request/connection context table
-- @param plugin table with `name` and `handler` fields
local function setup_plugin_context(ctx, plugin)
  local handler = plugin.handler

  if handler._go then
    ctx.ran_go_plugin = true
  end

  kong_global.set_named_ctx(kong, "plugin", handler, ctx)
  kong_global.set_namespaced_log(kong, plugin.name, ctx)
end
--- Undo `setup_plugin_context` after a plugin handler ran.
-- Resets the logger and, when a previous workspace was captured, restores it.
-- @param ctx request/connection context table
-- @param old_ws workspace value captured before the plugins ran (may be nil)
local function reset_plugin_context(ctx, old_ws)
  kong_global.reset_log(kong, ctx)

  if not old_ws then
    return
  end

  ctx.workspace = old_ws
end
--- Run the `init_worker` handler of every plugin yielded by the iterator.
-- Each handler is invoked through `pcall`, so one failing plugin does not
-- prevent the remaining ones from running; failures are collected instead.
-- @param plugins_iterator object exposing `iterate_init_worker()`
-- @param ctx context table handed to the namespaced-log helpers
-- @return nil when every handler succeeded, otherwise an array of
-- `{ plugin = <plugin name>, err = <error as string> }` entries
local function execute_init_worker_plugins_iterator(plugins_iterator, ctx)
  local errors

  for plugin in plugins_iterator:iterate_init_worker() do
    kong_global.set_namespaced_log(kong, plugin.name, ctx)

    -- guard against failed handler in "init_worker" phase only because it will
    -- cause Kong to not correctly initialize and can not be recovered automatically.
    local ok, err = pcall(plugin.handler.init_worker, plugin.handler)
    if not ok then
      errors = errors or {}
      errors[#errors + 1] = {
        plugin = plugin.name,
        -- a handler may raise a non-string value (e.g. `error({})`);
        -- normalize it so building a message by concatenation cannot throw
        err = tostring(err),
      }
    end

    kong_global.reset_log(kong, ctx)
  end

  return errors
end
--- Run `phase` for the plugins yielded by `plugins_iterator:iterate`, with
-- response delaying switched on (`ctx.delay_response = true`) for the duration.
-- Every handler runs inside its own coroutine. A handler that raises is
-- logged, turned into a delayed 500 response and flagged through
-- `ctx.KONG_UNEXPECTED`. As soon as `ctx.delayed_response` is set, the
-- remaining plugins are still iterated but their handlers are not executed.
-- @param plugins_iterator object exposing `iterate(phase, ctx)`
-- @param phase handler name to run
-- @param ctx request context table
local function execute_collecting_plugins_iterator(plugins_iterator, phase, ctx)
  local saved_ws = ctx.workspace
  -- a tracing span is only opened around handlers of the "access" phase
  local traced = phase == "access"

  ctx.delay_response = true

  for plugin, conf in plugins_iterator:iterate(phase, ctx) do
    if not ctx.delayed_response then
      local span
      if traced then
        span = instrumentation.plugin_access(plugin)
      end

      setup_plugin_context(ctx, plugin)

      local thread = coroutine.create(plugin.handler[phase])
      local resumed, failure = coroutine.resume(thread, plugin.handler, conf)
      if not resumed then
        -- reflect the failure on the tracing span, when there is one
        if span then
          span:record_error(failure)
          span:set_status(2)
        end

        kong.log.err(failure)

        ctx.delayed_response = {
          status_code = 500,
          content = { message = "An unexpected error occurred" },
        }

        -- a plugin raising a runtime error is marked as `error`
        ctx.KONG_UNEXPECTED = true
      end

      reset_plugin_context(ctx, saved_ws)

      -- close the tracing span
      if span then
        span:finish()
      end
    end
  end

  ctx.delay_response = nil
end
--- Run `phase` for every plugin yielded by `plugins_iterator:iterate`.
-- Handlers are called directly: an error raised by one of them propagates to
-- the caller. A tracing span wraps each handler of the "rewrite" phase.
-- @param plugins_iterator object exposing `iterate(phase, ctx)`
-- @param phase handler name to run
-- @param ctx request/connection context table
local function execute_plugins_iterator(plugins_iterator, phase, ctx)
  local saved_ws = ctx.workspace
  local traced = phase == "rewrite"

  for plugin, conf in plugins_iterator:iterate(phase, ctx) do
    local span
    if traced then
      span = instrumentation.plugin_rewrite(plugin)
    end

    setup_plugin_context(ctx, plugin)

    local handler = plugin.handler
    handler[phase](handler, conf)

    reset_plugin_context(ctx, saved_ws)

    if span then
      span:finish()
    end
  end
end
--- Run `phase` for the plugins returned by `iterate_collected_plugins`.
-- Handlers are called directly (errors propagate to the caller). A tracing
-- span wraps each handler of the "header_filter" phase.
-- @param plugins_iterator object exposing `iterate_collected_plugins(phase, ctx)`
-- (invoked with `.`, i.e. without a receiver)
-- @param phase handler name to run
-- @param ctx request context table
local function execute_collected_plugins_iterator(plugins_iterator, phase, ctx)
  local saved_ws = ctx.workspace
  local traced = phase == "header_filter"

  for plugin, conf in plugins_iterator.iterate_collected_plugins(phase, ctx) do
    local span
    if traced then
      span = instrumentation.plugin_header_filter(plugin)
    end

    setup_plugin_context(ctx, plugin)

    local handler = plugin.handler
    handler[phase](handler, conf)

    reset_plugin_context(ctx, saved_ws)

    if span then
      span:finish()
    end
  end
end
--- Warm up the DB cache, on worker 0 only and never when the database is "off".
-- @param kong_config table providing `database` and `db_cache_warmup_entities`
-- @return true on success (or when nothing had to be done)
-- @return nil, err when `cache_warmup.execute` fails
local function execute_cache_warmup(kong_config)
  if kong_config.database ~= "off" and ngx.worker.id() == 0 then
    local warmed, err = cache_warmup.execute(kong_config.db_cache_warmup_entities)
    if not warmed then
      return nil, err
    end
  end

  return true
end
--- Refresh the cached Nginx time and return it in milliseconds.
-- @return number current time (`now()` is in seconds with millisecond
-- resolution, hence the multiplication)
local function get_updated_now_ms()
  update_time()

  local seconds = now()
  return seconds * 1000
end
--- Send the response stored in `ctx.delayed_response`.
-- Clears the delaying/buffering flags first. When
-- `ctx.delayed_response_callback` is a function it is called with `ctx` and
-- nothing else is done; otherwise the stored status code, content and headers
-- are passed to `kong.response.exit`.
-- @param ctx request context table
local function flush_delayed_response(ctx)
  ctx.delay_response = nil
  ctx.buffered_proxying = nil

  local callback = ctx.delayed_response_callback
  if type(callback) == "function" then
    callback(ctx)
    return -- deliberately not a tail call
  end

  local delayed = ctx.delayed_response
  kong.response.exit(delayed.status_code, delayed.content, delayed.headers)
end
--- Tell whether a declarative config source is configured.
-- @param kong_config table providing `declarative_config` and
-- `declarative_config_string`
-- @return the `declarative_config` value when it is truthy, otherwise the
-- `declarative_config_string` value (the raw values, not a boolean)
local function has_declarative_config(kong_config)
  local from_file = kong_config.declarative_config
  if from_file then
    return from_file
  end

  return kong_config.declarative_config_string
end
--- Parse the configured declarative config (file or string).
-- When no declarative source is configured, an empty configuration with
-- `_format_version = "2.1"` is parsed instead.
-- @param kong_config table providing `declarative_config` and
-- `declarative_config_string`
-- @return entities, nil, meta on success
-- @return nil, err when parsing the configured file/string fails
local function parse_declarative_config(kong_config)
  local dc = declarative.new_config(kong_config)

  if not has_declarative_config(kong_config) then
    -- nothing configured: parse an empty configuration
    local empty_entities, _, _, empty_meta = dc:parse_table({ _format_version = "2.1" })
    return empty_entities, nil, empty_meta
  end

  local from_file = kong_config.declarative_config
  local from_string = kong_config.declarative_config_string

  local entities, err, _, parsed_meta
  if from_file ~= nil then
    entities, err, _, parsed_meta = dc:parse_file(from_file)

  elseif from_string ~= nil then
    entities, err, _, parsed_meta = dc:parse_string(from_string)
  end

  if entities then
    return entities, nil, parsed_meta
  end

  if from_file ~= nil then
    return nil, "error parsing declarative config file " ..
                from_file .. ":\n" .. err

  elseif from_string ~= nil then
    return nil, "error parsing declarative string " ..
                from_string .. ":\n" .. err
  end

  return entities, nil, parsed_meta
end
--- Resolve the default workspace, then build the initial plugins iterator and
-- router.
-- `kong.default_workspace` keeps its current value when no "default"
-- workspace id is found.
-- @return true on success
-- @return nil, err when either build step fails
local function declarative_init_build()
  local default_ws = kong.db.workspaces:select_by_name("default")
  local default_ws_id = default_ws and default_ws.id
  kong.default_workspace = default_ws_id or kong.default_workspace

  local built, err = runloop.build_plugins_iterator("init")
  if not built then
    return nil, "error building initial plugins iterator: " .. err
  end

  built, err = runloop.build_router("init")
  if not built then
    return nil, "error building initial router: " .. err
  end

  return true
end
--- Load parsed declarative entities into the cache, then build the initial
-- plugins iterator and router.
-- The load runs under the "declarative_config" worker mutex and is skipped
-- when DECLARATIVE_LOAD_KEY is already set in the `kong` shared dict.
-- @param kong_config table providing `declarative_config` (used for logging)
-- @param entities parsed entities handed to `declarative.load_into_cache`
-- @param meta parsed metadata handed to `declarative.load_into_cache`
-- @return the result of `declarative_init_build()` on success
-- @return nil, err when the guarded load fails
local function load_declarative_config(kong_config, entities, meta)
  local kong_shm = ngx.shared.kong

  local function load_once()
    if kong_shm:get(DECLARATIVE_LOAD_KEY) then
      -- the load marker is already present: nothing to do
      return true
    end

    local loaded, load_err = declarative.load_into_cache(entities, meta)
    if not loaded then
      return nil, load_err
    end

    if kong_config.declarative_config then
      kong.log.notice("declarative config loaded from ",
                      kong_config.declarative_config)
    end

    -- failing to set the marker is only worth a warning
    local marked, mark_err = kong_shm:safe_set(DECLARATIVE_LOAD_KEY, true)
    if not marked then
      kong.log.warn("failed marking declarative_config as loaded: ", mark_err)
    end

    return true
  end

  local ok, err = concurrency.with_worker_mutex({ name = "declarative_config" },
                                                load_once)
  if not ok then
    return nil, err
  end

  return declarative_init_build()
end
--- Render a list of migrations as a single human-readable line.
-- @param migtable array of `{ subsystem = <string>, migrations = { { name = <string> }, ... } }`
-- @return string of space-separated `subsystem (name1, name2)` groups
local function list_migrations(migtable)
  local groups = {}

  for _, entry in ipairs(migtable) do
    local names = {}
    for _, migration in ipairs(entry.migrations) do
      names[#names + 1] = migration.name
    end

    groups[#groups + 1] = string.format("%s (%s)", entry.subsystem,
                                        table.concat(names, ", "))
  end

  return table.concat(groups, " ")
end
-- Kong public context handlers.
-- @section kong_handlers
local Kong = {}
--- Handler for the `init` phase.
-- Loads the configuration from `<prefix>/.kong_env`, resets the `kong` shared
-- dict, initializes the PDK, the instrumentation and the database connector,
-- loads vault and plugin schemas, and then either parses the declarative
-- config (database "off") or builds the initial plugins iterator and router.
-- Failures are raised through `assert`/`error`.
function Kong.init()
local pl_path = require "pl.path"
local conf_loader = require "kong.conf_loader"
-- check if kong global is the correct one
if not kong.version then
error("configuration error: make sure your template is not setting a " ..
"global named 'kong' (please use 'Kong' instead)")
end
-- retrieve kong_config
local conf_path = pl_path.join(ngx.config.prefix(), ".kong_env")
local config = assert(conf_loader(conf_path, nil, { from_kong_env = true }))
-- flush the `kong` shared dict, carrying over only the preserved keys
reset_kong_shm(config)
-- special math.randomseed from kong.globalpatches not taking any argument.
-- Must only be called in the init or init_worker phases, to avoid
-- duplicated seeds.
math.randomseed()
kong_global.init_pdk(kong, config)
instrumentation.init(config)
local db = assert(DB.new(config))
instrumentation.db_query(db.connector)
assert(db:init_connector())
-- kept in a file-level local: init_worker reports it again from worker 0
schema_state = assert(db:schema_state())
migrations_utils.check_state(schema_state)
if schema_state.missing_migrations or schema_state.pending_migrations then
if schema_state.missing_migrations then
ngx_log(ngx_WARN, "database is missing some migrations:\n",
schema_state.missing_migrations)
end
if schema_state.pending_migrations then
ngx_log(ngx_WARN, "database has pending migrations:\n",
schema_state.pending_migrations)
end
end
assert(db:connect())
kong.db = db
kong.dns = dns(config)
if config.proxy_ssl_enabled or config.stream_ssl_enabled then
certificate.init()
end
-- the clustering module is only instantiated in the http subsystem, for the
-- "data_plane" and "control_plane" roles
if is_http_module and (config.role == "data_plane" or config.role == "control_plane")
then
kong.clustering = require("kong.clustering").new(config)
end
assert(db.vaults:load_vault_schemas(config.loaded_vaults))
-- Load plugins as late as possible so that everything is set up
assert(db.plugins:load_plugin_schemas(config.loaded_plugins))
if is_stream_module then
stream_api.load_handlers()
end
if config.database == "off" then
-- parse the declarative config in the http subsystem, or in any subsystem
-- when no proxy/admin/status listener is configured at all; the parsed
-- entities are stored in file-level locals for use in init_worker
if is_http_module or
(#config.proxy_listeners == 0 and
#config.admin_listeners == 0 and
#config.status_listeners == 0)
then
local err
declarative_entities, err, declarative_meta = parse_declarative_config(kong.configuration)
if not declarative_entities then
error(err)
end
end
else
local default_ws = db.workspaces:select_by_name("default")
kong.default_workspace = default_ws and default_ws.id
local ok, err = runloop.build_plugins_iterator("init")
if not ok then
error("error building initial plugins: " .. tostring(err))
end
-- the router (and the initial versions in cache) are not built for the
-- "control_plane" role
if config.role ~= "control_plane" then
assert(runloop.build_router("init"))
ok, err = runloop.set_init_versions_in_cache()
if not ok then
error("error setting initial versions for router and plugins iterator in cache: " ..
tostring(err))
end
end
end
db:close()
require("resty.kong.var").patch_metatable()
end
--- Handler for the `init_worker` phase.
-- Starts the timer module, initializes the per-worker DB state, the worker
-- and cluster events, and both caches; loads or rebuilds the declarative
-- state when the database is "off"; warms up the cache; updates the plugins
-- iterator and the router; and finally runs every plugin's `init_worker`
-- handler.
-- Most failures are not raised: they are recorded with
-- `stash_init_worker_error` and the function returns early.
function Kong.init_worker()
local ctx = ngx.ctx
ctx.KONG_PHASE = PHASES.init_worker
-- special math.randomseed from kong.globalpatches not taking any argument.
-- Must only be called in the init or init_worker phases, to avoid
-- duplicated seeds.
math.randomseed()
-- setup timerng to _G.kong
-- (the instance is moved from the `timerng` global into `kong.timer`)
kong.timer = _G.timerng
_G.timerng = nil
kong.timer:set_debug(kong.configuration.log_level == "debug")
kong.timer:start()
-- init DB
local ok, err = kong.db:init_worker()
if not ok then
stash_init_worker_error("failed to instantiate 'kong.db' module: " .. err)
return
end
-- the schema state captured during init is reported by worker 0 only
if ngx.worker.id() == 0 then
if schema_state.missing_migrations then
ngx_log(ngx_WARN, "missing migrations: ",
list_migrations(schema_state.missing_migrations))
end
if schema_state.pending_migrations then
ngx_log(ngx_INFO, "starting with pending migrations: ",
list_migrations(schema_state.pending_migrations))
end
end
local worker_events, err = kong_global.init_worker_events()
if not worker_events then
stash_init_worker_error("failed to instantiate 'kong.worker_events' " ..
"module: " .. err)
return
end
kong.worker_events = worker_events
local cluster_events, err = kong_global.init_cluster_events(kong.configuration, kong.db)
if not cluster_events then
stash_init_worker_error("failed to instantiate 'kong.cluster_events' " ..
"module: " .. err)
return
end
kong.cluster_events = cluster_events
local cache, err = kong_global.init_cache(kong.configuration, cluster_events, worker_events)
if not cache then
stash_init_worker_error("failed to instantiate 'kong.cache' module: " ..
err)
return
end
kong.cache = cache
local core_cache, err = kong_global.init_core_cache(kong.configuration, cluster_events, worker_events)
if not core_cache then
stash_init_worker_error("failed to instantiate 'kong.core_cache' module: " ..
err)
return
end
kong.core_cache = core_cache
kong.db:set_events_handler(worker_events)
if kong.configuration.database == "off" then
-- databases in LMDB need to be explicitly created, otherwise `get`
-- operations will return error instead of `nil`. This ensures the default
-- namespace always exists.
local t = lmdb_txn.begin(1)
t:db_open(true)
ok, err = t:commit()
if not ok then
stash_init_worker_error("failed to create and open LMDB database: " .. err)
return
end
if not has_declarative_config(kong.configuration) and
declarative.get_current_hash() ~= nil then
-- if there is no declarative config set and a config is present in LMDB,
-- just build the router and plugins iterator
ngx_log(ngx_INFO, "found persisted lmdb config, loading...")
local ok, err = declarative_init_build()
if not ok then
stash_init_worker_error("failed to initialize declarative config: " .. err)
return
end
elseif declarative_entities then
-- entities were parsed during init: load them, then build
ok, err = load_declarative_config(kong.configuration,
declarative_entities,
declarative_meta)
if not ok then
stash_init_worker_error("failed to load declarative config file: " .. err)
return
end
else
-- stream does not need to load declarative config again, just build
-- the router and plugins iterator
local ok, err = declarative_init_build()
if not ok then
stash_init_worker_error("failed to initialize declarative config: " .. err)
return
end
end
end
local is_not_control_plane = kong.configuration.role ~= "control_plane"
if is_not_control_plane then
-- a failed warmup is only logged; initialization continues
ok, err = execute_cache_warmup(kong.configuration)
if not ok then
ngx_log(ngx_ERR, "failed to warm up the DB cache: " .. err)
end
end
ok, err = runloop.update_plugins_iterator()
if not ok then
stash_init_worker_error("failed to build the plugins iterator: " .. err)
return
end
if is_not_control_plane then
ok, err = runloop.update_router()
if not ok then
stash_init_worker_error("failed to build the router: " .. err)
return
end
end
runloop.init_worker.before()
-- run plugins init_worker context
local plugins_iterator = runloop.get_plugins_iterator()
local errors = execute_init_worker_plugins_iterator(plugins_iterator, ctx)
if errors then
-- every failing plugin is recorded; initialization still continues
for _, e in ipairs(errors) do
local err = "failed to execute the \"init_worker\" " ..
"handler for plugin \"" .. e.plugin .."\": " .. e.err
stash_init_worker_error(err)
end
end
runloop.init_worker.after()
-- plugin servers are started by worker 0 only, and never for the
-- "control_plane" role
if is_not_control_plane and ngx.worker.id() == 0 then
plugin_servers.start()
end
if kong.clustering then
kong.clustering:init_worker()
end
end
--- Handler for the `exit_worker` phase.
-- Stops the plugin servers from worker 0, except for the "control_plane" role.
function Kong.exit_worker()
  if kong.configuration.role == "control_plane" then
    return
  end

  if ngx.worker.id() == 0 then
    plugin_servers.stop()
  end
end
--- Handler for the `ssl_certificate` phase.
-- Runs the "certificate" handlers of the plugins between the runloop
-- `certificate.before` / `certificate.after` hooks, then clears `ngx.ctx`.
function Kong.ssl_certificate()
  -- Note: this context belongs to a connection, not to a single request
  local ctx = get_ctx_table(fetch_table(CTX_NS, CTX_NARR, CTX_NREC))

  ctx.KONG_PHASE = PHASES.certificate

  log_init_worker_errors(ctx)

  -- first phase executed for an HTTPS request
  ctx.workspace = kong.default_workspace

  runloop.certificate.before(ctx)

  local iterator = runloop.get_updated_plugins_iterator()
  execute_plugins_iterator(iterator, "certificate", ctx)

  runloop.certificate.after(ctx)

  -- TODO: do we want to keep connection context?
  kong.table.clear(ngx.ctx)
end
--- Handler for the `preread` phase.
-- Records the phase timings in the context, runs the "preread" plugin
-- handlers in collecting mode and either flushes a delayed response, exits
-- with 503 when no Service was matched, or marks the request as proxied.
function Kong.preread()
  local ctx = get_ctx_table(fetch_table(CTX_NS, CTX_NARR, CTX_NREC))

  -- only set the start timestamps when they are not already present
  ctx.KONG_PROCESSING_START = ctx.KONG_PROCESSING_START or start_time() * 1000
  ctx.KONG_PREREAD_START = ctx.KONG_PREREAD_START or now() * 1000

  ctx.KONG_PHASE = PHASES.preread

  log_init_worker_errors(ctx)

  -- if proxying to a second layer TLS terminator is required,
  -- abort further execution and hand control back to Nginx
  if runloop.preread.before(ctx) then
    return
  end

  local iterator = runloop.get_updated_plugins_iterator()
  execute_collecting_plugins_iterator(iterator, "preread", ctx)

  if ctx.delayed_response then
    local ended_at = get_updated_now_ms()
    ctx.KONG_PREREAD_ENDED_AT = ended_at
    ctx.KONG_PREREAD_TIME = ended_at - ctx.KONG_PREREAD_START
    ctx.KONG_RESPONSE_LATENCY = ended_at - ctx.KONG_PROCESSING_START

    return flush_delayed_response(ctx)
  end

  ctx.delay_response = nil

  if not ctx.service then
    local ended_at = get_updated_now_ms()
    ctx.KONG_PREREAD_ENDED_AT = ended_at
    ctx.KONG_PREREAD_TIME = ended_at - ctx.KONG_PREREAD_START
    ctx.KONG_RESPONSE_LATENCY = ended_at - ctx.KONG_PROCESSING_START

    ngx_log(ngx_WARN, "no Service found with those values")
    return ngx.exit(503)
  end

  runloop.preread.after(ctx)

  local ended_at = get_updated_now_ms()
  ctx.KONG_PREREAD_ENDED_AT = ended_at
  ctx.KONG_PREREAD_TIME = ended_at - ctx.KONG_PREREAD_START

  -- we intend to proxy, though the balancer may still fail
  ctx.KONG_PROXIED = true
end
--- Handler for the `rewrite` phase.
-- For the "grpc" and "unbuffered" proxy modes the stashed context is restored
-- and only the rewrite end time is recorded. Otherwise the context is set up,
-- the "rewrite" plugin handlers run between the runloop hooks, and the phase
-- timings are stored.
function Kong.rewrite()
  local proxy_mode = var.kong_proxy_mode
  if proxy_mode == "grpc" or proxy_mode == "unbuffered" then
    -- this runs after an internal redirect: restore (and restash) the
    -- context so that phases are not executed again
    kong_resty_ctx.apply_ref()
    local restored = ngx.ctx
    kong_resty_ctx.stash_ref(restored)

    local ended_at = now() * 1000
    restored.KONG_REWRITE_ENDED_AT = ended_at
    restored.KONG_REWRITE_TIME = ended_at - restored.KONG_REWRITE_START
    return
  end

  local is_https = var.https == "on"

  local ctx
  if not is_https then
    ctx = get_ctx_table(fetch_table(CTX_NS, CTX_NARR, CTX_NREC))
  else
    ctx = ngx.ctx
  end

  -- only set the start timestamps when they are not already present
  ctx.KONG_PROCESSING_START = ctx.KONG_PROCESSING_START or start_time() * 1000
  ctx.KONG_REWRITE_START = ctx.KONG_REWRITE_START or now() * 1000

  ctx.KONG_PHASE = PHASES.rewrite

  kong_resty_ctx.stash_ref(ctx)

  if not is_https then
    log_init_worker_errors(ctx)
  end

  runloop.rewrite.before(ctx)

  ctx.workspace = ctx.workspace or kong.default_workspace

  -- for HTTPS requests the plugins iterator was already updated during the
  -- ssl_certificate phase
  local iterator
  if not is_https then
    iterator = runloop.get_updated_plugins_iterator()
  else
    iterator = runloop.get_plugins_iterator()
  end

  execute_plugins_iterator(iterator, "rewrite", ctx)

  runloop.rewrite.after(ctx)

  local ended_at = get_updated_now_ms()
  ctx.KONG_REWRITE_ENDED_AT = ended_at
  ctx.KONG_REWRITE_TIME = ended_at - ctx.KONG_REWRITE_START
end
--- Handler for the `access` phase.
-- Runs the "access" plugin handlers in collecting mode, then either flushes a
-- delayed response, answers 503 when no Service was matched, or marks the
-- request as proxied. With buffered proxying requested, hands over to
-- `Kong.response()` unless the HTTP version or a connection upgrade makes
-- buffering impossible.
function Kong.access()
  local ctx = ngx.ctx

  if not ctx.KONG_ACCESS_START then
    local started_at = now() * 1000
    ctx.KONG_ACCESS_START = started_at

    -- close the rewrite phase when it never recorded its own end
    if ctx.KONG_REWRITE_START and not ctx.KONG_REWRITE_ENDED_AT then
      ctx.KONG_REWRITE_ENDED_AT = started_at
      ctx.KONG_REWRITE_TIME = started_at - ctx.KONG_REWRITE_START
    end
  end

  ctx.KONG_PHASE = PHASES.access

  runloop.access.before(ctx)

  local iterator = runloop.get_plugins_iterator()
  execute_collecting_plugins_iterator(iterator, "access", ctx)

  if ctx.delayed_response then
    local ended_at = get_updated_now_ms()
    ctx.KONG_ACCESS_ENDED_AT = ended_at
    ctx.KONG_ACCESS_TIME = ended_at - ctx.KONG_ACCESS_START
    ctx.KONG_RESPONSE_LATENCY = ended_at - ctx.KONG_PROCESSING_START

    return flush_delayed_response(ctx)
  end

  ctx.delay_response = nil

  if not ctx.service then
    local ended_at = get_updated_now_ms()
    ctx.KONG_ACCESS_ENDED_AT = ended_at
    ctx.KONG_ACCESS_TIME = ended_at - ctx.KONG_ACCESS_START
    ctx.KONG_RESPONSE_LATENCY = ended_at - ctx.KONG_PROCESSING_START

    ctx.buffered_proxying = nil

    return kong.response.exit(503, { message = "no Service found with those values"})
  end

  runloop.access.after(ctx)

  local ended_at = get_updated_now_ms()
  ctx.KONG_ACCESS_ENDED_AT = ended_at
  ctx.KONG_ACCESS_TIME = ended_at - ctx.KONG_ACCESS_START

  -- we intend to proxy, though the balancer may still fail
  ctx.KONG_PROXIED = true

  if not ctx.buffered_proxying then
    return
  end

  local version = ngx.req.http_version()
  local upgrade = var.upstream_upgrade or ""

  if version < 2 and upgrade == "" then
    return Kong.response()
  end

  -- buffering cannot be used: say why, then fall back to regular proxying
  if version >= 2 then
    ngx_log(ngx_NOTICE, "response buffering was turned off: incompatible HTTP version (", version, ")")
  else
    ngx_log(ngx_NOTICE, "response buffering was turned off: connection upgrade (", upgrade, ")")
  end

  ctx.buffered_proxying = nil
end
--- Handler for the `balancer` phase: sets the upstream peer for the current try.
-- May be called several times for one request (once per try) and must not
-- yield. On the first try the address already present in `ctx.balancer_data`
-- is used and only the number of allowed retries is declared. On later tries
-- the previous failure is recorded and reported to the balancer instance (if
-- any) before `balancer.execute` is run again.
-- The request is terminated through `ngx.exit` when the retry resolution
-- fails, when the Host header cannot be set, or when the peer cannot be set.
-- Timings of the phase and of each try are stored in `ctx` / the try record.
function Kong.balancer()
  -- This may be called multiple times, and no yielding here!
  local now_ms = now() * 1000

  local ctx = ngx.ctx
  if not ctx.KONG_BALANCER_START then
    ctx.KONG_BALANCER_START = now_ms

    -- close earlier phases that never recorded their own end
    if is_stream_module then
      if ctx.KONG_PREREAD_START and not ctx.KONG_PREREAD_ENDED_AT then
        ctx.KONG_PREREAD_ENDED_AT = ctx.KONG_BALANCER_START
        ctx.KONG_PREREAD_TIME = ctx.KONG_PREREAD_ENDED_AT -
                                ctx.KONG_PREREAD_START
      end

    else
      if ctx.KONG_REWRITE_START and not ctx.KONG_REWRITE_ENDED_AT then
        ctx.KONG_REWRITE_ENDED_AT = ctx.KONG_ACCESS_START or
                                    ctx.KONG_BALANCER_START
        ctx.KONG_REWRITE_TIME = ctx.KONG_REWRITE_ENDED_AT -
                                ctx.KONG_REWRITE_START
      end

      if ctx.KONG_ACCESS_START and not ctx.KONG_ACCESS_ENDED_AT then
        ctx.KONG_ACCESS_ENDED_AT = ctx.KONG_BALANCER_START
        ctx.KONG_ACCESS_TIME = ctx.KONG_ACCESS_ENDED_AT -
                               ctx.KONG_ACCESS_START
      end
    end
  end

  ctx.KONG_PHASE = PHASES.balancer

  local balancer_data = ctx.balancer_data
  local tries = balancer_data.tries
  local current_try = {}
  balancer_data.try_count = balancer_data.try_count + 1
  tries[balancer_data.try_count] = current_try

  current_try.balancer_start = now_ms

  if balancer_data.try_count > 1 then
    -- only call balancer on retry, first one is done in `runloop.access.after`
    -- which runs in the ACCESS context and hence has less limitations than
    -- this BALANCER context where the retries are executed

    -- record failure data
    local previous_try = tries[balancer_data.try_count - 1]
    previous_try.state, previous_try.code = get_last_failure()

    -- Report HTTP status for health checks
    local balancer_instance = balancer_data.balancer
    if balancer_instance then
      if previous_try.state == "failed" then
        if previous_try.code == 504 then
          balancer_instance.report_timeout(balancer_data.balancer_handle)
        else
          balancer_instance.report_tcp_failure(balancer_data.balancer_handle)
        end

      else
        balancer_instance.report_http_status(balancer_data.balancer_handle,
                                             previous_try.code)
      end
    end

    local ok, err, errcode = balancer.execute(balancer_data, ctx)
    if not ok then
      -- the host is wrapped in a matching pair of quotes
      ngx_log(ngx_ERR, "failed to retry the dns/balancer resolver for '",
              tostring(balancer_data.host), "' with: ", tostring(err))

      ctx.KONG_BALANCER_ENDED_AT = get_updated_now_ms()
      ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START
      ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START

      return ngx.exit(errcode)
    end

    if is_http_module then
      ok, err = balancer.set_host_header(balancer_data, var.upstream_scheme, var.upstream_host, true)
      if not ok then
        ngx_log(ngx_ERR, "failed to set balancer Host header: ", err)

        return ngx.exit(500)
      end
    end

  else
    -- first try, so set the max number of retries
    local retries = balancer_data.retries
    if retries > 0 then
      set_more_tries(retries)
    end
  end

  -- keepalive pool options, only built in the http subsystem when a
  -- positive pool size is configured
  local pool_opts
  local kong_conf = kong.configuration

  if kong_conf.upstream_keepalive_pool_size > 0 and is_http_module then
    local pool = balancer_data.ip .. "|" .. balancer_data.port

    if balancer_data.scheme == "https" then
      -- upstream_host is SNI
      pool = pool .. "|" .. var.upstream_host

      if ctx.service and ctx.service.client_certificate then
        pool = pool .. "|" .. ctx.service.client_certificate.id
      end
    end

    pool_opts = {
      pool = pool,
      pool_size = kong_conf.upstream_keepalive_pool_size,
    }
  end

  current_try.ip = balancer_data.ip
  current_try.port = balancer_data.port

  -- set the targets as resolved
  ngx_log(ngx_DEBUG, "setting address (try ", balancer_data.try_count, "): ",
                     balancer_data.ip, ":", balancer_data.port)

  local ok, err = set_current_peer(balancer_data.ip, balancer_data.port, pool_opts)
  if not ok then
    ngx_log(ngx_ERR, "failed to set the current peer (address: ",
            tostring(balancer_data.ip), " port: ", tostring(balancer_data.port),
            "): ", tostring(err))

    ctx.KONG_BALANCER_ENDED_AT = get_updated_now_ms()
    ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START
    ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START

    return ngx.exit(500)
  end

  -- timeouts are stored in milliseconds and passed on in seconds; a failure
  -- here is logged but does not abort the request
  ok, err = set_timeouts(balancer_data.connect_timeout / 1000,
                         balancer_data.send_timeout / 1000,
                         balancer_data.read_timeout / 1000)
  if not ok then
    ngx_log(ngx_ERR, "could not set upstream timeouts: ", err)
  end

  if pool_opts then
    ok, err = enable_keepalive(kong_conf.upstream_keepalive_idle_timeout,
                               kong_conf.upstream_keepalive_max_requests)
    if ok then
      -- only announce keepalive when it was actually enabled
      ngx_log(ngx_DEBUG, "enabled connection keepalive (pool=", pool_opts.pool,
                         ", pool_size=", pool_opts.pool_size,
                         ", idle_timeout=", kong_conf.upstream_keepalive_idle_timeout,
                         ", max_requests=", kong_conf.upstream_keepalive_max_requests, ")")

    else
      ngx_log(ngx_ERR, "could not enable connection keepalive: ", err)
    end
  end

  -- record overall latency
  ctx.KONG_BALANCER_ENDED_AT = get_updated_now_ms()
  ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_BALANCER_START

  -- record try-latency
  local try_latency = ctx.KONG_BALANCER_ENDED_AT - current_try.balancer_start
  current_try.balancer_latency = try_latency

  -- time spent in Kong before sending the request to upstream
  -- start_time() is kept in seconds with millisecond resolution.
  ctx.KONG_PROXY_LATENCY = ctx.KONG_BALANCER_ENDED_AT - ctx.KONG_PROCESSING_START
end
do
-- maps the request method name to the numeric constant that is passed as the
-- `method` option of the subrequest; methods absent from this table yield nil
local HTTP_METHODS = {
GET = ngx.HTTP_GET,
HEAD = ngx.HTTP_HEAD,
PUT = ngx.HTTP_PUT,
POST = ngx.HTTP_POST,
DELETE = ngx.HTTP_DELETE,
OPTIONS = ngx.HTTP_OPTIONS,
MKCOL = ngx.HTTP_MKCOL,
COPY = ngx.HTTP_COPY,
MOVE = ngx.HTTP_MOVE,
PROPFIND = ngx.HTTP_PROPFIND,
PROPPATCH = ngx.HTTP_PROPPATCH,
LOCK = ngx.HTTP_LOCK,
UNLOCK = ngx.HTTP_UNLOCK,
PATCH = ngx.HTTP_PATCH,
TRACE = ngx.HTTP_TRACE,
}
--- Buffered-proxying handler (called from `Kong.access`).
-- Captures the upstream response through a subrequest to
-- `/kong_buffered_http`, stores status, headers and body in the context,
-- runs the "response" plugin handlers, then prints the buffered body and
-- exits with the captured status.
-- A truncated capture (other than for HEAD) is answered with a 502 through
-- `kong_error_handlers`.
function Kong.response()
local plugins_iterator = runloop.get_plugins_iterator()
local ctx = ngx.ctx
-- buffered proxying (that also executes the balancer)
ngx.req.read_body()
local options = {
always_forward_body = true,
share_all_vars = true,
method = HTTP_METHODS[ngx.req.get_method()],
ctx = ctx,
}
local res = ngx.location.capture("/kong_buffered_http", options)
if res.truncated and options.method ~= ngx.HTTP_HEAD then
ctx.KONG_PHASE = PHASES.error
ngx.status = 502
return kong_error_handlers(ctx)
end
ctx.KONG_PHASE = PHASES.response
local status = res.status
local headers = res.header
local body = res.body
ctx.buffered_status = status
ctx.buffered_headers = headers
ctx.buffered_body = body
-- fake response phase (this runs after the balancer)
if not ctx.KONG_RESPONSE_START then
ctx.KONG_RESPONSE_START = now() * 1000
-- close the balancer phase when it never recorded its own end
if ctx.KONG_BALANCER_START and not ctx.KONG_BALANCER_ENDED_AT then
ctx.KONG_BALANCER_ENDED_AT = ctx.KONG_RESPONSE_START
ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT -
ctx.KONG_BALANCER_START
end
end
-- waiting time: from the end of the balancer (or access) phase to the start
-- of the response phase
if not ctx.KONG_WAITING_TIME then
ctx.KONG_WAITING_TIME = ctx.KONG_RESPONSE_START -
(ctx.KONG_BALANCER_ENDED_AT or ctx.KONG_ACCESS_ENDED_AT)
end
if not ctx.KONG_PROXY_LATENCY then
ctx.KONG_PROXY_LATENCY = ctx.KONG_RESPONSE_START - ctx.KONG_PROCESSING_START
end
kong.response.set_status(status)
kong.response.set_headers(headers)
runloop.response.before(ctx)
execute_collected_plugins_iterator(plugins_iterator, "response", ctx)
runloop.response.after(ctx)
ctx.KONG_RESPONSE_ENDED_AT = get_updated_now_ms()
ctx.KONG_RESPONSE_TIME = ctx.KONG_RESPONSE_ENDED_AT - ctx.KONG_RESPONSE_START
-- buffered response
ngx.print(body)
-- jump over the balancer to header_filter
ngx.exit(status)
end
end
--- Handler for the `header_filter` phase.
-- Back-fills the end timestamps and durations of earlier phases that never
-- recorded them, derives the waiting time / proxy latency (proxied requests)
-- or the response latency (other requests), then runs the "header_filter"
-- plugin handlers between the runloop hooks.
function Kong.header_filter()
local ctx = ngx.ctx
if not ctx.KONG_PROCESSING_START then
ctx.KONG_PROCESSING_START = start_time() * 1000
end
if not ctx.workspace then
ctx.workspace = kong.default_workspace
end
if not ctx.KONG_HEADER_FILTER_START then
ctx.KONG_HEADER_FILTER_START = now() * 1000
-- for each earlier phase that started but has no end timestamp, use the
-- first available start timestamp from the fallback chain as its end
if ctx.KONG_REWRITE_START and not ctx.KONG_REWRITE_ENDED_AT then
ctx.KONG_REWRITE_ENDED_AT = ctx.KONG_BALANCER_START or
ctx.KONG_ACCESS_START or
ctx.KONG_RESPONSE_START or
ctx.KONG_HEADER_FILTER_START
ctx.KONG_REWRITE_TIME = ctx.KONG_REWRITE_ENDED_AT -
ctx.KONG_REWRITE_START
end
if ctx.KONG_ACCESS_START and not ctx.KONG_ACCESS_ENDED_AT then
ctx.KONG_ACCESS_ENDED_AT = ctx.KONG_BALANCER_START or
ctx.KONG_RESPONSE_START or
ctx.KONG_HEADER_FILTER_START
ctx.KONG_ACCESS_TIME = ctx.KONG_ACCESS_ENDED_AT -
ctx.KONG_ACCESS_START
end
if ctx.KONG_BALANCER_START and not ctx.KONG_BALANCER_ENDED_AT then
ctx.KONG_BALANCER_ENDED_AT = ctx.KONG_RESPONSE_START or
ctx.KONG_HEADER_FILTER_START
ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT -
ctx.KONG_BALANCER_START
end
if ctx.KONG_RESPONSE_START and not ctx.KONG_RESPONSE_ENDED_AT then
ctx.KONG_RESPONSE_ENDED_AT = ctx.KONG_HEADER_FILTER_START
ctx.KONG_RESPONSE_TIME = ctx.KONG_RESPONSE_ENDED_AT -
ctx.KONG_RESPONSE_START
end
end
if ctx.KONG_PROXIED then
-- proxied request: waiting time and proxy latency, unless already set
if not ctx.KONG_WAITING_TIME then
ctx.KONG_WAITING_TIME = (ctx.KONG_RESPONSE_START or ctx.KONG_HEADER_FILTER_START) -
(ctx.KONG_BALANCER_ENDED_AT or ctx.KONG_ACCESS_ENDED_AT)
end
if not ctx.KONG_PROXY_LATENCY then
ctx.KONG_PROXY_LATENCY = (ctx.KONG_RESPONSE_START or ctx.KONG_HEADER_FILTER_START) -
ctx.KONG_PROCESSING_START
end
elseif not ctx.KONG_RESPONSE_LATENCY then
-- request not marked as proxied: record the response latency instead
ctx.KONG_RESPONSE_LATENCY = (ctx.KONG_RESPONSE_START or ctx.KONG_HEADER_FILTER_START) -
ctx.KONG_PROCESSING_START
end
ctx.KONG_PHASE = PHASES.header_filter
runloop.header_filter.before(ctx)
local plugins_iterator = runloop.get_plugins_iterator()
execute_collected_plugins_iterator(plugins_iterator, "header_filter", ctx)
runloop.header_filter.after(ctx)
ctx.KONG_HEADER_FILTER_ENDED_AT = get_updated_now_ms()
ctx.KONG_HEADER_FILTER_TIME = ctx.KONG_HEADER_FILTER_ENDED_AT - ctx.KONG_HEADER_FILTER_START
end
--- Body filter phase handler.
--
-- The first time it runs for a request it stamps `ctx.KONG_BODY_FILTER_START`
-- and closes the timing window of every earlier phase that recorded a start
-- but no end. It then runs the collected "body_filter" plugin handlers and,
-- once `arg[2]` is set, records the body filter end time and (for proxied
-- requests) the receive time.
--
-- NOTE(review): `arg` is defined outside this view -- presumably `ngx.arg`,
-- with `arg[1]` the body chunk and `arg[2]` the end-of-body flag; confirm.
function Kong.body_filter()
  local ctx = ngx.ctx

  if not ctx.KONG_BODY_FILTER_START then
    local phase_start = now() * 1000
    ctx.KONG_BODY_FILTER_START = phase_start

    -- Each unfinished phase ends at the start of the first later phase that
    -- actually ran, falling back to the start of this phase.
    local rewrite_start = ctx.KONG_REWRITE_START
    if rewrite_start and not ctx.KONG_REWRITE_ENDED_AT then
      local rewrite_end = ctx.KONG_ACCESS_START
                       or ctx.KONG_BALANCER_START
                       or ctx.KONG_RESPONSE_START
                       or ctx.KONG_HEADER_FILTER_START
                       or phase_start
      ctx.KONG_REWRITE_ENDED_AT = rewrite_end
      ctx.KONG_REWRITE_TIME = rewrite_end - rewrite_start
    end

    local access_start = ctx.KONG_ACCESS_START
    if access_start and not ctx.KONG_ACCESS_ENDED_AT then
      local access_end = ctx.KONG_BALANCER_START
                      or ctx.KONG_RESPONSE_START
                      or ctx.KONG_HEADER_FILTER_START
                      or phase_start
      ctx.KONG_ACCESS_ENDED_AT = access_end
      ctx.KONG_ACCESS_TIME = access_end - access_start
    end

    local balancer_start = ctx.KONG_BALANCER_START
    if balancer_start and not ctx.KONG_BALANCER_ENDED_AT then
      local balancer_end = ctx.KONG_RESPONSE_START
                        or ctx.KONG_HEADER_FILTER_START
                        or phase_start
      ctx.KONG_BALANCER_ENDED_AT = balancer_end
      ctx.KONG_BALANCER_TIME = balancer_end - balancer_start
    end

    local response_start = ctx.KONG_RESPONSE_START
    if response_start and not ctx.KONG_RESPONSE_ENDED_AT then
      local response_end = ctx.KONG_HEADER_FILTER_START or phase_start
      ctx.KONG_RESPONSE_ENDED_AT = response_end
      ctx.KONG_RESPONSE_TIME = response_end - response_start
    end

    local header_filter_start = ctx.KONG_HEADER_FILTER_START
    if header_filter_start and not ctx.KONG_HEADER_FILTER_ENDED_AT then
      ctx.KONG_HEADER_FILTER_ENDED_AT = phase_start
      ctx.KONG_HEADER_FILTER_TIME = phase_start - header_filter_start
    end
  end

  ctx.KONG_PHASE = PHASES.body_filter

  -- When a body has been stored on the context, emit it as the whole,
  -- final chunk.
  local stored_body = ctx.response_body
  if stored_body then
    arg[1] = stored_body
    arg[2] = true
  end

  execute_collected_plugins_iterator(runloop.get_plugins_iterator(),
                                     "body_filter", ctx)

  -- Timings below are only recorded once arg[2] is set.
  if not arg[2] then
    return
  end

  local phase_end = get_updated_now_ms()
  ctx.KONG_BODY_FILTER_ENDED_AT = phase_end
  ctx.KONG_BODY_FILTER_TIME = phase_end - ctx.KONG_BODY_FILTER_START

  if ctx.KONG_PROXIED then
    -- time spent receiving the response ((response +) header_filter + body_filter)
    -- we could use $upstream_response_time but we need to distinguish the waiting time
    -- from the receiving time in our logging plugins (especially ALF serializer).
    local receive_start = ctx.KONG_RESPONSE_START
                       or ctx.KONG_HEADER_FILTER_START
                       or ctx.KONG_BALANCER_ENDED_AT
                       or ctx.KONG_BALANCER_START
                       or ctx.KONG_ACCESS_ENDED_AT
    ctx.KONG_RECEIVE_TIME = phase_end - receive_start
  end
end
--- Log phase handler.
--
-- On first entry it stamps `ctx.KONG_LOG_START` and closes the timing window
-- of every earlier phase that recorded a start but no end, so that the
-- `*_ENDED_AT` / `*_TIME` fields are populated before log plugins run.
-- It then runs `runloop.log.before`, the collected "log" plugin handlers and
-- `runloop.log.after`, and finally passes the context to `release_table`.
--
-- Timestamps are `now() * 1000` (presumably milliseconds -- the helper is
-- defined outside this view).
function Kong.log()
  local ctx = ngx.ctx

  if not ctx.KONG_LOG_START then
    ctx.KONG_LOG_START = now() * 1000

    if is_stream_module then
      -- Stream branch: only preread and balancer are closed here, and the
      -- latency figures are measured from KONG_PROCESSING_START.
      if not ctx.KONG_PROCESSING_START then
        ctx.KONG_PROCESSING_START = start_time() * 1000
      end

      if ctx.KONG_PREREAD_START and not ctx.KONG_PREREAD_ENDED_AT then
        ctx.KONG_PREREAD_ENDED_AT = ctx.KONG_LOG_START
        ctx.KONG_PREREAD_TIME = ctx.KONG_PREREAD_ENDED_AT -
                                ctx.KONG_PREREAD_START
      end

      if ctx.KONG_BALANCER_START and not ctx.KONG_BALANCER_ENDED_AT then
        ctx.KONG_BALANCER_ENDED_AT = ctx.KONG_LOG_START
        ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT -
                                 ctx.KONG_BALANCER_START
      end

      if ctx.KONG_PROXIED then
        if not ctx.KONG_PROXY_LATENCY then
          ctx.KONG_PROXY_LATENCY = ctx.KONG_LOG_START -
                                   ctx.KONG_PROCESSING_START
        end

      elseif not ctx.KONG_RESPONSE_LATENCY then
        ctx.KONG_RESPONSE_LATENCY = ctx.KONG_LOG_START -
                                    ctx.KONG_PROCESSING_START
      end

    else
      -- Non-stream branch: an unfinished phase ends at the start of the
      -- first later phase that actually ran, falling back to the log start.
      -- The body filter start is stored as KONG_BODY_FILTER_START; the
      -- un-prefixed BODY_FILTER_START is never set, so reading it would
      -- always skip that fallback.
      if ctx.KONG_REWRITE_START and not ctx.KONG_REWRITE_ENDED_AT then
        ctx.KONG_REWRITE_ENDED_AT = ctx.KONG_ACCESS_START or
                                    ctx.KONG_BALANCER_START or
                                    ctx.KONG_RESPONSE_START or
                                    ctx.KONG_HEADER_FILTER_START or
                                    ctx.KONG_BODY_FILTER_START or
                                    ctx.KONG_LOG_START
        ctx.KONG_REWRITE_TIME = ctx.KONG_REWRITE_ENDED_AT -
                                ctx.KONG_REWRITE_START
      end

      if ctx.KONG_ACCESS_START and not ctx.KONG_ACCESS_ENDED_AT then
        ctx.KONG_ACCESS_ENDED_AT = ctx.KONG_BALANCER_START or
                                   ctx.KONG_RESPONSE_START or
                                   ctx.KONG_HEADER_FILTER_START or
                                   ctx.KONG_BODY_FILTER_START or
                                   ctx.KONG_LOG_START
        ctx.KONG_ACCESS_TIME = ctx.KONG_ACCESS_ENDED_AT -
                               ctx.KONG_ACCESS_START
      end

      if ctx.KONG_BALANCER_START and not ctx.KONG_BALANCER_ENDED_AT then
        ctx.KONG_BALANCER_ENDED_AT = ctx.KONG_RESPONSE_START or
                                     ctx.KONG_HEADER_FILTER_START or
                                     ctx.KONG_BODY_FILTER_START or
                                     ctx.KONG_LOG_START
        ctx.KONG_BALANCER_TIME = ctx.KONG_BALANCER_ENDED_AT -
                                 ctx.KONG_BALANCER_START
      end

      if ctx.KONG_HEADER_FILTER_START and not ctx.KONG_HEADER_FILTER_ENDED_AT then
        ctx.KONG_HEADER_FILTER_ENDED_AT = ctx.KONG_BODY_FILTER_START or
                                          ctx.KONG_LOG_START
        ctx.KONG_HEADER_FILTER_TIME = ctx.KONG_HEADER_FILTER_ENDED_AT -
                                      ctx.KONG_HEADER_FILTER_START
      end

      if ctx.KONG_BODY_FILTER_START and not ctx.KONG_BODY_FILTER_ENDED_AT then
        ctx.KONG_BODY_FILTER_ENDED_AT = ctx.KONG_LOG_START
        ctx.KONG_BODY_FILTER_TIME = ctx.KONG_BODY_FILTER_ENDED_AT -
                                    ctx.KONG_BODY_FILTER_START
      end

      -- Proxied request with no waiting time recorded yet: measure it from
      -- the end of balancing (or of access) up to the log start.
      if ctx.KONG_PROXIED and not ctx.KONG_WAITING_TIME then
        ctx.KONG_WAITING_TIME = ctx.KONG_LOG_START -
                                (ctx.KONG_BALANCER_ENDED_AT or ctx.KONG_ACCESS_ENDED_AT)
      end
    end
  end

  ctx.KONG_PHASE = PHASES.log

  runloop.log.before(ctx)
  local plugins_iterator = runloop.get_plugins_iterator()
  execute_collected_plugins_iterator(plugins_iterator, "log", ctx)
  runloop.log.after(ctx)

  -- NOTE(review): presumably hands the context table back to a table pool
  -- under CTX_NS; release_table is defined outside this view -- confirm.
  release_table(CTX_NS, ctx)

  -- this is not used for now, but perhaps we need it later?
  --ctx.KONG_LOG_ENDED_AT = get_now_ms()
  --ctx.KONG_LOG_TIME = ctx.KONG_LOG_ENDED_AT - ctx.KONG_LOG_START
end
--- Error handler entry point.
--
-- Marks the context as being in the error phase and as unexpected, makes sure
-- a plugin list exists on the context, and then delegates to
-- `kong_error_handlers`, whose result is returned.
function Kong.handle_error()
  -- NOTE(review): presumably re-attaches a previously stashed ngx.ctx;
  -- apply_ref is defined in kong.resty.ctx, outside this view -- confirm.
  kong_resty_ctx.apply_ref()

  local ctx = ngx.ctx
  ctx.KONG_PHASE = PHASES.error
  ctx.KONG_UNEXPECTED = true

  -- Remember the workspace so it can be put back while iterating below.
  local saved_workspace = ctx.workspace

  log_init_worker_errors(ctx)

  -- Plugins were already collected for this request: nothing to build.
  if ctx.plugins then
    return kong_error_handlers(ctx)
  end

  -- Walk the iterator only for its effect of building the plugin list;
  -- the workspace is reset to the saved value on every step.
  local iterator = runloop.get_updated_plugins_iterator()
  for _ in iterator:iterate("content", ctx) do
    ctx.workspace = saved_workspace
  end

  return kong_error_handlers(ctx)
end
--- Serve a Lapis application and record admin timing data on the context.
--
-- @param module   value handed straight to `lapis.serve`
-- @param options  optional table; only `allow_origin` is read here and it
--                 defaults to "*" when absent
local function serve_content(module, options)
  local ctx = ngx.ctx

  ctx.KONG_PROCESSING_START = start_time() * 1000
  -- Keep an already recorded content start; stamp one otherwise.
  if not ctx.KONG_ADMIN_CONTENT_START then
    ctx.KONG_ADMIN_CONTENT_START = now() * 1000
  end
  ctx.KONG_PHASE = PHASES.admin_api

  log_init_worker_errors(ctx)

  local allow_origin = options and options.allow_origin or "*"
  header["Access-Control-Allow-Origin"] = allow_origin

  lapis.serve(module)

  local finished_at = get_updated_now_ms()
  ctx.KONG_ADMIN_CONTENT_ENDED_AT = finished_at
  ctx.KONG_ADMIN_CONTENT_TIME = finished_at - ctx.KONG_ADMIN_CONTENT_START
  ctx.KONG_ADMIN_LATENCY = finished_at - ctx.KONG_PROCESSING_START
end
--- Content handler for the Admin API.
--
-- Polls worker events, defaults the request workspace to
-- `kong.default_workspace` when none is set, then serves the "kong.api"
-- application.
--
-- @param options  forwarded unchanged to `serve_content`
function Kong.admin_content(options)
  kong.worker_events.poll()

  local ctx = ngx.ctx
  ctx.workspace = ctx.workspace or kong.default_workspace

  return serve_content("kong.api", options)
end
--- Header filter for the Admin API.
--
-- Fills in any admin timing fields still missing on the context, then sets
-- the admin latency and server response headers according to
-- `kong.configuration.enabled_headers`.
function Kong.admin_header_filter()
  local ctx = ngx.ctx

  if not ctx.KONG_PROCESSING_START then
    ctx.KONG_PROCESSING_START = start_time() * 1000
  end

  if not ctx.KONG_ADMIN_HEADER_FILTER_START then
    local filter_start = now() * 1000
    ctx.KONG_ADMIN_HEADER_FILTER_START = filter_start

    -- Content phase started but never recorded its end: close it here.
    local content_start = ctx.KONG_ADMIN_CONTENT_START
    if content_start and not ctx.KONG_ADMIN_CONTENT_ENDED_AT then
      ctx.KONG_ADMIN_CONTENT_ENDED_AT = filter_start
      ctx.KONG_ADMIN_CONTENT_TIME = filter_start - content_start
    end

    if not ctx.KONG_ADMIN_LATENCY then
      ctx.KONG_ADMIN_LATENCY = filter_start - ctx.KONG_PROCESSING_START
    end
  end

  local enabled = kong.configuration.enabled_headers
  local latency_header = constants.HEADERS.ADMIN_LATENCY
  local server_header = constants.HEADERS.SERVER

  if enabled[latency_header] then
    header[latency_header] = ctx.KONG_ADMIN_LATENCY
  end

  -- The server header is either set to Kong's token or explicitly cleared.
  if enabled[server_header] then
    header[server_header] = meta._SERVER_TOKENS
  else
    header[server_header] = nil
  end

  -- this is not used for now, but perhaps we need it later?
  --ctx.KONG_ADMIN_HEADER_FILTER_ENDED_AT = get_now_ms()
  --ctx.KONG_ADMIN_HEADER_FILTER_TIME = ctx.KONG_ADMIN_HEADER_FILTER_ENDED_AT - ctx.KONG_ADMIN_HEADER_FILTER_START
end
-- Content handler for the status listener: serves the "kong.status"
-- application through serve_content, passing no options.
function Kong.status_content()
return serve_content("kong.status")
end
-- The status listener uses the same header filter function as the Admin API.
Kong.status_header_filter = Kong.admin_header_filter
--- Handler for the cluster listener.
--
-- Tags the request with the cluster_listener phase and hands it to
-- `kong.clustering:handle_cp_websocket()`, returning whatever that returns.
--
-- @param options  accepted but not used by this function
function Kong.serve_cluster_listener(options)
  -- NOTE(review): called without a context here, whereas other handlers in
  -- this file pass ngx.ctx -- confirm that is intended.
  log_init_worker_errors()

  local ctx = ngx.ctx
  ctx.KONG_PHASE = PHASES.cluster_listener

  return kong.clustering:handle_cp_websocket()
end
--- Handler for the wRPC cluster listener.
--
-- Tags the request with the cluster_listener phase and hands it to
-- `kong.clustering:handle_wrpc_websocket()`, returning whatever that returns.
--
-- @param options  accepted but not used by this function
function Kong.serve_wrpc_listener(options)
  -- NOTE(review): called without a context here, whereas other handlers in
  -- this file pass ngx.ctx -- confirm that is intended.
  log_init_worker_errors()

  local ctx = ngx.ctx
  ctx.KONG_PHASE = PHASES.cluster_listener

  return kong.clustering:handle_wrpc_websocket()
end
-- Entry point for the stream API: delegates to stream_api.handle()
-- (the kong.tools.stream_api module) and returns nothing.
function Kong.stream_api()
stream_api.handle()
end
do
  local cjson = require "cjson.safe"

  --- Receive reconfigure data on the request socket and rebroadcast it.
  --
  -- Reads the whole request payload, JSON-decodes it and posts it as a
  -- "declarative" / "reconfigure" worker event. Every failure is logged and
  -- the handler simply returns; nothing is returned to the caller.
  function Kong.stream_config_listener()
    local sock, sock_err = ngx.req.socket()
    if not sock then
      kong.log.crit("unable to obtain request socket: ", sock_err)
      return
    end

    -- "*a" reads until the connection is closed.
    local payload, recv_err = sock:receive("*a")
    if not payload then
      ngx_log(ngx_CRIT, "unable to receive reconfigure data: ", recv_err)
      return
    end

    local reconfigure_data, decode_err = cjson.decode(payload)
    if not reconfigure_data then
      ngx_log(ngx_ERR, "failed to json decode reconfigure data: ", decode_err)
      return
    end

    local status, post_err = kong.worker_events.post("declarative", "reconfigure", reconfigure_data)
    if status == "done" then
      return
    end

    -- Anything other than "done" counts as a failure; log the error if
    -- there is one, otherwise the status itself.
    ngx_log(ngx_ERR, "failed to rebroadcast reconfigure event in stream: ", post_err or status)
  end
end
-- Module export: the Kong table carrying the phase handlers defined above.
return Kong