Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
set(EP_C_FLAGS "${CMAKE_C_FLAGS}")
string(REPLACE "-Wglobal-constructors" "" EP_CXX_FLAGS ${EP_CXX_FLAGS})
string(REPLACE "-Wglobal-constructors" "" EP_C_FLAGS ${EP_C_FLAGS})
# Remove coverage flags from third-party dependencies to avoid gcov dependency
string(REPLACE "--coverage" "" EP_CXX_FLAGS ${EP_CXX_FLAGS})
string(REPLACE "--coverage" "" EP_C_FLAGS ${EP_C_FLAGS})
string(REPLACE "-DCOVERAGE_BUILD" "" EP_CXX_FLAGS ${EP_CXX_FLAGS})
string(REPLACE "-DCOVERAGE_BUILD" "" EP_C_FLAGS ${EP_C_FLAGS})
if(NOT MSVC_TOOLCHAIN)
# Set -fPIC on all external projects
string(APPEND EP_CXX_FLAGS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparat
auto comparator = [row_reader1, row_reader2, readers, comparators](
const std::shared_ptr<MemorySlice>& slice1,
const std::shared_ptr<MemorySlice>& slice2) -> Result<int32_t> {
row_reader1->PointTo(*slice1->GetSegment(), slice1->Offset());
row_reader2->PointTo(*slice2->GetSegment(), slice2->Offset());
row_reader1->PointTo(slice1->GetSegment(), slice1->Offset());
row_reader2->PointTo(slice2->GetSegment(), slice2->Offset());
for (int32_t i = 0; i < static_cast<int32_t>(readers.size()); i++) {
bool is_null1 = row_reader1->IsNullAt(i);
bool is_null2 = row_reader2->IsNullAt(i);
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/io/cache/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#include "paimon/common/io/cache/cache.h"

namespace paimon {
std::shared_ptr<CacheValue> NoCache::Get(
Result<std::shared_ptr<CacheValue>> NoCache::Get(
const std::shared_ptr<CacheKey>& key,
std::function<std::shared_ptr<CacheValue>(const std::shared_ptr<CacheKey>&)> supplier) {
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)> supplier) {
return supplier(key);
}

Expand Down
19 changes: 10 additions & 9 deletions src/paimon/common/io/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@

#include "paimon/common/io/cache/cache_key.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/status.h"
#include "paimon/result.h"

namespace paimon {
class CacheValue;

class Cache {
public:
virtual ~Cache() = default;
virtual std::shared_ptr<CacheValue> Get(
virtual Result<std::shared_ptr<CacheValue>> Get(
const std::shared_ptr<CacheKey>& key,
std::function<std::shared_ptr<CacheValue>(const std::shared_ptr<CacheKey>&)> supplier) = 0;
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) = 0;

virtual void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) = 0;
Expand All @@ -46,10 +47,10 @@ class Cache {

class NoCache : public Cache {
public:
std::shared_ptr<CacheValue> Get(
Result<std::shared_ptr<CacheValue>> Get(
const std::shared_ptr<CacheKey>& key,
std::function<std::shared_ptr<CacheValue>(const std::shared_ptr<CacheKey>&)> supplier)
override;
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) override;
void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;
void Invalidate(const std::shared_ptr<CacheKey>& key) override;
Expand All @@ -59,13 +60,13 @@ class NoCache : public Cache {

class CacheValue {
public:
explicit CacheValue(const std::shared_ptr<MemorySegment>& segment) : segment_(segment) {}
explicit CacheValue(const MemorySegment& segment) : segment_(segment) {}

std::shared_ptr<MemorySegment> GetSegment() {
MemorySegment GetSegment() {
return segment_;
Comment thread
lxy-9602 marked this conversation as resolved.
}

private:
std::shared_ptr<MemorySegment> segment_;
MemorySegment segment_;
};
} // namespace paimon
16 changes: 6 additions & 10 deletions src/paimon/common/io/cache/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@

namespace paimon {

std::shared_ptr<MemorySegment> CacheManager::GetPage(
Result<MemorySegment> CacheManager::GetPage(
std::shared_ptr<CacheKey>& key,
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader) {
auto& cache = key->IsIndex() ? index_cache_ : data_cache_;
auto supplier = [=](const std::shared_ptr<CacheKey>& k) -> std::shared_ptr<CacheValue> {
auto ret = reader(k);
if (!ret.ok()) {
return nullptr;
}
auto segment = ret.value();
auto ptr = std::make_shared<MemorySegment>(segment);
return std::make_shared<CacheValue>(ptr);
auto supplier = [&](const std::shared_ptr<CacheKey>& k) -> Result<std::shared_ptr<CacheValue>> {
PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(k));
return std::make_shared<CacheValue>(segment);
};
return cache->Get(key, supplier)->GetSegment();
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> cache_value, cache->Get(key, supplier));
return cache_value->GetSegment();
}

void CacheManager::InvalidPage(const std::shared_ptr<CacheKey>& key) {
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/io/cache/cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CacheManager {
index_cache_ = std::make_shared<NoCache>();
}

std::shared_ptr<MemorySegment> GetPage(
Result<MemorySegment> GetPage(
std::shared_ptr<CacheKey>& key,
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader);

Expand Down
7 changes: 3 additions & 4 deletions src/paimon/common/io/memory_segment_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ MemorySegmentOutputStream::MemorySegmentOutputStream(int32_t segment_size,
}

void MemorySegmentOutputStream::Advance() {
MemorySegment next_segment = NextSegment();
current_segment_ = std::make_shared<MemorySegment>(next_segment);
current_segment_ = NextSegment();
position_in_segment_ = 0;
}

Expand Down Expand Up @@ -61,7 +60,7 @@ void MemorySegmentOutputStream::Write(const char* data, uint32_t size) {
void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offset, int32_t len) {
int32_t remaining = segment_size_ - position_in_segment_;
if (remaining >= len) {
segment.CopyTo(offset, current_segment_.get(), position_in_segment_, len);
segment.CopyTo(offset, &current_segment_, position_in_segment_, len);
position_in_segment_ += len;
} else {
if (remaining == 0) {
Expand All @@ -70,7 +69,7 @@ void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offs
}
while (true) {
int32_t to_put = std::min(remaining, len);
segment.CopyTo(offset, current_segment_.get(), position_in_segment_, to_put);
segment.CopyTo(offset, &current_segment_, position_in_segment_, to_put);
offset += to_put;
len -= to_put;

Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/io/memory_segment_output_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PAIMON_EXPORT MemorySegmentOutputStream {
int32_t segment_size_;
int32_t position_in_segment_;
std::shared_ptr<MemoryPool> pool_;
std::shared_ptr<MemorySegment> current_segment_;
MemorySegment current_segment_;
std::vector<MemorySegment> memory_segments_;

ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
Expand All @@ -88,7 +88,7 @@ class PAIMON_EXPORT MemorySegmentOutputStream {
template <typename T>
void MemorySegmentOutputStream::WriteValueImpl(T v) {
if (position_in_segment_ <= segment_size_ - static_cast<int32_t>(sizeof(T))) {
current_segment_->PutValue<T>(position_in_segment_, v);
current_segment_.PutValue<T>(position_in_segment_, v);
position_in_segment_ += sizeof(T);
} else if (position_in_segment_ == segment_size_) {
Advance();
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/lookup/lookup_store_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ Result<std::shared_ptr<BloomFilter>> LookupStoreFactory::BfGenerator(int64_t row
return std::shared_ptr<BloomFilter>();
}
auto bloom_filter = BloomFilter::Create(row_count, options.GetLookupCacheBloomFilterFpp());
auto bytes_for_bf = MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool);
auto memory_segment = std::make_shared<MemorySegment>(bytes_for_bf);
MemorySegment memory_segment =
MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool);
PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(memory_segment));
return bloom_filter;
}
Expand Down
30 changes: 14 additions & 16 deletions src/paimon/common/memory/memory_slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
namespace paimon {
std::shared_ptr<MemorySlice> MemorySlice::Wrap(const std::shared_ptr<Bytes>& bytes) {
auto segment = MemorySegment::Wrap(bytes);
auto ptr = std::make_shared<MemorySegment>(segment);
return std::make_shared<MemorySlice>(ptr, 0, ptr->Size());
return std::make_shared<MemorySlice>(segment, 0, segment.Size());
}

std::shared_ptr<MemorySlice> MemorySlice::Wrap(const std::shared_ptr<MemorySegment>& segment) {
return std::make_shared<MemorySlice>(segment, 0, segment->Size());
std::shared_ptr<MemorySlice> MemorySlice::Wrap(const MemorySegment& segment) {
return std::make_shared<MemorySlice>(segment, 0, segment.Size());
}

MemorySlice::MemorySlice(const std::shared_ptr<MemorySegment>& segment, int32_t offset,
int32_t length)
MemorySlice::MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length)
: segment_(segment), offset_(offset), length_(length) {}

std::shared_ptr<MemorySlice> MemorySlice::Slice(int32_t index, int32_t length) {
Expand All @@ -49,38 +47,38 @@ int32_t MemorySlice::Offset() const {
}

std::shared_ptr<Bytes> MemorySlice::GetHeapMemory() const {
return segment_->GetHeapMemory();
return segment_.GetHeapMemory();
}

std::shared_ptr<MemorySegment> MemorySlice::GetSegment() const {
MemorySegment MemorySlice::GetSegment() const {
return segment_;
}

int8_t MemorySlice::ReadByte(int32_t position) {
return segment_->GetValue<int8_t>(offset_ + position);
return segment_.GetValue<int8_t>(offset_ + position);
}

int32_t MemorySlice::ReadInt(int32_t position) {
return segment_->GetValue<int32_t>(offset_ + position);
return segment_.GetValue<int32_t>(offset_ + position);
}

int16_t MemorySlice::ReadShort(int32_t position) {
return segment_->GetValue<int16_t>(offset_ + position);
return segment_.GetValue<int16_t>(offset_ + position);
}

int64_t MemorySlice::ReadLong(int32_t position) {
return segment_->GetValue<int64_t>(offset_ + position);
return segment_.GetValue<int64_t>(offset_ + position);
}

std::string_view MemorySlice::ReadStringView() {
auto array = segment_->GetArray();
auto array = segment_.GetArray();
return {array->data() + offset_, static_cast<size_t>(length_)};
}

std::shared_ptr<Bytes> MemorySlice::CopyBytes(MemoryPool* pool) {
auto bytes = std::make_shared<Bytes>(length_, pool);
auto target = MemorySegment::Wrap(bytes);
segment_->CopyTo(offset_, &target, 0, length_);
segment_.CopyTo(offset_, &target, 0, length_);
return bytes;
}

Expand Down Expand Up @@ -110,8 +108,8 @@ std::shared_ptr<MemorySliceInput> MemorySlice::ToInput() {
int32_t MemorySlice::Compare(const MemorySlice& other) const {
int32_t len = std::min(length_, other.length_);
for (int32_t i = 0; i < len; ++i) {
auto byte1 = static_cast<unsigned char>(segment_->Get(offset_ + i));
auto byte2 = static_cast<unsigned char>(other.segment_->Get(other.offset_ + i));
auto byte1 = static_cast<unsigned char>(segment_.Get(offset_ + i));
auto byte2 = static_cast<unsigned char>(other.segment_.Get(other.offset_ + i));
if (byte1 != byte2) {
return static_cast<int>(byte1) - static_cast<int>(byte2);
}
Expand Down
8 changes: 4 additions & 4 deletions src/paimon/common/memory/memory_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ class MemorySliceInput;
class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this<MemorySlice> {
public:
static std::shared_ptr<MemorySlice> Wrap(const std::shared_ptr<Bytes>& bytes);
static std::shared_ptr<MemorySlice> Wrap(const std::shared_ptr<MemorySegment>& segment);
static std::shared_ptr<MemorySlice> Wrap(const MemorySegment& segment);

using SliceComparator = std::function<Result<int32_t>(const std::shared_ptr<MemorySlice>&,
const std::shared_ptr<MemorySlice>&)>;

public:
MemorySlice() = default;

MemorySlice(const std::shared_ptr<MemorySegment>& segment, int32_t offset, int32_t length);
MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length);
std::shared_ptr<MemorySlice> Slice(int32_t index, int32_t length);

int32_t Length() const;
int32_t Offset() const;
std::shared_ptr<Bytes> GetHeapMemory() const;
std::shared_ptr<MemorySegment> GetSegment() const;
MemorySegment GetSegment() const;
Comment thread
lxy-9602 marked this conversation as resolved.
Outdated

int8_t ReadByte(int32_t position);
int32_t ReadInt(int32_t position);
Expand All @@ -71,7 +71,7 @@ class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this<MemorySlic
int32_t Compare(const MemorySlice& other) const;

private:
std::shared_ptr<MemorySegment> segment_;
MemorySegment segment_;
int32_t offset_;
int32_t length_;
};
Expand Down
3 changes: 1 addition & 2 deletions src/paimon/common/memory/memory_slice_output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ void MemorySliceOutput::Reset() {
}

std::unique_ptr<MemorySlice> MemorySliceOutput::ToSlice() {
auto segment = std::make_shared<MemorySegment>(segment_);
return std::make_unique<MemorySlice>(segment, 0, size_);
return std::make_unique<MemorySlice>(segment_, 0, size_);
}

template <typename T>
Expand Down
16 changes: 8 additions & 8 deletions src/paimon/common/sst/block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ class BlockCache {

~BlockCache() = default;

std::shared_ptr<MemorySegment> GetBlock(int64_t position, int32_t length, bool is_index) {
Result<MemorySegment> GetBlock(int64_t position, int32_t length, bool is_index) {
auto key = CacheKey::ForPosition(file_path_, position, length, is_index);

auto it = blocks_.find(key);
if (it == blocks_.end()) {
auto segment = cache_manager_->GetPage(
key, [&](const std::shared_ptr<paimon::CacheKey>&) -> Result<MemorySegment> {
return ReadFrom(position, length);
});
if (!segment.get()) {
blocks_.insert({key, std::make_shared<CacheValue>(segment)});
}
PAIMON_ASSIGN_OR_RAISE(
MemorySegment segment,
cache_manager_->GetPage(
key, [&](const std::shared_ptr<paimon::CacheKey>&) -> Result<MemorySegment> {
return ReadFrom(position, length);
}));
blocks_.insert({key, std::make_shared<CacheValue>(segment)});
return segment;
}
return it->second->GetSegment();
Expand Down
Loading
Loading