From 448fc32c56ca05fbb952c88b953f06c8c7757f7f Mon Sep 17 00:00:00 2001 From: Jackeyzhe Date: Wed, 8 Apr 2026 11:13:38 +0800 Subject: [PATCH] support timestamp in SortPartitionOperator --- .../sortpartition/SortPartitionOperator.java | 38 ++++++++++--------- .../SortPartitionOperatorTest.java | 25 ++++++------ 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java index a361c7fd48420..482792edb485e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.AlgorithmOptions; import org.apache.flink.configuration.Configuration; @@ -44,7 +45,6 @@ import org.apache.flink.streaming.api.operators.OperatorAttributes; import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.MutableObjectIterator; @@ -86,10 +86,10 @@ public class SortPartitionOperator extends AbstractStreamOperator private final int positionSortField; /** The sorter to sort record if the record is not sorted by {@link KeySelector}. */ - private PushSorter recordSorter = null; + private PushSorter> recordSorter = null; /** The sorter to sort record if the record is sorted by {@link KeySelector}. */ - private PushSorter> recordSorterForKeySelector = null; + private PushSorter> recordSorterForKeySelector = null; public SortPartitionOperator( TypeInformation inputType, int positionSortField, Order sortOrder) { @@ -131,14 +131,15 @@ protected void setup( super.setup(containingTask, config, output); ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig(); if (sortFieldSelector != null) { - TypeInformation> sortTypeInfo = + TypeInformation> sortTypeInfo = Types.TUPLE( TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType), - inputType); + inputType, + Types.LONG); recordSorterForKeySelector = getSorter( sortTypeInfo.createSerializer(executionConfig.getSerializerConfig()), - ((CompositeType>) sortTypeInfo) + ((CompositeType>) sortTypeInfo) .createComparator( getSortFieldIndex(), getSortOrderIndicator(), @@ -146,10 +147,11 @@ protected void setup( executionConfig), containingTask); } else { + TypeInformation> sortTypeInfo = Types.TUPLE(inputType, Types.LONG); recordSorter = getSorter( - inputType.createSerializer(executionConfig.getSerializerConfig()), - ((CompositeType) inputType) + sortTypeInfo.createSerializer(executionConfig.getSerializerConfig()), + ((CompositeType>) sortTypeInfo) .createComparator( getSortFieldIndex(), getSortOrderIndicator(), @@ -163,31 +165,33 @@ protected void setup( public void processElement(StreamRecord element) throws Exception { if (sortFieldSelector != null) { recordSorterForKeySelector.writeRecord( - Tuple2.of(sortFieldSelector.getKey(element.getValue()), element.getValue())); + Tuple3.of( + sortFieldSelector.getKey(element.getValue()), + element.getValue(), + element.getTimestamp())); } else { - recordSorter.writeRecord(element.getValue()); + recordSorter.writeRecord(Tuple2.of(element.getValue(), element.getTimestamp())); } } @Override public void endInput() throws Exception { - TimestampedCollector outputCollector = new TimestampedCollector<>(output); if (sortFieldSelector != null) { recordSorterForKeySelector.finishReading(); - MutableObjectIterator> dataIterator = + MutableObjectIterator> dataIterator = recordSorterForKeySelector.getIterator(); - Tuple2 record = dataIterator.next(); + Tuple3 record = dataIterator.next(); while (record != null) { - outputCollector.collect(record.f1); + output.collect(new StreamRecord<>(record.f1, record.f2)); record = dataIterator.next(); } recordSorterForKeySelector.close(); } else { recordSorter.finishReading(); - MutableObjectIterator dataIterator = recordSorter.getIterator(); - INPUT record = dataIterator.next(); + MutableObjectIterator> dataIterator = recordSorter.getIterator(); + Tuple2 record = dataIterator.next(); while (record != null) { - outputCollector.collect(record); + output.collect(new StreamRecord<>(record.f0, record.f1)); record = dataIterator.next(); } recordSorter.close(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java index e0d352738720d..030367b004f65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java @@ -47,12 +47,13 @@ void testSortPartition() throws Exception { testHarness1 = new OneInputStreamOperatorTestHarness<>(operator1); Queue expectedOutput1 = new LinkedList<>(); testHarness1.setup(); - testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3"))); - testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1"))); + long timestamp = 1L; + testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3"), timestamp)); + testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1"), timestamp)); testHarness1.endInput(); testHarness1.close(); - expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1"))); - expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3"))); + expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1"), timestamp)); + expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3"), timestamp)); TestHarnessUtil.assertOutputEquals( "The sort partition result is not correct.", expectedOutput1, @@ -63,12 +64,12 @@ void testSortPartition() throws Exception { new OneInputStreamOperatorTestHarness<>(operator2); Queue expectedOutput2 = new LinkedList<>(); testHarness2.setup(); - testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3))); - testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1))); + testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3), timestamp)); + testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1), timestamp)); testHarness2.endInput(); testHarness2.close(); - expectedOutput2.add(new StreamRecord<>(new TestPojo("1", 1))); - expectedOutput2.add(new StreamRecord<>(new TestPojo("3", 3))); + expectedOutput2.add(new StreamRecord<>(new TestPojo("1", 1), timestamp)); + expectedOutput2.add(new StreamRecord<>(new TestPojo("3", 3), timestamp)); TestHarnessUtil.assertOutputEquals( "The sort partition result is not correct.", expectedOutput2, @@ -79,12 +80,12 @@ void testSortPartition() throws Exception { new OneInputStreamOperatorTestHarness<>(operator3); Queue expectedOutput3 = new LinkedList<>(); testHarness3.setup(); - testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3))); - testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1))); + testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3), timestamp)); + testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1), timestamp)); testHarness3.endInput(); testHarness3.close(); - expectedOutput3.add(new StreamRecord<>(new TestPojo("1", 1))); - expectedOutput3.add(new StreamRecord<>(new TestPojo("3", 3))); + expectedOutput3.add(new StreamRecord<>(new TestPojo("1", 1), timestamp)); + expectedOutput3.add(new StreamRecord<>(new TestPojo("3", 3), timestamp)); TestHarnessUtil.assertOutputEquals( "The sort partition result is not correct.", expectedOutput3,