diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/FlinkExpandConversionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/FlinkExpandConversionRule.scala index a86b89ff80862..612d0eb820fe0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/FlinkExpandConversionRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/FlinkExpandConversionRule.scala @@ -30,6 +30,8 @@ import org.apache.calcite.plan.volcano.AbstractConverter import org.apache.calcite.rel.{RelCollation, RelCollations, RelCollationTraitDef, RelNode} import org.apache.calcite.rel.RelDistribution.Type._ +import scala.collection.JavaConverters._ + /** Rule which converts an [[AbstractConverter]] to a RelNode which satisfies the target traits. */ class FlinkExpandConversionRule(flinkConvention: Convention) extends RelOptRule( @@ -68,6 +70,9 @@ class FlinkExpandConversionRule(flinkConvention: Convention) val toCollation = requiredTraits.getTrait(RelCollationTraitDef.INSTANCE) transformedNode = satisfyCollation(flinkConvention, transformedNode, toCollation) } + if (transformedNode == null) { + return + } checkSatisfyRequiredTrait(transformedNode, requiredTraits) call.transformTo(transformedNode) } @@ -151,6 +156,12 @@ object FlinkExpandConversionRule { requiredCollation: RelCollation): RelNode = { val fromCollation = node.getTraitSet.getTrait(RelCollationTraitDef.INSTANCE) if (!fromCollation.satisfies(requiredCollation)) { + val fieldCount = node.getRowType.getFieldCount + val isValidCollation = + requiredCollation.getFieldCollations.asScala.forall(_.getFieldIndex < fieldCount) + if (!isValidCollation) { + return null + } val traitSet = node.getTraitSet.replace(requiredCollation).replace(flinkConvention) val sortCollation = RelCollationTraitDef.INSTANCE.canonize(requiredCollation) flinkConvention match { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/ExpandConversionRuleFixTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/ExpandConversionRuleFixTest.java new file mode 100644 index 0000000000000..e88bf5b34f41d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/ExpandConversionRuleFixTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.rules.physical; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +import org.junit.jupiter.api.Test; + +public class ExpandConversionRuleFixTest { + + @Test + public void testOrderByWithGlobalAggregate() { + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tEnv = TableEnvironment.create(settings); + tEnv.getConfig().set("parallelism.default", "1"); + + tEnv.executeSql( + "CREATE TABLE MyTable (" + + " a INT," + + " b STRING" + + ") WITH (" + + " 'connector' = 'datagen'," + + " 'number-of-rows' = '5'" + + ")"); + + String sql = "SELECT MAX(a) FROM (SELECT a, b FROM MyTable ORDER BY b ASC)"; + + tEnv.executeSql(sql).collect(); + } +}