private def createReverseDirEdgeArray()

in cassovary-core/src/main/scala/com/twitter/cassovary/graph/SharedArrayBasedDirectedGraph.scala [216:338]


    /**
     * Builds the reverse-direction (incoming edges) representation from the
     * outgoing edges of the graph.
     *
     * The work is a fixed sequence of phases. The first runs serially; the others
     * each run as one `futurePool` task per entry of `shardsInfo`, and a phase only
     * starts once the previous phase's future has completed:
     *
     *  1. bin the ids of `nodeCollection` into `shardsInfo` by `EdgeShards.hash(id)`;
     *  2. count the incoming edges of every node;
     *  3. reserve, per node with at least one incoming edge, `1 + count` slots in its
     *     reverse shard and record the resulting offset in `nodesWithInEdges`;
     *  4. allocate the shared arrays via `instantiateSharedArray(reverseShardsInfo)`;
     *  5. write each node's incoming-edge count into slot `offset - 1`;
     *  6. write the incoming neighbors into the slots starting at `offset`;
     *  7. sort each node's incoming neighbors in place.
     *
     * An offset of 0 is used as the marker for "no incoming edges".
     *
     * NOTE(review): every neighbor id yielded by `outEdges` is looked up in a map
     * that is only populated with the ids of `nodeCollection`; this presumably
     * requires all edge targets to be members of `nodeCollection` -- confirm.
     *
     * Side effect: `idsMapped` and `numIdsMapped` of the entries of `shardsInfo`
     * (defined outside this method) are overwritten.
     *
     * @param outEdges       outgoing edges, traversed per node id via `foreach(id)`
     * @param nodeCollection the node ids to process, plus the graph meta information
     *                       used to size the auxiliary structures
     * @return a future that completes with `Some` sharded 2d array holding the
     *         incoming edges once all phases have finished
     */
    private def createReverseDirEdgeArray(outEdges: Sharded2dArray[Int], nodeCollection: NodeCollection):
    Future[Option[Sharded2dArray[Int]]] = {

      val nodesWithInEdges = new NodeCollection(nodeCollection.graphInfo, forceSparseRepr)
      val reverseShardsInfo = newShardsInfo()

      // One counter per node id. It first holds the number of incoming edges of the node
      // and is later re-purposed (see fillInEdgesOffsets) as the next free write position
      // for that node's incoming neighbors.
      val inEdgesSizes = Int2ObjectMap[AtomicInteger](nodeCollection.considerGraphSparse,
        Some(nodeCollection.graphInfo.numNodes), Some(nodeCollection.maxNodeId), isConcurrent = false)

      // Serially bins the ids of nodeCollection into shardsInfo and creates a zeroed
      // counter in inEdgesSizes for every id.
      def partitionNodeIdsPerShard(): Unit = {
        // first pass: count ids per shard so that the per-shard arrays can be sized exactly
        nodeCollection foreach { id =>
          shardsInfo(EdgeShards.hash(id)).numIdsMapped += 1
          inEdgesSizes.update(id, new AtomicInteger())
        }
        shardsInfo foreach { shardInfo =>
          shardInfo.idsMapped = new Array[Int](shardInfo.numIdsMapped)
          shardInfo.numIdsMapped = 0 //reusing this variable
        }
        // fill out the array per shard by binning nodeCollection's ids into each shard
        nodeCollection foreach { id =>
          val shard = shardsInfo(EdgeShards.hash(id))
          shard.idsMapped(shard.numIdsMapped) = id
          shard.numIdsMapped += 1
        }
      }

      // do function f(shardNum, id, idIndex) for all nodes, divided into EdgeShards.numShards futures
      // (one futurePool task per entry of shardsInfo; ids within a shard are visited in order)
      def doForAllNodeIdsDetail(f: (Int, Int, Int) => Unit): Future[Unit] = {
        val futures = shardsInfo map { oneShardInfo =>
          futurePool {
            for (idIndex <- 0 until oneShardInfo.numIdsMapped)
              f(oneShardInfo.shardNum, oneShardInfo.idsMapped(idIndex), idIndex)
          }
        }
        Future.join(futures)
      }

      // Convenience variant of doForAllNodeIdsDetail for callers that only need the node id.
      def doForAllNodeIds(f: Int => Unit): Future[Unit] = {
        doForAllNodeIdsDetail { (_, id, _) => f(id) }
      }

      // Counts, for every node, how many outgoing edges point at it. A neighbor's counter
      // can be bumped from several shard tasks at once, hence the atomic increment.
      def findInEdgesSizes(): Future[Unit] = {
        log.debug("calculating incoming neighbor sizes for each node")
        doForAllNodeIds { id =>
          outEdges.foreach(id) { neighbor =>
            inEdgesSizes(neighbor).incrementAndGet()
          }
        }
      }

      // Reserves space in the reverse shards and records each node's offset.
      def findInShardSizes(): Future[Unit] = {
        // scratch offsets, indexed by [shardNum][index of the id within that shard]
        val offsetsAll = shardsInfo map { oneShardInfo => new Array[Int](oneShardInfo.numIdsMapped) }
        doForAllNodeIdsDetail { (shardNum, id, idIndex) =>
          val len = inEdgesSizes(id).get
          // Reserve 1 + len slots: the first one will hold len, the remaining ones the
          // neighbors. The stored offset points at the first neighbor slot, so the length
          // lives at offset - 1. Nodes without incoming edges get offset 0.
          val offset =
            if (len > 0) reverseShardsInfo(shardNum).numEdges.getAndAdd(1 + len) + 1
            else 0
          offsetsAll(shardNum)(idIndex) = offset
        } map { _ =>
          //update offsets in serial
          shardsInfo foreach { oneShardInfo =>
            val offsetsThisShard = offsetsAll(oneShardInfo.shardNum)
            for (index <- 0 until oneShardInfo.idsMapped.length) {
              nodesWithInEdges.updateOffset(oneShardInfo.idsMapped(index), offsetsThisShard(index))
            }
          }
        }
      }

      // Writes each node's incoming-edge count into the slot just before its offset and
      // turns the node's counter into a write cursor positioned at the offset.
      def fillInEdgesOffsets(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
        log.debug("filling lengths in 2d array")
        Stat.timeFuture(statsReceiver.stat("graph_load_fill_in_edge_lengths_and_offsets")) {
          doForAllNodeIds { id =>
            val counter = inEdgesSizes(id)
            val len = counter.get
            if (len > 0) {
              val shard = EdgeShards.hash(id)
              val off = nodesWithInEdges.offsets(id)
              sharedInEdgesArray(shard)(off - 1) = len
              counter.set(off) // we will start storing actual neighbors starting here
            }
          }
        }
      }

      // Writes every edge (nodeId -> neighborId) as an incoming edge of neighborId.
      def fillInEdges(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
        log.debug("filling in edges")
        Stat.timeFuture(statsReceiver.stat("graph_load_fill_in_edges")) {
          doForAllNodeIds { nodeId =>
            outEdges.foreach(nodeId) { neighborId =>
              val shard = sharedInEdgesArray(EdgeShards.hash(neighborId))
              //remember that we re-used inEdgesSizes to point to offset of edges for this neighborId
              // getAndIncrement hands out a distinct slot to each writer of the same neighbor
              shard(inEdgesSizes(neighborId).getAndIncrement) = nodeId
            }
          }
        }
      }

      // Sorts the incoming neighbors of every node that has any (offset > 0) in place.
      def sortInEdges(sharedInEdgesArray: Array[Array[Int]]): Future[Unit] = {
        log.debug("sorting in edges in place")
        doForAllNodeIds { nodeId =>
          val offset = nodesWithInEdges.getEdgeOffset(nodeId)
          if (offset > 0) {
            val shardNum = EdgeShards.hash(nodeId)
            // the number of neighbors was stored right before the neighbors themselves
            val numEdges = sharedInEdgesArray(shardNum)(offset - 1)
            java.util.Arrays.sort(sharedInEdgesArray(shardNum), offset, offset + numEdges)
          }
        }
      }

      // main set of steps to build incoming edges in the graph
      log.info("Now building the reverse direction representation")
      partitionNodeIdsPerShard()
      for {
        _ <- findInEdgesSizes()
        _ <- findInShardSizes()
        // the shared arrays can only be allocated once the shard sizes are known
        sharedInEdges = instantiateSharedArray(reverseShardsInfo)
        _ <- fillInEdgesOffsets(sharedInEdges)
        _ <- fillInEdges(sharedInEdges)
        _ <- sortInEdges(sharedInEdges)
      } yield Some(sharded2dArray(nodesWithInEdges, sharedInEdges))
    }