in src/scala/com/twitter/interaction_graph/scio/common/ConversionUtil.scala [66:109]
/**
 * Collapses the per-feature entries of an [[Edge]] into one [[RealGraphEdgeFeatures]] record.
 *
 * The record is seeded with the edge's destination id. Every entry of `e.features` is then
 * converted with `toRealGraphEdgeFeatureV1` and written into the field matching its
 * [[FeatureName]]; names with no matching field leave the record unchanged. If a name occurs
 * more than once, the later entry overwrites the earlier one. A ScioMetrics counter
 * (namespace "toRealGraphEdgeFeatures", named after the feature) is incremented for every
 * entry, whether or not the name is recognised.
 *
 * @param filterFn predicate evaluated on the assembled record, before the weight is filled in
 * @param e        edge whose features are converted
 * @return the assembled record with `weight` taken from the edge (0.0 when the edge has no
 *         weight) if `filterFn` accepts it, otherwise `None`
 */
def toRealGraphEdgeFeatures(
  filterFn: RealGraphEdgeFeatures => Boolean
)(
  e: Edge
): Option[RealGraphEdgeFeatures] = {
  val seed = RealGraphEdgeFeatures(destId = e.destinationId)

  val assembled = e.features.foldLeft(seed) { (acc, entry) =>
    // Convert first, then count: the counter only moves if the conversion succeeded.
    val converted = Some(toRealGraphEdgeFeatureV1(entry.tss))
    ScioMetrics.counter("toRealGraphEdgeFeatures", entry.name.name).inc()

    entry.name match {
      case FeatureName.NumRetweets =>
        acc.copy(retweetsFeature = converted)
      case FeatureName.NumFavorites =>
        acc.copy(favsFeature = converted)
      case FeatureName.NumMentions =>
        acc.copy(mentionsFeature = converted)
      case FeatureName.NumTweetClicks =>
        acc.copy(tweetClicksFeature = converted)
      case FeatureName.NumLinkClicks =>
        acc.copy(linkClicksFeature = converted)
      case FeatureName.NumProfileViews =>
        acc.copy(profileViewsFeature = converted)
      case FeatureName.TotalDwellTime =>
        acc.copy(dwellTimeFeature = converted)
      case FeatureName.NumInspectedStatuses =>
        acc.copy(inspectedStatusesFeature = converted)
      case FeatureName.NumPhotoTags =>
        acc.copy(photoTagsFeature = converted)
      case FeatureName.NumFollows =>
        acc.copy(followFeature = converted)
      case FeatureName.NumMutualFollows =>
        acc.copy(mutualFollowFeature = converted)
      case FeatureName.AddressBookEmail =>
        acc.copy(addressBookEmailFeature = converted)
      case FeatureName.AddressBookPhone =>
        acc.copy(addressBookPhoneFeature = converted)
      case FeatureName.AddressBookInBoth =>
        acc.copy(addressBookInBothFeature = converted)
      case FeatureName.AddressBookMutualEdgeEmail =>
        acc.copy(addressBookMutualEdgeEmailFeature = converted)
      case FeatureName.AddressBookMutualEdgePhone =>
        acc.copy(addressBookMutualEdgePhoneFeature = converted)
      case FeatureName.AddressBookMutualEdgeInBoth =>
        acc.copy(addressBookMutualEdgeInBothFeature = converted)
      case FeatureName.NumTweetQuotes =>
        acc.copy(numTweetQuotes = converted)
      case FeatureName.NumBlocks =>
        acc.copy(numBlocks = converted)
      case FeatureName.NumMutes =>
        acc.copy(numMutes = converted)
      case FeatureName.NumReportAsSpams =>
        acc.copy(numReportAsSpams = converted)
      case FeatureName.NumReportAsAbuses =>
        acc.copy(numReportAsAbuses = converted)
      // Feature names without a slot in RealGraphEdgeFeatures are counted above but not stored.
      case _ =>
        acc
    }
  }

  // The predicate sees the record without a weight; the weight is attached only to survivors.
  Some(assembled)
    .filter(filterFn)
    .map(kept => kept.copy(weight = e.weight.orElse(Some(0.0))))
}