|
| 1 | +/* |
| 2 | + * Copyright 2026-present Alibaba Inc. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +#include "paimon/common/file_index/rangebitmap/dictionary/chunked_dictionary.h" |
| 18 | + |
| 19 | +#include <algorithm> |
| 20 | + |
| 21 | +#include "fmt/format.h" |
| 22 | +#include "paimon/common/file_index/rangebitmap/dictionary/fixed_length_chunk.h" |
| 23 | +#include "paimon/common/file_index/rangebitmap/dictionary/key_factory.h" |
| 24 | +#include "paimon/common/memory/memory_segment_utils.h" |
| 25 | +#include "paimon/fs/file_system.h" |
| 26 | +#include "paimon/io/byte_array_input_stream.h" |
| 27 | +#include "paimon/io/data_input_stream.h" |
| 28 | +#include "paimon/memory/bytes.h" |
| 29 | +#include "paimon/result.h" |
| 30 | +#include "paimon/status.h" |
| 31 | + |
| 32 | +namespace paimon { |
| 33 | + |
| 34 | +Result<int32_t> ChunkedDictionary::Find(const Literal& key) { |
| 35 | + int32_t low = 0; |
| 36 | + int32_t high = size_ - 1; |
| 37 | + while (low <= high) { |
| 38 | + const int32_t mid = low + (high - low) / 2; |
| 39 | + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Chunk> chunk, GetChunk(mid)); |
| 40 | + PAIMON_ASSIGN_OR_RAISE(const int32_t result, factory_->CompareLiteral(chunk->Key(), key)); |
| 41 | + if (result > 0) { |
| 42 | + high = mid - 1; |
| 43 | + } else if (result < 0) { |
| 44 | + low = mid + 1; |
| 45 | + } else { |
| 46 | + return chunk->Code(); |
| 47 | + } |
| 48 | + } |
| 49 | + if (low == 0) { |
| 50 | + return -(low + 1); // This makes sure the return value is negative if not found |
| 51 | + } |
| 52 | + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Chunk> prev_chunk, GetChunk(low - 1)); |
| 53 | + return prev_chunk->Find(key); |
| 54 | +} |
| 55 | + |
| 56 | +Result<Literal> ChunkedDictionary::Find(int32_t code) { |
| 57 | + if (size_ <= 0) { |
| 58 | + return Status::Invalid(fmt::format("Cannot find code {} in an empty Dictionary", code)); |
| 59 | + } |
| 60 | + if (code < 0) { |
| 61 | + return Status::Invalid(fmt::format("Invalid code: {}", code)); |
| 62 | + } |
| 63 | + int32_t low = 0; |
| 64 | + int32_t high = size_ - 1; |
| 65 | + |
| 66 | + while (low <= high) { |
| 67 | + const int32_t mid = low + (high - low) / 2; |
| 68 | + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Chunk> chunk, GetChunk(mid)); |
| 69 | + |
| 70 | + int32_t chunk_code = chunk->Code(); |
| 71 | + if (chunk_code > code) { |
| 72 | + high = mid - 1; |
| 73 | + } else if (chunk_code < code) { |
| 74 | + low = mid + 1; |
| 75 | + } else { |
| 76 | + return {chunk->Key()}; |
| 77 | + } |
| 78 | + } |
| 79 | + if (low == 0) { |
| 80 | + return Status::Invalid(fmt::format("Cannot find code {}", code)); |
| 81 | + } |
| 82 | + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Chunk> prev_chunk, GetChunk(low - 1)); |
| 83 | + return prev_chunk->Find(code); |
| 84 | +} |
| 85 | + |
| 86 | +Result<std::shared_ptr<Chunk>> ChunkedDictionary::GetChunk(int32_t index) { |
| 87 | + if (index < 0 || index >= size_) { |
| 88 | + return Status::Invalid(fmt::format("Invalid chunk index: {}", index)); |
| 89 | + } |
| 90 | + if (offsets_bytes_ == nullptr || chunks_bytes_ == nullptr) { |
| 91 | + PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_, FS_SEEK_SET)); |
| 92 | + auto offsets = Bytes::AllocateBytes(offsets_length_, pool_.get()); |
| 93 | + PAIMON_RETURN_NOT_OK(input_stream_->Read(offsets->data(), offsets_length_)); |
| 94 | + offsets_bytes_ = std::move(offsets); |
| 95 | + auto chunks = Bytes::AllocateBytes(chunks_length_, pool_.get()); |
| 96 | + PAIMON_RETURN_NOT_OK(input_stream_->Read(chunks->data(), chunks_length_)); |
| 97 | + chunks_bytes_ = std::move(chunks); |
| 98 | + } |
| 99 | + if (chunks_cache_[index]) { |
| 100 | + return chunks_cache_[index]; |
| 101 | + } |
| 102 | + auto data_in = std::make_unique<DataInputStream>( |
| 103 | + std::make_shared<ByteArrayInputStream>(offsets_bytes_->data(), offsets_length_)); |
| 104 | + PAIMON_RETURN_NOT_OK(data_in->Seek(sizeof(int32_t) * index)); |
| 105 | + PAIMON_ASSIGN_OR_RAISE(int32_t chunk_offset, data_in->ReadValue<int32_t>()); |
| 106 | + PAIMON_ASSIGN_OR_RAISE( |
| 107 | + std::unique_ptr<Chunk> chunk, |
| 108 | + factory_->MmapChunk(input_stream_, body_offset_ + offsets_length_ + chunk_offset, |
| 109 | + body_offset_ + chunks_length_ + offsets_length_, pool_)); |
| 110 | + chunks_cache_[index] = std::move(chunk); |
| 111 | + return chunks_cache_[index]; |
| 112 | +} |
| 113 | + |
| 114 | +Result<std::unique_ptr<ChunkedDictionary::Appender>> ChunkedDictionary::Appender::Create( |
| 115 | + const std::shared_ptr<KeyFactory>& key_factory, int32_t chunk_size_bytes, |
| 116 | + const std::shared_ptr<MemoryPool>& pool) { |
| 117 | + return std::unique_ptr<Appender>(new Appender(key_factory, chunk_size_bytes, pool)); |
| 118 | +} |
| 119 | + |
| 120 | +ChunkedDictionary::Appender::Appender(const std::shared_ptr<KeyFactory>& key_factory, |
| 121 | + int32_t chunk_size_bytes, |
| 122 | + const std::shared_ptr<MemoryPool>& pool) |
| 123 | + : pool_(pool), |
| 124 | + key_factory_(key_factory), |
| 125 | + chunk_size_bytes_(chunk_size_bytes), |
| 126 | + chunk_(nullptr), |
| 127 | + size_(0), |
| 128 | + key_offset_(0), |
| 129 | + chunks_offset_(0) { |
| 130 | + chunks_output_ = std::make_unique<MemorySegmentOutputStream>( |
| 131 | + MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_); |
| 132 | + keys_output_ = std::make_unique<MemorySegmentOutputStream>( |
| 133 | + MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_); |
| 134 | + offsets_output_ = std::make_unique<MemorySegmentOutputStream>( |
| 135 | + MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_); |
| 136 | +} |
| 137 | + |
| 138 | +Status ChunkedDictionary::Appender::AppendSorted(const Literal& key, int32_t code) { |
| 139 | + if (key.IsNull()) { |
| 140 | + return Status::Invalid("key should not be null"); |
| 141 | + } |
| 142 | + if (last_key_.has_value()) { |
| 143 | + PAIMON_ASSIGN_OR_RAISE(int32_t compare_result, |
| 144 | + key_factory_->CompareLiteral(*last_key_, key)); |
| 145 | + if (compare_result >= 0) { |
| 146 | + return Status::Invalid("key must be in sorted order"); |
| 147 | + } |
| 148 | + } |
| 149 | + if (last_code_.has_value() && code != *last_code_ + 1) { |
| 150 | + return Status::Invalid("code must be in sorted order and increment by one"); |
| 151 | + } |
| 152 | + last_key_ = key; |
| 153 | + last_code_ = code; |
| 154 | + if (chunk_ == nullptr) { |
| 155 | + PAIMON_ASSIGN_OR_RAISE(chunk_, |
| 156 | + key_factory_->CreateChunk(key, code, chunk_size_bytes_, pool_)); |
| 157 | + } else { |
| 158 | + PAIMON_ASSIGN_OR_RAISE(bool success, chunk_->TryAdd(key)); |
| 159 | + if (success) { |
| 160 | + return Status::OK(); |
| 161 | + } |
| 162 | + PAIMON_RETURN_NOT_OK(Flush()); |
| 163 | + PAIMON_ASSIGN_OR_RAISE(chunk_, |
| 164 | + key_factory_->CreateChunk(key, code, chunk_size_bytes_, pool_)); |
| 165 | + } |
| 166 | + return Status::OK(); |
| 167 | +} |
| 168 | + |
| 169 | +Result<PAIMON_UNIQUE_PTR<Bytes>> ChunkedDictionary::Appender::Serialize() { |
| 170 | + if (chunk_ != nullptr) { |
| 171 | + PAIMON_RETURN_NOT_OK(Flush()); |
| 172 | + } |
| 173 | + int32_t header_size = 0; |
| 174 | + header_size += sizeof(int8_t); // version |
| 175 | + header_size += sizeof(int32_t); // size |
| 176 | + header_size += sizeof(int32_t); // offsets length |
| 177 | + header_size += sizeof(int32_t); // chunks length |
| 178 | + auto data_out = std::make_unique<MemorySegmentOutputStream>( |
| 179 | + MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_); |
| 180 | + data_out->WriteValue<int32_t>(header_size); |
| 181 | + data_out->WriteValue<int8_t>(kCurrentVersion); |
| 182 | + data_out->WriteValue<int32_t>(size_); |
| 183 | + data_out->WriteValue<int32_t>(static_cast<int32_t>(offsets_output_->CurrentSize())); |
| 184 | + data_out->WriteValue<int32_t>(static_cast<int32_t>(chunks_output_->CurrentSize())); |
| 185 | + PAIMON_RETURN_NOT_OK(MemorySegmentUtils::CopyToStream( |
| 186 | + offsets_output_->Segments(), 0, static_cast<int32_t>(offsets_output_->CurrentSize()), |
| 187 | + data_out.get())); |
| 188 | + PAIMON_RETURN_NOT_OK(MemorySegmentUtils::CopyToStream( |
| 189 | + chunks_output_->Segments(), 0, static_cast<int32_t>(chunks_output_->CurrentSize()), |
| 190 | + data_out.get())); |
| 191 | + PAIMON_RETURN_NOT_OK(MemorySegmentUtils::CopyToStream( |
| 192 | + keys_output_->Segments(), 0, static_cast<int32_t>(keys_output_->CurrentSize()), |
| 193 | + data_out.get())); |
| 194 | + return MemorySegmentUtils::CopyToBytes( |
| 195 | + data_out->Segments(), 0, static_cast<int32_t>(data_out->CurrentSize()), pool_.get()); |
| 196 | +} |
| 197 | + |
| 198 | +Status ChunkedDictionary::Appender::Flush() { |
| 199 | + chunk_->SetOffset(key_offset_); |
| 200 | + PAIMON_ASSIGN_OR_RAISE(PAIMON_UNIQUE_PTR<Bytes> chunks_bytes, chunk_->SerializeChunk()); |
| 201 | + PAIMON_ASSIGN_OR_RAISE(PAIMON_UNIQUE_PTR<Bytes> keys_bytes, chunk_->SerializeKeys()); |
| 202 | + offsets_output_->WriteValue<int32_t>(chunks_offset_); |
| 203 | + chunks_offset_ += static_cast<int32_t>(chunks_bytes->size()); |
| 204 | + key_offset_ += static_cast<int32_t>(keys_bytes->size()); |
| 205 | + chunks_output_->Write(chunks_bytes->data(), chunks_bytes->size()); |
| 206 | + keys_output_->Write(keys_bytes->data(), keys_bytes->size()); |
| 207 | + size_ += 1; |
| 208 | + chunk_ = nullptr; |
| 209 | + return Status::OK(); |
| 210 | +} |
| 211 | + |
| 212 | +Result<std::unique_ptr<ChunkedDictionary>> ChunkedDictionary::Create( |
| 213 | + FieldType field_type, const std::shared_ptr<InputStream>& input_stream, int32_t offset, |
| 214 | + const std::shared_ptr<MemoryPool>& pool) { |
| 215 | + auto data_in = std::make_unique<DataInputStream>(input_stream); |
| 216 | + PAIMON_RETURN_NOT_OK(data_in->Seek(offset)); |
| 217 | + PAIMON_ASSIGN_OR_RAISE(int32_t header_length, data_in->ReadValue<int32_t>()); |
| 218 | + PAIMON_ASSIGN_OR_RAISE(int8_t version, data_in->ReadValue<int8_t>()); |
| 219 | + if (version != kCurrentVersion) { |
| 220 | + return Status::Invalid("Unknown version of ChunkedDictionary"); |
| 221 | + } |
| 222 | + PAIMON_ASSIGN_OR_RAISE(int32_t size, data_in->ReadValue<int32_t>()); |
| 223 | + PAIMON_ASSIGN_OR_RAISE(int32_t offsets_length, data_in->ReadValue<int32_t>()); |
| 224 | + PAIMON_ASSIGN_OR_RAISE(int32_t chunks_length, data_in->ReadValue<int32_t>()); |
| 225 | + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<KeyFactory> factory_shared, |
| 226 | + KeyFactory::Create(field_type)); |
| 227 | + auto result = std::unique_ptr<ChunkedDictionary>(new ChunkedDictionary( |
| 228 | + input_stream, factory_shared, size, offsets_length, chunks_length, |
| 229 | + static_cast<int32_t>(offset + header_length + sizeof(int32_t)), pool)); |
| 230 | + return result; |
| 231 | +} |
| 232 | + |
| 233 | +ChunkedDictionary::ChunkedDictionary(const std::shared_ptr<InputStream>& input_stream, |
| 234 | + const std::shared_ptr<KeyFactory>& factory, int32_t size, |
| 235 | + int32_t offsets_length, int32_t chunks_length, |
| 236 | + int32_t body_offset, const std::shared_ptr<MemoryPool>& pool) |
| 237 | + : pool_(pool), |
| 238 | + factory_(factory), |
| 239 | + input_stream_(input_stream), |
| 240 | + size_(size), |
| 241 | + offsets_length_(offsets_length), |
| 242 | + chunks_length_(chunks_length), |
| 243 | + body_offset_(body_offset), |
| 244 | + offsets_bytes_(nullptr), |
| 245 | + chunks_bytes_(nullptr), |
| 246 | + chunks_cache_(std::vector<std::shared_ptr<Chunk>>(size)) {} |
| 247 | +} // namespace paimon |
0 commit comments