kong/kong/runloop/balancer/targets.lua (344 lines of code) (raw):

---
--- manages a cache of targets belonging to an upstream.
--- each one represents a hostname with a weight,
--- health status and a list of addresses.
---
--- maybe it could eventually be merged into the DAO object?
---

local dns_client = require "kong.resty.dns.client"
local upstreams = require "kong.runloop.balancer.upstreams"
local balancers = require "kong.runloop.balancer.balancers"
local dns_utils = require "kong.resty.dns.utils"

local ngx = ngx
local null = ngx.null
local ngx_now = ngx.now
local log = ngx.log
local string_format = string.format
local string_match = string.match
local ipairs = ipairs
local tonumber = tonumber
local table_sort = table.sort
local assert = assert
local pcall = pcall
local exiting = ngx.worker.exiting

local CRIT = ngx.CRIT
local DEBUG = ngx.DEBUG
local ERR = ngx.ERR
local WARN = ngx.WARN

local SRV_0_WEIGHT = 1      -- SRV record with weight 0 should be hit minimally, hence we replace by 1
-- shared read-only placeholder, used as `(t[1] or EMPTY).field` to avoid
-- indexing nil; any attempt to write to it raises an error
local EMPTY = setmetatable({},
  {__newindex = function() error("The 'EMPTY' table is read-only") end})
local GLOBAL_QUERY_OPTS = { workspace = null, show_ws_id = true }

-- global binary heap for all balancers to share as a single update timer for
-- renewing DNS records
local renewal_heap = require("binaryheap").minUnique()
-- maps renewal key -> target; weak values so that an entry in the heap does
-- not keep a target alive
local renewal_weak_cache = setmetatable({}, { __mode = "v" })

local targets_M = {}

-- forward local declarations
local resolve_timer_callback
local resolve_timer_running
local queryDns


------------------------------------------------------------------------------
-- Initializes the module: (re)configures the DNS client from
-- `kong.configuration`, drops any pending DNS renewals and makes sure the
-- shared renewal timer is running.
-- Raises an error (through `assert`) if the timer cannot be created.
function targets_M.init()
  dns_client = require("kong.tools.dns")(kong.configuration)    -- configure DNS client

  -- start from a clean renewal queue if a previous one still holds entries
  if renewal_heap:size() > 0 then
    renewal_heap = require("binaryheap").minUnique()
    renewal_weak_cache = setmetatable({}, { __mode = "v" })
  end

  if not resolve_timer_running then
    resolve_timer_running = assert(ngx.timer.at(1, resolve_timer_callback))
  end
end


-- lazily built reverse lookup: record type value -> name without "TYPE_"
local _rtype_to_name

