Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include "iceberg/parquet/parquet_writer.h"

#include <memory>
#include <string_view>

#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/util/compression.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/arrow/schema.h>
#include <parquet/arrow/writer.h>
Expand All @@ -45,21 +47,31 @@ Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(

Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) {
const auto& compression_name = properties.Get(WriterProperties::kParquetCompression);
::arrow::Compression::type compression;
if (compression_name == "uncompressed") {
return ::arrow::Compression::UNCOMPRESSED;
compression = ::arrow::Compression::UNCOMPRESSED;
} else if (compression_name == "snappy") {
return ::arrow::Compression::SNAPPY;
compression = ::arrow::Compression::SNAPPY;
} else if (compression_name == "gzip") {
return ::arrow::Compression::GZIP;
compression = ::arrow::Compression::GZIP;
} else if (compression_name == "brotli") {
return ::arrow::Compression::BROTLI;
compression = ::arrow::Compression::BROTLI;
} else if (compression_name == "lz4") {
return ::arrow::Compression::LZ4;
compression = ::arrow::Compression::LZ4;
} else if (compression_name == "zstd") {
return ::arrow::Compression::ZSTD;
compression = ::arrow::Compression::ZSTD;
} else {
return InvalidArgument("Unsupported Parquet compression codec: {}", compression_name);
}
return compression;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unreachable

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's reachable. The returns were changed to assignments except the final else branch. But I have no idea if that's a better style :-)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated ParseCompression to use early returns again while keeping the availability check after parsing.

}

Status CheckCompressionAvailable(std::string_view compression_name,
::arrow::Compression::type compression) {
ICEBERG_PRECHECK(::arrow::util::Codec::IsAvailable(compression),
"Parquet compression codec {} is not available in the current build",
compression_name);
return {};
}

Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
Expand Down Expand Up @@ -98,6 +110,9 @@ class ParquetWriter::Impl {
auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
schema_descriptor->schema_root());

ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable(
options.properties.Get(WriterProperties::kParquetCompression), compression));

ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
auto file_writer = ::parquet::ParquetFileWriter::Open(
output_stream_, std::move(schema_node), std::move(writer_properties),
Expand Down
48 changes: 48 additions & 0 deletions src/iceberg/test/parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/

#include <optional>
#include <string>
#include <utility>
#include <vector>

#include <arrow/array.h>
#include <arrow/c/bridge.h>
Expand All @@ -26,6 +29,7 @@
#include <arrow/record_batch.h>
#include <arrow/table.h>
#include <arrow/type.h>
#include <arrow/util/compression.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
Expand Down Expand Up @@ -124,6 +128,27 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
}

struct ParquetCodec {
std::string name;
::arrow::Compression::type compression;
};

std::optional<ParquetCodec> UnavailableParquetCodec() {
Copy link
Copy Markdown
Member

@raulcd raulcd May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM should this function return the list of unavailable codecs instead of the first one? Otherwise I feel like the name could be slightly misleading, maybe FirstUnavailableParquetCodec.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed the helper to FirstUnavailableParquetCodec to match the first-match behavior.

const std::vector<ParquetCodec> codecs = {
{.name = "snappy", .compression = ::arrow::Compression::SNAPPY},
{.name = "gzip", .compression = ::arrow::Compression::GZIP},
{.name = "brotli", .compression = ::arrow::Compression::BROTLI},
{.name = "lz4", .compression = ::arrow::Compression::LZ4},
{.name = "zstd", .compression = ::arrow::Compression::ZSTD},
};
for (const auto& codec : codecs) {
if (!::arrow::util::Codec::IsAvailable(codec.compression)) {
return codec;
}
}
return std::nullopt;
}

} // namespace

class ParquetReaderTest : public TempFileTestBase {
Expand Down Expand Up @@ -461,6 +486,29 @@ TEST_F(ParquetReadWrite, EmptyStruct) {
IsError(ErrorKind::kNotImplemented));
}

TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) {
auto unavailable_codec = UnavailableParquetCodec();
if (!unavailable_codec.has_value()) {
GTEST_SKIP() << "All optional Parquet compression codecs are available";
}

auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
WriterProperties writer_properties;
writer_properties.Set(WriterProperties::kParquetCompression, unavailable_codec->name);

auto writer = WriterFactoryRegistry::Open(
FileFormatType::kParquet, {.path = "unavailable_codec.parquet",
.schema = schema,
.io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(),
.properties = std::move(writer_properties)});

EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(writer,
HasErrorMessage("Parquet compression codec " + unavailable_codec->name +
" is not available in the current build"));
}

TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeOptional(1, "a",
Expand Down
Loading