container_agent/run_containers.py (308 lines of code) (raw):
#!/usr/bin/python
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Launch containers specified by a Google container manifest.
This program interprets a blob of JSON or YAML as a container manifest and
launches those containers. This assumes that the system's docker daemon runs
with the -r=false flag, otherwise the docker daemon itself will try to do
restarts whenever it gets a signal itself.
This will read the manifest file(s) matching the glob pattern given on the
commandline, and will re-read them periodically (see --reload).
When --syslog is given, this will log to syslog's LOCAL0 facility.
Environmental requirements:
- Docker 0.11 or higher
- Docker daemon runs with -r=false (for safer restart behavior)
"""
import logging
import os
import re
import sys
import time
import yaml
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from glob import glob
from hashlib import sha1
from logging import WARNING, DEBUG
from logging.handlers import SysLogHandler, SYSLOG_UDP_PORT
from IPy import IP
from docker_client import CliDockerClient
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Manifest versions this agent understands.
# NOTE(review): not referenced by any code visible here -- confirm it is used.
SUPPORTED_CONFIG_VERSIONS = ['v1beta1']
# Protocol names accepted in a port's "protocol" field.
PROTOCOL_TCP = 'TCP'
PROTOCOL_UDP = 'UDP'
VALID_PROTOCOLS = [PROTOCOL_TCP, PROTOCOL_UDP]
# A lowercase letter first, then lowercase letters, digits or hyphens, never
# ending in a hyphen.  Used to validate container, volume and port names.
RE_RFC1035_NAME = re.compile(r"^[a-z]([-a-z0-9]*[a-z0-9])*$")
# A C-style identifier.  There is no leading '^' because callers use match(),
# which only matches at the start of the string.
RE_C_TOKEN = re.compile(r"[A-Za-z_]\w*$")
# Longest path accepted by IsValidPath().
MAX_PATH_LEN = 512
# NOTE(review): not referenced by any code visible here -- confirm it is used.
DOCKER_CMD = 'docker'
# Prefix of the host side of a volume mount; LoadVolumeMounts() builds host
# paths as VOLUMES_ROOT_DIR/<volume name>.
VOLUMES_ROOT_DIR = '/export'
class ConfigException(Exception):
    """Raised by the Load*/Check* functions when a manifest is invalid."""
def IsValidIpv4Address(address):
    """Return True if IPy's IP() accepts 'address', False on ValueError.

    NOTE(review): despite the name, nothing here restricts the result to
    IPv4; whatever IP() accepts is reported as valid -- confirm that is
    intended.
    """
    try:
        IP(address)
        return True
    except ValueError:
        return False
def IsValidProtocol(proto):
    """Return True if 'proto' is one of VALID_PROTOCOLS."""
    is_known = proto in VALID_PROTOCOLS
    return is_known
def ProtocolString(proto):
    """Return 'udp' for PROTOCOL_UDP and the empty string otherwise."""
    return 'udp' if proto == PROTOCOL_UDP else ''
def IsValidPort(port):
    """Return True if 'port' is an integer in the range 1..65535.

    Non-integer values (strings, None, floats, booleans) are rejected
    outright instead of being compared against the range, so a malformed
    manifest yields a clean False rather than a TypeError or a result that
    depends on cross-type comparison order.

    Args:
      port: the value to check, typically taken straight from the manifest

    Returns:
      True or False
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(port, bool) or not isinstance(port, int):
        return False
    return 0 < port <= 65535
def IsRfc1035Name(name):
    """Match 'name' against RE_RFC1035_NAME.

    Returns:
      the regex match object (truthy) if the name matches, otherwise None
    """
    matched = RE_RFC1035_NAME.match(name)
    return matched
def IsCToken(name):
    """Match 'name' against RE_C_TOKEN.

    Returns:
      the regex match object (truthy) if the name matches, otherwise None
    """
    matched = RE_C_TOKEN.match(name)
    return matched
def IsValidPath(path):
    """Return True if 'path' is absolute and at most MAX_PATH_LEN long.

    Args:
      path: a string

    Returns:
      True if the path starts with '/' and is no longer than MAX_PATH_LEN
      characters, False otherwise (including for the empty string).
    """
    # startswith() rather than path[0], so an empty path is simply invalid
    # instead of raising IndexError.
    return path.startswith('/') and len(path) <= MAX_PATH_LEN
