in home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/side_effect/UpdateTimelinesPersistenceStoreSideEffect.scala [69:196]
/**
 * Records the entries served in this response in the timelines persistence store.
 *
 * When the response contains at least one instruction, a `TimelineQuery` is built from the
 * request, the instructions are translated into `EntryWithItemIds` values, and the resulting
 * `TimelineResponseV3` is inserted through `timelineResponseBatchesClient`. When the response
 * has no instructions nothing is written and `Stitch.Unit` is returned.
 *
 * Entries, covers and replacement entries that carry no sort index are skipped rather than
 * persisted.
 *
 * @param inputs the pipeline query, the selected candidates and the marshalled timeline response
 * @return a `Stitch` that completes when the insert call completes
 * @throws UnsupportedOperationException if the query product is neither `FollowingProduct`
 *                                       nor `ForYouProduct`
 * @throws NoSuchElementException if a served tweet entry has no matching selected candidate
 */
final override def apply(
  inputs: PipelineResultSideEffect.Inputs[PipelineQuery, Timeline]
): Stitch[Unit] = {
  if (inputs.response.instructions.nonEmpty) {
    val timelineKind = inputs.query.product match {
      case FollowingProduct => TimelineKind.homeLatest
      case ForYouProduct => TimelineKind.home
      case other => throw new UnsupportedOperationException(s"Unknown product: $other")
    }
    val timelineQuery = TimelineQuery(
      id = inputs.query.getRequiredUserId,
      kind = timelineKind,
      options = TimelineQueryOptions(
        contextualUserId = inputs.query.getOptionalUserId,
        deviceContext = tls.DeviceContext.empty.copy(
          userAgent = inputs.query.clientContext.userAgent,
          clientAppId = inputs.query.clientContext.appId)
      )
    )

    // Index every Long-keyed candidate (top-level items and items nested in modules) by id so
    // that served tweet entries can be joined back to the candidate they were built from.
    val tweetIdToItemCandidateMap: Map[Long, ItemCandidateWithDetails] =
      inputs.selectedCandidates.flatMap {
        case item: ItemCandidateWithDetails if item.candidate.id.isInstanceOf[Long] =>
          Seq((item.candidateIdLong, item))
        case module: ModuleCandidateWithDetails
            if module.candidates.headOption.exists(_.candidate.id.isInstanceOf[Long]) =>
          module.candidates.map(item => (item.candidateIdLong, item))
        case _ => Seq.empty
      }.toMap

    // One item-id record per module item, with only the user id populated. Shared by the
    // who-to-follow and who-to-subscribe module branches below.
    def userItemIds(module: TimelineModule) =
      module.items.map { moduleItem =>
        UpdateTimelinesPersistenceStoreSideEffect.EmptyItemIds.copy(
          userId = Some(moduleItem.item.id.asInstanceOf[Long]))
      }

    val entries = inputs.response.instructions.collect {
      case AddEntriesTimelineInstruction(addedEntries) =>
        addedEntries.collect {
          // includes tweets, tweet previews, and promoted tweets
          case tweet: TweetItem =>
            tweet.sortIndex.toSeq.map { sortIndex =>
              buildTweetEntryWithItemIds(tweetIdToItemCandidateMap(tweet.id), sortIndex)
            }
          // tweet conversation modules are flattened to individual tweets in the persistence
          // store; every flattened tweet shares the module's sort index
          case module: TimelineModule
              if module.items.headOption.exists(_.item.isInstanceOf[TweetItem]) =>
            module.sortIndex.toSeq.flatMap { sortIndex =>
              module.items.map { moduleItem =>
                buildTweetEntryWithItemIds(
                  tweetIdToItemCandidateMap(moduleItem.item.id.asInstanceOf[Long]),
                  sortIndex)
              }
            }
          case module: TimelineModule
              if module.entryNamespace.toString == WhoToFollowCandidateDecorator.EntryNamespaceString =>
            module.sortIndex.toSeq.map { sortIndex =>
              EntryWithItemIds(
                entityIdType = EntityIdType.WhoToFollow,
                sortIndex = sortIndex,
                size = module.items.size.toShort,
                itemIds = Some(userItemIds(module))
              )
            }
          case module: TimelineModule
              if module.entryNamespace.toString == WhoToSubscribeCandidateDecorator.EntryNamespaceString =>
            module.sortIndex.toSeq.map { sortIndex =>
              EntryWithItemIds(
                entityIdType = EntityIdType.WhoToSubscribe,
                sortIndex = sortIndex,
                size = module.items.size.toShort,
                itemIds = Some(userItemIds(module))
              )
            }
        }.flatten
      case ShowCoverInstruction(cover) =>
        // A cover without a sort index is skipped, matching how added entries are handled.
        cover.sortIndex.toSeq.map { sortIndex =>
          EntryWithItemIds(
            entityIdType = EntityIdType.Prompt,
            sortIndex = sortIndex,
            size = 1,
            itemIds = None
          )
        }
      case ReplaceEntryTimelineInstruction(entry) =>
        val namespaceLength = TweetItem.TweetEntryNamespace.toString.length
        // A replacement entry without a sort index is skipped, matching added entries.
        entry.sortIndex.toSeq.map { sortIndex =>
          EntryWithItemIds(
            entityIdType = EntityIdType.Tweet,
            sortIndex = sortIndex,
            size = 1,
            itemIds = Some(
              Seq(
                ItemIds(
                  // The tweet id is parsed from the replaced entry id by dropping the tweet
                  // namespace and the single separator character that follows it.
                  tweetId =
                    entry.entryIdToReplace.map(e => e.substring(namespaceLength + 1).toLong),
                  sourceTweetId = None,
                  quoteTweetId = None,
                  sourceAuthorId = None,
                  quoteAuthorId = None,
                  inReplyToTweetId = None,
                  inReplyToAuthorId = None,
                  semanticCoreId = None,
                  articleId = None,
                  hasRelevancePrompt = None,
                  promptData = None,
                  tweetScore = None,
                  entryIdToReplace = entry.entryIdToReplace,
                  tweetReactiveData = None,
                  userId = None
                )
              ))
          )
        }
    }.flatten

    val response = TimelineResponseV3(
      clientPlatform = timelineQuery.clientPlatform,
      servedTime = Time.now,
      requestType = requestTypeFromQuery(inputs.query),
      entries = entries)
    Stitch.callFuture(timelineResponseBatchesClient.insertResponse(timelineQuery, response))
  } else Stitch.Unit
}