Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
set(EP_C_FLAGS "${CMAKE_C_FLAGS}")
string(REPLACE "-Wglobal-constructors" "" EP_CXX_FLAGS ${EP_CXX_FLAGS})
string(REPLACE "-Wglobal-constructors" "" EP_C_FLAGS ${EP_C_FLAGS})
# Remove coverage flags from third-party dependencies to avoid gcov dependency
string(REPLACE "--coverage" "" EP_CXX_FLAGS ${EP_CXX_FLAGS})
string(REPLACE "--coverage" "" EP_C_FLAGS ${EP_C_FLAGS})
string(REPLACE "-DCOVERAGE_BUILD" "" EP_CXX_FLAGS ${EP_CXX_FLAGS})
string(REPLACE "-DCOVERAGE_BUILD" "" EP_C_FLAGS ${EP_C_FLAGS})
if(NOT MSVC_TOOLCHAIN)
# Set -fPIC on all external projects
string(APPEND EP_CXX_FLAGS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ Result<MemorySlice::SliceComparator> RowCompactedSerializer::CreateSliceComparat
auto comparator = [row_reader1, row_reader2, readers, comparators](
const std::shared_ptr<MemorySlice>& slice1,
const std::shared_ptr<MemorySlice>& slice2) -> Result<int32_t> {
row_reader1->PointTo(*slice1->GetSegment(), slice1->Offset());
row_reader2->PointTo(*slice2->GetSegment(), slice2->Offset());
row_reader1->PointTo(slice1->GetSegment(), slice1->Offset());
row_reader2->PointTo(slice2->GetSegment(), slice2->Offset());
for (int32_t i = 0; i < static_cast<int32_t>(readers.size()); i++) {
bool is_null1 = row_reader1->IsNullAt(i);
bool is_null2 = row_reader2->IsNullAt(i);
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/io/cache/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#include "paimon/common/io/cache/cache.h"

namespace paimon {
std::shared_ptr<CacheValue> NoCache::Get(
Result<std::shared_ptr<CacheValue>> NoCache::Get(
const std::shared_ptr<CacheKey>& key,
std::function<std::shared_ptr<CacheValue>(const std::shared_ptr<CacheKey>&)> supplier) {
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)> supplier) {
return supplier(key);
}

Expand Down
19 changes: 10 additions & 9 deletions src/paimon/common/io/cache/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@

#include "paimon/common/io/cache/cache_key.h"
#include "paimon/common/memory/memory_segment.h"
#include "paimon/status.h"
#include "paimon/result.h"

namespace paimon {
class CacheValue;

class Cache {
public:
virtual ~Cache() = default;
virtual std::shared_ptr<CacheValue> Get(
virtual Result<std::shared_ptr<CacheValue>> Get(
const std::shared_ptr<CacheKey>& key,
std::function<std::shared_ptr<CacheValue>(const std::shared_ptr<CacheKey>&)> supplier) = 0;
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) = 0;

virtual void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) = 0;
Expand All @@ -46,10 +47,10 @@ class Cache {

class NoCache : public Cache {
public:
std::shared_ptr<CacheValue> Get(
Result<std::shared_ptr<CacheValue>> Get(
const std::shared_ptr<CacheKey>& key,
std::function<std::shared_ptr<CacheValue>(const std::shared_ptr<CacheKey>&)> supplier)
override;
std::function<Result<std::shared_ptr<CacheValue>>(const std::shared_ptr<CacheKey>&)>
supplier) override;
void Put(const std::shared_ptr<CacheKey>& key,
const std::shared_ptr<CacheValue>& value) override;
void Invalidate(const std::shared_ptr<CacheKey>& key) override;
Expand All @@ -59,13 +60,13 @@ class NoCache : public Cache {

class CacheValue {
public:
explicit CacheValue(const std::shared_ptr<MemorySegment>& segment) : segment_(segment) {}
explicit CacheValue(const MemorySegment& segment) : segment_(segment) {}

std::shared_ptr<MemorySegment> GetSegment() {
const MemorySegment& GetSegment() const {
return segment_;
}

private:
std::shared_ptr<MemorySegment> segment_;
MemorySegment segment_;
};
} // namespace paimon
16 changes: 6 additions & 10 deletions src/paimon/common/io/cache/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@

namespace paimon {

std::shared_ptr<MemorySegment> CacheManager::GetPage(
Result<MemorySegment> CacheManager::GetPage(
std::shared_ptr<CacheKey>& key,
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader) {
auto& cache = key->IsIndex() ? index_cache_ : data_cache_;
auto supplier = [=](const std::shared_ptr<CacheKey>& k) -> std::shared_ptr<CacheValue> {
auto ret = reader(k);
if (!ret.ok()) {
return nullptr;
}
auto segment = ret.value();
auto ptr = std::make_shared<MemorySegment>(segment);
return std::make_shared<CacheValue>(ptr);
auto supplier = [&](const std::shared_ptr<CacheKey>& k) -> Result<std::shared_ptr<CacheValue>> {
PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(k));
return std::make_shared<CacheValue>(segment);
};
return cache->Get(key, supplier)->GetSegment();
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> cache_value, cache->Get(key, supplier));
return cache_value->GetSegment();
}

void CacheManager::InvalidPage(const std::shared_ptr<CacheKey>& key) {
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/io/cache/cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CacheManager {
index_cache_ = std::make_shared<NoCache>();
}

std::shared_ptr<MemorySegment> GetPage(
Result<MemorySegment> GetPage(
std::shared_ptr<CacheKey>& key,
std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader);

Expand Down
7 changes: 3 additions & 4 deletions src/paimon/common/io/memory_segment_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ MemorySegmentOutputStream::MemorySegmentOutputStream(int32_t segment_size,
}

void MemorySegmentOutputStream::Advance() {
MemorySegment next_segment = NextSegment();
current_segment_ = std::make_shared<MemorySegment>(next_segment);
current_segment_ = NextSegment();
position_in_segment_ = 0;
}

Expand Down Expand Up @@ -61,7 +60,7 @@ void MemorySegmentOutputStream::Write(const char* data, uint32_t size) {
void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offset, int32_t len) {
int32_t remaining = segment_size_ - position_in_segment_;
if (remaining >= len) {
segment.CopyTo(offset, current_segment_.get(), position_in_segment_, len);
segment.CopyTo(offset, &current_segment_, position_in_segment_, len);
position_in_segment_ += len;
} else {
if (remaining == 0) {
Expand All @@ -70,7 +69,7 @@ void MemorySegmentOutputStream::Write(const MemorySegment& segment, int32_t offs
}
while (true) {
int32_t to_put = std::min(remaining, len);
segment.CopyTo(offset, current_segment_.get(), position_in_segment_, to_put);
segment.CopyTo(offset, &current_segment_, position_in_segment_, to_put);
offset += to_put;
len -= to_put;

Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/io/memory_segment_output_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PAIMON_EXPORT MemorySegmentOutputStream {
int32_t segment_size_;
int32_t position_in_segment_;
std::shared_ptr<MemoryPool> pool_;
std::shared_ptr<MemorySegment> current_segment_;
MemorySegment current_segment_;
std::vector<MemorySegment> memory_segments_;

ByteOrder byte_order_ = ByteOrder::PAIMON_BIG_ENDIAN;
Expand All @@ -88,7 +88,7 @@ class PAIMON_EXPORT MemorySegmentOutputStream {
template <typename T>
void MemorySegmentOutputStream::WriteValueImpl(T v) {
if (position_in_segment_ <= segment_size_ - static_cast<int32_t>(sizeof(T))) {
current_segment_->PutValue<T>(position_in_segment_, v);
current_segment_.PutValue<T>(position_in_segment_, v);
position_in_segment_ += sizeof(T);
} else if (position_in_segment_ == segment_size_) {
Advance();
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/lookup/lookup_store_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ Result<std::shared_ptr<BloomFilter>> LookupStoreFactory::BfGenerator(int64_t row
return std::shared_ptr<BloomFilter>();
}
auto bloom_filter = BloomFilter::Create(row_count, options.GetLookupCacheBloomFilterFpp());
auto bytes_for_bf = MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool);
auto memory_segment = std::make_shared<MemorySegment>(bytes_for_bf);
MemorySegment memory_segment =
MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool);
PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(memory_segment));
return bloom_filter;
}
Expand Down
30 changes: 14 additions & 16 deletions src/paimon/common/memory/memory_slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
namespace paimon {
std::shared_ptr<MemorySlice> MemorySlice::Wrap(const std::shared_ptr<Bytes>& bytes) {
auto segment = MemorySegment::Wrap(bytes);
auto ptr = std::make_shared<MemorySegment>(segment);
return std::make_shared<MemorySlice>(ptr, 0, ptr->Size());
return std::make_shared<MemorySlice>(segment, 0, segment.Size());
}

std::shared_ptr<MemorySlice> MemorySlice::Wrap(const std::shared_ptr<MemorySegment>& segment) {
return std::make_shared<MemorySlice>(segment, 0, segment->Size());
std::shared_ptr<MemorySlice> MemorySlice::Wrap(const MemorySegment& segment) {
return std::make_shared<MemorySlice>(segment, 0, segment.Size());
}

MemorySlice::MemorySlice(const std::shared_ptr<MemorySegment>& segment, int32_t offset,
int32_t length)
MemorySlice::MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length)
: segment_(segment), offset_(offset), length_(length) {}

std::shared_ptr<MemorySlice> MemorySlice::Slice(int32_t index, int32_t length) {
Expand All @@ -49,38 +47,38 @@ int32_t MemorySlice::Offset() const {
}

std::shared_ptr<Bytes> MemorySlice::GetHeapMemory() const {
return segment_->GetHeapMemory();
return segment_.GetHeapMemory();
}

std::shared_ptr<MemorySegment> MemorySlice::GetSegment() const {
const MemorySegment& MemorySlice::GetSegment() const {
return segment_;
}

int8_t MemorySlice::ReadByte(int32_t position) {
return segment_->GetValue<int8_t>(offset_ + position);
return segment_.GetValue<int8_t>(offset_ + position);
}

int32_t MemorySlice::ReadInt(int32_t position) {
return segment_->GetValue<int32_t>(offset_ + position);
return segment_.GetValue<int32_t>(offset_ + position);
}

int16_t MemorySlice::ReadShort(int32_t position) {
return segment_->GetValue<int16_t>(offset_ + position);
return segment_.GetValue<int16_t>(offset_ + position);
}

int64_t MemorySlice::ReadLong(int32_t position) {
return segment_->GetValue<int64_t>(offset_ + position);
return segment_.GetValue<int64_t>(offset_ + position);
}

std::string_view MemorySlice::ReadStringView() {
auto array = segment_->GetArray();
auto array = segment_.GetArray();
return {array->data() + offset_, static_cast<size_t>(length_)};
}

std::shared_ptr<Bytes> MemorySlice::CopyBytes(MemoryPool* pool) {
auto bytes = std::make_shared<Bytes>(length_, pool);
auto target = MemorySegment::Wrap(bytes);
segment_->CopyTo(offset_, &target, 0, length_);
segment_.CopyTo(offset_, &target, 0, length_);
return bytes;
}

Expand Down Expand Up @@ -110,8 +108,8 @@ std::shared_ptr<MemorySliceInput> MemorySlice::ToInput() {
int32_t MemorySlice::Compare(const MemorySlice& other) const {
int32_t len = std::min(length_, other.length_);
for (int32_t i = 0; i < len; ++i) {
auto byte1 = static_cast<unsigned char>(segment_->Get(offset_ + i));
auto byte2 = static_cast<unsigned char>(other.segment_->Get(other.offset_ + i));
auto byte1 = static_cast<unsigned char>(segment_.Get(offset_ + i));
auto byte2 = static_cast<unsigned char>(other.segment_.Get(other.offset_ + i));
if (byte1 != byte2) {
return static_cast<int>(byte1) - static_cast<int>(byte2);
}
Expand Down
8 changes: 4 additions & 4 deletions src/paimon/common/memory/memory_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ class MemorySliceInput;
class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this<MemorySlice> {
public:
static std::shared_ptr<MemorySlice> Wrap(const std::shared_ptr<Bytes>& bytes);
static std::shared_ptr<MemorySlice> Wrap(const std::shared_ptr<MemorySegment>& segment);
static std::shared_ptr<MemorySlice> Wrap(const MemorySegment& segment);

using SliceComparator = std::function<Result<int32_t>(const std::shared_ptr<MemorySlice>&,
const std::shared_ptr<MemorySlice>&)>;

public:
MemorySlice() = default;

MemorySlice(const std::shared_ptr<MemorySegment>& segment, int32_t offset, int32_t length);
MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length);
std::shared_ptr<MemorySlice> Slice(int32_t index, int32_t length);

int32_t Length() const;
int32_t Offset() const;
std::shared_ptr<Bytes> GetHeapMemory() const;
std::shared_ptr<MemorySegment> GetSegment() const;
const MemorySegment& GetSegment() const;

int8_t ReadByte(int32_t position);
int32_t ReadInt(int32_t position);
Expand All @@ -71,7 +71,7 @@ class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this<MemorySlic
int32_t Compare(const MemorySlice& other) const;

private:
std::shared_ptr<MemorySegment> segment_;
MemorySegment segment_;
int32_t offset_;
int32_t length_;
};
Expand Down
3 changes: 1 addition & 2 deletions src/paimon/common/memory/memory_slice_output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ void MemorySliceOutput::Reset() {
}

std::unique_ptr<MemorySlice> MemorySliceOutput::ToSlice() {
auto segment = std::make_shared<MemorySegment>(segment_);
return std::make_unique<MemorySlice>(segment, 0, size_);
return std::make_unique<MemorySlice>(segment_, 0, size_);
}

template <typename T>
Expand Down
16 changes: 8 additions & 8 deletions src/paimon/common/sst/block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ class BlockCache {

~BlockCache() = default;

std::shared_ptr<MemorySegment> GetBlock(int64_t position, int32_t length, bool is_index) {
Result<MemorySegment> GetBlock(int64_t position, int32_t length, bool is_index) {
auto key = CacheKey::ForPosition(file_path_, position, length, is_index);

auto it = blocks_.find(key);
if (it == blocks_.end()) {
auto segment = cache_manager_->GetPage(
key, [&](const std::shared_ptr<paimon::CacheKey>&) -> Result<MemorySegment> {
return ReadFrom(position, length);
});
if (!segment.get()) {
blocks_.insert({key, std::make_shared<CacheValue>(segment)});
}
PAIMON_ASSIGN_OR_RAISE(
MemorySegment segment,
cache_manager_->GetPage(
key, [&](const std::shared_ptr<paimon::CacheKey>&) -> Result<MemorySegment> {
return ReadFrom(position, length);
}));
blocks_.insert({key, std::make_shared<CacheValue>(segment)});
return segment;
}
return it->second->GetSegment();
Expand Down
Loading
Loading