cstar/jobreader.py (82 lines of code) (raw):

# Copyright 2017 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import os

import cstar.jobrunner
import cstar.jobwriter
import cstar.progress
import cstar.state
import cstar.strategy
import cstar.topology
from cstar.exceptions import BadFileFormatVersion, FileTooOld
from cstar.output import debug


def read(job, job_id, stop_after, output_directory=None, max_days=7, endpoint_mapper=None, retry=False):
    """Load a saved job description from disk and apply it to ``job``.

    The job is read from ``<output_directory>/job.json``. ``job`` is mutated
    in place by ``_parse``.

    Args:
        job: Object that receives the restored settings and state.
        job_id: Identifier of the job; also used to build the default
            output directory ``~/.cstar/jobs/<job_id>``.
        stop_after: Passed through unchanged to the restored state.
        output_directory: Directory holding ``job.json``. Falls back to
            ``~/.cstar/jobs/<job_id>`` when falsy.
        max_days: Maximum accepted age of the job file, in whole days.
        endpoint_mapper: Callable taking the original topology and returning
            an endpoint mapping. Falls back to ``job.get_endpoint_mapping``
            when falsy.
        retry: When truthy, the set of previously failed hosts is cleared.

    Returns:
        The result of ``_parse`` (which has no explicit return value).

    Raises:
        BadFileFormatVersion: The file has no version or an unexpected one.
        FileTooOld: The job was created more than ``max_days`` days ago.
    """
    output_directory = output_directory or os.path.expanduser("~/.cstar/jobs/" + job_id)
    job_file = os.path.join(output_directory, "job.json")
    if not endpoint_mapper:
        endpoint_mapper = job.get_endpoint_mapping
    with open(job_file) as f:
        contents = f.read()
    return _parse(contents, job_file, output_directory, job, job_id, stop_after, max_days, endpoint_mapper, retry)


def _check_version(data, file):
    """Raise BadFileFormatVersion unless ``data`` carries the expected format version."""
    wanted = cstar.jobwriter.FILE_FORMAT_VERSION
    if 'version' not in data:
        raise BadFileFormatVersion("Incompatible file format version, wanted %d" % (wanted,))
    if data['version'] != wanted:
        raise BadFileFormatVersion(
            "Incompatible file format version, wanted %d but %s is of version %d" % (wanted, file, data['version']))


def _check_age(data, max_days):
    """Raise FileTooOld when the job's creation timestamp is more than ``max_days`` days old."""
    utc = datetime.timezone.utc
    # Timezone-aware equivalents of the deprecated utcfromtimestamp()/utcnow().
    creation_time = datetime.datetime.fromtimestamp(data["creation_timestamp"], tz=utc)
    age = (datetime.datetime.now(tz=utc) - creation_time).days
    if age > max_days:
        raise FileTooOld(("Job created %d days ago, which is more than the current maximum age of %d. " +
                          "Use --max-job-age %d if you really want to run this job.") % (age, max_days, age + 1))


def _hosts(rows):
    """Build a list of ``cstar.topology.Host`` objects, unpacking each row as constructor arguments."""
    return [cstar.topology.Host(*row) for row in rows]


def _parse(input, file, output_directory, job, job_id, stop_after, max_days, endpoint_mapper, retry=False):
    """Decode a serialized job and populate ``job`` with its settings and state.

    Args:
        input: JSON text of the job file.
        file: Path of the job file; only used in error messages.
        output_directory: Stored on ``job.output_directory``.
        job: Object that receives the restored settings and state.
        job_id: Stored on ``job.job_id``.
        stop_after: Passed through to ``cstar.state.State``.
        max_days: Maximum accepted age of the job, in whole days.
        endpoint_mapper: Called with the original topology when the stored
            strategy is ``Strategy.TOPOLOGY``.
        retry: When truthy, previously failed hosts are forgotten.

    Raises:
        BadFileFormatVersion: The file has no version or an unexpected one.
        FileTooOld: The job was created more than ``max_days`` days ago.
    """
    data = json.loads(input)
    _check_version(data, file)
    _check_age(data, max_days)

    state = data['state']

    job.command = data['command']
    job.job_id = job_id
    job.timeout = data['timeout']
    job.env = data['env']
    # NOTE(review): the runner name comes straight from the job file; getattr
    # raises AttributeError if cstar.jobrunner has no attribute of that name.
    job.job_runner = getattr(cstar.jobrunner, data["job_runner"])
    job.key_space = data.get('key_space')
    job.output_directory = output_directory
    job.sleep_on_new_runner = data['sleep_on_new_runner']
    job.ssh_username = data['ssh_username']
    job.ssh_identity_file = data['ssh_identity_file']
    job.ssh_password = data['ssh_password']
    job.ssh_lib = data['ssh_lib']
    job.use_sudo = data['use_sudo']
    job.sudo_args = data['sudo_args']
    job.jmx_username = data['jmx_username']
    job.jmx_passwordfile = data['jmx_passwordfile']
    job.addl_jmx_args = data['addl_jmx_args']
    job.hosts_variables = data['hosts_variables']

    strategy = cstar.strategy.parse(state['strategy'])
    cluster_parallel = state['cluster_parallel']
    dc_parallel = state['dc_parallel']
    dc_filter = state.get('dc_filter')
    max_concurrency = state['max_concurrency']
    # Keys that may be absent from the stored state fall back to defaults.
    job.resolve_hostnames = state.get('resolve_hostnames', False)
    job.cache_directory = state.get('cache_directory', os.path.expanduser("~/.cstar/cache"))

    stored_progress = state['progress']
    progress = cstar.progress.Progress(
        running=_hosts(stored_progress['running']),
        done=_hosts(stored_progress['done']),
        failed=_hosts(stored_progress['failed']))
    if retry:
        # On retry, drop the record of failed hosts.
        progress.failed = set()

    original_topology = cstar.topology.Topology(_hosts(state['original_topology']))
    current_topology = cstar.topology.Topology(_hosts(state['current_topology']))
    debug("Run on hosts", original_topology)
    debug("in topology", current_topology)

    # The endpoint mapper is only invoked for the TOPOLOGY strategy.
    if strategy is cstar.strategy.Strategy.TOPOLOGY:
        endpoint_mapping = endpoint_mapper(original_topology)
    else:
        endpoint_mapping = None

    job.state = cstar.state.State(
        original_topology=original_topology,
        strategy=strategy,
        endpoint_mapping=endpoint_mapping,
        cluster_parallel=cluster_parallel,
        dc_parallel=dc_parallel,
        dc_filter=dc_filter,
        max_concurrency=max_concurrency,
        current_topology=current_topology,
        stop_after=stop_after,
        progress=progress,
        ignore_down_nodes=state['ignore_down_nodes'])