in cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala [216:338]
/**
 * Builds the incoming-edge (reverse direction) sharded array from `outEdges`.
 *
 * Pipeline (parallel steps run one future per shard through `futurePool`):
 *  1. serially bin every id of `nodeCollection` into its shard (class-level `shardsInfo`)
 *     and register a zeroed in-degree counter for it,
 *  2. count in-edges of every node by walking every node's out-edges,
 *  3. reserve `1 + inDegree` slots per node with in-edges in its reverse shard and
 *     record the resulting offset in `nodesWithInEdges` (0 for nodes without in-edges),
 *  4. write each node's in-degree into the slot at `offset - 1`,
 *  5. scatter every edge (id -> neighbor) into the neighbor's slice starting at `offset`,
 *  6. sort each node's in-edge slice in place.
 *
 * @param outEdges       out-edges of each node; the source of the reversed edges
 * @param nodeCollection ids whose out-edges are reversed. NOTE(review): every neighbor
 *                       reached via `outEdges` is looked up in `inEdgesSizes`, which is
 *                       only populated for ids in this collection — assumes it contains
 *                       all nodes of the graph.
 * @return a future of the reverse-direction sharded array (always `Some`)
 */
private def createReverseDirEdgeArray(outEdges: Sharded2dArray[Int], nodeCollection: NodeCollection):
    Future[Option[Sharded2dArray[Int]]] = {
  val nodesWithInEdges = new NodeCollection(nodeCollection.graphInfo, forceSparseRepr)
  val reverseShardsInfo = newShardsInfo()
  // Per-node in-degree counters. After fillInEdgesOffsets they are reused as the per-node
  // write cursor into the reverse shard array.
  val inEdgesSizes = Int2ObjectMap[AtomicInteger](nodeCollection.considerGraphSparse,
    Some(nodeCollection.graphInfo.numNodes), Some(nodeCollection.maxNodeId), isConcurrent = false)

  // Serially bins nodeCollection's ids into shardsInfo(shard).idsMapped and creates a
  // counter in inEdgesSizes for each id.
  def partitionNodeIdsPerShard(): Unit = {
    // Count from zero so idsMapped is sized exactly, even if the shared shardsInfo still
    // carries a count from an earlier pass.
    shardsInfo foreach { shardInfo => shardInfo.numIdsMapped = 0 }
    nodeCollection foreach { id =>
      shardsInfo(EdgeShards.hash(id)).numIdsMapped += 1
      inEdgesSizes.update(id, new AtomicInteger())
    }
    shardsInfo foreach { shardInfo =>
      shardInfo.idsMapped = new Array[Int](shardInfo.numIdsMapped)
      shardInfo.numIdsMapped = 0 // reused below as the fill cursor
    }
    // fill out the array per shard by binning nodeCollection's ids into each shard
    nodeCollection foreach { id =>
      val shard = shardsInfo(EdgeShards.hash(id))
      shard.idsMapped(shard.numIdsMapped) = id
      shard.numIdsMapped += 1
    }
  }

  // do function f(shardNum, id, idIndex) for all nodes, divided into one future per shard
  def doForAllNodeIdsDetail(f: (Int, Int, Int) => Unit): Future[Unit] = {
    val futures = shardsInfo map { oneShardInfo =>
      futurePool {
        for (idIndex <- 0 until oneShardInfo.numIdsMapped)
          f(oneShardInfo.shardNum, oneShardInfo.idsMapped(idIndex), idIndex)
      }
    }
    Future.join(futures)
  }

  def doForAllNodeIds(f: Int => Unit): Future[Unit] =
    doForAllNodeIdsDetail { (_, id, _) => f(id) }

  // Counts in-edges: every out-edge id -> neighbor bumps neighbor's counter (atomically,
  // since different shards may hit the same neighbor concurrently).
  def findInEdgesSizes(): Future[Unit] = {
    log.debug("calculating incoming neighbor sizes for each node")
    doForAllNodeIds { id =>
      outEdges.foreach(id) { neighbor =>
        inEdgesSizes(neighbor).incrementAndGet()
      }
    }
  }

  // Reserves 1 + len slots per node with in-edges in its reverse shard (one slot for the
  // length, then the edges) and records the edge offset in nodesWithInEdges.
  def findInShardSizes(): Future[Unit] = {
    val offsetsAll = shardsInfo map { oneShardInfo => new Array[Int](oneShardInfo.numIdsMapped) }
    doForAllNodeIdsDetail { (shardNum, id, idIndex) =>
      val len = inEdgesSizes(id).get
      // offset points just past the length slot; 0 marks "no in-edges"
      val offset =
        if (len > 0) reverseShardsInfo(shardNum).numEdges.getAndAdd(1 + len) + 1
        else 0
      offsetsAll(shardNum)(idIndex) = offset
    } map { _ =>
      // update offsets in serial; iterate by numIdsMapped, matching how offsetsAll was sized
      // and how doForAllNodeIdsDetail indexes idsMapped
      shardsInfo foreach { oneShardInfo =>
        val offsetsThisShard = offsetsAll(oneShardInfo.shardNum)
        for (index <- 0 until oneShardInfo.numIdsMapped) {
          nodesWithInEdges.updateOffset(oneShardInfo.idsMapped(index), offsetsThisShard(index))
        }
      }
    }
  }

  // Writes each node's in-degree at offset - 1 and turns its counter into a write cursor.
  def fillInEdgesOffsets(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
    log.debug("filling lengths in 2d array")
    Stat.timeFuture(statsReceiver.stat("graph_load_fill_in_edge_lengths_and_offsets")) {
      doForAllNodeIds { id =>
        val len = inEdgesSizes(id).get
        if (len > 0) {
          val shard = EdgeShards.hash(id)
          val off = nodesWithInEdges.offsets(id)
          sharedInEdgesArray(shard)(off - 1) = len
          inEdgesSizes(id).set(off) // we will start storing actual neighbors starting here
        }
      }
    }
  }

  // Scatters every edge nodeId -> neighborId into neighborId's in-edge slice.
  def fillInEdges(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
    log.debug("filling in edges")
    Stat.timeFuture(statsReceiver.stat("graph_load_fill_in_edges")) {
      doForAllNodeIds { nodeId =>
        outEdges.foreach(nodeId) { neighborId =>
          val shard = sharedInEdgesArray(EdgeShards.hash(neighborId))
          // inEdgesSizes now holds the next free index in neighborId's slice
          shard(inEdgesSizes(neighborId).getAndIncrement) = nodeId
        }
      }
    }
  }

  // Sorts each node's in-edge slice [offset, offset + len) in place.
  def sortInEdges(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
    log.debug("sorting in edges in place")
    doForAllNodeIds { nodeId =>
      val offset = nodesWithInEdges.getEdgeOffset(nodeId)
      if (offset > 0) {
        val shardNum = EdgeShards.hash(nodeId)
        val numEdges = sharedInEdgesArray(shardNum)(offset - 1)
        java.util.Arrays.sort(sharedInEdgesArray(shardNum), offset, offset + numEdges)
      }
    }
  }

  // main set of steps to build incoming edges in the graph
  log.info("Now building the reverse direction representation")
  partitionNodeIdsPerShard()
  for {
    _ <- findInEdgesSizes()
    _ <- findInShardSizes()
    sharedInEdges = instantiateSharedArray(reverseShardsInfo)
    _ <- fillInEdgesOffsets(sharedInEdges)
    _ <- fillInEdges(sharedInEdges)
    _ <- sortInEdges(sharedInEdges)
  } yield Some(sharded2dArray(nodesWithInEdges, sharedInEdges))
}