in flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala [63:291]
/**
  * Translates a call of the given function definition into a [[PlannerExpression]].
  *
  * `REINTERPRET_CAST`, `WINDOW_START`, `WINDOW_END`, `PROCTIME`, `ROWTIME` and
  * `THROW_EXCEPTION` handle their children individually. For every other definition all
  * children are first converted with `accept(this)` and the converted arguments are handed to
  * the matching planner expression.
  *
  * The expected number of children/arguments of each definition is checked with `assert`.
  *
  * @param func the definition of the function being called
  * @param children the child expressions of the call
  * @param unknownFunctionHandler invoked to produce the result when `func` is none of the
  *                               definitions handled by this method
  * @return the planner expression built for the call
  */
private def translateCall(
    func: FunctionDefinition,
    children: Seq[Expression],
    unknownFunctionHandler: () => PlannerExpression)
  : PlannerExpression = {

  // Asserts that there is exactly one child and passes it to translateWindowReference.
  def soleWindowReference() = {
    assert(children.size == 1)
    translateWindowReference(children.head)
  }

  // Casts the given child to a TypeLiteralExpression and converts its output data type.
  def typeInfoOfLiteral(typeLiteral: Expression) =
    fromDataTypeToTypeInfo(typeLiteral.asInstanceOf[TypeLiteralExpression].getOutputDataType)

  func match {
    // special cases: these require individual handling of the child expressions
    case REINTERPRET_CAST =>
      assert(children.size == 3)
      Reinterpret(
        children.head.accept(this),
        typeInfoOfLiteral(children(1)),
        getValue[Boolean](children(2).accept(this)))

    case WINDOW_START =>
      WindowStart(soleWindowReference())

    case WINDOW_END =>
      WindowEnd(soleWindowReference())

    case PROCTIME =>
      ProctimeAttribute(soleWindowReference())

    case ROWTIME =>
      RowtimeAttribute(soleWindowReference())

    case THROW_EXCEPTION =>
      assert(children.size == 2)
      ThrowException(
        children.head.accept(this),
        typeInfoOfLiteral(children(1)))

    // general case: every child is converted up front
    case _ =>
      val args = children.map(child => child.accept(this))

      // Builds an expression for a definition that must be called without arguments.
      def nullary(build: => PlannerExpression): PlannerExpression = {
        assert(args.isEmpty)
        build
      }

      // Builds an expression from the only argument of the call.
      def unary(build: PlannerExpression => PlannerExpression): PlannerExpression = {
        assert(args.size == 1)
        build(args.head)
      }

      // Builds an expression from the first and the last of exactly two arguments.
      def binary(
          build: (PlannerExpression, PlannerExpression) => PlannerExpression)
        : PlannerExpression = {
        assert(args.size == 2)
        build(args.head, args.last)
      }

      func match {
        case sfd: ScalarFunctionDefinition =>
          val scalarCall = PlannerScalarFunctionCall(sfd.getScalarFunction, args)
          // validateInput() is called for its side effect: it configures underlying state
          scalarCall.validateInput()
          scalarCall

        case tfd: TableFunctionDefinition =>
          PlannerTableFunctionCall(tfd.toString, tfd.getTableFunction, args, tfd.getResultType)

        case afd: AggregateFunctionDefinition =>
          AggFunctionCall(
            afd.getAggregateFunction,
            afd.getResultTypeInfo,
            afd.getAccumulatorTypeInfo,
            args)

        case tafd: TableAggregateFunctionDefinition =>
          AggFunctionCall(
            tafd.getTableAggregateFunction,
            tafd.getResultTypeInfo,
            tafd.getAccumulatorTypeInfo,
            args)

        case other: FunctionDefinition =>
          other match {
            case IN =>
              assert(args.size > 1)
              In(args.head, args.drop(1))

            // single-argument definitions
            case DISTINCT => unary(DistinctAgg(_))
            case AVG => unary(Avg(_))
            case COUNT => unary(Count(_))
            case MAX => unary(Max(_))
            case MIN => unary(Min(_))
            case SUM => unary(Sum(_))
            case SUM0 => unary(Sum0(_))
            case STDDEV_POP => unary(StddevPop(_))
            case STDDEV_SAMP => unary(StddevSamp(_))
            case VAR_POP => unary(VarPop(_))
            case VAR_SAMP => unary(VarSamp(_))
            case COLLECT => unary(Collect(_))

            case EXTRACT => binary(Extract(_, _))

            // definitions without arguments
            case CURRENT_DATE => nullary(CurrentDate())
            case CURRENT_TIME => nullary(CurrentTime())
            case CURRENT_TIMESTAMP => nullary(CurrentTimestamp())
            case LOCAL_TIME => nullary(LocalTime())
            case LOCAL_TIMESTAMP => nullary(LocalTimestamp())

            case TEMPORAL_OVERLAPS =>
              assert(args.size == 4)
              TemporalOverlaps(args.head, args(1), args(2), args.last)

            case DATE_FORMAT => binary(DateFormat(_, _))

            case TIMESTAMP_DIFF =>
              assert(args.size == 3)
              TimestampDiff(args.head, args(1), args.last)

            case AT => binary(ItemAt(_, _))

            case CARDINALITY => unary(Cardinality(_))
            case ARRAY_ELEMENT => unary(ArrayElement(_))
            case ORDER_ASC => unary(Asc(_))
            case ORDER_DESC => unary(Desc(_))

            case OVER =>
              assert(args.size >= 4)
              // the first argument comes first, followed by all arguments from index 4 on,
              // and only then the arguments at index 1, 2 and 3
              OverCall(
                args.head,
                args.slice(4, args.size),
                args(1),
                args(2),
                args(3))

            case UNBOUNDED_RANGE => nullary(UnboundedRange())
            case UNBOUNDED_ROW => nullary(UnboundedRow())
            case CURRENT_RANGE => nullary(CurrentRange())
            case CURRENT_ROW => nullary(CurrentRow())
            case STREAM_RECORD_TIMESTAMP => nullary(StreamRecordTimestamp())

            case _ =>
              unknownFunctionHandler()
          }
      }
  }
}