in cstar/jobreader.py [0:0]
def _parse(input, file, output_directory, job, job_id, stop_after, max_days, endpoint_mapper, retry=False):
    """Populate ``job`` from the JSON text of a previously written job file.

    The JSON document is validated (format version and age) and then its
    settings and saved execution state are copied onto ``job``.  Nothing is
    returned; ``job`` is mutated in place.

    Args:
        input: JSON text of the job file (passed to ``json.loads``).
        file: Name of the job file; only used in the version error message.
        output_directory: Stored on ``job.output_directory``.
        job: Object whose attributes are filled in from the file.
        job_id: Stored on ``job.job_id``.
        stop_after: Forwarded unchanged to ``cstar.state.State``.
        max_days: Maximum accepted age of the job, in whole days.
        endpoint_mapper: Callable invoked with the original topology to build
            the endpoint mapping; only called for the TOPOLOGY strategy.
        retry: When true, the saved set of failed hosts is discarded so that
            those hosts are not treated as failed in the restored progress.

    Raises:
        BadFileFormatVersion: The file has no ``version`` key, or its version
            differs from ``cstar.jobwriter.FILE_FORMAT_VERSION``.
        FileTooOld: The job was created more than ``max_days`` days ago.
        KeyError: A required key is absent (assuming the document is a JSON
            object, as the key lookups below expect).
    """
    data = json.loads(input)

    wanted_version = cstar.jobwriter.FILE_FORMAT_VERSION
    if 'version' not in data:
        raise BadFileFormatVersion("Incompatible file format version, wanted %d" %
                                   (wanted_version,))
    if data['version'] != wanted_version:
        # The file's own version is formatted with %s rather than %d: a
        # non-numeric version would otherwise raise TypeError here and hide
        # the intended BadFileFormatVersion.
        raise BadFileFormatVersion("Incompatible file format version, wanted %d but %s is of version %s" %
                                   (wanted_version, file, data['version']))

    # Timezone-aware arithmetic; both operands are UTC so the difference is
    # the same as with the deprecated naive utcfromtimestamp()/utcnow() pair.
    utc = datetime.timezone.utc
    creation_time = datetime.datetime.fromtimestamp(data["creation_timestamp"], tz=utc)
    age = (datetime.datetime.now(tz=utc) - creation_time).days
    if age > max_days:
        raise FileTooOld(("Job created %d days ago, which is more than the current maximum age of %d. " +
                          "Use --max-job-age %d if you really want to run this job.") % (age, max_days, age + 1))

    state = data['state']

    # Top-level job settings.
    job.command = data['command']
    job.job_id = job_id
    job.timeout = data['timeout']
    job.env = data['env']
    # NOTE(review): this resolves an attribute of cstar.jobrunner by a name
    # taken from the job file; presumably job files are trusted - confirm.
    job.job_runner = getattr(cstar.jobrunner, data["job_runner"])
    job.key_space = data.get('key_space')
    job.output_directory = output_directory
    job.sleep_on_new_runner = data['sleep_on_new_runner']
    job.ssh_username = data['ssh_username']
    job.ssh_identity_file = data['ssh_identity_file']
    job.ssh_password = data['ssh_password']
    job.ssh_lib = data['ssh_lib']
    job.use_sudo = data['use_sudo']
    job.sudo_args = data['sudo_args']
    job.jmx_username = data['jmx_username']
    job.jmx_passwordfile = data['jmx_passwordfile']
    job.addl_jmx_args = data['addl_jmx_args']
    job.hosts_variables = data['hosts_variables']

    # Saved execution state; optional keys fall back to defaults.
    strategy = cstar.strategy.parse(state['strategy'])
    cluster_parallel = state['cluster_parallel']
    dc_parallel = state['dc_parallel']
    dc_filter = state.get('dc_filter')
    max_concurrency = state['max_concurrency']
    job.resolve_hostnames = state.get('resolve_hostnames', False)
    job.cache_directory = state.get('cache_directory', os.path.expanduser("~/.cstar/cache"))

    saved_progress = state['progress']
    progress = cstar.progress.Progress(
        running=[cstar.topology.Host(*arr) for arr in saved_progress['running']],
        done=[cstar.topology.Host(*arr) for arr in saved_progress['done']],
        failed=[cstar.topology.Host(*arr) for arr in saved_progress['failed']])
    if retry:
        # Forget earlier failures on a retry.
        progress.failed = set()

    original_topology = cstar.topology.Topology(cstar.topology.Host(*arr) for arr in state['original_topology'])
    current_topology = cstar.topology.Topology(cstar.topology.Host(*arr) for arr in state['current_topology'])
    debug("Run on hosts", original_topology)
    debug("in topology", current_topology)

    # The endpoint mapping is only built for the topology strategy.
    if strategy is cstar.strategy.Strategy.TOPOLOGY:
        endpoint_mapping = endpoint_mapper(original_topology)
    else:
        endpoint_mapping = None

    job.state = cstar.state.State(
        original_topology=original_topology,
        strategy=strategy,
        endpoint_mapping=endpoint_mapping,
        cluster_parallel=cluster_parallel,
        dc_parallel=dc_parallel,
        dc_filter=dc_filter,
        max_concurrency=max_concurrency,
        current_topology=current_topology,
        stop_after=stop_after,
        progress=progress,
        ignore_down_nodes=state['ignore_down_nodes'])