in kong/kong/clustering/data_plane.lua [91:289]
function _M:communicate(premature)
if premature then
return
end
local conf = self.conf
local log_suffix = " [" .. conf.cluster_control_plane .. "]"
local reconnection_delay = math.random(5, 10)
local c, uri, err = clustering_utils.connect_cp(
"/v1/outlet", conf, self.cert, self.cert_key)
if not c then
ngx_log(ngx_ERR, _log_prefix, "connection to control plane ", uri, " broken: ", err,
" (retrying after ", reconnection_delay, " seconds)", log_suffix)
assert(ngx.timer.at(reconnection_delay, function(premature)
self:communicate(premature)
end))
return
end
local _
_, err = c:send_binary(cjson_encode({ type = "basic_info",
plugins = self.plugins_list, }))
if err then
ngx_log(ngx_ERR, _log_prefix, "unable to send basic information to control plane: ", uri,
" err: ", err, " (retrying after ", reconnection_delay, " seconds)", log_suffix)
c:close()
assert(ngx.timer.at(reconnection_delay, function(premature)
self:communicate(premature)
end))
return
end
local config_semaphore = semaphore.new(0)
local ping_immediately
local config_exit
local next_data
local config_thread = ngx.thread.spawn(function()
while not exiting() and not config_exit do
local ok, err = config_semaphore:wait(1)
if ok then
local data = next_data
if data then
local msg = assert(inflate_gzip(data))
yield()
msg = assert(cjson_decode(msg))
yield()
if msg.type == "reconfigure" then
if msg.timestamp then
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
msg.timestamp, log_suffix)
else
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
end
local config_table = assert(msg.config_table)
local pok, res
pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
if pok then
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end
ping_immediately = true
else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end
if next_data == data then
next_data = nil
end
end
end
elseif err ~= "timeout" then
ngx_log(ngx_ERR, _log_prefix, "semaphore wait error: ", err)
end
end
end)
local write_thread = ngx.thread.spawn(function()
while not exiting() do
send_ping(c, log_suffix)
for _ = 1, PING_INTERVAL do
ngx_sleep(1)
if exiting() then
return
end
if ping_immediately then
ping_immediately = nil
break
end
end
end
end)
local read_thread = ngx.thread.spawn(function()
local last_seen = ngx_time()
while not exiting() do
local data, typ, err = c:recv_frame()
if err then
if not is_timeout(err) then
return nil, "error while receiving frame from control plane: " .. err
end
local waited = ngx_time() - last_seen
if waited > PING_WAIT then
return nil, "did not receive pong frame from control plane within " .. PING_WAIT .. " seconds"
end
else
if typ == "close" then
ngx_log(ngx_DEBUG, _log_prefix, "received close frame from control plane", log_suffix)
return
end
last_seen = ngx_time()
if typ == "binary" then
next_data = data
if config_semaphore:count() <= 0 then
config_semaphore:post()
end
elseif typ == "pong" then
ngx_log(ngx_DEBUG, _log_prefix, "received pong frame from control plane", log_suffix)
else
ngx_log(ngx_NOTICE, _log_prefix, "received unknown (", tostring(typ), ") frame from control plane",
log_suffix)
end
end
end
end)
local ok, err, perr = ngx.thread.wait(read_thread, write_thread, config_thread)
ngx.thread.kill(read_thread)
ngx.thread.kill(write_thread)
c:close()
if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
elseif perr then
ngx_log(ngx_ERR, _log_prefix, perr, log_suffix)
end
config_exit = true
ok, err, perr = ngx.thread.wait(config_thread)
if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
elseif perr then
ngx_log(ngx_ERR, _log_prefix, perr, log_suffix)
end
if not exiting() then
assert(ngx.timer.at(reconnection_delay, function(premature)
self:communicate(premature)
end))
end
end