# deploy/docker/cp-edge/sync-routes.py
# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import json
import os
import re
import subprocess
from datetime import datetime
from multiprocessing.pool import ThreadPool as Pool
import requests
import time
import urllib3
# Run parameters prefixed with this define additional custom tool endpoints
# (see construct_additional_endpoints_from_run_parameters below)
CP_CAP_CUSTOM_ENDPOINT_PREFIX = 'CP_CAP_CUSTOM_TOOL_ENDPOINT_'
# Name of the run parameter that points at a run tag which must be set
# before the run's endpoints are proxied
CP_EDGE_ENDPOINT_TAG_NAME = 'CP_EDGE_ENDPOINT_TAG_NAME'
try:
    from pykube.config import KubeConfig
    from pykube.http import HTTPClient
    from pykube.http import HTTPError
    from pykube.objects import Pod
    from pykube.objects import Service
    from pykube.objects import Event
except ImportError:
    # NOTE(review): the message mentions "KubernetesJobTask" (looks copied from
    # luigi) although this script is the EDGE route sync - consider rewording
    raise RuntimeError('pykube is not installed. KubernetesJobTask requires pykube.')
# Prefixes of the per-endpoint pod labels/annotations
SVC_PORT_TMPL = 'svc-port-'
SVC_PATH_TMPL = 'svc-path-'
# JSON payload describing a single exposed endpoint, reported back to the API
SVC_URL_TMPL = '{{ ' \
               '"url" : "{external_schema}://{external_ip}:{edge_port}/{edge_location}", ' \
               '"name": "{service_name}", ' \
               '"isDefault": {is_default_endpoint}, ' \
               '"sameTab": {is_same_tab}, ' \
               '"customDNS": {is_custom_dns}, ' \
               '"regionId": {region_id} ' \
               '}}'
# Route id: <pod>-<port>-<endpoint ordinal>
ROUTE_ID_TMPL = '{pod_id}-{endpoint_port}-{endpoint_num}'
# Raw string: '\d' in a plain literal is an invalid escape (SyntaxWarning on 3.12+)
ROUTE_ID_PATTERN = r'^(.*)-(\d+)-(\d+)$'
EDGE_ROUTE_TARGET_TMPL = '{pod_ip}:{endpoint_port}'
EDGE_ROUTE_TARGET_PATH_TMPL = '{pod_ip}:{endpoint_port}/{endpoint_path}'
# Marker values recognized inside an endpoint's "additional" config string
EDGE_ROUTE_NO_PATH_CROP = 'CP_EDGE_NO_PATH_CROP'
EDGE_ROUTE_CREATE_DNS = 'CP_EDGE_ROUTE_CREATE_DNS'
EDGE_COOKIE_NO_REPLACE = 'CP_EDGE_COOKIE_NO_REPLACE'
EDGE_JWT_NO_AUTH = 'CP_EDGE_JWT_NO_AUTH'
EDGE_PASS_BEARER = 'CP_EDGE_PASS_BEARER'
# This can be used to add any extra option to the bearer cookie generated
# e.g. Secure;SameSite=None;Partitioned;
# Which allows external services to use the cookie (used for SSO integration with EDGE)
EDGE_BEARER_COOKIE_EXTRA = os.getenv('CP_EDGE_BEARER_COOKIE_EXTRA', '')
EDGE_DNS_RECORD_FORMAT = os.getenv('CP_EDGE_DNS_RECORD_FORMAT', '{job_name}.{region_name}')
EDGE_DISABLE_NAME_SUFFIX_FOR_DEFAULT_ENDPOINT = os.getenv('EDGE_DISABLE_NAME_SUFFIX_FOR_DEFAULT_ENDPOINT', 'True').lower() == 'true'
EDGE_EXTERNAL_APP = 'CP_EDGE_EXTERNAL_APP'
EDGE_INSTANCE_IP = 'CP_EDGE_INSTANCE_IP'
RUN_ID = 'runid'
# API method templates (relative to api_url)
API_UPDATE_SVC = 'run/{run_id}/serviceUrl?region={region}'
API_GET_RUNS_LIST_DETAILS = 'runs?runIds={run_ids}'
API_POST_DNS_RECORD = 'cluster/dnsrecord'
API_GET_PREF = 'preferences/{preference_name}'
# Retry policy for call_api
NUMBER_OF_RETRIES = 10
SECS_TO_WAIT_BEFORE_RETRY = 15
STUB_LOCATION_CONFIG_EXTENSION = '.stub.loc.conf'
STUB_CUSTOM_DOMAIN_EXTENSION = '.stub.conf'
# Labels identifying the EDGE kube service and its externally visible address
EDGE_SVC_ROLE_LABEL = 'cloud-pipeline/role'
EDGE_SVC_ROLE_LABEL_VALUE = 'EDGE'
EDGE_SVC_HOST_LABEL = 'cloud-pipeline/external-host'
EDGE_SVC_PORT_LABEL = 'cloud-pipeline/external-port'
EDGE_SVC_REGION_LABEL = 'cloud-pipeline/region'
# nginx layout: where generated configs and templates live
nginx_custom_domain_config_ext = '.srv.conf'
nginx_custom_domain_loc_suffix = 'CP_EDGE_CUSTOM_DOMAIN'
nginx_custom_domain_loc_tmpl = 'include {}; # ' + nginx_custom_domain_loc_suffix
nginx_root_config_path = '/etc/nginx/nginx.conf'
nginx_sites_path = '/etc/nginx/sites-enabled'
nginx_domains_path = '/etc/nginx/sites-enabled/custom-domains'
external_apps_domains_path = '/etc/nginx/external-apps'
api_domain_path = '/etc/nginx/ingress/cp-api-srv.conf'
nginx_loc_module_template = '/etc/nginx/endpoints-config/route.template.loc.conf'
nginx_srv_module_template = '/etc/nginx/endpoints-config/route.template' + nginx_custom_domain_config_ext
nginx_sensitive_loc_module_template = '/etc/nginx/endpoints-config/sensitive.template.loc.conf'
nginx_loc_module_stub_template = '/etc/nginx/endpoints-config/route.template.stub.loc.conf'
nginx_sensitive_routes_config_path = '/etc/nginx/endpoints-config/sensitive.routes.json'
nginx_system_endpoints_config_path = '/etc/nginx/endpoints-config/system_endpoints.json'
nginx_default_location_attributes_path = '/etc/nginx/endpoints-config/default_location_attributes.json'
# Defaults; the real external address is discovered from the EDGE service labels
edge_service_port = 31000
edge_service_external_ip = ''
# TLS material lookup for custom domains
pki_search_path = '/etc/edge/pki/'
pki_search_suffix_cert = '-public-cert.pem'
pki_search_suffix_key = '-private-key.pem'
pki_default_cert = '/etc/edge/pki/ssl-public-cert.pem'
pki_default_cert_key = '/etc/edge/pki/ssl-private-key.pem'
# Timestamp format expected by the run log API
DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
# Extra nginx attributes appended to every generated location block.
# Loaded best-effort: a missing or broken config leaves the list empty.
DEFAULT_LOCATION_ATTRIBUTES = []
if os.path.exists(nginx_default_location_attributes_path):
    try:
        with open(nginx_default_location_attributes_path) as location_attributes_fd:
            DEFAULT_LOCATION_ATTRIBUTES = json.load(location_attributes_fd)
    except Exception as loc_attr_read_exception:
        print('An error occured while reading default location attributes: {}'.format(loc_attr_read_exception))
else:
    print('Default location attributes config was not found at {}'.format(nginx_default_location_attributes_path))
# API is accessed over self-signed TLS (verify=False below) - silence urllib3 warnings
urllib3.disable_warnings()
api_url = os.environ.get('API')
api_token = os.environ.get('API_TOKEN')
# Prefer the external API host; fall back to the internal one
api_domain_name = os.environ.get('CP_API_SRV_EXTERNAL_HOST')
if not api_domain_name:
    api_domain_name = os.environ.get('CP_API_SRV_INTERNAL_HOST')
if not api_url or not api_token:
    print('API url or API token are not set. Exiting')
    exit(1)
edge_service_external_schema = os.environ.get('EDGE_EXTERNAL_SCHEMA', 'https')
api_headers = {'Content-Type': 'application/json',
               'Authorization': 'Bearer {}'.format(api_token)}
# Worker pool used to create DNS records for services concurrently
pool_size = 8
dns_services_pool = Pool(pool_size)
class ServiceEndpoint:
    """Plain value object describing a single exposed service endpoint."""

    def __init__(self, num, port, path, additional):
        # Ordinal of the endpoint within its run
        self.num = num
        # Backend port the endpoint listens on
        self.port = port
        # Optional backend path suffix
        self.path = path
        # Free-form "additional" nginx configuration string
        self.additional = additional
