kong/kong/clustering/wrpc_control_plane.lua (298 lines of code) (raw):
local _M = {}
local _MT = { __index = _M, }
local semaphore = require("ngx.semaphore")
local cjson = require("cjson.safe")
local declarative = require("kong.db.declarative")
local constants = require("kong.constants")
local clustering_utils = require("kong.clustering.utils")
local wrpc = require("kong.tools.wrpc")
local wrpc_proto = require("kong.tools.wrpc.proto")
local utils = require("kong.tools.utils")
local init_negotiation_server = require("kong.clustering.services.negotiation").init_negotiation_server
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local string = string
local setmetatable = setmetatable
local pcall = pcall
local pairs = pairs
local ngx = ngx
local ngx_log = ngx.log
local cjson_encode = cjson.encode
local kong = kong
local ngx_exit = ngx.exit
local exiting = ngx.worker.exiting
local ngx_time = ngx.time
local ngx_var = ngx.var
local timer_at = ngx.timer.at
local isempty = require("table.isempty")
local plugins_list_to_map = clustering_utils.plugins_list_to_map
local deflate_gzip = utils.deflate_gzip
local yield = utils.yield
local kong_dict = ngx.shared.kong
local ngx_DEBUG = ngx.DEBUG
local ngx_INFO = ngx.INFO
local ngx_NOTICE = ngx.NOTICE
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_CLOSE = ngx.HTTP_CLOSE
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local _log_prefix = "[wrpc-clustering] "
local ok_table = { ok = "done", }
local initial_hash = string.rep("0", 32)
local empty_table = {}
local function handle_export_deflated_reconfigure_payload(self)
local ok, p_err, err = pcall(self.export_deflated_reconfigure_payload, self)
return ok, p_err or err
end
local function init_config_service(wrpc_service, cp)
wrpc_service:import("kong.services.config.v1.config")
wrpc_service:set_handler("ConfigService.PingCP", function(peer, data)
local client = cp.clients[peer.conn]
if client and client.update_sync_status then
client.last_seen = ngx_time()
client.config_hash = data.hash
client:update_sync_status()
ngx_log(ngx_INFO, _log_prefix, "received ping frame from data plane")
end
end)
wrpc_service:set_handler("ConfigService.ReportMetadata", function(peer, data)
local client = peer.client
local cp = peer.cp
if client then
ngx_log(ngx_INFO, _log_prefix, "received initial metadata package from client: ", client.dp_id)
client.basic_info = data
client.dp_plugins_map = plugins_list_to_map(client.basic_info.plugins or empty_table)
client.basic_info_semaphore:post()
end
local _, err
_, err, client.sync_status = cp:check_version_compatibility(client.dp_version, client.dp_plugins_map, client.log_suffix)
client:update_sync_status()
if err then
ngx_log(ngx_ERR, _log_prefix, err, client.log_suffix)
client.basic_info = nil -- drop the connection
return { error = err, }
end
return ok_table
end)
end
local wrpc_service
local function get_wrpc_service(self)
if not wrpc_service then
wrpc_service = wrpc_proto.new()
init_negotiation_server(wrpc_service, self.conf)
init_config_service(wrpc_service, self)
end
return wrpc_service
end
function _M.new(conf, cert_digest)
local self = {
clients = setmetatable({}, { __mode = "k", }),
plugins_map = {},
conf = conf,
cert_digest = cert_digest,
}
return setmetatable(self, _MT)
end
local config_version = 0
function _M:export_deflated_reconfigure_payload()
ngx_log(ngx_DEBUG, _log_prefix, "exporting config")
local config_table, err = declarative.export_config()
if not config_table then
return nil, err
end
-- update plugins map
self.plugins_configured = {}
if config_table.plugins then
for _, plugin in pairs(config_table.plugins) do
self.plugins_configured[plugin.name] = true
end
end
local config_hash, hashes = calculate_config_hash(config_table)
config_version = config_version + 1
-- store serialized plugins map for troubleshooting purposes
local shm_key_name = "clustering:cp_plugins_configured:worker_" .. ngx.worker.id()
kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured))
local service = get_wrpc_service(self)
-- yield between steps to prevent long delay
local config_json = assert(cjson_encode(config_table))
yield()
local config_compressed = assert(deflate_gzip(config_json))
yield()
self.config_call_rpc, self.config_call_args = assert(service:encode_args("ConfigService.SyncConfig", {
config = config_compressed,
version = config_version,
config_hash = config_hash,
hashes = hashes,
}))
return config_table, nil
end
function _M:push_config_one_client(client)
-- if clients table is empty, we might have skipped some config
-- push event in `push_config_loop`, which means the cached config
-- might be stale, so we always export the latest config again in this case
if isempty(self.clients) or not self.config_call_rpc or not self.config_call_args then
local ok, err = handle_export_deflated_reconfigure_payload(self)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err)
return
end
end
local ok, err, sync_status = self:check_configuration_compatibility(client.dp_plugins_map)
if not ok then
ngx_log(ngx_WARN, _log_prefix, "unable to send updated configuration to data plane: ", err, client.log_suffix)
if sync_status ~= client.sync_status then
client.sync_status = sync_status
client:update_sync_status()
end
return
end
client.peer:send_encoded_call(self.config_call_rpc, self.config_call_args)
ngx_log(ngx_DEBUG, _log_prefix, "config version #", config_version, " pushed. ", client.log_suffix)
end
function _M:push_config()
local payload, err = self:export_deflated_reconfigure_payload()
if not payload then
ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err)
return
end
local n = 0
for _, client in pairs(self.clients) do
local ok, sync_status
ok, err, sync_status = self:check_configuration_compatibility(client.dp_plugins_map)
if ok then
client.peer:send_encoded_call(self.config_call_rpc, self.config_call_args)
n = n + 1
else
ngx_log(ngx_WARN, _log_prefix, "unable to send updated configuration to data plane: ", err, client.log_suffix)
if sync_status ~= client.sync_status then
client.sync_status = sync_status
client:update_sync_status()
end
end
end
ngx_log(ngx_DEBUG, _log_prefix, "config version #", config_version, " pushed to ", n, " clients")
end
_M.check_version_compatibility = clustering_utils.check_version_compatibility
_M.check_configuration_compatibility = clustering_utils.check_configuration_compatibility
function _M:handle_cp_websocket()
local dp_id = ngx_var.arg_node_id
local dp_hostname = ngx_var.arg_node_hostname
local dp_ip = ngx_var.remote_addr
local dp_version = ngx_var.arg_node_version
local wb, log_suffix, ec = clustering_utils.connect_dp(
self.conf, self.cert_digest,
dp_id, dp_hostname, dp_ip, dp_version)
if not wb then
return ngx_exit(ec)
end
-- connection established
local w_peer = wrpc.new_peer(wb, get_wrpc_service(self))
w_peer.id = dp_id
local client = {
last_seen = ngx_time(),
peer = w_peer,
dp_id = dp_id,
dp_version = dp_version,
log_suffix = log_suffix,
basic_info = nil,
basic_info_semaphore = semaphore.new(),
dp_plugins_map = {},
cp_ref = self,
config_hash = initial_hash,
sync_status = CLUSTERING_SYNC_STATUS.UNKNOWN,
}
w_peer.client = client
w_peer.cp = self
w_peer:spawn_threads()
local purge_delay = self.conf.cluster_data_plane_purge_delay
function client:update_sync_status()
local ok, err = kong.db.clustering_data_planes:upsert({ id = dp_id, }, {
last_seen = self.last_seen,
config_hash = self.config_hash ~= "" and self.config_hash or nil,
hostname = dp_hostname,
ip = dp_ip,
version = dp_version,
sync_status = self.sync_status, -- TODO: import may have been failed though
}, { ttl = purge_delay })
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix)
end
end
do
local ok, err = client.basic_info_semaphore:wait(5)
if not ok then
err = "waiting for basic info call: " .. (err or "--")
end
if not client.basic_info then
err = "invalid basic_info data"
end
if err then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
wb:send_close()
return ngx_exit(ngx_CLOSE)
end
end
-- after basic_info report we consider DP connected
-- initial sync
client:update_sync_status()
self:push_config_one_client(client) -- first config push
-- put it here to prevent DP from receiving broadcast config pushes before the first config pushing
self.clients[w_peer.conn] = client
ngx_log(ngx_NOTICE, _log_prefix, "data plane connected", log_suffix)
w_peer:wait_threads()
w_peer:close()
self.clients[wb] = nil
return ngx_exit(ngx_CLOSE)
end
local function push_config_loop(premature, self, push_config_semaphore, delay)
if premature then
return
end
do
local ok, err = handle_export_deflated_reconfigure_payload(self)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to export initial config from database: ", err)
end
end
while not exiting() do
local ok, err = push_config_semaphore:wait(1)
if exiting() then
return
end
if ok then
if isempty(self.clients) then
ngx_log(ngx_DEBUG, _log_prefix, "skipping config push (no connected clients)")
else
ok, err = pcall(self.push_config, self)
if ok then
local sleep_left = delay
while sleep_left > 0 do
if sleep_left <= 1 then
ngx.sleep(sleep_left)
break
end
ngx.sleep(1)
if exiting() then
return
end
sleep_left = sleep_left - 1
end
else
ngx_log(ngx_ERR, _log_prefix, "export and pushing config failed: ", err)
end
end
elseif err ~= "timeout" then
ngx_log(ngx_ERR, _log_prefix, "semaphore wait error: ", err)
end
end
end
function _M:init_worker(plugins_list)
-- ROLE = "control_plane"
self.plugins_list = plugins_list
self.plugins_map = plugins_list_to_map(plugins_list)
self.deflated_reconfigure_payload = nil
self.reconfigure_payload = nil
self.plugins_configured = {}
self.plugin_versions = {}
for i = 1, #plugins_list do
local plugin = plugins_list[i]
self.plugin_versions[plugin.name] = plugin.version
end
local push_config_semaphore = semaphore.new()
-- When "clustering", "push_config" worker event is received by a worker,
-- it loads and pushes the config to its the connected data planes
kong.worker_events.register(function(_)
if push_config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
-- count is guaranteed to not exceed 1
push_config_semaphore:post()
end
end, "clustering", "push_config")
timer_at(0, push_config_loop, self, push_config_semaphore,
self.conf.db_update_frequency)
end
return _M