private def genOutputByMerging()

in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala [424:612]


  /**
    * Generates the Java code that produces the window aggregation output by sorting the
    * entries of the aggregate hash map and merging them window by window.
    *
    * The generated code, guarded by `hasInput`:
    *  1. builds a [[BinaryKVInMemorySortBuffer]] on top of the aggregate map's record-area
    *     memory segments (the bucket-area segments back the spill memory pool),
    *  2. sorts it with [[QuickSort]],
    *  3. iterates over the sorted key/value pairs, converts each pair into a window element
    *     (key fields, then the last key field re-typed as `timestampInternalType` and marked
    *     non-null, then all aggregate buffer fields) and feeds it to the windows grouping,
    *  4. runs the "trigger" code after every element and the "end" code after the loop.
    *
    * When `grouping` is non-empty the generated code additionally projects the grouping key
    * out of the aggregate map key and runs the "end" code whenever the grouping key differs
    * from the previously seen one.
    *
    * Side effects: registers reusable members on `ctx` (reused key/value rows, the reused
    * key/value tuple, the current window and, for grouped aggregation, the last key).
    *
    * @param windowSize        forwarded to `genWindowAggCodes`
    * @param slideSize         forwarded to `genWindowAggCodes`
    * @param bufferLimitSize   forwarded to `genWindowAggCodes`
    * @param outputType        forwarded to `genWindowAggCodes`
    * @param aggregateMapTerm  term of the aggregate map in the generated code
    * @param argsMapping       not referenced by this method
    * @param aggBuffMapping    not referenced by this method
    * @param aggKeyTypeTerm    term whose `.length` is used as the key serializer arity
    * @param aggBufferTypeTerm term whose `.length` is used as the value serializer arity
    * @param aggMapKeyType     row type of the aggregate map key; its last field is accessed
    *                          as the timestamp
    * @param aggBufferType     row type of the aggregate buffer (the map value)
    * @return the generated Java code as a string
    */
  private def genOutputByMerging(
      windowSize: Long,
      slideSize: Long,
      bufferLimitSize: Int,
      outputType: RowType,
      aggregateMapTerm: String,
      argsMapping: Array[Array[(Int, LogicalType)]],
      aggBuffMapping: Array[Array[(Int, LogicalType)]],
      aggKeyTypeTerm: String,
      aggBufferTypeTerm: String,
      aggMapKeyType: RowType,
      aggBufferType: RowType): String = {
    // ---------------------------------------------------------------------------------------------
    // gen code to create the in-memory sort buffer over the aggregate map's memory
    val keyComputerTerm = CodeGenUtils.newName("keyComputer")
    val recordComparatorTerm = CodeGenUtils.newName("recordComparator")
    val prepareSorterCode = HashAggCodeGenHelper.genKVSorterPrepareCode(
      ctx, keyComputerTerm, recordComparatorTerm, aggMapKeyType)

    val memPoolTypeTerm = classOf[BytesHashMapSpillMemorySegmentPool].getName
    val binaryRowSerializerTypeTerm = classOf[BinaryRowDataSerializer].getName

    val sorterBufferType = classOf[BinaryKVInMemorySortBuffer].getName
    val sorterBufferTerm = newName("buffer")

    val createSorterBufferCode =
      s"""
         |   $prepareSorterCode
         |   $sorterBufferType $sorterBufferTerm = $sorterBufferType.createBuffer(
         |      $keyComputerTerm,
         |      new $binaryRowSerializerTypeTerm($aggKeyTypeTerm.length),
         |      new $binaryRowSerializerTypeTerm($aggBufferTypeTerm.length),
         |      $recordComparatorTerm,
         |      $aggregateMapTerm.getRecordAreaMemorySegments(),
         |      $aggregateMapTerm.getNumElements(),
         |      new $memPoolTypeTerm($aggregateMapTerm.getBucketAreaMemorySegments())
         |   );
       """.stripMargin

    // reusable key / value rows and the tuple the sorted iterator writes into
    val reuseAggMapKeyTerm = newName("reusedKey")
    val reuseAggBufferTerm = newName("reusedValue")
    val reuseKVTerm = newName("reusedKV")
    val binaryRow = classOf[BinaryRowData].getName
    val kvType = classOf[JTuple2[_,_]].getName
    ctx.addReusableMember(
      s"transient $binaryRow $reuseAggMapKeyTerm = new $binaryRow(${aggMapKeyType.getFieldCount});")
    ctx.addReusableMember(
      s"transient $binaryRow $reuseAggBufferTerm = new $binaryRow(${aggBufferType.getFieldCount});")
    ctx.addReusableMember(
      s"transient $kvType<$binaryRow, $binaryRow> $reuseKVTerm = " +
          s"new  $kvType<$binaryRow, $binaryRow>($reuseAggMapKeyTerm, $reuseAggBufferTerm);"
    )

    // ---------------------------------------------------------------------------------------------
    // gen code to create a buffer to group all the elements having the same grouping key
    val windowElementType = getWindowsGroupingElementInfo()
    // project into aggregate map key and value into prepared window element
    val bufferWindowElementTerm = newName("prepareWinElement")
    val bufferWindowElementWriterTerm = newName("prepareWinElementWriter")
    val exprCodegen = new ExprCodeGenerator(ctx, false)
    // TODO refine this. Is it possible to reuse grouping key projection?
    // all key fields except the last one, which is accessed separately as the timestamp
    val accessKeyExprs = for (idx <- 0 until aggMapKeyType.getFieldCount - 1) yield
      GenerateUtils.generateFieldAccess(
        ctx, aggMapKeyType, reuseAggMapKeyTerm, idx)
    val accessTimestampExpr = GenerateUtils.generateFieldAccess(
      ctx,
      aggMapKeyType,
      reuseAggMapKeyTerm,
      aggMapKeyType.getFieldCount - 1)
    val accessValExprs = for (idx <- 0 until aggBufferType.getFieldCount) yield
      GenerateUtils.generateFieldAccess(ctx, aggBufferType, reuseAggBufferTerm, idx)
    // the timestamp access is re-wrapped with a constant "false" null term and
    // timestampInternalType as its result type
    val accessExprs = (accessKeyExprs :+ GeneratedExpression(
      accessTimestampExpr.resultTerm,
      "false",
      accessTimestampExpr.code,
      timestampInternalType)) ++ accessValExprs
    val buildWindowsGroupingElementExpr = exprCodegen.generateResultExpression(
      accessExprs,
      windowElementType,
      classOf[BinaryRowData],
      outRow = bufferWindowElementTerm,
      outRowWriter = Some(bufferWindowElementWriterTerm))

    // ---------------------------------------------------------------------------------------------
    // gen code to apply aggregate functions to grouping window elements
    val timeWindowType = classOf[TimeWindow].getName
    val currentWindow = newName("currentWindow")
    ctx.addReusableMember(s"transient $timeWindowType $currentWindow = null;")

    // gen code to assign window and aggregate
    val windowsGrouping = CodeGenUtils.newName("windowsGrouping")
    val (processCode, endCode) = if (grouping.isEmpty) {
      // no grouping key: every element belongs to the same group
      val (triggerWindowAgg, endWindowAgg) = genWindowAggCodes(
        enablePreAcc = true,
        ctx,
        windowSize,
        slideSize,
        windowsGrouping,
        bufferLimitSize,
        windowElementType,
        inputTimeFieldIndex,
        currentWindow,
        None,
        outputType)
      val process =
        s"""
           |// prepare windows grouping input
           |${buildWindowsGroupingElementExpr.code}
           |$windowsGrouping
           |  .addInputToBuffer(($BINARY_ROW)${buildWindowsGroupingElementExpr.resultTerm});
           |$triggerWindowAgg
         """.stripMargin
      (process, endWindowAgg)
    } else {
      // project grouping keys from aggregate map's key
      val groupKeyTerm = newName("groupKey")
      val groupKeyWriterTerm = newName("groupKeyWriter")
      val projGroupingKeyCode = ProjectionCodeGenerator.generateProjectionExpression(ctx,
        aggMapKeyType,
        groupKeyRowType,
        grouping.indices.toArray,
        inputTerm = reuseAggMapKeyTerm,
        outRecordTerm = groupKeyTerm,
        outRecordWriterTerm = groupKeyWriterTerm).code

      // gen code to check group key changed
      val lastKeyTerm = newName("lastKey")
      ctx.addReusableMember(s"transient $BINARY_ROW $lastKeyTerm = null;")
      val keyNotEqualsCode = genGroupKeyChangedCheckCode(groupKeyTerm, lastKeyTerm)

      val (triggerWindowAgg, endWindowAgg) = genWindowAggCodes(
        enablePreAcc = true,
        ctx,
        windowSize,
        slideSize,
        windowsGrouping,
        bufferLimitSize,
        windowElementType,
        inputTimeFieldIndex,
        currentWindow,
        Some(lastKeyTerm),
        outputType)

      // on a key change the "end" code runs before the new key is remembered, and before
      // the current element is added to the windows grouping buffer
      val process =
        s"""
           |// project agg grouping key
           |$projGroupingKeyCode
           |// prepare windows grouping input
           |${buildWindowsGroupingElementExpr.code}
           |if ($lastKeyTerm == null) {
           |  $lastKeyTerm = $groupKeyTerm.copy();
           |} else if ($keyNotEqualsCode) {
           |  $endWindowAgg
           |  $lastKeyTerm = $groupKeyTerm.copy();
           |}
           |$windowsGrouping
           |  .addInputToBuffer(($BINARY_ROW)${buildWindowsGroupingElementExpr.resultTerm});
           |$triggerWindowAgg
         """.stripMargin
      val end =
        s"""
           | $endWindowAgg
           | $lastKeyTerm = null;
       """.stripMargin
      (process, end)
    }

    // ---------------------------------------------------------------------------------------------
    // assemble: sort the aggregate map entries, then merge them in sorted order
    val sortType = classOf[QuickSort].getName
    val bufferIteratorType = classOf[MutableObjectIterator[_]].getName
    s"""
       | if (hasInput) {
       |  // sort by grouping keys and assigned timestamp
       |  $createSorterBufferCode
       |  new $sortType().sort($sorterBufferTerm);
       |  // merge and get result
       |  $bufferIteratorType<$kvType<$binaryRow, $binaryRow>> iterator =
       |    $sorterBufferTerm.getIterator();
       |  while (iterator.next($reuseKVTerm) != null) {
       |      // reusable input fields access
       |      ${ctx.reuseInputUnboxingCode(bufferWindowElementTerm)}
       |      $processCode
       |   }
       |  $endCode
       | }
       """.stripMargin
  }