in cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraph.scala [283:380]
private def fillMissingInEdges(nodeColl: NodeCollection, nodesOutEdges: Seq[Seq[Node]],
nodesWithNoOutEdges: Seq[Node]):
Future[Unit] = {
log.debug("calculating in edges sizes")
val inEdgeSizes = Int2ObjectMap[AtomicInteger](nodeColl.considerGraphSparse,
Some(nodeColl.numNodes), Some(nodeColl.maxNodeId), isConcurrent = false)
nodeColl.nodeIdsIterator foreach { i => inEdgeSizes.update(i, new AtomicInteger()) }
def addInEdge(dest: Int, source: Int): Unit = {
val inEdgeIndex = inEdgeSizes(dest).getAndIncrement
nodeColl(dest).asInstanceOf[FillingInEdgesBiDirectionalNode].inEdges(inEdgeIndex) = source
}
def incEdgeSize(id: Int) { inEdgeSizes(id).incrementAndGet() }
def getAndResetEdgeSize(id: Int) = {
val sz = inEdgeSizes(id).intValue()
if (sz > 0) inEdgeSizes(id).set(0)
sz
}
// Calculates sizes of incoming edges arrays.
def findInEdgesSizes(nodesOutEdges: Seq[Seq[Node]]): Future[Unit] = {
Stat.time(statsReceiver.stat("graph_load_find_in_edge_sizes")) {
val futures = nodesOutEdges map {
nodes => futurePool {
nodes foreach {
node => node.outboundNodes foreach { outEdge =>
incEdgeSize(outEdge)
}
}
}
}
Future.join(futures)
}
}
def instantiateInEdges(): Future[Unit] = {
log.debug("instantiate in edges")
Stat.time(statsReceiver.stat("graph_load_instantiate_in_edge_arrays")) {
val futures = (nodesOutEdges.iterator ++ Iterator(nodesWithNoOutEdges)).map {
(nodes: Seq[Node]) => futurePool {
nodes foreach { node =>
// reset inEdgesSizes, and use it as index pointer of
// the current insertion place when adding in edges
val edgeSize = getAndResetEdgeSize(node.id)
if (edgeSize > 0) {
node.asInstanceOf[FillingInEdgesBiDirectionalNode].createInEdges(edgeSize)
}
}
}
}.toSeq
Future.join(futures)
}
}
def populateInEdges(): Future[Unit] = {
log.debug("populate in edges")
Stat.time(statsReceiver.stat("graph_load_read_in_edge_from_dump_files")) {
val futures = nodesOutEdges.map {
(nodes: Seq[Node]) => futurePool {
nodes foreach { node =>
node.outboundNodes foreach { outEdge =>
addInEdge(outEdge, node.id)
}
}
}
}
Future.join(futures)
}
}
def finishInEdgesFilling(): Future[Unit] = {
log.debug("finishing filling")
Stat.time(statsReceiver.stat("finishing_filling_in_edges")) {
val futures = nodesOutEdges.map {
nodes => futurePool {
nodes.foreach {
node =>
node.asInstanceOf[FillingInEdgesBiDirectionalNode].sortInNeighbors()
}
}
}
Future.join(futures)
}
}
for {
_ <- findInEdgesSizes(nodesOutEdges)
_ <- instantiateInEdges()
_ <- populateInEdges()
_ <- when(neighborsSortingStrategy != LeaveUnsorted) (finishInEdgesFilling())
} yield ()
}