in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala [100:281]
def gen(
inputType: RowType,
outputType: RowType,
buffLimitSize: Int,
windowStart: Long,
windowSize: Long,
slideSize: Long): GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = {
val className = if (isFinal) "HashWinAgg" else "LocalHashWinAgg"
val suffix = if (grouping.isEmpty) "WithoutKeys" else "WithKeys"
val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
// add logger
val logTerm = CodeGenUtils.newName("LOG")
ctx.addReusableLogger(logTerm, className)
// gen code to do aggregate using aggregate map
val aggMapKey = newName("aggMapKey")
val aggMapKeyWriter = newName("aggMapKeyWriter")
val (processElementPerWindow, outputResultFromMap) = genHashWindowAggCodes(
buffLimitSize,
windowSize,
slideSize,
inputTerm,
inputType,
outputType,
aggMapKey,
logTerm)
val (processCode, outputWhenEndInputCode) = if (isFinal && isMerge) {
// prepare for aggregate map key's projection
val projAggMapKeyCode = ProjectionCodeGenerator.generateProjectionExpression(ctx,
inputType,
aggMapKeyRowType,
grouping :+ grouping.length,
inputTerm = inputTerm,
outRecordTerm = aggMapKey,
outRecordWriterTerm = aggMapKeyWriter).code
val processCode =
s"""
|if (!$inputTerm.isNullAt($inputTimeFieldIndex)) {
| hasInput = true;
| // input field access for group key projection, window/pane assign
| // and aggregate map update
| ${ctx.reuseInputUnboxingCode(inputTerm)}
| // build aggregate map key
| $projAggMapKeyCode
| // look up aggregate map and aggregate
| $processElementPerWindow
|}
""".stripMargin
(processCode, outputResultFromMap)
} else {
// gen code to assign windows/pane to current input
val assignTimestampExprs = genTimestampAssignExprs(
enableAssignPane, windowStart, windowSize, slideSize, window, inputTerm, inputType)
val processCode =
if (!isSlidingWindowWithOverlapping(enableAssignPane, window, slideSize, windowSize)) {
// Each input will be assigned with only one window, in the cases of
// Tumbling window, Sliding window with slideSize >= windowSize or with pane optimization.
assert(assignTimestampExprs.size == 1)
val assignTimestampExpr = assignTimestampExprs.head
// prepare for aggregate map key's projection
val accessAssignedTimestampExpr = GeneratedExpression(
assignTimestampExpr.resultTerm, "false", "", timestampInternalType)
val prepareInitAggMapKeyExpr = prepareAggMapKeyExpr(inputTerm, inputType,
Some(accessAssignedTimestampExpr), aggMapKeyRowType, aggMapKey, aggMapKeyWriter)
val processAggregate =
s"""
| // build aggregate map key
| ${prepareInitAggMapKeyExpr.code}
| // aggregate by each input with assigned timestamp
| $processElementPerWindow
""".stripMargin
// gen code to filter invalid windows in the case of jumping window
val processEachInput = if (isJumpingWindow(slideSize, windowSize)) {
val checkValidWindow = s"${getInputTimeValue(inputTerm, inputTimeFieldIndex)} < " +
s"${assignTimestampExpr.resultTerm} + ${windowSize}L"
s"""
|if ($checkValidWindow) {
| // build aggregate map key
| ${prepareInitAggMapKeyExpr.code}
| // aggregate by each input with assigned timestamp
| $processAggregate
|}
""".stripMargin
} else {
processAggregate
}
s"""
|if (!$inputTerm.isNullAt($inputTimeFieldIndex)) {
| hasInput = true;
| // input field access for group key projection, window/pane assign
| // and aggregate map update
| ${ctx.reuseInputUnboxingCode(inputTerm)}
| // assign timestamp(window or pane)
| ${assignTimestampExpr.code}
| // process each input
| $processEachInput
|}""".stripMargin
} else {
// Otherwise, each input will be assigned with overlapping windows.
assert(assignTimestampExprs.size > 1)
val assignedWindows = newName("assignedWindows")
ctx.addReusableMember(
s"transient java.util.List<java.lang.Long> $assignedWindows" +
s" = new java.util.ArrayList<java.lang.Long>();")
val prepareCodes = for (expr <- assignTimestampExprs) yield {
s"""
|${expr.code}
|$assignedWindows.add(${expr.resultTerm});
""".stripMargin
}
val code =
s"""
|$assignedWindows.clear();
|${prepareCodes.mkString("\n").trim}
""".stripMargin
val assignTimestampExpr =
new GeneratedExpression(assignedWindows, "false", code,
fromTypeInfoToLogicalType(new ListTypeInfo(Types.LONG)))
// gen code to filter invalid overlapping windows
val assignedTimestamp = newName("assignedTimestamp")
val timestampTerm = s"${getInputTimeValue(inputTerm, inputTimeFieldIndex)}"
val checkValidWindow = s"$timestampTerm >= $assignedTimestamp " +
s" && $timestampTerm < $assignedTimestamp + ${windowSize}L"
// prepare for aggregate map key's projection
val prepareInitAggMapKeyExpr = prepareAggMapKeyExpr(
inputTerm, inputType, None, aggMapKeyRowType, aggMapKey, aggMapKeyWriter)
val realAssignedValue = if (inputTimeIsDate) {
convertToIntValue(s"$assignedTimestamp")
} else {
assignedTimestamp
}
val updateAssignedTsCode = s"$aggMapKey.set$timestampInternalTypeName(${
grouping.length
}, $realAssignedValue);"
s"""
|if (!$inputTerm.isNullAt($inputTimeFieldIndex)) {
| hasInput = true;
| // input field access for group key projection, window/pane assign
| // and aggregate map update
| ${ctx.reuseInputUnboxingCode(inputTerm)}
| // assign windows/pane
| ${assignTimestampExpr.code}
| // build aggregate map key
| ${prepareInitAggMapKeyExpr.code}
| // we assigned all the possible overlapping windows in this case,
| // so need filtering the invalid window here
| for (Long $assignedTimestamp : ${assignTimestampExpr.resultTerm}) {
| if ($checkValidWindow) {
| // update input's assigned timestamp
| $updateAssignedTsCode
| $processElementPerWindow
| } else {
| break;
| }
| }
|}
""".stripMargin
}
(processCode, outputResultFromMap)
}
val baseClass = classOf[TableStreamOperator[_]].getName
val endInputCode = if (isFinal) {
s"""
|$outputWhenEndInputCode
""".stripMargin
} else {
outputWhenEndInputCode
}
AggCodeGenHelper.generateOperator(
ctx, className + suffix, baseClass, processCode, endInputCode, inputType)
}