def apply()

in tweetypie/server/src/main/scala/com/twitter/tweetypie/config/TweetHydrators.scala [44:340]


  // Builds every individual hydrator, wires them together through TweetHydration, and
  // returns the resulting TweetHydrators (the combined hydrator plus the cache-changes effect).
  //
  // stats: root receiver; scoped below into "repairs" and "hydrators".
  // deciderGates: gates passed to `ifEnabled` / into hydrators to turn behavior on or off.
  // repos: repositories the hydrators read from (members are imported via `import repos._`).
  // tweetDataCache: cache handed to TweetHydration.cacheChanges.
  // hasMedia: predicate handed to HasMediaHydrator.
  // featureSwitchesWithoutExperiments, clientIdHelper: inputs to FeatureSwitchResultsHydrator.
  def apply(
    stats: StatsReceiver,
    deciderGates: TweetypieDeciderGates,
    repos: LogicalRepositories,
    tweetDataCache: LockingCache[TweetId, Cached[TweetData]],
    hasMedia: Tweet => Boolean,
    featureSwitchesWithoutExperiments: FeatureSwitches,
    clientIdHelper: ClientIdHelper
  ): TweetHydrators = {
    import repos._

    val repairStats = stats.scope("repairs")
    val hydratorStats = stats.scope("hydrators")

    // Runs `f` with `stats` scoped under `name`, so that a hydrator and its observer can
    // share one scoped receiver. Note that the lambdas passed to this helper name their
    // parameter `stats`, which shadows the method-level `stats` inside those lambdas only.
    def scoped[A](stats: StatsReceiver, name: String)(f: StatsReceiver => A): A = {
      val scopedStats = stats.scope(name)
      f(scopedStats)
    }

    // FilteredState and NotFound are not treated as failures; anything else is.
    val isFailureException: Throwable => Boolean = {
      case _: FilteredState => false
      case NotFound => false
      case _ => true
    }

    // Categorizes FilteredState as "filtered" and NotFound as "not_found"; every other
    // exception is handed to the default categorizer for `failureScope`.
    def hydratorExceptionCategorizer(failureScope: String) =
      ExceptionCategorizer.const("filtered").onlyIf(_.isInstanceOf[FilteredState]) ++
        ExceptionCategorizer.const("not_found").onlyIf(_ == NotFound) ++
        TpExceptionCounter.defaultCategorizer(failureScope).onlyIf(isFailureException)

    // Exception-counter factory passed to `observe`, built on the categorizer above.
    val hydratorExceptionCounter: (StatsReceiver, String) => ExceptionCounter =
      (stats, scope) => TpExceptionCounter(stats, hydratorExceptionCategorizer(scope))

    val tweetHydrator =
      TweetHydration(
        hydratorStats = hydratorStats,
        hydrateFeatureSwitchResults =
          FeatureSwitchResultsHydrator(featureSwitchesWithoutExperiments, clientIdHelper),
        hydrateMentions = MentionEntitiesHydrator
          .once(MentionEntityHydrator(userIdentityRepo))
          .observe(hydratorStats.scope("mentions"), hydratorExceptionCounter),
        hydrateLanguage = LanguageHydrator(languageRepo)
          .observe(hydratorStats.scope("language"), hydratorExceptionCounter),
        hydrateUrls = scoped(hydratorStats, "url") { stats =>
          UrlEntitiesHydrator
            .once(UrlEntityHydrator(urlRepo, stats))
            .observe(stats, hydratorExceptionCounter)
        },
        hydrateQuotedTweetRef = QuotedTweetRefHydrator
          .once(
            QuotedTweetRefHydrator(tweetRepo)
          )
          .observe(hydratorStats.scope("quoted_tweet_ref"), hydratorExceptionCounter),
        hydrateQuotedTweetRefUrls = QuotedTweetRefUrlsHydrator(userIdentityRepo)
          .observe(hydratorStats.scope("quoted_tweet_ref_urls"), hydratorExceptionCounter),
        hydrateMediaCacheable = MediaEntitiesHydrator.Cacheable
          .once(
            MediaEntityHydrator.Cacheable(
              hydrateMediaUrls = MediaUrlFieldsHydrator()
                .observe(hydratorStats.scope("media_urls"), hydratorExceptionCounter),
              hydrateMediaIsProtected = MediaIsProtectedHydrator(userProtectionRepo)
                .observe(hydratorStats.scope("media_is_protected"), hydratorExceptionCounter)
            )
          )
          .observe(hydratorStats.scope("media_cacheable"), hydratorExceptionCounter)
          .ifEnabled(deciderGates.hydrateMedia),
        hydrateReplyScreenName = ReplyScreenNameHydrator
          .once(ReplyScreenNameHydrator(userIdentityRepo))
          .observe(hydratorStats.scope("in_reply_to_screen_name"), hydratorExceptionCounter),
        hydrateConvoId = ConversationIdHydrator(conversationIdRepo)
          .observe(hydratorStats.scope("conversation_id"), hydratorExceptionCounter),
        hydratePerspective = // Don't cache with the tweet because it depends on the request
          PerspectiveHydrator(
            repo = perspectiveRepo,
            shouldHydrateBookmarksPerspective = deciderGates.hydrateBookmarksPerspective,
            stats = hydratorStats.scope("perspective_by_safety_label")
          ).observe(hydratorStats.scope("perspective"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydratePerspectives),
        hydrateEditPerspective = EditPerspectiveHydrator(
          repo = perspectiveRepo,
          timelinesGate = deciderGates.hydratePerspectivesEditsForTimelines,
          tweetDetailsGate = deciderGates.hydratePerspectivesEditsForTweetDetail,
          otherSafetyLevelsGate = deciderGates.hydratePerspectivesEditsForOtherSafetyLevels,
          bookmarksGate = deciderGates.hydrateBookmarksPerspective,
          stats = hydratorStats
        ).observe(hydratorStats.scope("edit_perspective"), hydratorExceptionCounter),
        hydrateConversationMuted = // Don't cache because it depends on the request.  If
          // possible, this hydrator should be in the same stage as
          // PerspectiveHydrator, so that the calls can be batched
          // together.
          ConversationMutedHydrator(conversationMutedRepo)
            .observe(hydratorStats.scope("conversation_muted"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydrateConversationMuted),
        hydrateContributor = ContributorHydrator
          .once(ContributorHydrator(userIdentityRepo))
          .observe(hydratorStats.scope("contributors"), hydratorExceptionCounter),
        hydrateTakedowns = TakedownHydrator(takedownRepo)
          .observe(hydratorStats.scope("takedowns"), hydratorExceptionCounter),
        hydrateDirectedAt = scoped(hydratorStats, "directed_at") { stats =>
          DirectedAtHydrator
            .once(DirectedAtHydrator(userIdentityRepo, stats))
            .observe(stats, hydratorExceptionCounter)
        },
        hydrateGeoScrub = GeoScrubHydrator(
          geoScrubTimestampRepo,
          Scribe("test_tweetypie_read_time_geo_scrubs")
            .contramap[TweetId](_.toString)
        ).observe(hydratorStats.scope("geo_scrub"), hydratorExceptionCounter),
        // Repairs applied to the tweet itself; only run when the request cause is a read
        // of this tweet id.
        hydrateCacheableRepairs = ValueHydrator
          .fromMutation[Tweet, TweetQuery.Options](
            RepairMutation(
              repairStats.scope("on_read"),
              "created_at" ->
                new CreatedAtRepairer(Scribe("test_tweetypie_bad_created_at")),
              "retweet_media" -> RetweetMediaRepairer,
              "parent_status_id" -> RetweetParentStatusIdRepairer.tweetMutation,
              "visible_text_range" -> NegativeVisibleTextRangeRepairer.tweetMutation
            )
          )
          .lensed(TweetData.Lenses.tweet)
          .onlyIf((td, opts) => opts.cause.reading(td.tweet.id)),
        hydrateMediaUncacheable = MediaEntityHydrator
          .Uncacheable(
            hydrateMediaKey = MediaKeyHydrator()
              .observe(hydratorStats.scope("media_key"), hydratorExceptionCounter),
            hydrateMediaInfo = scoped(hydratorStats, "media_info") { stats =>
              MediaInfoHydrator(mediaMetadataRepo, stats)
                .observe(stats, hydratorExceptionCounter)
            }
          )
          .observe(hydratorStats.scope("media_uncacheable"), hydratorExceptionCounter)
          .liftSeq
          .ifEnabled(deciderGates.hydrateMedia),
        hydratePostCacheRepairs =
          // clean-up partially hydrated entities before any of the hydrators that look at
          // url and media entities run, so that they never see bad entities.
          ValueHydrator.fromMutation[TweetData, TweetQuery.Options](
            RepairMutation(
              repairStats.scope("on_read"),
              "partial_entity_cleanup" -> PartialEntityCleaner(repairStats),
              "strip_not_display_coords" -> StripHiddenGeoCoordinates
            ).lensed(TweetData.Lenses.tweet)
          ),
        hydrateTweetLegacyFormat = scoped(hydratorStats, "tweet_legacy_formatter") { stats =>
          TweetLegacyFormatter(stats)
            .observe(stats, hydratorExceptionCounter)
            .onlyIf((td, opts) => opts.cause.reading(td.tweet.id))
        },
        hydrateQuoteTweetVisibility = QuoteTweetVisibilityHydrator(quotedTweetVisibilityRepo)
          .observe(hydratorStats.scope("quote_tweet_visibility"), hydratorExceptionCounter),
        hydrateQuotedTweet = QuotedTweetHydrator(tweetResultRepo)
          .observe(hydratorStats.scope("quoted_tweet"), hydratorExceptionCounter),
        hydratePastedMedia =
          // Don't cache with the tweet because we want to automatically drop this media if
          // the referenced tweet is deleted or becomes non-public.
          // NOTE(review): observed without hydratorExceptionCounter, unlike most hydrators
          // here — confirm that is intentional.
          PastedMediaHydrator(pastedMediaRepo)
            .observe(hydratorStats.scope("pasted_media"))
            .ifEnabled(deciderGates.hydratePastedMedia),
        // NOTE(review): observed without hydratorExceptionCounter — confirm intentional.
        hydrateMediaRefs = MediaRefsHydrator(
          optionalTweetRepo,
          deciderGates.mediaRefsHydratorIncludePastedMedia
        ).observe(hydratorStats.scope("media_refs"))
          .ifEnabled(deciderGates.hydrateMediaRefs),
        hydrateMediaTags = // depends on AdditionalFieldsHydrator
          MediaTagsHydrator(userViewRepo)
            .observe(hydratorStats.scope("media_tags"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydrateMediaTags),
        hydrateClassicCards = CardHydrator(cardRepo)
          .observe(hydratorStats.scope("cards"), hydratorExceptionCounter),
        // NOTE(review): observed without hydratorExceptionCounter — confirm intentional.
        hydrateCard2 = Card2Hydrator(card2Repo)
          .observe(hydratorStats.scope("card2")),
        hydrateContributorVisibility =
          // Filter out contributors field for all but the user who owns the tweet
          ContributorVisibilityFilter()
            .observe(hydratorStats.scope("contributor_visibility"), hydratorExceptionCounter),
        hydrateHasMedia =
          // Sets hasMedia. Comes after PastedMediaHydrator in order to include pasted
          // pics as well as other media & urls.
          HasMediaHydrator(hasMedia)
            .observe(hydratorStats.scope("has_media"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydrateHasMedia),
        hydrateTweetCounts = // Don't cache counts with the tweet because it has its own cache with
          // a different TTL
          TweetCountsHydrator(tweetCountsRepo, deciderGates.hydrateBookmarksCount)
            .observe(hydratorStats.scope("tweet_counts"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydrateCounts),
        hydratePreviousTweetCounts = // previous counts are not cached
          scoped(hydratorStats, "previous_counts") { stats =>
            PreviousTweetCountsHydrator(tweetCountsRepo, deciderGates.hydrateBookmarksCount)
              .observe(stats, hydratorExceptionCounter)
              .ifEnabled(deciderGates.hydratePreviousCounts)
          },
        hydratePlace =
          // Don't cache with the tweet because Place has its own tweetypie cache keyspace
          // with a different TTL, and it's more efficient to store separately.
          // See com.twitter.tweetypie.repository.PlaceKey
          PlaceHydrator(placeRepo)
            .observe(hydratorStats.scope("place"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydratePlaces),
        hydrateDeviceSource = // Don't cache with the tweet because it has its own cache,
          // and it's more efficient to cache it separately
          DeviceSourceHydrator(deviceSourceRepo)
            .observe(hydratorStats.scope("device_source"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydrateDeviceSources),
        hydrateProfileGeo =
          // Don't cache gnip profile geo as read request volume is expected to be low
          ProfileGeoHydrator(profileGeoRepo)
            .observe(hydratorStats.scope("profile_geo"), hydratorExceptionCounter)
            .ifEnabled(deciderGates.hydrateGnipProfileGeoEnrichment),
        hydrateSourceTweet = scoped(hydratorStats, "source_tweet") { stats =>
          SourceTweetHydrator(
            tweetResultRepo,
            stats,
            FutureEffect
              .inParallel(
                Scribe(DetachedRetweet, "tweetypie_detached_retweets"),
                Scribe(DetachedRetweet, "test_tweetypie_detached_retweets"),
              )
          ).observe(stats, hydratorExceptionCounter)
        },
        // The IM* filters below only run for external consumption when the request cause
        // is a read of this tweet id.
        hydrateIM1837State = IM1837FilterHydrator()
          .observe(hydratorStats.scope("im1837_filter"), hydratorExceptionCounter)
          .onlyIf { (_, ctx) =>
            ctx.opts.forExternalConsumption && ctx.opts.cause.reading(ctx.tweetId)
          },
        hydrateIM2884State = scoped(hydratorStats, "im2884_filter") { stats =>
          IM2884FilterHydrator(stats)
            .observe(stats, hydratorExceptionCounter)
            .onlyIf { (_, ctx) =>
              ctx.opts.forExternalConsumption && ctx.opts.cause.reading(ctx.tweetId)
            }
        },
        hydrateIM3433State = scoped(hydratorStats, "im3433_filter") { stats =>
          IM3433FilterHydrator(stats)
            .observe(stats, hydratorExceptionCounter)
            .onlyIf { (_, ctx) =>
              ctx.opts.forExternalConsumption && ctx.opts.cause.reading(ctx.tweetId)
            }
        },
        hydrateTweetAuthorVisibility = TweetAuthorVisibilityHydrator(userVisibilityRepo)
          .observe(hydratorStats.scope("tweet_author_visibility"), hydratorExceptionCounter)
          .onlyIf((_, ctx) => ctx.opts.cause.reading(ctx.tweetId)),
        hydrateReportedTweetVisibility = ReportedTweetFilter()
          .observe(hydratorStats.scope("reported_tweet_filter"), hydratorExceptionCounter),
        scrubSuperfluousUrlEntities = ValueHydrator
          .fromMutation[Tweet, TweetQuery.Options](SuperfluousUrlEntityScrubber.mutation)
          .lensed(TweetData.Lenses.tweet),
        copyFromSourceTweet = CopyFromSourceTweet.hydrator
          .observe(hydratorStats.scope("copy_from_source_tweet"), hydratorExceptionCounter),
        hydrateTweetVisibility = scoped(hydratorStats, "tweet_visibility") { stats =>
          TweetVisibilityHydrator(
            tweetVisibilityRepo,
            deciderGates.failClosedInVF,
            stats
          ).observe(stats, hydratorExceptionCounter)
        },
        hydrateEscherbirdAnnotations = EscherbirdAnnotationHydrator(escherbirdAnnotationRepo)
          .observe(hydratorStats.scope("escherbird_annotations"), hydratorExceptionCounter)
          .ifEnabled(deciderGates.hydrateEscherbirdAnnotations),
        hydrateScrubEngagements = ScrubEngagementHydrator()
          .observe(hydratorStats.scope("scrub_engagements"), hydratorExceptionCounter)
          .ifEnabled(deciderGates.hydrateScrubEngagements),
        hydrateConversationControl = scoped(hydratorStats, "tweet_conversation_control") { stats =>
          ConversationControlHydrator(
            conversationControlRepo,
            deciderGates.disableInviteViaMention,
            stats
          ).observe(stats, hydratorExceptionCounter)
        },
        hydrateEditControl = scoped(hydratorStats, "tweet_edit_control") { stats =>
          EditControlHydrator(
            tweetRepo,
            deciderGates.setEditTimeWindowToSixtyMinutes,
            stats
          ).observe(stats, hydratorExceptionCounter)
        },
        hydrateUnmentionData = UnmentionDataHydrator(),
        // Observe under a dedicated scope of hydratorStats, consistent with every other
        // hydrator here. A bare `stats` at this point is the method-level root receiver
        // (no `scoped` lambda is in effect), which would leave these metrics un-namespaced.
        hydrateNoteTweetSuffix = NoteTweetSuffixHydrator()
          .observe(hydratorStats.scope("note_tweet_suffix"), hydratorExceptionCounter)
      )

    new TweetHydrators {
      val hydrator: TweetDataValueHydrator =
        tweetHydrator.onlyIf { (tweetData, opts) =>
          // When the caller requests fetchStoredTweets and Tweets are fetched from Manhattan
          // irrespective of state, the stored data for some Tweets may be incomplete.
          // We skip the hydration of those Tweets.
          !opts.fetchStoredTweets ||
          tweetData.storedTweetResult.exists(_.canHydrate)
        }

      // Effect built by TweetHydration.cacheChanges from the tweet data cache and the
      // "tweet_caching" stats scope.
      val cacheChangesEffect: Effect[ValueState[TweetData]] =
        TweetHydration.cacheChanges(
          tweetDataCache,
          hydratorStats.scope("tweet_caching")
        )
    }
  }