override def configurePipeline()

in src/scala/com/twitter/simclusters_v2/scio/multi_type_graph/assemble_multi_type_graph/AssembleMultiTypeGraphScioBaseApp.scala [366:573]


  /**
   * Wires up the whole job on the supplied ScioContext: reads the input datasets, unions the
   * per-signal sub-graphs into the full multi-type graph, selects the top-K right nodes per
   * right-node type, truncates the graph to those nodes, and registers the five DAL writes.
   *
   * @param sc   context on which every read and write is registered
   * @param opts supplies the date interval; its end anchors the lookback windows and the
   *             instant handed to every DAL write
   */
  override def configurePipeline(sc: ScioContext, opts: DateRangeOptions): Unit = {
    // Expose the context implicitly so the ExternalDataSources readers can pick it up
    implicit def scioContext: ScioContext = sc

    // Adhoc runs write to the Dev DAL environment, all other runs to Prod
    val writeEnvironment =
      if (isAdhoc) DAL.Environment.Dev
      else DAL.Environment.Prod

    // Lookback windows; both stop one millisecond before the end of the requested interval
    val rangeEnd = opts.interval.getEnd
    val lastIncludedMillis = rangeEnd.minusMillis(1)
    val trailingWeek = new Interval(rangeEnd.minusWeeks(1), lastIncludedMillis)
    val trailingTwoWeeks = new Interval(rangeEnd.minusWeeks(2), lastIncludedMillis)
    val oneWeek = Duration.fromDays(7)

    // Instant attached to every DAL write: one millisecond before the interval end
    val writeInstant = Instant.ofEpochMilli(opts.interval.getEndMillis - 1L)

    /*
     * Inputs
     */
    // Valid UserIds, used below to drop edges of deactivated or suspended accounts
    val activeUsers = getValidUsers(ExternalDataSources.userSource(oneWeek))

    // Tweet engagements (favs, replies, retweets) over the trailing 14 days
    val tweetEngagements = ExternalDataSources.ieSourceTweetEngagementsSource(trailingTwoWeeks)

    // user-user fav edges, scored with the configured half-life
    val favEdges =
      getFavEdges(
        ExternalDataSources.userUserFavSource(Duration.fromDays(14)),
        HalfLifeInDaysForFavScore)

    // TFlock user-user edges (follow / block / abuse report / spam report), restricted
    // to valid users
    val followEdges =
      filterInvalidUsers(ExternalDataSources.flockFollowSource(oneWeek), activeUsers)
    val blockEdges =
      filterInvalidUsers(ExternalDataSources.flockBlockSource(oneWeek), activeUsers)
    val abuseReportEdges =
      filterInvalidUsers(ExternalDataSources.flockReportAsAbuseSource(oneWeek), activeUsers)
    val spamReportEdges =
      filterInvalidUsers(ExternalDataSources.flockReportAsSpamSource(oneWeek), activeUsers)

    // user -> sign-up country
    val signUpCountryEdges = ExternalDataSources.userCountrySource(oneWeek)

    // user -> consumed languages (inferred)
    val consumedLanguageEdges = ExternalDataSources.inferredUserConsumedLanguageSource(oneWeek)

    // topic follow graph
    val topicFollowEdges = ExternalDataSources.topicFollowGraphSource(oneWeek)

    // MagicRecs notification open/click events over the trailing 7 days
    val notifOpenOrClickEvents =
      ExternalDataSources.magicRecsNotficationOpenOrClickEventsSource(trailingWeek)

    // Search query strings over the trailing 7 days
    val searchQueryEdges = ExternalDataSources.adaptiveSearchScribeLogsSource(trailingWeek)

    /*
     * Full graph: union of every per-signal sub-graph
     */
    val multiTypeGraph =
      getUserTweetInteractionGraph(tweetEngagements) ++
        getUserFavGraph(favEdges) ++
        getUserFollowGraph(followEdges) ++
        getUserBlockGraph(blockEdges) ++
        getUserAbuseReportGraph(abuseReportEdges) ++
        getUserSpamReportGraph(spamReportEdges) ++
        getUserSignUpCountryGraph(signUpCountryEdges) ++
        getUserConsumedLanguagesGraph(consumedLanguageEdges) ++
        getUserTopicFollowGraph(topicFollowEdges) ++
        getMagicRecsNotifOpenOrClickTweetsGraph(notifOpenOrClickEvents) ++
        getSearchGraph(searchQueryEdges)

    // Top-K right nodes (with frequencies) for each right-node type
    val topRightNodesByType: SCollection[(RightNodeType, Seq[(Noun, Double)])] =
      getTopKRightNounsWithFrequencies(
        multiTypeGraph,
        TopKConfig,
        GlobalDefaultMinFrequencyOfRightNodeType)

    // Same top-K data, re-expressed with the struct types used by the KeyVal datasets
    val topNounsByTypeStruct: SCollection[(RightNodeTypeStruct, NounWithFrequencyList)] =
      topRightNodesByType.map {
        case (nodeType, scoredNouns) =>
          val withFrequencies: Seq[NounWithFrequency] =
            scoredNouns.map { case (noun, frequency) => NounWithFrequency(noun, frequency) }
          (RightNodeTypeStruct(nodeType), NounWithFrequencyList(withFrequencies))
      }

    // Graph restricted to the top-K right nodes
    val prunedGraph: SCollection[(LeftNode, RightNodeWithEdgeWeight)] =
      getTruncatedGraph(multiTypeGraph, topRightNodesByType)

    // Truncated graph grouped per left node. The key is unwrapped to the raw user id for
    // the aggregation and re-wrapped afterwards, so no custom ordering has to be defined
    // for the LeftNode type.
    val prunedGraphBySource: SCollection[(LeftNode, RightNodeWithEdgeWeightList)] =
      prunedGraph
        .collect { case (LeftNode.UserId(id), edge) => (id, List(edge)) }
        .sumByKey
        .map { case (id, edges) => (LeftNode.UserId(id), RightNodeWithEdgeWeightList(edges)) }

    /*
     * Outputs
     */
    // Complete top-K right nodes, kept in full for the clustering step
    topNounsByTypeStruct
      .map { case (typeStruct, nouns) => KeyVal(typeStruct, nouns) }
      .saveAsCustomOutput(
        name = "WriteTopKNouns",
        DAL.writeVersionedKeyVal(
          topKRightNounsKeyValDataset,
          PathLayout.VersionedPath(prefix = rootMHPath + topKRightNounsOutputDir),
          instant = writeInstant,
          environmentOverride = writeEnvironment,
        )
      )

    // Top-K right nodes for the MH dump, capped at TopKRightNounsForMHDump entries per type
    topNounsByTypeStruct
      .map {
        case (typeStruct, nouns) =>
          val cappedNouns =
            NounWithFrequencyList(nouns.nounWithFrequencyList.take(TopKRightNounsForMHDump))
          KeyVal(typeStruct, cappedNouns)
      }
      .saveAsCustomOutput(
        name = "WriteTopKNounsToMHForDebugger",
        DAL.writeVersionedKeyVal(
          topKRightNounsMHKeyValDataset,
          PathLayout.VersionedPath(prefix = rootMHPath + topKRightNounsMHOutputDir),
          instant = writeInstant,
          environmentOverride = writeEnvironment,
        )
      )

    // Truncated graph (MultiTypeGraphTopKForRightNodes) in KeyVal format
    prunedGraphBySource
      .map { case (source, edges) => KeyVal(source, edges) }
      .saveAsCustomOutput(
        name = "WriteTruncatedMultiTypeGraph",
        DAL.writeVersionedKeyVal(
          truncatedMultiTypeGraphKeyValDataset,
          PathLayout.VersionedPath(prefix = rootMHPath + truncatedMultiTypeGraphMHOutputDir),
          instant = writeInstant,
          environmentOverride = writeEnvironment,
        )
      )

    // Truncated graph (MultiTypeGraphTopKForRightNodes) as a thrift snapshot, one edge per record
    prunedGraph
      .map { case (source, edge) => MultiTypeGraphEdge(source, edge) }
      .saveAsCustomOutput(
        name = "WriteTruncatedMultiTypeGraphThrift",
        DAL.writeSnapshot(
          multiTypeGraphTopKForRightNodesSnapshotDataset,
          PathLayout.FixedPath(rootThriftPath + truncatedMultiTypeGraphThriftOutputDir),
          writeInstant,
          DiskFormat.Thrift(),
          environmentOverride = writeEnvironment
        )
      )

    // Full (untruncated) graph as a thrift snapshot, one edge per record
    multiTypeGraph
      .map { case (source, edge) => MultiTypeGraphEdge(source, edge) }
      .saveAsCustomOutput(
        name = "WriteFullMultiTypeGraph",
        DAL.writeSnapshot(
          fullMultiTypeGraphSnapshotDataset,
          PathLayout.FixedPath(rootThriftPath + fullMultiTypeGraphThriftOutputDir),
          writeInstant,
          DiskFormat.Thrift(),
          environmentOverride = writeEnvironment
        )
      )

  }