diff --git a/flink-table/flink-sql-parser/pom.xml b/flink-table/flink-sql-parser/pom.xml
index 366d6d8ea1f2f..0264583bfecc7 100644
--- a/flink-table/flink-sql-parser/pom.xml
+++ b/flink-table/flink-sql-parser/pom.xml
@@ -68,13 +68,13 @@ under the License.
${calcite.version}
+
+
+
+
+
+
+
+
+
+
+
@@ -1338,6 +1367,38 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalProject(d=[$0])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
+]]>
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index 7852555acf870..229bac52ea0e8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -529,14 +529,14 @@ FROM inventory]]>
(COUNT($3) OVER (PARTITION BY $1), 0), $SUM0($3) OVER (PARTITION BY $1), null:BIGINT)], name=[$1])
+LogicalProject(id=[$0], amount=[$2], EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION BY $1), 0), SUM($3) OVER (PARTITION BY $1), null:BIGINT)], name=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, inventory]])
]]>
(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2, name])
-+- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
++- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, SUM(price) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
+- Sort(orderBy=[name ASC])
+- Exchange(distribution=[hash[name]])
+- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name, amount, price])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 85b1675e09676..a65002d357e19 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -181,13 +181,13 @@ Calc(select=[a, b, c], where=[((a < 10) AND (b > 20))])
@@ -198,13 +198,13 @@ Calc(select=[ARRAY(0.12, 0.5, 0.99) AS EXPR$0])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index 7c04bf6c2aaa6..0995440d9eeb7 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -377,7 +377,7 @@ Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b
@@ -385,7 +385,7 @@ LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3
(COUNT($1) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST), 0), $SUM0($1) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST), null:INTEGER)]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, INTEGER price_sum)]
+LogicalProject(symbol=[$0], price=[$1], tax=[$2], matchRowtime=[$3], price_sum=[CASE(>(COUNT($1) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST), 0), SUM($1) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST), null:INTEGER)]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, INTEGER price_sum)]
+- LogicalProject(symbol=[$0], price=[$1], tax=[$2], matchRowtime=[$3]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)]
+- LogicalMatch(partition=[[0]], order=[[1 ASC-nulls-first]], outputFields=[[symbol, price, tax, matchRowtime]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$2, 0), 0)]], inputFields=[[symbol, ts_ltz, price, tax]]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)]
+- LogicalWatermarkAssigner(rowtime=[ts_ltz], watermark=[-($1, 1000:INTERVAL SECOND)]), rowType=[RecordType(VARCHAR(2147483647) symbol, TIMESTAMP_LTZ(3) *ROWTIME* ts_ltz, INTEGER price, INTEGER tax)]
@@ -199,7 +199,7 @@ LogicalProject(symbol=[$0], price=[$1], tax=[$2], matchRowtime=[$3], price_sum=[
(w0$o0, 0), w0$o1, null:INTEGER) AS price_sum]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, INTEGER price_sum)]
-+- OverAggregate(partitionBy=[symbol], orderBy=[matchRowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[symbol, price, tax, matchRowtime, COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, BIGINT w0$o0, INTEGER w0$o1)]
++- OverAggregate(partitionBy=[symbol], orderBy=[matchRowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[symbol, price, tax, matchRowtime, COUNT(price) AS w0$o0, SUM(price) AS w0$o1]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, BIGINT w0$o0, INTEGER w0$o1)]
+- Exchange(distribution=[hash[symbol]]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)]
+- Match(partitionBy=[symbol], orderBy=[ts_ltz ASC], measures=[FINAL(A.price) AS price, FINAL(A.tax) AS tax, FINAL(MATCH_ROWTIME(*.ts_ltz)) AS matchRowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>(PREV(A.$2, 0), 0)}]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)]
+- Exchange(distribution=[hash[symbol]]), rowType=[RecordType(VARCHAR(2147483647) symbol, TIMESTAMP_LTZ(3) *ROWTIME* ts_ltz, INTEGER price, INTEGER tax)]
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml
index 2969cdc7486ee..0dbdafc413b55 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml
@@ -70,7 +70,7 @@ LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)
+- LogicalUnion(all=[true]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)]
:- LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, BIGINT b)]
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
- +- LogicalProject(a=[$0], EXPR$1=[0:DECIMAL(2, 1)]), rowType=[RecordType(INTEGER a, DECIMAL(2, 1) EXPR$1)]
+ +- LogicalProject(a=[$0], EXPR$1=[0.0:DECIMAL(2, 1)]), rowType=[RecordType(INTEGER a, DECIMAL(2, 1) EXPR$1)]
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]), rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)]
]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 92fccedf388e4..c2a34a6c49bac 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -68,7 +68,7 @@ FROM T GROUP BY a
@@ -76,7 +76,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index dc0185da0ec8d..fdd3264866764 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -111,14 +111,14 @@ FROM MyTable
(COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE 7200000:INTERVAL HOUR PRECEDING), 0), $SUM0($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE 7200000:INTERVAL HOUR PRECEDING), null:BIGINT), COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE 7200000:INTERVAL HOUR PRECEDING))])
+LogicalProject(a=[$0], avgA=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE 7200000:INTERVAL HOUR PRECEDING), 0), SUM($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE 7200000:INTERVAL HOUR PRECEDING), null:BIGINT), COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE 7200000:INTERVAL HOUR PRECEDING))])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:BIGINT) / w0$o0) AS avgA])
-+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0, $SUM0(c) AS w0$o1])
++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0, SUM(c) AS w0$o1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -138,14 +138,14 @@ FROM MyTable
(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), $SUM0($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING)], sum1=[CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), SUM($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS sum1])
-+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -158,14 +158,14 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS sum1])
(COUNT($2) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST ROWS 4 PRECEDING), 0), $SUM0($2) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST ROWS 4 PRECEDING), null:BIGINT)], EXPR$2=[MIN($2) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST ROWS 4 PRECEDING)])
+LogicalProject(a=[$0], EXPR$1=[CASE(>(COUNT($2) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST ROWS 4 PRECEDING), 0), SUM($2) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST ROWS 4 PRECEDING), null:BIGINT)], EXPR$2=[MIN($2) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST ROWS 4 PRECEDING)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:BIGINT) AS EXPR$1, w0$o2 AS EXPR$2])
-+- OverAggregate(partitionBy=[a], orderBy=[$2 ASC], window=[ ROWS BETWEEN 4 PRECEDING AND CURRENT ROW], select=[a, c, $2, COUNT(c) AS w0$o0, $SUM0(c) AS w0$o1, MIN(c) AS w0$o2])
++- OverAggregate(partitionBy=[a], orderBy=[$2 ASC], window=[ ROWS BETWEEN 4 PRECEDING AND CURRENT ROW], select=[a, c, $2, COUNT(c) AS w0$o0, SUM(c) AS w0$o1, MIN(c) AS w0$o2])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c, PROCTIME() AS $2])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -183,14 +183,14 @@ FROM MyTable
(COUNT($0) OVER (ORDER BY $3 NULLS FIRST), 0), $SUM0($0) OVER (ORDER BY $3 NULLS FIRST), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $3 NULLS FIRST)], cnt2=[CASE(>(COUNT($0) OVER (ORDER BY $3 NULLS FIRST), 0), SUM($0) OVER (ORDER BY $3 NULLS FIRST), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[single])
+- Calc(select=[a, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -231,14 +231,14 @@ FROM MyTable
(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST), 0), $SUM0($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST)], cnt2=[CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST), 0), SUM($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -282,14 +282,14 @@ FROM MyTable
(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), $SUM0(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING)], sum1=[CASE(>(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS sum1])
-+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(DISTINCT a) AS w0$o0, $SUM0(DISTINCT a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -313,14 +313,14 @@ FROM MyTable
(COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:INTEGER)], cnt2=[COUNT(DISTINCT $0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING)], sum2=[CASE(>(COUNT(DISTINCT $2) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), $SUM0(DISTINCT $2) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:BIGINT)])
+LogicalProject(b=[$1], cnt1=[COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING)], sum1=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), SUM($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:INTEGER)], cnt2=[COUNT(DISTINCT $0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING)], sum2=[CASE(>(COUNT(DISTINCT $2) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), 0), SUM(DISTINCT $2) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS 2 PRECEDING), null:BIGINT)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS sum1, w0$o2 AS cnt2, CASE((w0$o3 > 0), w0$o4, null:BIGINT) AS sum2])
-+- OverAggregate(partitionBy=[b], orderBy=[proctime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, b, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, COUNT(DISTINCT a) AS w0$o2, COUNT(DISTINCT c) AS w0$o3, $SUM0(DISTINCT c) AS w0$o4])
++- OverAggregate(partitionBy=[b], orderBy=[proctime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, b, c, proctime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1, COUNT(DISTINCT a) AS w0$o2, COUNT(DISTINCT c) AS w0$o3, SUM(DISTINCT c) AS w0$o4])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[a, b, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -437,14 +437,14 @@ FROM MyTable
(COUNT($0) OVER (ORDER BY $4 NULLS FIRST), 0), $SUM0($0) OVER (ORDER BY $4 NULLS FIRST), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $4 NULLS FIRST)], cnt2=[CASE(>(COUNT($0) OVER (ORDER BY $4 NULLS FIRST), 0), SUM($0) OVER (ORDER BY $4 NULLS FIRST), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[single])
+- Calc(select=[a, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -462,14 +462,14 @@ FROM MyTable
(COUNT($0) OVER (ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), 0), $SUM0($0) OVER (ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING)], cnt2=[CASE(>(COUNT($0) OVER (ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), 0), SUM($0) OVER (ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[single])
+- Calc(select=[a, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -487,14 +487,14 @@ FROM MyTable
(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), 0), $SUM0($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST)], cnt2=[CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), 0), SUM($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -512,14 +512,14 @@ FROM MyTable
(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), 0), $SUM0($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), null:INTEGER)])
+LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING)], cnt2=[CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), 0), SUM($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), null:INTEGER)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, c, rowtime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -728,7 +728,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS F
@@ -45,11 +45,11 @@ LogicalUnion(all=[true])
@@ -58,30 +58,30 @@ Union(all=[true], union=[f0])
@@ -90,7 +90,7 @@ Union(all=[true], union=[f0, f1, f2])
@@ -146,30 +146,30 @@ Union(all=[true], union=[f0, f1, f2])
@@ -178,30 +178,30 @@ Union(all=[true], union=[f0, f1, f2])
@@ -244,18 +244,18 @@ Union(all=[true], union=[a, b])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index 232213f48046e..aab16f95d4c9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -139,7 +139,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
// id <= 20
val expr1 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(0), relBuilder.literal(20))
// id > 10.0 (note: the types of id and literal are different)
- val expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(10.0))
+ val expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(bd(10.0)))
// DIV(id, 2) > 3
val expr3 = relBuilder.call(
GREATER_THAN,
@@ -165,13 +165,13 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
// id <= 20 AND id > 10 AND DIV(id, 2) > 3
val filter2 = relBuilder.push(ts).filter(expr1, expr2, expr3).build
assertEquals(
- ValueInterval(bd(10.0), bd(20), includeLower = false),
+ ValueInterval(bd(10.0), bd(20.0), includeLower = false),
mq.getColumnInterval(filter2, 0))
// id <= 20 AND id > 10 AND score < 4.1
val filter3 = relBuilder.push(ts).filter(expr1, expr2, expr4).build
assertEquals(
- ValueInterval(bd(10.0), bd(20), includeLower = false),
+ ValueInterval(bd(10.0), bd(20.0), includeLower = false),
mq.getColumnInterval(filter3, 0))
// score > 6.0 OR score <= 4.0
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index eaca9fa23c901..d8815d3aa6bf7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -2435,6 +2435,7 @@ class FlinkRelMdHandlerTestBase {
true,
RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(0, 0)), null),
RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 0)), null),
+ null,
RelCollations.of(
new RelFieldCollation(
1,
@@ -2568,6 +2569,7 @@ class FlinkRelMdHandlerTestBase {
true,
RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(0, 0)), null),
RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 0)), null),
+ null,
RelCollations.of(
new RelFieldCollation(
1,
@@ -2589,6 +2591,7 @@ class FlinkRelMdHandlerTestBase {
false,
RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(4, 15)), null),
RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 0)), null),
+ null,
RelCollations.of(
new RelFieldCollation(
2,
@@ -2634,6 +2637,7 @@ class FlinkRelMdHandlerTestBase {
false,
RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(7, 19)), null),
RexWindowBounds.create(SqlWindow.createUnboundedFollowing(new SqlParserPos(0, 0)), null),
+ null,
RelCollations.EMPTY,
ImmutableList.of(
new Window.RexWinAggCall(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
index eea6b0b433605..1fc2f71faeb13 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSelectivityTest.scala
@@ -520,6 +520,7 @@ class FlinkRelMdSelectivityTest extends FlinkRelMdHandlerTestBase {
true,
RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(0, 0)), null),
RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 0)), null),
+ null,
RelCollations.of(
new RelFieldCollation(
1,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
index 996ef4f2de8bf..4a77662bd2b70 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
@@ -571,6 +571,7 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase {
true,
RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new SqlParserPos(0, 0)), null),
RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 0)), null),
+ null,
RelCollations.of(
new RelFieldCollation(
2,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
index 74e9ad050fea3..11258852f29a2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
@@ -47,7 +47,7 @@ class FlinkPruneEmptyRulesTest extends TableTestBase {
CoreRules.FILTER_REDUCE_EXPRESSIONS,
CoreRules.PROJECT_REDUCE_EXPRESSIONS,
CoreRules.FILTER_SET_OP_TRANSPOSE,
- CoreRules.FILTER_PROJECT_TRANSPOSE,
+ FlinkFilterProjectTransposeRule.INSTANCE,
CoreRules.PROJECT_MERGE,
CoreRules.PROJECT_FILTER_VALUES_MERGE,
FlinkPruneEmptyRules.UNION_INSTANCE,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
index 7ecef1016c0d1..065f5b97c7750 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
@@ -737,7 +737,7 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
// TODO some bugs in SubQueryRemoveRule
// the result RelNode (LogicalJoin(condition=[=($1, $11)], joinType=[left]))
// after SubQueryRemoveRule is unexpected
- assertThatExceptionOfType(classOf[AssertionError])
+ assertThatExceptionOfType(classOf[NullPointerException])
.isThrownBy(() => util.verifyRelPlanNotExpected(sqlQuery, "joinType=[anti]"))
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
index 8785e5a77f612..7718ed7499dcf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
@@ -964,25 +964,9 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
@Test
def testInWithCorrelatedOnHaving(): Unit = {
- // TODO There are some bugs when converting SqlNode to RelNode:
val sqlQuery = "SELECT SUM(a) AS s FROM x GROUP BY b " +
"HAVING MAX(a) IN (SELECT d FROM y WHERE y.d = x.b)"
-
- // the logical plan is:
- //
- // LogicalProject(s=[$1])
- // LogicalFilter(condition=[IN($2, {
- // LogicalProject(d=[$1])
- // LogicalFilter(condition=[=($1, $cor0.b)])
- // LogicalTableScan(table=[[builtin, default, r]])
- // })])
- // LogicalAggregate(group=[{0}], s=[SUM($1)], agg#1=[MAX($1)])
- // LogicalProject(b=[$1], a=[$0])
- // LogicalTableScan(table=[[builtin, default, l]])
- //
- // LogicalFilter lost variablesSet information.
-
- util.verifyRelPlanNotExpected(sqlQuery, "joinType=[semi]")
+ util.verifyRelPlan(sqlQuery)
}
@Test
@@ -1577,25 +1561,9 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
@Test
def testExistsWithCorrelatedOnHaving(): Unit = {
- // TODO There are some bugs when converting SqlNode to RelNode:
- val sqlQuery1 =
+ val sqlQuery =
"SELECT SUM(a) AS s FROM x GROUP BY b HAVING EXISTS (SELECT * FROM y WHERE y.d = x.b)"
-
- // the logical plan is:
- //
- // LogicalProject(s=[$1])
- // LogicalFilter(condition=[IN($2, {
- // LogicalProject(d=[$1])
- // LogicalFilter(condition=[=($1, $cor0.b)])
- // LogicalTableScan(table=[[builtin, default, r]])
- // })])
- // LogicalAggregate(group=[{0}], s=[SUM($1)], agg#1=[MAX($1)])
- // LogicalProject(b=[$1], a=[$0])
- // LogicalTableScan(table=[[builtin, default, l]])
- //
- // LogicalFilter lost variablesSet information.
-
- util.verifyRelPlanNotExpected(sqlQuery1, "joinType=[semi]")
+ util.verifyRelPlan(sqlQuery)
}
@Test
@@ -1666,7 +1634,7 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
// TODO some bugs in SubQueryRemoveRule
// the result RelNode (LogicalJoin(condition=[=($1, $8)], joinType=[left]))
// after SubQueryRemoveRule is unexpected
- assertThatExceptionOfType(classOf[AssertionError])
+ assertThatExceptionOfType(classOf[NullPointerException])
.isThrownBy(() => util.verifyRelPlanNotExpected(sqlQuery, "joinType=[semi]"))
}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 769a062fd6a4b..ef1a71b276d7f 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -79,7 +79,7 @@ under the License.
- 1.37.0
+ 1.38.0
3.1.11
33.4.0-jre
2.3.2