private def fillMissingInEdges()

in cassovary-core/src/main/scala/com/twitter/cassovary/graph/ArrayBasedDirectedGraph.scala [283:380]


    private def fillMissingInEdges(nodeColl: NodeCollection, nodesOutEdges: Seq[Seq[Node]],
                                   nodesWithNoOutEdges: Seq[Node]):
    Future[Unit] = {
      log.debug("calculating in edges sizes")

      val inEdgeSizes = Int2ObjectMap[AtomicInteger](nodeColl.considerGraphSparse,
        Some(nodeColl.numNodes), Some(nodeColl.maxNodeId), isConcurrent = false)
      nodeColl.nodeIdsIterator foreach { i => inEdgeSizes.update(i, new AtomicInteger()) }

      def addInEdge(dest: Int, source: Int): Unit = {
        val inEdgeIndex = inEdgeSizes(dest).getAndIncrement
        nodeColl(dest).asInstanceOf[FillingInEdgesBiDirectionalNode].inEdges(inEdgeIndex) = source
      }

      def incEdgeSize(id: Int) { inEdgeSizes(id).incrementAndGet() }

      def getAndResetEdgeSize(id: Int) = {
        val sz = inEdgeSizes(id).intValue()
        if (sz > 0) inEdgeSizes(id).set(0)
        sz
      }

       // Calculates sizes of incoming edges arrays.
      def findInEdgesSizes(nodesOutEdges: Seq[Seq[Node]]): Future[Unit] = {
        Stat.time(statsReceiver.stat("graph_load_find_in_edge_sizes")) {

          val futures = nodesOutEdges map {
            nodes => futurePool {
              nodes foreach {
                node => node.outboundNodes foreach { outEdge =>
                  incEdgeSize(outEdge)
                }
              }
            }
          }

          Future.join(futures)
        }
      }

      def instantiateInEdges(): Future[Unit] = {
        log.debug("instantiate in edges")
        Stat.time(statsReceiver.stat("graph_load_instantiate_in_edge_arrays")) {
          val futures = (nodesOutEdges.iterator ++ Iterator(nodesWithNoOutEdges)).map {
            (nodes: Seq[Node]) => futurePool {
              nodes foreach { node =>
               // reset inEdgesSizes, and use it as index pointer of
               // the current insertion place when adding in edges
                val edgeSize = getAndResetEdgeSize(node.id)
                if (edgeSize > 0) {
                  node.asInstanceOf[FillingInEdgesBiDirectionalNode].createInEdges(edgeSize)
                }
              }
            }
          }.toSeq
          Future.join(futures)
        }
      }

      def populateInEdges(): Future[Unit] = {
        log.debug("populate in edges")
        Stat.time(statsReceiver.stat("graph_load_read_in_edge_from_dump_files")) {
          val futures = nodesOutEdges.map {
            (nodes: Seq[Node]) => futurePool {
              nodes foreach { node =>
                node.outboundNodes foreach { outEdge =>
                  addInEdge(outEdge, node.id)
                }
              }
            }
          }
          Future.join(futures)
        }
      }

      def finishInEdgesFilling(): Future[Unit] = {
        log.debug("finishing filling")
        Stat.time(statsReceiver.stat("finishing_filling_in_edges")) {
          val futures = nodesOutEdges.map {
            nodes => futurePool {
              nodes.foreach {
                node =>
                  node.asInstanceOf[FillingInEdgesBiDirectionalNode].sortInNeighbors()
              }
            }
          }
          Future.join(futures)
        }
      }

      for {
        _ <- findInEdgesSizes(nodesOutEdges)
        _ <- instantiateInEdges()
        _ <- populateInEdges()
        _ <- when(neighborsSortingStrategy != LeaveUnsorted) (finishInEdgesFilling())
      } yield ()

    }