def do_log(msg):
    """Print a message to stdout prefixed with a human-readable timestamp."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('[{}] {}'.format(stamp, msg))
def call_api(method_url, data=None):
    """Call the Cloud Pipeline API with retries.

    POSTs `data` when given, otherwise performs a GET. Transport/parsing
    failures are retried up to NUMBER_OF_RETRIES with a pause in between.
    An HTTP-level answer with a non-OK status is NOT retried (the API
    technically responded). Returns the parsed response dict on success,
    otherwise None.
    """
    result = None
    for n in range(NUMBER_OF_RETRIES):
        try:
            do_log('Calling API {}'.format(method_url))
            # verify=False: the API is served with a self-signed certificate
            if data:
                response = requests.post(method_url, verify=False, data=data, headers=api_headers)
            else:
                response = requests.get(method_url, verify=False, headers=api_headers)
            response_data = json.loads(response.text)
            if response_data['status'] == 'OK':
                do_log('Calling API ... OK')
                result = response_data
                # Bug fix: stop the retry loop once the call has succeeded -
                # previously a successful call was re-issued on every iteration
                break
            else:
                err_msg = 'No error message available'
                if 'message' in response_data:
                    err_msg = response_data['message']
                do_log('Calling API ... NOT OK ({})\n{}'.format(method_url, err_msg))
                do_log('As the API technically succeeded, it will not be retried')
                break
        except Exception as api_exception:
            do_log('Calling API ... NOT OK ({})\n{}'.format(method_url, str(api_exception)))
            if n < NUMBER_OF_RETRIES - 1:
                do_log('Sleep for {} sec and perform API call again ({}/{})'.format(SECS_TO_WAIT_BEFORE_RETRY, n + 2, NUMBER_OF_RETRIES))
                time.sleep(SECS_TO_WAIT_BEFORE_RETRY)
            else:
                do_log('All attempts failed. API call failed')
    return result
# todo: Use RunLogger from pipe commons instead
class RunLogger:
    """Writes task log entries for a pipeline run through the API log endpoint."""

    def __init__(self, run_id, task_name):
        self.run_id = run_id
        self.task_name = task_name

    def info(self, message):
        """Plain RUNNING-status log line."""
        self._log(message=message, status='RUNNING')

    def warning(self, message):
        """RUNNING-status log line wrapped in yellow ANSI color codes."""
        self._log(message='\033[93m' + message + '\033[0m', status='RUNNING')

    def success(self, message):
        """SUCCESS-status log line wrapped in green ANSI color codes."""
        self._log(message='\033[92m' + message + '\033[0m', status='SUCCESS')

    def _log(self, message, status):
        do_log("Log run log: " + message)
        stamp = datetime.utcfromtimestamp(time.time()).strftime(DATE_FORMAT)
        # Trim microseconds down to milliseconds, as expected by the API
        entry = json.dumps({"runId": self.run_id,
                            "date": stamp[:-3],
                            "status": status,
                            "logText": message,
                            "taskName": self.task_name})
        call_api(os.path.join(api_url, "run/{run_id}/log".format(run_id=self.run_id)), data=entry)
def run_sids_to_str(run_sids, is_principal):
    """Join the names of run SIDs whose `isPrincipal` flag equals `is_principal`.

    Returns an empty string for an empty or missing SID list.
    """
    if not run_sids:
        return ""
    matching_names = (sid["name"] for sid in run_sids if sid["isPrincipal"] == is_principal)
    return ",".join(matching_names)
def parse_pretty_url(pretty):
    """Parse a run's "pretty URL" setting into its domain/path parts.

    `pretty` is either a JSON object string like '{"domain": ..., "path": ...}'
    or a bare string which is treated as a path. A leading '/' is stripped
    from the path. Returns {'domain': ..., 'path': ...} or None when neither
    part is present.
    """
    try:
        pretty_obj = json.loads(pretty)
        if not pretty_obj:
            return None
    # todo: Use only specific exception types
    except Exception:
        pretty_obj = {'path': pretty}
    # Robustness fix: valid JSON that is not an object (e.g. '123') used to
    # crash on the 'domain' in ... membership test below
    if not isinstance(pretty_obj, dict):
        pretty_obj = {'path': pretty}
    pretty_domain = None
    pretty_path = None
    if 'domain' in pretty_obj and pretty_obj['domain']:
        pretty_domain = pretty_obj['domain']
    if 'path' in pretty_obj:
        pretty_path = pretty_obj['path']
        # Robustness fix: guard against an explicit null path ('{"path": null}')
        if pretty_path and pretty_path.startswith('/'):
            pretty_path = pretty_path[len('/'):]
    if not pretty_domain and not pretty_path:
        return None
    return {'domain': pretty_domain, 'path': pretty_path}
def substr_indices(lines, substr):
    """Return the indices of every line that contains `substr`."""
    hits = []
    for idx, line in enumerate(lines):
        if substr in line:
            hits.append(idx)
    return hits
def store_file_from_lines(lines, path):
    """Write `lines` to `path`, newline-joined (no trailing newline)."""
    content = '\n'.join(lines)
    with open(path, 'w') as out_file:
        out_file.write(content)
def get_domain_config_path(domain, is_external_app=False):
    """Resolve the nginx server-config file path for a custom domain.

    The API's own domain maps to the fixed ingress config; everything else
    lives under the custom-domains (or external-apps) directory.
    """
    if domain == api_domain_name:
        return api_domain_path
    base_dir = external_apps_domains_path if is_external_app else nginx_domains_path
    return os.path.join(base_dir, domain + nginx_custom_domain_config_ext)
def add_custom_domain(domain, location_block, is_external_app=False):
    """Add a location include for `location_block` to a custom domain's nginx config.

    If the domain config does not exist yet, it is created from the server
    template with the best-matching cert/key substituted. No-op when the
    include is already present or the insert marker cannot be found.
    """
    # NOTE(review): only nginx_domains_path is created here - this assumes the
    # external-apps directory already exists when is_external_app is set; confirm
    if not os.path.isdir(nginx_domains_path):
        os.mkdir(nginx_domains_path)
    domain_path = get_domain_config_path(domain, is_external_app=is_external_app)
    domain_cert = search_custom_domain_cert(domain)
    if os.path.exists(domain_path):
        do_log('-> Adding new location block to existing configuration file at {}'.format(domain_path))
        with open(domain_path, 'r') as domain_path_file:
            domain_path_contents = domain_path_file.read()
    else:
        do_log('-> Creating new custom domain configuration file at {}'.format(domain_path))
        with open(nginx_srv_module_template, 'r') as nginx_srv_module_template_file:
            domain_path_contents = nginx_srv_module_template_file.read()
        # Substitute server name and TLS material into the fresh template
        domain_path_contents = domain_path_contents \
            .replace('{edge_route_server_name}', domain) \
            .replace('{edge_route_server_ssl_certificate}', domain_cert[0]) \
            .replace('{edge_route_server_ssl_certificate_key}', domain_cert[1])
    location_block_include = nginx_custom_domain_loc_tmpl.format(location_block)
    domain_path_lines = domain_path_contents.splitlines()
    # Check if the location_block already added to the domain config
    existing_loc = substr_indices(domain_path_lines, location_block_include)
    if existing_loc:
        do_log('-> Location block {} already exists for domain {}'.format(location_block, domain))
        return
    # If it's a new location entry - add it to the domain config after the {edge_route_location_block} line
    insert_loc = substr_indices(domain_path_lines, '# {edge_route_location_block}')
    if not insert_loc:
        do_log('-> Cannot find an insert location in the domain config {}'.format(domain_path))
        return
    domain_path_lines.insert(insert_loc[-1] + 1, location_block_include)
    # Save the domain config back to file
    store_file_from_lines(domain_path_lines, domain_path)
def remove_custom_domain(domain, location_block, is_external_app=False):
    """Remove a location include from a custom domain's nginx config.

    Deletes the whole config file when it no longer contains any location
    include (never for external apps or the API domain, whose server blocks
    are managed elsewhere). Returns True when the include was found and
    removed, False otherwise.
    """
    include_line = nginx_custom_domain_loc_tmpl.format(location_block)
    domain_path = get_domain_config_path(domain, is_external_app=is_external_app)
    if not os.path.exists(domain_path):
        return False
    with open(domain_path, 'r') as domain_cfg:
        cfg_lines = domain_cfg.read().splitlines()
    hits = substr_indices(cfg_lines, include_line)
    if not hits:
        return False
    del cfg_lines[hits[-1]]
    still_has_locations = any(nginx_custom_domain_loc_suffix in line for line in cfg_lines)
    if not is_external_app and domain_path != api_domain_path and not still_has_locations:
        # If no more location block exist in the domain - delete the config file
        # Do not delete if this is an "external application", where the server block is managed externally
        do_log('-> No more location blocks are available for {}, deleting the config file: {}'.format(domain, domain_path))
        os.remove(domain_path)
    else:
        # Save the domain config back to file
        store_file_from_lines(cfg_lines, domain_path)
    return True
def remove_custom_domain_all(location_block):
    """Remove `location_block` from the API domain config and every known
    custom-domain and external-app domain config."""
    if api_domain_name:
        if remove_custom_domain(api_domain_name, location_block, is_external_app=False):
            do_log('-> Removed {} location block from the API domain config {}'.format(location_block, api_domain_path))
    for domains_root_path in (nginx_domains_path, external_apps_domains_path):
        is_external_app = domains_root_path == external_apps_domains_path
        for domain_path in glob.glob(domains_root_path + '/*' + nginx_custom_domain_config_ext):
            custom_domain = os.path.basename(domain_path).replace(nginx_custom_domain_config_ext, '')
            if remove_custom_domain(custom_domain, location_block, is_external_app=is_external_app):
                do_log('-> Removed {} location block from {} domain config'.format(location_block, custom_domain))
def search_custom_domain_cert(domain):
    """Pick the most specific cert/key pair for `domain` from the PKI directory.

    Candidate certs are those whose base name is a suffix of the domain; the
    longest match wins. Falls back to the default EDGE cert/key when no pair
    is found. Returns a (cert_path, key_path) tuple.
    """
    candidates = []
    for cert_file in glob.glob(pki_search_path + '/*' + pki_search_suffix_cert):
        candidate_name = os.path.basename(cert_file).replace(pki_search_suffix_cert, '')
        if domain.endswith(candidate_name):
            candidates.append(candidate_name)
    cert_path = None
    key_path = None
    if candidates:
        # Longest candidate = most specific match for the domain
        candidates.sort(key=len, reverse=True)
        best = candidates[0]
        cert_path = os.path.join(pki_search_path, best + pki_search_suffix_cert)
        key_path = os.path.join(pki_search_path, best + pki_search_suffix_key)
        if not os.path.isfile(key_path):
            do_log('-> Certificate for {} is found at {}, but a key does not exist at {}'.format(domain, cert_path, key_path))
            key_path = None
    if not cert_path or not key_path:
        cert_path = pki_default_cert
        key_path = pki_default_cert_key
    do_log('-> Certificate:Key for {} will be used: {}:{}'.format(domain, cert_path, key_path))
    return cert_path, key_path
def read_system_endpoints():
    """Load system endpoint definitions from the endpoints config file.

    Returns a dict keyed by the activating run-parameter name; port and
    endpoint number may be overridden through environment variables named in
    the config entry.
    """
    system_endpoints = {}
    with open(nginx_system_endpoints_config_path, 'r') as endpoints_cfg:
        entries = json.load(endpoints_cfg)
    for entry in entries:
        system_endpoints[entry['name']] = {
            # Parameter value (or expression like ">0") that activates the endpoint
            "value": entry.get('value', "true"),
            "endpoint": str(os.environ.get(entry['endpoint_env'],
                                           entry['endpoint_default'])),
            "endpoint_num": str(os.environ.get(entry['endpoint_num_env'],
                                               entry['endpoint_num_default'])),
            "friendly_name": entry['friendly_name'],
            "endpoint_additional": entry.get('endpoint_additional', ''),
            "endpoint_same_tab": entry.get('endpoint_same_tab', None),
            "ssl_backend": entry.get('ssl_backend', None)
        }
    return system_endpoints
# Loaded once at startup: maps system run-parameter names to endpoint descriptors
SYSTEM_ENDPOINTS = read_system_endpoints()
# Friendly names used to recognize "system" endpoints among a tool's endpoints
SYSTEM_ENDPOINTS_NAMES = [endpoint['friendly_name'] for endpoint in SYSTEM_ENDPOINTS.values()]
def is_system_endpoint_name(endpoint):
    """True when the endpoint dict carries a name of a known system endpoint."""
    if not endpoint:
        return False
    if "name" not in endpoint or not endpoint["name"]:
        return False
    return endpoint["name"] in SYSTEM_ENDPOINTS_NAMES
# Function to construct endpoint was configured with Run Parameters.
# Group of Run Parameters started with CP_CAP_CUSTOM_TOOL_ENDPOINT_<num> considered as configuration of additional endpoint
# that should be available for this run. Full list of supported params are:
#
# CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_PORT
# CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_NAME
# CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_ADDITIONAL
# CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_NUM
# CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_SSL_BACKEND
# CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_SAME_TAB
#
# Method will group such parametes by <num> and construct from such group an endpoint.
def construct_additional_endpoints_from_run_parameters(run_details):
    """Build endpoint descriptors from CP_CAP_CUSTOM_TOOL_ENDPOINT_<num>_* run parameters.

    Parameters sharing the same <num> form one endpoint group; each group is
    turned into a dict compatible with the system endpoint descriptors.
    Returns an empty list when the run declares no such parameters.
    """
    def extract_endpoint_num_from_run_parameter(run_parameter):
        # The numeric group id immediately follows the prefix, e.g. ..._1_PORT -> '1'.
        # Raw string: avoids the invalid '\d' escape warning on Python 3.12+
        match = re.search(r'{}(\d+).*'.format(CP_CAP_CUSTOM_ENDPOINT_PREFIX), run_parameter["name"])
        if match:
            return match.group(1)
        return None

    custom_endpoint_run_parameters = [rp for rp in run_details["pipelineRunParameters"]
                                      if rp["name"].startswith(CP_CAP_CUSTOM_ENDPOINT_PREFIX)]
    if not custom_endpoint_run_parameters:
        return []
    # Bug fix: skip malformed parameter names that carry no numeric group id -
    # previously concatenating the None result raised a TypeError
    extracted_nums = (extract_endpoint_num_from_run_parameter(rp) for rp in custom_endpoint_run_parameters)
    custom_endpoints_nums = set(CP_CAP_CUSTOM_ENDPOINT_PREFIX + num
                                for num in extracted_nums if num is not None)
    # Bug fix: report the number of endpoint groups, not raw parameters
    do_log('Detected {} custom endpoints groups: {}.'
           .format(len(custom_endpoints_nums), ", ".join(str(num) for num in custom_endpoints_nums)))
    custom_endpoint_param_groups = {
        group_id: {
            rp["name"]: rp["value"]
            for rp in custom_endpoint_run_parameters if rp["name"].startswith(group_id)
        } for group_id in custom_endpoints_nums
    }
    return [
        {
            "name": e_id,
            "endpoint": e.get(e_id + "_PORT"),
            # str(): the fallback name is built even when _NAME is present (eager
            # .get default) and used to crash when _PORT was missing
            "friendly_name": e.get(e_id + "_NAME", "pipeline-" + str(run_details['id']) + "-" + str(e.get(e_id + "_PORT"))),
            "endpoint_additional": e.get(e_id + "_ADDITIONAL", ""),
            "ssl_backend": e.get(e_id + "_SSL_BACKEND", False),
            "endpoint_same_tab": e.get(e_id + "_SAME_TAB", False)
        } for e_id, e in custom_endpoint_param_groups.items()
    ]
def match_sys_endpoint_value(param_value, endpoint_value):
    """Check whether a run parameter value activates a system endpoint.

    Matches case-insensitively by equality; alternatively, when endpoint_value
    is a comparison expression (e.g. ">0"), the concatenation of both values is
    evaluated. Returns False on any evaluation failure.
    """
    if not param_value or not endpoint_value:
        return False
    if param_value.lower() == endpoint_value.lower():
        return True
    # This way we can set endpoint value to boolean expressions, e.g. ">0"
    # NOTE(security): eval() executes arbitrary expressions; endpoint_value comes
    # from the trusted system endpoints config, but param_value is user-supplied -
    # consider replacing with an explicit comparison parser
    if not endpoint_value.isalnum():
        try:
            return eval(param_value + endpoint_value)
        # Bug fix: narrow the bare except so SystemExit/KeyboardInterrupt propagate
        except Exception:
            return False
    return False
def append_additional_endpoints(tool_endpoints, run_details):
    """Merge system and run-parameter endpoints into a tool's endpoint list.

    `tool_endpoints` is a list of JSON strings (tool settings endpoints).
    Returns (updated_tool_endpoints, overridden_endpoints_count), where the
    count tracks tool endpoints replaced by a fully-matching system endpoint.
    """
    if not tool_endpoints:
        tool_endpoints = []
    system_endpoints_params = SYSTEM_ENDPOINTS.keys()
    overridden_endpoints_count = 0
    if run_details and "pipelineRunParameters" in run_details:
        # Get a list of endpoints from SYSTEM_ENDPOINTS which match the run's parameters (param name and a value)
        additional_endpoints_to_configure = [SYSTEM_ENDPOINTS[x["name"]] for x in run_details["pipelineRunParameters"]
                                             if x["name"] in system_endpoints_params
                                             and match_sys_endpoint_value(x["value"], SYSTEM_ENDPOINTS[x["name"]]["value"])
                                             and "endpoint" in SYSTEM_ENDPOINTS[x["name"]]
                                             and SYSTEM_ENDPOINTS[x["name"]]["endpoint"]]
        additional_endpoint_ports_to_configure = set([e["endpoint"] for e in additional_endpoints_to_configure])
        # Filter out any endpoint if it matches with system ones
        for custom_endpoint in construct_additional_endpoints_from_run_parameters(run_details):
            if custom_endpoint["endpoint"] in additional_endpoint_ports_to_configure:
                do_log('Endpoint {} with port: {} conflict with already configured ones, it will be filtered out.'
                       .format(custom_endpoint["name"], custom_endpoint["endpoint"]))
                continue
            # Append additional custom endpoint that are configured with run parameters
            additional_endpoints_to_configure.append(custom_endpoint)
            additional_endpoint_ports_to_configure.add(custom_endpoint["endpoint"])
        # If only a single endpoint is defined for the tool - we shall make sure it is set to default. Otherwise "system endpoint" may become a default one
        # If more then one endpoint is defined - we shall not make the changes, as it is up to the owner of the tool
        if additional_endpoints_to_configure and len(tool_endpoints) == 1:
            current_tool_endpoint = json.loads(tool_endpoints[0])
            current_tool_endpoint["isDefault"] = "true"
            tool_endpoints[0] = json.dumps(current_tool_endpoint)
        # Append additional endpoints to the existing list
        for additional_endpoint in additional_endpoints_to_configure:
            tool_endpoint = { "nginx": { "port": additional_endpoint["endpoint"], "additional": additional_endpoint["endpoint_additional"] }}
            system_endpoint_port = additional_endpoint["endpoint"]
            system_endpoint_ssl_backend = additional_endpoint["ssl_backend"]
            system_endpoint_same_tab = additional_endpoint["endpoint_same_tab"]
            system_endpoint_name = None
            if "friendly_name" in additional_endpoint:
                tool_endpoint["name"] = additional_endpoint["friendly_name"]
                system_endpoint_name = additional_endpoint["friendly_name"]
            if "endpoint_num" in additional_endpoint and additional_endpoint["endpoint_num"]:
                tool_endpoint["endpoint_num"] = additional_endpoint["endpoint_num"]
            # Drop tool endpoints that fully match this system endpoint (same
            # name and port), inheriting their default/ssl/same-tab flags
            non_matching_with_system_tool_endpoints, \
                is_default_endpoint, \
                is_ssl_backend, \
                is_same_tab = \
                remove_from_tool_endpoints_if_fully_matches(system_endpoint_name,
                                                            system_endpoint_port, tool_endpoints)
            removed_endpoints_count = len(tool_endpoints) - len(non_matching_with_system_tool_endpoints)
            tool_endpoint["isDefault"] = str(is_default_endpoint).lower()
            # The system endpoint's own flags win over the inherited ones
            tool_endpoint["sslBackend"] = system_endpoint_ssl_backend if system_endpoint_ssl_backend else is_ssl_backend
            tool_endpoint["sameTab"] = system_endpoint_same_tab if system_endpoint_same_tab else is_same_tab
            if removed_endpoints_count != 0:
                tool_endpoints = non_matching_with_system_tool_endpoints
                overridden_endpoints_count += removed_endpoints_count
            tool_endpoints.append(json.dumps(tool_endpoint))
    return tool_endpoints, overridden_endpoints_count
def remove_from_tool_endpoints_if_fully_matches(endpoint_name, endpoint_port, tool_endpoints):
    """Drop endpoints whose name and port both match, accumulating their flags.

    `tool_endpoints` holds JSON strings. Returns a tuple of
    (remaining_endpoints, is_default, is_ssl_backend, is_same_tab) where each
    flag is OR-ed over the removed entries.
    """
    remaining = []
    is_default_endpoint = False
    is_ssl_backend = False
    is_same_tab = False
    for raw_endpoint in tool_endpoints:
        parsed = json.loads(raw_endpoint)
        name_matches = (parsed
                        and endpoint_name
                        and 'name' in parsed
                        and parsed['name']
                        and parsed['name'].lower() == endpoint_name.lower())
        port_matches = (parsed
                        and 'nginx' in parsed
                        and parsed['nginx']
                        and 'port' in parsed['nginx']
                        and parsed['nginx']['port'] == endpoint_port)
        if name_matches and port_matches:
            # Fold the removed endpoint's flags into the accumulated result
            if parsed.get('isDefault'):
                is_default_endpoint = is_default_endpoint | parsed['isDefault']
            if parsed.get('sslBackend'):
                is_ssl_backend = is_ssl_backend | parsed['sslBackend']
            if parsed.get('sameTab'):
                is_same_tab = is_same_tab | parsed['sameTab']
        else:
            remaining.append(raw_endpoint)
    return remaining, is_default_endpoint, is_ssl_backend, is_same_tab
def get_active_runs(pods):
    """Fetch run details from the API for the run ids labeled on the given pods.

    Returns the API payload list, or [] when there are no pods or the call fails.
    """
    if not pods:
        return []
    pod_run_ids = [pod['metadata']['labels']['runid'] for pod in pods]
    method_url = os.path.join(api_url,
                              API_GET_RUNS_LIST_DETAILS.format(run_ids=','.join(pod_run_ids)))
    response_data = call_api(method_url)
    if not response_data or 'payload' not in response_data:
        do_log('Cannot get list of active runs from the API for the following IDs: {}'.format(pod_run_ids))
        return []
    return response_data["payload"]
def get_service_list(active_runs_list, pod_id, pod_run_id, pod_ip):
    """Build the routing spec for every endpoint of a single pod's run.

    Looks the run up in `active_runs_list` (cached API run details) and, for
    each endpoint declared by the tool settings plus any system/run-parameter
    endpoints, produces an entry keyed by route id with everything needed to
    render the nginx location block. Returns {} when the run is missing, not
    RUNNING, or gated behind a not-yet-set run tag.
    """
    service_list = {}
    # Find this pod's run among the cached runs fetched from the API
    run_cache = [cached_run for cached_run in active_runs_list if str(cached_run['pipelineRun']['id']) == str(pod_run_id)]
    run_cache = next(iter(run_cache), None)
    if not run_cache:
        do_log('Cannot find the RunID {} in the list of cached runs, skipping'.format(pod_run_id))
        return {}
    run_info = run_cache['pipelineRun']
    if run_info:
        if run_info.get("status") != 'RUNNING':
            do_log('Status for pipeline with id: {}, is not RUNNING. Service urls will not be proxied'.format(pod_run_id))
            return {}
        # If the run declares CP_EDGE_ENDPOINT_TAG_NAME, wait until the named run tag is set
        if 'pipelineRunParameters' in run_info:
            edge_endpoint_tag_name = [rp for rp in run_info["pipelineRunParameters"]
                                      if 'name' in rp and rp["name"] == CP_EDGE_ENDPOINT_TAG_NAME]
            if edge_endpoint_tag_name and len(edge_endpoint_tag_name) > 0:
                run_info_tags = run_info.get("tags")
                if not (run_info_tags and "value" in edge_endpoint_tag_name[0] and run_info_tags.get(edge_endpoint_tag_name[0]["value"])):
                    do_log('Pipeline with id {} and run tag {} has not yet been initialized. '
                           'Service urls will not be proxied'
                           .format(pod_run_id, edge_endpoint_tag_name[0]["value"]))
                    return {}
        pod_owner = run_info["owner"]
        docker_image = run_info["dockerImage"]
        runs_sids = run_info.get("runSids")
        pretty_url = run_info.get("prettyUrl")
        pretty_url = parse_pretty_url(pretty_url) if pretty_url else None
        sensitive = run_info.get("sensitive") or False
        cloud_region_id = run_info.get("instance", {}).get("cloudRegionId") or None
        instance_ip = run_info.get("instance", {}).get("nodeIP") or None
        do_log('Processing {} #{} by {} ({})...'.format(pod_id, pod_run_id, pod_owner, docker_image))
        shared_users_sids = run_sids_to_str(runs_sids, True)
        if shared_users_sids:
            do_log('Detected shared user sids: {}'.format(shared_users_sids))
        shared_groups_sids = run_sids_to_str(runs_sids, False)
        if shared_groups_sids:
            do_log('Detected shared group sids: {}'.format(shared_groups_sids))
        endpoints_data = run_cache.get('tool', {}).get('endpoints') or []
        tool_endpoints_count = len(endpoints_data)
        do_log('Detected {} tool settings endpoints.'.format(tool_endpoints_count))
        # Merge in system endpoints and CP_CAP_CUSTOM_TOOL_ENDPOINT_* endpoints
        endpoints_data, overridden_endpoints_count = append_additional_endpoints(endpoints_data, run_info)
        additional_system_endpoints_count = len(endpoints_data) - tool_endpoints_count
        do_log('Detected {} run parameters endpoints.'.format(additional_system_endpoints_count))
        if overridden_endpoints_count:
            do_log('Detected {} overridden tool settings endpoints.'.format(overridden_endpoints_count))
        if endpoints_data:
            endpoints_count = len(endpoints_data)
            for i in range(endpoints_count):
                endpoint = None
                try:
                    # Endpoints are stored as JSON strings
                    endpoint = json.loads(endpoints_data[i])
                except Exception as endpoint_parse_exception:
                    do_log('Parsing endpoint #{} failed:\n{}'.format(str(i), str(endpoint_parse_exception)))
                    continue
                if endpoint["nginx"]:
                    port = endpoint["nginx"]["port"]
                    path = endpoint["nginx"].get("path", "")
                    service_name = endpoint.get("name", "Default")
                    is_default_endpoint = endpoint.get("isDefault", False)
                    is_ssl_backend = endpoint.get("sslBackend", False)
                    is_same_tab = endpoint.get("sameTab", False)
                    # NOTE(review): this local shadows the module-level
                    # create_dns_record() function within this loop body
                    create_dns_record = endpoint.get("customDNS", False)
                    additional = endpoint["nginx"].get("additional", "")
                    has_explicit_endpoint_num = "endpoint_num" in endpoint.keys()
                    custom_endpoint_num = int(endpoint["endpoint_num"]) if has_explicit_endpoint_num else i
                    edge_location_id = ROUTE_ID_TMPL.format(pod_id=pod_id, endpoint_port=port, endpoint_num=custom_endpoint_num)
                    # Pretty URLs apply only to tool/system endpoints without an explicit endpoint number
                    if not pretty_url or (has_explicit_endpoint_num and not is_system_endpoint_name(endpoint)):
                        edge_location = edge_location_id
                    else:
                        pretty_url_path = pretty_url["path"]
                        if endpoints_count == 1 or (str(is_default_endpoint).lower() == "true" and EDGE_DISABLE_NAME_SUFFIX_FOR_DEFAULT_ENDPOINT):
                            edge_location = pretty_url_path
                        else:
                            # Several endpoints share the pretty URL: suffix each with its name/number
                            pretty_url_suffix = endpoint["name"] if "name" in endpoint.keys() else str(custom_endpoint_num)
                            if pretty_url_path:
                                edge_location = '{}-{}'.format(pretty_url_path, pretty_url_suffix)
                            else:
                                edge_location = pretty_url_suffix
                    # Custom-domain routes go into ".inc" files (included into server
                    # blocks); plain path-based routes into ".loc" files
                    if (pretty_url and pretty_url['domain']) or create_dns_record:
                        edge_location_path = edge_location_id + '.inc'
                    else:
                        edge_location_path = edge_location_id + '.loc'
                    # CP_EDGE_INSTANCE_IP: proxy to the node IP instead of the pod IP
                    if EDGE_INSTANCE_IP in additional:
                        additional = additional.replace(EDGE_INSTANCE_IP, "")
                        target_ip = instance_ip
                    else:
                        target_ip = pod_ip
                    edge_target = \
                        EDGE_ROUTE_TARGET_PATH_TMPL.format(pod_ip=target_ip, endpoint_port=port, endpoint_path=path) \
                        if path \
                        else EDGE_ROUTE_TARGET_TMPL.format(pod_ip=target_ip, endpoint_port=port)
                    # If CP_EDGE_NO_PATH_CROP is present (any place) in the "additional" section of the route config
                    # then trailing "/" is not added to the proxy pass target. This will allow to forward original requests trailing path
                    if EDGE_ROUTE_NO_PATH_CROP in additional:
                        additional = additional.replace(EDGE_ROUTE_NO_PATH_CROP, "")
                    else:
                        edge_target = edge_target + "/"
                    if EDGE_COOKIE_NO_REPLACE in additional:
                        additional = additional.replace(EDGE_COOKIE_NO_REPLACE, "")
                        edge_cookie_location = "/"
                    else:
                        edge_cookie_location = None
                    #######################################################
                    # These parameters will be passed to the respective lua auth script
                    # Only applied for the non-sensitive jobs
                    edge_jwt_auth = True
                    if EDGE_JWT_NO_AUTH in additional:
                        additional = additional.replace(EDGE_JWT_NO_AUTH, "")
                        edge_jwt_auth = False
                    edge_pass_bearer = False
                    if EDGE_PASS_BEARER in additional:
                        additional = additional.replace(EDGE_PASS_BEARER, "")
                        edge_pass_bearer = True
                    #######################################################
                    is_external_app = False
                    if EDGE_EXTERNAL_APP in additional:
                        additional = additional.replace(EDGE_EXTERNAL_APP, "")
                        is_external_app = True
                    # Append any default location attribute not already present
                    for default_attribute in DEFAULT_LOCATION_ATTRIBUTES:
                        if 'search_pattern' not in default_attribute or 'value' not in default_attribute:
                            continue
                        if default_attribute['search_pattern'].lower() not in additional.lower():
                            additional = additional + default_attribute['value']
                    service_list[edge_location_id] = {
                        "edge_location_path": edge_location_path,
                        "pod_id": pod_id,
                        "pod_ip": target_ip,
                        "pod_owner": pod_owner,
                        "shared_users_sids": shared_users_sids,
                        "shared_groups_sids": shared_groups_sids,
                        "service_name": service_name,
                        "is_default_endpoint": is_default_endpoint,
                        "is_ssl_backend": is_ssl_backend,
                        "is_same_tab": is_same_tab,
                        "edge_num": i,
                        "edge_location": edge_location,
                        "custom_domain": pretty_url.get('domain') if pretty_url else None,
                        "edge_target": edge_target,
                        "run_id": pod_run_id,
                        "additional": additional,
                        "sensitive": sensitive,
                        "create_dns_record": create_dns_record,
                        "cloudRegionId": cloud_region_id,
                        "external_app": is_external_app,
                        "cookie_location": edge_cookie_location,
                        "edge_jwt_auth": edge_jwt_auth,
                        "edge_pass_bearer": edge_pass_bearer
                    }
        else:
            do_log('No endpoints required for the tool {}'.format(docker_image))
    else:
        do_log('Unable to get details of a RunID {} from API due to errors'.format(pod_run_id))
    return service_list
# From each pod with a container, which has endpoints ("job-type=Service" or container's environment
# has a parameter from SYSTEM_ENDPOINTS) we shall take:
# -- PodIP
# -- PodID
# -- N entries by a template
# --- svc-port-N
# --- svc-path-N
def load_pods_for_runs_with_endpoints():
    """Collect running pipeline pods that expose endpoints.

    A pod qualifies when it is labeled job-type=Service, or when any env var
    of its first container activates a known system endpoint.
    """
    selected_pods = []
    running_pods = Pod.objects(kube_api).filter(selector={'type': 'pipeline'})\
        .filter(field_selector={"status.phase": "Running"})
    for pod in running_pods.response['items']:
        labels = pod['metadata']['labels']
        if labels.get('job-type') == 'Service':
            selected_pods.append(pod)
            continue
        # Otherwise, look for system endpoint parameters in the first container's env
        spec = pod.get('spec') or {}
        containers = spec.get('containers') or []
        env_vars = (containers[0].get('env') or []) if containers else []
        for env_var in env_vars:
            var_name = env_var['name']
            if var_name in SYSTEM_ENDPOINTS \
                    and match_sys_endpoint_value(env_var['value'], SYSTEM_ENDPOINTS[var_name]["value"]):
                selected_pods.append(pod)
                break
    return selected_pods
def create_dns_record(service_spec, edge_region_id, edge_region_name):
    """Create a custom DNS record for a service through the API.

    On success, mutates `service_spec`: `custom_domain` is set to the created
    record and `edge_location` is cleared. Raises ValueError when the record
    did not reach the INSYNC state.
    """
    run_logger = RunLogger(run_id=service_spec["run_id"], task_name='CreateDNSRecord')
    dns_custom_record = EDGE_DNS_RECORD_FORMAT.format(job_name=service_spec["edge_location"],
                                                      region_name=edge_region_name)
    create_url = os.path.join(api_url, API_POST_DNS_RECORD)
    if edge_region_id:
        create_url += "?regionId=" + edge_region_id
    payload = json.dumps({
        'dnsRecord': dns_custom_record,
        'target': edge_service_external_ip,
        'format': 'RELATIVE'
    })
    run_logger.info('Creating DNS record {}...'.format(dns_custom_record))
    response = call_api(create_url, payload) or {}
    response_payload = response.get('payload', {})
    dns_record_status = response_payload.get('status')
    dns_record_domain = response_payload.get('dnsRecord')
    if dns_record_status != 'INSYNC':
        run_logger.warning('Failed to create DNS record {}'.format(dns_custom_record))
        raise ValueError('Fail to create DNS record {} for run #{}'
                         .format(dns_custom_record, service_spec["run_id"]))
    run_logger.success('Created DNS record {}'.format(dns_record_domain))
    service_spec["custom_domain"] = dns_record_domain
    service_spec["edge_location"] = None
def create_service_dns_record(service_spec, route, edge_region_id, edge_region_name):
    """Ensure the service spec carries its custom DNS domain.

    When custom DNS is globally skipped, only compute the expected domain and
    update the spec. Otherwise actually create the record via the API.
    Returns the route id on success and None when record creation failed.
    """
    if skip_custom_dns:
        do_log('Skipping custom DNS record creation for domain {}'.format(dns_domain))
        expected_record = EDGE_DNS_RECORD_FORMAT.format(job_name=service_spec["edge_location"],
                                                        region_name=edge_region_name)
        expected_domain = expected_record + '.' + dns_domain
        do_log('Setting expected DNS as {}'.format(expected_domain))
        # Switch the spec to the (assumed pre-existing) domain without an API call.
        service_spec["custom_domain"] = expected_domain
        service_spec["edge_location"] = None
        return route
    try:
        do_log('Creating DNS record for {}'.format(route))
        create_dns_record(service_spec, edge_region_id, edge_region_name)
    except ValueError as err:
        do_log("Creating DNS record for {} ... NOT OK ({})".format(route, str(err)))
        return None
    do_log("Creating DNS record for {} ... OK ".format(route))
    return route
def create_service_location(service_spec, service_url_dict, edge_region_id):
    """Write the nginx location config(s) for one endpoint and accumulate its
    external service URL into service_url_dict, keyed by run id.

    Relies on module-level state: nginx template contents, sensitive_routes,
    edge service host/port/schema and the helper functions used below.
    """
    has_custom_domain = service_spec["custom_domain"] is not None
    # Endpoint is served either from its own domain or from the shared EDGE IP.
    service_hostname = service_spec["custom_domain"] if has_custom_domain else edge_service_external_ip
    service_location = '/{}/'.format(service_spec["edge_location"]) if service_spec["edge_location"] else "/"
    # Replace the duplicated forward slashes with a single instance to workaround possible issue when the location is set to "/path//"
    service_location = re.sub('/+', '/', service_location)
    # Fill the location template by plain placeholder substitution.
    nginx_route_definition = nginx_loc_module_template_contents \
        .replace('{edge_route_location}', service_location) \
        .replace('{edge_route_target}', service_spec["edge_target"]) \
        .replace('{edge_route_owner}', service_spec["pod_owner"]) \
        .replace('{run_id}', service_spec["run_id"]) \
        .replace('{edge_route_shared_users}', service_spec["shared_users_sids"]) \
        .replace('{edge_route_shared_groups}', service_spec["shared_groups_sids"]) \
        .replace('{edge_route_schema}', 'https' if service_spec["is_ssl_backend"] else 'http') \
        .replace('{additional}', service_spec["additional"]) \
        .replace('{edge_jwt_auth}', str(service_spec["edge_jwt_auth"])) \
        .replace('{edge_pass_bearer}', str(service_spec["edge_pass_bearer"])) \
        .replace('{bearer_cookie_extra}', EDGE_BEARER_COOKIE_EXTRA) \
        .replace('{edge_cookie_location}', service_spec["cookie_location"] if service_spec["cookie_location"] else service_location)
    nginx_sensitive_route_definitions = []
    if service_spec["sensitive"]:
        # Sensitive endpoints get an extra location per configured sensitive route.
        for sensitive_route in sensitive_routes:
            # proxy_pass cannot have trailing slash for regexp locations
            edge_target = service_spec["edge_target"]
            if edge_target.endswith("/"):
                edge_target = edge_target[:-1]
            nginx_sensitive_route_definition = nginx_sensitive_loc_module_template_contents \
                .replace('{edge_route_location}', service_location + sensitive_route['route']) \
                .replace('{edge_route_sensitive_methods}', '|'.join(sensitive_route['methods'])) \
                .replace('{edge_route_target}', edge_target) \
                .replace('{edge_route_owner}', service_spec["pod_owner"]) \
                .replace('{run_id}', service_spec["run_id"]) \
                .replace('{edge_route_shared_users}', service_spec["shared_users_sids"]) \
                .replace('{edge_route_shared_groups}', service_spec["shared_groups_sids"]) \
                .replace('{additional}', service_spec["additional"]) \
                .replace('{edge_cookie_location}', service_spec["cookie_location"] if service_spec["cookie_location"] else service_location + sensitive_route['route'])
            nginx_sensitive_route_definitions.append(nginx_sensitive_route_definition)
    # NOTE(review): assumes 'edge_location_path' is always present in the spec;
    # os.path.join would raise if it were None - confirm against get_service_list.
    path_to_route = os.path.join(nginx_sites_path, service_spec.get('edge_location_path') + '.conf')
    if service_spec["sensitive"]:
        do_log('Adding new sensitive route ' + path_to_route)
    else:
        do_log('Adding new route ' + path_to_route)
    # The main location and all sensitive locations share a single config file.
    with open(path_to_route, "w") as added_route_file:
        added_route_file.write(nginx_route_definition)
        if nginx_sensitive_route_definitions:
            for nginx_sensitive_route_definition in nginx_sensitive_route_definitions:
                added_route_file.write(nginx_sensitive_route_definition)
    if has_custom_domain:
        do_log('Adding new route {} to server block {}'.format(path_to_route, service_hostname))
        add_custom_domain(service_hostname, path_to_route, is_external_app=service_spec['external_app'])
    # Validate the new config; check_route falls back to a stub on failure.
    check_route(path_to_route, service_location, service_spec, has_custom_domain, service_hostname)
    service_url = SVC_URL_TMPL.format(external_schema=edge_service_external_schema,
                                      external_ip=service_hostname,
                                      edge_location=service_spec.get('edge_location') or '',
                                      edge_port=str(edge_service_port),
                                      service_name=service_spec['service_name'],
                                      is_default_endpoint=str(service_spec['is_default_endpoint']).lower(),
                                      is_same_tab=str(service_spec['is_same_tab']).lower(),
                                      is_custom_dns=str(service_spec['create_dns_record']).lower(),
                                      region_id=edge_region_id or 'null')
    run_id = service_spec['run_id']
    # A run may expose several endpoints - accumulate its URL JSON fragments.
    if run_id in service_url_dict:
        service_url = service_url_dict[run_id] + ',\n' + service_url
    service_url_dict[run_id] = service_url
def update_svc_url_for_run(run_id, edge_region_name):
    """Push the accumulated service URL JSON of one run to the API.

    Silently skips runs for which no URL was accumulated in service_url_dict.
    """
    url_json = service_url_dict.get(run_id)
    if not url_json:
        do_log('Assigning #{} with service url has been skipped '
               'because the corresponding service url has not been found.'.format(run_id))
        return
    do_log('Assigning #{} with service url \n{}'.format(run_id, url_json))
    endpoint = os.path.join(api_url, API_UPDATE_SVC.format(run_id=run_id, region=edge_region_name))
    # The accumulated fragments form a JSON array once wrapped in brackets.
    body = json.dumps({'serviceUrl': ('[' + url_json + ']')})
    if call_api(endpoint, data=body):
        do_log('Assigning #{} with service url ... OK'.format(run_id))
    else:
        do_log('Assigning #{} with service url ... NOT OK.'.format(run_id))
def find_preference(api_preference_query, preference_name):
    """Load a system preference value from the API.

    Returns the preference value as a string, or None when the API call fails
    or the preference has no value. The previous implementation applied str()
    unconditionally, turning a missing value into the truthy string 'None',
    which silently defeated the `os.getenv(...) or find_preference(...)`
    fallback chains used by the callers below.
    """
    load_method = os.path.join(api_url, api_preference_query.format(preference_name=preference_name))
    response = call_api(load_method) or {}
    value = response.get('payload', {}).get('value')
    return str(value) if value is not None else None
def write_stub_location_configuration(path_to_route, service_location, service_spec, has_custom_domain):
    """Write a stub nginx location config next to a broken route config.

    Returns the path of the stub file that was written.
    """
    stub_definition = nginx_loc_module_stub_template_contents \
        .replace('{edge_route_location}', service_location) \
        .replace('{edge_route_owner}', service_spec["pod_owner"]) \
        .replace('{edge_route_shared_users}', service_spec["shared_users_sids"]) \
        .replace('{edge_route_shared_groups}', service_spec["shared_groups_sids"])
    # Derive the stub file name by swapping the route file's extension.
    if has_custom_domain:
        source_extension, stub_extension = ".conf", STUB_CUSTOM_DOMAIN_EXTENSION
    else:
        source_extension, stub_extension = ".loc.conf", STUB_LOCATION_CONFIG_EXTENSION
    path_to_stub = path_to_route.replace(source_extension, stub_extension)
    with open(path_to_stub, "w") as stub_file:
        stub_file.write(stub_definition)
    do_log('Adding new stub route ' + path_to_stub)
    return path_to_stub
def reload_nginx_config():
    """Ask the running nginx master process to re-read its configuration."""
    do_log('Reloading nginx...')
    # Invoke nginx with an argument list instead of `shell=True`: no shell
    # features are needed, and this avoids spawning an extra shell process.
    # check_output (rather than check_call) keeps any command output captured
    # instead of leaking it to stdout, matching the previous behavior.
    subprocess.check_output(['nginx', '-s', 'reload'])
def check_nginx_config():
    """Validate the nginx configuration (`nginx -t`) without applying it.

    Returns True when the configuration is valid, False otherwise.
    """
    try:
        # Argument list instead of an interpolated shell string: same command,
        # but not affected by whitespace/special characters in the config path
        # and without an intermediate shell process.
        subprocess.check_output(['nginx', '-c', nginx_root_config_path, '-t'])
        do_log('Adding new route ... OK')
        return True
    except subprocess.CalledProcessError as e:
        do_log('Adding new route ... NOT OK (%s)' % e.returncode)
        return False
def check_route(path_to_route, service_location, service_spec, has_custom_domain, service_hostname):
    """Validate a freshly written route config, degrading gracefully on failure.

    If the config breaks `nginx -t`, the route file (and its custom-domain
    entry, if any) is removed and replaced by a stub location; if even the
    stub breaks the config, the stub is removed as well, leaving no trace.
    """
    if check_nginx_config():
        return
    # The new route makes the nginx config invalid - roll it back.
    do_log('Deleting invalid route...')
    os.remove(path_to_route)
    if has_custom_domain:
        do_log('Deleting invalid custom domain route...')
        remove_custom_domain_all(path_to_route)
    # Fall back to a stub location so the endpoint path still resolves.
    path_to_stub = write_stub_location_configuration(path_to_route,
                                                     service_location,
                                                     service_spec,
                                                     has_custom_domain)
    if check_nginx_config():
        if has_custom_domain:
            do_log('Adding new stub route {} to server block {}'.format(path_to_stub, service_hostname))
            add_custom_domain(service_hostname, path_to_stub, is_external_app=service_spec['external_app'])
        return
    # Even the stub breaks the config - remove it too.
    do_log('Deleting invalid stub route...')
    os.remove(path_to_stub)
def get_pods(routes):
    """Yield the pod id encoded in each route id ('{pod_id}-{port}-{num}').

    Routes not matching ROUTE_ID_PATTERN are logged and skipped.
    Fixes the log-message typo 'inccorrectly' -> 'incorrectly'.
    """
    for route in routes:
        match = re.match(ROUTE_ID_PATTERN, route)
        if not match:
            do_log('Detected incorrectly formatted route {}'.format(route))
            continue
        pod_id = match.group(1)
        if not pod_id:
            do_log('Detected incorrectly formatted route {}'.format(route))
            continue
        yield pod_id
def get_pod(route):
    """Return the pod id for a single route id, or None for a malformed route."""
    # get_pods yields at most one pod id for a single route.
    for pod_id in get_pods([route]):
        return pod_id
    return None
def get_affected_routes(involved_routes, all_routes):
    """Return every route from all_routes whose pod also owns an involved route."""
    touched_pods = set(get_pods(involved_routes))
    return {candidate for candidate in all_routes if get_pod(candidate) in touched_pods}
def is_true(value):
    """Case-insensitively check that value is the string 'true'; falsy inputs give False."""
    return value.lower() == 'true' if value else False
do_log('============ Started iteration ============')
# The API domain name (when set) is used later to detect friendly URLs.
if api_domain_name:
    do_log('API domain name is determined as {}. It will be used to detect friendly URLs'.format(api_domain_name))
else:
    do_log('[WARN] Cannot get API domain name from the environment')
# In-cluster kube API client; TLS verification is disabled for it.
kube_api = HTTPClient(KubeConfig.from_service_account())
kube_api.session.verify = False
# Region / DNS settings: environment variables win, API preferences are the fallback.
edge_region_name = os.getenv('CP_EDGE_REGION') or find_preference(API_GET_PREF, 'default.edge.region')
edge_region_id = os.getenv('CP_EDGE_REGION_ID') or find_preference(API_GET_PREF, 'default.edge.region.id')
skip_custom_dns = is_true(os.getenv('CP_EDGE_SKIP_CUSTOM_DNS') or find_preference(API_GET_PREF, 'edge.skip.custom.dns'))
dns_domain = os.getenv('CP_EDGE_CUSTOM_DOMAIN') or find_preference(API_GET_PREF, 'edge.custom.domain')
# Try to get edge_service_external_ip and edge_service_port for service labels several times before get it from
# service spec IP and nodePort because it is possible that we will do it while redeploy and label just doesn't
# applied yet - so we will wait
# NOTE(review): edge_service_external_ip / edge_service_port are presumably
# initialized earlier in the file - confirm, since they are tested before the
# first assignment visible here.
edge_kube_service_object = None
for n in range(NUMBER_OF_RETRIES):
    edge_kube_service = Service.objects(kube_api).filter(selector={
        EDGE_SVC_ROLE_LABEL: EDGE_SVC_ROLE_LABEL_VALUE, EDGE_SVC_REGION_LABEL: edge_region_name})
    if not edge_kube_service.response['items']:
        # NOTE(review): a missing service aborts immediately instead of
        # retrying - confirm that this is the intended contrast with the
        # "label not yet applied" case below, which does retry.
        do_log('EDGE service is not found by labels: cloud-pipeline/role=EDGE and %s=%s'
               % (EDGE_SVC_REGION_LABEL, edge_region_name))
        exit(1)
    else:
        edge_kube_service_object = edge_kube_service.response['items'][0]
        edge_kube_service_object_metadata = edge_kube_service_object['metadata']
        # Prefer the host/port advertised via service labels.
        if 'labels' in edge_kube_service_object_metadata and EDGE_SVC_HOST_LABEL in edge_kube_service_object_metadata['labels']:
            do_log('Getting EDGE service host from service label')
            edge_service_external_ip = edge_kube_service_object_metadata['labels'][EDGE_SVC_HOST_LABEL]
        if 'labels' in edge_kube_service_object_metadata and EDGE_SVC_PORT_LABEL in edge_kube_service_object_metadata['labels']:
            do_log('Getting EDGE service host port from service label')
            edge_service_port = edge_kube_service_object_metadata['labels'][EDGE_SVC_PORT_LABEL]
        if edge_service_external_ip and edge_service_port:
            break
        else:
            # Labels may not be applied yet (e.g. mid-redeploy) - wait and retry.
            do_log('Sleep for {} sec and perform kube API call again ({}/{})'.format(SECS_TO_WAIT_BEFORE_RETRY, n + 1, NUMBER_OF_RETRIES))
            time.sleep(SECS_TO_WAIT_BEFORE_RETRY)
if not edge_kube_service_object:
    do_log('EDGE service is not found by labels: cloud-pipeline/role=EDGE and %s=%s'
           % (EDGE_SVC_REGION_LABEL, edge_region_name))
    exit(1)
# Fall back to the service spec when the labels never provided host/port.
if not edge_service_external_ip:
    do_log('Getting EDGE service host from externalIP')
    edge_service_external_ip = edge_kube_service_object['spec']['externalIPs'][0]
if not edge_service_port:
    do_log('Getting EDGE service host port from nodePort')
    # NOTE(review): in a Kubernetes Service object, ports normally live under
    # ['spec']['ports'] - verify that this top-level ['ports'] lookup is
    # correct for the pykube response shape used here.
    edge_service_port = edge_kube_service_object['ports'][0]['nodePort']
do_log('EDGE: {}:{} ({} #{})'.format(edge_service_external_ip, edge_service_port,
                                     edge_region_name, edge_region_id or 'undefined'))
# Collect the set of routes that SHOULD exist, from the running pods' endpoints.
pods_with_endpoints = load_pods_for_runs_with_endpoints()
runs_with_endpoints = get_active_runs(pods_with_endpoints)
services_list = {}
for pod_spec in pods_with_endpoints:
    pod_id = pod_spec['metadata']['name']
    pod_ip = pod_spec['status']['podIP']
    pod_run_id = pod_spec['metadata']['labels']['runid']
    if not pod_run_id:
        do_log('RunID not found for pod: ' + pod_id + ', skipping')
        continue
    services_list.update(get_service_list(runs_with_endpoints, pod_id, pod_run_id, pod_ip))
routes_expected = set(services_list.keys())
do_log('Found {} expected routes'.format(len(routes_expected)))
# Find out existing routes from /etc/nginx/sites-enabled
nginx_modules_list = {}
for x in os.listdir(nginx_sites_path):
    location_config_path = os.path.join(nginx_sites_path, x)
    if '.conf' in x and os.path.isfile(location_config_path):
        # Stub configs are transient fallbacks - always purged on each iteration.
        if location_config_path.endswith(STUB_LOCATION_CONFIG_EXTENSION):
            do_log('Deleting stub route ' + location_config_path)
            os.remove(location_config_path)
            continue
        if location_config_path.endswith(STUB_CUSTOM_DOMAIN_EXTENSION):
            do_log('Deleting custom domain stub route ' + location_config_path)
            os.remove(location_config_path)
            remove_custom_domain_all(location_config_path)
            continue
        # Route id = file name minus its '.loc.conf'/'.inc.conf' extension.
        nginx_modules_list[x.replace('.loc.conf', '').replace('.inc.conf', '')] = x
routes_actual = set(nginx_modules_list.keys())
do_log('Found {} actual routes'.format(len(routes_actual)))
# Diff expected vs actual route sets.
routes_to_check = routes_actual & routes_expected
routes_to_add = routes_expected - routes_actual
routes_to_delete = routes_actual - routes_expected
do_log('Found {} existing routes, these routes will be checked'.format(len(routes_to_check)))
do_log('Found {} missing routes, these routes will be created'.format(len(routes_to_add)))
do_log('Found {} expired routes, these routes will be deleted'.format(len(routes_to_delete)))
# All routes that exist in both Nginx and API are checked, whether the routes shall be updated or kept untouched.
# If some routes differ then they are deleted and created from scratch.
# Currently only modified sharing users/groups are checked.
routes_to_update = set()
for route in routes_to_check:
    path_to_route = os.path.join(nginx_sites_path, nginx_modules_list[route])
    do_log('Checking route {}'.format(path_to_route))
    with open(path_to_route) as route_file:
        route_file_contents = route_file.read()
    # Extract the currently configured shared users/groups from the nginx file.
    shared_users_sids_to_check = ""
    shared_groups_sids_to_check = ""
    for route_search_results in re.finditer(r"shared_with_users\s{1,}\"(.+?)\";"
                                            r"|shared_with_groups\s{1,}\"(.+?)\";",
                                            route_file_contents):
        g1 = route_search_results.group(1)
        g2 = route_search_results.group(2)
        shared_users_sids_to_check = g1 if g1 else shared_users_sids_to_check
        shared_groups_sids_to_check = g2 if g2 else shared_groups_sids_to_check
    service_spec = services_list[route]
    shared_users_sids_to_update = service_spec["shared_users_sids"]
    shared_groups_sids_to_update = service_spec["shared_groups_sids"]
    if shared_users_sids_to_check != shared_users_sids_to_update:
        do_log('Detected different shared users. Actual: "{}". Expected: "{}"'
               .format(shared_users_sids_to_check, shared_users_sids_to_update))
        routes_to_update.add(route)
    elif shared_groups_sids_to_check != shared_groups_sids_to_update:
        do_log('Detected different shared groups. Actual: "{}". Expected: "{}"'
               .format(shared_groups_sids_to_check, shared_groups_sids_to_update))
        routes_to_update.add(route)
do_log('Found {} changed routes, these routes will be replaced'.format(len(routes_to_update)))
# If a single route of a pod is added/deleted/updated then all other routes of the same pod are replaced.
# Otherwise, the generated service url will have missing/extra endpoints.
routes_to_replace = get_affected_routes(routes_to_add | routes_to_delete | routes_to_update, routes_to_check)
routes_to_affect = routes_to_replace - routes_to_update
do_log('Found {} affected routes, these routes will be replaced'.format(len(routes_to_affect)))
# Replacement = delete the existing config, then re-create it from scratch.
routes_to_add |= routes_to_replace
routes_to_delete |= routes_to_replace
do_log("Deleting {} routes...".format(len(routes_to_delete)))
for route in routes_to_delete:
    path_to_route = os.path.join(nginx_sites_path, nginx_modules_list[route])
    do_log('Deleting route {}'.format(path_to_route))
    os.remove(path_to_route)
    # Also drop any custom-domain server-block references to the removed file.
    remove_custom_domain_all(path_to_route)
# Load the nginx location templates and the sensitive-routes config used when
# (re)creating routes below.
with open(nginx_loc_module_template, 'r') as nginx_loc_module_template_file:
    nginx_loc_module_template_contents = nginx_loc_module_template_file.read()
with open(nginx_sensitive_loc_module_template, 'r') as nginx_sensitive_loc_module_template_file:
    nginx_sensitive_loc_module_template_contents = nginx_sensitive_loc_module_template_file.read()
with open(nginx_sensitive_routes_config_path, 'r') as sensitive_routes_file:
    sensitive_routes = json.load(sensitive_routes_file)
with open(nginx_loc_module_stub_template, 'r') as stub_template_file:
    nginx_loc_module_stub_template_contents = stub_template_file.read()
# loop through all routes that we need to create, if this route doesn't have option to create custom DNS record
# we handle it in the main thread, if custom DNS record should be created, since it consume some time ~ 20 sec,
# we put it to the separate collection to handle it at the end.
regular_routes_to_add, dns_routes_to_configure = [], []
for route in routes_to_add:
    service_spec = services_list[route]
    if service_spec["create_dns_record"] and not service_spec["custom_domain"]:
        dns_routes_to_configure.append(route)
    else:
        regular_routes_to_add.append(route)
service_url_dict = {}
do_log("Creating {} routes for regular endpoints...".format(len(regular_routes_to_add)))
for route in regular_routes_to_add:
    service_spec = services_list[route]
    create_service_location(service_spec, service_url_dict, edge_region_id)
# DNS record creation is slow (~20s each) - fan it out to the worker pool
# (dns_services_pool is presumably created earlier in the file) and collect
# results after the regular routes have been applied.
dns_route_runs = set()
dns_route_results = []
do_log("Creating {} configurations for dns endpoints...".format(len(dns_routes_to_configure)))
for route in dns_routes_to_configure:
    service_spec = services_list[route]
    dns_route_runs.add(service_spec["run_id"])
    dns_route_results.append(dns_services_pool.apply_async(
        create_service_dns_record,
        (service_spec, route, edge_region_id, edge_region_name)))
# Apply regular route changes first, and push their service URLs to the API.
if regular_routes_to_add or routes_to_delete:
    reload_nginx_config()
for run_id in service_url_dict:
    if run_id not in dns_route_runs:
        update_svc_url_for_run(run_id, edge_region_name)
dns_services_pool.close()
dns_services_pool.join()
# Workers return the route id on success and None on failure; failures are
# simply skipped. NOTE(review): result.get() would re-raise any exception a
# worker leaked past its own try/except - confirm that is acceptable here.
dns_routes_to_add = set(result.get() for result in dns_route_results if result.get())
do_log("Creating {} routes for dns endpoints...".format(len(dns_routes_to_add)))
for route in dns_routes_to_add:
    service_spec = services_list[route]
    create_service_location(service_spec, service_url_dict, edge_region_id)
if dns_routes_to_add:
    reload_nginx_config()
for run_id in service_url_dict:
    if run_id in dns_route_runs:
        update_svc_url_for_run(run_id, edge_region_name)
do_log('============ Done iteration ============')
do_log('')