diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 09f7fb0f..db9b3017 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -323,6 +323,11 @@ set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}") set(EP_C_FLAGS "${CMAKE_C_FLAGS}") string(REPLACE "-Wglobal-constructors" "" EP_CXX_FLAGS ${EP_CXX_FLAGS}) string(REPLACE "-Wglobal-constructors" "" EP_C_FLAGS ${EP_C_FLAGS}) +# Remove coverage flags from third-party dependencies to avoid gcov dependency +string(REPLACE "--coverage" "" EP_CXX_FLAGS ${EP_CXX_FLAGS}) +string(REPLACE "--coverage" "" EP_C_FLAGS ${EP_C_FLAGS}) +string(REPLACE "-DCOVERAGE_BUILD" "" EP_CXX_FLAGS ${EP_CXX_FLAGS}) +string(REPLACE "-DCOVERAGE_BUILD" "" EP_C_FLAGS ${EP_C_FLAGS}) if(NOT MSVC_TOOLCHAIN) # Set -fPIC on all external projects string(APPEND EP_CXX_FLAGS diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.cpp b/src/paimon/common/data/serializer/row_compacted_serializer.cpp index 19fd8774..0e944d86 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.cpp +++ b/src/paimon/common/data/serializer/row_compacted_serializer.cpp @@ -59,8 +59,8 @@ Result RowCompactedSerializer::CreateSliceComparat auto comparator = [row_reader1, row_reader2, readers, comparators]( const std::shared_ptr& slice1, const std::shared_ptr& slice2) -> Result { - row_reader1->PointTo(*slice1->GetSegment(), slice1->Offset()); - row_reader2->PointTo(*slice2->GetSegment(), slice2->Offset()); + row_reader1->PointTo(slice1->GetSegment(), slice1->Offset()); + row_reader2->PointTo(slice2->GetSegment(), slice2->Offset()); for (int32_t i = 0; i < static_cast(readers.size()); i++) { bool is_null1 = row_reader1->IsNullAt(i); bool is_null2 = row_reader2->IsNullAt(i); diff --git a/src/paimon/common/io/cache/cache.cpp b/src/paimon/common/io/cache/cache.cpp index 7e2e3e1c..16f96a82 100644 --- a/src/paimon/common/io/cache/cache.cpp +++ b/src/paimon/common/io/cache/cache.cpp @@ -17,9 +17,9 @@ #include "paimon/common/io/cache/cache.h" namespace paimon { -std::shared_ptr NoCache::Get( +Result> NoCache::Get( const std::shared_ptr& key, - std::function(const std::shared_ptr&)> supplier) { + std::function>(const std::shared_ptr&)> supplier) { return supplier(key); } diff --git a/src/paimon/common/io/cache/cache.h b/src/paimon/common/io/cache/cache.h index 0176b960..8826e9bf 100644 --- a/src/paimon/common/io/cache/cache.h +++ b/src/paimon/common/io/cache/cache.h @@ -22,7 +22,7 @@ #include "paimon/common/io/cache/cache_key.h" #include "paimon/common/memory/memory_segment.h" -#include "paimon/status.h" +#include "paimon/result.h" namespace paimon { class CacheValue; @@ -30,9 +30,10 @@ class CacheValue; class Cache { public: virtual ~Cache() = default; - virtual std::shared_ptr Get( + virtual Result> Get( const std::shared_ptr& key, - std::function(const std::shared_ptr&)> supplier) = 0; + std::function>(const std::shared_ptr&)> + supplier) = 0; virtual void Put(const std::shared_ptr& key, const std::shared_ptr& value) = 0; @@ -46,10 +47,10 @@ class Cache { class NoCache : public Cache { public: - std::shared_ptr Get( + Result> Get( const std::shared_ptr& key, - std::function(const std::shared_ptr&)> supplier) - override; + std::function>(const std::shared_ptr&)> + supplier) override; void Put(const std::shared_ptr& key, const std::shared_ptr& value) override; void Invalidate(const std::shared_ptr& key) override; @@ -59,13 +60,13 @@ class NoCache : public Cache { class CacheValue { public: - explicit CacheValue(const std::shared_ptr& segment) : segment_(segment) {} + explicit CacheValue(const MemorySegment& segment) : segment_(segment) {} - std::shared_ptr GetSegment() { + const MemorySegment& GetSegment() const { return segment_; } private: - std::shared_ptr segment_; + MemorySegment segment_; }; } // namespace paimon diff --git a/src/paimon/common/io/cache/cache_manager.cpp b/src/paimon/common/io/cache/cache_manager.cpp index 0e3e3ea4..39ef5aa8 100644 --- a/src/paimon/common/io/cache/cache_manager.cpp +++ b/src/paimon/common/io/cache/cache_manager.cpp @@ -18,20 +18,16 @@ namespace paimon { -std::shared_ptr CacheManager::GetPage( +Result CacheManager::GetPage( std::shared_ptr& key, std::function(const std::shared_ptr&)> reader) { auto& cache = key->IsIndex() ? index_cache_ : data_cache_; - auto supplier = [=](const std::shared_ptr& k) -> std::shared_ptr { - auto ret = reader(k); - if (!ret.ok()) { - return nullptr; - } - auto segment = ret.value(); - auto ptr = std::make_shared(segment); - return std::make_shared(ptr); + auto supplier = [&](const std::shared_ptr& k) -> Result> { + PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(k)); + return std::make_shared(segment); }; - return cache->Get(key, supplier)->GetSegment(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr cache_value, cache->Get(key, supplier)); + return cache_value->GetSegment(); } void CacheManager::InvalidPage(const std::shared_ptr& key) { diff --git a/src/paimon/common/io/cache/cache_manager.h b/src/paimon/common/io/cache/cache_manager.h index 5a379b30..51c506c0 100644 --- a/src/paimon/common/io/cache/cache_manager.h +++ b/src/paimon/common/io/cache/cache_manager.h @@ -34,7 +34,7 @@ class CacheManager { index_cache_ = std::make_shared(); } - std::shared_ptr GetPage( + Result GetPage( std::shared_ptr& key, std::function(const std::shared_ptr&)> reader); diff --git a/src/paimon/common/io/memory_segment_output_stream.cpp b/src/paimon/common/io/memory_segment_output_stream.cpp index f0ab4910..0f94b259 100644 --- a/src/paimon/common/io/memory_segment_output_stream.cpp +++ b/src/paimon/common/io/memory_segment_output_stream.cpp @@ -30,8 +30,7 @@ MemorySegmentOutputStream::MemorySegmentOutputStream(int32_t segment_size, } void MemorySegmentOutputStream::Advance() { - MemorySegment next_segment = NextSegment(); - current_segment_ = std::make_shared(next_segment); + current_segment_ = NextSegment(); position_in_segment_ = 0; } @@ -61,7 +60,7 @@ void MemorySegmentOutputStream::Write(const char* data, uint32_t size) { void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offset, int32_t len) { int32_t remaining = segment_size_ - position_in_segment_; if (remaining >= len) { - segment.CopyTo(offset, current_segment_.get(), position_in_segment_, len); + segment.CopyTo(offset, ¤t_segment_, position_in_segment_, len); position_in_segment_ += len; } else { if (remaining == 0) { @@ -70,7 +69,7 @@ void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offs } while (true) { int32_t to_put = std::min(remaining, len); - segment.CopyTo(offset, current_segment_.get(), position_in_segment_, to_put); + segment.CopyTo(offset, ¤t_segment_, position_in_segment_, to_put); offset += to_put; len -= to_put; diff --git a/src/paimon/common/io/memory_segment_output_stream.h b/src/paimon/common/io/memory_segment_output_stream.h index 623ec902..4fb2c3fd 100644 --- a/src/paimon/common/io/memory_segment_output_stream.h +++ b/src/paimon/common/io/memory_segment_output_stream.h @@ -79,7 +79,7 @@ class PAIMON_EXPORT MemorySegmentOutputStream { int32_t segment_size_; int32_t position_in_segment_; std::shared_ptr pool_; - std::shared_ptr current_segment_; + MemorySegment current_segment_; std::vector memory_segments_; ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN; @@ -88,7 +88,7 @@ class PAIMON_EXPORT MemorySegmentOutputStream { template void MemorySegmentOutputStream::WriteValueImpl(T v) { if (position_in_segment_ <= segment_size_ - static_cast(sizeof(T))) { - current_segment_->PutValue(position_in_segment_, v); + current_segment_.PutValue(position_in_segment_, v); position_in_segment_ += sizeof(T); } else if (position_in_segment_ == segment_size_) { Advance(); diff --git a/src/paimon/common/lookup/lookup_store_factory.cpp b/src/paimon/common/lookup/lookup_store_factory.cpp index 0257cd86..389b2b79 100644 --- a/src/paimon/common/lookup/lookup_store_factory.cpp +++ b/src/paimon/common/lookup/lookup_store_factory.cpp @@ -34,8 +34,8 @@ Result> LookupStoreFactory::BfGenerator(int64_t row return std::shared_ptr(); } auto bloom_filter = BloomFilter::Create(row_count, options.GetLookupCacheBloomFilterFpp()); - auto bytes_for_bf = MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool); - auto memory_segment = std::make_shared(bytes_for_bf); + MemorySegment memory_segment = + MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool); PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(memory_segment)); return bloom_filter; } diff --git a/src/paimon/common/memory/memory_slice.cpp b/src/paimon/common/memory/memory_slice.cpp index 8e1b7669..ad3ab2cd 100644 --- a/src/paimon/common/memory/memory_slice.cpp +++ b/src/paimon/common/memory/memory_slice.cpp @@ -21,16 +21,14 @@ namespace paimon { std::shared_ptr MemorySlice::Wrap(const std::shared_ptr& bytes) { auto segment = MemorySegment::Wrap(bytes); - auto ptr = std::make_shared(segment); - return std::make_shared(ptr, 0, ptr->Size()); + return std::make_shared(segment, 0, segment.Size()); } -std::shared_ptr MemorySlice::Wrap(const std::shared_ptr& segment) { - return std::make_shared(segment, 0, segment->Size()); +std::shared_ptr MemorySlice::Wrap(const MemorySegment& segment) { + return std::make_shared(segment, 0, segment.Size()); } -MemorySlice::MemorySlice(const std::shared_ptr& segment, int32_t offset, - int32_t length) +MemorySlice::MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length) : segment_(segment), offset_(offset), length_(length) {} std::shared_ptr MemorySlice::Slice(int32_t index, int32_t length) { @@ -49,38 +47,38 @@ int32_t MemorySlice::Offset() const { } std::shared_ptr MemorySlice::GetHeapMemory() const { - return segment_->GetHeapMemory(); + return segment_.GetHeapMemory(); } -std::shared_ptr MemorySlice::GetSegment() const { +const MemorySegment& MemorySlice::GetSegment() const { return segment_; } int8_t MemorySlice::ReadByte(int32_t position) { - return segment_->GetValue(offset_ + position); + return segment_.GetValue(offset_ + position); } int32_t MemorySlice::ReadInt(int32_t position) { - return segment_->GetValue(offset_ + position); + return segment_.GetValue(offset_ + position); } int16_t MemorySlice::ReadShort(int32_t position) { - return segment_->GetValue(offset_ + position); + return segment_.GetValue(offset_ + position); } int64_t MemorySlice::ReadLong(int32_t position) { - return segment_->GetValue(offset_ + position); + return segment_.GetValue(offset_ + position); } std::string_view MemorySlice::ReadStringView() { - auto array = segment_->GetArray(); + auto array = segment_.GetArray(); return {array->data() + offset_, static_cast(length_)}; } std::shared_ptr MemorySlice::CopyBytes(MemoryPool* pool) { auto bytes = std::make_shared(length_, pool); auto target = MemorySegment::Wrap(bytes); - segment_->CopyTo(offset_, &target, 0, length_); + segment_.CopyTo(offset_, &target, 0, length_); return bytes; } @@ -110,8 +108,8 @@ std::shared_ptr MemorySlice::ToInput() { int32_t MemorySlice::Compare(const MemorySlice& other) const { int32_t len = std::min(length_, other.length_); for (int32_t i = 0; i < len; ++i) { - auto byte1 = static_cast(segment_->Get(offset_ + i)); - auto byte2 = static_cast(other.segment_->Get(other.offset_ + i)); + auto byte1 = static_cast(segment_.Get(offset_ + i)); + auto byte2 = static_cast(other.segment_.Get(other.offset_ + i)); if (byte1 != byte2) { return static_cast(byte1) - static_cast(byte2); } diff --git a/src/paimon/common/memory/memory_slice.h b/src/paimon/common/memory/memory_slice.h index 0dfe5f6c..e4196edb 100644 --- a/src/paimon/common/memory/memory_slice.h +++ b/src/paimon/common/memory/memory_slice.h @@ -34,7 +34,7 @@ class MemorySliceInput; class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this { public: static std::shared_ptr Wrap(const std::shared_ptr& bytes); - static std::shared_ptr Wrap(const std::shared_ptr& segment); + static std::shared_ptr Wrap(const MemorySegment& segment); using SliceComparator = std::function(const std::shared_ptr&, const std::shared_ptr&)>; @@ -42,13 +42,13 @@ class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this& segment, int32_t offset, int32_t length); + MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length); std::shared_ptr Slice(int32_t index, int32_t length); int32_t Length() const; int32_t Offset() const; std::shared_ptr GetHeapMemory() const; - std::shared_ptr GetSegment() const; + const MemorySegment& GetSegment() const; int8_t ReadByte(int32_t position); int32_t ReadInt(int32_t position); @@ -71,7 +71,7 @@ class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this segment_; + MemorySegment segment_; int32_t offset_; int32_t length_; }; diff --git a/src/paimon/common/memory/memory_slice_output.cpp b/src/paimon/common/memory/memory_slice_output.cpp index 1fefe68d..8588ee6a 100644 --- a/src/paimon/common/memory/memory_slice_output.cpp +++ b/src/paimon/common/memory/memory_slice_output.cpp @@ -34,8 +34,7 @@ void MemorySliceOutput::Reset() { } std::unique_ptr MemorySliceOutput::ToSlice() { - auto segment = std::make_shared(segment_); - return std::make_unique(segment, 0, size_); + return std::make_unique(segment_, 0, size_); } template diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h index 03a6ef94..3462039b 100644 --- a/src/paimon/common/sst/block_cache.h +++ b/src/paimon/common/sst/block_cache.h @@ -36,18 +36,18 @@ class BlockCache { ~BlockCache() = default; - std::shared_ptr GetBlock(int64_t position, int32_t length, bool is_index) { + Result GetBlock(int64_t position, int32_t length, bool is_index) { auto key = CacheKey::ForPosition(file_path_, position, length, is_index); auto it = blocks_.find(key); if (it == blocks_.end()) { - auto segment = cache_manager_->GetPage( - key, [&](const std::shared_ptr&) -> Result { - return ReadFrom(position, length); - }); - if (!segment.get()) { - blocks_.insert({key, std::make_shared(segment)}); - } + PAIMON_ASSIGN_OR_RAISE( + MemorySegment segment, + cache_manager_->GetPage( + key, [&](const std::shared_ptr&) -> Result { + return ReadFrom(position, length); + })); + blocks_.insert({key, std::make_shared(segment)}); return segment; } return it->second->GetSegment(); diff --git a/src/paimon/common/sst/sst_file_io_test.cpp b/src/paimon/common/sst/sst_file_io_test.cpp index edad7c93..85320619 100644 --- a/src/paimon/common/sst/sst_file_io_test.cpp +++ b/src/paimon/common/sst/sst_file_io_test.cpp @@ -26,14 +26,17 @@ #include "arrow/array/builder_primitive.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" +#include "paimon/common/factories/io_hook.h" #include "paimon/common/sst/sst_file_reader.h" #include "paimon/common/sst/sst_file_writer.h" +#include "paimon/common/utils/scope_guard.h" #include "paimon/defs.h" #include "paimon/memory/memory_pool.h" #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/status.h" #include "paimon/testing/mock/mock_file_batch_reader.h" +#include "paimon/testing/utils/io_exception_helper.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" namespace paimon { @@ -89,8 +92,7 @@ TEST_P(SstFileIOTest, TestSimple) { // write data auto bf = BloomFilter::Create(30, 0.01); auto seg_for_bf = MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get()); - auto seg_ptr = std::make_shared(seg_for_bf); - ASSERT_OK(bf->SetMemorySegment(seg_ptr)); + ASSERT_OK(bf->SetMemorySegment(seg_for_bf)); auto writer = std::make_shared(out, pool_, bf, 50, factory); std::set value_hash; // k1-k5 @@ -131,9 +133,8 @@ TEST_P(SstFileIOTest, TestSimple) { auto bloom_filer_bytes = Bytes::AllocateBytes(size, pool_.get()); ASSERT_OK(in->Read(bloom_filer_bytes->data(), bloom_filer_bytes->size())); auto seg = MemorySegment::Wrap(std::move(bloom_filer_bytes)); - auto ptr = std::make_shared(seg); auto bloom_filter = std::make_shared(entries, size); - ASSERT_OK(bloom_filter->SetMemorySegment(ptr)); + ASSERT_OK(bloom_filter->SetMemorySegment(seg)); for (const auto& value : value_hash) { ASSERT_TRUE(bloom_filter->TestHash(value)); } @@ -207,6 +208,82 @@ TEST_P(SstFileIOTest, TestJavaCompatibility) { ASSERT_EQ("1999999", string1999999); } +TEST_F(SstFileIOTest, TestIOException) { + bool run_complete = false; + auto io_hook = paimon::IOHook::GetInstance(); + for (size_t i = 0; i < 200; i++) { + auto test_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_dir); + paimon::ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, paimon::IOHook::Mode::RETURN_ERROR); + + auto index_path = test_dir->Str() + "/sst_io_exception_test.data"; + + ASSERT_OK_AND_ASSIGN(std::shared_ptr factory, + BlockCompressionFactory::Create(BlockCompressionType::ZSTD)); + + // write + auto out_result = fs_->Create(index_path, /*overwrite=*/false); + CHECK_HOOK_STATUS(out_result.status(), i); + std::shared_ptr out = std::move(out_result).value(); + + auto bf = BloomFilter::Create(30, 0.01); + MemorySegment seg_for_bf = MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get()); + ASSERT_OK(bf->SetMemorySegment(seg_for_bf)); + auto writer = std::make_shared(out, pool_, bf, 50, factory); + + bool write_failed = false; + for (size_t j = 1; j <= 5; j++) { + std::string key = "k" + std::to_string(j); + std::string value = std::to_string(j); + auto write_status = writer->Write(std::make_shared(key, pool_.get()), + std::make_shared(value, pool_.get())); + if (!write_status.ok()) { + CHECK_HOOK_STATUS(write_status, i); + write_failed = true; + break; + } + } + if (write_failed) { + continue; + } + + CHECK_HOOK_STATUS(writer->Flush(), i); + + auto bloom_filter_handle_result = writer->WriteBloomFilter(); + CHECK_HOOK_STATUS(bloom_filter_handle_result.status(), i); + auto index_block_handle_result = writer->WriteIndexBlock(); + CHECK_HOOK_STATUS(index_block_handle_result.status(), i); + CHECK_HOOK_STATUS(writer->WriteFooter(index_block_handle_result.value(), + bloom_filter_handle_result.value()), + i); + + CHECK_HOOK_STATUS(out->Flush(), i); + CHECK_HOOK_STATUS(out->Close(), i); + + // read + auto in_result = fs_->Open(index_path); + CHECK_HOOK_STATUS(in_result.status(), i); + std::shared_ptr in = std::move(in_result).value(); + + auto reader_result = SstFileReader::Create(pool_, in, comparator_); + CHECK_HOOK_STATUS(reader_result.status(), i); + std::shared_ptr reader = std::move(reader_result).value(); + + std::string k4 = "k4"; + auto v4_result = reader->Lookup(std::make_shared(k4, pool_.get())); + CHECK_HOOK_STATUS(v4_result.status(), i); + ASSERT_TRUE(v4_result.value()); + std::string string4{v4_result.value()->data(), v4_result.value()->size()}; + ASSERT_EQ("4", string4); + + ASSERT_OK(reader->Close()); + run_complete = true; + break; + } + ASSERT_TRUE(run_complete); +} + INSTANTIATE_TEST_SUITE_P(Group, SstFileIOTest, ::testing::Values(SstFileParam{"none/79d01717-8380-4504-86e1-387e6c058d0a", BlockCompressionType::NONE}, diff --git a/src/paimon/common/sst/sst_file_reader.cpp b/src/paimon/common/sst/sst_file_reader.cpp index f313b224..1fa836d7 100644 --- a/src/paimon/common/sst/sst_file_reader.cpp +++ b/src/paimon/common/sst/sst_file_reader.cpp @@ -30,11 +30,9 @@ Result> SstFileReader::Create( std::make_shared(file_path, in, pool, std::make_unique()); // read footer - auto segment = block_cache->GetBlock(file_len - BlockFooter::ENCODED_LENGTH, - BlockFooter::ENCODED_LENGTH, true); - if (!segment.get()) { - return Status::Invalid("Read footer error"); - } + PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, + block_cache->GetBlock(file_len - BlockFooter::ENCODED_LENGTH, + BlockFooter::ENCODED_LENGTH, true)); auto slice = MemorySlice::Wrap(segment); auto input = slice->ToInput(); PAIMON_ASSIGN_OR_RAISE(std::unique_ptr footer, @@ -47,20 +45,24 @@ Result> SstFileReader::Create( bloom_filter_handle->Size() || bloom_filter_handle->Offset())) { bloom_filter = std::make_shared(bloom_filter_handle->ExpectedEntries(), bloom_filter_handle->Size()); - PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(block_cache->GetBlock( - bloom_filter_handle->Offset(), bloom_filter_handle->Size(), true))); + PAIMON_ASSIGN_OR_RAISE(MemorySegment bloom_filter_data, + block_cache->GetBlock(bloom_filter_handle->Offset(), + bloom_filter_handle->Size(), true)); + PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(bloom_filter_data)); } // create index block reader auto index_block_handle = footer->GetIndexBlockHandle(); - auto trailer_data = + PAIMON_ASSIGN_OR_RAISE( + MemorySegment trailer_data, block_cache->GetBlock(index_block_handle->Offset() + index_block_handle->Size(), - BlockTrailer::ENCODED_LENGTH, true); + BlockTrailer::ENCODED_LENGTH, true)); auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput(); auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); - auto block_data = - block_cache->GetBlock(index_block_handle->Offset(), index_block_handle->Size(), true); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr uncompressed_data, + PAIMON_ASSIGN_OR_RAISE( + MemorySegment block_data, + block_cache->GetBlock(index_block_handle->Offset(), index_block_handle->Size(), true)); + PAIMON_ASSIGN_OR_RAISE(MemorySegment uncompressed_data, DecompressBlock(block_data, trailer, pool)); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator)); @@ -107,10 +109,11 @@ Result> SstFileReader::Lookup(const std::shared_ptr> SstFileReader::GetNextBlock( std::unique_ptr& index_iterator) { - PAIMON_ASSIGN_OR_RAISE(auto ret, index_iterator->Next()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr ret, index_iterator->Next()); auto& slice = ret->value; auto input = slice->ToInput(); - PAIMON_ASSIGN_OR_RAISE(auto reader, ReadBlock(BlockHandle::ReadBlockHandle(input), false)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, + ReadBlock(BlockHandle::ReadBlockHandle(input), false)); return reader->Iterator(); } @@ -122,20 +125,22 @@ Result> SstFileReader::ReadBlock(std::shared_ptr> SstFileReader::ReadBlock( const std::shared_ptr& handle, bool index) { - auto trailer_data = block_cache_->GetBlock(handle->Offset() + handle->Size(), - BlockTrailer::ENCODED_LENGTH, true); + PAIMON_ASSIGN_OR_RAISE(MemorySegment trailer_data, + block_cache_->GetBlock(handle->Offset() + handle->Size(), + BlockTrailer::ENCODED_LENGTH, true)); auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput(); auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); - auto block_data = block_cache_->GetBlock(handle->Offset(), handle->Size(), index); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr uncompressed_data, + PAIMON_ASSIGN_OR_RAISE(MemorySegment block_data, + block_cache_->GetBlock(handle->Offset(), handle->Size(), index)); + PAIMON_ASSIGN_OR_RAISE(MemorySegment uncompressed_data, DecompressBlock(block_data, trailer, pool_)); return BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator_); } -Result> SstFileReader::DecompressBlock( - const std::shared_ptr& compressed_data, - const std::unique_ptr& trailer, const std::shared_ptr& pool) { - auto input_memory = compressed_data->GetHeapMemory(); +Result SstFileReader::DecompressBlock(const MemorySegment& compressed_data, + const std::unique_ptr& trailer, + const std::shared_ptr& pool) { + auto input_memory = compressed_data.GetHeapMemory(); // check crc32c auto crc32c_code = CRC32C::calculate(input_memory->data(), input_memory->size()); @@ -167,7 +172,7 @@ Result> SstFileReader::DecompressBlock( if (static_cast(uncompressed_length) != output_memory->size()) { return Status::Invalid("Invalid data"); } - return std::make_shared(output); + return output; } } diff --git a/src/paimon/common/sst/sst_file_reader.h b/src/paimon/common/sst/sst_file_reader.h index 02381cdd..d92df3c0 100644 --- a/src/paimon/common/sst/sst_file_reader.h +++ b/src/paimon/common/sst/sst_file_reader.h @@ -70,9 +70,9 @@ class SstFileReader { Status Close(); private: - static Result> DecompressBlock( - const std::shared_ptr& compressed_data, - const std::unique_ptr& trailer, const std::shared_ptr& pool); + static Result DecompressBlock(const MemorySegment& compressed_data, + const std::unique_ptr& trailer, + const std::shared_ptr& pool); SstFileReader(const std::shared_ptr& pool, const std::shared_ptr& block_cache, diff --git a/src/paimon/common/sst/sst_file_writer.cpp b/src/paimon/common/sst/sst_file_writer.cpp index 088ac24b..786678d1 100644 --- a/src/paimon/common/sst/sst_file_writer.cpp +++ b/src/paimon/common/sst/sst_file_writer.cpp @@ -74,7 +74,8 @@ Result> SstFileWriter::WriteBloomFilter() { return std::shared_ptr(); } auto data = bloom_filter_->GetBitSet()->ToSlice()->ReadStringView(); - auto handle = std::make_shared(out_->GetPos().value(), data.size(), + PAIMON_ASSIGN_OR_RAISE(int64_t bloom_filter_pos, out_->GetPos()); + auto handle = std::make_shared(bloom_filter_pos, data.size(), bloom_filter_->ExpectedEntries()); PAIMON_RETURN_NOT_OK(WriteBytes(data.data(), data.size())); @@ -121,7 +122,8 @@ Result> SstFileWriter::FlushBlockWriter( auto trailer_memory_slice = std::make_shared(static_cast(compression_type), crc32c) ->WriteBlockTrailer(pool_.get()); - auto block_handle = std::make_shared(out_->GetPos().value_or(0), view.size()); + PAIMON_ASSIGN_OR_RAISE(int64_t block_pos, out_->GetPos()); + auto block_handle = std::make_shared(block_pos, view.size()); // 1. write data PAIMON_RETURN_NOT_OK(WriteBytes(view.data(), view.size())); diff --git a/src/paimon/common/utils/bit_set.cpp b/src/paimon/common/utils/bit_set.cpp index 5f12ec4b..8b735b88 100644 --- a/src/paimon/common/utils/bit_set.cpp +++ b/src/paimon/common/utils/bit_set.cpp @@ -17,14 +17,11 @@ #include "paimon/common/utils/bit_set.h" namespace paimon { -Status BitSet::SetMemorySegment(std::shared_ptr segment, int32_t offset) { - if (!segment) { - return Status::Invalid("MemorySegment can not be null."); - } +Status BitSet::SetMemorySegment(MemorySegment segment, int32_t offset) { if (offset < 0) { return Status::Invalid("Offset should be positive integer."); } - if (offset + byte_length_ > segment->Size()) { + if (offset + byte_length_ > segment.Size()) { return Status::Invalid("Could not set MemorySegment, the remain buffers is not enough."); } segment_ = segment; @@ -33,7 +30,7 @@ Status BitSet::SetMemorySegment(std::shared_ptr segment, int32_t } void BitSet::UnsetMemorySegment() { - segment_.reset(); + segment_ = MemorySegment(); } Status BitSet::Set(unsigned int index) { @@ -41,9 +38,9 @@ Status BitSet::Set(unsigned int index) { return Status::IndexError("Index out of bound"); } unsigned int byte_index = index >> 3; - auto val = segment_->Get(offset_ + byte_index); + auto val = segment_.Get(offset_ + byte_index); val |= (1 << (index & BYTE_INDEX_MASK)); - segment_->PutValue(offset_ + byte_index, val); + segment_.PutValue(offset_ + byte_index, val); return Status::OK(); } @@ -52,18 +49,18 @@ bool BitSet::Get(unsigned int index) { return false; } unsigned int byte_index = index >> 3; - auto val = segment_->Get(offset_ + byte_index); + auto val = segment_.Get(offset_ + byte_index); return (val & (1 << (index & BYTE_INDEX_MASK))) != 0; } void BitSet::Clear() { int index = 0; while (index + 8 <= byte_length_) { - segment_->PutValue(offset_ + index, 0L); + segment_.PutValue(offset_ + index, 0L); index += 8; } while (index < byte_length_) { - segment_->PutValue(offset_ + index, static_cast(0)); + segment_.PutValue(offset_ + index, static_cast(0)); index += 1; } } diff --git a/src/paimon/common/utils/bit_set.h b/src/paimon/common/utils/bit_set.h index 22bcb7f7..0c5d4856 100644 --- a/src/paimon/common/utils/bit_set.h +++ b/src/paimon/common/utils/bit_set.h @@ -32,10 +32,10 @@ class PAIMON_EXPORT BitSet { public: explicit BitSet(int64_t byte_length) : byte_length_(byte_length), bit_size_(byte_length << 3) {} - Status SetMemorySegment(std::shared_ptr segment, int32_t offset = 0); + Status SetMemorySegment(MemorySegment segment, int32_t offset = 0); void UnsetMemorySegment(); - std::shared_ptr& GetMemorySegment() { + const MemorySegment& GetMemorySegment() const { return segment_; } @@ -64,6 +64,6 @@ class PAIMON_EXPORT BitSet { int64_t byte_length_; int64_t bit_size_; int32_t offset_; - std::shared_ptr segment_; + MemorySegment segment_; }; } // namespace paimon diff --git a/src/paimon/common/utils/bit_set_test.cpp b/src/paimon/common/utils/bit_set_test.cpp index a81e42fe..74611c90 100644 --- a/src/paimon/common/utils/bit_set_test.cpp +++ b/src/paimon/common/utils/bit_set_test.cpp @@ -34,8 +34,7 @@ TEST(BitSetTest, TestBitSet) { auto bit_set = std::make_shared(1024); auto pool = GetDefaultPool(); auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get()); - auto ptr = std::make_shared(seg); - ASSERT_OK(bit_set->SetMemorySegment(ptr)); + ASSERT_OK(bit_set->SetMemorySegment(seg)); for (int i = 0; i < 100; i++) { ASSERT_OK(bit_set->Set(i * 2 + 1)); } diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp index 607f1aa6..5cd764c5 100644 --- a/src/paimon/common/utils/bloom_filter.cpp +++ b/src/paimon/common/utils/bloom_filter.cpp @@ -87,7 +87,7 @@ bool BloomFilter::TestHash(int32_t hash1) const { return true; } -Status BloomFilter::SetMemorySegment(std::shared_ptr segment, int32_t offset) { +Status BloomFilter::SetMemorySegment(MemorySegment segment, int32_t offset) { return bit_set_->SetMemorySegment(segment, offset); } diff --git a/src/paimon/common/utils/bloom_filter.h b/src/paimon/common/utils/bloom_filter.h index 2cb5012c..bbd3abda 100644 --- a/src/paimon/common/utils/bloom_filter.h +++ b/src/paimon/common/utils/bloom_filter.h @@ -51,7 +51,7 @@ class PAIMON_EXPORT BloomFilter { return bit_set_; } - Status SetMemorySegment(std::shared_ptr segment, int32_t offset = 0); + Status SetMemorySegment(MemorySegment segment, int32_t offset = 0); Status AddHash(int32_t hash1); diff --git a/src/paimon/common/utils/bloom_filter_test.cpp b/src/paimon/common/utils/bloom_filter_test.cpp index d652ce9a..a643e755 100644 --- a/src/paimon/common/utils/bloom_filter_test.cpp +++ b/src/paimon/common/utils/bloom_filter_test.cpp @@ -35,8 +35,7 @@ TEST(BloomFilterTest, TestOneSegmentBuilder) { auto pool = GetDefaultPool(); auto bloom_filter = BloomFilter::Create(items, 0.01); auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get()); - auto ptr = std::make_shared(seg); - ASSERT_OK(bloom_filter->SetMemorySegment(ptr)); + ASSERT_OK(bloom_filter->SetMemorySegment(seg)); std::mt19937_64 engine(std::random_device{}()); // NOLINT(whitespace/braces) std::uniform_int_distribution distribution(0, items); @@ -109,8 +108,7 @@ TEST(BloomFilterTest, TestBloomFilter) { // segments 1 auto seg1 = MemorySegment::AllocateHeapMemory(1024, pool.get()); - auto ptr1 = std::make_shared(seg1); - ASSERT_OK(bloom_filter->SetMemorySegment(ptr1)); + ASSERT_OK(bloom_filter->SetMemorySegment(seg1)); std::set test_data1; for (int32_t i = 0; i < items; i++) { @@ -125,8 +123,7 @@ TEST(BloomFilterTest, TestBloomFilter) { // segments 2 std::set test_data2; auto seg2 = MemorySegment::AllocateHeapMemory(1024, pool.get()); - auto ptr2 = std::make_shared(seg2); - ASSERT_OK(bloom_filter->SetMemorySegment(ptr2)); + ASSERT_OK(bloom_filter->SetMemorySegment(seg2)); for (int32_t i = 0; i < items; i++) { int32_t random = distribution(engine); test_data2.insert(random); @@ -136,7 +133,7 @@ TEST(BloomFilterTest, TestBloomFilter) { ASSERT_TRUE(bloom_filter->TestHash(value)); } // switch to segment1 - ASSERT_OK(bloom_filter->SetMemorySegment(ptr1)); + ASSERT_OK(bloom_filter->SetMemorySegment(seg1)); for (const auto& value : test_data1) { ASSERT_TRUE(bloom_filter->TestHash(value)); } @@ -148,7 +145,7 @@ TEST(BloomFilterTest, TestBloomFilter) { } // switch to segment2 and clear - ASSERT_OK(bloom_filter->SetMemorySegment(ptr2)); + ASSERT_OK(bloom_filter->SetMemorySegment(seg2)); bloom_filter->Reset(); for (const auto& value : test_data2) { ASSERT_FALSE(bloom_filter->TestHash(value)); diff --git a/src/paimon/core/mergetree/levels_test.cpp b/src/paimon/core/mergetree/levels_test.cpp index e7af5d1c..d20fb3b6 100644 --- a/src/paimon/core/mergetree/levels_test.cpp +++ b/src/paimon/core/mergetree/levels_test.cpp @@ -164,4 +164,19 @@ TEST_F(LevelsTest, TestUpdate) { ASSERT_EQ(levels->NumberOfSortedRuns(), 4); } +TEST_F(LevelsTest, TestRunOfLevelInvalidLevel) { + std::vector> input_files = {CreateDataFileMeta(2, 0, 1, 0), + CreateDataFileMeta(1, 2, 3, 1)}; + ASSERT_OK_AND_ASSIGN(auto levels, + Levels::Create(CreateComparator(), input_files, /*num_levels=*/3)); + + // Test invalid level 0 + auto result = Levels::RunOfLevel(0, levels->GetLevels()); + ASSERT_NOK_WITH_MSG(result, "Level0 does not have one single sorted run."); + + // Test invalid negative level + auto result_neg = Levels::RunOfLevel(-1, levels->GetLevels()); + ASSERT_NOK_WITH_MSG(result_neg, "Level0 does not have one single sorted run."); +} + } // namespace paimon::test diff --git a/src/paimon/core/mergetree/lookup/persist_processor_test.cpp b/src/paimon/core/mergetree/lookup/persist_processor_test.cpp index cbd09b21..aa204ba0 100644 --- a/src/paimon/core/mergetree/lookup/persist_processor_test.cpp +++ b/src/paimon/core/mergetree/lookup/persist_processor_test.cpp @@ -59,6 +59,7 @@ class PersistProcessorTest : public testing::Test { TEST_F(PersistProcessorTest, TestEmptyProcessor) { auto processor_factory = std::make_shared(); + ASSERT_EQ(processor_factory->Identifier(), "empty"); ASSERT_OK_AND_ASSIGN(auto processor, processor_factory->Create(serializer_factory_->Version(), /*serializer_factory=*/serializer_factory_, @@ -76,6 +77,7 @@ TEST_F(PersistProcessorTest, TestEmptyProcessor) { TEST_F(PersistProcessorTest, TestPositionProcessor) { auto processor_factory = std::make_shared(); + ASSERT_EQ(processor_factory->Identifier(), "position"); ASSERT_OK_AND_ASSIGN(auto processor, processor_factory->Create(serializer_factory_->Version(), /*serializer_factory=*/serializer_factory_, @@ -94,6 +96,7 @@ TEST_F(PersistProcessorTest, TestPositionProcessor) { TEST_F(PersistProcessorTest, TestValueProcessor) { auto processor_factory = std::make_shared(file_schema_); + ASSERT_EQ(processor_factory->Identifier(), "value"); ASSERT_OK_AND_ASSIGN(auto processor, processor_factory->Create(serializer_factory_->Version(), /*serializer_factory=*/serializer_factory_, @@ -108,8 +111,29 @@ TEST_F(PersistProcessorTest, TestValueProcessor) { "Not support for PersistToDisk with position"); } +TEST_F(PersistProcessorTest, TestInvalideValueProcessor) { + std::shared_ptr current_schema = + arrow::schema({arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float32())}); + + auto processor_factory = std::make_shared(current_schema); + ASSERT_EQ(processor_factory->Identifier(), "value"); + // test schema not equal + ASSERT_NOK_WITH_MSG(processor_factory->Create(serializer_factory_->Version(), + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/file_schema_, pool_), + "f3: float must be equal with file_schema"); + // test version mismatch + ASSERT_NOK_WITH_MSG( + processor_factory->Create("invalid version", + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/current_schema, pool_), + "file_ser_version invalid version mismatch DefaultLookupSerializerFactory version v1"); +} + TEST_F(PersistProcessorTest, TestValueAndPositionProcessor) { auto processor_factory = std::make_shared(file_schema_); + ASSERT_EQ(processor_factory->Identifier(), "position-and-value"); ASSERT_OK_AND_ASSIGN(auto processor, processor_factory->Create(serializer_factory_->Version(), /*serializer_factory=*/serializer_factory_, diff --git a/src/paimon/core/mergetree/lookup_levels_test.cpp b/src/paimon/core/mergetree/lookup_levels_test.cpp index 1242b43f..b7e6340a 100644 --- a/src/paimon/core/mergetree/lookup_levels_test.cpp +++ b/src/paimon/core/mergetree/lookup_levels_test.cpp @@ -217,6 +217,7 @@ TEST_F(LookupLevelsTest, TestMultiLevels) { ASSERT_EQ(lookup_levels->lookup_file_cache_.size(), 2); ASSERT_EQ(lookup_levels->schema_id_and_ser_version_to_processors_.size(), 1); + ASSERT_EQ(lookup_levels->GetLevels()->NonEmptyHighestLevel(), 2); // TODO(lisizhuo.lsz): test lookuplevels close }