def LoadVolumes(volumes):
    """Validate a "volumes" block of config and return the volume names.

    Args:
      volumes: an iterable of volume specs; each must have a 'name' entry

    Returns:
      a list of the volume names, in the order they were given

    Raises:
      ConfigException: if a volume has no name, or its name is not a valid
          RFC1035 name, or the name was already used by an earlier volume
    """
    # TODO(thockin): could be a map of name -> Volume
    names = []
    for index, spec in enumerate(volumes):
        # Every volume must be named.
        if 'name' not in spec:
            raise ConfigException('volumes[%d] has no name' % (index))
        name = spec['name']

        # The name must be well-formed...
        if not IsRfc1035Name(name):
            raise ConfigException(
                'volumes[%d].name is invalid: %s' % (index, name))

        # ...and must not repeat an earlier one.
        if name in names:
            raise ConfigException(
                'volumes[%d].name is not unique: %s' % (index, name))

        names.append(name)
    return names
# TODO(thockin): We should probably fail on unknown fields in JSON objects.
class Container(object):
    """Parameters gathered from the manifest for one Docker container."""

    # Restrict instances to exactly the supported parameters.
    __slots__ = ('name', 'image', 'command', 'hostname', 'working_dir',
                 'ports', 'mounts', 'env_vars')

    def __init__(self, name, image):
        """Create a container with the required fields and empty defaults.

        Args:
          name: the container name (required str)
          image: the image to run (required str)
        """
        # Required settings.
        self.name = name
        self.image = image

        # Optional scalar settings; None means "not specified".
        self.hostname = None     # str
        self.working_dir = None  # str

        # Optional list settings; each instance gets its own fresh list.
        self.command = []   # list[str]
        self.ports = []     # tuples as built by LoadPorts():
                            # (host_ip, host_port, container_port, proto)
        self.mounts = []    # [str]
        self.env_vars = []  # [str]
def LoadUserContainers(containers, all_volumes):
    """Validate a "containers" block of config and build Container objects.

    Args:
      containers: an iterable of container specs (dicts)
      all_volumes: the known volume names, as returned by LoadVolumes()

    Returns:
      a list of Container objects, one per spec, in the order given

    Raises:
      ConfigException: if a container has a missing, invalid or duplicate
          name, has no image, has an invalid workingDir, or if any of its
          ports, volumeMounts or env entries fail validation
    """
    # TODO(thockin): could be a dict of name -> Container
    loaded = []
    seen_names = []
    for index, spec in enumerate(containers):
        # The name must be present, well-formed and unique.
        if 'name' not in spec:
            raise ConfigException('containers[%d] has no name' % (index))
        name = spec['name']
        if not IsRfc1035Name(name):
            raise ConfigException(
                'containers[%d].name is invalid: %s' % (index, name))
        if name in seen_names:
            raise ConfigException(
                'containers[%d].name is not unique: %s' % (index, name))
        seen_names.append(name)

        # An image is mandatory.
        if 'image' not in spec:
            raise ConfigException('containers[%s] has no image' % (name))

        ctr = Container(name, spec['image'])

        # User containers always get their own name as hostname.
        ctr.hostname = ctr.name

        # Commandline, if any.
        ctr.command = spec.get('command', [])

        # Initial working directory, validated only when specified.
        working_dir = spec.get('workingDir', None)
        ctr.working_dir = working_dir
        if working_dir is not None and not IsValidPath(working_dir):
            raise ConfigException(
                'containers[%s].workingDir is invalid: %s'
                % (ctr.name, working_dir))

        # Port mappings, volume mounts and environment, in that order.
        ctr.ports = LoadPorts(spec.get('ports', []), ctr.name)
        ctr.mounts = LoadVolumeMounts(
            spec.get('volumeMounts', []), all_volumes, ctr.name)
        ctr.env_vars = LoadEnvVars(spec.get('env', []), ctr.name)

        loaded.append(ctr)
    return loaded
