final override def apply()

in home-mixer/server/src/main/scala/com/twitter/home_mixer/functional_component/side_effect/UpdateTimelinesPersistenceStoreSideEffect.scala [69:196]


  final override def apply(
    inputs: PipelineResultSideEffect.Inputs[PipelineQuery, Timeline]
  ): Stitch[Unit] = {
    if (inputs.response.instructions.nonEmpty) {
      val timelineKind = inputs.query.product match {
        case FollowingProduct => TimelineKind.homeLatest
        case ForYouProduct => TimelineKind.home
        case other => throw new UnsupportedOperationException(s"Unknown product: $other")
      }
      val timelineQuery = TimelineQuery(
        id = inputs.query.getRequiredUserId,
        kind = timelineKind,
        options = TimelineQueryOptions(
          contextualUserId = inputs.query.getOptionalUserId,
          deviceContext = tls.DeviceContext.empty.copy(
            userAgent = inputs.query.clientContext.userAgent,
            clientAppId = inputs.query.clientContext.appId)
        )
      )

      val tweetIdToItemCandidateMap: Map[Long, ItemCandidateWithDetails] =
        inputs.selectedCandidates.flatMap {
          case item: ItemCandidateWithDetails if item.candidate.id.isInstanceOf[Long] =>
            Seq((item.candidateIdLong, item))
          case module: ModuleCandidateWithDetails
              if module.candidates.headOption.exists(_.candidate.id.isInstanceOf[Long]) =>
            module.candidates.map(item => (item.candidateIdLong, item))
          case _ => Seq.empty
        }.toMap

      val entries = inputs.response.instructions.collect {
        case AddEntriesTimelineInstruction(entries) =>
          entries.collect {
            // includes tweets, tweet previews, and promoted tweets
            case entry: TweetItem if entry.sortIndex.isDefined => {
              Seq(
                buildTweetEntryWithItemIds(
                  tweetIdToItemCandidateMap(entry.id),
                  entry.sortIndex.get
                ))
            }
            // tweet conversation modules are flattened to individual tweets in the persistence store
            case module: TimelineModule
                if module.sortIndex.isDefined && module.items.headOption.exists(
                  _.item.isInstanceOf[TweetItem]) =>
              module.items.map { item =>
                buildTweetEntryWithItemIds(
                  tweetIdToItemCandidateMap(item.item.id.asInstanceOf[Long]),
                  module.sortIndex.get)
              }
            case module: TimelineModule
                if module.sortIndex.isDefined && module.entryNamespace.toString == WhoToFollowCandidateDecorator.EntryNamespaceString =>
              val userIds = module.items
                .map(item =>
                  UpdateTimelinesPersistenceStoreSideEffect.EmptyItemIds.copy(userId =
                    Some(item.item.id.asInstanceOf[Long])))
              Seq(
                EntryWithItemIds(
                  entityIdType = EntityIdType.WhoToFollow,
                  sortIndex = module.sortIndex.get,
                  size = module.items.size.toShort,
                  itemIds = Some(userIds)
                ))
            case module: TimelineModule
                if module.sortIndex.isDefined && module.entryNamespace.toString == WhoToSubscribeCandidateDecorator.EntryNamespaceString =>
              val userIds = module.items
                .map(item =>
                  UpdateTimelinesPersistenceStoreSideEffect.EmptyItemIds.copy(userId =
                    Some(item.item.id.asInstanceOf[Long])))
              Seq(
                EntryWithItemIds(
                  entityIdType = EntityIdType.WhoToSubscribe,
                  sortIndex = module.sortIndex.get,
                  size = module.items.size.toShort,
                  itemIds = Some(userIds)
                ))
          }.flatten
        case ShowCoverInstruction(cover) =>
          Seq(
            EntryWithItemIds(
              entityIdType = EntityIdType.Prompt,
              sortIndex = cover.sortIndex.get,
              size = 1,
              itemIds = None
            )
          )
        case ReplaceEntryTimelineInstruction(entry) =>
          val namespaceLength = TweetItem.TweetEntryNamespace.toString.length
          Seq(
            EntryWithItemIds(
              entityIdType = EntityIdType.Tweet,
              sortIndex = entry.sortIndex.get,
              size = 1,
              itemIds = Some(
                Seq(
                  ItemIds(
                    tweetId =
                      entry.entryIdToReplace.map(e => e.substring(namespaceLength + 1).toLong),
                    sourceTweetId = None,
                    quoteTweetId = None,
                    sourceAuthorId = None,
                    quoteAuthorId = None,
                    inReplyToTweetId = None,
                    inReplyToAuthorId = None,
                    semanticCoreId = None,
                    articleId = None,
                    hasRelevancePrompt = None,
                    promptData = None,
                    tweetScore = None,
                    entryIdToReplace = entry.entryIdToReplace,
                    tweetReactiveData = None,
                    userId = None
                  )
                ))
            )
          )

      }.flatten

      val response = TimelineResponseV3(
        clientPlatform = timelineQuery.clientPlatform,
        servedTime = Time.now,
        requestType = requestTypeFromQuery(inputs.query),
        entries = entries)

      Stitch.callFuture(timelineResponseBatchesClient.insertResponse(timelineQuery, response))
    } else Stitch.Unit
  }