def apply()

in tweetypie/server/src/main/scala/com/twitter/tweetypie/handler/RetweetBuilder.scala [73:351]


  def apply(
    validateRequest: RetweetRequest => Future[Unit],
    tweetIdGenerator: TweetIdGenerator,
    tweetRepo: TweetRepository.Type,
    userRepo: UserRepository.Type,
    tflock: TFlockClient,
    deviceSourceRepo: DeviceSourceRepository.Type,
    validateUpdateRateLimit: RateLimitChecker.Validate,
    spamChecker: Spam.Checker[RetweetSpamRequest] = Spam.DoNotCheckSpam,
    updateUserCounts: (User, Tweet) => Future[User],
    superFollowRelationsRepo: StratoSuperFollowRelationsRepository.Type,
    unretweetEdits: TweetDeletePathHandler.UnretweetEdits,
    setEditWindowToSixtyMinutes: Gate[Unit]
  ): RetweetBuilder.Type = {
    val entityExtactor = EntityExtractor.mutationAll.endo

    val sourceTweetRepo: SourceTweetRequest => Stitch[Tweet] =
      req => {
        tweetRepo(
          req.tweetId,
          WritePathQueryOptions.retweetSourceTweet(req.user, req.hydrateOptions)
        ).rescue {
            case _: FilteredState => Stitch.NotFound
          }
          .rescue {
            convertRepoExceptions(TweetCreateState.SourceTweetNotFound, TweetLookupFailure(_))
          }
      }

    val getUser = userLookup(userRepo)
    val getSourceUser = sourceUserLookup(userRepo)
    val getDeviceSource = deviceSourceLookup(deviceSourceRepo)

    /**
     * We exempt SGS test users from the check to get them through Block v2 testing.
     */
    def isSGSTestRole(user: User): Boolean =
      user.roles.exists { roles => roles.roles.contains(SGSTestRole) }

    def validateCanRetweet(
      user: User,
      sourceUser: User,
      sourceTweet: Tweet,
      request: RetweetRequest
    ): Future[Unit] =
      Future
        .join(
          validateNotCommunityTweet(sourceTweet),
          validateNotTrustedFriendsTweet(sourceTweet),
          validateSourceUserRetweetable(user, sourceUser),
          validateStaleTweet(sourceTweet),
          Future.when(!request.dark) {
            if (request.returnSuccessOnDuplicate)
              failWithRetweetIdIfAlreadyRetweeted(user, sourceTweet)
            else
              validateNotAlreadyRetweeted(user, sourceTweet)
          }
        )
        .unit

    def validateSourceUserRetweetable(user: User, sourceUser: User): Future[Unit] =
      if (sourceUser.profile.isEmpty)
        Future.exception(UserProfileEmptyException)
      else if (sourceUser.safety.isEmpty)
        Future.exception(UserSafetyEmptyException)
      else if (sourceUser.view.isEmpty)
        Future.exception(UserViewEmptyException)
      else if (user.id != sourceUser.id && sourceUser.safety.get.isProtected)
        Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetProtectedTweet))
      else if (sourceUser.safety.get.deactivated)
        Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetDeactivatedUser))
      else if (sourceUser.safety.get.suspended)
        Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetSuspendedUser))
      else if (sourceUser.view.get.blockedBy && !isSGSTestRole(user))
        Future.exception(TweetCreateFailure.State(TweetCreateState.CannotRetweetBlockingUser))
      else if (sourceUser.profile.get.screenName.isEmpty)
        Future.exception(
          TweetCreateFailure.State(TweetCreateState.CannotRetweetUserWithoutScreenName)
        )
      else
        Future.Unit

    def tflockGraphContains(
      graph: StatusGraph,
      fromId: Long,
      toId: Long,
      dir: Direction
    ): Future[Boolean] =
      tflock.contains(graph, fromId, toId, dir).rescue {
        case ex: OverCapacity => Future.exception(ex)
        case ex => Future.exception(TFlockLookupFailure(ex))
      }

    def getRetweetIdFromTflock(sourceTweetId: TweetId, userId: UserId): Future[Option[Long]] =
      tflock
        .selectAll(
          Select(
            sourceId = sourceTweetId,
            graph = RetweetsGraph,
            direction = Forward
          ).intersect(
            Select(
              sourceId = userId,
              graph = UserTimelineGraph,
              direction = Forward
            )
          )
        )
        .map(_.headOption)

    def validateNotAlreadyRetweeted(user: User, sourceTweet: Tweet): Future[Unit] =
      // use the perspective object from TLS if available, otherwise, check with tflock
      (sourceTweet.perspective match {
        case Some(perspective) =>
          Future.value(perspective.retweeted)
        case None =>
          // we have to query the RetweetSourceGraph in the Reverse order because
          // it is only defined in that direction, instead of bi-directionally
          tflockGraphContains(RetweetSourceGraph, user.id, sourceTweet.id, Reverse)
      }).flatMap {
        case true =>
          Future.exception(TweetCreateFailure.State(TweetCreateState.AlreadyRetweeted))
        case false => Future.Unit
      }

    def failWithRetweetIdIfAlreadyRetweeted(user: User, sourceTweet: Tweet): Future[Unit] =
      // use the perspective object from TLS if available, otherwise, check with tflock
      (sourceTweet.perspective.flatMap(_.retweetId) match {
        case Some(tweetId) => Future.value(Some(tweetId))
        case None =>
          getRetweetIdFromTflock(sourceTweet.id, user.id)
      }).flatMap {
        case None => Future.Unit
        case Some(tweetId) =>
          Future.exception(TweetCreateFailure.AlreadyRetweeted(tweetId))
      }

    def validateContributor(contributorIdOpt: Option[UserId]): Future[Unit] =
      if (contributorIdOpt.isDefined)
        Future.exception(TweetCreateFailure.State(TweetCreateState.ContributorNotSupported))
      else
        Future.Unit

    case class RetweetSource(sourceTweet: Tweet, parentUserId: UserId)

    /**
     * Recursively follows a retweet chain to the root source tweet.  Also returns user id from the
     * first walked tweet as the 'parentUserId'.
     * In practice, the depth of the chain should never be greater than 2 because
     * share.sourceStatusId should always reference the root (unlike share.parentStatusId).
     */
    def findRetweetSource(
      tweetId: TweetId,
      forUser: User,
      hydrateOptions: WritePathHydrationOptions
    ): Future[RetweetSource] =
      Stitch
        .run(sourceTweetRepo(SourceTweetRequest(tweetId, forUser, hydrateOptions)))
        .flatMap { tweet =>
          getShare(tweet) match {
            case None => Future.value(RetweetSource(tweet, getUserId(tweet)))
            case Some(share) =>
              findRetweetSource(share.sourceStatusId, forUser, hydrateOptions)
                .map(_.copy(parentUserId = getUserId(tweet)))
          }
        }

    FutureArrow { request =>
      for {
        () <- validateRequest(request)
        userFuture = Stitch.run(getUser(request.userId))
        tweetIdFuture = tweetIdGenerator()
        devsrcFuture = Stitch.run(getDeviceSource(request.createdVia))
        user <- userFuture
        tweetId <- tweetIdFuture
        devsrc <- devsrcFuture
        rtSource <- findRetweetSource(
          request.sourceStatusId,
          user,
          request.hydrationOptions.getOrElse(WritePathHydrationOptions(simpleQuotedTweet = true))
        )
        sourceTweet = rtSource.sourceTweet
        sourceUser <- Stitch.run(getSourceUser(getUserId(sourceTweet), request.userId))

        // We want to confirm that a user is actually allowed to
        // retweet an Exclusive Tweet (only available to super followers)
        () <- StratoSuperFollowRelationsRepository.Validate(
          sourceTweet.exclusiveTweetControl,
          user.id,
          superFollowRelationsRepo)

        () <- validateUser(user)
        () <- validateUpdateRateLimit((user.id, request.dark))
        () <- validateContributor(request.contributorUserId)
        () <- validateCanRetweet(user, sourceUser, sourceTweet, request)
        () <- unretweetEdits(sourceTweet.editControl, sourceTweet.id, user.id)

        spamRequest = RetweetSpamRequest(
          retweetId = tweetId,
          sourceUserId = getUserId(sourceTweet),
          sourceTweetId = sourceTweet.id,
          sourceTweetText = getText(sourceTweet),
          sourceUserName = sourceUser.profile.map(_.screenName),
          safetyMetaData = request.safetyMetaData
        )

        spamResult <- spamChecker(spamRequest)

        safety = user.safety.get

        share = Share(
          sourceStatusId = sourceTweet.id,
          sourceUserId = sourceUser.id,
          parentStatusId = request.sourceStatusId
        )

        retweetText = composeRetweetText(getText(sourceTweet), sourceUser)
        createdAt = SnowflakeId(tweetId).time

        coreData = TweetCoreData(
          userId = request.userId,
          text = retweetText,
          createdAtSecs = createdAt.inSeconds,
          createdVia = devsrc.internalName,
          share = Some(share),
          hasTakedown = safety.hasTakedown,
          trackingId = request.trackingId,
          nsfwUser = safety.nsfwUser,
          nsfwAdmin = safety.nsfwAdmin,
          narrowcast = request.narrowcast,
          nullcast = request.nullcast
        )

        retweet = Tweet(
          id = tweetId,
          coreData = Some(coreData),
          contributor = getContributor(request.userId),
          editControl = Some(
            EditControl.Initial(
              EditControlUtil
                .makeEditControlInitial(
                  tweetId = tweetId,
                  createdAt = createdAt,
                  setEditWindowToSixtyMinutes = setEditWindowToSixtyMinutes
                )
                .initial
                .copy(isEditEligible = Some(false))
            )
          ),
        )

        retweetWithEntities = entityExtactor(retweet)
        retweetWithAdditionalFields = setAdditionalFields(
          retweetWithEntities,
          request.additionalFields
        )
        // update the perspective and counts fields of the source tweet to reflect the effects
        // of the user performing a retweet, even though those effects haven't happened yet.
        updatedSourceTweet = sourceTweet.copy(
          perspective = sourceTweet.perspective.map {
            _.copy(retweeted = true, retweetId = Some(retweet.id))
          },
          counts = sourceTweet.counts.map { c => c.copy(retweetCount = c.retweetCount.map(_ + 1)) }
        )

        user <- updateUserCounts(user, retweetWithAdditionalFields)
      } yield {
        TweetBuilderResult(
          tweet = retweetWithAdditionalFields,
          user = user,
          createdAt = createdAt,
          sourceTweet = Some(updatedSourceTweet),
          sourceUser = Some(sourceUser),
          parentUserId = Some(rtSource.parentUserId),
          isSilentFail = spamResult == Spam.SilentFail
        )
      }
    }
  }