graph/gremlin/client.py (70 lines of code) (raw):
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains classes and methods for connecting to and
communicating with a Gremlin Server instance. """
import logging
import errno
from socket import error as socket_error
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import has, GraphTraversalSource
from gremlin_python.process.strategies import SubgraphStrategy
from gremlin_python.driver.driver_remote_connection \
import DriverRemoteConnection
from caladrius.config.keys import ConfKeys
# Module-level logger, named after this module; used by GremlinClient below.
LOG: logging.Logger = logging.getLogger(__name__)
class GremlinClient(object):
    """ Client class for the TinkerPop Gremlin Server.

    Two clients compare equal (and hash identically) when they point at the
    same server URL and use the same graph name, regardless of the state of
    their underlying connections. """

    def __init__(self, config: dict, graph_name: str = "g") -> None:
        """ Stores the connection settings and opens the remote connection.

        Arguments:
            config (dict):  Configuration dictionary. The value stored under
                            ConfKeys.GREMLIN_SERVER_URL.value is used as the
                            server address.
            graph_name (str):   Name handed to DriverRemoteConnection as its
                                second argument (presumably the remote
                                traversal source name). Defaults to "g".

        Raises:
            KeyError:   If the server URL key is missing from the config.
            ConnectionRefusedError: If the gremlin server refuses the
                                    connection (see connect()).
        """
        self.config: dict = config
        self.gremlin_server_url: str = \
            self.config[ConfKeys.GREMLIN_SERVER_URL.value]

        # Create remote graph traversal object
        LOG.info("Connecting to graph database at: %s",
                 self.gremlin_server_url)
        self.graph_name: str = graph_name
        self.graph: Graph = Graph()
        self.connect()

    def __hash__(self) -> int:
        """ Hashes on the same fields that __eq__ compares. """
        return hash((self.gremlin_server_url, self.graph_name))

    def __eq__(self, other: object) -> bool:
        """ Clients are equal when server URL and graph name both match. """
        if not isinstance(other, GremlinClient):
            # Let Python try the other operand's comparison instead of
            # unilaterally declaring the objects unequal.
            return NotImplemented

        return (self.gremlin_server_url == other.gremlin_server_url and
                self.graph_name == other.graph_name)

    def connect(self) -> None:
        """ Creates (or refreshes) the remote connection to the gremlin server.

        The resulting traversal source is stored on the instance as
        graph_traversal, replacing any previous one.

        Raises:
            ConnectionRefusedError: If the gremlin server at the configured
                                    address cannot be found.
            OSError:    Any other socket level error is re-raised unchanged.
        """
        connect_str: str = f"ws://{self.gremlin_server_url}/gremlin"

        try:
            self.graph_traversal: GraphTraversalSource = \
                self.graph.traversal().withRemote(
                    DriverRemoteConnection(connect_str, self.graph_name))
        except socket_error as serr:
            if serr.errno != errno.ECONNREFUSED:
                # Not the error we are looking for, re-raise it untouched
                # (a bare raise keeps the original traceback intact).
                LOG.error("Socket error occurred")
                raise

            # connection refused
            msg: str = (f"Connection to gremlin sever at: "
                        f"{self.gremlin_server_url} using connection string: "
                        f"{connect_str} was refused. Is the server active?")
            LOG.error(msg)
            # Chain explicitly so the underlying socket error stays visible.
            raise ConnectionRefusedError(msg) from serr

    def topology_ref_exists(self, topology_id: str, topology_ref: str) -> bool:
        """ Checks whether vertices exist in the graph database with the
        supplied topology id and ref values.

        Arguments:
            topology_id (str):  The topology identification string.
            topology_ref (str): The reference string to check for.

        Returns:
            Boolean flag indicating if vertices with the supplied ID and
            reference are present (True) or not (False) in the graph database.
        """
        # Count on the server rather than pulling every matching vertex over
        # the wire just to measure the length of the resulting list.
        num_vertices: int = (self.graph_traversal.V()
                             .has("topology_id", topology_id)
                             .has("topology_ref", topology_ref)
                             .count().next())

        if num_vertices:
            LOG.debug("%d vertices with the topology id: %s and reference: %s "
                      "are present in the graph database.", num_vertices,
                      topology_id, topology_ref)
            return True

        return False

    def raise_if_missing(self, topology_id: str, topology_ref: str) -> None:
        """ Checks whether vertices exist in the graph database with the
        supplied topology id and ref values and raises an error if they don't.

        Arguments:
            topology_id (str):  The topology identification string.
            topology_ref (str): The reference string to check for.

        Raises:
            RuntimeError:   If vertices with the supplied topology ID and
                            reference are not present in the graph database.
        """
        if not self.topology_ref_exists(topology_id, topology_ref):
            msg: str = (f"Topology: {topology_id} reference: {topology_ref} "
                        f"is not present in the graph database")
            LOG.error(msg)
            raise RuntimeError(msg)

    def topology_subgraph(self, topology_id: str,
                          topology_ref: str) -> GraphTraversalSource:
        """ Gets a gremlin graph traversal source limited to the sub-graph of
        vertices with the supplied topology ID and topology reference
        properties.

        Arguments:
            topology_id (str):  The topology identification string.
            topology_ref (str): The reference string for the version of the
                                topology you want to sub-graph.

        Returns:
            A GraphTraversalSource instance linked to the desired sub-graph
        """
        LOG.debug("Creating traversal source for topology %s subgraph with "
                  "reference: %s", topology_id, topology_ref)

        # Only vertices carrying both properties are visible through the
        # returned traversal source.
        topo_graph_traversal: GraphTraversalSource = \
            self.graph_traversal.withStrategies(
                SubgraphStrategy(vertices=has("topology_ref", topology_ref)
                                 .has("topology_id", topology_id)))

        return topo_graph_traversal