------------------------------------------------------------------------------
-- Translates a DNS record type value into its name.
-- The lookup table is built on first use from every `TYPE_*` key found in
-- the DNS client table.
-- @param rtype Record type value (one of the DNS client's `TYPE_*` values)
-- @return The name with the `TYPE_` prefix stripped, or "unknown"
function targets_M.get_dns_name_from_record_type(rtype)
  if not _rtype_to_name then
    _rtype_to_name = {}

    for k, v in pairs(dns_client) do
      if tostring(k):sub(1,5) == "TYPE_" then
        _rtype_to_name[v] = k:sub(6,-1)
      end
    end
  end

  return _rtype_to_name[rtype] or "unknown"
end


------------------------------------------------------------------------------
-- Loads the targets from the DB.
-- @param upstream_id Upstream uuid for which to load the target
-- @return The target array, with target entity tables; or `nil, err, err_t`
-- when the DB query fails.
local function load_targets_into_memory(upstream_id)

  local targets, err, err_t = kong.db.targets:select_by_upstream_raw(
      { id = upstream_id }, GLOBAL_QUERY_OPTS)

  if not targets then
    return nil, err, err_t
  end

  -- perform some raw data updates
  for _, target in ipairs(targets) do
    -- split `target` field into `name` and `port`
    local port
    target.name, port = string_match(target.target, "^(.-):(%d+)$")
    target.port = tonumber(port)
    -- runtime state, filled in later by the DNS update logic
    target.addresses = {}
    target.totalWeight = 0
    target.unavailableWeight = 0
    target.nameType = dns_utils.hostnameType(target.name)
  end

  return targets
end
--_load_targets_into_memory = load_targets_into_memory


------------------------------------------------------------------------------
-- Builds the key under which a target is tracked in the DNS renewal queue.
-- The key is `<upstream id>:<target>` or, when the `target` field is absent,
-- `<upstream id>:<name>:<port>`. The upstream id is taken from
-- `target.balancer.upstream_id` or, failing that, from `target.upstream.id`.
-- @param target The target table
-- @return The key, or `nil, err` when no key can be built
local function get_dns_renewal_key(target)
  if target and (target.balancer or target.upstream) then
    local id = (target.balancer and target.balancer.upstream_id) or (target.upstream and target.upstream.id)
    if target.target then
      return id .. ":" .. target.target
    elseif target.name and target.port then
      return id .. ":" .. target.name .. ":" .. target.port
    end

  end

  return nil, "target object does not contain name and port"
end


------------------------------------------------------------------------------
-- Fetch targets, from cache or the DB.
-- @param upstream The upstream entity object
-- @return The targets array, with target entity tables.
function targets_M.fetch_targets(upstream)
  local targets_cache_key = "balancer:targets:" .. upstream.id

  return kong.core_cache:get(
      targets_cache_key, nil,
      load_targets_into_memory, upstream.id)
end


------------------------------------------------------------------------------
-- Runs a DNS query for every target in the list (see `queryDns`).
-- @param targets_list Array of target tables
-- @return The same `targets_list`
function targets_M.resolve_targets(targets_list)
  for _, target in ipairs(targets_list) do
    queryDns(target)
  end

  return targets_list
end


--==============================================================================
-- Event Callbacks
--==============================================================================



--------------------------------------------------------------------------------
-- Called on any changes to a target.
-- Invalidates the local targets cache entry of the upstream, cancels the
-- pending DNS renewal of the target (for anything but "create") and asks
-- the balancers module to create a balancer for the upstream again.
-- @param operation "create", "update" or "delete"
-- @param target Target table with `upstream.id` field
-- @return `true` on success, `nil, err` when the balancer could not be
-- created, nothing when the upstream or its balancer cannot be found (that
-- case is only logged).
function targets_M.on_target_event(operation, target)
  local upstream_id = target.upstream.id
  local upstream_name = target.upstream.name

  log(DEBUG, "target ", operation, " for upstream ", upstream_id,
    upstream_name and " (" .. upstream_name ..")" or "")

  kong.core_cache:invalidate_local("balancer:targets:" .. upstream_id)

  -- cancel DNS renewal
  if operation ~= "create" then
    local key, err = get_dns_renewal_key(target)
    if key then
      renewal_weak_cache[key] = nil
      renewal_heap:remove(key)
    else
      log(ERR, "could not stop DNS renewal for target removed from ",
          upstream_id, ": ", err)
    end
  end

  local upstream = upstreams.get_upstream_by_id(upstream_id)
  if not upstream then
    log(ERR, "target ", operation, ": upstream not found for ", upstream_id,
      upstream_name and " (" .. upstream_name ..")" or "")
    return
  end

  -- move this to upstreams?
  local balancer = balancers.get_balancer_by_id(upstream_id)
  if not balancer then
    log(ERR, "target ", operation, ": balancer not found for ", upstream_id,
      upstream_name and " (" .. upstream_name ..")" or "")
    return
  end

  -- NOTE(review): the meaning of the second argument is defined in the
  -- balancers module, presumably "recreate" -- confirm there
  local new_balancer, err = balancers.create_balancer(upstream, true)
  if not new_balancer then
    return nil, err
  end

  return true
end


--==============================================================================
-- DNS
--==============================================================================

-- define sort order for DNS query results
local sortQuery = function(a,b) return a.__balancerSortKey < b.__balancerSortKey end

-- Per record type: function taking a query result and returning a new table
-- holding the entries sorted by `__balancerSortKey` in its array part, plus a
-- reverse index `sort key -> position`. Note that `__balancerSortKey` is
-- written onto the entries of the passed-in result.
local sorts = {
  [dns_client.TYPE_A] = function(result)
    local sorted = {}
    -- build table with keys
    for i, v in ipairs(result) do
      sorted[i] = v
      v.__balancerSortKey = v.address
    end
    -- sort by the keys
    table_sort(sorted, sortQuery)
    -- reverse index
    for i, v in ipairs(sorted) do sorted[v.__balancerSortKey] = i end
    return sorted
  end,
  [dns_client.TYPE_SRV] = function(result)
    local sorted = {}
    -- build table with keys
    for i, v in ipairs(result) do
      sorted[i] = v
      v.__balancerSortKey = string_format("%06d:%s:%s", v.priority, v.target, v.port)
    end
    -- sort by the keys
    table_sort(sorted, sortQuery)
    -- reverse index
    for i, v in ipairs(sorted) do sorted[v.__balancerSortKey] = i end
    return sorted
  end,
}
sorts[dns_client.TYPE_AAAA] = sorts[dns_client.TYPE_A] -- A and AAAA use the same sorting order
sorts = setmetatable(sorts,{
    -- all record types not mentioned above are unsupported, throw error
    __index = function(_, key)
      error("Unknown/unsupported DNS record type; "..tostring(key))
    end,
  })


-- balancers that currently have an update in progress (weak keys)
local atomic_tracker = setmetatable({},{ __mode = "k" })

------------------------------------------------------------------------------
-- Runs `f(self, ...)` while asserting that no other update for the same
-- balancer (`self.balancer`) is in progress.
-- The in-progress flag is cleared when `f` returns AND when `f` raises; in
-- the latter case the original error is re-raised afterwards. Without that,
-- a single failing update (e.g. an unsupported record type) would leave the
-- flag set and make every later update for that balancer fail with the
-- misleading atomicity assertion below.
-- @param f Function to run
-- @param self First argument for `f`; must have a `balancer` field
-- @param ... Remaining arguments for `f`
-- @return The first two return values of `f`
local function assert_atomicity(f, self, ...)
  -- if the following assertion failed, then the function probably yielded and
  -- allowed other threads to enter simultaneously.
  -- This was added to prevent issues like
  -- https://github.com/Kong/lua-resty-dns-client/issues/49
  -- to reappear in the future, providing a clear understanding of what is wrong
  atomic_tracker[self.balancer] = assert(not atomic_tracker[self.balancer],
    "Failed to run atomically, multiple threads updating balancer simultaneously")

  local completed, ok, err = pcall(f, self, ...)
  atomic_tracker[self.balancer] = nil

  if not completed then
    -- `ok` holds the error value; level 0 re-raises it unmodified
    error(ok, 0)
  end

  return ok, err
end


------------------------------------------------------------------------------
-- Timer invoked to update DNS records.
-- Requeries every target whose renewal time has passed, then reschedules
-- itself to run again in 1 second, unless the worker is exiting.
-- @param premature Set by the timer facility when the timer fires early
-- (e.g. on shutdown); nothing is done in that case.
function resolve_timer_callback(premature)
  if premature then
    return
  end

  local now = ngx_now()

  while (renewal_heap:peekValue() or math.huge) < now do
    local key = renewal_heap:pop()
    local target = renewal_weak_cache[key] -- can return nil if GC'ed

    if target then
      log(DEBUG, "executing requery for: ", target.name)
      -- timer-context; cacheOnly always false
      -- Protected call: an error for one target must not abort this callback,
      -- because that would skip the rescheduling below and silently stop DNS
      -- renewal for every target in this worker.
      local ok, err = pcall(queryDns, target, false)
      if not ok then
        log(ERR, "DNS requery for ", target.name, " failed: ", err)
      end
    end
  end

  if exiting() then
    return
  end

  local err
  resolve_timer_running, err = ngx.timer.at(1, resolve_timer_callback)
  if not resolve_timer_running then
    log(CRIT, "could not reschedule DNS resolver timer: ", err)
  end
end


-- schedules a DNS update for a host in the global timer queue. This uses only
-- a single timer for all balancers.
-- IMPORTANT: this construct should not prevent GC of the Host object
-- @param target The target table; its `lastQuery.expire` (0 if absent)
-- determines when the renewal is due
local function schedule_dns_renewal(target)
  local record_expiry = (target.lastQuery or EMPTY).expire or 0

  local key, err = get_dns_renewal_key(target)
  if err then
    local tgt_name = target.name or target.target or "[empty hostname]"
    log(ERR, "could not schedule DNS renewal for target ", tgt_name, ":", err)
    return
  end

  -- because of the DNS cache, a stale record will most likely be returned by the
  -- client, and queryDns didn't do anything, other than start a background renewal
  -- query. In that case record_expiry is based on the stale old query (lastQuery)
  -- and it will be in the past. So we schedule a renew at least 0.5 seconds in
  -- the future, so by then the background query is complete and that second call
  -- to queryDns will do the actual updates. Without math.max it would create a
  -- busy loop and hang.
  local new_renew_at = math.max(ngx_now(), record_expiry) + 0.5
  local old_renew_at = renewal_heap:valueByPayload(key)

  -- always store the host in the registry, because the same key might be reused
  -- by a new host-object for the same hostname in case of quick delete/add sequence
  renewal_weak_cache[key] = target

  if old_renew_at then
    renewal_heap:update(key, new_renew_at)
  else
    renewal_heap:insert(new_renew_at, key)
  end
end


------------------------------------------------------------------------------
-- Applies a DNS query result to a target: compares it with the previously
-- stored result (`target.lastQuery` / `target.lastSorted`) and adds, disables
-- or re-weights addresses on the target's balancer accordingly. Stores the
-- new result on the target afterwards.
-- Raises an error when the result holds an unsupported record type.
-- @param target The target table, with a `balancer` field
-- @param newQuery The DNS result: an array of records with an `expire` field
-- @return `true`
local function update_dns_result(target, newQuery)
  local balancer = target and target.balancer

  local oldQuery = target.lastQuery or {}
  local oldSorted = target.lastSorted or {}

  -- we're using the dns' own cache to check for changes.
  -- if our previous result is the same table as the current result, then nothing changed
  if oldQuery == newQuery then
    log(DEBUG, "no dns changes detected for ", target.name)

    return true    -- exit, nothing changed
  end

  -- To detect ttl = 0 we validate both the old and new record. This is done to ensure
  -- we do not hit the edgecase of https://github.com/Kong/lua-resty-dns-client/issues/51
  -- So if we get a ttl=0 twice in a row (the old one, and the new one), we update it. And
  -- if the very first request ever reports ttl=0 (we assume we're not hitting the edgecase
  -- in that case)
  if (newQuery[1] or EMPTY).ttl == 0 and
     (((oldQuery[1] or EMPTY).ttl or 0) == 0 or oldQuery.__ttl0Flag)
  then
    -- ttl = 0 means we need to lookup on every request.
    -- To enable lookup on each request we 'abuse' a virtual SRV record. We set the ttl
    -- to `ttl0Interval` seconds, and set the `target` field to the hostname that needs
    -- resolving. Now `getPeer` will resolve on each request if the target is not an IP address,
    -- and after `ttl0Interval` seconds we'll retry to see whether the ttl has changed to non-0.
    -- Note: if the original record is an SRV we cannot use the dns provided weights,
    -- because we can/are not going to possibly change weights on each request
    -- so we fix them at the `nodeWeight` property, as with A and AAAA records.
    if oldQuery.__ttl0Flag then
      -- still ttl 0 so nothing changed
      oldQuery.touched = ngx_now()
      oldQuery.expire = oldQuery.touched + balancer.ttl0Interval
      log(DEBUG, "no dns changes detected for ",
              target.name, ", still using ttl=0")
      return true
    end

    log(DEBUG, "ttl=0 detected for ", target.name)
    newQuery = {
        {
          type = dns_client.TYPE_SRV,
          target = target.name,
          name = target.name,
          port = target.port,
          weight = target.weight,
          priority = 1,
          ttl = balancer.ttl0Interval,
        },
        expire = ngx_now() + balancer.ttl0Interval,
        touched = ngx_now(),
        __ttl0Flag = true,        -- flag marking this record as a fake SRV one
      }
  end

  -- a new dns record, was returned, but contents could still be the same, so check for changes
  -- sort table in unique order
  local rtype = (newQuery[1] or EMPTY).type
  if not rtype then
    -- we got an empty query table, so assume A record, because it's empty
    -- all existing addresses will be removed
    log(DEBUG, "blank dns record for ", target.name, ", assuming A-record")
    rtype = dns_client.TYPE_A
  end
  local newSorted = sorts[rtype](newQuery)
  local dirty

  if rtype ~= (oldSorted[1] or EMPTY).type then
    -- DNS recordtype changed; recycle everything
    log(DEBUG, "dns record type changed for ",
            target.name, ", ", (oldSorted[1] or EMPTY).type, " -> ",rtype)
    for i = #oldSorted, 1, -1 do  -- reverse order because we're deleting items
      balancer:disableAddress(target, oldSorted[i])
    end
    for _, entry in ipairs(newSorted) do -- use sorted table for deterministic order
      balancer:addAddress(target, entry)
    end
    dirty = true
  else
    -- new record, but the same type
    local topPriority = (newSorted[1] or EMPTY).priority -- nil for non-SRV records
    local done = {}
    local dCount = 0
    for _, newEntry in ipairs(newSorted) do
      if newEntry.priority ~= topPriority then break end -- exit when priority changes, as SRV only uses top priority

      local key = newEntry.__balancerSortKey
      -- the reverse index gives the position of the entry in the old result;
      -- the placeholder key yields nil when the entry did not exist before
      local oldEntry = oldSorted[oldSorted[key] or "__key_not_found__"]
      if not oldEntry then
        -- it's a new entry
        log(DEBUG, "new dns record entry for ",
                target.name, ": ", (newEntry.target or newEntry.address),
                ":", newEntry.port) -- port = nil for A or AAAA records
        balancer:addAddress(target, newEntry)
        dirty = true
      else
        -- it already existed (same ip, port)
        if newEntry.weight and
           newEntry.weight ~= oldEntry.weight and
           not (newEntry.weight == 0  and oldEntry.weight == SRV_0_WEIGHT)
        then
          -- weight changed (can only be an SRV)
          --host:findAddress(oldEntry):change(newEntry.weight == 0 and SRV_0_WEIGHT or newEntry.weight)
          balancer:changeWeight(target, oldEntry, newEntry.weight == 0 and SRV_0_WEIGHT or newEntry.weight)
          dirty = true
        else
          log(DEBUG, "unchanged dns record entry for ",
                  target.name, ": ", (newEntry.target or newEntry.address),
                  ":", newEntry.port) -- port = nil for A or AAAA records
        end
        done[key] = true
        dCount = dCount + 1
      end
    end
    if dCount ~= #oldSorted then
      -- not all existing entries were handled, remove the ones that are not in the
      -- new query result
      for _, entry in ipairs(oldSorted) do
        if not done[entry.__balancerSortKey] then
          log(DEBUG, "removed dns record entry for ",
                  target.name, ": ", (entry.target or entry.address),
                  ":", entry.port) -- port = nil for A or AAAA records
          balancer:disableAddress(target, entry)
        end
      end
      dirty = true
    end
  end

  target.lastQuery = newQuery
  target.lastSorted = newSorted

  if dirty then
    -- above we already added and updated records. Removed addresses are disabled, and
    -- need yet to be deleted from the Host
    log(DEBUG, "updating balancer based on dns changes for ",
            target.name)

    -- allow balancer to update its algorithm
    balancer:afterHostUpdate(target)

    -- delete addresses previously disabled
    balancer:deleteDisabledAddresses(target)
  end

  log(DEBUG, "querying dns and updating for ", target.name, " completed")
  return true
end


-- Queries the DNS for this hostname. Updates the underlying address objects.
-- A failed DNS lookup is handled (it results in an empty record), but it
-- might leave the balancer in a 0-weight state if none of the hosts resolves.
-- Afterwards the next renewal for the target is scheduled.
-- @param target The target table, with `name` and `balancer` fields
-- @param cacheOnly Passed on to the DNS client's `resolve`
function queryDns(target, cacheOnly)
  log(DEBUG, "querying dns for ", target.name)

  -- first thing we do is the dns query, this is the only place we possibly
  -- yield (cosockets in the dns lib). So once that is done, we're 'atomic'
  -- again, and we shouldn't have any nasty race conditions.
  -- Note: the other place we may yield would be the callbacks, whose content
  -- we do not control, hence they are executed delayed, to ascertain
  -- atomicity.
  local newQuery, err, try_list = dns_client.resolve(target.name, nil, cacheOnly)

  if err then
    log(WARN, "querying dns for ", target.name,
            " failed: ", err , ". Tried ", tostring(try_list))

    -- query failed, create a fake record
    -- the empty record will cause all existing addresses to be removed
    newQuery = {
      expire = ngx_now() + target.balancer.requeryInterval,
      touched = ngx_now(),
      __dnsError = err,
    }
  end

  assert_atomicity(update_dns_result, target, newQuery)

  schedule_dns_renewal(target)
end


-- Tells whether the DNS result of a target needs refreshing.
-- @param target The target table
-- @return `true` when the target was never queried or its last result
-- expired before now
local function targetExpired(target)
  return not target.lastQuery or target.lastQuery.expire < ngx_now()
end


------------------------------------------------------------------------------
-- Gets the ip, port and host header to use for an address.
-- When the DNS result of the address' target has expired (and `cacheOnly` is
-- not set) the target is requeried first.
-- @param address The address table
-- @param cacheOnly When truthy no DNS requery is done here; also passed on
-- to the DNS client's `toip`
-- @return `ip, port, hostHeader` on success, or `nil, err` when the address
-- is unavailable, when it no longer refers to the same target after the
-- requery, or when its name cannot be resolved
function targets_M.getAddressPeer(address, cacheOnly)
  if not address.available then
    return nil, balancers.errors.ERR_ADDRESS_UNAVAILABLE
  end

  local target = address.target
  if targetExpired(target) and not cacheOnly then
    queryDns(target, cacheOnly)
    -- the requery may have replaced this address
    if address.target ~= target then
      return nil, balancers.errors.ERR_DNS_UPDATED
    end
  end

  if address.ipType == "name" then    -- missing classification. (can it be a "name"?)
    -- SRV type record with a named target
    local ip, port, try_list = dns_client.toip(address.ip, address.port, cacheOnly)
    if not ip then
      -- on failure `port` holds the error; append the list of names tried
      port = tostring(port) .. ". Tried: " .. tostring(try_list)
      return ip, port
    end

    return ip, port, address.hostHeader
  end

  return address.ip, address.port, address.hostHeader
end


return targets_M