def LoadPorts(ports_spec, ctr_name):
    """Process a "ports" block of config and return a list of ports.

    Args:
      ports_spec: an iterable of port specs (dicts).  The keys read are
          'name' (optional), 'containerPort' (required), 'hostIp'
          (optional), 'hostPort' (defaults to containerPort) and 'protocol'
          (defaults to 'TCP').
      ctr_name: name of the owning container; used only in error messages

    Returns:
      a list of (host_ip, host_port, container_port, protocol_string)
      tuples, where protocol_string is the result of ProtocolString()

    Raises:
      ConfigException: if a port has an invalid or duplicate name, has no
          containerPort, has an invalid containerPort, hostIp, hostPort or
          protocol, or repeats a (hostIp, hostPort, protocol) combination
          already used by this container
    """
    # TODO(thockin): could be a dict of name -> Port
    all_ports = []
    all_port_names = set()
    all_host_port_mappings = set()
    for port_index, port_spec in enumerate(ports_spec):
        if 'name' in port_spec:
            port_name = port_spec['name']
            if not IsRfc1035Name(port_name):
                raise ConfigException(
                    'containers[%s].ports[%d].name is invalid: %s'
                    % (ctr_name, port_index, port_name))
            if port_name in all_port_names:
                raise ConfigException(
                    'containers[%s].ports[%d].name is not unique: %s'
                    % (ctr_name, port_index, port_name))
            all_port_names.add(port_name)
        else:
            # Unnamed ports are identified by their index in messages.
            port_name = str(port_index)

        if 'containerPort' not in port_spec:
            raise ConfigException(
                'containers[%s].ports[%s] has no containerPort'
                % (ctr_name, port_name))
        ctr_port = port_spec['containerPort']
        # The offending value is formatted with %s, not %d: it may be a
        # string or None, and %d would raise TypeError and hide the real
        # configuration error.
        if not IsValidPort(ctr_port):
            raise ConfigException(
                'containers[%s].ports[%s].containerPort is invalid: %s'
                % (ctr_name, port_name, ctr_port))

        host_ip = port_spec.get('hostIp', '')
        if host_ip and not IsValidIpv4Address(host_ip):
            raise ConfigException(
                'containers[%s].ports[%s].hostIp is invalid: %s'
                % (ctr_name, port_name, host_ip))

        host_port = port_spec.get('hostPort', ctr_port)
        if not IsValidPort(host_port):
            raise ConfigException(
                'containers[%s].ports[%s].hostPort is invalid: %s'
                % (ctr_name, port_name, host_port))

        proto = port_spec.get('protocol', 'TCP')
        if not IsValidProtocol(proto):
            raise ConfigException(
                'containers[%s].ports[%s].protocol is invalid: %s'
                % (ctr_name, port_name, proto))

        host_port_mapping = (host_ip, host_port, proto)
        if host_port_mapping in all_host_port_mappings:
            # Adjacent literals instead of a backslash continuation inside
            # the string, which would embed the source indentation in the
            # message.
            raise ConfigException(
                'containers[%s].ports[%s].(hostIp,hostPort,protocol) '
                'is not unique: %s %s %s'
                % (ctr_name, port_name, host_ip, host_port, proto))
        all_host_port_mappings.add(host_port_mapping)

        all_ports.append(
            (host_ip, host_port, ctr_port, ProtocolString(proto)))
    return all_ports
def LoadVolumeMounts(mounts_spec, all_volumes, ctr_name):
    """Process a "volumeMounts" block of config and return a list of mounts.

    Args:
      mounts_spec: an iterable of mount specs (dicts).  The keys read are
          'name' (required), 'path' (required) and 'readOnly' (optional,
          defaults to False).
      all_volumes: the known volume names; every mount must name one
      ctr_name: name of the owning container; used only in error messages

    Returns:
      a list of strings of the form
      '<VOLUMES_ROOT_DIR>/<volume name>:<path>:<ro|rw>'

    Raises:
      ConfigException: if a mount has no name, an invalid name, a name that
          is not a known volume, no path, or an invalid path
    """
    # TODO(thockin): Could be a dict of name -> Mount
    all_mounts = []
    for vol_index, vol_spec in enumerate(mounts_spec):
        if 'name' not in vol_spec:
            raise ConfigException(
                'containers[%s].volumeMounts[%d] has no name'
                % (ctr_name, vol_index))
        vol_name = vol_spec['name']
        # Note the leading space in the second literal of the next two
        # messages: without it the text reads "...nameis invalid".
        if not IsRfc1035Name(vol_name):
            raise ConfigException(
                'containers[%s].volumeMounts[%d].name'
                ' is invalid: %s'
                % (ctr_name, vol_index, vol_name))
        if vol_name not in all_volumes:
            raise ConfigException(
                'containers[%s].volumeMounts[%d].name'
                ' is not a known volume: %s'
                % (ctr_name, vol_index, vol_name))

        if 'path' not in vol_spec:
            raise ConfigException(
                'containers[%s].volumeMounts[%s] has no path'
                % (ctr_name, vol_name))
        vol_path = vol_spec['path']
        if not IsValidPath(vol_path):
            raise ConfigException(
                'containers[%s].volumeMounts[%s].path is invalid: %s'
                % (ctr_name, vol_name, vol_path))

        read_mode = 'ro' if vol_spec.get('readOnly', False) else 'rw'
        all_mounts.append(
            '%s/%s:%s:%s'
            % (VOLUMES_ROOT_DIR, vol_name, vol_path, read_mode))
    return all_mounts
