in src/scala/com/twitter/simclusters_v2/scalding/UpdateKnownForApps.scala [143:311]
/**
 * Builds the Execution that updates an existing knownFor assignment against a sims graph,
 * writes the result, and emails a summary of the run.
 *
 * Steps, in order:
 *  1. Select the top users from the most recent user-source snapshot and restrict
 *     `inputSimsGraph` to them (result is forced to disk).
 *  2. Derive the graph used for the update from the symmetrized sims.
 *  3. Compute the global average edge weight, per-cluster stats and the average membership
 *     score, then compute and write the new knownFor through `writeKnownForFunction`.
 *  4. Email either the full evaluation report or only the custom counters.
 *
 * Command-line arguments read from `args`:
 *  - `minActiveFollowers` (default 400), `topK` (required): passed to the top-user selection.
 *  - `maxSimsNeighborsForUpdate` (default 40): max neighbors per node in the subgraph.
 *  - `minNeighborsInCluster` (default 2): used both as the subgraph's
 *    `degreeThresholdForStat` and as an input to the update itself.
 *  - `maxWtToSelfLoopWtMultFactor` (default 2), `exponentForEdgeWeights` (default 1.0):
 *    passed to the construction of the graph used for the update.
 *  - `updateMethod` (required): one of `sumScoreIgnoringMembershipScores`,
 *    `ratioScoreIgnoringMembershipScores`, `ratioScoreUsingMembershipScores`.
 *  - `truePositiveWtFactor` (default 10): passed to the update.
 *  - `outputModelVersion` (required): only used in the email subject.
 *  - `emailAddress` (optional): overrides `defaultEmailAddress`.
 *
 * @param args                            parsed job arguments, see above
 * @param inputKnownFor                   the existing knownFor, keyed by user id; the array
 *                                        entries are presumably (clusterId, score) pairs
 *                                        (TODO confirm against the producers of this pipe)
 * @param inputSimsGraph                  sims graph to take the top-user subgraph from
 * @param defaultEmailAddress             recipient used when `--emailAddress` is not given
 * @param writeKnownForFunction           sink for the updated knownFor
 * @param readKnownForFunction            by-name reader for what `writeKnownForFunction`
 *                                        wrote; only evaluated when evaluation results are
 *                                        requested
 * @param includeEvaluationResultsInEmail when true, evaluate old and new knownFor and email
 *                                        the full report; otherwise email only the counters
 * @return an Execution that completes once the knownFor is written and the email is sent;
 *         it fails with an IllegalStateException when no global average edge weight could
 *         be computed
 * @throws IllegalArgumentException immediately, if `--updateMethod` is not a supported value
 */
