def apply()

in tweetypie/server/src/main/scala/com/twitter/tweetypie/handler/TweetBuilder.scala [696:1179]


  /**
   * Wires the injected validators, repositories and sub-builders into a
   * [[TweetBuilder.Type]]: a `FutureArrow` that turns a `PostTweetRequest` into a
   * `TweetBuilderResult`.
   *
   * For every request the arrow runs these steps, in this order:
   *
   *  1. validate the request, then obtain the tweet id, the user and the device
   *     source through a single `Future.join`;
   *  1. validate the user, the update rate limit and the requested conversation control;
   *  1. preprocess the text, build attachments and run the spam check;
   *  1. build the reply, validate the write and run the nudge builder;
   *  1. rewrite urls and build geo (joined), then attach media;
   *  1. reject duplicates (skipped for dark requests) and validate the text length;
   *  1. derive exclusive-tweet, trusted-friends and collab controls;
   *  1. assemble the `Tweet`, then add edit control, extracted entities,
   *     conversation control, additional fields and communities;
   *  1. create a self permalink where required, plus the update request for the
   *     initial tweet when this tweet is an edit;
   *  1. filter invalid data, validate the edit and update the user's counts.
   *
   * Failures raised directly in this method (collaborators may fail in other ways):
   * `SnowflakeFailure` when id generation fails, `MissingConversationId` when a reply
   * has no conversation id, `TweetCreateFailure.State(TweetCreateState.Duplicate)` when
   * a duplicate is found, and `UrlShorteningFailure` (or the original `OverCapacity`)
   * when a permalink cannot be shortened.
   *
   * NOTE(review): `user.safety` and `user.profile` are unwrapped with `.get`;
   * presumably `userLookup` always returns users with both populated - confirm.
   *
   * @param stats receiver handed to the feature switch recipient lookup and to the
   *              text length validators
   * @param validateRequest first check applied to every incoming request
   * @param validateUser check applied to the loaded user; defaults to
   *                     `TweetBuilder.validateUser`
   * @param filterInvalidData final clean-up pass applied to the assembled tweet
   * @param updateUserCounts produces the `User` that is returned in the result
   * @param featureSwitches matched once per request; the result is shared by the
   *                        conversation control validator, the edit control builder
   *                        and the communities validator
   * @return the arrow described above; the remaining parameters are the collaborators
   *         invoked by the correspondingly named steps
   */
  def apply(
    stats: StatsReceiver,
    validateRequest: PostTweetRequest => Future[Unit],
    validateEdit: EditValidator.Type,
    validateUser: User => Future[Unit] = TweetBuilder.validateUser,
    validateUpdateRateLimit: RateLimitChecker.Validate,
    tweetIdGenerator: TweetIdGenerator,
    userRepo: UserRepository.Type,
    deviceSourceRepo: DeviceSourceRepository.Type,
    communityMembershipRepo: StratoCommunityMembershipRepository.Type,
    communityAccessRepo: StratoCommunityAccessRepository.Type,
    urlShortener: UrlShortener.Type,
    urlEntityBuilder: UrlEntityBuilder.Type,
    geoBuilder: GeoBuilder.Type,
    replyBuilder: ReplyBuilder.Type,
    mediaBuilder: MediaBuilder.Type,
    attachmentBuilder: AttachmentBuilder.Type,
    duplicateTweetFinder: DuplicateTweetFinder.Type,
    spamChecker: Spam.Checker[TweetSpamRequest],
    filterInvalidData: (Tweet, PostTweetRequest, UrlShortener.Context) => Future[Tweet],
    updateUserCounts: (User, Tweet) => Future[User],
    validateConversationControl: ConversationControlBuilder.Validate.Type,
    conversationControlBuilder: ConversationControlBuilder.Type,
    validateTweetWrite: TweetWriteValidator.Type,
    nudgeBuilder: NudgeBuilder.Type,
    communitiesValidator: CommunitiesValidator.Type,
    collabControlBuilder: CollabControlBuilder.Type,
    editControlBuilder: EditControlBuilder.Type,
    featureSwitches: FeatureSwitches
  ): TweetBuilder.Type = {
    val entityExtractor = EntityExtractor.mutationWithoutUrls.endo
    val getUser = userLookup(userRepo)
    val getDeviceSource = deviceSourceLookup(deviceSourceRepo)

    // Creates a t.co link for the permalink of the given tweetId.
    val permalinkShortener = (tweetId: TweetId, ctx: UrlShortener.Context) =>
      urlShortener((s"https://twitter.com/i/web/status/$tweetId", ctx)).rescue {
        // propagate OverCapacity
        case e: OverCapacity => Future.exception(e)
        // convert any other failure into UrlShorteningFailure
        case e => Future.exception(UrlShorteningFailure(e))
      }

    // Re-wraps the id of the geo search request carried by the request's geo, if any.
    def extractGeoSearchRequestId(tweetGeoOpt: Option[TweetCreateGeo]): Option[GeoSearchRequestId] =
      for {
        tweetGeo <- tweetGeoOpt
        geoSearchRequestId <- tweetGeo.geoSearchRequestId
      } yield GeoSearchRequestId(geoSearchRequestId.id)

    // Matches feature switches for the user; yields None when there is no
    // TwitterContext viewer or no recipient can be built from it.
    def featureSwitchResults(user: User, stats: StatsReceiver): Option[FeatureSwitchResults] =
      TwitterContext()
        .flatMap { viewer =>
          UserViewerRecipient(user, viewer, stats)
        }.map { recipient =>
          featureSwitches.matchRecipient(recipient)
        }

    FutureArrow { request =>
      for {
        () <- validateRequest(request)

        // Id generation failures are reported as SnowflakeFailure.
        (tweetId, user, devsrc) <- Future.join(
          tweetIdGenerator().rescue { case t => Future.exception(SnowflakeFailure(t)) },
          Stitch.run(getUser(request.userId)),
          Stitch.run(getDeviceSource(request.createdVia))
        )

        () <- validateUser(user)
        () <- validateUpdateRateLimit((user.id, request.dark))

        // Feature Switch results are calculated once and shared between multiple builders
        matchedResults = featureSwitchResults(user, stats)

        () <- validateConversationControl(
          ConversationControlBuilder.Validate.Request(
            matchedResults = matchedResults,
            conversationControl = request.conversationControl,
            inReplyToTweetId = request.inReplyToTweetId
          )
        )

        // strip illegal chars, normalize newlines, collapse blank lines, etc.
        text = preprocessText(request.text)

        () <- prevalidateTextLength(text, stats)

        attachmentResult <- attachmentBuilder(
          AttachmentBuilderRequest(
            tweetId = tweetId,
            user = user,
            mediaUploadIds = request.mediaUploadIds,
            cardReference = request.additionalFields.flatMap(_.cardReference),
            attachmentUrl = request.attachmentUrl,
            remoteHost = request.remoteHost,
            darkTraffic = request.dark,
            deviceSource = devsrc
          )
        )

        // updated text with appended attachment url, if any.
        text <- Future.value(
          attachmentResult.attachmentUrl match {
            case None => text
            case Some(url) => s"$text $url"
          }
        )

        spamResult <- spamChecker(
          TweetSpamRequest(
            tweetId = tweetId,
            userId = request.userId,
            text = text,
            mediaTags = request.additionalFields.flatMap(_.mediaTags),
            safetyMetaData = request.safetyMetaData,
            inReplyToTweetId = request.inReplyToTweetId,
            quotedTweetId = attachmentResult.quotedTweet.map(_.tweetId),
            quotedTweetUserId = attachmentResult.quotedTweet.map(_.userId)
          )
        )

        // NOTE(review): `.get` fails the request with NoSuchElementException when the
        // user has no safety data; presumably the user lookup guarantees it - confirm.
        safety = user.safety.get
        // The creation time is the one encoded in the generated tweet id.
        createdAt = SnowflakeId(tweetId).time

        urlShortenerCtx = UrlShortener.Context(
          tweetId = tweetId,
          userId = user.id,
          createdAt = createdAt,
          userProtected = safety.isProtected,
          clientAppId = devsrc.clientAppId,
          remoteHost = request.remoteHost,
          dark = request.dark
        )

        replyRequest = ReplyBuilder.Request(
          authorId = request.userId,
          // NOTE(review): same `.get` assumption as for `user.safety` - confirm.
          authorScreenName = user.profile.map(_.screenName).get,
          inReplyToTweetId = request.inReplyToTweetId,
          tweetText = text,
          prependImplicitMentions = request.autoPopulateReplyMetadata,
          enableTweetToNarrowcasting = request.enableTweetToNarrowcasting,
          excludeUserIds = request.excludeReplyUserIds.getOrElse(Nil),
          spamResult = spamResult,
          batchMode = request.transientContext.flatMap(_.batchCompose)
        )

        replyResult <- replyBuilder(replyRequest)
        replyOpt = replyResult.map(_.reply)

        // Only replies that point at a specific tweet carry a conversation id;
        // everything else resolves to None.
        replyConversationId <- replyResult.map(r => (r.reply.inReplyToStatusId, r.conversationId)) match {
          case Some((Some(inReplyToStatusId), None)) =>
            // Throw this specific exception to make it easier to
            // count how often we hit this corner case.
            Future.exception(MissingConversationId(inReplyToStatusId))
          case Some((Some(_), conversationIdOpt)) => Future.value(conversationIdOpt)
          case _ => Future.value(None)
        }

        // Validate that the current user can reply to this conversation, based on
        // the conversation's ConversationControl.
        // Note: currently we only validate conversation controls access on replies,
        // therefore we use the conversationId from the inReplyToStatus.
        // Validate that the exclusive tweet control option is only used by allowed users.
        () <- validateTweetWrite(
          TweetWriteValidator.Request(
            replyConversationId,
            request.userId,
            request.exclusiveTweetControlOptions,
            replyResult.flatMap(_.exclusiveTweetControl),
            request.trustedFriendsControlOptions,
            replyResult.flatMap(_.trustedFriendsControl),
            attachmentResult.quotedTweet,
            replyResult.flatMap(_.reply.inReplyToStatusId),
            replyResult.flatMap(_.editControl),
            request.editOptions
          )
        )

        // Without a reply conversation this is a root tweet, so the tweet id is
        // the conversation id.
        convoId = replyConversationId.getOrElse(tweetId)

        () <- nudgeBuilder(
          NudgeBuilderRequest(
            text = text,
            inReplyToTweetId = replyOpt.flatMap(_.inReplyToStatusId),
            conversationId = if (convoId == tweetId) None else Some(convoId),
            hasQuotedTweet = attachmentResult.quotedTweet.nonEmpty,
            nudgeOptions = request.nudgeOptions,
            tweetId = Some(tweetId),
          )
        )

        // updated text with implicit reply mentions inserted, if any
        text <- Future.value(
          replyResult.map(_.tweetText).getOrElse(text)
        )

        // updated text with urls replaced with t.cos
        ((text, urlEntities), (geoCoords, placeIdOpt)) <- Future.join(
          urlEntityBuilder((text, urlShortenerCtx))
            .map {
              case (text, urlEntities) =>
                UrlEntityBuilder.updateTextAndUrls(text, urlEntities)(partialHtmlEncode)
            },
          // The geo builder is only consulted when the request carries geo data.
          request.geo match {
            case None => Future.value((None, None))
            case Some(tweetGeo) =>
              geoBuilder(
                GeoBuilder.Request(
                  tweetGeo,
                  user.account.exists(_.geoEnabled),
                  user.account.map(_.language).getOrElse("en")
                )
              ).map(r => (r.geoCoordinates, r.placeId))
          }
        )

        // updated text with trailing media url
        MediaBuilder.Result(text, mediaEntities, mediaKeys) <-
          request.mediaUploadIds.getOrElse(Nil) match {
            case Nil => Future.value(MediaBuilder.Result(text, Nil, Nil))
            case ids =>
              mediaBuilder(
                MediaBuilder.Request(
                  mediaUploadIds = ids,
                  text = text,
                  tweetId = tweetId,
                  userId = user.id,
                  userScreenName = user.profile.get.screenName,
                  // `safety` is the already unwrapped `user.safety`.
                  isProtected = safety.isProtected,
                  createdAt = createdAt,
                  dark = request.dark,
                  productMetadata = request.mediaMetadata.map(_.toMap)
                )
              )
          }

        // The duplicate check is skipped for dark requests.
        () <- Future.when(!request.dark) {
          val reqInfo =
            DuplicateTweetFinder.RequestInfo.fromPostTweetRequest(request, text)

          duplicateTweetFinder(reqInfo).flatMap {
            case None => Future.Unit
            case Some(duplicateId) =>
              log.debug(s"timeline_duplicate_check_failed:$duplicateId")
              Future.exception(TweetCreateFailure.State(TweetCreateState.Duplicate))
          }
        }

        textVisibility = getTextVisibility(
          text = text,
          replyResult = replyResult,
          urlEntities = urlEntities,
          mediaEntities = mediaEntities,
          attachmentUrl = attachmentResult.attachmentUrl
        )

        () <- validateTextLength(
          text = text,
          visibleText = textVisibility.visibleText,
          replyResult = replyResult,
          stats = stats
        )

        communities =
          request.additionalFields
            .flatMap(CommunityAnnotation.additionalFieldsToCommunityIDs)
            .map(ids => Communities(communityIds = ids))

        rootExclusiveControls = request.exclusiveTweetControlOptions.map { _ =>
          ExclusiveTweetControl(request.userId)
        }

        () <- validateExclusiveTweetNotReplies(rootExclusiveControls, replyResult)
        () <- validateExclusiveTweetParams(rootExclusiveControls, communities)

        replyExclusiveControls = replyResult.flatMap(_.exclusiveTweetControl)

        // The userId is pulled off of the request rather than being supplied
        // via the ExclusiveTweetControlOptions because additional fields
        // can be set by clients to contain any value they want.
        // This could include userIds that don't match their actual userId.
        // Only one of replyResult or request.exclusiveTweetControlOptions will be defined.
        exclusiveTweetControl = replyExclusiveControls.orElse(rootExclusiveControls)

        rootTrustedFriendsControl = request.trustedFriendsControlOptions.map { options =>
          TrustedFriendsControl(options.trustedFriendsListId)
        }

        () <- validateTrustedFriendsNotReplies(rootTrustedFriendsControl, replyResult)
        () <- validateTrustedFriendsParams(
          rootTrustedFriendsControl,
          request.conversationControl,
          communities,
          exclusiveTweetControl
        )

        replyTrustedFriendsControl = replyResult.flatMap(_.trustedFriendsControl)

        // As with the exclusive control, the reply's value wins over the request's.
        trustedFriendsControl = replyTrustedFriendsControl.orElse(rootTrustedFriendsControl)

        collabControl <- collabControlBuilder(
          CollabControlBuilder.Request(
            collabControlOptions = request.collabControlOptions,
            replyResult = replyResult,
            communities = communities,
            trustedFriendsControl = trustedFriendsControl,
            conversationControl = request.conversationControl,
            exclusiveTweetControl = exclusiveTweetControl,
            userId = request.userId
          ))

        isCollabInvitation = collabControl.exists {
          case CollabControl.CollabInvitation(_: CollabInvitation) => true
          case _ => false
        }

        coreData = TweetCoreData(
          userId = request.userId,
          text = text,
          createdAtSecs = createdAt.inSeconds,
          createdVia = devsrc.internalName,
          reply = replyOpt,
          hasTakedown = safety.hasTakedown,
          // We want to nullcast community tweets and CollabInvitations
          // This will disable tweet fanout to followers' home timelines,
          // and filter the tweets from appearing from the tweeter's profile
          // or search results for the tweeter's tweets.
          nullcast =
            request.nullcast || CommunityUtil.hasCommunity(communities) || isCollabInvitation,
          narrowcast = request.narrowcast,
          nsfwUser = request.possiblySensitive.getOrElse(safety.nsfwUser),
          nsfwAdmin = safety.nsfwAdmin,
          trackingId = request.trackingId,
          placeId = placeIdOpt,
          coordinates = geoCoords,
          conversationId = Some(convoId),
          // Set hasMedia to true if we know that there is media,
          // and leave it unknown if not, so that it will be
          // correctly set for pasted media.
          hasMedia = if (mediaEntities.nonEmpty) Some(true) else None
        )

        tweet = Tweet(
          id = tweetId,
          coreData = Some(coreData),
          urls = Some(urlEntities),
          media = Some(mediaEntities),
          mediaKeys = if (mediaKeys.nonEmpty) Some(mediaKeys) else None,
          contributor = getContributor(request.userId),
          visibleTextRange = textVisibility.visibleTextRange,
          selfThreadMetadata = replyResult.flatMap(_.selfThreadMetadata),
          directedAtUserMetadata = replyResult.map(_.directedAtMetadata),
          composerSource = request.composerSource,
          quotedTweet = attachmentResult.quotedTweet,
          exclusiveTweetControl = exclusiveTweetControl,
          trustedFriendsControl = trustedFriendsControl,
          collabControl = collabControl,
          noteTweet = request.noteTweetOptions.map(options =>
            NoteTweet(options.noteTweetId, options.isExpandable))
        )

        // The edit control is built from the tweet assembled so far and then
        // copied onto it.
        editControl <- editControlBuilder(
          EditControlBuilder.Request(
            postTweetRequest = request,
            tweet = tweet,
            matchedResults = matchedResults
          )
        )

        tweet <- Future.value(tweet.copy(editControl = editControl))

        tweet <- Future.value(entityExtractor(tweet))

        () <- validateEntities(tweet)

        tweet <- {
          val cctlRequest =
            ConversationControlBuilder.Request.fromTweet(
              tweet,
              request.conversationControl,
              request.noteTweetOptions.flatMap(_.mentionedUserIds))
          Stitch.run(conversationControlBuilder(cctlRequest)).map { conversationControl =>
            tweet.copy(conversationControl = conversationControl)
          }
        }

        tweet <- Future.value(
          setAdditionalFields(tweet, request.additionalFields)
        )
        () <- validateCommunityMembership(communityMembershipRepo, communityAccessRepo, communities)
        () <- validateCommunityReply(communities, replyResult)
        () <- communitiesValidator(
          CommunitiesValidator.Request(matchedResults, safety.isProtected, communities))

        tweet <- Future.value(tweet.copy(communities = communities))

        tweet <- Future.value(
          tweet.copy(underlyingCreativesContainerId = request.underlyingCreativesContainerId)
        )

        // For certain tweets we want to write a self-permalink which is used to generate modified
        // tweet text for legacy clients that contains a link. NOTE: this permalink is for
        // the tweet being created - we also create permalinks for related tweets further down
        // e.g. if this tweet is an edit, we might create a permalink for the initial tweet as well
        tweet <- {
          val isBeyond140 = textVisibility.isExtendedWithExtraChars(attachmentResult.extraChars)
          val isEditTweet = request.editOptions.isDefined
          val isMixedMedia = Media.isMixedMedia(mediaEntities)
          val isNoteTweet = request.noteTweetOptions.isDefined

          if (isBeyond140 || isEditTweet || isMixedMedia || isNoteTweet)
            permalinkShortener(tweetId, urlShortenerCtx)
              .map { selfPermalink =>
                tweet.copy(
                  selfPermalink = Some(selfPermalink),
                  extendedTweetMetadata = Some(ExtendedTweetMetadataBuilder(tweet, selfPermalink))
                )
              }
          else {
            Future.value(tweet)
          }
        }

        // When an edit tweet is created we have to update some information on the
        // initial tweet, this object stores info about those updates for use
        // in the tweet insert store.
        // We update the editControl for each edit tweet and for the first edit tweet
        // we update the self permalink.
        initialTweetUpdateRequest: Option[InitialTweetUpdateRequest] <- editControl match {
          case Some(EditControl.Edit(edit)) =>
            // Identifies the first edit of an initial tweet
            val isFirstEdit =
              request.editOptions.map(_.previousTweetId).contains(edit.initialTweetId)

            // A potential permalink for this tweet being created's initial tweet
            val selfPermalinkForInitial: Future[Option[ShortenedUrl]] =
              if (isFirstEdit) {
                // `tweet` is the first edit of an initial tweet, which means
                // we need to write a self permalink. We create it here in
                // TweetBuilder and pass it through to the tweet store to
                // be written to the initial tweet.
                permalinkShortener(edit.initialTweetId, urlShortenerCtx).map(Some(_))
              } else {
                Future.value(None)
              }

            selfPermalinkForInitial.map { link =>
              Some(
                InitialTweetUpdateRequest(
                  initialTweetId = edit.initialTweetId,
                  editTweetId = tweet.id,
                  selfPermalink = link
                ))
            }

          // This is not an edit this is the initial tweet - so there are no initial
          // tweet updates
          case _ => Future.value(None)
        }

        tweet <- filterInvalidData(tweet, request, urlShortenerCtx)

        () <- validateEdit(tweet, request.editOptions)

        user <- updateUserCounts(user, tweet)

      } yield {
        TweetBuilderResult(
          tweet,
          user,
          createdAt,
          isSilentFail = spamResult == Spam.SilentFail,
          geoSearchRequestId = extractGeoSearchRequestId(request.geo),
          initialTweetUpdateRequest = initialTweetUpdateRequest
        )
      }
    }
  }