graph/utils/heron.py (179 lines of code) (raw):

# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0

""" This module contains utility methods for dealing with physical graphs in
the graph database."""

import logging

import datetime as dt
import os
import json

from collections import defaultdict
from multiprocessing import Process, Queue
from string import Template
from typing import List, Dict, Any, Optional, Tuple

from gremlin_python.process.graph_traversal import outE

from caladrius.graph.gremlin.client import GremlinClient
from caladrius.graph.builder.heron import builder
from caladrius.common.heron import tracker
from caladrius.common.heron import zookeeper

LOG: logging.Logger = logging.getLogger(__name__)

# Cached path files are named paths/<topology>-<cluster>-<environ>-<time>.json
file_path_template = Template("paths/$topology-$cluster-$environ-$time.json")

# strftime format for the $time field of cached path files. A 24-hour clock
# (%H) is used so that e.g. 01:00 and 13:00 on the same day do not map to the
# same file name (the previous %I format had no AM/PM marker).
_PATHS_TIME_FORMAT = "%m_%d_%Y_%H_%M_%S"


def _suffix_paths(parent_to_child: Dict, node, memo: Dict) -> List[List]:
    """Return every path from ``node`` down to a sink, memoised in ``memo``.

    Each returned path starts with ``node`` itself. A node with no entry (or
    an empty entry) in ``parent_to_child`` is a sink, so its only path is
    ``[node]``. ``memo`` maps node -> list of such suffix paths. Because the
    cached paths never contain anything upstream of the node, they can be
    reused no matter which parent reaches the node first.

    NOTE(review): assumes ``parent_to_child`` is acyclic; a cycle would recurse
    until RecursionError.
    """
    if node in memo:
        return memo[node]
    children = parent_to_child.get(node)
    if not children:
        suffixes = [[node]]
    else:
        suffixes = [[node] + suffix
                    for child in children
                    for suffix in _suffix_paths(parent_to_child, child, memo)]
    memo[node] = suffixes
    return suffixes


def find_all_paths(parent_to_child: Dict, start, path: Optional[List] = None,
                   path_dict: Optional[Dict] = None) -> Tuple[Any, Dict]:
    """Find all paths from ``start`` to the sinks reachable from it.

    Arguments:
        parent_to_child (dict): Maps each node to the list of its downstream
                                nodes. Nodes without an entry are sinks.
        start: The node to start from.
        path (list): Optional prefix (nodes upstream of ``start``) prepended
                     to every returned path. It is not modified.
        path_dict (dict): Optional memo that may be shared between calls. It
                          maps a node to the list of paths from that node to
                          its sinks and is filled in by this call.

    Returns:
        A 2-tuple of (paths, path_dict). If ``start`` has no entry in
        ``parent_to_child``, ``paths`` is the single flat path
        ``path + [start]``. Otherwise it is a list of paths, each beginning
        with ``path + [start]`` and ending at a sink.
    """
    # None defaults avoid sharing one mutable list / memo between calls.
    prefix: List = list(path) if path is not None else []
    if path_dict is None:
        path_dict = defaultdict()

    if start not in parent_to_child:
        # Keep the historic return shape (a flat path) for a sink start node.
        return prefix + [start], path_dict

    paths = [prefix + suffix
             for suffix in _suffix_paths(parent_to_child, start, path_dict)]
    return paths, path_dict


def path_helper(parent_to_child: dict, spouts: List) -> List:
    """Calculate every path from each of the supplied spouts to a sink.

    The spouts are processed sequentially in the calling process. A single
    memo is shared across spouts so the sub-paths below a task reachable from
    several spouts are computed only once.

    Arguments:
        parent_to_child (dict): Maps each task to its downstream tasks.
        spouts (list): The spout tasks to start from.

    Returns:
        A list of paths (each a list of tasks), grouped by spout in the order
        the spouts were supplied. A spout with no downstream tasks contributes
        the single path ``[spout]``.
    """
    memo: Dict = defaultdict()
    paths: List = []
    for spout in spouts:
        # Copy each cached path so callers never share list objects with the
        # memo (or with each other if a spout is listed twice).
        paths.extend(list(p)
                     for p in _suffix_paths(parent_to_child, spout, memo))
    return paths


