def runUpdateKnownForGeneric()

in src/scala/com/twitter/simclusters_v2/scalding/UpdateKnownForApps.scala [143:311]


  /**
   * Recomputes a KnownFor assignment from a sims graph, writes it out, and emails a summary.
   *
   * Steps:
   *  1. Select top users from the most recent `UsersourceFlatScalaDataset` snapshot via
   *     `TopUsersSimilarityGraph.topUserIds` (using `--minActiveFollowers` and `--topK`).
   *  2. Restrict `inputSimsGraph` to those users (at most `--maxSimsNeighborsForUpdate`
   *     neighbors per node), force it to disk, and derive the graph used for the update via
   *     `UpdateKnownForApps.simsGraphForUpdateFromSymmetrizedSims`.
   *  3. Compute cluster stats, the global average edge weight and the average membership score,
   *     then write the updated KnownFor with `writeKnownForFunction`.
   *  4. Email either evaluation results (old vs. new KnownFor, plus custom counters) or only the
   *     custom counters, depending on `includeEvaluationResultsInEmail`.
   *
   * Arguments read from `args` (defaults in parentheses):
   * `--minActiveFollowers` (400), `--topK` (required), `--maxSimsNeighborsForUpdate` (40),
   * `--minNeighborsInCluster` (2), `--maxWtToSelfLoopWtMultFactor` (2),
   * `--exponentForEdgeWeights` (1.0), `--updateMethod` (required; one of
   * sumScoreIgnoringMembershipScores, ratioScoreIgnoringMembershipScores,
   * ratioScoreUsingMembershipScores), `--truePositiveWtFactor` (10),
   * `--outputModelVersion` (required), `--emailAddress` (falls back to `defaultEmailAddress`).
   *
   * @param args                            job arguments (see above)
   * @param inputKnownFor                   current KnownFor keyed by user id; the array presumably
   *                                        holds (clusterId, membershipScore) pairs — TODO confirm
   * @param inputSimsGraph                  similarity graph the update is computed from
   * @param defaultEmailAddress             recipient used when `--emailAddress` is not given
   * @param writeKnownForFunction           writes the newly computed KnownFor
   * @param readKnownForFunction            reads back what `writeKnownForFunction` wrote; only
   *                                        evaluated when `includeEvaluationResultsInEmail` is true
   * @param includeEvaluationResultsInEmail if true, evaluate and compare old vs. new KnownFor in
   *                                        the email; otherwise email only the custom counters
   * @return an Execution that writes the new KnownFor and sends the summary email; it fails with
   *         an IllegalStateException if no global average edge weight can be computed
   * @throws IllegalArgumentException if `--updateMethod` is not one of the supported values
   */
  def runUpdateKnownForGeneric(
    args: Args,
    inputKnownFor: TypedPipe[(Long, Array[(Int, Float)])],
    inputSimsGraph: TypedPipe[Candidates],
    defaultEmailAddress: String,
    writeKnownForFunction: TypedPipe[(Long, Array[(Int, Float)])] => Execution[Unit],
    readKnownForFunction: => TypedPipe[(Long, Array[(Int, Float)])],
    includeEvaluationResultsInEmail: Boolean
  )(
    implicit dateRange: DateRange,
    uniqueID: UniqueID
  ): Execution[Unit] = {
    val minActiveFollowers = args.int("minActiveFollowers", 400)
    val topK = args.int("topK")
    val maxSimsNeighborsForUpdate =
      args.int("maxSimsNeighborsForUpdate", 40)
    val minNeighborsInCluster = args.int("minNeighborsInCluster", 2)
    val maxWtToSelfLoopWtMultFactor =
      args.float("maxWtToSelfLoopWtMultFactor", 2)
    // The flag name is plural ("exponentForEdgeWeights"); kept as-is so existing job configs work.
    val exponentForEdgeWeight = args.float("exponentForEdgeWeights", 1.0f)
    val updateMethod: ClusterScoresForNode => Double = args("updateMethod") match {
      case "sumScoreIgnoringMembershipScores" => { x: ClusterScoresForNode =>
        x.sumScoreIgnoringMembershipScores
      }
      case "ratioScoreIgnoringMembershipScores" => { x: ClusterScoresForNode =>
        x.ratioScoreIgnoringMembershipScores
      }
      case "ratioScoreUsingMembershipScores" => { x: ClusterScoresForNode =>
        x.ratioScoreUsingMembershipScores
      }
      case other =>
        throw new IllegalArgumentException(
          s"value for --updateMethod $other is unknown. It must be one of " +
            s"[sumScoreIgnoringMembershipScores, ratioScoreIgnoringMembershipScores, ratioScoreUsingMembershipScores]")
    }
    val truePositiveWtFactor = args.float("truePositiveWtFactor", 10)
    val modelVersion = args("outputModelVersion")
    val emailAddress =
      args.optional("emailAddress").getOrElse(defaultEmailAddress)

    val topUsers = TopUsersSimilarityGraph
      .topUserIds(
        DAL
          .readMostRecentSnapshot(UsersourceFlatScalaDataset, dateRange)
          .toTypedPipe,
        minActiveFollowers,
        topK).count("num_top_users")

    TopUsersSimilarityGraph
      .getSubgraphFromUserGroupedInput(
        fullGraph = inputSimsGraph,
        usersToInclude = topUsers,
        maxNeighborsPerNode = maxSimsNeighborsForUpdate,
        degreeThresholdForStat = minNeighborsInCluster
      )
      .forceToDiskExecution
      .flatMap { symmetrizedSims =>
        val modifiedSims =
          UpdateKnownForApps.simsGraphForUpdateFromSymmetrizedSims(
            symmetrizedSims = symmetrizedSims,
            exponentForEdgeWeight = exponentForEdgeWeight,
            maxWtToSelfLoopWtMultFactor = maxWtToSelfLoopWtMultFactor
          )

        // Users present in the input KnownFor that no longer appear in topUsers (no join match).
        val previouslyFamousUsersExec = inputKnownFor
          .leftJoin(topUsers.asKeys)
          .collect { case (userId, (_, None)) => userId }
          .getSummaryString(
            "Users previously in known for but not in topUsers anymore",
            numRecords = 20)

        val clusterStatsExec = UpdateKnownForApps.getClusterStats(inputKnownFor)

        val globalAvgWeightExec =
          UpdateKnownForApps.getGlobalAvgWeight(modifiedSims)

        val globalAvgMembershipScoreExec = UpdateKnownForApps.getAvgMembershipScore(inputKnownFor)

        Execution.zip(globalAvgWeightExec, clusterStatsExec, globalAvgMembershipScoreExec).flatMap {
          case (Some(globalAvgWeight), clusterStats, globalAvgMembershipScore) =>
            println("Size of clusterStats: " + clusterStats.size)
            println("First few entries from clusterStats: " + clusterStats.take(5))
            println("globalAvgWeight: " + globalAvgWeight)
            println("globalAvgMembershipScore: " + globalAvgMembershipScore)

            val knownForWithUnnormalizedScores = UpdateKnownFor
              .newKnownForScores(
                inputKnownFor,
                modifiedSims,
                globalAvgWeight,
                clusterStats,
                globalAvgMembershipScore
              )
            val writeNewKnownForExec = writeKnownForFunction(
              UpdateKnownFor.updateGeneric(
                modifiedSims,
                knownForWithUnnormalizedScores,
                clusterStats,
                minNeighborsInCluster,
                globalAvgWeight,
                globalAvgMembershipScore,
                truePositiveWtFactor,
                updateMethod
              )
            )

            writeNewKnownForExec.flatMap { _ =>
              Util.getCustomCountersString(writeNewKnownForExec).flatMap { customCountersString =>
                if (includeEvaluationResultsInEmail) {
                  // It's unfortunate that we're not using the newKnownFor directly, but are instead
                  // first writing it out and then reading it back in. The reason for doing it in this
                  // convoluted way is that when we directly use the newKnownFor, the clusterEvaluation
                  // metrics are being incorrectly computed.

                  val newKnownFor = readKnownForFunction

                  val newResultsExec =
                    ClusterEvaluation
                      .overallEvaluation(symmetrizedSims, newKnownFor, "newKnownForEval")
                  val oldResultsExec =
                    ClusterEvaluation
                      .overallEvaluation(symmetrizedSims, inputKnownFor, "oldKnownForEval")
                  val minSizeOfBiggerClusterForComparison = 10
                  val compareExec = CompareClusters.summarize(
                    CompareClusters.compare(
                      KnownForSources.transpose(inputKnownFor),
                      KnownForSources.transpose(newKnownFor),
                      minSizeOfBiggerCluster = minSizeOfBiggerClusterForComparison
                    ))

                  Execution
                    .zip(oldResultsExec, newResultsExec, compareExec, previouslyFamousUsersExec)
                    .map {
                      case (oldResults, newResults, compareResults, previouslyFamousUsersString) =>
                        val emailText = "Evaluation Results for existing knownFor:\n" +
                          Util.prettyJsonMapper.writeValueAsString(oldResults) +
                          "\n\n-------------------\n\n" +
                          "Evaluation Results for new knownFor:\n" +
                          Util.prettyJsonMapper.writeValueAsString(newResults) +
                          "\n\n-------------------\n\n" +
                          "Cosine similarity distribution between cluster membership vectors for " +
                          s"clusters with at least $minSizeOfBiggerClusterForComparison members\n" +
                          Util.prettyJsonMapper
                            .writeValueAsString(compareResults) +
                          "\n\n-------------------\n\n" +
                          "Custom counters:\n" + customCountersString +
                          "\n\n-------------------\n\n" +
                          previouslyFamousUsersString

                        Util
                          .sendEmail(
                            emailText,
                            s"Evaluation results of new knownFor $modelVersion",
                            emailAddress)
                    }
                } else {
                  Util
                    .sendEmail(
                      customCountersString,
                      s"Change in cluster assignments for update of knownFor $modelVersion",
                      emailAddress
                    )
                  Execution.unit
                }
              }
            }

          case (None, _, _) =>
            // getGlobalAvgWeight produced no value (presumably the restricted sims graph has no
            // edges). Previously this fell through to a bare MatchError; fail with a clear cause.
            Execution.failed(
              new IllegalStateException(
                s"Could not compute the global average edge weight for knownFor update $modelVersion; " +
                  "the sims graph restricted to the top users may be empty"))
        }
      }
  }