in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala [424:612]
/**
 * Generates the Java code that turns the contents of the aggregate map into window
 * aggregation output.
 *
 * The emitted code is guarded by `hasInput` and does the following:
 *  1. builds a [[BinaryKVInMemorySortBuffer]] on top of the aggregate map's record area and
 *     sorts it with [[QuickSort]];
 *  1. iterates the sorted key/value pairs, reusing a single key row, value row and tuple;
 *  1. converts every pair into a windows-grouping element and adds it to the windows
 *     grouping buffer, then runs the "trigger window aggregation" code;
 *  1. when grouping keys exist, runs the "end window aggregation" code whenever the
 *     projected grouping key differs from the previously seen one;
 *  1. runs the "end window aggregation" code once more after the loop.
 *
 * Side effects: registers several reusable members on `ctx` (reused key/value rows, the
 * reused tuple, the current window and, when grouping keys exist, the last seen key).
 *
 * @param windowSize        forwarded to `genWindowAggCodes`
 * @param slideSize         forwarded to `genWindowAggCodes`
 * @param bufferLimitSize   forwarded to `genWindowAggCodes`
 * @param outputType        forwarded to `genWindowAggCodes`
 * @param aggregateMapTerm  name of the generated-code term holding the aggregate map; its
 *                          record-area segments, element count and bucket-area segments are
 *                          handed to the sort buffer
 * @param argsMapping       not used by this method; kept for interface compatibility
 * @param aggBuffMapping    not used by this method; kept for interface compatibility
 * @param aggKeyTypeTerm    generated-code term whose `.length` sizes the key serializer
 * @param aggBufferTypeTerm generated-code term whose `.length` sizes the value serializer
 * @param aggMapKeyType     row type of the aggregate map key; its last field is accessed
 *                          separately as the timestamp of the window element
 * @param aggBufferType     row type of the aggregate map value (the aggregation buffer)
 * @return the generated Java code as a string
 */
