Skip to content

Commit e2b24bf

Browse files
authored
feat(avro): support zstd compression level configuration (#131)
1 parent 2cb98dc commit e2b24bf

File tree

9 files changed

+155
-14
lines changed

9 files changed

+155
-14
lines changed

src/paimon/format/avro/avro_file_batch_reader_test.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ namespace paimon::avro::test {
3939
class AvroFileBatchReaderTest : public ::testing::Test, public ::testing::WithParamInterface<bool> {
4040
public:
4141
void SetUp() override {
42-
ASSERT_OK_AND_ASSIGN(file_format_, FileFormatFactory::Get("avro", {}));
42+
ASSERT_OK_AND_ASSIGN(file_format_,
43+
FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}}));
4344
fs_ = std::make_shared<LocalFileSystem>();
4445
dir_ = ::paimon::test::UniqueTestDirectory::Create();
4546
ASSERT_TRUE(dir_);

src/paimon/format/avro/avro_file_format_test.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ namespace paimon::avro::test {
4545
class AvroFileFormatTest : public testing::Test, public ::testing::WithParamInterface<std::string> {
4646
public:
4747
void SetUp() override {
48-
ASSERT_OK_AND_ASSIGN(file_format_, FileFormatFactory::Get("avro", {}));
48+
ASSERT_OK_AND_ASSIGN(file_format_,
49+
FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}}));
4950
fs_ = std::make_shared<LocalFileSystem>();
5051
dir_ = ::paimon::test::UniqueTestDirectory::Create();
5152
ASSERT_TRUE(dir_);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
namespace paimon::avro {
21+
22+
// write
23+
static inline const char AVRO_CODEC[] = "avro.codec";
24+
25+
} // namespace paimon::avro

src/paimon/format/avro/avro_format_writer.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,14 @@ AvroFormatWriter::AvroFormatWriter(std::unique_ptr<::avro::DataFileWriterBase>&&
5353

5454
Result<std::unique_ptr<AvroFormatWriter>> AvroFormatWriter::Create(
5555
std::unique_ptr<AvroOutputStreamImpl> out, const std::shared_ptr<arrow::Schema>& schema,
56-
const ::avro::Codec codec) {
56+
const ::avro::Codec codec, std::optional<int32_t> compression_level) {
5757
try {
5858
PAIMON_ASSIGN_OR_RAISE(::avro::ValidSchema avro_schema,
5959
AvroSchemaConverter::ArrowSchemaToAvroSchema(schema));
6060
AvroOutputStreamImpl* avro_output_stream = out.get();
61-
auto writer = std::make_unique<::avro::DataFileWriterBase>(std::move(out), avro_schema,
62-
DEFAULT_SYNC_INTERVAL, codec);
61+
auto writer = std::make_unique<::avro::DataFileWriterBase>(
62+
std::move(out), avro_schema, DEFAULT_SYNC_INTERVAL, codec, ::avro::Metadata(),
63+
compression_level);
6364
auto data_type = arrow::struct_(schema->fields());
6465
return std::unique_ptr<AvroFormatWriter>(
6566
new AvroFormatWriter(std::move(writer), avro_schema, data_type, avro_output_stream));

src/paimon/format/avro/avro_format_writer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <cstddef>
2020
#include <cstdint>
2121
#include <memory>
22+
#include <optional>
2223

2324
#include "arrow/api.h"
2425
#include "avro/DataFile.hh"
@@ -49,7 +50,7 @@ class AvroFormatWriter : public FormatWriter {
4950
public:
5051
static Result<std::unique_ptr<AvroFormatWriter>> Create(
5152
std::unique_ptr<AvroOutputStreamImpl> out, const std::shared_ptr<arrow::Schema>& schema,
52-
const ::avro::Codec codec);
53+
const ::avro::Codec codec, std::optional<int32_t> compression_level);
5354

5455
Status AddBatch(ArrowArray* batch) override;
5556

src/paimon/format/avro/avro_format_writer_test.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ class AvroFormatWriterTest : public ::testing::Test {
6868
int32_t batch_size) {
6969
::ArrowSchema c_schema;
7070
EXPECT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());
71-
EXPECT_OK_AND_ASSIGN(auto file_format, FileFormatFactory::Get("avro", {}));
71+
EXPECT_OK_AND_ASSIGN(auto file_format,
72+
FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}}));
7273
EXPECT_OK_AND_ASSIGN(auto writer_builder,
7374
file_format->CreateWriterBuilder(&c_schema, batch_size));
7475
EXPECT_OK_AND_ASSIGN(std::shared_ptr<FormatWriter> writer,

src/paimon/format/avro/avro_writer_builder.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424

2525
#include "avro/DataFile.hh"
2626
#include "avro/Stream.hh"
27+
#include "paimon/common/utils/options_utils.h"
2728
#include "paimon/common/utils/string_utils.h"
29+
#include "paimon/core/core_options.h"
30+
#include "paimon/format/avro/avro_format_defs.h"
2831
#include "paimon/format/avro/avro_format_writer.h"
2932
#include "paimon/format/avro/avro_output_stream_impl.h"
3033
#include "paimon/format/writer_builder.h"
@@ -56,9 +59,15 @@ class AvroWriterBuilder : public WriterBuilder {
5659
Result<std::unique_ptr<FormatWriter>> Build(const std::shared_ptr<OutputStream>& out,
5760
const std::string& compression) override {
5861
auto output_stream = std::make_unique<AvroOutputStreamImpl>(out, BUFFER_SIZE, pool_);
62+
PAIMON_ASSIGN_OR_RAISE(
63+
std::string file_compression,
64+
OptionsUtils::GetValueFromMap<std::string>(options_, AVRO_CODEC, compression));
5965
PAIMON_ASSIGN_OR_RAISE(::avro::Codec codec,
60-
ToAvroCompressionKind(StringUtils::ToLowerCase(compression)));
61-
return AvroFormatWriter::Create(std::move(output_stream), schema_, codec);
66+
ToAvroCompressionKind(StringUtils::ToLowerCase(file_compression)));
67+
PAIMON_ASSIGN_OR_RAISE(std::optional<int32_t> compression_level,
68+
GetAvroCompressionLevel(codec));
69+
return AvroFormatWriter::Create(std::move(output_stream), schema_, codec,
70+
compression_level);
6271
}
6372

6473
private:
@@ -77,6 +86,14 @@ class AvroWriterBuilder : public WriterBuilder {
7786
return Status::Invalid("unknown compression " + file_compression);
7887
}
7988
}
89+
Result<std::optional<int32_t>> GetAvroCompressionLevel(const ::avro::Codec& codec) {
90+
std::optional<int32_t> compression_level;
91+
if (codec == ::avro::Codec::ZSTD_CODEC) {
92+
PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options_));
93+
compression_level = core_options.GetFileCompressionZstdLevel();
94+
}
95+
return compression_level;
96+
}
8097

