in product-mixer/core/src/main/scala/com/twitter/product_mixer/core/pipeline/scoring/ScoringPipelineBuilder.scala [53:366]
def build(
parentComponentIdentifierStack: ComponentIdentifierStack,
config: ScoringPipelineConfig[Query, Candidate]
): ScoringPipeline[Query, Candidate] = {
val pipelineIdentifier = config.identifier
val context = Executor.Context(
PipelineFailureClassifier(
config.failureClassifier.orElse(StoppedGateException.classifier(ClosedGate))),
parentComponentIdentifierStack.push(pipelineIdentifier)
)
val enabledGateOpt = config.enabledDeciderParam.map { deciderParam =>
ParamGate(pipelineIdentifier + EnabledGateSuffix, deciderParam)
}
val supportedClientGateOpt = config.supportedClientParam.map { param =>
ParamGate(pipelineIdentifier + SupportedClientGateSuffix, param)
}
/**
* Evaluate enabled decider gate first since if it's off, there is no reason to proceed
* Next evaluate supported client feature switch gate, followed by customer configured gates
*/
val allGates = enabledGateOpt.toSeq ++ supportedClientGateOpt.toSeq ++ config.gates
val GatesStep = new Step[Query, GateExecutorResult] {
override def identifier: PipelineStepIdentifier = ScoringPipelineConfig.gatesStep
override lazy val executorArrow: Arrow[Query, GateExecutorResult] =
gateExecutor.arrow(allGates, context)
override def inputAdaptor(
query: ScoringPipeline.Inputs[Query],
previousResult: ScoringPipelineResult[Candidate]
): Query = {
query.query
}
override def resultUpdater(
previousPipelineResult: ScoringPipelineResult[Candidate],
executorResult: GateExecutorResult
): ScoringPipelineResult[Candidate] =
previousPipelineResult.copy(gateResults = Some(executorResult))
}
val SelectorsStep = new Step[SelectorExecutor.Inputs[Query], SelectorExecutorResult] {
override def identifier: PipelineStepIdentifier = ScoringPipelineConfig.selectorsStep
override def executorArrow: Arrow[SelectorExecutor.Inputs[Query], SelectorExecutorResult] =
selectorExecutor.arrow(config.selectors, context)
override def inputAdaptor(
query: ScoringPipeline.Inputs[Query],
previousResult: ScoringPipelineResult[Candidate]
): SelectorExecutor.Inputs[Query] = SelectorExecutor.Inputs(query.query, query.candidates)
override def resultUpdater(
previousPipelineResult: ScoringPipelineResult[Candidate],
executorResult: SelectorExecutorResult
): ScoringPipelineResult[Candidate] =
previousPipelineResult.copy(selectorResults = Some(executorResult))
}
val PreScoringFeatureHydrationPhase1Step =
new Step[
CandidateFeatureHydratorExecutor.Inputs[Query, Candidate],
CandidateFeatureHydratorExecutorResult[Candidate]
] {
override def identifier: PipelineStepIdentifier =
ScoringPipelineConfig.preScoringFeatureHydrationPhase1Step
override def executorArrow: Arrow[
CandidateFeatureHydratorExecutor.Inputs[Query, Candidate],
CandidateFeatureHydratorExecutorResult[Candidate]
] =
candidateFeatureHydratorExecutor.arrow(config.preScoringFeatureHydrationPhase1, context)
override def inputAdaptor(
query: ScoringPipeline.Inputs[Query],
previousResult: ScoringPipelineResult[Candidate]
): CandidateFeatureHydratorExecutor.Inputs[Query, Candidate] = {
val selectedCandidatesResult = previousResult.selectorResults.getOrElse {
throw InvalidStepStateException(identifier, "SelectorResults")
}.selectedCandidates
CandidateFeatureHydratorExecutor.Inputs(
query.query,
selectedCandidatesResult.asInstanceOf[Seq[CandidateWithFeatures[Candidate]]])
}
override def resultUpdater(
previousPipelineResult: ScoringPipelineResult[Candidate],
executorResult: CandidateFeatureHydratorExecutorResult[Candidate]
): ScoringPipelineResult[Candidate] = previousPipelineResult.copy(
preScoringHydrationPhase1Result = Some(executorResult)
)
}
val PreScoringFeatureHydrationPhase2Step =
new Step[
CandidateFeatureHydratorExecutor.Inputs[Query, Candidate],
CandidateFeatureHydratorExecutorResult[Candidate]
] {
override def identifier: PipelineStepIdentifier =
ScoringPipelineConfig.preScoringFeatureHydrationPhase2Step
override def executorArrow: Arrow[
CandidateFeatureHydratorExecutor.Inputs[Query, Candidate],
CandidateFeatureHydratorExecutorResult[Candidate]
] =
candidateFeatureHydratorExecutor.arrow(config.preScoringFeatureHydrationPhase2, context)
override def inputAdaptor(
query: ScoringPipeline.Inputs[Query],
previousResult: ScoringPipelineResult[Candidate]
): CandidateFeatureHydratorExecutor.Inputs[Query, Candidate] = {
val preScoringHydrationPhase1FeatureMaps: Seq[FeatureMap] =
previousResult.preScoringHydrationPhase1Result
.getOrElse(
throw InvalidStepStateException(identifier, "PreScoringHydrationPhase1Result"))
.results.map(_.features)
val itemCandidates = previousResult.selectorResults
.getOrElse(throw InvalidStepStateException(identifier, "SelectionResults"))
.selectedCandidates.collect {
case itemCandidate: ItemCandidateWithDetails => itemCandidate
}
// If there is no feature hydration (empty results), no need to attempt merging.
val candidates = if (preScoringHydrationPhase1FeatureMaps.isEmpty) {
itemCandidates
} else {
itemCandidates.zip(preScoringHydrationPhase1FeatureMaps).map {
case (itemCandidate, featureMap) =>
itemCandidate.copy(features = itemCandidate.features ++ featureMap)
}
}
CandidateFeatureHydratorExecutor.Inputs(
query.query,
candidates.asInstanceOf[Seq[CandidateWithFeatures[Candidate]]])
}
override def resultUpdater(
previousPipelineResult: ScoringPipelineResult[Candidate],
executorResult: CandidateFeatureHydratorExecutorResult[Candidate]
): ScoringPipelineResult[Candidate] = previousPipelineResult.copy(
preScoringHydrationPhase2Result = Some(executorResult)
)
}
def getMergedPreScoringFeatureMap(
stepIdentifier: PipelineStepIdentifier,
previousResult: ScoringPipelineResult[Candidate]
): Seq[FeatureMap] = {
val preScoringHydrationPhase1FeatureMaps: Seq[FeatureMap] =
previousResult.preScoringHydrationPhase1Result
.getOrElse(
throw InvalidStepStateException(
stepIdentifier,
"PreScoringHydrationPhase1Result")).results.map(_.features)
val preScoringHydrationPhase2FeatureMaps: Seq[FeatureMap] =
previousResult.preScoringHydrationPhase2Result
.getOrElse(
throw InvalidStepStateException(
stepIdentifier,
"PreScoringHydrationPhase2Result")).results.map(_.features)
/*
* If either pre-scoring hydration phase feature map is empty, no need to merge them,
* we can just take all non-empty ones.
*/
if (preScoringHydrationPhase1FeatureMaps.isEmpty) {
preScoringHydrationPhase2FeatureMaps
} else if (preScoringHydrationPhase2FeatureMaps.isEmpty) {
preScoringHydrationPhase1FeatureMaps
} else {
// No need to check the size in both, since the inputs to both hydration phases are the
// same and each phase ensures the number of candidates and ordering matches the input.
preScoringHydrationPhase1FeatureMaps.zip(preScoringHydrationPhase2FeatureMaps).map {
case (preScoringHydrationPhase1FeatureMap, preScoringHydrationPhasesFeatureMap) =>
preScoringHydrationPhase1FeatureMap ++ preScoringHydrationPhasesFeatureMap
}
}
}
val ScorersStep =
new Step[
CandidateFeatureHydratorExecutor.Inputs[Query, Candidate],
CandidateFeatureHydratorExecutorResult[Candidate]
] {
override def identifier: PipelineStepIdentifier = ScoringPipelineConfig.scorersStep
override def inputAdaptor(
query: ScoringPipeline.Inputs[Query],
previousResult: ScoringPipelineResult[Candidate]
): CandidateFeatureHydratorExecutor.Inputs[Query, Candidate] = {
val mergedPreScoringFeatureHydrationFeatures: Seq[FeatureMap] =
getMergedPreScoringFeatureMap(ScoringPipelineConfig.scorersStep, previousResult)
val itemCandidates = previousResult.selectorResults
.getOrElse(throw InvalidStepStateException(identifier, "SelectionResults"))
.selectedCandidates.collect {
case itemCandidate: ItemCandidateWithDetails => itemCandidate
}
// If there was no pre-scoring features hydration, no need to re-merge feature maps
// and construct a new item candidate
val updatedCandidates = if (mergedPreScoringFeatureHydrationFeatures.isEmpty) {
itemCandidates
} else {
itemCandidates.zip(mergedPreScoringFeatureHydrationFeatures).map {
case (itemCandidate, preScoringFeatureMap) =>
itemCandidate.copy(features = itemCandidate.features ++ preScoringFeatureMap)
}
}
CandidateFeatureHydratorExecutor.Inputs(
query.query,
updatedCandidates.asInstanceOf[Seq[CandidateWithFeatures[Candidate]]])
}
override lazy val executorArrow: Arrow[
CandidateFeatureHydratorExecutor.Inputs[Query, Candidate],
CandidateFeatureHydratorExecutorResult[
Candidate
]
] = candidateFeatureHydratorExecutor.arrow(config.scorers.toSeq, context)
override def resultUpdater(
previousPipelineResult: ScoringPipelineResult[Candidate],
executorResult: CandidateFeatureHydratorExecutorResult[Candidate]
): ScoringPipelineResult[Candidate] =
previousPipelineResult.copy(scorerResults = Some(executorResult))
}
val ResultStep =
new Step[Seq[CandidateWithFeatures[UniversalNoun[Any]]], Seq[
CandidateWithFeatures[UniversalNoun[Any]]
]] {
override def identifier: PipelineStepIdentifier = ScoringPipelineConfig.resultStep
override def executorArrow: Arrow[Seq[CandidateWithFeatures[UniversalNoun[Any]]], Seq[
CandidateWithFeatures[UniversalNoun[Any]]
]] = Arrow.identity
override def inputAdaptor(
query: Inputs[Query],
previousResult: ScoringPipelineResult[Candidate]
): Seq[CandidateWithFeatures[UniversalNoun[Any]]] = previousResult.selectorResults
.getOrElse(throw InvalidStepStateException(identifier, "SelectionResults"))
.selectedCandidates.collect {
case itemCandidate: ItemCandidateWithDetails => itemCandidate
}
override def resultUpdater(
previousPipelineResult: ScoringPipelineResult[Candidate],
executorResult: Seq[CandidateWithFeatures[UniversalNoun[Any]]]
): ScoringPipelineResult[Candidate] = {
val scorerResults: Seq[FeatureMap] = previousPipelineResult.scorerResults
.getOrElse(throw InvalidStepStateException(identifier, "ScorerResult")).results.map(
_.features)
val mergedPreScoringFeatureHydrationFeatureMaps: Seq[FeatureMap] =
getMergedPreScoringFeatureMap(ScoringPipelineConfig.resultStep, previousPipelineResult)
val itemCandidates = executorResult.asInstanceOf[Seq[ItemCandidateWithDetails]]
val finalFeatureMap = if (mergedPreScoringFeatureHydrationFeatureMaps.isEmpty) {
scorerResults
} else {
scorerResults
.zip(mergedPreScoringFeatureHydrationFeatureMaps).map {
case (preScoringFeatureMap, scoringFeatureMap) =>
preScoringFeatureMap ++ scoringFeatureMap
}
}
val finalResults = itemCandidates.zip(finalFeatureMap).map {
case (itemCandidate, featureMap) =>
ScoredCandidateResult(itemCandidate.candidate.asInstanceOf[Candidate], featureMap)
}
previousPipelineResult.withResult(finalResults)
}
}
val builtSteps = Seq(
GatesStep,
SelectorsStep,
PreScoringFeatureHydrationPhase1Step,
PreScoringFeatureHydrationPhase2Step,
ScorersStep,
ResultStep
)
/** The main execution logic for this Candidate Pipeline. */
val finalArrow: Arrow[ScoringPipeline.Inputs[Query], ScoringPipelineResult[Candidate]] =
buildCombinedArrowFromSteps(
steps = builtSteps,
context = context,
initialEmptyResult = ScoringPipelineResult.empty,
stepsInOrderFromConfig = ScoringPipelineConfig.stepsInOrder
)
val configFromBuilder = config
new ScoringPipeline[Query, Candidate] {
override private[core] val config: ScoringPipelineConfig[Query, Candidate] = configFromBuilder
override val arrow: Arrow[ScoringPipeline.Inputs[Query], ScoringPipelineResult[Candidate]] =
finalArrow
override val identifier: ScoringPipelineIdentifier = pipelineIdentifier
override val alerts: Seq[Alert] = config.alerts
override val children: Seq[Component] =
allGates ++ config.preScoringFeatureHydrationPhase1 ++ config.preScoringFeatureHydrationPhase2 ++ config.scorers
}
}