private def genOutputByMerging(
    windowSize: Long,
    slideSize: Long,
    bufferLimitSize: Int,
    outputType: RowType,
    aggregateMapTerm: String,
    argsMapping: Array[Array[(Int, LogicalType)]],
    aggBuffMapping: Array[Array[(Int, LogicalType)]],
    aggKeyTypeTerm: String,
    aggBufferTypeTerm: String,
    aggMapKeyType: RowType,
    aggBufferType: RowType): String = {
  // ---------------------------------------------------------------------------------------------
  // gen code to create the in-memory sort buffer backed by the aggregate map's memory
  val keyComputerTerm = CodeGenUtils.newName("keyComputer")
  val recordComparatorTerm = CodeGenUtils.newName("recordComparator")
  val prepareSorterCode = HashAggCodeGenHelper.genKVSorterPrepareCode(
    ctx, keyComputerTerm, recordComparatorTerm, aggMapKeyType)

  val memPoolTypeTerm = classOf[BytesHashMapSpillMemorySegmentPool].getName
  val binaryRowSerializerTypeTerm = classOf[BinaryRowDataSerializer].getName
  val sorterBufferType = classOf[BinaryKVInMemorySortBuffer].getName
  val sorterBufferTerm = newName("buffer")

  val createSorterBufferCode =
    s"""
       | $prepareSorterCode
       | $sorterBufferType $sorterBufferTerm = $sorterBufferType.createBuffer(
       | $keyComputerTerm,
       | new $binaryRowSerializerTypeTerm($aggKeyTypeTerm.length),
       | new $binaryRowSerializerTypeTerm($aggBufferTypeTerm.length),
       | $recordComparatorTerm,
       | $aggregateMapTerm.getRecordAreaMemorySegments(),
       | $aggregateMapTerm.getNumElements(),
       | new $memPoolTypeTerm($aggregateMapTerm.getBucketAreaMemorySegments())
       | );
     """.stripMargin

  // Reusable key row, value row and the tuple wrapping both; the sorted iterator writes into
  // these on every call to next(...).
  val reuseAggMapKeyTerm = newName("reusedKey")
  val reuseAggBufferTerm = newName("reusedValue")
  val reuseKVTerm = newName("reusedKV")
  val binaryRow = classOf[BinaryRowData].getName
  val kvType = classOf[JTuple2[_,_]].getName
  ctx.addReusableMember(
    s"transient $binaryRow $reuseAggMapKeyTerm = new $binaryRow(${aggMapKeyType.getFieldCount});")
  ctx.addReusableMember(
    s"transient $binaryRow $reuseAggBufferTerm = new $binaryRow(${aggBufferType.getFieldCount});")
  ctx.addReusableMember(
    s"transient $kvType<$binaryRow, $binaryRow> $reuseKVTerm = " +
      s"new $kvType<$binaryRow, $binaryRow>($reuseAggMapKeyTerm, $reuseAggBufferTerm);"
  )

  // ---------------------------------------------------------------------------------------------
  // gen code to create a buffer to group all the elements having the same grouping key
  val windowElementType = getWindowsGroupingElementInfo()
  // project into aggregate map key and value into prepared window element
  val bufferWindowElementTerm = newName("prepareWinElement")
  val bufferWindowElementWriterTerm = newName("prepareWinElementWriter")
  val exprCodegen = new ExprCodeGenerator(ctx, false)
  // TODO refine this. Is it possible to reuse grouping key projection?
  // All key fields except the last one are accessed as-is ...
  val accessKeyExprs = for (idx <- 0 until aggMapKeyType.getFieldCount - 1) yield
    GenerateUtils.generateFieldAccess(
      ctx, aggMapKeyType, reuseAggMapKeyTerm, idx)
  // ... while the last key field is the timestamp; it is re-wrapped below with a constant
  // "false" null term and `timestampInternalType` as its result type.
  val accessTimestampExpr = GenerateUtils.generateFieldAccess(
    ctx,
    aggMapKeyType,
    reuseAggMapKeyTerm,
    aggMapKeyType.getFieldCount - 1)
  val accessValExprs = for (idx <- 0 until aggBufferType.getFieldCount) yield
    GenerateUtils.generateFieldAccess(ctx, aggBufferType, reuseAggBufferTerm, idx)
  // Window element layout: key fields, timestamp, then aggregation buffer fields.
  val accessExprs = (accessKeyExprs :+ GeneratedExpression(
    accessTimestampExpr.resultTerm,
    "false",
    accessTimestampExpr.code,
    timestampInternalType)) ++ accessValExprs
  val buildWindowsGroupingElementExpr = exprCodegen.generateResultExpression(
    accessExprs,
    windowElementType,
    classOf[BinaryRowData],
    outRow = bufferWindowElementTerm,
    outRowWriter = Some(bufferWindowElementWriterTerm))

  // ---------------------------------------------------------------------------------------------
  // gen code to apply aggregate functions to grouping window elements
  val timeWindowType = classOf[TimeWindow].getName
  val currentWindow = newName("currentWindow")
  ctx.addReusableMember(s"transient $timeWindowType $currentWindow = null;")

  // gen code to assign window and aggregate
  val windowsGrouping = CodeGenUtils.newName("windowsGrouping")

  // Both branches below need the same window aggregation code and differ only in whether a
  // "last group key" term is supplied, so the shared argument list lives in one place.
  def windowAggCodes(lastGroupKeyTerm: Option[String]) =
    genWindowAggCodes(
      enablePreAcc = true,
      ctx,
      windowSize,
      slideSize,
      windowsGrouping,
      bufferLimitSize,
      windowElementType,
      inputTimeFieldIndex,
      currentWindow,
      lastGroupKeyTerm,
      outputType)

  val (processCode, endCode) = if (grouping.isEmpty) {
    // No grouping keys: every element belongs to the same group, so there is no key-change
    // detection and the end code is only emitted after the loop.
    val (triggerWindowAgg, endWindowAgg) = windowAggCodes(None)
    val process =
      s"""
         |// prepare windows grouping input
         |${buildWindowsGroupingElementExpr.code}
         |$windowsGrouping
         | .addInputToBuffer(($BINARY_ROW)${buildWindowsGroupingElementExpr.resultTerm});
         |$triggerWindowAgg
       """.stripMargin
    (process, endWindowAgg)
  } else {
    // project grouping keys from aggregate map's key
    val groupKeyTerm = newName("groupKey")
    val groupKeyWriterTerm = newName("groupKeyWriter")
    val projGroupingKeyCode = ProjectionCodeGenerator.generateProjectionExpression(ctx,
      aggMapKeyType,
      groupKeyRowType,
      grouping.indices.toArray,
      inputTerm = reuseAggMapKeyTerm,
      outRecordTerm = groupKeyTerm,
      outRecordWriterTerm = groupKeyWriterTerm).code

    // gen code to check group key changed
    val lastKeyTerm = newName("lastKey")
    ctx.addReusableMember(s"transient $BINARY_ROW $lastKeyTerm = null;")
    val keyNotEqualsCode = genGroupKeyChangedCheckCode(groupKeyTerm, lastKeyTerm)

    val (triggerWindowAgg, endWindowAgg) = windowAggCodes(Some(lastKeyTerm))

    // The first element only initializes the last key; afterwards a key change first runs
    // the end code and then remembers the new key. The key is copied because the projected
    // row is reused across iterations.
    val process =
      s"""
         |// project agg grouping key
         |$projGroupingKeyCode
         |// prepare windows grouping input
         |${buildWindowsGroupingElementExpr.code}
         |if ($lastKeyTerm == null) {
         | $lastKeyTerm = $groupKeyTerm.copy();
         |} else if ($keyNotEqualsCode) {
         | $endWindowAgg
         | $lastKeyTerm = $groupKeyTerm.copy();
         |}
         |$windowsGrouping
         | .addInputToBuffer(($BINARY_ROW)${buildWindowsGroupingElementExpr.resultTerm});
         |$triggerWindowAgg
       """.stripMargin
    // After the loop: run the end code once more and reset the last key.
    val end =
      s"""
         | $endWindowAgg
         | $lastKeyTerm = null;
       """.stripMargin
    (process, end)
  }

  val sortType = classOf[QuickSort].getName
  val bufferIteratorType = classOf[MutableObjectIterator[_]].getName
  s"""
     | if (hasInput) {
     | // sort by grouping keys and assigned timestamp
     | $createSorterBufferCode
     | new $sortType().sort($sorterBufferTerm);
     | // merge and get result
     | $bufferIteratorType<$kvType<$binaryRow, $binaryRow>> iterator =
     | $sorterBufferTerm.getIterator();
     | while (iterator.next($reuseKVTerm) != null) {
     | // reusable input fields access
     | ${ctx.reuseInputUnboxingCode(bufferWindowElementTerm)}
     | $processCode
     | }
     | $endCode
     | }
   """.stripMargin
}