diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java
index a361c7fd48420..482792edb485e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperator.java
@@ -30,6 +30,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
@@ -44,7 +45,6 @@
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.MutableObjectIterator;
@@ -86,10 +86,10 @@ public class SortPartitionOperator extends AbstractStreamOperator
private final int positionSortField;
/** The sorter to sort record if the record is not sorted by {@link KeySelector}. */
- private PushSorter recordSorter = null;
+ private PushSorter> recordSorter = null;
/** The sorter to sort record if the record is sorted by {@link KeySelector}. */
- private PushSorter> recordSorterForKeySelector = null;
+ private PushSorter> recordSorterForKeySelector = null;
public SortPartitionOperator(
TypeInformation inputType, int positionSortField, Order sortOrder) {
@@ -131,14 +131,15 @@ protected void setup(
super.setup(containingTask, config, output);
ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
if (sortFieldSelector != null) {
- TypeInformation> sortTypeInfo =
+ TypeInformation> sortTypeInfo =
Types.TUPLE(
TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType),
- inputType);
+ inputType,
+ Types.LONG);
recordSorterForKeySelector =
getSorter(
sortTypeInfo.createSerializer(executionConfig.getSerializerConfig()),
- ((CompositeType>) sortTypeInfo)
+ ((CompositeType>) sortTypeInfo)
.createComparator(
getSortFieldIndex(),
getSortOrderIndicator(),
@@ -146,10 +147,11 @@ protected void setup(
executionConfig),
containingTask);
} else {
+ TypeInformation> sortTypeInfo = Types.TUPLE(inputType, Types.LONG);
recordSorter =
getSorter(
- inputType.createSerializer(executionConfig.getSerializerConfig()),
- ((CompositeType) inputType)
+ sortTypeInfo.createSerializer(executionConfig.getSerializerConfig()),
+ ((CompositeType>) sortTypeInfo)
.createComparator(
getSortFieldIndex(),
getSortOrderIndicator(),
@@ -163,31 +165,33 @@ protected void setup(
public void processElement(StreamRecord element) throws Exception {
if (sortFieldSelector != null) {
recordSorterForKeySelector.writeRecord(
- Tuple2.of(sortFieldSelector.getKey(element.getValue()), element.getValue()));
+ Tuple3.of(
+ sortFieldSelector.getKey(element.getValue()),
+ element.getValue(),
+ element.getTimestamp()));
} else {
- recordSorter.writeRecord(element.getValue());
+ recordSorter.writeRecord(Tuple2.of(element.getValue(), element.getTimestamp()));
}
}
@Override
public void endInput() throws Exception {
- TimestampedCollector outputCollector = new TimestampedCollector<>(output);
if (sortFieldSelector != null) {
recordSorterForKeySelector.finishReading();
- MutableObjectIterator> dataIterator =
+ MutableObjectIterator> dataIterator =
recordSorterForKeySelector.getIterator();
- Tuple2, INPUT> record = dataIterator.next();
+ Tuple3, INPUT, Long> record = dataIterator.next();
while (record != null) {
- outputCollector.collect(record.f1);
+ output.collect(new StreamRecord<>(record.f1, record.f2));
record = dataIterator.next();
}
recordSorterForKeySelector.close();
} else {
recordSorter.finishReading();
- MutableObjectIterator dataIterator = recordSorter.getIterator();
- INPUT record = dataIterator.next();
+ MutableObjectIterator> dataIterator = recordSorter.getIterator();
+ Tuple2 record = dataIterator.next();
while (record != null) {
- outputCollector.collect(record);
+ output.collect(new StreamRecord<>(record.f0, record.f1));
record = dataIterator.next();
}
recordSorter.close();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java
index e0d352738720d..030367b004f65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sortpartition/SortPartitionOperatorTest.java
@@ -47,12 +47,13 @@ void testSortPartition() throws Exception {
testHarness1 = new OneInputStreamOperatorTestHarness<>(operator1);
Queue