def get_all_paths(graph_client: GremlinClient,
                  topology_id: str) -> List[List[str]]:
    """ This function first gets all spouts from gremlin. Then it creates a
    dictionary mapping every task to its downstream tasks, by walking the
    "logically_connected" edges breadth-first from each spout. It passes this
    dictionary along to another function that calculates all paths from the
    provided spouts to sinks.

    Arguments:
        graph_client (GremlinClient): The client instance for the graph
                                      database.
        topology_id (str): The topology ID string.

    Returns:
        All possible paths from sources to sinks. Each path is a list of the
        "task_id" property values of the vertices along it.
    """
    LOG.info("Graph size: %d vertices %d edges",
             graph_client.graph_traversal.V().count().toList()[0],
             graph_client.graph_traversal.E().count().toList()[0])

    conn_type = "logically_connected"
    start: dt.datetime = dt.datetime.now()

    spouts = (graph_client.graph_traversal.V().has("topology_id", topology_id)
              .hasLabel("spout").where(outE(conn_type)).dedup().toList())

    spout_tasks: List = []
    parent_to_child: Dict = dict()
    # Task IDs whose downstream tasks have already been looked up. Shared by
    # all spouts so parts of the graph reachable from several spouts are only
    # queried once.
    visited: set = set()

    for spout in spouts:
        spout_tasks.append(graph_client.graph_traversal.V(spout)
                           .properties('task_id').value().next())
        # Breadth-first walk: expand every vertex of the current level and
        # collect the children of all of them into the next level.
        frontier: List = [spout]
        while frontier:
            next_frontier: List = []
            for vertex in frontier:
                vertex_task_id = (graph_client.graph_traversal.V(vertex)
                                  .properties('task_id').value().next())
                if vertex_task_id in visited:
                    continue
                visited.add(vertex_task_id)
                children = (graph_client.graph_traversal.V(vertex)
                            .out(conn_type).dedup().toList())
                if children:
                    parent_to_child[vertex_task_id] = (
                        graph_client.graph_traversal.V(vertex).out(conn_type)
                        .properties('task_id').value().dedup().toList())
                    next_frontier.extend(children)
            frontier = next_frontier

    paths: List = path_helper(parent_to_child, spout_tasks)
    LOG.info("Number of paths returned: %d", len(paths))
    end: dt.datetime = dt.datetime.now()
    LOG.info("Time spent in fetching all paths: %d seconds",
             (end - start).total_seconds())
    return paths


def get_current_refs(graph_client: GremlinClient,
                     topology_id: str) -> List[str]:
    """ Gets a list of topology reference strings for graphs with the supplied
    topology id.

    Arguments:
        graph_client (GremlinClient): The client instance for the graph
                                      database.
        topology_id (str): The topology ID string.

    Returns:
        A list of topology reference strings that contain "current".
    """
    refs: List[str] = (graph_client.graph_traversal.V()
                       .has("topology_id", topology_id)
                       .values("topology_ref").dedup().toList())
    return [ref for ref in refs if "current" in ref]


