# def _parse()
#
# in cstar/jobreader.py [0:0]


def _parse(input, file, output_directory, job, job_id, stop_after, max_days, endpoint_mapper, retry=False):
    """Populate *job* from the JSON text of a previously saved job.

    The function mutates ``job`` in place (its plain attributes and its
    ``state``) and returns nothing.

    Args:
        input: JSON document (text) describing the saved job.
        file: Name of the file the JSON came from; only used in the
            version-mismatch error message.
        output_directory: Stored on ``job.output_directory``.
        job: Object whose attributes are filled in from the JSON data.
        job_id: Stored on ``job.job_id``.
        stop_after: Passed through to ``cstar.state.State``.
        max_days: Maximum accepted age of the job, in whole days.
        endpoint_mapper: Callable invoked with the original topology to build
            the endpoint mapping; only called for the TOPOLOGY strategy.
        retry: When true, the set of failed hosts read from the file is
            discarded so those hosts are no longer recorded as failed.

    Raises:
        BadFileFormatVersion: The ``version`` key is missing or differs from
            ``cstar.jobwriter.FILE_FORMAT_VERSION``.
        FileTooOld: The job was created more than ``max_days`` days ago.
        KeyError: A mandatory key is missing from the JSON data.
    """
    data = json.loads(input)

    expected_version = cstar.jobwriter.FILE_FORMAT_VERSION
    if 'version' not in data:
        raise BadFileFormatVersion("Incompatible file format version, wanted %d" %
                                   (expected_version,))
    if data['version'] != expected_version:
        # The found version is formatted with %s: it comes straight from the
        # file and may be a string or float, for which %d would either raise
        # TypeError or silently truncate the value shown to the user.
        raise BadFileFormatVersion("Incompatible file format version, wanted %d but %s is of version %s" %
                                   (expected_version, file, data['version']))

    # Timezone-aware UTC datetimes replace the deprecated utcfromtimestamp()
    # and utcnow(); the resulting difference in days is the same.
    utc = datetime.timezone.utc
    creation_time = datetime.datetime.fromtimestamp(data["creation_timestamp"], utc)
    age = (datetime.datetime.now(utc) - creation_time).days
    if age > max_days:
        raise FileTooOld(("Job created %d days ago, which is more than the current maximum age of %d. " +
                          "Use --max-job-age %d if you really want to run this job.") % (age, max_days, age + 1))

    state = data['state']

    # Plain job settings copied from the top level of the document.
    job.command = data['command']
    job.job_id = job_id
    job.timeout = data['timeout']
    job.env = data['env']
    # The runner is stored by name and looked up in cstar.jobrunner.
    job.job_runner = getattr(cstar.jobrunner, data["job_runner"])
    job.key_space = data.get('key_space')  # optional key
    job.output_directory = output_directory
    job.sleep_on_new_runner = data['sleep_on_new_runner']
    job.ssh_username = data['ssh_username']
    job.ssh_identity_file = data['ssh_identity_file']
    job.ssh_password = data['ssh_password']
    job.ssh_lib = data['ssh_lib']
    job.use_sudo = data['use_sudo']
    job.sudo_args = data['sudo_args']
    job.jmx_username = data['jmx_username']
    job.jmx_passwordfile = data['jmx_passwordfile']
    job.addl_jmx_args = data['addl_jmx_args']
    job.hosts_variables = data['hosts_variables']

    # Settings stored under the "state" sub-document; some keys are optional.
    strategy = cstar.strategy.parse(state['strategy'])
    cluster_parallel = state['cluster_parallel']
    dc_parallel = state['dc_parallel']
    dc_filter = state.get('dc_filter')
    max_concurrency = state['max_concurrency']
    job.resolve_hostnames = state.get('resolve_hostnames', False)
    job.cache_directory = state.get('cache_directory', os.path.expanduser("~/.cstar/cache"))

    # Each host is stored as a list of positional Host constructor arguments.
    saved_progress = state['progress']
    progress = cstar.progress.Progress(
        running=[cstar.topology.Host(*arr) for arr in saved_progress['running']],
        done=[cstar.topology.Host(*arr) for arr in saved_progress['done']],
        failed=[cstar.topology.Host(*arr) for arr in saved_progress['failed']])

    if retry:
        # Forget earlier failures so those hosts are not treated as failed.
        progress.failed = set()

    original_topology = cstar.topology.Topology(cstar.topology.Host(*arr) for arr in state['original_topology'])
    current_topology = cstar.topology.Topology(cstar.topology.Host(*arr) for arr in state['current_topology'])

    debug("Run on hosts", original_topology)
    debug("in topology", current_topology)

    # The endpoint mapping is only computed for the TOPOLOGY strategy.
    if strategy is cstar.strategy.Strategy.TOPOLOGY:
        endpoint_mapping = endpoint_mapper(original_topology)
    else:
        endpoint_mapping = None

    job.state = cstar.state.State(
        original_topology=original_topology,
        strategy=strategy,
        endpoint_mapping=endpoint_mapping,
        cluster_parallel=cluster_parallel,
        dc_parallel=dc_parallel,
        dc_filter=dc_filter,
        max_concurrency=max_concurrency,
        current_topology=current_topology,
        stop_after=stop_after,
        progress=progress,
        ignore_down_nodes=state['ignore_down_nodes'])