diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index b91935abe06c..5599e6f9c870 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20260425-25.12.10.7 -CH_COMMIT=54c5bf9a97b +CH_BRANCH=rebase_ch/20260425-25.12.10.7-complextype +CH_COMMIT=b1510a2 diff --git a/cpp-ch/local-engine/Parser/ExpressionParser.cpp b/cpp-ch/local-engine/Parser/ExpressionParser.cpp index 3604d17d8f53..b2f687991a6f 100644 --- a/cpp-ch/local-engine/Parser/ExpressionParser.cpp +++ b/cpp-ch/local-engine/Parser/ExpressionParser.cpp @@ -264,7 +264,13 @@ ExpressionParser::addConstColumn(DB::ActionsDAG & actions_dag, const DB::DataTyp { String name = DB::fieldToString(field).substr(0, 10); name = getUniqueName(name); - const auto * res_node = &actions_dag.addColumn(DB::ColumnWithTypeAndName(type->createColumnConst(1, field), type, name)); + /// Substrait null literals carry a concrete type; CH `createColumnConst` inserts into the nested column (e.g. ColumnArray), + /// which cannot accept `Field::Null` unless the type is Nullable(...). + DB::DataTypePtr const_type = type; + if (field.isNull() && !type->isNullable()) + const_type = makeNullable(type); + const auto * res_node = &actions_dag.addColumn( + DB::ColumnWithTypeAndName(const_type->createColumnConst(1, field), const_type, name)); if (reuseCSE()) { // The new node, res_node will be remained in the ActionsDAG, but it will not affect the execution. @@ -307,6 +313,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG & const auto & input_type = args[0]->result_type; DataTypePtr denull_input_type = removeNullable(input_type); DataTypePtr output_type = TypeParser::parseType(substrait_type); + DataTypePtr cast_output_type = input_type->isNullable() && !output_type->isNullable() ? makeNullable(output_type) : output_type; DataTypePtr denull_output_type = removeNullable(output_type); const ActionsDAG::Node * result_node = nullptr; if (substrait_type.has_binary()) @@ -351,7 +358,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG & else if (isString(denull_input_type) && substrait_type.has_bool_()) { /// cast(string to boolean) - args.emplace_back(addConstColumn(actions_dag, std::make_shared(), output_type->getName())); + args.emplace_back(addConstColumn(actions_dag, std::make_shared(), cast_output_type->getName())); result_node = toFunctionNode(actions_dag, "accurateCastOrNull", args); } else if (isString(denull_input_type) && isInt(denull_output_type)) @@ -360,13 +367,13 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG & /// Refer to https://github.com/apache/gluten/issues/4956 and https://github.com/apache/gluten/issues/8598 const auto * trim_str_arg = addConstColumn(actions_dag, std::make_shared(), " \t\n\r\f"); args[0] = toFunctionNode(actions_dag, "trimBothSpark", {args[0], trim_str_arg}); - args.emplace_back(addConstColumn(actions_dag, std::make_shared(), output_type->getName())); + args.emplace_back(addConstColumn(actions_dag, std::make_shared(), cast_output_type->getName())); result_node = toFunctionNode(actions_dag, "CAST", args); } else { /// Common process: CAST(input, type) - args.emplace_back(addConstColumn(actions_dag, std::make_shared(), output_type->getName())); + args.emplace_back(addConstColumn(actions_dag, std::make_shared(), cast_output_type->getName())); result_node = toFunctionNode(actions_dag, "CAST", args); } diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp index 00b8062d77a4..837bd5efc8ca 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp @@ -201,7 +201,7 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR auto source = std::make_shared(getContext(), header, local_files); auto source_pipe = Pipe(source); auto source_step = std::make_unique(getContext(), std::move(source_pipe), "substrait local files"); - if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType) + if (format_settings.parquet.use_native_reader_v3 && !readRowIndex) source_step->setStepDescription("ParquetReaderV3"); else source_step->setStepDescription("ParquetReader"); diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 8c702e20922f..ac03455c4a0c 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -148,6 +149,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con const auto & origin_column = origin_columns[i]; const auto & origin_type = origin_column.type; auto final_type = TypeParser::parseType(output_schema.types(i)); + if (origin_type->isNullable() && !final_type->isNullable()) + final_type = makeNullable(final_type); /// Intermediate aggregate data is special, no check here. if (typeid_cast(origin_column.type.get()) || origin_type->equals(*final_type)) diff --git a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp index a6ff339ef9f0..a6c1df5c1031 100644 --- a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp +++ b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp @@ -64,7 +64,14 @@ ALWAYS_INLINE static void writeRowToColumns(const std::vector columns[i]->insert(spark_row_reader.getField(i)); // read decimal128 } else - columns[i]->insert(spark_row_reader.getField(i)); + { + DB::Field field = spark_row_reader.getField(i); + /// Spark UnsafeRow marks null top-level values; CH non-Nullable columns cannot insert Null (e.g. Array/Map/Tuple). + if (field.isNull() && !spark_row_reader.getFieldTypes()[i]->isNullable()) + columns[i]->insertDefault(); + else + columns[i]->insert(std::move(field)); + } } } diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp b/cpp-ch/local-engine/Parser/TypeParser.cpp index bdb8c52e9c21..10a719264d0b 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.cpp +++ b/cpp-ch/local-engine/Parser/TypeParser.cpp @@ -84,7 +84,8 @@ DB::DataTypePtr TypeParser::getCHTypeByName(const String & spark_type_name) return DB::DataTypeFactory::instance().get(ch_type_name); } -DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, std::list * field_names) +DB::DataTypePtr TypeParser::parseType( + const substrait::Type & substrait_type, std::list * field_names, bool keep_list_nullability) { DB::DataTypePtr ch_type = nullptr; @@ -185,14 +186,14 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st for (int i = 0; i < types.size(); ++i) { struct_field_names.push_back(field_names->front()); - struct_field_types[i] = parseType(types[i], field_names); + struct_field_types[i] = parseType(types[i], field_names, keep_list_nullability); } } else { /// Construct CH tuple type without DFS rule. for (int i = 0; i < types.size(); ++i) - struct_field_types[i] = parseType(types[i]); + struct_field_types[i] = parseType(types[i], nullptr, keep_list_nullability); const auto & names = substrait_type.struct_().names(); for (const auto & name : names) @@ -209,9 +210,16 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st } else if (substrait_type.has_list()) { - auto ch_nested_type = parseType(substrait_type.list().type()); + auto ch_nested_type = parseType(substrait_type.list().type(), nullptr, true); ch_type = std::make_shared(ch_nested_type); - ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type); + /// ClickHouse doesn't support Nullable(Array(...)) well in many execution paths. + /// In our parquet reader path, null arrays are represented as empty arrays (no null map). + /// So for top-level Substrait LIST, we intentionally drop outer nullability and keep Array(...). + /// + /// Note: element nullability is still preserved via ch_nested_type; if the element is also a LIST, + /// its own nullability must be preserved to represent Array(Nullable(Array(...))). + if (keep_list_nullability) + ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type); } else if (substrait_type.has_map()) { @@ -223,8 +231,8 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st } else { - auto ch_key_type = parseType(substrait_type.map().key()); - auto ch_val_type = parseType(substrait_type.map().value()); + auto ch_key_type = parseType(substrait_type.map().key(), nullptr, keep_list_nullability); + auto ch_val_type = parseType(substrait_type.map().value(), nullptr, keep_list_nullability); ch_type = std::make_shared(ch_key_type, ch_val_type); ch_type = tryWrapNullable(substrait_type.map().nullability(), ch_type); } diff --git a/cpp-ch/local-engine/Parser/TypeParser.h b/cpp-ch/local-engine/Parser/TypeParser.h index 4a0b4f738587..fef3965293f3 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.h +++ b/cpp-ch/local-engine/Parser/TypeParser.h @@ -35,7 +35,8 @@ namespace local_engine static DB::DataTypePtr getCHTypeByName(const String& spark_type_name); /// When parsing named structure, we need the field names. - static DB::DataTypePtr parseType(const substrait::Type& substrait_type, std::list* field_names); + static DB::DataTypePtr + parseType(const substrait::Type& substrait_type, std::list* field_names, bool keep_list_nullability = false); inline static DB::DataTypePtr parseType(const substrait::Type& substrait_type) { diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index 3a0f5d79c131..6c247b5886e1 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -221,7 +221,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr // TODO: rebase-25.12, support complex types when there is a nullable type // for example: parquet type is Array, requested type is Nullable(Array(Nullable(String))) - if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType) + if (format_settings.parquet.use_native_reader_v3 && !readRowIndex) { LOG_TRACE( &Poco::Logger::get("ParquetFormatFile"),