in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala [50:752]
private def translateCall(
func: FunctionDefinition,
children: Seq[Expression])
: PlannerExpression = {
// special case: requires individual handling of child expressions
func match {
case CAST =>
assert(children.size == 2)
return Cast(
children.head.accept(this),
fromDataTypeToLegacyInfo(
children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
case WINDOW_START =>
assert(children.size == 1)
val windowReference = translateWindowReference(children.head)
return WindowStart(windowReference)
case WINDOW_END =>
assert(children.size == 1)
val windowReference = translateWindowReference(children.head)
return WindowEnd(windowReference)
case PROCTIME =>
assert(children.size == 1)
val windowReference = translateWindowReference(children.head)
return ProctimeAttribute(windowReference)
case ROWTIME =>
assert(children.size == 1)
val windowReference = translateWindowReference(children.head)
return RowtimeAttribute(windowReference)
case _ =>
}
val args = children.map(_.accept(this))
func match {
// explicit legacy
case sfd: ScalarFunctionDefinition =>
val call = PlannerScalarFunctionCall(
sfd.getScalarFunction,
args)
//it configures underlying state
call.validateInput()
call
// explicit legacy
case tfd: TableFunctionDefinition =>
PlannerTableFunctionCall(
tfd.toString,
tfd.getTableFunction,
args,
tfd.getResultType)
// explicit legacy
case afd: AggregateFunctionDefinition =>
AggFunctionCall(
afd.getAggregateFunction,
afd.getResultTypeInfo,
afd.getAccumulatorTypeInfo,
args)
// explicit legacy
case tafd: TableAggregateFunctionDefinition =>
AggFunctionCall(
tafd.getTableAggregateFunction,
tafd.getResultTypeInfo,
tafd.getAccumulatorTypeInfo,
args)
// best-effort support for new type inference
case sf: ScalarFunction =>
LOG.warn(
"The new type inference for functions is only supported in the Blink planner. " +
"Falling back to legacy type inference for function '{}'.", sf.getClass)
val call = PlannerScalarFunctionCall(
sf,
args)
// it configures underlying state
call.validateInput()
call
// best-effort support for new type inference
case tf: TableFunction[_] =>
LOG.warn(
"The new type inference for functions is only supported in the Blink planner. " +
"Falling back to legacy type inference for function '{}'.", tf.getClass)
PlannerTableFunctionCall(
tf.toString,
tf,
args,
UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tf))
// best-effort support for new type inference
case af: AggregateFunction[_, _] =>
LOG.warn(
"The new type inference for functions is only supported in the Blink planner. " +
"Falling back to legacy type inference for function '{}'.", af.getClass)
AggFunctionCall(
af,
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(af),
UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(af),
args)
// best-effort support for new type inference
case taf: TableAggregateFunction[_, _] =>
LOG.warn(
"The new type inference for functions is only supported in the Blink planner. " +
"Falling back to legacy type inference for function '{}'.", taf.getClass)
AggFunctionCall(
taf,
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(taf),
UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(taf),
args)
case _ : UserDefinedFunction =>
throw new ValidationException(
"The new type inference for functions is only supported in the Blink planner.")
case fd: FunctionDefinition =>
fd match {
case AS =>
assert(args.size >= 2)
val name = getValue[String](args(1))
val extraNames = args
.drop(2)
.map(e => getValue[String](e))
Alias(args.head, name, extraNames)
case FLATTEN =>
assert(args.size == 1)
Flattening(args.head)
case GET =>
assert(args.size == 2)
val expr = GetCompositeField(args.head, getValue(args.last))
//it configures underlying state
expr.validateInput()
expr
case AND =>
assert(args.size >= 2)
args.reduceLeft(And)
case OR =>
assert(args.size >= 2)
args.reduceLeft(Or)
case NOT =>
assert(args.size == 1)
Not(args.head)
case EQUALS =>
assert(args.size == 2)
EqualTo(args.head, args.last)
case GREATER_THAN =>
assert(args.size == 2)
GreaterThan(args.head, args.last)
case GREATER_THAN_OR_EQUAL =>
assert(args.size == 2)
GreaterThanOrEqual(args.head, args.last)
case LESS_THAN =>
assert(args.size == 2)
LessThan(args.head, args.last)
case LESS_THAN_OR_EQUAL =>
assert(args.size == 2)
LessThanOrEqual(args.head, args.last)
case NOT_EQUALS =>
assert(args.size == 2)
NotEqualTo(args.head, args.last)
case IN =>
assert(args.size > 1)
In(args.head, args.drop(1))
case IS_NULL =>
assert(args.size == 1)
IsNull(args.head)
case IS_NOT_NULL =>
assert(args.size == 1)
IsNotNull(args.head)
case IS_TRUE =>
assert(args.size == 1)
IsTrue(args.head)
case IS_FALSE =>
assert(args.size == 1)
IsFalse(args.head)
case IS_NOT_TRUE =>
assert(args.size == 1)
IsNotTrue(args.head)
case IS_NOT_FALSE =>
assert(args.size == 1)
IsNotFalse(args.head)
case IF =>
assert(args.size == 3)
If(args.head, args(1), args.last)
case BETWEEN =>
assert(args.size == 3)
Between(args.head, args(1), args.last)
case NOT_BETWEEN =>
assert(args.size == 3)
NotBetween(args.head, args(1), args.last)
case DISTINCT =>
assert(args.size == 1)
DistinctAgg(args.head)
case AVG =>
assert(args.size == 1)
Avg(args.head)
case COUNT =>
assert(args.size == 1)
Count(args.head)
case MAX =>
assert(args.size == 1)
Max(args.head)
case MIN =>
assert(args.size == 1)
Min(args.head)
case SUM =>
assert(args.size == 1)
Sum(args.head)
case SUM0 =>
assert(args.size == 1)
Sum0(args.head)
case STDDEV_POP =>
assert(args.size == 1)
StddevPop(args.head)
case STDDEV_SAMP =>
assert(args.size == 1)
StddevSamp(args.head)
case VAR_POP =>
assert(args.size == 1)
VarPop(args.head)
case VAR_SAMP =>
assert(args.size == 1)
VarSamp(args.head)
case COLLECT =>
assert(args.size == 1)
Collect(args.head)
case CHAR_LENGTH =>
assert(args.size == 1)
CharLength(args.head)
case INIT_CAP =>
assert(args.size == 1)
InitCap(args.head)
case LIKE =>
assert(args.size == 2)
Like(args.head, args.last)
case LOWER =>
assert(args.size == 1)
Lower(args.head)
case LOWERCASE =>
assert(args.size == 1)
Lower(args.head)
case SIMILAR =>
assert(args.size == 2)
Similar(args.head, args.last)
case SUBSTRING =>
assert(args.size == 2 || args.size == 3)
if (args.size == 2) {
new Substring(args.head, args.last)
} else {
Substring(args.head, args(1), args.last)
}
case REPLACE =>
assert(args.size == 3)
Replace(args.head, args(1), args.last)
case TRIM =>
assert(args.size == 4)
val removeLeading = getValue[Boolean](args.head)
val removeTrailing = getValue[Boolean](args(1))
val trimMode = if (removeLeading && removeTrailing) {
PlannerTrimMode.BOTH
} else if (removeLeading) {
PlannerTrimMode.LEADING
} else if (removeTrailing) {
PlannerTrimMode.TRAILING
} else {
throw new TableException("Unsupported trim mode.")
}
Trim(trimMode, args(2), args(3))
case UPPER =>
assert(args.size == 1)
Upper(args.head)
case UPPERCASE =>
assert(args.size == 1)
Upper(args.head)
case POSITION =>
assert(args.size == 2)
Position(args.head, args.last)
case OVERLAY =>
assert(args.size == 3 || args.size == 4)
if (args.size == 3) {
new Overlay(args.head, args(1), args.last)
} else {
Overlay(
args.head,
args(1),
args(2),
args.last)
}
case CONCAT =>
Concat(args)
case CONCAT_WS =>
assert(args.nonEmpty)
ConcatWs(args.head, args.tail)
case LPAD =>
assert(args.size == 3)
Lpad(args.head, args(1), args.last)
case RPAD =>
assert(args.size == 3)
Rpad(args.head, args(1), args.last)
case REGEXP_EXTRACT =>
assert(args.size == 2 || args.size == 3)
if (args.size == 2) {
RegexpExtract(args.head, args.last)
} else {
RegexpExtract(args.head, args(1), args.last)
}
case FROM_BASE64 =>
assert(args.size == 1)
FromBase64(args.head)
case TO_BASE64 =>
assert(args.size == 1)
ToBase64(args.head)
case BuiltInFunctionDefinitions.UUID =>
assert(args.isEmpty)
PlannerUUID()
case LTRIM =>
assert(args.size == 1)
LTrim(args.head)
case RTRIM =>
assert(args.size == 1)
RTrim(args.head)
case REPEAT =>
assert(args.size == 2)
Repeat(args.head, args.last)
case REGEXP_REPLACE =>
assert(args.size == 3)
RegexpReplace(args.head, args(1), args.last)
case PLUS =>
assert(args.size == 2)
Plus(args.head, args.last)
case MINUS =>
assert(args.size == 2)
Minus(args.head, args.last)
case DIVIDE =>
assert(args.size == 2)
Div(args.head, args.last)
case TIMES =>
assert(args.size == 2)
Mul(args.head, args.last)
case ABS =>
assert(args.size == 1)
Abs(args.head)
case CEIL =>
assert(args.size == 1 || args.size == 2)
if (args.size == 1) {
Ceil(args.head)
} else {
TemporalCeil(args.last, args.head)
}
case EXP =>
assert(args.size == 1)
Exp(args.head)
case FLOOR =>
assert(args.size == 1 || args.size == 2)
if (args.size == 1) {
Floor(args.head)
} else {
TemporalFloor(args.last, args.head)
}
case LOG10 =>
assert(args.size == 1)
Log10(args.head)
case LOG2 =>
assert(args.size == 1)
Log2(args.head)
case LN =>
assert(args.size == 1)
Ln(args.head)
case BuiltInFunctionDefinitions.LOG =>
assert(args.size == 1 || args.size == 2)
if (args.size == 1) {
Log(args.head)
} else {
Log(args.head, args.last)
}
case POWER =>
assert(args.size == 2)
Power(args.head, args.last)
case MOD =>
assert(args.size == 2)
Mod(args.head, args.last)
case SQRT =>
assert(args.size == 1)
Sqrt(args.head)
case MINUS_PREFIX =>
assert(args.size == 1)
UnaryMinus(args.head)
case SIN =>
assert(args.size == 1)
Sin(args.head)
case COS =>
assert(args.size == 1)
Cos(args.head)
case SINH =>
assert(args.size == 1)
Sinh(args.head)
case TAN =>
assert(args.size == 1)
Tan(args.head)
case TANH =>
assert(args.size == 1)
Tanh(args.head)
case COT =>
assert(args.size == 1)
Cot(args.head)
case ASIN =>
assert(args.size == 1)
Asin(args.head)
case ACOS =>
assert(args.size == 1)
Acos(args.head)
case ATAN =>
assert(args.size == 1)
Atan(args.head)
case ATAN2 =>
assert(args.size == 2)
Atan2(args.head, args.last)
case COSH =>
assert(args.size == 1)
Cosh(args.head)
case DEGREES =>
assert(args.size == 1)
Degrees(args.head)
case RADIANS =>
assert(args.size == 1)
Radians(args.head)
case SIGN =>
assert(args.size == 1)
Sign(args.head)
case ROUND =>
assert(args.size == 2)
Round(args.head, args.last)
case PI =>
assert(args.isEmpty)
Pi()
case BuiltInFunctionDefinitions.E =>
assert(args.isEmpty)
PlannerE()
case RAND =>
assert(args.isEmpty || args.size == 1)
if (args.isEmpty) {
new Rand()
} else {
Rand(args.head)
}
case RAND_INTEGER =>
assert(args.size == 1 || args.size == 2)
if (args.size == 1) {
new RandInteger(args.head)
} else {
RandInteger(args.head, args.last)
}
case BIN =>
assert(args.size == 1)
Bin(args.head)
case HEX =>
assert(args.size == 1)
Hex(args.head)
case TRUNCATE =>
assert(args.size == 1 || args.size == 2)
if (args.size == 1) {
new Truncate(args.head)
} else {
Truncate(args.head, args.last)
}
case EXTRACT =>
assert(args.size == 2)
Extract(args.head, args.last)
case CURRENT_DATE =>
assert(args.isEmpty)
CurrentDate()
case CURRENT_TIME =>
assert(args.isEmpty)
CurrentTime()
case CURRENT_TIMESTAMP =>
assert(args.isEmpty)
CurrentTimestamp()
case LOCAL_TIME =>
assert(args.isEmpty)
LocalTime()
case LOCAL_TIMESTAMP =>
assert(args.isEmpty)
LocalTimestamp()
case TEMPORAL_OVERLAPS =>
assert(args.size == 4)
TemporalOverlaps(
args.head,
args(1),
args(2),
args.last)
case DATE_FORMAT =>
assert(args.size == 2)
DateFormat(args.head, args.last)
case TIMESTAMP_DIFF =>
assert(args.size == 3)
TimestampDiff(args.head, args(1), args.last)
case AT =>
assert(args.size == 2)
ItemAt(args.head, args.last)
case CARDINALITY =>
assert(args.size == 1)
Cardinality(args.head)
case ARRAY =>
ArrayConstructor(args)
case ARRAY_ELEMENT =>
assert(args.size == 1)
ArrayElement(args.head)
case MAP =>
MapConstructor(args)
case ROW =>
RowConstructor(args)
case ORDER_ASC =>
assert(args.size == 1)
Asc(args.head)
case ORDER_DESC =>
assert(args.size == 1)
Desc(args.head)
case MD5 =>
assert(args.size == 1)
Md5(args.head)
case SHA1 =>
assert(args.size == 1)
Sha1(args.head)
case SHA224 =>
assert(args.size == 1)
Sha224(args.head)
case SHA256 =>
assert(args.size == 1)
Sha256(args.head)
case SHA384 =>
assert(args.size == 1)
Sha384(args.head)
case SHA512 =>
assert(args.size == 1)
Sha512(args.head)
case SHA2 =>
assert(args.size == 2)
Sha2(args.head, args.last)
case OVER =>
assert(args.size >= 4)
OverCall(
args.head,
args.slice(4, args.size),
args(1),
args(2),
args(3)
)
case UNBOUNDED_RANGE =>
assert(args.isEmpty)
UnboundedRange()
case UNBOUNDED_ROW =>
assert(args.isEmpty)
UnboundedRow()
case CURRENT_RANGE =>
assert(args.isEmpty)
CurrentRange()
case CURRENT_ROW =>
assert(args.isEmpty)
CurrentRow()
case STREAM_RECORD_TIMESTAMP =>
assert(args.isEmpty)
StreamRecordTimestamp()
case _ =>
throw new TableException(s"Unsupported function definition: $fd")
}
}
}