From 4db0581eff6c54c66f7cd128fffdb54c8ef59336 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Tue, 3 Sep 2024 19:05:46 +0800 Subject: [PATCH] ORC-1767: [C++] Improve writing performance of encoded string column and support EncodedStringVectorBatch for StringColumnWriter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve writing performance of encoded string column and support EncodedStringVectorBatch for StringColumnWriter. Performance was measured in https://github.com/ClickHouse/orc/pull/15. Tested with the original tests. Closes #2010 from taiyang-li/apache_improve_dict_write. Lead-authored-by: taiyang-li <654010905@qq.com> Co-authored-by: 李扬 <654010905@qq.com> Signed-off-by: ffacs --- c++/include/orc/Vector.hh | 26 ++++++++++++ c++/src/ColumnWriter.cc | 66 ++++++++++++++++++------------ c++/src/Vector.cc | 45 ++++++++++++++++++++ c++/test/TestDictionaryEncoding.cc | 53 ++++++++++++++++++++++++ 4 files changed, 163 insertions(+), 27 deletions(-) diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh index 0dfe926965d..663bef9cd74 100644 --- a/c++/include/orc/Vector.hh +++ b/c++/include/orc/Vector.hh @@ -57,6 +57,8 @@ namespace orc { bool hasNulls; // whether the vector batch is encoded bool isEncoded; + // whether the dictionary is decoded into vector batch + bool dictionaryDecoded; // custom memory pool MemoryPool& memoryPool; @@ -88,6 +90,14 @@ namespace orc { */ virtual bool hasVariableLength(); + /** + * Decode possible dictionary into vector batch. 
+ */ + void decodeDictionary(); + + protected: + virtual void decodeDictionaryImpl() {} + private: ColumnVectorBatch(const ColumnVectorBatch&); ColumnVectorBatch& operator=(const ColumnVectorBatch&); @@ -248,6 +258,10 @@ namespace orc { ~EncodedStringVectorBatch() override; std::string toString() const override; void resize(uint64_t capacity) override; + + // Calculate data and length in StringVectorBatch from dictionary and index + void decodeDictionaryImpl() override; + std::shared_ptr dictionary; // index for dictionary entry @@ -264,6 +278,9 @@ namespace orc { bool hasVariableLength() override; std::vector fields; + + protected: + void decodeDictionaryImpl() override; }; struct ListVectorBatch : public ColumnVectorBatch { @@ -283,6 +300,9 @@ namespace orc { // the concatenated elements std::unique_ptr elements; + + protected: + void decodeDictionaryImpl() override; }; struct MapVectorBatch : public ColumnVectorBatch { @@ -304,6 +324,9 @@ namespace orc { std::unique_ptr keys; // the concatenated elements std::unique_ptr elements; + + protected: + void decodeDictionaryImpl() override; }; struct UnionVectorBatch : public ColumnVectorBatch { @@ -327,6 +350,9 @@ namespace orc { // the sub-columns std::vector children; + + protected: + void decodeDictionaryImpl() override; }; struct Decimal { diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc index 141f709d46e..ae9a682e1e6 100644 --- a/c++/src/ColumnWriter.cc +++ b/c++/src/ColumnWriter.cc @@ -887,10 +887,17 @@ namespace orc { size_t length; }; + struct DictEntryWithIndex { + DictEntryWithIndex(const char* str, size_t len, size_t index) + : entry(str, len), index(index) {} + DictEntry entry; + size_t index; + }; + SortedStringDictionary() : totalLength(0) {} // insert a new string into dictionary, return its insertion order - size_t insert(const char* data, size_t len); + size_t insert(const char* str, size_t len); // write dictionary data & length to output buffer void flush(AppendOnlyBufferedStream* 
dataStream, RleEncoder* lengthEncoder) const; @@ -911,7 +918,9 @@ namespace orc { private: struct LessThan { - bool operator()(const DictEntry& left, const DictEntry& right) const { + bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) { + const auto& left = l.entry; + const auto& right = r.entry; int ret = memcmp(left.data, right.data, std::min(left.length, right.length)); if (ret != 0) { return ret < 0; @@ -920,9 +929,9 @@ namespace orc { } }; - std::map dict; - std::vector> data; - uint64_t totalLength; + mutable std::vector flatDict_; + std::unordered_map keyToIndex_; + uint64_t totalLength_; // use friend class here to avoid being bothered by const function calls friend class StringColumnWriter; @@ -934,15 +943,11 @@ namespace orc { // insert a new string into dictionary, return its insertion order size_t SortedStringDictionary::insert(const char* str, size_t len) { - auto ret = dict.insert({DictEntry(str, len), dict.size()}); + size_t index = flatDict_.size(); + auto ret = keyToIndex_.emplace(std::string(str, len), index); if (ret.second) { - // make a copy to internal storage - data.push_back(std::vector(len)); - memcpy(data.back().data(), str, len); - // update dictionary entry to link pointer to internal storage - DictEntry* entry = const_cast(&(ret.first->first)); - entry->data = data.back().data(); - totalLength += len; + flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(), index); + totalLength_ += len; } return ret.first->second; } @@ -950,9 +955,12 @@ namespace orc { // write dictionary data & length to output buffer void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const { - for (auto it = dict.cbegin(); it != dict.cend(); ++it) { - dataStream->write(it->first.data, it->first.length); - lengthEncoder->write(static_cast(it->first.length)); + std::sort(flatDict_.begin(), flatDict_.end(), LessThan()); + + for (const auto& entryWithIndex : flatDict_) { + const 
auto& entry = entryWithIndex.entry; + dataStream->write(entry.data, entry.length); + lengthEncoder->write(static_cast(entry.length)); } } @@ -968,10 +976,9 @@ namespace orc { */ void SortedStringDictionary::reorder(std::vector& idxBuffer) const { // iterate the dictionary to get mapping from insertion order to value order - std::vector mapping(dict.size()); - size_t dictIdx = 0; - for (auto it = dict.cbegin(); it != dict.cend(); ++it) { - mapping[it->second] = dictIdx++; + std::vector mapping(flatDict_.size()); + for (size_t i = 0; i < flatDict_.size(); ++i) { + mapping[flatDict_[i].index] = i; } // do the transformation @@ -983,15 +990,20 @@ namespace orc { // get dict entries in insertion order void SortedStringDictionary::getEntriesInInsertionOrder( std::vector& entries) const { - entries.resize(dict.size()); - for (auto it = dict.cbegin(); it != dict.cend(); ++it) { - entries[it->second] = &(it->first); + std::sort(flatDict_.begin(), flatDict_.end(), + [](const DictEntryWithIndex& left, const DictEntryWithIndex& right) { + return left.index < right.index; + }); + + entries.resize(flatDict_.size()); + for (size_t i = 0; i < flatDict_.size(); ++i) { + entries[i] = &(flatDict_[i].entry); } } // return count of entries size_t SortedStringDictionary::size() const { - return dict.size(); + return flatDict_.size(); } // return total length of strings in the dictioanry @@ -1000,9 +1012,9 @@ namespace orc { } void SortedStringDictionary::clear() { - totalLength = 0; - data.clear(); - dict.clear(); + totalLength_ = 0; + keyToIndex_.clear(); + flatDict_.clear(); } class StringColumnWriter : public ColumnWriter { diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc index ef16b318023..0ae6a535d78 100644 --- a/c++/src/Vector.cc +++ b/c++/src/Vector.cc @@ -33,6 +33,7 @@ namespace orc { notNull(pool, cap), hasNulls(false), isEncoded(false), + dictionaryDecoded(false), memoryPool(pool) { std::memset(notNull.data(), 1, capacity); } @@ -60,6 +61,13 @@ namespace orc { return false; 
} + void ColumnVectorBatch::decodeDictionary() { + if (dictionaryDecoded) return; + + decodeDictionaryImpl(); + dictionaryDecoded = true; + } + StringDictionary::StringDictionary(MemoryPool& pool) : dictionaryBlob(pool), dictionaryOffset(pool) { // PASS @@ -87,6 +95,17 @@ namespace orc { } } + void EncodedStringVectorBatch::decodeDictionaryImpl() { + size_t n = index.size(); + resize(n); + + for (size_t i = 0; i < n; ++i) { + if (!hasNulls || notNull[i]) { + dictionary->getValueByIndex(index[i], data[i], length[i]); + } + } + } + StringVectorBatch::StringVectorBatch(uint64_t _capacity, MemoryPool& pool) : ColumnVectorBatch(_capacity, pool), data(pool, _capacity), @@ -173,6 +192,12 @@ namespace orc { return false; } + void StructVectorBatch::decodeDictionaryImpl() { + for (const auto& field : fields) { + field->decodeDictionary(); + } + } + ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool) : ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) { // PASS @@ -210,6 +235,10 @@ namespace orc { return true; } + void ListVectorBatch::decodeDictionaryImpl() { + elements->decodeDictionary(); + } + MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool) : ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) { // PASS @@ -250,6 +279,16 @@ namespace orc { return true; } + void MapVectorBatch::decodeDictionaryImpl() { + if (keys) { + keys->decodeDictionary(); + } + + if (elements) { + elements->decodeDictionary(); + } + } + UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool) : ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) { // PASS @@ -308,6 +347,12 @@ namespace orc { return false; } + void UnionVectorBatch::decodeDictionaryImpl() { + for (const auto& child : children) { + child->decodeDictionary(); + } + } + Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool) : ColumnVectorBatch(cap, pool), precision(0), diff --git a/c++/test/TestDictionaryEncoding.cc b/c++/test/TestDictionaryEncoding.cc index 
f3dcaa0067b..0f9e0a73de6 100644 --- a/c++/test/TestDictionaryEncoding.cc +++ b/c++/test/TestDictionaryEncoding.cc @@ -429,4 +429,57 @@ namespace orc { testDictionaryMultipleStripes(DICT_THRESHOLD, false); testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false); } + + TEST(DictionaryEncoding, decodeDictionary) { + size_t rowCount = 8192; + size_t dictionarySize = 100; + auto* memoryPool = getDefaultPool(); + + auto encodedStringBatch = std::make_shared(rowCount, *memoryPool); + EXPECT_FALSE(encodedStringBatch->dictionaryDecoded); + encodedStringBatch->numElements = rowCount; + encodedStringBatch->hasNulls = true; + encodedStringBatch->isEncoded = true; + encodedStringBatch->dictionary = std::make_shared(*memoryPool); + + auto& dictionary = *encodedStringBatch->dictionary; + dictionary.dictionaryBlob.resize(3 * dictionarySize); + dictionary.dictionaryOffset.resize(dictionarySize + 1); + dictionary.dictionaryOffset[0] = 0; + for (uint64_t i = 0; i < dictionarySize; ++i) { + std::ostringstream oss; + oss << std::setw(3) << std::setfill('0') << i; + + auto str = oss.str(); + memcpy(&dictionary.dictionaryBlob[3 * i], str.data(), str.size()); + dictionary.dictionaryOffset[i + 1] = 3 * (i + 1); + } + + for (uint64_t i = 0; i < rowCount; ++i) { + if (i % 10 == 0) { + encodedStringBatch->notNull[i] = 0; + encodedStringBatch->index[i] = 0; + } else { + encodedStringBatch->notNull[i] = 1; + encodedStringBatch->index[i] = i % dictionarySize; + } + } + + encodedStringBatch->decodeDictionary(); + EXPECT_TRUE(encodedStringBatch->dictionaryDecoded); + EXPECT_EQ(0, encodedStringBatch->blob.size()); + + for (uint64_t i = 0; i < rowCount; ++i) { + if (encodedStringBatch->notNull[i]) { + auto index = encodedStringBatch->index[i]; + char* buf = nullptr; + int64_t buf_size = 0; + dictionary.getValueByIndex(index, buf, buf_size); + + EXPECT_EQ(buf, encodedStringBatch->data[i]); + EXPECT_EQ(buf_size, encodedStringBatch->length[i]); + } + } + } + } // namespace orc