Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions flink-python/pyflink/datastream/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from enum import Enum

from py4j.java_gateway import JavaObject
from typing import Union, Any, Generic, TypeVar, Iterable, List, Callable, Optional
from typing import Union, Any, Generic, TypeVar, Iterable, Iterator, List, Callable, Optional
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • is there a migration consideration with these changes?
  • I am curious whether that can be tested, perhaps with tests that implement the generic types.
  • Regarding the PR title: it should start with either a square-bracketed JIRA issue number or [hotfix].

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @davidradl, thanks for the feedback!

  • These changes are backward compatible since Python generics aren't enforced at runtime, so existing subclasses won't be affected.
  • I'll add tests that implement the generic types as you suggested.
  • Will also create a JIRA ticket and fix the PR title.

Updates coming soon!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've requested a JIRA account and will update the ticket number once it's approved.


from pyflink.datastream.state import ValueState, ValueStateDescriptor, ListStateDescriptor, \
ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \
Expand Down Expand Up @@ -203,14 +203,14 @@ class Function(ABC):
"""
The base class for all user-defined functions.
"""
def open(self, runtime_context: RuntimeContext):
def open(self, runtime_context: RuntimeContext) -> None:
pass

def close(self):
def close(self) -> None:
pass


class MapFunction(Function):
class MapFunction(Function, Generic[IN, OUT]):
"""
Base class for Map functions. Map functions take elements and transform them, element wise. A
Map function always produces a single result element for each input element. Typical
Expand All @@ -225,7 +225,7 @@ class MapFunction(Function):
"""

@abstractmethod
def map(self, value):
def map(self, value: IN) -> OUT:
"""
The mapping method. Takes an element from the input data and transforms it into exactly one
element.
Expand All @@ -236,7 +236,7 @@ def map(self, value):
pass


class CoMapFunction(Function):
class CoMapFunction(Function, Generic[IN, OUT]):
"""
A CoMapFunction implements a map() transformation over two connected streams.

Expand All @@ -252,7 +252,7 @@ class CoMapFunction(Function):
"""

@abstractmethod
def map1(self, value):
def map1(self, value: IN) -> OUT:
"""
This method is called for each element in the first of the connected streams.

Expand All @@ -262,7 +262,7 @@ def map1(self, value):
pass

@abstractmethod
def map2(self, value):
def map2(self, value: IN) -> OUT:
"""
This method is called for each element in the second of the connected streams.

Expand All @@ -278,25 +278,25 @@ class FlatMapFunction(Function):
one, or more elements. Typical applications can be splitting elements, or unnesting lists and
arrays. Operations that produce strictly one result element per input element can also
use the MapFunction.
The basic syntax for using a MapFUnction is as follows:
The basic syntax for using a FlatMapFunction is as follows:

::
>>> ds = ...
>>> new_ds = ds.flat_map(MyFlatMapFunction())
"""

@abstractmethod
def flat_map(self, value):
def flat_map(self, value: IN) -> Iterator[OUT]:
"""
The core mthod of the FlatMapFunction. Takes an element from the input data and transforms
The core method of the FlatMapFunction. Takes an element from the input data and transforms
it into zero, one, or more elements.
A basic implementation of flat map is as follows:

::
>>> class MyFlatMapFunction(FlatMapFunction):
>>> def flat_map(self, value):
>>> for i in range(value):
>>> yield i
... def flat_map(self, value: IN) -> Iterator[OUT]:
... for i in range(value):
... yield i

:param value: The input value.
:return: A generator
Expand Down Expand Up @@ -336,7 +336,7 @@ class CoFlatMapFunction(Function):
"""

@abstractmethod
def flat_map1(self, value):
def flat_map1(self, value: IN) -> Iterator[OUT]:
"""
This method is called for each element in the first of the connected streams.

Expand All @@ -346,7 +346,7 @@ def flat_map1(self, value):
pass

@abstractmethod
def flat_map2(self, value):
def flat_map2(self, value: IN) -> Iterator[OUT]:
"""
This method is called for each element in the second of the connected streams.

Expand All @@ -371,7 +371,7 @@ class ReduceFunction(Function):
"""

@abstractmethod
def reduce(self, value1, value2):
def reduce(self, value1: IN, value2: IN) -> IN:
"""
The core method of ReduceFunction, combining two values into one value of the same type.
The reduce function is consecutively applied to all values of a group until only a single
Expand Down Expand Up @@ -461,15 +461,15 @@ def merge(self, acc_a, acc_b):
pass


class KeySelector(Function):
class KeySelector(Function, Generic[IN, KEY]):
"""
The KeySelector allows using deterministic objects for operations such as reduce, reduceGroup,
join, coGroup, etc. If invoked multiple times on the same object, the returned key must be the
same. The extractor takes an object and returns the deterministic key for that object.
"""

@abstractmethod
def get_key(self, value):
def get_key(self, value: IN) -> KEY:
"""
User-defined function that deterministically extracts the key from an object.

Expand Down Expand Up @@ -505,7 +505,7 @@ class FilterFunction(Function):
"""

@abstractmethod
def filter(self, value):
def filter(self, value: IN) -> bool:
"""
The filter function that evaluates the predicate.

Expand Down Expand Up @@ -655,7 +655,7 @@ def timestamp(self) -> int:
pass

@abstractmethod
def process_element(self, value, ctx: 'ProcessFunction.Context'):
def process_element(self, value: IN, ctx: 'ProcessFunction.Context') -> Iterator[OUT]:
"""
Process one element from the input stream.

Expand Down Expand Up @@ -716,7 +716,7 @@ def time_domain(self) -> TimeDomain:
pass

@abstractmethod
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
def process_element(self, value: IN, ctx: 'KeyedProcessFunction.Context') -> Iterator[OUT]:
"""
Process one element from the input stream.

Expand Down Expand Up @@ -780,7 +780,7 @@ def timestamp(self) -> int:
pass

@abstractmethod
def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
def process_element1(self, value: IN, ctx: 'CoProcessFunction.Context') -> Iterator[OUT]:
"""
This method is called for each element in the first of the connected streams.

Expand All @@ -795,7 +795,7 @@ def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
pass

@abstractmethod
def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
def process_element2(self, value: IN, ctx: 'CoProcessFunction.Context') -> Iterator[OUT]:
"""
This method is called for each element in the second of the connected streams.

Expand Down