in src/scala/com/twitter/simclusters_v2/scalding/offline_job/adhoc/TweetSimilarityEvaluationAdhocApp.scala [121:361]
override def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
// path to read the tweet -> top clusters data set; should match the output path of the SimClustersTweetEmbeddingAdhocApp job
val tweetTopKClustersPath = args("tweet_top_k")
// path to read the cluster -> top tweets data set; should match the output path of the SimClustersTweetEmbeddingAdhocApp job
val clusterTopKTweetsPath = args("cluster_top_k")
// path to read the sampled tweets; should match the output path of TweetSimilarityEvaluationSamplingAdhocApp
val tweetsPath = args("tweets")
// see the class-level comment: candidates whose similarity score exceeds this threshold qualify as ground truth
val threshold = args.double("threshold", 0.8)
// see the class-level comment: at most this many qualifying candidates are kept as ground truth per tweet
val topK = args.int("topK", 100)
// output path for evaluation results
val output = args("output")
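// a hypothetical invocation for reference (paths below are illustrative only, not real locations):
//   --tweet_top_k /tmp/tweet_top_k --cluster_top_k /tmp/cluster_top_k \
//   --tweets /tmp/sampled_tweets --threshold 0.8 --topK 100 --output /tmp/eval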
// read tweet -> top clusters data set
val tweetTopKClusters: SparseRowMatrix[TweetId, ClusterId, Double] =
SparseRowMatrix(
TypedPipe
.from(
VersionedKeyValSource[TweetId, List[(ClusterId, Double)]](tweetTopKClustersPath)
)
.mapValues(_.filter(_._2 > 0.001).toMap),
isSkinnyMatrix = true
).rowL2Normalize
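// rowL2Normalize scales each tweet's cluster-score vector to unit L2 norm, so a dot
// product between two rows below is exactly their cosine similarity, and the
// `threshold` arg acts as a cosine-similarity cutoff.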
// read cluster -> top tweets data set
val clusterTopTweets: SparseRowMatrix[ClusterId, TweetId, Double] =
SparseRowMatrix(
TypedPipe
.from(
VersionedKeyValSource[ClusterId, List[(TweetId, Double)]](clusterTopKTweetsPath)
)
.mapValues(_.filter(_._2 > 0.02).toMap),
isSkinnyMatrix = false
)
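// isSkinnyMatrix = false here: the column space of this matrix is the very large
// tweet-id space, whereas the tweet -> clusters matrix above is "skinny" because its
// columns are the comparatively small set of cluster ids.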
// read the sampled tweets (tweetId, bucket, bucketSize) from TweetSimilarityEvaluationSamplingAdhocApp
val tweetSubset = TypedPipe.from(TypedTsv[(Long, Int, Int)](tweetsPath))
// the tweet -> top clusters for the sampled tweets
val tweetEmbeddingSubset =
tweetTopKClusters.filterRows(tweetSubset.map(_._1))
// compute the ground-truth top similar tweets for each sampled tweet.
// for each sampled tweet, we compute its similarity with every tweet in the tweet -> top clusters data set,
// drop those whose similarity score falls below the threshold, and keep the top k as the ground-truth similar tweets
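// e.g. (hypothetical numbers) a query embedding {c1: 0.8, c2: 0.6} and a candidate
// embedding {c1: 0.6, c2: 0.8} are both unit-norm, and their dot product
// 0.8 * 0.6 + 0.6 * 0.8 = 0.96 clears the default threshold of 0.8.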
val groundTruthData = tweetTopKClusters.toSparseMatrix
.multiplySkinnySparseRowMatrix(
tweetEmbeddingSubset.toSparseMatrix.transpose.toSparseRowMatrix(true),
numReducersOpt = Some(5000)
)
.toSparseMatrix
.transpose
.filter((_, _, v) => v > threshold)
.sortWithTakePerRow(topK)(Ordering.by(-_._2))
// compute the approximate similar tweets for each sampled tweet.
// this is achieved by multiplying the "sampled tweets -> top clusters" matrix with the "cluster -> top tweets" matrix.
// note that in the implementation, we first compute the transpose of this product in order to utilize the optimizations for skinny matrices
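// the matrix product below only identifies which (query, candidate) pairs share a
// cluster; its scores are approximate because clusterTopTweets is truncated. the
// joins that follow re-fetch both full embeddings and recompute the exact dot
// product, so predicted scores are directly comparable to the ground-truth scores.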
val predictionData = clusterTopTweets.toSparseMatrix.transpose
.multiplySkinnySparseRowMatrix(
tweetEmbeddingSubset.toSparseMatrix.transpose.toSparseRowMatrix(true),
numReducersOpt = Some(5000)
)
.toSparseMatrix
.transpose
.toTypedPipe
.map {
case (queryTweet, candidateTweet, _) =>
(queryTweet, candidateTweet)
}
.join(tweetEmbeddingSubset.toTypedPipe)
.map {
case (queryId, (candidateId, queryEmbedding)) =>
candidateId -> (queryId, queryEmbedding)
}
.join(tweetTopKClusters.toTypedPipe)
.map {
case (candidateId, ((queryId, queryEmbedding), candidateEmbedding)) =>
queryId -> (candidateId, CosineSimilarityUtil
.dotProduct(
queryEmbedding,
candidateEmbedding
))
}
.filter(_._2._2 > threshold)
.group
.sortedReverseTake(topK)(Ordering.by(_._2))
// candidates that exist in the ground truth but are missing from the prediction
val potentialData =
groundTruthData
.leftJoin(predictionData)
.map {
case (tweetId, (groundTruthCandidates, predictedCandidates)) =>
val predictedCandidateSet = predictedCandidates.toSeq.flatten.map(_._1).toSet
val potentialTweets = groundTruthCandidates.filterNot {
case (candidateId, _) =>
predictedCandidateSet.contains(candidateId)
}
(tweetId, potentialTweets)
}
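// debugging output per query tweet: the missed (potential), ground-truth, and
// predicted candidate lists side by side. when written out below, the three lists
// are serialized with single-letter separators (A between tweetId and the lists,
// B between lists, C between candidates, D between id and score), presumably so
// each line can be split unambiguously downstream.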
val debuggingData =
groundTruthData
.leftJoin(predictionData)
.map {
case (tweetId, (groundTruthTweets, predictedTweetsOpt)) =>
val predictedTweets = predictedTweetsOpt.toSeq.flatten
val predictedTweetSet = predictedTweets.map(_._1).toSet
val potentialTweets = groundTruthTweets.filterNot {
case (candidateId, _) =>
predictedTweetSet.contains(candidateId)
}
(
tweetId,
Seq(
formatList(potentialTweets),
formatList(groundTruthTweets),
formatList(predictedTweets)))
}
// for each tweet, compare the approximate top-k and the ground-truth top-k:
// compute precision and recall, then average them per bucket.
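// precision = |intersection| / |prediction|, recall = |intersection| / |ground truth|.
// e.g. (hypothetical) 50 predicted and 80 ground-truth candidates sharing 40 tweets
// give precision = 40 / 50 = 0.8 and recall = 40 / 80 = 0.5; the trailing 1.0 below
// is a count, so sumByKey yields per-bucket sums that the next map divides into averages.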
val eval = tweetSubset
.map {
case (tweetId, bucket, bucketSize) =>
tweetId -> (bucket, bucketSize)
}
.leftJoin(groundTruthData)
.leftJoin(predictionData)
.map {
case (_, (((bucket, bucketSize), groundTruthOpt), predictionOpt)) =>
val groundTruth = groundTruthOpt.getOrElse(Nil).map(_._1)
val prediction = predictionOpt.getOrElse(Nil).map(_._1)
assert(groundTruth.distinct.size == groundTruth.size)
assert(prediction.distinct.size == prediction.size)
val intersection = groundTruth.toSet.intersect(prediction.toSet)
val precision =
if (prediction.nonEmpty)
intersection.size.toDouble / prediction.size.toDouble
else 0.0
val recall =
if (groundTruth.nonEmpty)
intersection.size.toDouble / groundTruth.size.toDouble
else 0.0
(
bucket,
bucketSize) -> (groundTruth.size, prediction.size, intersection.size, precision, recall, 1.0)
}
.sumByKey
.map {
case (
(bucket, bucketSize),
(groundTruthSum, predictionSum, intersectionSum, precisionSum, recallSum, count)) =>
(
bucket,
bucketSize,
groundTruthSum / count,
predictionSum / count,
intersectionSum / count,
precisionSum / count,
recallSum / count,
count)
}
// output the eval results and some sample results for eyeballing
Execution
.zip(
eval
.writeExecution(TypedTsv(output)),
groundTruthData
.map {
case (tweetId, neighbors) =>
tweetId -> neighbors
.map {
case (id, score) => s"$id:$score"
}
.mkString(",")
}
.writeExecution(
TypedTsv(args("output") + "_ground_truth")
),
predictionData
.map {
case (tweetId, neighbors) =>
tweetId -> neighbors
.map {
case (id, score) => s"$id:$score"
}
.mkString(",")
}
.writeExecution(
TypedTsv(args("output") + "_prediction")
),
potentialData
.map {
case (tweetId, neighbors) =>
tweetId -> neighbors
.map {
case (id, score) => s"$id:$score"
}
.mkString(",")
}.writeExecution(
TypedTsv(args("output") + "_potential")
),
debuggingData
.map {
case (tweetId, candidateList) =>
val value = candidateList
.map { candidates =>
candidates
.map {
case (id, score) =>
s"${id}D$score"
}.mkString("C")
}.mkString("B")
s"${tweetId}A$value"
}.writeExecution(
TypedTsv(args("output") + "_debugging")
)
)
.unit
}