flink-python/pyflink/fn_execution/operation_utils.py
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import datetime
from typing import Any, Tuple, Dict, List

import cloudpickle

from pyflink.fn_execution import flink_fn_execution_pb2
from pyflink.serializers import PickleSerializer
from pyflink.table.udf import DelegationTableFunction, DelegatingScalarFunction

SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1"
TABLE_FUNCTION_URN = "flink:transform:table_function:v1"
DATA_STREAM_STATELESS_FUNCTION_URN = "flink:transform:datastream_stateless_function:v1"

_func_num = 0
_constant_num = 0


def extract_user_defined_function(user_defined_function_proto) -> Tuple[str, Dict, List]:
"""
Extracts user-defined-function from the proto representation of a
:class:`UserDefinedFunction`.
:param user_defined_function_proto: the proto representation of the Python
:class:`UserDefinedFunction`
"""
    def _next_func_num():
        global _func_num
        _func_num += 1
        return _func_num

variable_dict = {}
user_defined_funcs = []
user_defined_func = cloudpickle.loads(user_defined_function_proto.payload)
func_name = 'f%s' % _next_func_num()
    if isinstance(user_defined_func, (DelegatingScalarFunction, DelegationTableFunction)):
variable_dict[func_name] = user_defined_func.func
else:
variable_dict[func_name] = user_defined_func.eval
user_defined_funcs.append(user_defined_func)
func_args, input_variable_dict, input_funcs = _extract_input(user_defined_function_proto.inputs)
variable_dict.update(input_variable_dict)
user_defined_funcs.extend(input_funcs)
return "%s(%s)" % (func_name, func_args), variable_dict, user_defined_funcs
def _extract_input(args) -> Tuple[str, Dict, List]:
local_variable_dict = {}
local_funcs = []
args_str = []
for arg in args:
if arg.HasField("udf"):
# for chaining Python UDF input: the input argument is a Python ScalarFunction
udf_arg, udf_variable_dict, udf_funcs = extract_user_defined_function(arg.udf)
args_str.append(udf_arg)
local_variable_dict.update(udf_variable_dict)
local_funcs.extend(udf_funcs)
elif arg.HasField("inputOffset"):
# the input argument is a column of the input row
args_str.append("value[%s]" % arg.inputOffset)
else:
# the input argument is a constant value
constant_value_name, parsed_constant_value = \
_parse_constant_value(arg.inputConstant)
args_str.append(constant_value_name)
local_variable_dict[constant_value_name] = parsed_constant_value
return ",".join(args_str), local_variable_dict, local_funcs
def extract_data_stream_stateless_funcs(udfs):
"""
Extracts user-defined-function from the proto representation of a
:class:`Function`.
:param udfs: the proto representation of the Python
:class:`Function`
"""
    func_type = udfs[0].functionType
    # the function-type enum defined on the UserDefinedDataStreamFunction proto
    udf = flink_fn_execution_pb2.UserDefinedDataStreamFunction
    func = None
if func_type == udf.MAP:
func = cloudpickle.loads(udfs[0].payload).map
elif func_type == udf.FLAT_MAP:
func = cloudpickle.loads(udfs[0].payload).flat_map
    elif func_type == udf.REDUCE:
        reduce_func = cloudpickle.loads(udfs[0].payload).reduce

        def wrap_func(value):
            # the runner passes the two values to reduce as a single pair
            return reduce_func(value[0], value[1])

        func = wrap_func
    elif func_type == udf.CO_MAP:
        co_map_func = cloudpickle.loads(udfs[0].payload)

        def wrap_func(value):
            # value is (is_left, left_value, right_value): map1 handles the left
            # stream, map2 the right stream
            return co_map_func.map1(value[1]) if value[0] else co_map_func.map2(value[2])

        func = wrap_func
    elif func_type == udf.CO_FLAT_MAP:
        co_flat_map_func = cloudpickle.loads(udfs[0].payload)

        def wrap_func(value):
            # same tagged-union convention as CO_MAP, dispatching to flat_map1/flat_map2
            if value[0]:
                return co_flat_map_func.flat_map1(value[1])
            else:
                return co_flat_map_func.flat_map2(value[2])

        func = wrap_func
return func
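
# Illustrative sketch (hypothetical values): for the connected-stream cases the
# runner feeds a tagged union (is_left, left_value, right_value), so the wrapped
# callable behaves as:
#
#   func = extract_data_stream_stateless_funcs(udfs)
#   func((True, left_value, None))    # CO_MAP: routes to map1(left_value)
#   func((False, None, right_value))  # CO_MAP: routes to map2(right_value)
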
def _parse_constant_value(constant_value) -> Tuple[str, Any]:
j_type = constant_value[0]
serializer = PickleSerializer()
pickled_data = serializer.loads(constant_value[1:])
    # the type set contains
    # TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL, CHAR, VARCHAR, NULL, BOOLEAN;
    # the pickled_data doesn't need to be converted to another Python object
if j_type == 0:
parsed_constant_value = pickled_data
# the type is DATE
elif j_type == 1:
parsed_constant_value = \
datetime.date(year=1970, month=1, day=1) + datetime.timedelta(days=pickled_data)
# the type is TIME
elif j_type == 2:
seconds, milliseconds = divmod(pickled_data, 1000)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
parsed_constant_value = datetime.time(hours, minutes, seconds, milliseconds * 1000)
# the type is TIMESTAMP
elif j_type == 3:
parsed_constant_value = \
datetime.datetime(year=1970, month=1, day=1, hour=0, minute=0, second=0) \
+ datetime.timedelta(milliseconds=pickled_data)
else:
        raise ValueError("Unknown type %s, should never happen" % str(j_type))

    def _next_constant_num():
        global _constant_num
        _constant_num += 1
        return _constant_num

constant_value_name = 'c%s' % _next_constant_num()
return constant_value_name, parsed_constant_value
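
# Illustrative decoding examples for _parse_constant_value:
#   TIME is encoded as milliseconds since midnight, so pickled_data = 3661123
#   decodes via the divmods above to datetime.time(1, 1, 1, 123000);
#   DATE is encoded as days since the epoch, so pickled_data = 18262 decodes to
#   datetime.date(2020, 1, 1).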