override protected def configurePipeline()

in src/scala/com/twitter/interaction_graph/scio/agg_all/InteractionGraphAggregationJob.scala [89:313]


  /**
   * Wires up the interaction-graph aggregation pipeline for one date interval.
   *
   * Steps, in order:
   *  1. Read scored edges for the run date from the BigQuery table
   *     `twttr-recos-ml-prod.realgraph.scores`.
   *  2. Read the address-book, client-event-log, flock and direct-interaction features, the flat
   *     users (to derive the invalid-user set) and the previous day's aggregated edges/vertices.
   *  3. Filter the previous aggregates (self-loops, status-based features, invalid users) and
   *     combine them with the daily activity through the `*WithDecay` combiners.
   *  4. Join the combined edges with the scores and write five DAL outputs under
   *     `pipelineOptions.getOutputPath`: history vertex/edge snapshots, daily vertex/edge
   *     aggregates, and the versioned timeline real-graph features.
   *
   * @param scioContext     Scio context the transforms are registered on
   * @param pipelineOptions job options; supplies the date interval, environments, output path,
   *                        shard count and the max number of destination ids per user
   */
  override protected def configurePipeline(
    scioContext: ScioContext,
    pipelineOptions: InteractionGraphAggregationOption
  ): Unit = {
    @transient
    implicit lazy val sc: ScioContext = scioContext
    implicit lazy val dateInterval: Interval = pipelineOptions.interval
    val yesterday = DateUtil.subtract(dateInterval, Duration.fromDays(1))

    val dalEnvironment: String = pipelineOptions
      .as(classOf[ServiceIdentifierOptions])
      .getEnvironment()
    // The write environment is optional; fall back to the service environment when it is unset.
    val dalWriteEnvironment: String =
      Option(pipelineOptions.getDALWriteEnvironment).getOrElse(dalEnvironment)

    val dateStr: String = pipelineOptions.getDate().value.getStart.toString("yyyy-MM-dd")
    logger.info(s"dateStr $dateStr")
    val project: String = "twttr-recos-ml-prod"
    val datasetName: String = "realgraph"
    val bqTableName: String = "scores"
    val fullBqTableName: String = s"$project:$datasetName.$bqTableName"

    // Scored edges for the run date, read from the `ds` partition matching dateStr.
    val scoreExport: SCollection[ScoredEdge] =
      sc.customInput(
        s"Read from BQ table $fullBqTableName",
        BigQueryIO
          .read(parseRow)
          .fromQuery(s"""SELECT source_id, destination_id, prob_explicit, followed
               |FROM `$project.$datasetName.$bqTableName`
               |WHERE ds = '$dateStr'""".stripMargin)
          .usingStandardSql()
          .withMethod(TypedRead.Method.DEFAULT)
      )

    val source = InteractionGraphAggregationSource(pipelineOptions)

    val (addressEdgeFeatures, addressVertexFeatures) = source.readAddressBookFeatures()

    val (clientEventLogsEdgeFeatures, clientEventLogsVertexFeatures) =
      source.readClientEventLogsFeatures(dateInterval)

    val (flockEdgeFeatures, flockVertexFeatures) = source.readFlockFeatures()

    val (directInteractionsEdgeFeatures, directInteractionsVertexFeatures) =
      source.readDirectInteractionsFeatures(dateInterval)

    val invalidUsers = UserUtil.getInvalidUsers(source.readFlatUsers())

    // History comes from the aggregates produced for the previous day.
    val (prevAggEdge, prevAggVertex) = source.readAggregatedFeatures(yesterday)

    val prevAggregatedVertex: SCollection[Vertex] =
      UserUtil
        .filterUsersByIdMapping[Vertex](
          prevAggVertex,
          invalidUsers,
          v => v.userId
        )

    /** Remove status-based features (flock/ab) from current graph, because we only need the latest
     *  This is to allow us to filter and roll-up a smaller dataset, to which we will still add
     *  back the status-based features for the complete scoredAggregates (that other teams will read).
     */
    val prevAggEdgeFiltered = prevAggEdge
      .filter { e =>
        // drop self-loops
        e.sourceId != e.destinationId
      }
      .withName("filtering status-based edges")
      .flatMap(FeatureGeneratorUtil.removeStatusFeatures)
    val prevAggEdgeValid: SCollection[Edge] =
      UserUtil
        .filterUsersByMultipleIdMappings[Edge](
          prevAggEdgeFiltered,
          invalidUsers,
          Seq(e => e.sourceId, e => e.destinationId)
        )

    val aggregatedActivityVertexDaily = UserUtil
      .filterUsersByIdMapping[Vertex](
        FeatureGeneratorUtil
          .combineVertexFeatures(
            clientEventLogsVertexFeatures ++
              directInteractionsVertexFeatures ++
              addressVertexFeatures ++
              flockVertexFeatures
          ),
        invalidUsers,
        v => v.userId
      )

    // we split up the roll-up of decayed counts between status vs activity/count-based features
    val aggregatedActivityEdgeDaily = FeatureGeneratorUtil
      .combineEdgeFeatures(clientEventLogsEdgeFeatures ++ directInteractionsEdgeFeatures)

    // Vertex level, Add the decay sum for history and daily
    val aggregatedActivityVertex = FeatureGeneratorUtil
      .combineVertexFeaturesWithDecay(
        prevAggregatedVertex,
        aggregatedActivityVertexDaily,
        InteractionGraphScoringConfig.ONE_MINUS_ALPHA,
        InteractionGraphScoringConfig.ALPHA
      )

    // Edge level, Add the decay sum for history and daily
    val aggregatedActivityEdge = FeatureGeneratorUtil
      .combineEdgeFeaturesWithDecay(
        prevAggEdgeValid,
        aggregatedActivityEdgeDaily,
        InteractionGraphScoringConfig.ONE_MINUS_ALPHA,
        InteractionGraphScoringConfig.ALPHA
      )
      // withName labels the NEXT transform, so it has to come before the filter it describes;
      // placed after the filter it would instead name the first transform applied downstream.
      .withName("removing edges that only have dwell time features")
      .filter(FeatureGeneratorUtil.edgeWithFeatureOtherThanDwellTime)

    val edgeKeyedScores = scoreExport.keyBy { e => (e.sourceId, e.destinationId) }

    // History edges keep every aggregated edge; the score is attached as weight when one exists.
    val scoredAggregatedActivityEdge = aggregatedActivityEdge
      .keyBy { e => (e.sourceId, e.destinationId) }
      .withName("join with scores")
      .leftOuterJoin(edgeKeyedScores)
      .map {
        case (_, (e, scoredEdgeOpt)) =>
          val scoreOpt = scoredEdgeOpt.map(_.score)
          val counterName = if (scoreOpt.nonEmpty) "has score" else "no score"
          ScioMetrics.counter("after joining edge with scores", counterName).inc()
          e.copy(weight = scoreOpt)
      }

    // Status-based (address book / flock) features are added back on top of the activity edges.
    val combinedFeatures = FeatureGeneratorUtil
      .combineEdgeFeatures(aggregatedActivityEdge ++ addressEdgeFeatures ++ flockEdgeFeatures)
      .keyBy { e => (e.sourceId, e.destinationId) }

    // Driven from the score side: every scored edge is emitted, with features when available.
    val aggregatedActivityScoredEdge =
      edgeKeyedScores
        .withName("join with combined edge features")
        .leftOuterJoin(combinedFeatures)
        .map {
          case (_, (scoredEdge, combinedFeaturesOpt)) =>
            // None when the join missed or the matched edge carries no features.
            val nonEmptyFeatures = combinedFeaturesOpt.map(_.features).filter(_.nonEmpty)
            val counterName = if (nonEmptyFeatures.isDefined) "has features" else "no features"
            ScioMetrics.counter("after joining scored edge with features", counterName).inc()
            Edge(
              sourceId = scoredEdge.sourceId,
              destinationId = scoredEdge.destinationId,
              weight = Some(scoredEdge.score),
              features = nonEmptyFeatures.getOrElse(Nil)
            )
        }

    val realGraphFeatures =
      getTopKTimelineFeatures(aggregatedActivityScoredEdge, pipelineOptions.getMaxDestinationIds)

    // Resolved once and shared by every DAL sink below.
    val writeEnvironment = Environment.valueOf(dalWriteEnvironment)

    // Vertex outputs use a tenth of the configured shard count.
    aggregatedActivityVertex.saveAsCustomOutput(
      "Write History Aggregated Vertex Records",
      DAL.writeSnapshot[Vertex](
        dataset = InteractionGraphHistoryAggregatedVertexSnapshotScalaDataset,
        pathLayout = PathLayout.DailyPath(pipelineOptions.getOutputPath + "/aggregated_vertex"),
        endDate = Instant.ofEpochMilli(dateInterval.getEndMillis),
        diskFormat = DiskFormat.Parquet,
        environmentOverride = writeEnvironment,
        writeOption = WriteOptions(numOfShards = Some(pipelineOptions.getNumberOfShards / 10))
      )
    )

    scoredAggregatedActivityEdge.saveAsCustomOutput(
      "Write History Aggregated Edge Records",
      DAL.writeSnapshot[Edge](
        dataset = InteractionGraphHistoryAggregatedEdgeSnapshotScalaDataset,
        pathLayout = PathLayout.DailyPath(pipelineOptions.getOutputPath + "/aggregated_raw_edge"),
        endDate = Instant.ofEpochMilli(dateInterval.getEndMillis),
        diskFormat = DiskFormat.Parquet,
        environmentOverride = writeEnvironment,
        writeOption = WriteOptions(numOfShards = Some(pipelineOptions.getNumberOfShards))
      )
    )

    aggregatedActivityVertexDaily.saveAsCustomOutput(
      "Write Daily Aggregated Vertex Records",
      DAL.write[Vertex](
        dataset = InteractionGraphAggregatedVertexDailyScalaDataset,
        pathLayout =
          PathLayout.DailyPath(pipelineOptions.getOutputPath + "/aggregated_vertex_daily"),
        interval = dateInterval,
        diskFormat = DiskFormat.Parquet,
        environmentOverride = writeEnvironment,
        writeOption = WriteOptions(numOfShards = Some(pipelineOptions.getNumberOfShards / 10))
      )
    )

    aggregatedActivityEdgeDaily.saveAsCustomOutput(
      "Write Daily Aggregated Edge Records",
      DAL.write[Edge](
        dataset = InteractionGraphAggregatedEdgeDailyScalaDataset,
        pathLayout = PathLayout.DailyPath(pipelineOptions.getOutputPath + "/aggregated_edge_daily"),
        interval = dateInterval,
        diskFormat = DiskFormat.Parquet,
        environmentOverride = writeEnvironment,
        writeOption = WriteOptions(numOfShards = Some(pipelineOptions.getNumberOfShards))
      )
    )

    realGraphFeatures.saveAsCustomOutput(
      "Write Timeline Real Graph Features",
      DAL.writeVersionedKeyVal[KeyVal[Long, UserSession]](
        dataset = RealGraphFeaturesScalaDataset,
        pathLayout =
          PathLayout.VersionedPath(pipelineOptions.getOutputPath + "/real_graph_features"),
        environmentOverride = writeEnvironment,
        writeOption = WriteOptions(numOfShards = Some(pipelineOptions.getNumberOfShards))
      )
    )
  }