in cassovary-core/src/main/scala/com/twitter/cassovary/graph/bipartite/IterativeLinkAnalyzer.scala [120:182]
/**
 * Runs `numIterations` alternating passes of `iterate` over the two sides of the
 * bipartite graph, seeded from `leftNodes`, and reports the per-node results.
 *
 * Odd-numbered iterations are run with `flowReverse = false` and `resetProbOnRight`
 * (logged as changing the "right" side); even-numbered iterations are run with
 * `flowReverse = true` and `resetProbOnLeft` (logged as changing the "left" side).
 *
 * @param leftNodes         left-side nodes together with their initial iteration weight
 * @param numIterations     total number of passes to run (iterations are numbered from 1)
 * @param neighborsProvider yields the neighbor ids of a node; its length is stored as the
 *                          neighbor count of every seeded left node
 * @param sortOutputByScore when true, both output lists are sorted by descending weight
 * @return a pair (left, right) of lists of (node id, weight, top contributor ids)
 */
def analyze(leftNodes: Seq[SuppliedNodeInfo], numIterations: Int,
    neighborsProvider: Node => Seq[Int], sortOutputByScore: Boolean):
    (List[(Int, Double, Seq[Int])], List[(Int, Double, Seq[Int])]) = {

  // One line per node in ascending id order: id, weight, ids of its top 3 contributors.
  def describe(infos: collection.Map[Int, NodeInfo]) = {
    val lines = infos.toList.sortBy(_._1).map { entry =>
      val (id, info) = entry
      "%d %.3f %s".format(id, info.weight, info.contributors.top(3).map{_.node.id})
    }
    lines.mkString("\n")
  }

  // Flattens the map's values into (node id, weight, top contributor ids) triples.
  def toTriples(infos: collection.Map[Int, NodeInfo]) = {
    for (info <- infos.valuesIterator.toList) yield {
      (info.node.id, info.weight, info.topContributorIds)
    }
  }

  // Same triples, highest weight first.
  def byDescendingWeight(infos: collection.Map[Int, NodeInfo]) = {
    toTriples(infos).sortBy { triple => -triple._2 }
  }

  // TODO -- perhaps use a Pair as well as make it clearer
  // One map per direction (index 0 = left, index 1 = right) recording which ids are
  // present and their info. The right map starts out empty.
  val leftInfos = new mutable.HashMap[Int, NodeInfo]
  val rightInfos = new mutable.HashMap[Int, NodeInfo]
  val nodeInfos = List(leftInfos, rightInfos)

  // Seed the left side: the initial iteration weight is passed for both weight arguments.
  val seedEntries = for (supplied <- leftNodes) yield {
    val node = supplied.node
    val startWeight = supplied.initialIterationWeight
    node.id -> NodeInfo(node, startWeight, startWeight, neighborsProvider(node).length)
  }
  leftInfos ++= seedEntries

  // iterate
  for (iter <- 1 to numIterations) {
    val flowReverse = iter % 2 == 0
    val resetProbUse = if (flowReverse) resetProbOnLeft else resetProbOnRight
    val isRightUninitialized = iter == 1
    // True for the final two passes, i.e. the last pass made in each direction.
    val isLastIterOnEitherSide = iter >= numIterations - 1

    iterate(nodeInfos, resetProbUse, neighborsProvider, flowReverse,
      isRightUninitialized, isLastIterOnEitherSide)

    log.ifTrace {
      val (side, changed) = if (flowReverse) ("left", leftInfos) else ("right", rightInfos)
      "After iteration %d , weights changed for %s side : \n%s".format(
        iter, side, describe(changed))
    }
    log.ifDebug { "Finished iteration %d".format(iter) }
  }

  log.ifDebug {
    val edgeCount = leftInfos.values.foldLeft(0) { (total, info) => total + info.numNeighbors }
    "Finished all iterations: LHS size = %d RHS size = %d LHS->RHS # = %d edges ".format(
      leftInfos.size, rightInfos.size, edgeCount)
  }

  if (!sortOutputByScore) {
    (toTriples(leftInfos), toTriples(rightInfos))
  } else {
    // Each sort is wrapped in Stat.time against its own stat.
    val sortedLeft = Stat.time(statsReceiver.stat("bila_sort_left")) {
      byDescendingWeight(leftInfos)
    }
    log.ifDebug { "Finished sorting left" }
    val sortedRight = Stat.time(statsReceiver.stat("bila_sort_right")) {
      byDescendingWeight(rightInfos)
    }
    log.ifDebug { "Finished sorting right" }
    (sortedLeft, sortedRight)
  }
}