def LoadEnvVars(env_spec, ctr_name):
    """Validate an "env" block of config and return 'KEY=value' strings.

    Args:
      env_spec: an iterable of dicts, each with a 'key' and a 'value'
      ctr_name: name of the owning container; used only in error messages

    Returns:
      a list of '<key>=<value>' strings, in the order given

    Raises:
      ConfigException: if an entry has no key, a key that is not a C token,
          or no value
    """
    # TODO(thockin): could be a dict of key -> value
    rendered = []
    for index, entry in enumerate(env_spec):
        # The key must exist and look like a C identifier.
        if 'key' not in entry:
            raise ConfigException(
                'containers[%s].env[%d] has no key' % (ctr_name, index))
        key = entry['key']
        if not IsCToken(key):
            raise ConfigException(
                'containers[%s].env[%d].key is invalid: %s'
                % (ctr_name, index, key))

        # The value must exist; its content is not checked.
        if 'value' not in entry:
            raise ConfigException(
                'containers[%s].env[%s] has no value' % (ctr_name, key))

        rendered.append('%s=%s' % (key, entry['value']))
    return rendered
def CheckGroupWideConflicts(containers):
    """Verify that no two containers claim the same host port.

    Args:
      containers: an iterable of objects whose 'ports' attribute holds
          (host_ip, host_port, container_port, protocol) tuples

    Raises:
      ConfigException: if a (host_ip, host_port, protocol) combination
          appears more than once across all containers
    """
    # TODO(thockin): we could put other uniqueness checks (e.g. name) here.
    claimed = set()
    for ctr in containers:
        for port in ctr.ports:
            # The container-side port plays no part in host conflicts.
            host_ip, host_port, _, protocol = port
            key = (host_ip, host_port, protocol)
            if key in claimed:
                raise ConfigException(
                    'host port %s is not unique group-wide' % (key, ))
            claimed.add(key)
def FlagList(values, flag):
    """Expands a list of values into a list of flag/value pairs.

    Every value in the input is preceded by 'flag' in the output.

    Args:
      values: a list of strings
      flag: a string

    Returns:
      the expanded list of strings

    Example:
      FlagList(["a", "b", "c"], "-x") => ["-x", "a", "-x", "b", "-x", "c"]
    """
    return [item for value in values for item in (flag, value)]
def FlagOrNothing(value, flag):
    """Returns [flag, value] unless value is None, in which case []."""
    return [] if value is None else [flag, value]
def StartContainer(docker, name, ctr):
    """Log and launch a new container through the docker client.

    Args:
      docker: the docker client; its run() method is called
      name: the name to give the new container
      ctr: the Container describing what to run

    NOTE(review): ctr.hostname and ctr.working_dir are not passed to
    docker.run() here -- confirm whether that is intentional.
    """
    log.info("starting new container '%s'", ctr.name)
    run_args = {
        'image': ctr.image,
        'ports': ctr.ports,
        'volumes': ctr.mounts,
        'env': ctr.env_vars,
        'command': ctr.command,
        'name': name,
    }
    docker.run(**run_args)
def RunContainer(docker, name, ctr):
    """Make sure the container called 'name' is running.

    A container that does not exist yet is started.  One that exists but is
    no longer running is destroyed and started again.  One that is running
    is left alone.

    Args:
      docker: the docker client
      name: the full name of the container
      ctr: the Container describing what to run
    """
    infos = docker.inspect_container(name)
    info = infos and infos[0]

    # Nothing known under this name: plain start.
    if not info:
        StartContainer(docker, name, ctr)
        return

    state = info['State']
    if state['Running']:
        return

    # The container exited: replace it with a fresh one.
    log.info("restarting exited container '%s' (%d)",
             name, state['ExitCode'])
    docker.destroy(name)
    StartContainer(docker, name, ctr)
def ContainerHash(ctr):
    """Return the first 8 hex digits of the SHA-1 of the YAML dump of ctr."""
    digest = sha1(yaml.dump(ctr)).hexdigest()
    return digest[:8]