8198
std::shared_ptr<MemoryPool> pool_;
8299
std::shared_ptr<arrow::Schema> schema_;

src/paimon/format/avro/avro_writer_builder_test.cpp

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
namespace paimon::avro::test {
2424

25-
TEST(ToAvroCompressionKindTest, HandlesValidCompressions) {
25+
TEST(AvroWriterBuilderTest, HandlesValidCompressions) {
2626
ASSERT_OK_AND_ASSIGN(::avro::Codec zstd_codec,
2727
AvroWriterBuilder::ToAvroCompressionKind("zstd"));
2828
ASSERT_EQ(zstd_codec, ::avro::Codec::ZSTD_CODEC);
@@ -44,11 +44,105 @@ TEST(ToAvroCompressionKindTest, HandlesValidCompressions) {
4444
ASSERT_EQ(deflate_codec, ::avro::Codec::DEFLATE_CODEC);
4545
}
4646

47-
TEST(ToAvroCompressionKindTest, HandlesInvalidCompression) {
47+
TEST(AvroWriterBuilderTest, HandlesInvalidCompression) {
4848
ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("unknown_compression"));
4949
}
5050

51-
TEST(ToAvroCompressionKindTest, HandlesEmptyString) {
51+
TEST(AvroWriterBuilderTest, HandlesEmptyString) {
5252
ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind(""));
5353
}
54+
55+
TEST(AvroWriterBuilderTest, CheckAvroCodec) {
56+
arrow::FieldVector fields = {arrow::field("f0", arrow::int32())};
57+
auto schema = std::make_shared<arrow::Schema>(fields);
58+
{
59+
AvroWriterBuilder builder(schema, -1,
60+
{{Options::FILE_FORMAT, "avro"}, {"avro.codec", "snappy"}});
61+
ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd"));
62+
auto* avro_file_writer = dynamic_cast<AvroFormatWriter*>(file_writer.get());
63+
ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::SNAPPY_CODEC);
64+
ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt);
65+
}
66+
{
67+
AvroWriterBuilder builder(schema, -1,
68+
{{Options::FILE_FORMAT, "avro"}, {"avro.codec", "deflate"}});
69+
ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd"));
70+
auto* avro_file_writer = dynamic_cast<AvroFormatWriter*>(file_writer.get());
71+
ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::DEFLATE_CODEC);
72+
ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt);
73+
}
74+
{
75+
AvroWriterBuilder builder(schema, -1,
76+
{{Options::FILE_FORMAT, "avro"}, {"avro.codec", "zstd"}});
77+
ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd"));
78+
auto* avro_file_writer = dynamic_cast<AvroFormatWriter*>(file_writer.get());
79+
ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::ZSTD_CODEC);
80+
ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, 1);
81+
}
82+
{
83+
AvroWriterBuilder builder(schema, -1,
84+
{{Options::FILE_FORMAT, "avro"},
85+
{"avro.codec", "zstd"},
86+
{Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}});
87+
ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd"));
88+
auto* avro_file_writer = dynamic_cast<AvroFormatWriter*>(file_writer.get());
89+
ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::ZSTD_CODEC);
90+
ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, 3);
91+
}
92+
{
93+
AvroWriterBuilder builder(schema, -1,
94+
{{Options::FILE_FORMAT, "avro"},
95+
{"avro.codec", "null"},
96+
{Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}});
97+
ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd"));
98+
auto* avro_file_writer = dynamic_cast<AvroFormatWriter*>(file_writer.get());
99+
ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::NULL_CODEC);
100+
ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt);
101+
}
102+
{
103+
AvroWriterBuilder builder(schema, -1,
104+
{{Options::FILE_FORMAT, "avro"},
105+
{"avro.codec", "test"},
106+
{Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}});
107+
ASSERT_NOK(builder.Build(nullptr, "zstd"));
108+
}
109+
}
110+
111+
TEST(AvroWriterBuilderTest, CheckAvroCompressionLevel) {
112+
{
113+
AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}});
114+
ASSERT_OK_AND_ASSIGN(std::optional<int32_t> zstd_level,
115+
builder.GetAvroCompressionLevel(::avro::Codec::ZSTD_CODEC));
116+
ASSERT_TRUE(zstd_level.has_value());
117+
ASSERT_EQ(zstd_level.value(), 1);
118+
}
119+
{
120+
AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}});
121+
ASSERT_OK_AND_ASSIGN(std::optional<int32_t> compression_level,
122+
builder.GetAvroCompressionLevel(::avro::Codec::SNAPPY_CODEC));
123+
ASSERT_FALSE(compression_level.has_value());
124+
}
125+
{
126+
AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}});
127+
ASSERT_OK_AND_ASSIGN(std::optional<int32_t> compression_level,
128+
builder.GetAvroCompressionLevel(::avro::Codec::DEFLATE_CODEC));
129+
ASSERT_FALSE(compression_level.has_value());
130+
}
131+
{
132+
AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}});
133+
ASSERT_OK_AND_ASSIGN(std::optional<int32_t> compression_level,
134+
builder.GetAvroCompressionLevel(::avro::Codec::NULL_CODEC));
135+
ASSERT_FALSE(compression_level.has_value());
136+
}
137+
{
138+
AvroWriterBuilder builder(
139+
nullptr, -1,
140+
{{Options::FILE_FORMAT, "avro"}, {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}});
141+
ASSERT_OK_AND_ASSIGN(std::optional<int32_t> zstd_level,
142+
builder.GetAvroCompressionLevel(::avro::Codec::ZSTD_CODEC));
143+
ASSERT_TRUE(zstd_level.has_value());
144+
ASSERT_EQ(zstd_level.value(), 3);
145+
}
146+
}
147+
54148
} // namespace paimon::avro::test

