in tweetypie/server/src/main/scala/com/twitter/tweetypie/federated/columns/CreateTweetColumn.scala [83:314]
/**
 * Handles a create-Tweet request: validates the arguments against the caller's request
 * context, assembles a `thrift.PostTweetRequest`, posts it via `postTweet`, and maps an
 * `Ok` result to a `gql.CreateTweetResponseWithSubqueryPrefetchItems`.
 *
 * The following checks run directly in the method body, in this order, and throw:
 *  - `SafetyLevelMissingErr` when the request context carries no safety level;
 *  - `ClientNotPrivilegedErr` when an engagement request is supplied but
 *    `hasPrivilegePromotedTweetsInTimeline` is false;
 *  - `GenericAccessDeniedErr` when the request context has no device source, or when
 *    `nullcast` is requested without `hasPrivilegeNullcastingAccess`;
 *  - `CannotConvoControlAndCommunitiesErr` when both conversation control and a
 *    community are present;
 *  - `TooManyCommunitiesErr` when more than one community is present;
 *  - `CommunityUserNotAuthorizedErr` when a community is present and
 *    `enableCommunityTweetCreatesDecider()` is false.
 *
 * A non-`Ok` `TweetCreateState` is converted with `toCreateTweetErr` and thrown from
 * inside the callback that handles the post-tweet result.
 *
 * @param request   the create-Tweet arguments
 * @param opContext the operation context from which the request context is derived
 * @return a Stitch of the create-Tweet response, including any prefetched subquery items
 */
