in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala [344:575]
/**
 * Generates the "pre-accumulate" step of a window aggregate and registers it on `ctx` as two
 * private member methods of the generated class:
 *
 *  - `preAccumulate...(input)`: assigns a timestamp (pane or window start) to the input row and
 *    aggregates rows sort-agg style, i.e. as long as consecutive rows share the same assigned
 *    timestamp they are accumulated into the same buffer; when the assigned timestamp changes,
 *    the finished result is either buffered into `windowsTerm` or emitted.
 *  - `endPreAccumulate...()`: handles the last pending result and resets the timestamp tracker.
 *
 * Which strategy is generated depends on `isFinal`, `isMerge`, `enableAssignPane` and `window`,
 * all of which are defined outside this method.
 *
 * @param ctx code generator context that receives the reusable members and local variables
 * @param windowStart forwarded to `genAlignedWindowStartExpr` when a timestamp is assigned per row
 * @param slideSize slide size; together with `windowSize` it is passed to `isJumpingWindow`
 * @param windowSize window size; spliced into the generated code as a `long` literal
 * @param inputTerm name of the input row variable in the generated code
 * @param inputType row type of the input
 * @param outputType row type of the output, forwarded to `genSortWindowAggCodes`
 * @param windowsTerm term on which `addInputToBuffer` is called in the pane-merging path
 * @param windowElementType row type of the reusable record that holds one pre-accumulated result
 * @param lastKey term holding the current group key; `.get` is called on it in the
 *                `isFinal && enableAssignPane` path, so it must be defined there
 * @param triggerWindowAggCode code spliced in after a pane result has been buffered
 * @param endWindowAggCode code spliced in after the last pane result has been buffered
 * @return the Java statements that call the two generated methods:
 *         (`preAccumulate...(inputTerm);`, `endPreAccumulate...();`)
 */