def runUpdateKnownForGeneric(
  args: Args,
  inputKnownFor: TypedPipe[(Long, Array[(Int, Float)])],
  inputSimsGraph: TypedPipe[Candidates],
  defaultEmailAddress: String,
  writeKnownForFunction: TypedPipe[(Long, Array[(Int, Float)])] => Execution[Unit],
  readKnownForFunction: => TypedPipe[(Long, Array[(Int, Float)])],
  includeEvaluationResultsInEmail: Boolean
)(
  implicit dateRange: DateRange,
  uniqueID: UniqueID
): Execution[Unit] = {
  val minActiveFollowers = args.int("minActiveFollowers", 400)
  val topK = args.int("topK")
  val maxSimsNeighborsForUpdate =
    args.int("maxSimsNeighborsForUpdate", 40)
  val minNeighborsInCluster = args.int("minNeighborsInCluster", 2)
  val maxWtToSelfLoopWtMultFactor =
    args.float("maxWtToSelfLoopWtMultFactor", 2)
  val exponentForEdgeWeight = args.float("exponentForEdgeWeights", 1.0f)

  // Resolve the scoring function up front so that a bad --updateMethod fails before any
  // pipe is built.
  val updateMethod: ClusterScoresForNode => Double = args("updateMethod") match {
    case "sumScoreIgnoringMembershipScores" => { x: ClusterScoresForNode =>
        x.sumScoreIgnoringMembershipScores
      }
    case "ratioScoreIgnoringMembershipScores" => { x: ClusterScoresForNode =>
        x.ratioScoreIgnoringMembershipScores
      }
    case "ratioScoreUsingMembershipScores" => { x: ClusterScoresForNode =>
        x.ratioScoreUsingMembershipScores
      }
    case unknown =>
      throw new IllegalArgumentException(
        s"value for --updateMethod $unknown is unknown. It must be one of " +
          s"[sumScoreIgnoringMembershipScores, ratioScoreIgnoringMembershipScores, ratioScoreUsingMembershipScores]")
  }

  val truePositiveWtFactor = args.float("truePositiveWtFactor", 10)
  val modelVersion = args("outputModelVersion")
  val emailAddress =
    args.optional("emailAddress").getOrElse(defaultEmailAddress)

  val topUsers = TopUsersSimilarityGraph
    .topUserIds(
      DAL
        .readMostRecentSnapshot(UsersourceFlatScalaDataset, dateRange)
        .toTypedPipe,
      minActiveFollowers,
      topK).count("num_top_users")

  TopUsersSimilarityGraph
    .getSubgraphFromUserGroupedInput(
      fullGraph = inputSimsGraph,
      usersToInclude = topUsers,
      maxNeighborsPerNode = maxSimsNeighborsForUpdate,
      degreeThresholdForStat = minNeighborsInCluster
    )
    .forceToDiskExecution
    .flatMap { symmetrizedSims =>
      val modifiedSims =
        UpdateKnownForApps.simsGraphForUpdateFromSymmetrizedSims(
          symmetrizedSims = symmetrizedSims,
          exponentForEdgeWeight = exponentForEdgeWeight,
          maxWtToSelfLoopWtMultFactor = maxWtToSelfLoopWtMultFactor
        )

      // Users that have an entry in the existing knownFor but are absent from topUsers.
      // Only consumed by the evaluation email below.
      val previouslyFamousUsersExec = inputKnownFor
        .leftJoin(topUsers.asKeys)
        .collect { case (userId, (_, None)) => userId }
        .getSummaryString(
          "Users previously in known for but not in topUsers anymore",
          numRecords = 20)

      val clusterStatsExec = UpdateKnownForApps.getClusterStats(inputKnownFor)
      val globalAvgWeightExec =
        UpdateKnownForApps.getGlobalAvgWeight(modifiedSims)
      val globalAvgMembershipScoreExec = UpdateKnownForApps.getAvgMembershipScore(inputKnownFor)

      Execution.zip(globalAvgWeightExec, clusterStatsExec, globalAvgMembershipScoreExec).flatMap {
        case (Some(globalAvgWeight), clusterStats, globalAvgMembershipScore) =>
          println("Size of clusterStats: " + clusterStats.size)
          println("First few entries from clusterStats: " + clusterStats.take(5))
          println("globalAvgWeight: " + globalAvgWeight)
          println("globalAvgMembershipScore: " + globalAvgMembershipScore)

          val knownForWithUnnormalizedScores = UpdateKnownFor
            .newKnownForScores(
              inputKnownFor,
              modifiedSims,
              globalAvgWeight,
              clusterStats,
              globalAvgMembershipScore
            )
          val writeNewKnownForExec = writeKnownForFunction(
            UpdateKnownFor.updateGeneric(
              modifiedSims,
              knownForWithUnnormalizedScores,
              clusterStats,
              minNeighborsInCluster,
              globalAvgWeight,
              globalAvgMembershipScore,
              truePositiveWtFactor,
              updateMethod
            )
          )

          // NOTE(review): writeNewKnownForExec is sequenced on here and is also handed to
          // getCustomCountersString. Presumably the write is not performed twice - confirm
          // against Util.getCustomCountersString and Scalding's Execution caching.
          writeNewKnownForExec.flatMap { _ =>
            Util.getCustomCountersString(writeNewKnownForExec).flatMap { customCountersString =>
              if (includeEvaluationResultsInEmail) {
                // It's unfortunate that we're not using the newKnownFor directly, but are instead
                // first writing it out and then reading it back in. The reason for doing it in this
                // convoluted way is that when we directly use the newKnownFor, the clusterEvaluation
                // metrics are being incorrectly computed.
                val newKnownFor = readKnownForFunction
                val newResultsExec =
                  ClusterEvaluation
                    .overallEvaluation(symmetrizedSims, newKnownFor, "newKnownForEval")
                val oldResultsExec =
                  ClusterEvaluation
                    .overallEvaluation(symmetrizedSims, inputKnownFor, "oldKnownForEval")
                val minSizeOfBiggerClusterForComparison = 10
                val compareExec = CompareClusters.summarize(
                  CompareClusters.compare(
                    KnownForSources.transpose(inputKnownFor),
                    KnownForSources.transpose(newKnownFor),
                    minSizeOfBiggerCluster = minSizeOfBiggerClusterForComparison
                  ))
                Execution
                  .zip(oldResultsExec, newResultsExec, compareExec, previouslyFamousUsersExec)
                  .map {
                    case (oldResults, newResults, compareResults, previouslyFamousUsersString) =>
                      val emailText = "Evaluation Results for existing knownFor:\n" +
                        Util.prettyJsonMapper.writeValueAsString(oldResults) +
                        "\n\n-------------------\n\n" +
                        "Evaluation Results for new knownFor:\n" +
                        Util.prettyJsonMapper.writeValueAsString(newResults) +
                        "\n\n-------------------\n\n" +
                        s"Cosine similarity distribution between cluster membership vectors for " +
                        s"clusters with at least $minSizeOfBiggerClusterForComparison members\n" +
                        Util.prettyJsonMapper
                          .writeValueAsString(compareResults) +
                        "\n\n-------------------\n\n" +
                        "Custom counters:\n" + customCountersString +
                        "\n\n-------------------\n\n" +
                        previouslyFamousUsersString
                      Util
                        .sendEmail(
                          emailText,
                          s"Evaluation results of new knownFor $modelVersion",
                          emailAddress)
                  }
              } else {
                // No evaluation requested: report only the counters collected for the write.
                Util
                  .sendEmail(
                    customCountersString,
                    s"Change in cluster assignments for update of knownFor $modelVersion",
                    emailAddress
                  )
                Execution.unit
              }
            }
          }

        // Without this case a missing average surfaces as an opaque scala.MatchError.
        // The job still fails, but with a message that names the cause.
        case (None, _, _) =>
          Execution.failed(
            new IllegalStateException(
              "UpdateKnownForApps.getGlobalAvgWeight returned None; " +
                "cannot update knownFor without a global average edge weight"))
      }
    }
}