in src/scala/com/twitter/simclusters_v2/scio/multi_type_graph/assemble_multi_type_graph/AssembleMultiTypeGraphScioBaseApp.scala [366:573]
/**
 * Builds the multi-type graph pipeline.
 *
 * Steps:
 *  1. Reads the external user/tweet/TFlock/topic/notification/search sources.
 *  2. Unions the per-type graphs into the full graph.
 *  3. Computes the top-K right nodes per right-node type.
 *  4. Truncates the full graph to those right nodes.
 *  5. Writes five DAL outputs:
 *     - the top-K nouns (full and MH-debugger versions);
 *     - the truncated graph, in KeyVal and thrift form;
 *     - the full graph, in thrift form.
 *
 * @param sc   Scio context used (implicitly) by the ExternalDataSources readers
 * @param opts run options; `opts.interval` determines the read windows and the output version
 */
override def configurePipeline(sc: ScioContext, opts: DateRangeOptions): Unit = {
  // Define the implicit ScioContext to read datasets from ExternalDataSources
  implicit def scioContext: ScioContext = sc
  // DAL.Environment used by every write below: Dev for ad-hoc runs, Prod otherwise
  val dalEnv = if (isAdhoc) DAL.Environment.Dev else DAL.Environment.Prod

  // Single version/snapshot instant shared by all five writes (1 ms before the interval end).
  // Hoisted so the outputs of one run can never be stamped with different versions.
  val writeInstant = Instant.ofEpochMilli(opts.interval.getEndMillis - 1L)

  // Lookback windows for the Duration-based sources
  val lookback7Days = Duration.fromDays(7)
  val lookback14Days = Duration.fromDays(14)

  // Date intervals for the Interval-based sources. Both end 1 ms before opts.interval's end.
  // NOTE(review): Joda Interval ends are exclusive, so the final millisecond is excluded as well
  // — presumably intentional to line up with writeInstant; confirm.
  val interval7Days =
    new Interval(opts.interval.getEnd.minusWeeks(1), opts.interval.getEnd.minusMillis(1))
  val interval14Days =
    new Interval(opts.interval.getEnd.minusWeeks(2), opts.interval.getEnd.minusMillis(1))

  /*
   * Dataset read operations
   */
  // Get list of valid UserIds - to filter out deactivated or suspended user accounts
  val validUsers = getValidUsers(ExternalDataSources.userSource(lookback7Days))
  // ieSource tweet engagements data for tweet favs, replies, retweets - from last 14 days
  val tweetSource = ExternalDataSources.ieSourceTweetEngagementsSource(interval14Days)
  // Read TFlock datasets
  val flockFollowSource = ExternalDataSources.flockFollowSource(lookback7Days)
  val flockBlockSource = ExternalDataSources.flockBlockSource(lookback7Days)
  val flockReportAsAbuseSource = ExternalDataSources.flockReportAsAbuseSource(lookback7Days)
  val flockReportAsSpamSource = ExternalDataSources.flockReportAsSpamSource(lookback7Days)
  // user-user fav edges
  val userUserFavSource = ExternalDataSources.userUserFavSource(lookback14Days)
  val userUserFavEdges = getFavEdges(userUserFavSource, HalfLifeInDaysForFavScore)
  // user-user follow / block / abuse-report / spam-report edges, restricted to valid users
  val userUserFollowEdges = filterInvalidUsers(flockFollowSource, validUsers)
  val userUserBlockEdges = filterInvalidUsers(flockBlockSource, validUsers)
  val userUserAbuseReportEdges = filterInvalidUsers(flockReportAsAbuseSource, validUsers)
  val userUserSpamReportEdges = filterInvalidUsers(flockReportAsSpamSource, validUsers)
  // user-signup country edges
  val userSignUpCountryEdges = ExternalDataSources.userCountrySource(lookback7Days)
  // user-consumed language edges
  val userConsumedLanguageEdges =
    ExternalDataSources.inferredUserConsumedLanguageSource(lookback7Days)
  // user-topic follow edges
  val topicUserFollowedByEdges = ExternalDataSources.topicFollowGraphSource(lookback7Days)
  // user-MRNotifOpenOrClick events from last 7 days
  val userMRNotifOpenOrClickEvents =
    ExternalDataSources.magicRecsNotficationOpenOrClickEventsSource(interval7Days)
  // user-searchQuery strings from last 7 days
  val userSearchQueryEdges = ExternalDataSources.adaptiveSearchScribeLogsSource(interval7Days)

  /*
   * Generate the full graph (union of every per-type graph)
   */
  val fullGraph =
    getUserTweetInteractionGraph(tweetSource) ++
      getUserFavGraph(userUserFavEdges) ++
      getUserFollowGraph(userUserFollowEdges) ++
      getUserBlockGraph(userUserBlockEdges) ++
      getUserAbuseReportGraph(userUserAbuseReportEdges) ++
      getUserSpamReportGraph(userUserSpamReportEdges) ++
      getUserSignUpCountryGraph(userSignUpCountryEdges) ++
      getUserConsumedLanguagesGraph(userConsumedLanguageEdges) ++
      getUserTopicFollowGraph(topicUserFollowedByEdges) ++
      getMagicRecsNotifOpenOrClickTweetsGraph(userMRNotifOpenOrClickEvents) ++
      getSearchGraph(userSearchQueryEdges)

  // Get Top K RightNodes
  val topKRightNodes: SCollection[(RightNodeType, Seq[(Noun, Double)])] =
    getTopKRightNounsWithFrequencies(
      fullGraph,
      TopKConfig,
      GlobalDefaultMinFrequencyOfRightNodeType)

  // key transformation - topK nouns, keyed by the RightNodeNounType
  val topKNounsKeyedByType: SCollection[(RightNodeTypeStruct, NounWithFrequencyList)] =
    topKRightNodes
      .map {
        case (rightNodeType, rightNounsWithScoresList) =>
          val nounsListWithFrequency: Seq[NounWithFrequency] = rightNounsWithScoresList
            .map {
              case (noun, aggregatedFrequency) =>
                NounWithFrequency(noun, aggregatedFrequency)
            }
          (RightNodeTypeStruct(rightNodeType), NounWithFrequencyList(nounsListWithFrequency))
      }

  // Get Truncated graph based on the top K RightNodes
  val truncatedGraph: SCollection[(LeftNode, RightNodeWithEdgeWeight)] =
    getTruncatedGraph(fullGraph, topKRightNodes)

  // key transformations - truncated graph, keyed by LeftNode
  // Note: By wrapping and unwrapping with the LeftNode.UserId, we don't have to deal
  // with defining our own custom ordering for the LeftNode type. Edges whose left node is
  // not a LeftNode.UserId are dropped by the `collect` below.
  val truncatedGraphKeyedBySrc: SCollection[(LeftNode, RightNodeWithEdgeWeightList)] =
    truncatedGraph
      .collect {
        case (LeftNode.UserId(userId), rightNodeWithWeight) =>
          userId -> List(rightNodeWithWeight)
      }
      .sumByKey
      .map {
        case (userId, rightNodeWithWeightList) =>
          (LeftNode.UserId(userId), RightNodeWithEdgeWeightList(rightNodeWithWeightList))
      }

  // WriteExecs
  // Write TopK RightNodes to DAL - save all the top K nodes for the clustering step
  topKNounsKeyedByType
    .map {
      case (engagementType, rightList) =>
        KeyVal(engagementType, rightList)
    }
    .saveAsCustomOutput(
      name = "WriteTopKNouns",
      DAL.writeVersionedKeyVal(
        topKRightNounsKeyValDataset,
        PathLayout.VersionedPath(prefix = rootMHPath + topKRightNounsOutputDir),
        instant = writeInstant,
        environmentOverride = dalEnv,
      )
    )

  // Write TopK RightNodes to DAL - only take TopKRightNounsForMHDump RightNodes for MH dump
  topKNounsKeyedByType
    .map {
      case (engagementType, rightList) =>
        val rightListMH =
          NounWithFrequencyList(rightList.nounWithFrequencyList.take(TopKRightNounsForMHDump))
        KeyVal(engagementType, rightListMH)
    }
    .saveAsCustomOutput(
      name = "WriteTopKNounsToMHForDebugger",
      DAL.writeVersionedKeyVal(
        topKRightNounsMHKeyValDataset,
        PathLayout.VersionedPath(prefix = rootMHPath + topKRightNounsMHOutputDir),
        instant = writeInstant,
        environmentOverride = dalEnv,
      )
    )

  // Write truncated graph (MultiTypeGraphTopKForRightNodes) to DAL in KeyVal format
  truncatedGraphKeyedBySrc
    .map {
      case (leftNode, rightNodeWithWeightList) =>
        KeyVal(leftNode, rightNodeWithWeightList)
    }
    .saveAsCustomOutput(
      name = "WriteTruncatedMultiTypeGraph",
      DAL.writeVersionedKeyVal(
        truncatedMultiTypeGraphKeyValDataset,
        PathLayout.VersionedPath(prefix = rootMHPath + truncatedMultiTypeGraphMHOutputDir),
        instant = writeInstant,
        environmentOverride = dalEnv,
      )
    )

  // Write truncated graph (MultiTypeGraphTopKForRightNodes) to DAL in thrift format
  truncatedGraph
    .map {
      case (leftNode, rightNodeWithWeight) =>
        MultiTypeGraphEdge(leftNode, rightNodeWithWeight)
    }
    .saveAsCustomOutput(
      name = "WriteTruncatedMultiTypeGraphThrift",
      DAL.writeSnapshot(
        multiTypeGraphTopKForRightNodesSnapshotDataset,
        PathLayout.FixedPath(rootThriftPath + truncatedMultiTypeGraphThriftOutputDir),
        writeInstant,
        DiskFormat.Thrift(),
        environmentOverride = dalEnv
      )
    )

  // Write full graph to DAL
  fullGraph
    .map {
      case (leftNode, rightNodeWithWeight) =>
        MultiTypeGraphEdge(leftNode, rightNodeWithWeight)
    }
    .saveAsCustomOutput(
      name = "WriteFullMultiTypeGraph",
      DAL.writeSnapshot(
        fullMultiTypeGraphSnapshotDataset,
        PathLayout.FixedPath(rootThriftPath + fullMultiTypeGraphThriftOutputDir),
        writeInstant,
        DiskFormat.Thrift(),
        environmentOverride = dalEnv
      )
    )
}