in cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala [135:173]
/**
 * Copies every node's out-edges into the shared per-shard edge arrays and records, for each
 * node, where its edges were placed.
 *
 * Each element of `iterableSeq` is processed on `futurePool` in parallel. For every node read,
 * a slot of `edges.length + 1` ints is reserved in shard `EdgeShards.hash(id)` through an atomic
 * `getAndAdd` on that shard's `nextFreeEdgeIndex`. The first int of the slot holds the edge
 * count and the edges follow it.
 *
 * After all parts finish, offsets are written into `nodeCollection` in a single callback:
 *  - first, every node that appears as an edge target is given offset 0;
 *  - then every source node is given the index of its first edge (one past the length
 *    header). This overwrites any 0 written in the first step.
 *
 * @param iterableSeq     one iterable of nodes (with their out-edges) per input part
 * @param sharedEdgeArray per-shard edge storage, indexed by shard id
 * @param nodeCollection  receives the final offsets; also supplies `maxNodeId`
 * @param shardsInfo      per-shard allocation cursors (`nextFreeEdgeIndex`)
 * @param partsMetaInfo   per-part metadata; `numNodes` sizes the per-part id/offset buffers
 * @return a future that completes once all offsets have been written to `nodeCollection`
 */
private def fillEdgesMarkNodeOffsets(iterableSeq: Seq[Iterable[NodeIdEdgesMaxId]],
    sharedEdgeArray: Array[Array[Int]],
    nodeCollection: NodeCollection,
    shardsInfo: Array[PerShardInfo],
    partsMetaInfo: Seq[SharedGraphMetaInfo]): Future[Unit] = {
  log.debug("loading nodes and out edges from file in parallel and marking all nodes")
  // NOTE(review): add() is called concurrently from every part's future below. This relies on
  // ArrayBackedSet tolerating concurrent adds -- confirm.
  val allNodeIdsSet = new ArrayBackedSet(nodeCollection.maxNodeId)
  val futures = iterableSeq.indices.map { index =>
    futurePool {
      val numNodes = partsMetaInfo(index).numNodes
      val ids = new Array[Int](numNodes)
      val offsets = new Array[Int](numNodes)
      // Number of (id, offset) slots actually written for this part.
      var filled = 0
      iterableSeq(index).foreach { item =>
        val id = item.id
        val edgesLength = item.edges.length
        val shardIdx = EdgeShards.hash(id)
        // Atomically reserve room for the length header plus the edges in this shard.
        val offset = shardsInfo(shardIdx).nextFreeEdgeIndex.getAndAdd(edgesLength + 1)
        Array.copy(item.edges, 0, sharedEdgeArray(shardIdx), offset + 1, edgesLength)
        ids(filled) = id
        offsets(filled) = offset + 1
        filled += 1
        sharedEdgeArray(shardIdx)(offset) = edgesLength
        item.edges foreach { edge => allNodeIdsSet.add(edge) }
      }
      (ids, offsets, filled)
    }
  }
  Future.collect(futures).map { perPart =>
    // Serialize all updates to nodeCollection in this single callback.
    // allNodeIdsSet does not need to be kept around: its contents are encoded in the offsets
    // (edge targets get 0; source nodes get their real offset below, overwriting the 0).
    allNodeIdsSet.foreach { nodeId => nodeCollection.updateOffset(nodeId, 0) }
    perPart foreach { case (ids, offsets, filled) =>
      // Apply only the slots that were written. If numNodes over-estimated this part's size,
      // the untouched (0, 0) slots must not be applied, or they would reset node 0's offset.
      (0 until filled) foreach { i => nodeCollection.updateOffset(ids(i), offsets(i)) }
    }
  }
}