Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,22 @@ struct PAIMON_EXPORT Options {
/// ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction.
/// Default is 0.
static const char COMPACTION_OFFPEAK_RATIO[];
/// "lookup.cache.bloom.filter.enabled" - Whether to enable the bloom filter for lookup cache.
/// Default value is true.
static const char LOOKUP_CACHE_BLOOM_FILTER_ENABLED[];
/// "lookup.cache.bloom.filter.fpp" - Define the default false positive probability for lookup
/// cache bloom filters. Default value is 0.05.
static const char LOOKUP_CACHE_BLOOM_FILTER_FPP[];
/// "lookup.cache-spill-compression" - Spill compression for lookup cache, currently zstd, none,
/// lz4 are supported. Default value is zstd.
/// Note that Java Paimon also supports lzo, which paimon-cpp does not support for now.
static const char LOOKUP_CACHE_SPILL_COMPRESSION[];
/// "spill-compression.zstd-level" - Default spill compression zstd level. For higher
/// compression rates, it can be configured to 9, but the read and write speed will
/// significantly decrease. Default value is 1.
static const char SPILL_COMPRESSION_ZSTD_LEVEL[];
/// "cache-page-size" - Memory page size for caching. Default value is 64 kb.
static const char CACHE_PAGE_SIZE[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ set(PAIMON_COMMON_SRCS
common/io/cache/cache_key.cpp
common/io/cache/cache_manager.cpp
common/logging/logging.cpp
common/lookup/sort/sort_lookup_store_factory.cpp
common/lookup/lookup_store_factory.cpp
common/memory/bytes.cpp
common/memory/memory_pool.cpp
common/memory/memory_segment.cpp
Expand Down Expand Up @@ -387,6 +389,7 @@ if(PAIMON_BUILD_TESTS)
common/io/buffered_input_stream_test.cpp
common/io/memory_segment_output_stream_test.cpp
common/io/offset_input_stream_test.cpp
common/lookup/sort/sort_lookup_store_test.cpp
common/logging/logging_test.cpp
common/metrics/metrics_impl_test.cpp
common/metrics/histogram_test.cpp
Expand Down
21 changes: 18 additions & 3 deletions src/paimon/common/compression/block_compression_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,28 @@

#include "paimon/common/compression/block_compression_factory.h"

#include "fmt/format.h"
#include "paimon/common/compression/lz4/lz4_block_compression_factory.h"
#include "paimon/common/compression/none_block_compression_factory.h"
#include "paimon/common/compression/zstd/zstd_block_compression_factory.h"
#include "paimon/common/utils/string_utils.h"

namespace paimon {

/// Creates the block compression factory selected by `compression.compress`.
///
/// The codec name is lower-cased before matching, so the lookup is case-insensitive.
/// "none", "zstd" (built with `compression.zstd_level`) and "lz4" are recognized; any other
/// name yields `Status::Invalid` carrying the lower-cased name.
Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create(
    const CompressOptions& compression) {
    const auto codec_name = StringUtils::ToLowerCase(compression.compress);

    if (codec_name == "none") {
        return std::make_shared<NoneBlockCompressionFactory>();
    }
    if (codec_name == "zstd") {
        // zstd is the only codec here that takes a tunable level.
        return std::make_shared<ZstdBlockCompressionFactory>(compression.zstd_level);
    }
    if (codec_name == "lz4") {
        return std::make_shared<Lz4BlockCompressionFactory>();
    }

    // TODO(liangzi): LZO support
    return Status::Invalid(fmt::format("Unsupported compression type: {}", codec_name));
}

Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create(
BlockCompressionType compression) {
switch (compression) {
Expand All @@ -33,9 +49,8 @@ Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create
return std::make_shared<ZstdBlockCompressionFactory>(ZSTD_COMPRESSION_LEVEL);
default:
// TODO(liangzi): LZO support
return Status::Invalid("Unsupported compression type: " +
std::to_string(static_cast<int8_t>(compression)));
return Status::Invalid(
fmt::format("Unsupported compression type: {}", static_cast<int32_t>(compression)));
}
}

} // namespace paimon
7 changes: 5 additions & 2 deletions src/paimon/common/compression/block_compression_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
#include "paimon/common/compression/block_compression_type.h"
#include "paimon/common/compression/block_compressor.h"
#include "paimon/common/compression/block_decompressor.h"
#include "paimon/core/options/compress_options.h"
#include "paimon/result.h"

namespace paimon {

/// Each compression codec has an implementation of {@link BlockCompressionFactory} to create
/// compressors and decompressors.
class BlockCompressionFactory {
public:
static Result<std::shared_ptr<BlockCompressionFactory>> Create(
BlockCompressionType compression);
const CompressOptions& compression);

static Result<std::shared_ptr<BlockCompressionFactory>> Create(
BlockCompressionType compress_type);

BlockCompressionFactory() = default;
virtual ~BlockCompressionFactory() = default;
Expand Down
44 changes: 44 additions & 0 deletions src/paimon/common/data/serializer/row_compacted_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "paimon/common/data/generic_row.h"
#include "paimon/common/data/serializer/binary_serializer_utils.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/core/utils/fields_comparator.h"

namespace paimon {
Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
Expand All @@ -41,6 +43,48 @@ Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
schema, std::move(getters), std::move(writers), std::move(readers), pool));
}

/// Builds a comparator that orders two serialized rows field by field.
///
/// One field reader and one variant comparator are created per schema field up front; a
/// failure from `CreateFieldReader` or `FieldsComparator::CompareVariant` is propagated to the
/// caller instead of returning a comparator.
///
/// The returned comparator points two `RowReader`s at the given slices and walks the fields in
/// schema order:
///   - both fields null: the field is treated as equal and the next field is examined;
///   - only the first field null: returns -1;
///   - only the second field null: returns 1;
///   - otherwise both values are read and the per-field comparator decides; the first
///     non-zero result is returned.
/// It returns 0 when no field differs, and forwards any error raised while reading a field.
///
/// NOTE(review): the two `RowReader`s are owned by the closure via `shared_ptr`, so every copy
/// of the returned `std::function` re-points the same readers. Concurrent invocations look
/// unsafe — confirm against callers.
Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparator(
    const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool) {
    const int32_t num_fields = schema->num_fields();
    const int32_t bit_set_in_bytes = RowCompactedSerializer::CalculateBitSetInBytes(num_fields);
    auto row_reader1 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
    auto row_reader2 = std::make_shared<RowReader>(bit_set_in_bytes, pool);

    std::vector<RowCompactedSerializer::FieldReader> readers(num_fields);
    std::vector<FieldsComparator::VariantComparatorFunc> comparators(num_fields);
    for (int32_t i = 0; i < num_fields; i++) {
        auto field_type = schema->field(i)->type();
        PAIMON_ASSIGN_OR_RAISE(readers[i], CreateFieldReader(field_type, pool));
        PAIMON_ASSIGN_OR_RAISE(comparators[i],
                               FieldsComparator::CompareVariant(i, field_type, /*use_view=*/false));
    }

    // Move the per-field tables and readers into the closure: capturing by copy would duplicate
    // every std::function in both vectors for no benefit, since the locals are dead afterwards.
    auto comparator = [row_reader1 = std::move(row_reader1), row_reader2 = std::move(row_reader2),
                       readers = std::move(readers), comparators = std::move(comparators)](
                          const std::shared_ptr<MemorySlice>& slice1,
                          const std::shared_ptr<MemorySlice>& slice2) -> Result<int32_t> {
        row_reader1->PointTo(*slice1->GetSegment(), slice1->Offset());
        row_reader2->PointTo(*slice2->GetSegment(), slice2->Offset());
        for (int32_t i = 0; i < static_cast<int32_t>(readers.size()); i++) {
            const bool is_null1 = row_reader1->IsNullAt(i);
            const bool is_null2 = row_reader2->IsNullAt(i);
            if (is_null1 && is_null2) {
                // Two nulls are equal for this field; keep looking at later fields.
                continue;
            }
            if (is_null1) {
                return -1;
            }
            if (is_null2) {
                return 1;
            }
            PAIMON_ASSIGN_OR_RAISE(VariantType field1, readers[i](i, row_reader1.get()));
            PAIMON_ASSIGN_OR_RAISE(VariantType field2, readers[i](i, row_reader2.get()));
            const int32_t comp = comparators[i](field1, field2);
            if (comp != 0) {
                return comp;
            }
        }
        return 0;
    };
    // Move the closure into the type-erased wrapper so its captured state is not copied again.
    return std::function<Result<int32_t>(const std::shared_ptr<MemorySlice>&,
                                         const std::shared_ptr<MemorySlice>&)>(
        std::move(comparator));
}

Result<std::shared_ptr<Bytes>> RowCompactedSerializer::SerializeToBytes(const InternalRow& row) {
if (!row_writer_) {
row_writer_ = std::make_unique<RowWriter>(CalculateBitSetInBytes(getters_.size()), pool_);
Expand Down
4 changes: 4 additions & 0 deletions src/paimon/common/data/serializer/row_compacted_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "paimon/common/data/binary_writer.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/common/memory/memory_slice.h"
#include "paimon/common/utils/var_length_int_utils.h"
namespace paimon {
class RowCompactedSerializer {
Expand All @@ -36,6 +37,9 @@ class RowCompactedSerializer {

Result<std::unique_ptr<InternalRow>> Deserialize(const std::shared_ptr<Bytes>& bytes);

static Result<MemorySlice::SliceComparator> CreateSliceComparator(
const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<MemoryPool>& pool);

private:
class RowWriter {
public:
Expand Down
Loading
Loading