def RunContainers(containers, namespace):
    """Bring the docker containers in 'namespace' in line with 'containers'.

    Each wanted container is named '<namespace>-<name>-<hash>', where the
    hash comes from ContainerHash().  Existing containers with the namespace
    prefix that are not wanted under that exact name are killed; then every
    wanted container is passed to RunContainer().

    Args:
      containers: the Container objects that should be running
      namespace: the name prefix that marks containers owned by this agent
    """
    docker = CliDockerClient()
    prefix = '%s-' % (namespace, )

    # Desired state: full container name -> Container.
    named = {}
    for ctr in containers:
        full_name = '%s%s-%s' % (prefix, ctr.name, ContainerHash(ctr))
        named[full_name] = ctr

    # First reap containers that might collide with the desired state.
    # The listing is collected completely before anything is killed.
    running = []
    for candidate in docker.list_containers(prefix):
        if candidate.startswith(prefix):
            running.append(candidate)
    for existing in running:
        log.debug('in %s namespace: %s', namespace, existing)
        if existing in named:
            continue
        log.info('reaping unwanted container: %s', existing)
        docker.kill(existing)

    # Implement desired state
    for full_name, ctr in named.iteritems():
        RunContainer(docker, full_name, ctr)
def RunContainersFromConfigFiles(config_glob, reload_interval, namespace):
    """Repeatedly load the manifests matching a glob and run their containers.

    On every pass all files matching 'config_glob' are parsed as YAML, their
    "containers" and "volumes" blocks are concatenated, validated and handed
    to RunContainers().  Any exception raised during a pass is logged and the
    loop carries on with the next pass.  This function never returns.

    Args:
      config_glob: a glob pattern selecting the manifest files
      reload_interval: seconds to sleep between passes; anything float()
          accepts
      namespace: the container name prefix, passed on to RunContainers()
    """
    while True:
        try:
            files = glob(config_glob)
            log.debug('loading config files: %s', files)
            containers = []
            # "volumes" is a list of volume specs, just like "containers";
            # LoadVolumes() enumerates it.  Accumulating it with
            # dict.update() raised ValueError for any non-empty block.
            volumes = []
            for filename in files:
                with open(filename) as f:
                    # NOTE(review): yaml.load() can construct arbitrary
                    # Python objects; consider yaml.safe_load() if the
                    # manifests are not fully trusted.
                    file_config = yaml.load(f)
                # An empty file parses to None, and a key with no value
                # parses to None too; treat both as "nothing specified".
                file_config = file_config or {}
                containers.extend(file_config.get('containers') or [])
                volumes.extend(file_config.get('volumes') or [])
            all_volumes = LoadVolumes(volumes)
            user_containers = LoadUserContainers(containers, all_volumes)
            CheckGroupWideConflicts(user_containers)
            RunContainers(user_containers, namespace)
        except Exception:
            # Deliberately broad: one bad pass must not stop the agent.
            log.exception("exception")
        # %s rather than %d: the interval may be a string or a float.
        log.debug('sleeping %s seconds', reload_interval)
        time.sleep(float(reload_interval))
def main():
    """Parse the commandline, set up logging and run the agent loop.

    With --syslog a SysLogHandler (facility LOCAL0) is attached to this
    module's logger, using a platform-specific syslog address.  The root
    logger is configured at WARNING, lowered by one level per -v down to
    DEBUG.  Control then passes to RunContainersFromConfigFiles(), which
    does not return.
    """
    parser = ArgumentParser(description='container-agent',
                            formatter_class=ArgumentDefaultsHelpFormatter)
    parser.add_argument('--syslog', help='log to syslog', action='store_true')
    parser.add_argument('-v', '--verbosity', action='count', default=0)
    parser.add_argument('--namespace', default='container-agent')
    # type=float so a value given on the commandline is a number, like the
    # default, and a bad value is rejected while parsing.
    parser.add_argument('-r', '--reload', default=5, type=float,
                        help='file reload interval')
    parser.add_argument('containers', metavar='containers.yaml')
    args = parser.parse_args()

    if args.syslog:
        facility = SysLogHandler.LOG_LOCAL0
        # Pick the syslog address that matches the platform.
        if sys.platform.startswith('darwin') and \
                os.path.exists('/var/run/syslog'):
            handler = SysLogHandler('/var/run/syslog', facility)
        elif sys.platform.startswith('sunos'):
            handler = SysLogHandler(('127.0.0.1', SYSLOG_UDP_PORT), facility)
        else:
            handler = SysLogHandler('/dev/log', facility)
        log.addHandler(handler)

    # Each -v lowers the threshold by one level, but never below DEBUG.
    # A second basicConfig() call used to follow this one; it was dropped
    # because basicConfig() does nothing once the root logger has a handler.
    logging.basicConfig(format='%(asctime)s %(levelname)-7s '
                               '%(filename)s:%(lineno)d %(message)s',
                        level=max(DEBUG, WARNING - args.verbosity * 10))

    RunContainersFromConfigFiles(args.containers, args.reload, args.namespace)
# Run the agent only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()