def _ref_creation_time(ref: str) -> dt.datetime:
    """Parse the UTC creation time embedded in a topology reference.

    References are built by ``_build_graph`` as ``<prefix>/<UTC isoformat>``,
    e.g. ``current/2018-01-01T12:00:00.123456+00:00``.
    """
    # Drop the prefix and the "+00:00" UTC offset suffix.
    timestamp: str = ref.split("/")[1].split("+")[0]
    try:
        naive: dt.datetime = dt.datetime.strptime(timestamp,
                                                  "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        # isoformat() omits the fractional part when microsecond == 0.
        naive = dt.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
    # The value is already UTC, so attach the zone rather than converting:
    # astimezone() on a naive datetime would treat it as server-local time.
    return naive.replace(tzinfo=dt.timezone.utc)


def most_recent_graph_ref(graph_client: GremlinClient, topology_id: str
                          ) -> Optional[Tuple[str, dt.datetime]]:
    """ Gets the most recent topology reference, for the supplied topology ID
    in a tuple with the creation datetime object.

    Arguments:
        graph_client (GremlinClient): The client instance for the graph
                                      database.
        topology_id (str): The topology ID string.

    Returns:
        A 2-tuple where the first item is the topology reference string and
        the second is the (UTC, timezone-aware) graph creation datetime, or
        None if no current graph exists for the topology.
    """
    LOG.info("Finding the most recent graph reference for topology: %s",
             topology_id)

    current_refs: List[str] = get_current_refs(graph_client, topology_id)

    if not current_refs:
        LOG.info("No graphs found for topology %s", topology_id)
        return None

    time_list: List[Tuple[str, dt.datetime]] = [
        (ref, _ref_creation_time(ref)) for ref in current_refs]
    return sorted(time_list, key=lambda x: x[1])[-1]


def _physical_plan_still_current(topology_id: str,
                                 most_recent_graph_ts: dt.datetime,
                                 zk_connection: str, zk_root_node: str,
                                 zk_time_offset: int) -> bool:
    """Return True if the graph created at ``most_recent_graph_ts`` is newer
    than the topology's last update time as reported via ZooKeeper."""
    LOG.info("Checking if the physical plan in the graph database for "
             "topology: %s is still current", topology_id)

    recent_topo_update_ts: dt.datetime = \
        zookeeper.last_topo_update_ts_html(zk_connection, zk_root_node,
                                           topology_id, zk_time_offset)

    return most_recent_graph_ts > recent_topo_update_ts


def _build_graph(graph_client: GremlinClient, tracker_url: str, cluster: str,
                 environ: str, topology_id: str, ref_prefix: str = "current"
                 ) -> str:
    """Build a new physical graph from the Heron Tracker's logical and
    physical plans and return its topology reference
    (``<ref_prefix>/<current UTC time in ISO format>``)."""
    topology_ref: str = (ref_prefix + "/" +
                         dt.datetime.now(dt.timezone.utc).isoformat())

    logical_plan: Dict[str, Any] = \
        tracker.get_logical_plan(tracker_url, cluster, environ, topology_id)

    physical_plan: Dict[str, Any] = \
        tracker.get_physical_plan(tracker_url, cluster, environ, topology_id)

    builder.create_physical_graph(graph_client, topology_id, topology_ref,
                                  logical_plan, physical_plan)

    return topology_ref


def _cluster_zk_url(zk_config: Dict[str, Any], cluster: str) -> str:
    """Return the configured ZooKeeper connection string with its second
    dot-separated component replaced by ``cluster``.

    NOTE(review): assumes the connection string contains at least one "."
    (otherwise IndexError) — confirm against deployed configs.
    """
    parts = zk_config["heron.statemgr.connection.string"].split(".")
    parts[1] = cluster
    return ".".join(parts)


def _paths_file_name(zk_config: Dict[str, Any], zookeeper_url: str,
                     topology_id: str, cluster: str, environ: str) -> str:
    """Return the cached-paths file name for the topology, keyed on the
    topology's last update time as reported via ZooKeeper."""
    recent_topo_update_ts: dt.datetime = zookeeper.last_topo_update_ts_html(
        zookeeper_url, zk_config["heron.statemgr.root.path"], topology_id,
        zk_config["zk.time.offset"])
    return file_path_template.substitute(
        topology=topology_id, cluster=cluster, environ=environ,
        time=recent_topo_update_ts.strftime(_PATHS_TIME_FORMAT))


def read_paths(zk_config: Dict[str, Any], topology_id: str, cluster: str,
               environ: str,) -> List:
    """ Reads the cached paths for the topology's current physical plan.

    Arguments:
        zk_config (dict):   A dictionary containing ZK config information.
                            "heron.statemgr.connection.string",
                            "heron.statemgr.root.path" and "zk.time.offset"
                            should be present.
        topology_id (str):  The topology ID string.
        cluster (str):  The name of the cluster the topology is running on.
        environ (str):  The environment the topology is running in.

    Returns:
        The list stored under "paths" in the cached paths file.

    Raises:
        FileNotFoundError: If no paths file exists for the current plan
                           (``paths_check`` creates it).
    """
    zookeeper_url = _cluster_zk_url(zk_config, cluster)
    file_name = _paths_file_name(zk_config, zookeeper_url, topology_id,
                                 cluster, environ)
    with open(file_name) as paths_file:
        path_data = json.load(paths_file)
    return path_data["paths"]


def paths_check(graph_client: GremlinClient, zk_config: Dict[str, Any],
                cluster: str, environ: str, topology_id: str):
    """ Checks to see if we have a file containing all paths for the topology
    and, if not, computes the paths and writes that file.

    Arguments:
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        zk_config (dict):   A dictionary containing ZK config information.
                            "heron.statemgr.connection.string",
                            "heron.statemgr.root.path" and "zk.time.offset"
                            should be present.
        cluster (str):  The name of the cluster the topology is running on.
        environ (str):  The environment the topology is running in.
        topology_id (str):  The topology ID string.
    """
    zookeeper_url = _cluster_zk_url(zk_config, cluster)
    LOG.info("Zookeeper URL: %s", zookeeper_url)

    # the file name encodes the last plan update, so a stale file is ignored
    file_name = _paths_file_name(zk_config, zookeeper_url, topology_id,
                                 cluster, environ)
    LOG.info("Paths file: %s", file_name)

    if os.path.exists(file_name):
        return

    # fetch paths and then write to file
    all_paths = get_all_paths(graph_client, topology_id)

    directory = os.path.dirname(file_name)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Write to a temporary file and rename it into place so that a partially
    # written file is never mistaken for a complete cache by a later call.
    tmp_name = file_name + ".tmp"
    with open(tmp_name, "w") as paths_file:
        json.dump({'paths': all_paths}, paths_file)
    os.replace(tmp_name, file_name)


def graph_check(graph_client: GremlinClient, zk_config: Dict[str, Any],
                tracker_url: str, cluster: str, environ: str,
                topology_id: str) -> str:
    """ Checks to see if the specified topology has an entry in the graph
    database and if so whether that entry was created since the latest change
    to the physical plan object stored in the ZooKeeper cluster (defined in
    the supplied config object)

    Arguments:
        graph_client (GremlinClient):   The client instance for the graph
                                        database.
        zk_config (dict):   A dictionary containing ZK config information.
                            "heron.statemgr.connection.string",
                            "heron.statemgr.root.path" and "zk.time.offset"
                            should be present.
        tracker_url (str):  The URL for the Heron Tracker API.
        cluster (str):  The name of the cluster the topology is running on.
        environ (str):  The environment the topology is running in.
        topology_id (str):  The topology ID string.

    Returns:
        The topology reference for the physical graph that was either found
        or created in the graph database.
    """
    most_recent_graph: Optional[Tuple[str, dt.datetime]] = \
        most_recent_graph_ref(graph_client, topology_id)

    zookeeper_url = _cluster_zk_url(zk_config, cluster)
    LOG.info("Zookeeper URL: %s", zookeeper_url)

    if not most_recent_graph:
        LOG.info("There are currently no physical graphs in the database "
                 "for topology %s", topology_id)
        topology_ref: str = _build_graph(graph_client, tracker_url, cluster,
                                         environ, topology_id)

    elif not _physical_plan_still_current(
            topology_id, most_recent_graph[1], zookeeper_url,
            zk_config["heron.statemgr.root.path"],
            zk_config["zk.time.offset"]):

        LOG.info("The physical plan for topology %s has changed since "
                 "the last physical graph (reference: %s) was built",
                 topology_id, most_recent_graph[0])
        topology_ref = _build_graph(graph_client, tracker_url, cluster,
                                    environ, topology_id)

    else:
        topology_ref = most_recent_graph[0]
        LOG.info("The current physical plan for topology %s is already "
                 "represented in the graph database with reference %s",
                 topology_id, topology_ref)

    return topology_ref