private[flink] def genPreAccumulate(
ctx: CodeGeneratorContext,
windowStart: Long,
slideSize: Long,
windowSize: Long,
inputTerm: String,
inputType: RowType,
outputType: RowType,
windowsTerm: String,
windowElementType: RowType,
lastKey: Option[String],
triggerWindowAggCode: String,
endWindowAggCode: String): (String, String) = {
// gen code to assign timestamp
// Returns an expression whose result term is the timestamp used to group consecutive rows.
def genAssignTimestampExpr(
ctx: CodeGeneratorContext,
inputTerm: String,
inputType: RowType): GeneratedExpression = {
if (isFinal && isMerge) {
// get assigned timestamp by local window agg
// The timestamp is read from the last field of windowedGroupKeyType.
val ret = GenerateUtils.generateFieldAccess(
ctx,
windowedGroupKeyType,
inputTerm,
windowedGroupKeyType.getFieldCount - 1)
if (inputTimeIsDate) {
// The accessed field is converted via convertToLongValue into a reusable local "long"
// variable; the null term of the original field access is kept.
val timestamp = ctx.addReusableLocalVariable("long", "timestamp")
val convertToLongCode =
s"""
| ${ret.code}
| $timestamp = ${convertToLongValue(ret.resultTerm)};
""".stripMargin
GeneratedExpression(timestamp, ret.nullTerm, convertToLongCode, new BigIntType())
} else {
ret
}
} else {
// assign timestamp with each input
window match {
case SlidingGroupWindow(_, timeField, size, slide) if isTimeIntervalLiteral(size) =>
// NOTE: these locals shadow the slideSize/windowSize parameters of genPreAccumulate.
val (slideSize, windowSize) = (asLong(slide), asLong(size))
if (enableAssignPane) {
// Rows are aligned to panes whose size is gcd(windowSize, slideSize).
val paneSize = ArithmeticUtils.gcd(windowSize, slideSize)
genAlignedWindowStartExpr(
ctx, inputTerm, inputType, timeField, windowStart, paneSize)
} else {
// Without pane assignment the row is aligned by slideSize, which is only done
// when the slide is at least as large as the window.
assert(slideSize >= windowSize)
genAlignedWindowStartExpr(
ctx, inputTerm, inputType, timeField, windowStart, slideSize)
}
case TumblingGroupWindow(_, timeField, size) =>
genAlignedWindowStartExpr(
ctx, inputTerm, inputType, timeField, windowStart, asLong(size))
case _ =>
throw new RuntimeException(s"Bug. Assign pane for $window is not supported.")
}
}
}
val assignedTsExpr = genAssignTimestampExpr(ctx, inputTerm, inputType)
// gen code to do aggregate by assigned ts
// lastTimestamp is a member of the generated class; -1 marks "no timestamp seen yet".
val lastTimestampTerm = CodeGenUtils.newName("lastTimestamp")
ctx.addReusableMember(s"transient long $lastTimestampTerm = -1;")
// Reusable record (and its writer) that holds one pre-accumulated result.
val preAccResult = CodeGenUtils.newName("prepareWinElement")
val preAccResultWriter = CodeGenUtils.newName("prepareWinElementWriter")
ctx.addReusableOutputRecord(
windowElementType, classOf[BinaryRowData], preAccResult, Some(preAccResultWriter))
// Member holding the current TimeWindow; it is assigned only in the direct-output path below.
val timeWindowType = classOf[TimeWindow].getName
val currentWindow = newName("currentWindow")
ctx.addReusableMember(s"transient $timeWindowType $currentWindow = null;")
// output or merge pre accumulate results by window
// mergeOrOutput runs when the assigned timestamp changes; mergeOrOutputLastPane runs from
// the generated endPreAccumulate method.
val (initAggBufferCode, doAggregateCode, mergeOrOutput, mergeOrOutputLastPane) =
if (isFinal && enableAssignPane) {
// case: global/complete window agg: Sliding window with pane optimization
// When merging, the aggregate arguments start one field later than the grouping fields.
val offset = if (isMerge) grouping.length + 1 else grouping.length
val argsMapping = buildAggregateArgsMapping(
isMerge, offset, inputType, auxGrouping, aggInfos, aggBufferTypes)
val aggBufferExprs = genFlatAggBufferExprs(
isMerge,
ctx,
builder,
auxGrouping,
aggInfos,
argsMapping,
aggBufferNames,
aggBufferTypes)
val initAggBufferCode = genInitFlatAggregateBuffer(
ctx,
builder,
inputType,
inputTerm,
grouping,
auxGrouping,
aggInfos,
functionIdentifiers,
aggBufferExprs)
val doAggregateCode = genAggregateByFlatAggregateBuffer(
isMerge,
ctx,
builder,
inputType,
inputTerm,
auxGrouping,
aggInfos,
functionIdentifiers,
argsMapping,
aggBufferNames,
aggBufferTypes,
aggBufferExprs)
// project pre accumulated results into a binary row to fit to WindowsGrouping
// Field order of the projected row: grouping keys (read from lastKey), the last assigned
// timestamp, then the aggregate buffer values.
val exprCodegen = new ExprCodeGenerator(ctx, false)
val setResultExprs = grouping.indices.map(
generateFieldAccess(
ctx, groupKeyRowType, lastKey.get, _)) ++
(GeneratedExpression(lastTimestampTerm, NEVER_NULL, NO_CODE, new BigIntType())
+: aggBufferExprs)
val setPanedAggResultExpr = exprCodegen.generateResultExpression(
setResultExprs,
windowElementType,
classOf[BinaryRowData],
preAccResult,
Some(preAccResultWriter))
// using windows grouping buffer to merge paned agg results
val merge =
s"""
|${setPanedAggResultExpr.code}
|// buffer into current group buffer
|$windowsTerm.addInputToBuffer(($BINARY_ROW)${setPanedAggResultExpr.resultTerm});
|// trigger window aggregate
|$triggerWindowAggCode
""".stripMargin
// Same as merge, except that endWindowAggCode is spliced in instead of
// triggerWindowAggCode.
val mergeLast =
s"""
|${setPanedAggResultExpr.code}
|// buffer into current group buffer
|$windowsTerm.addInputToBuffer(($BINARY_ROW)${setPanedAggResultExpr.resultTerm});
|// last pane triggered windows will be triggered again when grouping keys changed
|$endWindowAggCode
""".stripMargin
(initAggBufferCode, doAggregateCode, merge, mergeLast)
} else {
// case1: local window agg
// case2: global window agg: Tumbling window, Sliding window with windowSize == slideSize
// or without pane optimization
// case3: complete window agg: Tumbling window, Sliding window with windowSize == slideSize
val (initAggBuffCode, doAggCode, outputWinAggResExpr) = genSortWindowAggCodes(
isMerge,
ctx,
inputTerm,
inputType,
outputType,
lastKey,
currentWindow)
// The result is emitted directly; the current window is set to
// [lastTimestamp, lastTimestamp + windowSize) first. The same code is used for the
// regular and the last-pane case.
val output =
s"""
|// update current window
|$currentWindow =
| $timeWindowType.of($lastTimestampTerm, $lastTimestampTerm + ${windowSize}L);
|// build window agg output
|${outputWinAggResExpr.code}
|// output result
|${generateCollect(outputWinAggResExpr.resultTerm)}
""".stripMargin
(initAggBuffCode, doAggCode, output, output)
}
// Per-row logic: the first row (lastTimestamp == -1) only initializes the buffer; a row with
// a different assigned timestamp first merges/outputs the finished result and re-initializes
// the buffer; every row is then accumulated.
val preAccCode =
s"""
| hasInput = true;
| // aggregate in sort agg way
| if ($lastTimestampTerm == -1) {
| $initAggBufferCode
| $lastTimestampTerm = ${assignedTsExpr.resultTerm};
| } else if ($lastTimestampTerm != ${assignedTsExpr.resultTerm}) {
| $mergeOrOutput
| // update active timestamp
| $lastTimestampTerm = ${assignedTsExpr.resultTerm};
| // init agg buffer
| $initAggBufferCode
| }
| // accumulate
| $doAggregateCode
""".stripMargin
// gen code to filter invalid windows in the case of jumping window
// Only rows whose input time is below (assigned timestamp + windowSize) are pre-accumulated.
// NOTE(review): the template below emits "if (<time value>) < ... L) {"; the ")" right after
// the interpolated time value closes the "if (" early unless getInputTimeValue returns text
// with an unmatched "(" -- confirm against getInputTimeValue, which is not visible here.
val processEachInput = if (!isMerge && isJumpingWindow(slideSize, windowSize)) {
s"""
|if (${getInputTimeValue(inputTerm, inputTimeFieldIndex)}) <
| ${assignedTsExpr.resultTerm} + ${windowSize}L) {
| $preAccCode
|}
""".stripMargin
} else {
preAccCode
}
// Register the per-row method. The reusable local variable declarations are emitted first,
// followed by the input unboxing code and the timestamp assignment code.
val processFuncName = CodeGenUtils.newName("preAccumulate")
val inputTypeTerm = boxedTypeTermForType(inputType)
ctx.addReusableMember(
s"""
|private void $processFuncName($inputTypeTerm $inputTerm) throws java.lang.Exception {
| ${ctx.reuseLocalVariableCode()}
| // assign timestamp (pane/window)
| ${ctx.reuseInputUnboxingCode(inputTerm)}
| ${assignedTsExpr.code}
| $processEachInput
|}
""".stripMargin)
// Register the end-of-input method: handle the last pending result, then reset lastTimestamp
// to the -1 sentinel.
val endProcessFuncName = CodeGenUtils.newName("endPreAccumulate")
val setLastPaneAggResultCode =
s"""
| // merge paned agg results or output directly
| $mergeOrOutputLastPane
| $lastTimestampTerm = -1;
""".stripMargin
ctx.addReusableMember(
s"""
|private void $endProcessFuncName() throws java.lang.Exception {
| ${ctx.reuseLocalVariableCode()}
| $setLastPaneAggResultCode
|}
""".stripMargin)
// Callers splice these two statements into their own generated code.
(s"$processFuncName($inputTerm);", s"$endProcessFuncName();")
}