override def runOnDateRange()

in src/scala/com/twitter/simclusters_v2/scalding/offline_job/adhoc/TweetSimilarityEvaluationAdhocApp.scala [121:361]


  override def runOnDateRange(
    args: Args
  )(
    implicit dateRange: DateRange,
    timeZone: TimeZone,
    uniqueID: UniqueID
  ): Execution[Unit] = {

    // path to read the tweet -> top cluster data set. should be the same from the SimClustersTweetEmbeddingAdhocApp job
    val tweetTopKClustersPath = args("tweet_top_k")

    // path to read the cluster -> top tweets data set. should be the same from the SimClustersTweetEmbeddingAdhocApp job
    val clusterTopKTweetsPath = args("cluster_top_k")

    // path to read the sampled tweets, should be the same from TweetSimilarityEvaluationSamplingAdhocApp
    val tweetsPath = args("tweets")

    // see the comment of this class. this is to determine which tweet should be ground truth
    val threshold = args.double("threshold", 0.8)

    // see the comment of this class. this is to determine which tweet should be ground truth
    val topK = args.int("topK", 100)

    // output path for evaluation results
    val output = args("output")

    // read tweet -> top clusters data set
    val tweetTopKClusters: SparseRowMatrix[TweetId, ClusterId, Double] =
      SparseRowMatrix(
        TypedPipe
          .from(
            VersionedKeyValSource[TweetId, List[(ClusterId, Double)]](tweetTopKClustersPath)
          )
          .mapValues(_.filter(_._2 > 0.001).toMap),
        isSkinnyMatrix = true
      ).rowL2Normalize

    // read cluster -> top tweets data set
    val clusterTopTweets: SparseRowMatrix[ClusterId, TweetId, Double] =
      SparseRowMatrix(
        TypedPipe
          .from(
            VersionedKeyValSource[ClusterId, List[(TweetId, Double)]](clusterTopKTweetsPath)
          )
          .mapValues(_.filter(_._2 > 0.02).toMap),
        isSkinnyMatrix = false
      )

    // read the sampled tweets from TweetSimilarityEvaluationSamplingAdhocApp
    val tweetSubset = TypedPipe.from(TypedTsv[(Long, Int, Int)](tweetsPath))

    // the tweet -> top clusters for the sampled tweets
    val tweetEmbeddingSubset =
      tweetTopKClusters.filterRows(tweetSubset.map(_._1))

    // compute ground-truth top similar tweets for each sampled tweets.
    // for each sampled tweets, we compute their similarity with every tweets in the tweet -> top clusters data set.
    // we filter out those with similarity score smaller than the threshold and keep top k as the ground truth similar tweets
    val groundTruthData = tweetTopKClusters.toSparseMatrix
      .multiplySkinnySparseRowMatrix(
        tweetEmbeddingSubset.toSparseMatrix.transpose.toSparseRowMatrix(true),
        numReducersOpt = Some(5000)
      )
      .toSparseMatrix
      .transpose
      .filter((_, _, v) => v > threshold)
      .sortWithTakePerRow(topK)(Ordering.by(-_._2))

    // compute approximate similar tweets for each sampled tweets.
    // this is achieved by multiplying "sampled_tweets -> top clusters" matrix with "cluster -> top tweets" matrix.
    // note that in the implementation, we first compute the transponse of this matrix in order to ultlize the optimization done on skinny matrices
    val predictionData = clusterTopTweets.toSparseMatrix.transpose
      .multiplySkinnySparseRowMatrix(
        tweetEmbeddingSubset.toSparseMatrix.transpose.toSparseRowMatrix(true),
        numReducersOpt = Some(5000)
      )
      .toSparseMatrix
      .transpose
      .toTypedPipe
      .map {
        case (queryTweet, candidateTweet, _) =>
          (queryTweet, candidateTweet)
      }
      .join(tweetEmbeddingSubset.toTypedPipe)
      .map {
        case (queryId, (candidateId, queryEmbedding)) =>
          candidateId -> (queryId, queryEmbedding)
      }
      .join(tweetTopKClusters.toTypedPipe)
      .map {
        case (candidateId, ((queryId, queryEmbedding), candidateEmbedding)) =>
          queryId -> (candidateId, CosineSimilarityUtil
            .dotProduct(
              queryEmbedding,
              candidateEmbedding
            ))
      }
      .filter(_._2._2 > threshold)
      .group
      .sortedReverseTake(topK)(Ordering.by(_._2))

    // Exist in Ground Truth but not exist in Predication
    val potentialData =
      groundTruthData
        .leftJoin(predictionData)
        .map {
          case (tweetId, (groundTruthCandidates, predictedCandidates)) =>
            val predictedCandidateSet = predictedCandidates.toSeq.flatten.map(_._1).toSet
            val potentialTweets = groundTruthCandidates.filterNot {
              case (candidateId, _) =>
                predictedCandidateSet.contains(candidateId)
            }
            (tweetId, potentialTweets)
        }

    val debuggingData =
      groundTruthData
        .leftJoin(predictionData)
        .map {
          case (tweetId, (groundTruthTweets, maybepredictedTweets)) =>
            val predictedTweets = maybepredictedTweets.toSeq.flatten
            val predictedTweetSet = predictedTweets.map(_._1).toSet
            val potentialTweets = groundTruthTweets.filterNot {
              case (candidateId, _) =>
                predictedTweetSet.contains(candidateId)
            }

            (
              tweetId,
              Seq(
                formatList(potentialTweets),
                formatList(groundTruthTweets),
                formatList(predictedTweets)))
        }

    // for each tweet, compare the approximate topk and ground-truth topk.
    // compute precision and recall, then averaging them per bucket.
    val eval = tweetSubset
      .map {
        case (tweetId, bucket, bucketSize) =>
          tweetId -> (bucket, bucketSize)
      }
      .leftJoin(groundTruthData)
      .leftJoin(predictionData)
      .map {
        case (_, (((bucket, bucketSize), groundTruthOpt), predictionOpt)) =>
          val groundTruth = groundTruthOpt.getOrElse(Nil).map(_._1)
          val prediction = predictionOpt.getOrElse(Nil).map(_._1)

          assert(groundTruth.distinct.size == groundTruth.size)
          assert(prediction.distinct.size == prediction.size)

          val intersection = groundTruth.toSet.intersect(prediction.toSet)

          val precision =
            if (prediction.nonEmpty)
              intersection.size.toDouble / prediction.size.toDouble
            else 0.0
          val recall =
            if (groundTruth.nonEmpty)
              intersection.size.toDouble / groundTruth.size.toDouble
            else 0.0

          (
            bucket,
            bucketSize) -> (groundTruth.size, prediction.size, intersection.size, precision, recall, 1.0)
      }
      .sumByKey
      .map {
        case (
              (bucket, bucketSize),
              (groundTruthSum, predictionSum, interSectionSum, precisionSum, recallSum, count)) =>
          (
            bucket,
            bucketSize,
            groundTruthSum / count,
            predictionSum / count,
            interSectionSum / count,
            precisionSum / count,
            recallSum / count,
            count)
      }

    // output the eval results and some sample results for eyeballing
    Execution
      .zip(
        eval
          .writeExecution(TypedTsv(output)),
        groundTruthData
          .map {
            case (tweetId, neighbors) =>
              tweetId -> neighbors
                .map {
                  case (id, score) => s"$id:$score"
                }
                .mkString(",")
          }
          .writeExecution(
            TypedTsv(args("output") + "_ground_truth")
          ),
        predictionData
          .map {
            case (tweetId, neighbors) =>
              tweetId -> neighbors
                .map {
                  case (id, score) => s"$id:$score"
                }
                .mkString(",")
          }
          .writeExecution(
            TypedTsv(args("output") + "_prediction")
          ),
        potentialData
          .map {
            case (tweetId, neighbors) =>
              tweetId -> neighbors
                .map {
                  case (id, score) => s"$id:$score"
                }
                .mkString(",")
          }.writeExecution(
            TypedTsv(args("output") + "_potential")
          ),
        debuggingData
          .map {
            case (tweetId, candidateList) =>
              val value = candidateList
                .map { candidates =>
                  candidates
                    .map {
                      case (id, score) =>
                        s"${id}D$score"
                    }.mkString("C")
                }.mkString("B")
              s"${tweetId}A$value"
          }.writeExecution(
            TypedTsv(args("output") + "_debugging")
          )
      )
      .unit
  }