def apply()

in tweetypie/server/src/main/scala/com/twitter/tweetypie/hydrator/TweetHydration.scala [27:634]


  def apply(
    hydratorStats: StatsReceiver,
    hydrateFeatureSwitchResults: TweetDataValueHydrator,
    hydrateMentions: MentionEntitiesHydrator.Type,
    hydrateLanguage: LanguageHydrator.Type,
    hydrateUrls: UrlEntitiesHydrator.Type,
    hydrateQuotedTweetRef: QuotedTweetRefHydrator.Type,
    hydrateQuotedTweetRefUrls: QuotedTweetRefUrlsHydrator.Type,
    hydrateMediaCacheable: MediaEntitiesHydrator.Cacheable.Type,
    hydrateReplyScreenName: ReplyScreenNameHydrator.Type,
    hydrateConvoId: ConversationIdHydrator.Type,
    hydratePerspective: PerspectiveHydrator.Type,
    hydrateEditPerspective: EditPerspectiveHydrator.Type,
    hydrateConversationMuted: ConversationMutedHydrator.Type,
    hydrateContributor: ContributorHydrator.Type,
    hydrateTakedowns: TakedownHydrator.Type,
    hydrateDirectedAt: DirectedAtHydrator.Type,
    hydrateGeoScrub: GeoScrubHydrator.Type,
    hydrateCacheableRepairs: TweetDataValueHydrator,
    hydrateMediaUncacheable: MediaEntitiesHydrator.Uncacheable.Type,
    hydratePostCacheRepairs: TweetDataValueHydrator,
    hydrateTweetLegacyFormat: TweetDataValueHydrator,
    hydrateQuoteTweetVisibility: QuoteTweetVisibilityHydrator.Type,
    hydrateQuotedTweet: QuotedTweetHydrator.Type,
    hydratePastedMedia: PastedMediaHydrator.Type,
    hydrateMediaRefs: MediaRefsHydrator.Type,
    hydrateMediaTags: MediaTagsHydrator.Type,
    hydrateClassicCards: CardHydrator.Type,
    hydrateCard2: Card2Hydrator.Type,
    hydrateContributorVisibility: ContributorVisibilityFilter.Type,
    hydrateHasMedia: HasMediaHydrator.Type,
    hydrateTweetCounts: TweetCountsHydrator.Type,
    hydratePreviousTweetCounts: PreviousTweetCountsHydrator.Type,
    hydratePlace: PlaceHydrator.Type,
    hydrateDeviceSource: DeviceSourceHydrator.Type,
    hydrateProfileGeo: ProfileGeoHydrator.Type,
    hydrateSourceTweet: SourceTweetHydrator.Type,
    hydrateIM1837State: IM1837FilterHydrator.Type,
    hydrateIM2884State: IM2884FilterHydrator.Type,
    hydrateIM3433State: IM3433FilterHydrator.Type,
    hydrateTweetAuthorVisibility: TweetAuthorVisibilityHydrator.Type,
    hydrateReportedTweetVisibility: ReportedTweetFilter.Type,
    scrubSuperfluousUrlEntities: TweetDataValueHydrator,
    copyFromSourceTweet: TweetDataValueHydrator,
    hydrateTweetVisibility: TweetVisibilityHydrator.Type,
    hydrateEscherbirdAnnotations: EscherbirdAnnotationHydrator.Type,
    hydrateScrubEngagements: ScrubEngagementHydrator.Type,
    hydrateConversationControl: ConversationControlHydrator.Type,
    hydrateEditControl: EditControlHydrator.Type,
    hydrateUnmentionData: UnmentionDataHydrator.Type,
    hydrateNoteTweetSuffix: TweetDataValueHydrator
  ): TweetDataValueHydrator = {
    val scrubCachedTweet: TweetDataValueHydrator =
      ValueHydrator
        .fromMutation[Tweet, TweetQuery.Options](
          ScrubUncacheable.tweetMutation.countMutations(hydratorStats.counter("scrub_cached_tweet"))
        )
        .lensed(TweetData.Lenses.tweet)
        .onlyIf((td, opts) => opts.cause.reading(td.tweet.id))

    // We perform independent hydrations of individual bits of
    // data and pack the results into tuples instead of updating
    // the tweet for each one in order to avoid making lots of
    // copies of the tweet.

    val hydratePrimaryCacheableFields: TweetDataValueHydrator =
      ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
        val ctx = TweetCtx.from(td, opts)
        val tweet = td.tweet

        val urlsMediaQuoteTweet: Stitch[
          ValueState[(Seq[UrlEntity], Seq[MediaEntity], Option[QuotedTweet])]
        ] =
          for {
            urls <- hydrateUrls(getUrls(tweet), ctx)
            (media, quotedTweet) <- Stitch.join(
              hydrateMediaCacheable(
                getMedia(tweet),
                MediaEntityHydrator.Cacheable.Ctx(urls.value, ctx)
              ),
              for {
                qtRef <- hydrateQuotedTweetRef(
                  tweet.quotedTweet,
                  QuotedTweetRefHydrator.Ctx(urls.value, ctx)
                )
                qtRefWithUrls <- hydrateQuotedTweetRefUrls(qtRef.value, ctx)
              } yield {
                ValueState(qtRefWithUrls.value, qtRef.state ++ qtRefWithUrls.state)
              }
            )
          } yield {
            ValueState.join(urls, media, quotedTweet)
          }

        val conversationId: Stitch[ValueState[Option[ConversationId]]] =
          hydrateConvoId(getConversationId(tweet), ctx)

        val mentions: Stitch[ValueState[Seq[MentionEntity]]] =
          hydrateMentions(getMentions(tweet), ctx)

        val replyScreenName: Stitch[ValueState[Option[Reply]]] =
          hydrateReplyScreenName(getReply(tweet), ctx)

        val directedAt: Stitch[ValueState[Option[DirectedAtUser]]] =
          hydrateDirectedAt(
            getDirectedAtUser(tweet),
            DirectedAtHydrator.Ctx(
              mentions = getMentions(tweet),
              metadata = tweet.directedAtUserMetadata,
              underlyingTweetCtx = ctx
            )
          )

        val language: Stitch[ValueState[Option[Language]]] =
          hydrateLanguage(tweet.language, ctx)

        val contributor: Stitch[ValueState[Option[Contributor]]] =
          hydrateContributor(tweet.contributor, ctx)

        val geoScrub: Stitch[ValueState[(Option[GeoCoordinates], Option[PlaceId])]] =
          hydrateGeoScrub(
            (TweetLenses.geoCoordinates(tweet), TweetLenses.placeId(tweet)),
            ctx
          )

        Stitch
          .joinMap(
            urlsMediaQuoteTweet,
            conversationId,
            mentions,
            replyScreenName,
            directedAt,
            language,
            contributor,
            geoScrub
          )(ValueState.join(_, _, _, _, _, _, _, _))
          .map { values =>
            if (values.state.isEmpty) {
              ValueState.unmodified(td)
            } else {
              values.map {
                case (
                      (urls, media, quotedTweet),
                      conversationId,
                      mentions,
                      reply,
                      directedAt,
                      language,
                      contributor,
                      coreGeo
                    ) =>
                  val (coordinates, placeId) = coreGeo
                  td.copy(
                    tweet = tweet.copy(
                      coreData = tweet.coreData.map(
                        _.copy(
                          reply = reply,
                          conversationId = conversationId,
                          directedAtUser = directedAt,
                          coordinates = coordinates,
                          placeId = placeId
                        )
                      ),
                      urls = Some(urls),
                      media = Some(media),
                      mentions = Some(mentions),
                      language = language,
                      quotedTweet = quotedTweet,
                      contributor = contributor
                    )
                  )
              }
            }
          }
      }

    val assertNotScrubbed: TweetDataValueHydrator =
      ValueHydrator.fromMutation[TweetData, TweetQuery.Options](
        ScrubUncacheable
          .assertNotScrubbed(
            "output of the cacheable tweet hydrator should not require scrubbing"
          )
          .lensed(TweetData.Lenses.tweet)
      )

    val hydrateDependentUncacheableFields: TweetDataValueHydrator =
      ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
        val ctx = TweetCtx.from(td, opts)
        val tweet = td.tweet

        val quotedTweetResult: Stitch[ValueState[Option[QuotedTweetResult]]] =
          for {
            qtFilterState <- hydrateQuoteTweetVisibility(None, ctx)
            quotedTweet <- hydrateQuotedTweet(
              td.quotedTweetResult,
              QuotedTweetHydrator.Ctx(qtFilterState.value, ctx)
            )
          } yield {
            ValueState.join(qtFilterState, quotedTweet).map(_._2)
          }

        val pastedMedia: Stitch[ValueState[PastedMedia]] =
          hydratePastedMedia(
            PastedMediaHydrator.getPastedMedia(tweet),
            PastedMediaHydrator.Ctx(getUrls(tweet), ctx)
          )

        val mediaTags: Stitch[ValueState[Option[TweetMediaTags]]] =
          hydrateMediaTags(tweet.mediaTags, ctx)

        val classicCards: Stitch[ValueState[Option[Seq[Card]]]] =
          hydrateClassicCards(
            tweet.cards,
            CardHydrator.Ctx(getUrls(tweet), getMedia(tweet), ctx)
          )

        val card2: Stitch[ValueState[Option[Card2]]] =
          hydrateCard2(
            tweet.card2,
            Card2Hydrator.Ctx(
              getUrls(tweet),
              getMedia(tweet),
              getCardReference(tweet),
              ctx,
              td.featureSwitchResults
            )
          )

        val contributorVisibility: Stitch[ValueState[Option[Contributor]]] =
          hydrateContributorVisibility(tweet.contributor, ctx)

        val takedowns: Stitch[ValueState[Option[Takedowns]]] =
          hydrateTakedowns(
            None, // None because uncacheable hydrator doesn't depend on previous value
            TakedownHydrator.Ctx(Takedowns.fromTweet(tweet), ctx)
          )

        val conversationControl: Stitch[ValueState[Option[ConversationControl]]] =
          hydrateConversationControl(
            tweet.conversationControl,
            ConversationControlHydrator.Ctx(getConversationId(tweet), ctx)
          )

        // PreviousTweetCounts and Perspective hydration depends on tweet.editControl.edit_control_initial
        // having been hydrated in EditControlHydrator; thus we are chaining them together.
        val editControlWithDependencies: Stitch[
          ValueState[
            (
              Option[EditControl],
              Option[StatusPerspective],
              Option[StatusCounts],
              Option[TweetPerspective]
            )
          ]
        ] =
          for {
            (edit, perspective) <- Stitch.join(
              hydrateEditControl(tweet.editControl, ctx),
              hydratePerspective(
                tweet.perspective,
                PerspectiveHydrator.Ctx(td.featureSwitchResults, ctx))
            )
            (counts, editPerspective) <- Stitch.join(
              hydratePreviousTweetCounts(
                tweet.previousCounts,
                PreviousTweetCountsHydrator.Ctx(edit.value, td.featureSwitchResults, ctx)),
              hydrateEditPerspective(
                tweet.editPerspective,
                EditPerspectiveHydrator
                  .Ctx(perspective.value, edit.value, td.featureSwitchResults, ctx))
            )
          } yield {
            ValueState.join(edit, perspective, counts, editPerspective)
          }

        Stitch
          .joinMap(
            quotedTweetResult,
            pastedMedia,
            mediaTags,
            classicCards,
            card2,
            contributorVisibility,
            takedowns,
            conversationControl,
            editControlWithDependencies
          )(ValueState.join(_, _, _, _, _, _, _, _, _))
          .map { values =>
            if (values.state.isEmpty) {
              ValueState.unmodified(td)
            } else {
              values.map {
                case (
                      quotedTweetResult,
                      pastedMedia,
                      ownedMediaTags,
                      cards,
                      card2,
                      contributor,
                      takedowns,
                      conversationControl,
                      (editControl, perspective, previousCounts, editPerspective)
                    ) =>
                  td.copy(
                    tweet = tweet.copy(
                      media = Some(pastedMedia.mediaEntities),
                      mediaTags = pastedMedia.mergeTweetMediaTags(ownedMediaTags),
                      cards = cards,
                      card2 = card2,
                      contributor = contributor,
                      takedownCountryCodes = takedowns.map(_.countryCodes.toSeq),
                      takedownReasons = takedowns.map(_.reasons.toSeq),
                      conversationControl = conversationControl,
                      editControl = editControl,
                      previousCounts = previousCounts,
                      perspective = perspective,
                      editPerspective = editPerspective,
                    ),
                    quotedTweetResult = quotedTweetResult
                  )
              }
            }
          }
      }

    val hydrateIndependentUncacheableFields: TweetDataEditHydrator =
      EditHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
        val ctx = TweetCtx.from(td, opts)
        val tweet = td.tweet

        // Group together the results of hydrators that don't perform
        // filtering, because we don't care about the precedence of
        // exceptions from these hydrators, because the exceptions all
        // indicate failures, and picking any failure will be
        // fine. (All of the other hydrators might throw filtering
        // exceptions, so we need to make sure that we give precedence
        // to their failures.)
        val hydratorsWithoutFiltering =
          Stitch.joinMap(
            hydrateTweetCounts(tweet.counts, TweetCountsHydrator.Ctx(td.featureSwitchResults, ctx)),
            // Note: Place is cached in memcache, it is just not cached on the Tweet.
            hydratePlace(tweet.place, ctx),
            hydrateDeviceSource(tweet.deviceSource, ctx),
            hydrateProfileGeo(tweet.profileGeoEnrichment, ctx)
          )(ValueState.join(_, _, _, _))

        /**
         * Multiple hydrators throw visibility filtering exceptions so specify an order to achieve
         * a deterministic hydration result while ensuring that any retweet has a source tweet:
         * 1. hydrateSourceTweet throws SourceTweetNotFound, this is a detached-retweet so treat
         *      the retweet hydration as if it were not found
         * 2. hydrateTweetAuthorVisibility
         * 3. hydrateSourceTweet (other than SourceTweetNotFound already handled above)
         * 4. hydrateIM1837State
         * 5. hydrateIM2884State
         * 6. hydrateIM3433State
         * 7. hydratorsWithoutFiltering miscellaneous exceptions (any visibility filtering
         *      exceptions should win over failure of a hydrator)
         */
        val sourceTweetAndTweetAuthorResult =
          Stitch
            .joinMap(
              hydrateSourceTweet(td.sourceTweetResult, ctx).liftToTry,
              hydrateTweetAuthorVisibility((), ctx).liftToTry,
              hydrateIM1837State((), ctx).liftToTry,
              hydrateIM2884State((), ctx).liftToTry,
              hydrateIM3433State((), ctx).liftToTry
            ) {
              case (Throw(t @ FilteredState.Unavailable.SourceTweetNotFound(_)), _, _, _, _) =>
                Throw(t)
              case (_, Throw(t), _, _, _) => Throw(t) // TweetAuthorVisibility
              case (Throw(t), _, _, _, _) => Throw(t) // SourceTweet
              case (_, _, Throw(t), _, _) => Throw(t) // IM1837State
              case (_, _, _, Throw(t), _) => Throw(t) // IM2884State
              case (_, _, _, _, Throw(t)) => Throw(t) // IM3433State
              case (
                    Return(sourceTweetResultValue),
                    Return(authorVisibilityValue),
                    Return(im1837Value),
                    Return(im2884Value),
                    Return(im3433Value)
                  ) =>
                Return(
                  ValueState
                    .join(
                      sourceTweetResultValue,
                      authorVisibilityValue,
                      im1837Value,
                      im2884Value,
                      im3433Value
                    )
                )
            }.lowerFromTry

        StitchExceptionPrecedence(sourceTweetAndTweetAuthorResult)
          .joinWith(hydratorsWithoutFiltering)(ValueState.join(_, _))
          .toStitch
          .map { values =>
            if (values.state.isEmpty) {
              EditState.unit[TweetData]
            } else {
              EditState[TweetData] { tweetData =>
                val tweet = tweetData.tweet
                values.map {
                  case (
                        (sourceTweetResult, _, _, _, _),
                        (counts, place, deviceSource, profileGeo)
                      ) =>
                    tweetData.copy(
                      tweet = tweet.copy(
                        counts = counts,
                        place = place,
                        deviceSource = deviceSource,
                        profileGeoEnrichment = profileGeo
                      ),
                      sourceTweetResult = sourceTweetResult
                    )
                }
              }
            }
          }
      }

    val hydrateUnmentionDataToTweetData: TweetDataValueHydrator =
      TweetHydration.setOnTweetData(
        TweetData.Lenses.tweet.andThen(TweetLenses.unmentionData),
        (td: TweetData, opts: TweetQuery.Options) =>
          UnmentionDataHydrator
            .Ctx(getConversationId(td.tweet), getMentions(td.tweet), TweetCtx.from(td, opts)),
        hydrateUnmentionData
      )

    val hydrateCacheableFields: TweetDataValueHydrator =
      ValueHydrator.inSequence(
        scrubCachedTweet,
        hydratePrimaryCacheableFields,
        // Relies on mentions being hydrated in hydratePrimaryCacheableFields
        hydrateUnmentionDataToTweetData,
        assertNotScrubbed,
        hydrateCacheableRepairs
      )

    // The conversation muted hydrator needs the conversation id,
    // which comes from the primary cacheable fields, and the media hydrator
    // needs the cacheable media entities.
    val hydrateUncacheableMedia: TweetDataValueHydrator =
      ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
        val ctx = TweetCtx.from(td, opts)
        val tweet = td.tweet

        val mediaCtx =
          MediaEntityHydrator.Uncacheable.Ctx(td.tweet.mediaKeys, ctx)

        val media: Stitch[ValueState[Option[Seq[MediaEntity]]]] =
          hydrateMediaUncacheable.liftOption.apply(td.tweet.media, mediaCtx)

        val conversationMuted: Stitch[ValueState[Option[Boolean]]] =
          hydrateConversationMuted(
            tweet.conversationMuted,
            ConversationMutedHydrator.Ctx(getConversationId(tweet), ctx)
          )

        // MediaRefs need to be hydrated at this phase because they rely on the media field
        // on the Tweet, which can get unset by later hydrators.
        val mediaRefs: Stitch[ValueState[Option[Seq[MediaRef]]]] =
          hydrateMediaRefs(
            tweet.mediaRefs,
            MediaRefsHydrator.Ctx(getMedia(tweet), getMediaKeys(tweet), getUrls(tweet), ctx)
          )

        Stitch
          .joinMap(
            media,
            conversationMuted,
            mediaRefs
          )(ValueState.join(_, _, _))
          .map { values =>
            if (values.state.isEmpty) {
              ValueState.unmodified(td)
            } else {
              val tweet = td.tweet
              values.map {
                case (media, conversationMuted, mediaRefs) =>
                  td.copy(
                    tweet = tweet.copy(
                      media = media,
                      conversationMuted = conversationMuted,
                      mediaRefs = mediaRefs
                    )
                  )
              }
            }
          }
      }

    val hydrateHasMediaToTweetData: TweetDataValueHydrator =
      TweetHydration.setOnTweetData(
        TweetData.Lenses.tweet.andThen(TweetLenses.hasMedia),
        (td: TweetData, opts: TweetQuery.Options) => td.tweet,
        hydrateHasMedia
      )

    val hydrateReportedTweetVisibilityToTweetData: TweetDataValueHydrator = {
      // Create a TweetDataValueHydrator that calls hydrateReportedTweetVisibility, which
      // either throws a FilteredState.Unavailable or returns Unit.
      ValueHydrator[TweetData, TweetQuery.Options] { (td, opts) =>
        val ctx = ReportedTweetFilter.Ctx(td.tweet.perspective, TweetCtx.from(td, opts))
        hydrateReportedTweetVisibility((), ctx).map { _ =>
          ValueState.unmodified(td)
        }
      }
    }

    val hydrateTweetVisibilityToTweetData: TweetDataValueHydrator =
      TweetHydration.setOnTweetData(
        TweetData.Lenses.suppress,
        (td: TweetData, opts: TweetQuery.Options) =>
          TweetVisibilityHydrator.Ctx(td.tweet, TweetCtx.from(td, opts)),
        hydrateTweetVisibility
      )

    val hydrateEscherbirdAnnotationsToTweetAndCachedTweet: TweetDataValueHydrator =
      TweetHydration.setOnTweetAndCachedTweet(
        TweetLenses.escherbirdEntityAnnotations,
        (td: TweetData, _: TweetQuery.Options) => td.tweet,
        hydrateEscherbirdAnnotations
      )

    val scrubEngagements: TweetDataValueHydrator =
      TweetHydration.setOnTweetData(
        TweetData.Lenses.tweetCounts,
        (td: TweetData, _: TweetQuery.Options) => ScrubEngagementHydrator.Ctx(td.suppress),
        hydrateScrubEngagements
      )

    /**
     * This is where we wire up all the separate hydrators into a single [[TweetDataValueHydrator]].
     *
     * Each hydrator here is either a [[TweetDataValueHydrator]] or a [[TweetDataEditHydrator]].
     * We use [[EditHydrator]]s for anything that needs to run in parallel ([[ValueHydrator]]s can
     * only be run in sequence).
     */
    ValueHydrator.inSequence(
      // Hydrate FeatureSwitchResults first, so they can be used by other hydrators if needed
      hydrateFeatureSwitchResults,
      EditHydrator
        .inParallel(
          ValueHydrator
            .inSequence(
              // The result of running these hydrators is saved as `cacheableTweetResult` and
              // written back to cache via `cacheChangesEffect` in `hydrateRepo`
              TweetHydration.captureCacheableTweetResult(
                hydrateCacheableFields
              ),
              // Uncacheable hydrators that depend only on the cacheable fields
              hydrateUncacheableMedia,
              // clean-up partially hydrated entities before any of the hydrators that look at
              // url and media entities run, so that they never see bad entities.
              hydratePostCacheRepairs,
              // These hydrators are all dependent on each other and/or the previous hydrators
              hydrateDependentUncacheableFields,
              // Sets `hasMedia`. Comes after PastedMediaHydrator in order to include pasted
              // pics as well as other media & urls.
              hydrateHasMediaToTweetData
            )
            .toEditHydrator,
          // These hydrators do not rely on any other hydrators and so can be run in parallel
          // with the above hydrators (and with each other)
          hydrateIndependentUncacheableFields
        )
        .toValueHydrator,
      // Depends on reported perspectival having been hydrated in PerspectiveHydrator
      hydrateReportedTweetVisibilityToTweetData,
      // Remove superfluous urls entities when there is a corresponding MediaEntity for the same url
      scrubSuperfluousUrlEntities,
      // The copyFromSourceTweet hydrator needs to be located after the hydrators that produce the
      // fields to copy. It must be located after PartialEntityCleaner (part of postCacheRepairs),
      // which removes failed MediaEntities. It also depends on takedownCountryCodes having been
      // hydrated in TakedownHydrator.
      copyFromSourceTweet,
      // depends on AdditionalFieldsHydrator and CopyFromSourceTweet to copy safety labels
      hydrateTweetVisibilityToTweetData,
      // for IPI'd tweets, we want to disable tweet engagement counts from being returned
      // StatusCounts for replyCount, retweetCount.
      // scrubEngagements hydrator must come after tweet visibility hydrator.
      // tweet visibility hydrator emits the suppressed FilteredState needed for scrubbing.
      scrubEngagements,
      // this hydrator runs when writing the current tweet
      // Escherbird comes last in order to consume a tweet that's as close as possible
      // to the tweet written to tweet_events
      hydrateEscherbirdAnnotationsToTweetAndCachedTweet
        .onlyIf((td, opts) => opts.cause.writing(td.tweet.id)),
      // Add an ellipsis to the end of the text for a Tweet that has a NoteTweet associated.
      // This is so that the Tweet is displayed on the home timeline with an ellipsis, letting
      // the User know that there's more to see.
      hydrateNoteTweetSuffix,
      /**
       * Post-cache repair of QT text and entities to support rendering on all clients
       * Moving this to end of the pipeline to avoid/minimize chance of following hydrators
       * depending on modified tweet text or entities.
       * When we start persisting shortUrl in MH - permalink won't be empty. therefore,
       * we won't run QuotedTweetRefHydrator and just hydrate expanded and display
       * using QuotedTweetRefUrlsHydrator. We will use hydrated permalink to repair
       * QT text and entities for non-upgraded clients in this step.
       * */
      hydrateTweetLegacyFormat
    )
  }