override def execute()

in tweetypie/server/src/main/scala/com/twitter/tweetypie/federated/columns/CreateTweetColumn.scala [83:314]


  override def execute(request: Arg, opContext: OpContext): Stitch[Result] = {

    val ctx = getRequestContext(opContext)

    // First, do any request parameter validation that can result in an error
    // prior to calling into thriftTweetService.postTweet.
    val safetyLevel = ctx.safetyLevel.getOrElse(throw SafetyLevelMissingErr)

    val trackingId = request.engagementRequest match {
      case Some(engagementRequest: EngagementRequest) if ctx.hasPrivilegePromotedTweetsInTimeline =>
        TrackingId.parse(engagementRequest.impressionId, statsReceiver)
      case Some(e: EngagementRequest) =>
        throw ClientNotPrivilegedErr
      case None =>
        None
    }

    val deviceSource = ctx.deviceSource.getOrElse(throw GenericAccessDeniedErr)

    if (request.nullcast && !ctx.hasPrivilegeNullcastingAccess) {
      throw GenericAccessDeniedErr
    }

    val safetyMetadata = SafetyMetadataUtils.makeSafetyMetaData(
      sessionHash = ctx.sessionHash,
      knownDeviceToken = ctx.knownDeviceToken,
      contributorId = ctx.contributorId
    )

    val cardReference: Option[thrift.CardReference] =
      request.cardUri.filter(_.nonEmpty).map(thrift.CardReference(_))

    val escherbirdEntityAnnotations: Option[thrift.EscherbirdEntityAnnotations] =
      request.semanticAnnotationIds
        .filter(_.nonEmpty)
        .map((seq: Seq[gql.TweetAnnotation]) => seq.map(parseTweetEntityAnnotation))
        .map(thrift.EscherbirdEntityAnnotations(_))

    val mediaEntities = request.media.map(_.mediaEntities)
    val mediaUploadIds = mediaEntities.map(_.map(_.mediaId)).filter(_.nonEmpty)

    val mediaTags: Option[thrift.TweetMediaTags] = {
      val mediaTagsAuthorized = !ctx.isContributorRequest

      val tagMap: Map[MediaId, Seq[thrift.MediaTag]] =
        mediaEntities
          .getOrElse(Nil)
          .filter(_ => mediaTagsAuthorized)
          .filter(_.taggedUsers.nonEmpty)
          .map(mediaEntity =>
            mediaEntity.mediaId ->
              mediaEntity.taggedUsers
                .map(user_id => thrift.MediaTag(thrift.MediaTagType.User, Some(user_id))))
          .toMap

      Option(tagMap)
        .filter(_.nonEmpty)
        .map(thrift.TweetMediaTags(_))
    }

    // Can not have both conversation controls and communities defined for a tweet
    // as they have conflicting permissions on who can reply to the tweet.
    val communities = parseCommunityIds(escherbirdEntityAnnotations)
    if (request.conversationControl.isDefined && communities.nonEmpty) {
      throw CannotConvoControlAndCommunitiesErr
    }

    // Currently we do not support posting to multiple communities.
    if (communities.length > 1) {
      throw TooManyCommunitiesErr
    }

    // Kill switch for community tweets in case we need to disable them for app security.
    if (communities.nonEmpty && !enableCommunityTweetCreatesDecider()) {
      throw CommunityUserNotAuthorizedErr
    }

    // additionalFields is used to marshal multiple input params and
    // should only be defined if one or more of those params are defined.
    val additionalFields: Option[Tweet] =
      cardReference
        .orElse(escherbirdEntityAnnotations)
        .orElse(mediaTags)
        .map(_ =>
          thrift.Tweet(
            0L,
            cardReference = cardReference,
            escherbirdEntityAnnotations = escherbirdEntityAnnotations,
            mediaTags = mediaTags
          ))

    val transientContext: Option[TransientCreateContext] =
      parseTransientContext(
        request.batchCompose,
        request.periscope,
        ctx.twitterUserId,
      )

    // PostTweetRequest.additionalContext is marked as deprecated in favor of .transientContext,
    // but the REST API still supports it and it is still passed along through Tweetypie, and
    // FanoutService and Notifications still depend on it.
    val additionalContext: Option[Map[TweetCreateContextKey, String]] =
      transientContext.map(TransientContextUtil.toAdditionalContext)

    val thriftPostTweetRequest = thrift.PostTweetRequest(
      userId = ctx.twitterUserId,
      text = request.tweetText,
      createdVia = deviceSource,
      inReplyToTweetId = request.reply.map(_.inReplyToTweetId),
      geo = request.geo.flatMap(parseTweetCreateGeo),
      autoPopulateReplyMetadata = request.reply.isDefined,
      excludeReplyUserIds = request.reply.map(_.excludeReplyUserIds).filter(_.nonEmpty),
      nullcast = request.nullcast,
      // Send a dark request to Tweetypie if the dark_request directive is set or
      // if the Tweet is undo-able.
      dark = ctx.isDarkRequest || request.undoOptions.exists(_.isUndo),
      hydrationOptions = Some(HydrationOptions.writePathHydrationOptions(ctx.cardsPlatformKey)),
      remoteHost = ctx.remoteHost,
      safetyMetaData = Some(safetyMetadata),
      attachmentUrl = request.attachmentUrl,
      mediaUploadIds = mediaUploadIds,
      mediaMetadata = None,
      transientContext = transientContext,
      additionalContext = additionalContext,
      conversationControl = request.conversationControl.map(parseTweetCreateConversationControl),
      exclusiveTweetControlOptions = request.exclusiveTweetControlOptions.map { _ =>
        thrift.ExclusiveTweetControlOptions()
      },
      trustedFriendsControlOptions =
        request.trustedFriendsControlOptions.map(parseTrustedFriendsControlOptions),
      editOptions = request.editOptions.flatMap(_.previousTweetId.map(thrift.EditOptions(_))),
      collabControlOptions = request.collabControlOptions.map(parseCollabControlOptions),
      additionalFields = additionalFields,
      trackingId = trackingId,
      noteTweetOptions = request.noteTweetOptions.map(options =>
        thrift.NoteTweetOptions(
          options.noteTweetId,
          options.mentionedScreenNames,
          options.mentionedUserIds,
          options.isExpandable))
    )

    val stitchPostTweet =
      Stitch.callFuture {
        TweetyPieDeciderOverrides.ConversationControlUseFeatureSwitchResults.On {
          postTweet(thriftPostTweetRequest)
        }
      }

    for {
      engagement <- request.engagementRequest
      if !request.reply.exists(_.inReplyToTweetId == 0) // no op per go/rb/845242
      engagementType = if (request.reply.isDefined) ReplyEngagement else TweetEngagement
    } logTweetPromotedContent(engagement, engagementType, ctx.isDarkRequest)

    stitchPostTweet.flatMap { result: thrift.PostTweetResult =>
      result.state match {

        case thrift.TweetCreateState.Ok =>
          val unmentionSuccessCounter = statsReceiver.counter("unmention_info_success")
          val unmentionFailuresCounter = statsReceiver.counter("unmention_info_failures")
          val unmentionFailuresScope = statsReceiver.scope("unmention_info_failures")

          val unmentionInfoStitch = result.tweet match {
            case Some(tweet) =>
              unmentionInfoRepository(tweet)
                .onFailure { t =>
                  unmentionFailuresCounter.incr()
                  unmentionFailuresScope.counter(Throwables.mkString(t): _*).incr()
                }
                .onSuccess { _ =>
                  unmentionSuccessCounter.incr()
                }
                .rescue {
                  case _ =>
                    Stitch.None
                }
            case _ =>
              Stitch.None
          }

          val vibeSuccessCounter = statsReceiver.counter("vibe_success")
          val vibeFailuresCounter = statsReceiver.counter("vibe_failures")
          val vibeFailuresScope = statsReceiver.scope("vibe_failures")

          val vibeStitch = result.tweet match {
            case Some(tweet) =>
              vibeRepository(tweet)
                .onSuccess { _ =>
                  vibeSuccessCounter.incr()
                }
                .onFailure { t =>
                  vibeFailuresCounter.incr()
                  vibeFailuresScope.counter(Throwables.mkString(t): _*).incr()
                }
                .rescue {
                  case _ =>
                    Stitch.None
                }
            case _ =>
              Stitch.None
          }

          Stitch
            .join(unmentionInfoStitch, vibeStitch)
            .liftToOption()
            .flatMap { prefetchFields =>
              val r = PrefetchedDataRequest(
                tweet = result.tweet.get,
                sourceTweet = result.sourceTweet,
                quotedTweet = result.quotedTweet,
                safetyLevel = safetyLevel,
                unmentionInfo = prefetchFields.flatMap(params => params._1),
                vibe = prefetchFields.flatMap(params => params._2),
                requestContext = getWeaverbirdCtx()
              )

              prefetchedDataRepository(r)
                .liftToOption()
                .map((prefetchedData: Option[PrefetchedDataResponse]) => {
                  gql.CreateTweetResponseWithSubqueryPrefetchItems(
                    data = Some(gql.CreateTweetResponse(result.tweet.map(_.id))),
                    subqueryPrefetchItems = prefetchedData.map(_.value)
                  )
                })
            }

        case errState =>
          throw toCreateTweetErr(errState, result.bounce, result.failureReason)
      }
    }
  }