flink-python/pyflink/datastream/functions.py (86 lines of code) (raw):

################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ import abc from typing import Union from py4j.java_gateway import JavaObject from pyflink.java_gateway import get_gateway class Function(abc.ABC): """ The base class for all user-defined functions. """ pass class MapFunction(Function): """ Base class for Map functions. Map functions take elements and transform them, element wise. A Map function always produces a single result element for each input element. Typical applications are parsing elements, converting data types, or projecting out fields. Operations that produce multiple result elements from a single input element can be implemented using the FlatMapFunction. The basic syntax for using a MapFunction is as follows: :: >>> ds = ... >>> new_ds = ds.map(MyMapFunction()) """ @abc.abstractmethod def map(self, value): """ The mapping method. Takes an element from the input data and transforms it into exactly one element. :param value: The input value. :return: The transformed value. """ pass class CoMapFunction(Function): """ A CoMapFunction implements a map() transformation over two connected streams. The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state. The basic syntax for using a CoMapFunction is as follows: :: >>> ds1 = ... >>> ds2 = ... >>> new_ds = ds1.connect(ds2).map(MyCoMapFunction()) """ @abc.abstractmethod def map1(self, value): """ This method is called for each element in the first of the connected streams. :param value: The stream element :return: The resulting element """ pass @abc.abstractmethod def map2(self, value): """ This method is called for each element in the second of the connected streams. :param value: The stream element :return: The resulting element """ pass class FlatMapFunction(Function): """ Base class for flatMap functions. FLatMap functions take elements and transform them, into zero, one, or more elements. Typical applications can be splitting elements, or unesting lists and arrays. Operations that produce multiple strictly one result element per input element can also use the MapFunction. The basic syntax for using a MapFUnction is as follows: :: >>> ds = ... >>> new_ds = ds.flat_map(MyFlatMapFunction()) """ @abc.abstractmethod def flat_map(self, value): """ The core mthod of the FlatMapFunction. Takes an element from the input data and transforms it into zero, one, or more elements. A basic implementation of flat map is as follows: :: >>> class MyFlatMapFunction(FlatMapFunction): >>> def flat_map(self, value): >>> for i in range(value): >>> yield i :param value: The input value. :return: A genertaor """ pass class CoFlatMapFunction(Function): """ A CoFlatMapFunction implements a flat-map transformation over two connected streams. The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state. An example for the use of connected streams would be to apply rules that change over time onto elements of a stream. One of the connected streams has the rules, the other stream the elements to apply the rules to. The operation on the connected stream maintains the current set of rules in the state. It may receive either a rule update (from the first stream) and update the state, or a data element (from the second stream) and apply the rules in the state to the element. The result of applying the rules would be emitted. The basic syntax for using a CoFlatMapFunction is as follows: :: >>> ds1 = ... >>> ds2 = ... >>> class MyCoFlatMapFunction(CoFlatMapFunction): >>> def flat_map1(self, value): >>> for i in range(value): >>> yield i >>> def flat_map2(self, value): >>> for i in range(value): >>> yield i >>> new_ds = ds1.connect(ds2).flat_map(MyCoFlatMapFunction()) """ @abc.abstractmethod def flat_map1(self, value): """ This method is called for each element in the first of the connected streams. :param value: The input value. :return: A genertaor """ pass @abc.abstractmethod def flat_map2(self, value): """ This method is called for each element in the second of the connected streams. :param value: The input value. :return: A genertaor """ pass class ReduceFunction(Function): """ Base interface for Reduce functions. Reduce functions combine groups of elements to a single value, by taking always two elements and combining them into one. Reduce functions may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced individually. The basic syntax for using a ReduceFunction is as follows: :: >>> ds = ... >>> new_ds = ds.key_by(lambda x: x[1]).reduce(MyReduceFunction()) """ @abc.abstractmethod def reduce(self, value1, value2): """ The core method of ReduceFunction, combining two values into one value of the same type. The reduce function is consecutively applied to all values of a group until only a single value remains. :param value1: The first value to combine. :param value2: The second value to combine. :return: The combined value of both input values. """ pass class KeySelector(Function): """ The KeySelector allows to use deterministic objects for operations such as reduce, reduceGroup, join coGroup, etc. If invoked multiple times on the same object, the returned key must be the same. The extractor takes an object an returns the deterministic key for that object. """ @abc.abstractmethod def get_key(self, value): """ User-defined function that deterministically extracts the key from an object. :param value: The object to get the key from. :return: The extracted key. """ pass class FilterFunction(Function): """ A filter function is a predicate applied individually to each record. The predicate decides whether to keep the element, or to discard it. The basic syntax for using a FilterFunction is as follows: : >>> ds = ... >>> result = ds.filter(MyFilterFunction()) Note that the system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption can lead to incorrect results. """ @abc.abstractmethod def filter(self, value): """ The filter function that evaluates the predicate. :param value: The value to be filtered. :return: True for values that should be retained, false for values to be filtered out. """ pass class FunctionWrapper(object): """ A basic wrapper class for user defined function. """ def __init__(self, func): self._func = func class MapFunctionWrapper(FunctionWrapper): """ A wrapper class for MapFunction. It's used for wrapping up user defined function in a MapFunction when user does not implement a MapFunction but directly pass a function object or a lambda function to map() function. """ def __init__(self, func): """ The constructor of MapFunctionWrapper. :param func: user defined function object. """ super(MapFunctionWrapper, self).__init__(func) def map(self, value): """ A delegated map function to invoke user defined function. :param value: The input value. :return: the return value of user defined map function. """ return self._func(value) class FlatMapFunctionWrapper(FunctionWrapper): """ A wrapper class for FlatMapFunction. It's used for wrapping up user defined function in a FlatMapFunction when user does not implement a FlatMapFunction but directly pass a function object or a lambda function to flat_map() function. """ def __init__(self, func): """ The constructor of MapFunctionWrapper. :param func: user defined function object. """ super(FlatMapFunctionWrapper, self).__init__(func) def flat_map(self, value): """ A delegated flat_map function to invoke user defined function. :param value: The input value. :return: the return value of user defined flat_map function. """ return self._func(value) class FilterFunctionWrapper(FunctionWrapper): """ A wrapper class for FilterFunction. It's used for wrapping up user defined function in a FilterFunction when user does not implement a FilterFunction but directly pass a function object or a lambda function to filter() function. """ def __init__(self, func): super(FilterFunctionWrapper, self).__init__(func) def filter(self, value): return self._func(value) class ReduceFunctionWrapper(FunctionWrapper): """ A wrapper class for ReduceFunction. It's used for wrapping up user defined function in a ReduceFunction when user does not implement a ReduceFunction but directly pass a function object or a lambda function to reduce() function. """ def __init__(self, func): """ The constructor of ReduceFunctionWrapper. :param func: user defined function object. """ super(ReduceFunctionWrapper, self).__init__(func) def reduce(self, value1, value2): """ A delegated reduce function to invoke user defined function. :param value1: The first value to combine. :param value2: The second value to combine. :return: The combined value of both input values. """ return self._func(value1, value2) class KeySelectorFunctionWrapper(FunctionWrapper): """ A wrapper class for KeySelector. It's used for wrapping up user defined function in a KeySelector when user does not implement a KeySelector but directly pass a function object or a lambda function to key_by() function. """ def __init__(self, func): """ The constructor of MapFunctionWrapper. :param func: user defined function object. """ super(KeySelectorFunctionWrapper, self).__init__(func) def get_key(self, value): """ A delegated get_key function to invoke user defined function. :param value: The input value. :return: the return value of user defined get_key function. """ return self._func(value) def _get_python_env(): """ An util function to get a python user defined function execution environment. """ gateway = get_gateway() exec_type = gateway.jvm.org.apache.flink.table.functions.python.PythonEnv.ExecType.PROCESS return gateway.jvm.org.apache.flink.table.functions.python.PythonEnv(exec_type) class JavaFunctionWrapper(object): """ A wrapper class that maintains a Function implemented in Java. """ def __init__(self, j_function: Union[str, JavaObject]): if isinstance(j_function, str): j_func_class = get_gateway().jvm.__getattr__(j_function) j_function = j_func_class() self._j_function = j_function def get_java_function(self): return self._j_function class SourceFunction(JavaFunctionWrapper): """ Base class for all stream data source in Flink. """ def __init__(self, source_func: Union[str, JavaObject]): """ Constructor of SinkFunction. :param source_func: The java SourceFunction object. """ super(SourceFunction, self).__init__(source_func) class SinkFunction(JavaFunctionWrapper): """ The base class for SinkFunctions. """ def __init__(self, sink_func: Union[str, JavaObject]): """ Constructor of SinkFunction. :param sink_func: The java SinkFunction object or the full name of the SinkFunction class. """ super(SinkFunction, self).__init__(sink_func)