def gen()

in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala [100:281]


  /**
    * Generates the operator that performs a hash-based window aggregate.
    *
    * The generated class is named "HashWinAgg" or "LocalHashWinAgg" (depending on `isFinal`)
    * followed by "WithKeys" or "WithoutKeys" (depending on whether `grouping` is empty).
    * One of three per-record processing templates is produced:
    *  - final + merge: the aggregate map key is projected directly from the input row
    *    (grouping fields plus the field at index `grouping.length`);
    *  - single assigned window/pane: one timestamp is assigned per input, optionally guarded
    *    by a validity check for jumping windows;
    *  - overlapping sliding windows: every candidate window is collected into a reusable list
    *    and the invalid ones are filtered in the generated loop.
    *
    * @param inputType     row type of the operator input
    * @param outputType    row type of the operator output
    * @param buffLimitSize forwarded to `genHashWindowAggCodes`
    * @param windowStart   forwarded to `genTimestampAssignExprs`
    *                      (presumably the window offset -- TODO confirm)
    * @param windowSize    window size; embedded into the generated code as a Java `long` literal
    * @param slideSize     slide size, used to pick the window assignment strategy
    * @return the generated operator, as built by `AggCodeGenHelper.generateOperator`
    */
  def gen(
      inputType: RowType,
      outputType: RowType,
      buffLimitSize: Int,
      windowStart: Long,
      windowSize: Long,
      slideSize: Long): GeneratedOperator[OneInputStreamOperator[RowData, RowData]] = {
    val className = if (isFinal) "HashWinAgg" else "LocalHashWinAgg"
    val suffix = if (grouping.isEmpty) "WithoutKeys" else "WithKeys"

    val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
    // add logger
    val logTerm = CodeGenUtils.newName("LOG")
    ctx.addReusableLogger(logTerm, className)

    // gen code to do aggregate using aggregate map
    val aggMapKey = newName("aggMapKey")
    val aggMapKeyWriter = newName("aggMapKeyWriter")
    val (processElementPerWindow, outputResultFromMap) = genHashWindowAggCodes(
      buffLimitSize,
      windowSize,
      slideSize,
      inputTerm,
      inputType,
      outputType,
      aggMapKey,
      logTerm)

    // In every template below, rows whose time field is null are skipped by the generated code.
    val (processCode, outputWhenEndInputCode) = if (isFinal && isMerge) {
      // prepare for aggregate map key's projection
      val projAggMapKeyCode = ProjectionCodeGenerator.generateProjectionExpression(ctx,
        inputType,
        aggMapKeyRowType,
        grouping :+ grouping.length,
        inputTerm = inputTerm,
        outRecordTerm = aggMapKey,
        outRecordWriterTerm = aggMapKeyWriter).code

      val processCode =
        s"""
           |if (!$inputTerm.isNullAt($inputTimeFieldIndex)) {
           |  hasInput = true;
           |  // input field access for group key projection, window/pane assign
           |  // and aggregate map update
           |  ${ctx.reuseInputUnboxingCode(inputTerm)}
           |  // build aggregate map key
           |  $projAggMapKeyCode
           |  // look up aggregate map and aggregate
           |  $processElementPerWindow
           |}
       """.stripMargin

      (processCode, outputResultFromMap)
    } else {
      // gen code to assign windows/pane to current input
      val assignTimestampExprs = genTimestampAssignExprs(
        enableAssignPane, windowStart, windowSize, slideSize, window, inputTerm, inputType)

      val processCode =
        if (!isSlidingWindowWithOverlapping(enableAssignPane, window, slideSize, windowSize)) {
          // Each input will be assigned with only one window, in the cases of
          // Tumbling window, Sliding window with slideSize >= windowSize or with pane optimization.
          assert(assignTimestampExprs.size == 1)
          val assignTimestampExpr = assignTimestampExprs.head

          // prepare for aggregate map key's projection
          // The assigned timestamp is re-exposed as an expression with no code of its own,
          // because the assignment code itself is emitted once in the template below.
          val accessAssignedTimestampExpr = GeneratedExpression(
            assignTimestampExpr.resultTerm, "false", "", timestampInternalType)
          val prepareInitAggMapKeyExpr = prepareAggMapKeyExpr(inputTerm, inputType,
            Some(accessAssignedTimestampExpr), aggMapKeyRowType, aggMapKey, aggMapKeyWriter)
          val processAggregate =
            s"""
               |  // build aggregate map key
               |  ${prepareInitAggMapKeyExpr.code}
               |  // aggregate by each input with assigned timestamp
               |  $processElementPerWindow
           """.stripMargin

          // gen code to filter invalid windows in the case of jumping window
          val processEachInput = if (isJumpingWindow(slideSize, windowSize)) {
            val checkValidWindow = s"${getInputTimeValue(inputTerm, inputTimeFieldIndex)} < " +
                s"${assignTimestampExpr.resultTerm} + ${windowSize}L"
            // processAggregate already builds the aggregate map key, so the key projection
            // must not be emitted a second time here (it would run twice per input record).
            s"""
               |if ($checkValidWindow) {
               |  $processAggregate
               |}
           """.stripMargin
          } else {
            processAggregate
          }
          s"""
             |if (!$inputTerm.isNullAt($inputTimeFieldIndex)) {
             |  hasInput = true;
             |  // input field access for group key projection, window/pane assign
             |   // and aggregate map update
             |  ${ctx.reuseInputUnboxingCode(inputTerm)}
             |  // assign timestamp(window or pane)
             |  ${assignTimestampExpr.code}
             |  // process each input
             |  $processEachInput
             |}""".stripMargin
        } else {
          // Otherwise, each input will be assigned with overlapping windows.
          assert(assignTimestampExprs.size > 1)
          // A single list member is registered on the generated operator; the generated code
          // clears and refills it for every input record.
          val assignedWindows = newName("assignedWindows")
          ctx.addReusableMember(
            s"transient java.util.List<java.lang.Long> $assignedWindows" +
                s" = new java.util.ArrayList<java.lang.Long>();")
          val prepareCodes = for (expr <- assignTimestampExprs) yield {
            s"""
               |${expr.code}
               |$assignedWindows.add(${expr.resultTerm});
               """.stripMargin
          }
          val code =
            s"""
               |$assignedWindows.clear();
               |${prepareCodes.mkString("\n").trim}
               """.stripMargin
          val assignTimestampExpr =
            new GeneratedExpression(assignedWindows, "false", code,
              fromTypeInfoToLogicalType(new ListTypeInfo(Types.LONG)))

          // gen code to filter invalid overlapping windows
          val assignedTimestamp = newName("assignedTimestamp")
          val timestampTerm = s"${getInputTimeValue(inputTerm, inputTimeFieldIndex)}"
          val checkValidWindow = s"$timestampTerm >= $assignedTimestamp " +
              s" && $timestampTerm < $assignedTimestamp + ${windowSize}L"

          // prepare for aggregate map key's projection
          // No timestamp expression is passed here: the timestamp slot of the key is written
          // per assigned window by updateAssignedTsCode inside the generated loop.
          val prepareInitAggMapKeyExpr = prepareAggMapKeyExpr(
            inputTerm, inputType, None, aggMapKeyRowType, aggMapKey, aggMapKeyWriter)
          val realAssignedValue = if (inputTimeIsDate) {
            convertToIntValue(s"$assignedTimestamp")
          } else {
            assignedTimestamp
          }
          val updateAssignedTsCode = s"$aggMapKey.set$timestampInternalTypeName(${
            grouping.length
          }, $realAssignedValue);"

          // NOTE(review): the generated loop breaks on the first invalid window, which assumes
          // genTimestampAssignExprs yields the candidates in an order where every window after
          // an invalid one is invalid as well -- confirm against genTimestampAssignExprs.
          s"""
             |if (!$inputTerm.isNullAt($inputTimeFieldIndex)) {
             |  hasInput = true;
             |  // input field access for group key projection, window/pane assign
             |  // and aggregate map update
             |  ${ctx.reuseInputUnboxingCode(inputTerm)}
             |  // assign windows/pane
             |  ${assignTimestampExpr.code}
             |  // build aggregate map key
             |  ${prepareInitAggMapKeyExpr.code}
             |  // we assigned all the possible overlapping windows in this case,
             |  // so need filtering the invalid window here
             |  for (Long $assignedTimestamp : ${assignTimestampExpr.resultTerm}) {
             |    if ($checkValidWindow) {
             |     // update input's assigned timestamp
             |     $updateAssignedTsCode
             |     $processElementPerWindow
             |    } else {
             |     break;
             |    }
             |  }
             |}
       """.stripMargin
        }
      (processCode, outputResultFromMap)
    }

    val baseClass = classOf[TableStreamOperator[_]].getName
    // Both branches emit the same statements; the final variant only adds surrounding
    // whitespace to the generated source.
    val endInputCode = if (isFinal) {
      s"""
         |$outputWhenEndInputCode
         """.stripMargin
    } else {
      outputWhenEndInputCode
    }
    AggCodeGenHelper.generateOperator(
      ctx, className + suffix, baseClass, processCode, endInputCode, inputType)
  }