third_party/versions.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ PAIMON_GTEST_PKG_NAME=gtest-${PAIMON_GTEST_BUILD_VERSION}.tar.gz
5555
PAIMON_ARROW_BUILD_VERSION=17.0.0
5656
PAIMON_ARROW_BUILD_SHA256_CHECKSUM=9d280d8042e7cf526f8c28d170d93bfab65e50f94569f6a790982a878d8d898d
5757
PAIMON_ARROW_PKG_NAME=apache-arrow-${PAIMON_ARROW_BUILD_VERSION}.tar.gz
58-
PAIMON_AVRO_BUILD_VERSION=54b332161524086dcb6cde8afe097097eed7f3ee
59-
PAIMON_AVRO_BUILD_SHA256_CHECKSUM=00febd590b1e328d3a97b67a6d29a1d0243e0e41bb2b1582ec580d37698d1fe2
58+
PAIMON_AVRO_BUILD_VERSION=c499eefb48aa2db906c7bca14a047223806f36db
59+
PAIMON_AVRO_BUILD_SHA256_CHECKSUM=9771f1dcfe3c01aff7ff670e873e66d3406362f71941821d482de65f3d32d780
6060
PAIMON_AVRO_PKG_NAME=avro-${PAIMON_AVRO_BUILD_VERSION}.tar.gz
6161
PAIMON_FMT_BUILD_VERSION=11.2.0
6262
PAIMON_FMT_BUILD_SHA256_CHECKSUM=bc23066d87ab3168f27cef3e97d545fa63314f5c79df5ea444d41d56f962c6af

0 commit comments

Comments
 (0)