function _M:communicate()

in kong/kong/clustering/data_plane.lua [91:289]


-- Runs one connection lifecycle between this data plane (DP) and the control
-- plane (CP): connect, announce the plugin list, then keep three light threads
-- running (config / write / read) until one of them ends. Afterwards the
-- connection is torn down and, unless the worker is exiting, a new attempt is
-- scheduled via `ngx.timer.at`.
--
-- The function is written to be used as a timer callback (it re-schedules
-- itself that way), and returns nothing.
--
-- @param premature  truthy when invoked by a timer that expired prematurely
--                   (per `ngx.timer.at`, this happens when the worker is
--                   shutting down); in that case the call is a no-op.
function _M:communicate(premature)
  if premature then
    -- worker is going away; do not open a new connection
    return
  end

  local conf = self.conf

  -- suffix appended to most log lines so they identify the CP address in use
  local log_suffix = " [" .. conf.cluster_control_plane .. "]"
  -- random integer delay (5..10, used as seconds by the timers below);
  -- presumably randomized so many DPs do not reconnect in lockstep
  local reconnection_delay = math.random(5, 10)

  local c, uri, err = clustering_utils.connect_cp(
                        "/v1/outlet", conf, self.cert, self.cert_key)
  if not c then
    ngx_log(ngx_ERR, _log_prefix, "connection to control plane ", uri, " broken: ", err,
                 " (retrying after ", reconnection_delay, " seconds)", log_suffix)

    -- retry later; assert raises if the timer cannot be created
    assert(ngx.timer.at(reconnection_delay, function(premature)
      self:communicate(premature)
    end))
    return
  end

  -- connection established
  -- the first frame sent is a JSON "basic_info" message carrying this node's
  -- plugin list (self.plugins_list)
  local _
  _, err = c:send_binary(cjson_encode({ type = "basic_info",
                                        plugins = self.plugins_list, }))
  if err then
    ngx_log(ngx_ERR, _log_prefix, "unable to send basic information to control plane: ", uri,
                     " err: ", err, " (retrying after ", reconnection_delay, " seconds)", log_suffix)

    -- unlike the connect failure above, there is a live connection to release
    c:close()
    assert(ngx.timer.at(reconnection_delay, function(premature)
      self:communicate(premature)
    end))
    return
  end

  -- created with a count of 0, so the config thread blocks in wait() until the
  -- read thread posts
  local config_semaphore = semaphore.new(0)

  -- Three light threads are spawned below and cooperate through the shared
  -- upvalues `next_data`, `ping_immediately`, `config_exit` and
  -- `config_semaphore`:
  --
  -- * config_thread: woken through the semaphore; inflates and decodes the
  --                  most recently received binary frame and, for
  --                  "reconfigure" messages, hands the config to
  --                  config_helper.update. On completion it asks the write
  --                  thread to ping right away.
  -- * write_thread:  calls send_ping(c, log_suffix) and then idles for up to
  --                  PING_INTERVAL one-second sleeps, cutting the idle short
  --                  when `ping_immediately` is set.
  -- * read_thread:   the only place that calls c:recv_frame(). It stores
  --                  binary frames in `next_data`, wakes the config thread,
  --                  and gives up when frames stop arriving for longer than
  --                  PING_WAIT.

  local ping_immediately  -- set by config_thread, consumed by write_thread
  local config_exit       -- set after the wait below to stop config_thread
  local next_data         -- latest unprocessed binary frame payload, or nil

  local config_thread = ngx.thread.spawn(function()
    while not exiting() and not config_exit do
      -- wait with a timeout of 1 so the loop condition is re-checked regularly
      local ok, err = config_semaphore:wait(1)
      if ok then
        -- snapshot the payload; read_thread may replace next_data while this
        -- thread is suspended further down
        local data = next_data
        if data then
          -- a failed assert here raises inside this thread, which surfaces as
          -- a failed ngx.thread.wait() in the parent
          local msg = assert(inflate_gzip(data))
          -- presumably hands control back to the scheduler between the
          -- expensive steps; confirm against the `yield` helper definition
          yield()
          msg = assert(cjson_decode(msg))
          yield()

          if msg.type == "reconfigure" then
            if msg.timestamp then
              ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
                                 msg.timestamp, log_suffix)

            else
              ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
            end

            local config_table = assert(msg.config_table)
            local pok, res
            -- pcall so an error raised by the update does not end this thread;
            -- on success `res, err` are update's own return values, on a raised
            -- error `res` holds the error value
            pok, res, err = pcall(config_helper.update, self.declarative_config,
                                  config_table, msg.config_hash, msg.hashes)
            if pok then
              if not res then
                ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
              end

              -- requested whenever update did not raise, including when it
              -- returned a falsy result
              ping_immediately = true

            else
              ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
            end

            -- only clear the slot if no newer frame arrived in the meantime;
            -- otherwise the newer payload is kept for the next iteration
            -- NOTE(review): this is skipped for non-"reconfigure" messages, so
            -- such a payload stays in next_data until overwritten
            if next_data == data then
              next_data = nil
            end
          end
        end

      elseif err ~= "timeout" then
        -- a "timeout" result is the normal idle case and is not logged
        ngx_log(ngx_ERR, _log_prefix, "semaphore wait error: ", err)
      end
    end
  end)

  local write_thread = ngx.thread.spawn(function()
    while not exiting() do
      -- send_ping is defined outside this block; its return values are not
      -- inspected here
      send_ping(c, log_suffix)

      -- idle in one-second steps so that shutdown and ping_immediately are
      -- noticed within about a second
      for _ = 1, PING_INTERVAL do
        ngx_sleep(1)
        if exiting() then
          return
        end
        if ping_immediately then
          -- consume the request and go straight back to send_ping
          ping_immediately = nil
          break
        end
      end
    end
  end)

  local read_thread = ngx.thread.spawn(function()
    -- time of the last successfully received (non-close) frame
    local last_seen = ngx_time()
    while not exiting() do
      local data, typ, err = c:recv_frame()
      if err then
        if not is_timeout(err) then
          -- returned (not raised): the parent sees this as ok == true with the
          -- message in the third result of ngx.thread.wait
          return nil, "error while receiving frame from control plane: " .. err
        end

        -- receive timed out: only fatal once nothing has arrived for longer
        -- than PING_WAIT
        local waited = ngx_time() - last_seen
        if waited > PING_WAIT then
          return nil, "did not receive pong frame from control plane within " .. PING_WAIT .. " seconds"
        end

      else
        if typ == "close" then
          ngx_log(ngx_DEBUG, _log_prefix, "received close frame from control plane", log_suffix)
          return
        end

        -- NOTE(review): refreshed for every non-close frame type, not only
        -- "pong", although the timeout message above mentions pong
        last_seen = ngx_time()

        if typ == "binary" then
          -- always keep only the newest payload
          next_data = data
          if config_semaphore:count() <= 0 then
            -- post only when no wake-up is already pending, so frames that
            -- arrive while the config thread is busy collapse into a single
            -- wake-up that processes the latest payload
            config_semaphore:post()
          end

        elseif typ == "pong" then
          ngx_log(ngx_DEBUG, _log_prefix, "received pong frame from control plane", log_suffix)

        else
          ngx_log(ngx_NOTICE, _log_prefix, "received unknown (", tostring(typ), ") frame from control plane",
                              log_suffix)
        end
      end
    end
  end)

  -- returns as soon as any one of the three threads terminates.
  -- ok == false: that thread raised, err is the error.
  -- ok == true:  err/perr are the thread's own return values, so a thread that
  --              did `return nil, msg` delivers msg in perr.
  local ok, err, perr = ngx.thread.wait(read_thread, write_thread, config_thread)

  -- stop all traffic on the connection before closing it
  ngx.thread.kill(read_thread)
  ngx.thread.kill(write_thread)
  c:close()

  if not ok then
    ngx_log(ngx_ERR, _log_prefix, err, log_suffix)

  elseif perr then
    ngx_log(ngx_ERR, _log_prefix, perr, log_suffix)
  end

  -- config_thread is not killed; it is asked to leave its loop and then waited
  -- for, presumably so an in-progress config update is not interrupted midway
  config_exit = true

  -- NOTE(review): if config_thread was the thread that ended the wait above,
  -- it has already been waited on; check what ngx.thread.wait returns (and
  -- therefore what gets logged) in that case
  ok, err, perr = ngx.thread.wait(config_thread)
  if not ok then
    ngx_log(ngx_ERR, _log_prefix, err, log_suffix)

  elseif perr then
    ngx_log(ngx_ERR, _log_prefix, perr, log_suffix)
  end

  -- reconnect after the same randomized delay unless the worker is shutting down
  if not exiting() then
    assert(ngx.timer.at(reconnection_delay, function(premature)
      self:communicate(premature)
    end))
  end
end