#!/usr/bin/env python3
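"""replikate operator (apps/replikate/src/main.py).
Watches Kubernetes namespaces and keeps a set of Jinja2-templated manifests
from CONFIG_DIR applied to the cluster via pykube. Namespaces opt out with a
"hubctl.io/<INSTANCE_ID>: disabled" label or annotation; namespaces that are
not owned by a kubeflow.org Profile are ignored as well.
"""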
import asyncio
import json
import logging
import subprocess
from datetime import timedelta
from glob import glob
from os import environ, path
import kopf
import pykube
import yaml
from jinja2 import Environment, FileSystemLoader, TemplateError
from kopf import not_
from pykube.exceptions import HTTPError
from pytimeparse.timeparse import timeparse
def str_to_timedelta(value: str) -> timedelta:
    """Parses a duration string; bare numbers are treated as minutes"""
    try:
        secs = float(value) * 60
    except ValueError:
        secs = timeparse(value)
        if secs is None:
            raise ValueError(f"Cannot parse duration: {value!r}")
    return timedelta(seconds=secs)
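# Examples: str_to_timedelta("2") -> 2 minutes (bare numbers are minutes),
# str_to_timedelta("90s") -> 90 seconds, str_to_timedelta("1h30m") -> 1.5 hours.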
environ.setdefault("MANIFESTS_SOURCE", "replikate-operator-manifests")
environ.setdefault("INSTANCE_ID", "kubeflow-replikate")
environ.setdefault("REMOTE_DEBUG", "disabled")
environ.setdefault("INTERVAL", "2 minutes")
CONFIGMAP = environ.get("MANIFESTS_SOURCE")
CONFIG_DIR = environ["CONFIG_DIR"]  # required: no default, fails fast if unset
INSTANCE_ID = environ["INSTANCE_ID"]
INTERVAL = str_to_timedelta(environ["INTERVAL"])
OPERATOR_ENABLED = f"hubctl.io/{INSTANCE_ID}"
IGNORE_NAMESPACES = []
TEMPLATES = []  # populated by load_templates() on startup and configmap reload
logging.info(f"Starting operator '{INSTANCE_ID}'...")
logging.info(f"Watching for ignore annotation: {OPERATOR_ENABLED}=disabled")
if environ["REMOTE_DEBUG"] == "enabled":
import ptvsd
logging.info("Opened debug port 9229")
logging.info("Waiting to attach...")
ptvsd.enable_attach(address=('0.0.0.0', 9229))
ptvsd.wait_for_attach()
@kopf.on.create('namespace')
@kopf.on.update('namespace')
@kopf.on.resume('namespace')
def update_ignore_namespace(logger, name, meta, **_) -> bool:
    """Returns True if the namespace was added to the ignore list"""
    # Labels and annotations are treated interchangeably as opt-out markers
    markers = {**meta.get('labels', {}), **meta.get('annotations', {})}
    ignore = markers.get(OPERATOR_ENABLED, "enabled") == "disabled"
    result = False
    if ignore and name not in IGNORE_NAMESPACES:
        logger.info(f"Namespace {name} has the ignore annotation, all objects in it will be ignored")
        IGNORE_NAMESPACES.append(name)
        result = True
    elif not ignore and name in IGNORE_NAMESPACES:
        logger.info(f"Namespace {name} no longer has the ignore annotation")
        IGNORE_NAMESPACES.remove(name)
    else:
        # Only namespaces owned by a kubeflow.org Profile are reconciled
        profile_owner = [
            own for own in meta.get('ownerReferences', [])
            if own.get('kind') == 'Profile'
            and own.get('apiVersion', '').split('/')[0] == 'kubeflow.org'
        ]
        if not profile_owner and name not in IGNORE_NAMESPACES:
            logger.info(f"Ignoring all objects in namespace {name}")
            IGNORE_NAMESPACES.append(name)
            result = True
    return result
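# kopf stores a handler's return value on the object itself, so the boolean
# above is recorded under status.update_ignore_namespace of the namespace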
def load_templates(_=None):
    """Loads and compiles the Jinja2 templates from CONFIG_DIR into TEMPLATES"""
    global TEMPLATES
    result = []
    jinja = Environment(loader=FileSystemLoader(CONFIG_DIR, followlinks=True))
    for file in glob(path.join(CONFIG_DIR, "*.yaml")):
        if path.isdir(file):
            continue
        try:
            t_name = path.relpath(file, CONFIG_DIR)
            template = jinja.get_template(t_name)
        except TemplateError:
            logging.debug(f"* Skipping {file}")
            continue
        logging.info(f"* loading template: {file}")
        result.append(template)
    TEMPLATES = result
def namespace_ignored(name, **_) -> bool:
return name in IGNORE_NAMESPACES
@kopf.timer('v1', 'namespaces', initial_delay=120.0, interval=INTERVAL.total_seconds(), when=not_(namespace_ignored))
@kopf.on.create('namespaces', when=not_(namespace_ignored))
@kopf.on.update('namespaces', when=not_(namespace_ignored))
@kopf.on.resume('namespaces', when=not_(namespace_ignored))
def reconcile_ns(logger, name, body, patch, **_):
    """
    Triggered on namespace (profile) change or periodically
    ---
    Renders the resources described by the templates and applies them to the cluster
    """
    logger.debug(f"Reconciling {name}")
    self_obj = deep_merge({}, body)  # plain-dict copy of the namespace body for the template context
    params = {"name": name, "this": self_obj}
    self_hash = k8s_hash(body)
for tpl in TEMPLATES:
txt = tpl.render(params)
data = yaml.safe_load(txt)
_hash = k8s_hash(data)
        if self_hash == _hash:
            # Special case: the template targets the very object we are
            # watching, so route the changes through the kopf patch instead
            diff = deep_merge(body, data)
            if diff:
                logger.info(f"Patching self {name}: {diff}")
                deep_merge(patch, diff)
            continue
# see: https://github.com/nolar/kopf/issues/687
# kopf.adopt(data)
# for ref in data['metadata']['ownerReferences']:
# if ref.get('Kind') == body['kind'] and ref.get('Name') == name:
# ref['controller'] = False
client = pykube.object_factory(api, data['apiVersion'], data['kind'])
resource = client(api, data)
if resource.exists():
resource.reload()
changed = deep_merge(resource.obj, data)
if changed:
logger.info(f"* {resource.kind.lower()}/{resource.name}: update {changed}")
                try:
                    resource.update()
                except HTTPError:
                    logger.exception(f"Failed to update {resource.kind.lower()}/{resource.name}")
else:
logger.debug(f"* {resource.kind.lower()}/{resource.name}: already up to date")
else:
logger.info(f"* {resource.kind.lower()}/{resource.name}: creating")
            try:
                resource.create()
            except HTTPError:
                logger.exception(f"Failed to create {resource.kind.lower()}/{resource.name}")
if CONFIGMAP:
logging.info(f"Watching configmap {CONFIGMAP}")
    @kopf.on.update('configmaps', field='metadata.name', value=CONFIGMAP)
def reload_templates(name, logger, **_):
logger.info(f"Reloading: {CONFIG_DIR}")
load_templates()
@kopf.on.startup()
async def startup_fn_simple(**_):
    global LOCK
    LOCK = asyncio.Lock()  # currently unused; reserved for future synchronization
    load_templates()
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.posting.level = logging.WARNING
settings.watching.connect_timeout = 1 * 60
settings.watching.server_timeout = 10 * 60
settings.scanning.disabled = True
@kopf.on.startup()
async def init_kubernetes_client(logger, **_):
    global api, config
token = "/var/run/secrets/kubernetes.io/serviceaccount/token"
kubeconfig = environ.get("KUBECONFIG")
if path.isfile(token):
logger.info(f'From token file: {token}')
token_dir = path.dirname(token)
config = pykube.KubeConfig.from_service_account(path=token_dir)
elif kubeconfig:
logger.info(f'From environment {kubeconfig}')
config = pykube.KubeConfig.from_file(filename=kubeconfig)
else:
config = pykube.KubeConfig.from_file()
# WORKAROUND: pykube doesn't know how to deal with null values in kubeconfig
config.user.setdefault('exec', {})
config.user['exec']['args'] = config.user['exec'].get('args') or []
config.user['exec']['env'] = config.user['exec'].get('env') or []
api = pykube.HTTPClient(config)
@kopf.on.login(errors=kopf.ErrorsMode.PERMANENT)
async def init_connection(logger, **_):
ca = config.cluster.get('certificate-authority')
cert = config.user.get('client-certificate')
pkey = config.user.get('client-key')
token = config.user.get('token')
    # Handle the EKS-style case: no static credentials, so run the exec plugin
if not cert and not pkey and not token:
exec_conf = config.user.get('exec')
if exec_conf.get('command'):
logger.info("Retrieving token...")
cmd_env_vars = dict(environ)
for env_var in exec_conf.get("env") or []:
cmd_env_vars[env_var["name"]] = env_var["value"]
output = subprocess.check_output(
[exec_conf["command"]] + exec_conf["args"], env=cmd_env_vars
)
parsed_out = json.loads(output)
token = parsed_out.get("status", {}).get("token")
    # NOTE: the dedicated EKS branch is temporarily disabled; we can possibly
    # get away with the exec-plugin code above
# else:
# logger.info("Retrieving ")
# cluster = exec_conf.get('args')[-1]
# if cluster:
# logger.info(f"Getting auth token for eks cluster {cluster}")
# from eks_token import get_token
# token = get_token(cluster_name=cluster)['status']['token']
return kopf.ConnectionInfo(
server=config.cluster.get('server'),
ca_path=ca.filename() if ca else None,
insecure=config.cluster.get('insecure-skip-tls-verify'),
username=config.user.get('username'),
password=config.user.get('password'),
scheme='Bearer',
token=token,
certificate_path=cert.filename() if cert else None,
private_key_path=pkey.filename() if pkey else None,
default_namespace=config.namespace,
)
def merge_list(accum, sample):
    """Appends the items of sample that are missing from accum; returns the appended items"""
    diff = [x for x in sample if x not in accum]
    accum.extend(diff)
    return diff
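# Example: merge_list([1, 2], [2, 3]) extends the accumulator to [1, 2, 3]
# and returns [3] (only the newly applied items)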
def deep_merge(accum, sample):
    """
    Recursively merges the dictionary on the right into the accumulator on the left.
    Existing scalar values are never overwritten: only missing keys are filled in.
    Returns the merge difference (the values actually applied to the accumulator).
    """
    result = {}
    for key, value in sample.items():
        if value is None:
            continue
        if isinstance(value, dict):
            if key not in accum:
                accum[key] = {}
            diff = deep_merge(accum[key], value)
            if diff:
                result[key] = diff
        elif isinstance(value, list):
            if key not in accum:
                accum[key] = []
            diff = merge_list(accum[key], value)
            if diff:
                result[key] = diff
        elif key not in accum:
            accum[key] = value
            result[key] = accum[key]
    return result
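# Example: deep_merge({"a": {"x": 1}}, {"a": {"x": 2, "y": 3}}) keeps "x" at 1,
# grows the accumulator to {"a": {"x": 1, "y": 3}}, and returns {"a": {"y": 3}}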
def k8s_hash(obj: dict) -> int:
    """Computes a hash from apiVersion, kind, namespace and name"""
    # "or ''" avoids None components in the joined identity key
    return hash("/".join([
        obj.get("apiVersion") or "",
        obj.get("kind") or "",
        obj.get("metadata", {}).get("namespace") or "",
        obj.get("metadata", {}).get("name") or ""
    ]))
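# Objects with the same apiVersion/kind/namespace/name hash equally regardless
# of spec contents; reconcile_ns relies on this to detect a template that
# targets the watched namespace itself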