in graph/builder/heron/builder.py [0:0]
def _create_spouts(graph_client: GremlinClient, topology_id: str,
                   topology_ref: str,
                   physical_plan: Dict[str, Any],
                   logical_plan: Dict[str, Any]) -> None:
    """Add one "spout" vertex per spout instance and attach it to its
    container vertex.

    For every spout component listed under ``logical_plan["spouts"]`` the
    instance names found under ``physical_plan["spouts"][<component>]`` are
    walked. Each instance gets a vertex labelled "spout" and an "is_within"
    edge pointing at the "container" vertex that shares the same
    ``topology_id`` / ``topology_ref`` and whose "id" property equals the
    instance's container value.

    Arguments:
        graph_client (GremlinClient):   Client whose ``graph_traversal``
                                        attribute is used to build every
                                        traversal issued here.
        topology_id (str):  Stored on each spout vertex and used to select
                            the matching container vertex.
        topology_ref (str): Stored on each spout vertex and used to select
                            the matching container vertex.
        physical_plan (dict):   Must provide ``"spouts"`` (component name ->
                                instance names) and ``"instances"``
                                (instance name -> mapping with "stmgrId").
        logical_plan (dict):    Must provide ``"spouts"`` (component name ->
                                mapping with "spout_type" and
                                "spout_source").

    Raises:
        KeyError:   If any of the plan keys read above is missing.
    """
    LOG.info("Creating spout instance vertices")

    # Component name -> names of the instances running that component.
    instances_by_spout: Dict[str, List[str]] = physical_plan["spouts"]

    created: int = 0

    for spout_name, spout_data in logical_plan["spouts"].items():

        LOG.debug("Creating vertices for instances of spout component: %s",
                  spout_name)

        for instance_name in instances_by_spout[spout_name]:

            parsed: Dict[str, Union[str, int]] = \
                tracker.parse_instance_name(instance_name)

            LOG.debug("Creating vertex for spout instance: %s", instance_name)

            stmgr_id: str = \
                physical_plan["instances"][instance_name]["stmgrId"]

            # Build the vertex traversal one property at a time, then
            # execute it with next().
            add_vertex = graph_client.graph_traversal.addV("spout")
            add_vertex = add_vertex.property("container",
                                             parsed["container"])
            add_vertex = add_vertex.property("task_id", parsed["task_id"])
            add_vertex = add_vertex.property("component", spout_name)
            add_vertex = add_vertex.property("stream_manager", stmgr_id)
            add_vertex = add_vertex.property("spout_type",
                                             spout_data["spout_type"])
            add_vertex = add_vertex.property("spout_source",
                                             spout_data["spout_source"])
            add_vertex = add_vertex.property("topology_id", topology_id)
            add_vertex = add_vertex.property("topology_ref", topology_ref)
            spout_vertex: Vertex = add_vertex.next()

            # Connect the spout to its container vertex. The edge source is
            # set up first and the target selector second, each from its own
            # graph_traversal access.
            add_edge = graph_client.graph_traversal.V(spout_vertex)
            add_edge = add_edge.addE("is_within")

            container = graph_client.graph_traversal.V()
            container = container.hasLabel("container")
            container = container.has("topology_id", topology_id)
            container = container.has("topology_ref", topology_ref)
            container = container.has("id", parsed["container"])

            add_edge.to(container).next()

            created += 1

    LOG.info("Created %d spout instances", created)