override def execute(request: Arg, opContext: OpContext): Stitch[Result] = {
  val reqCtx = getRequestContext(opContext)

  // ---------------------------------------------------------------------------------
  // Up-front validation: anything that can be rejected before calling into
  // thriftTweetService.postTweet is rejected here.
  // ---------------------------------------------------------------------------------
  val requiredSafetyLevel = reqCtx.safetyLevel.getOrElse(throw SafetyLevelMissingErr)

  // An engagement request is only honoured for clients holding the promoted-tweets privilege.
  val promotedTrackingId = request.engagementRequest match {
    case Some(engagement: EngagementRequest) if reqCtx.hasPrivilegePromotedTweetsInTimeline =>
      TrackingId.parse(engagement.impressionId, statsReceiver)
    case Some(_: EngagementRequest) =>
      throw ClientNotPrivilegedErr
    case None =>
      None
  }

  val createdViaSource = reqCtx.deviceSource.getOrElse(throw GenericAccessDeniedErr)

  val nullcastDenied = request.nullcast && !reqCtx.hasPrivilegeNullcastingAccess
  if (nullcastDenied) {
    throw GenericAccessDeniedErr
  }

  val safetyMeta = SafetyMetadataUtils.makeSafetyMetaData(
    sessionHash = reqCtx.sessionHash,
    knownDeviceToken = reqCtx.knownDeviceToken,
    contributorId = reqCtx.contributorId
  )

  // ---------------------------------------------------------------------------------
  // Optional pieces of the Tweet; each is None when the corresponding input is
  // absent or empty.
  // ---------------------------------------------------------------------------------
  val cardRef: Option[thrift.CardReference] =
    request.cardUri.filter(_.nonEmpty).map(uri => thrift.CardReference(uri))

  val entityAnnotations: Option[thrift.EscherbirdEntityAnnotations] =
    request.semanticAnnotationIds
      .filter(_.nonEmpty)
      .map((annotations: Seq[gql.TweetAnnotation]) => annotations.map(parseTweetEntityAnnotation))
      .map(parsed => thrift.EscherbirdEntityAnnotations(parsed))

  val requestedMedia = request.media.map(_.mediaEntities)
  val uploadIds = requestedMedia.map(_.map(_.mediaId)).filter(_.nonEmpty)

  val userMediaTags: Option[thrift.TweetMediaTags] = {
    // Media tags are dropped entirely for contributor requests.
    val taggingAllowed = !reqCtx.isContributorRequest
    val tagsByMediaId: Map[MediaId, Seq[thrift.MediaTag]] =
      requestedMedia
        .getOrElse(Nil)
        .filter(entity => taggingAllowed && entity.taggedUsers.nonEmpty)
        .map { entity =>
          entity.mediaId ->
            entity.taggedUsers
              .map(userId => thrift.MediaTag(thrift.MediaTagType.User, Some(userId)))
        }
        .toMap
    if (tagsByMediaId.nonEmpty) Some(thrift.TweetMediaTags(tagsByMediaId)) else None
  }

  // ---------------------------------------------------------------------------------
  // Community checks.
  // ---------------------------------------------------------------------------------
  val communityIds = parseCommunityIds(entityAnnotations)

  // Conversation controls and communities carry conflicting reply permissions, so a
  // Tweet may not define both.
  if (request.conversationControl.isDefined && communityIds.nonEmpty) {
    throw CannotConvoControlAndCommunitiesErr
  }

  // Posting to more than one community is currently unsupported.
  if (communityIds.length > 1) {
    throw TooManyCommunitiesErr
  }

  // Kill switch: community Tweets can be disabled via decider for app security.
  if (communityIds.nonEmpty && !enableCommunityTweetCreatesDecider()) {
    throw CommunityUserNotAuthorizedErr
  }

  // additionalFields marshals several input params into one thrift.Tweet and is only
  // defined when at least one of those params is defined.
  val hasAdditionalFields =
    cardRef.isDefined || entityAnnotations.isDefined || userMediaTags.isDefined
  val extraFields: Option[Tweet] =
    if (hasAdditionalFields) {
      Some(
        thrift.Tweet(
          0L,
          cardReference = cardRef,
          escherbirdEntityAnnotations = entityAnnotations,
          mediaTags = userMediaTags
        ))
    } else {
      None
    }

  val transientCtx: Option[TransientCreateContext] =
    parseTransientContext(
      request.batchCompose,
      request.periscope,
      reqCtx.twitterUserId
    )

  // PostTweetRequest.additionalContext is deprecated in favour of .transientContext, but
  // the REST API still supports it, Tweetypie still passes it along, and FanoutService
  // and Notifications still depend on it.
  val legacyAdditionalContext: Option[Map[TweetCreateContextKey, String]] =
    transientCtx.map(TransientContextUtil.toAdditionalContext)

  // ---------------------------------------------------------------------------------
  // Build and issue the thrift request.
  // ---------------------------------------------------------------------------------
  val postTweetRequest = thrift.PostTweetRequest(
    userId = reqCtx.twitterUserId,
    text = request.tweetText,
    createdVia = createdViaSource,
    inReplyToTweetId = request.reply.map(_.inReplyToTweetId),
    geo = request.geo.flatMap(parseTweetCreateGeo),
    autoPopulateReplyMetadata = request.reply.isDefined,
    excludeReplyUserIds = request.reply.map(_.excludeReplyUserIds).filter(_.nonEmpty),
    nullcast = request.nullcast,
    // The request is dark when the dark_request directive is set or when the Tweet is
    // undo-able.
    dark = reqCtx.isDarkRequest || request.undoOptions.exists(_.isUndo),
    hydrationOptions = Some(HydrationOptions.writePathHydrationOptions(reqCtx.cardsPlatformKey)),
    remoteHost = reqCtx.remoteHost,
    safetyMetaData = Some(safetyMeta),
    attachmentUrl = request.attachmentUrl,
    mediaUploadIds = uploadIds,
    mediaMetadata = None,
    transientContext = transientCtx,
    additionalContext = legacyAdditionalContext,
    conversationControl = request.conversationControl.map(parseTweetCreateConversationControl),
    exclusiveTweetControlOptions = request.exclusiveTweetControlOptions.map { _ =>
      thrift.ExclusiveTweetControlOptions()
    },
    trustedFriendsControlOptions =
      request.trustedFriendsControlOptions.map(parseTrustedFriendsControlOptions),
    editOptions = request.editOptions.flatMap { edit =>
      edit.previousTweetId.map(previousId => thrift.EditOptions(previousId))
    },
    collabControlOptions = request.collabControlOptions.map(parseCollabControlOptions),
    additionalFields = extraFields,
    trackingId = promotedTrackingId,
    noteTweetOptions = request.noteTweetOptions.map { note =>
      thrift.NoteTweetOptions(
        note.noteTweetId,
        note.mentionedScreenNames,
        note.mentionedUserIds,
        note.isExpandable)
    }
  )

  val postTweetStitch =
    Stitch.callFuture {
      TweetyPieDeciderOverrides.ConversationControlUseFeatureSwitchResults.On {
        postTweet(postTweetRequest)
      }
    }

  // Promoted-content logging runs here in the method body, not as part of the
  // post-tweet Stitch. It is skipped when there is no engagement request, or when the
  // request is a reply to Tweet id 0 (no op per go/rb/845242).
  request.engagementRequest
    .filter(_ => !request.reply.exists(_.inReplyToTweetId == 0))
    .foreach { engagement =>
      val engagementType = if (request.reply.isDefined) ReplyEngagement else TweetEngagement
      logTweetPromotedContent(engagement, engagementType, reqCtx.isDarkRequest)
    }

  // ---------------------------------------------------------------------------------
  // Map the thrift result to the GraphQL response.
  // ---------------------------------------------------------------------------------
  postTweetStitch.flatMap { result: thrift.PostTweetResult =>
    result.state match {
      case thrift.TweetCreateState.Ok =>
        // Unmention info: outcomes are counted, and any failure is replaced by
        // Stitch.None instead of being propagated.
        val unmentionOk = statsReceiver.counter("unmention_info_success")
        val unmentionFailed = statsReceiver.counter("unmention_info_failures")
        val unmentionFailedByCause = statsReceiver.scope("unmention_info_failures")
        val unmentionStitch = result.tweet match {
          case Some(created) =>
            unmentionInfoRepository(created)
              .onFailure { failure =>
                unmentionFailed.incr()
                unmentionFailedByCause.counter(Throwables.mkString(failure): _*).incr()
              }
              .onSuccess { _ =>
                unmentionOk.incr()
              }
              .rescue {
                case _ =>
                  Stitch.None
              }
          case _ =>
            Stitch.None
        }

        // Vibe: same treatment as unmention info above.
        val vibeOk = statsReceiver.counter("vibe_success")
        val vibeFailed = statsReceiver.counter("vibe_failures")
        val vibeFailedByCause = statsReceiver.scope("vibe_failures")
        val vibeLookupStitch = result.tweet match {
          case Some(created) =>
            vibeRepository(created)
              .onSuccess { _ =>
                vibeOk.incr()
              }
              .onFailure { failure =>
                vibeFailed.incr()
                vibeFailedByCause.counter(Throwables.mkString(failure): _*).incr()
              }
              .rescue {
                case _ =>
                  Stitch.None
              }
          case _ =>
            Stitch.None
        }

        Stitch
          .join(unmentionStitch, vibeLookupStitch)
          .liftToOption()
          .flatMap { prefetched =>
            // NOTE(review): result.tweet.get throws if an Ok result carries no tweet.
            val prefetchRequest = PrefetchedDataRequest(
              tweet = result.tweet.get,
              sourceTweet = result.sourceTweet,
              quotedTweet = result.quotedTweet,
              safetyLevel = requiredSafetyLevel,
              unmentionInfo = prefetched.flatMap(_._1),
              vibe = prefetched.flatMap(_._2),
              requestContext = getWeaverbirdCtx()
            )

            // The prefetched data is lifted to an Option and only feeds
            // subqueryPrefetchItems; the created Tweet id is reported in `data` regardless.
            prefetchedDataRepository(prefetchRequest)
              .liftToOption()
              .map { (prefetchedData: Option[PrefetchedDataResponse]) =>
                gql.CreateTweetResponseWithSubqueryPrefetchItems(
                  data = Some(gql.CreateTweetResponse(result.tweet.map(_.id))),
                  subqueryPrefetchItems = prefetchedData.map(_.value)
                )
              }
          }

      case errState =>
        throw toCreateTweetErr(errState, result.bounce, result.failureReason)
    }
  }
}