Skip to content

Commit

Permalink
ORC-1767: [C++] Improve writing performance of encoded string column …
Browse files Browse the repository at this point in the history
…and support EncodedStringVectorBatch for StringColumnWriter

Improve writing performance of encoded string column and support EncodedStringVectorBatch for StringColumnWriter.
Performance was measured in ClickHouse#15

original tests.

Closes apache#2010 from taiyang-li/apache_improve_dict_write.

Lead-authored-by: taiyang-li <[email protected]>
Co-authored-by: 李扬 <[email protected]>
Signed-off-by: ffacs <[email protected]>
  • Loading branch information
taiyang-li committed Sep 4, 2024
1 parent bcc025c commit 4db0581
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 27 deletions.
26 changes: 26 additions & 0 deletions c++/include/orc/Vector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace orc {
bool hasNulls;
// whether the vector batch is encoded
bool isEncoded;
// whether the dictionary is decoded into vector batch
bool dictionaryDecoded;

// custom memory pool
MemoryPool& memoryPool;
Expand Down Expand Up @@ -88,6 +90,14 @@ namespace orc {
*/
virtual bool hasVariableLength();

/**
* Decode possible dictionary into vector batch.
*/
void decodeDictionary();

protected:
virtual void decodeDictionaryImpl() {}

private:
ColumnVectorBatch(const ColumnVectorBatch&);
ColumnVectorBatch& operator=(const ColumnVectorBatch&);
Expand Down Expand Up @@ -248,6 +258,10 @@ namespace orc {
~EncodedStringVectorBatch() override;
std::string toString() const override;
void resize(uint64_t capacity) override;

// Calculate data and length in StringVectorBatch from dictionary and index
void decodeDictionaryImpl() override;

std::shared_ptr<StringDictionary> dictionary;

// index for dictionary entry
Expand All @@ -264,6 +278,9 @@ namespace orc {
bool hasVariableLength() override;

std::vector<ColumnVectorBatch*> fields;

protected:
void decodeDictionaryImpl() override;
};

struct ListVectorBatch : public ColumnVectorBatch {
Expand All @@ -283,6 +300,9 @@ namespace orc {

// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;

protected:
void decodeDictionaryImpl() override;
};

struct MapVectorBatch : public ColumnVectorBatch {
Expand All @@ -304,6 +324,9 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> keys;
// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;

protected:
void decodeDictionaryImpl() override;
};

struct UnionVectorBatch : public ColumnVectorBatch {
Expand All @@ -327,6 +350,9 @@ namespace orc {

// the sub-columns
std::vector<ColumnVectorBatch*> children;

protected:
void decodeDictionaryImpl() override;
};

struct Decimal {
Expand Down
66 changes: 39 additions & 27 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -887,10 +887,17 @@ namespace orc {
size_t length;
};

struct DictEntryWithIndex {
DictEntryWithIndex(const char* str, size_t len, size_t index)
: entry(str, len), index(index) {}
DictEntry entry;
size_t index;
};

SortedStringDictionary() : totalLength(0) {}

// insert a new string into dictionary, return its insertion order
size_t insert(const char* data, size_t len);
size_t insert(const char* str, size_t len);

// write dictionary data & length to output buffer
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;
Expand All @@ -911,7 +918,9 @@ namespace orc {

private:
struct LessThan {
bool operator()(const DictEntry& left, const DictEntry& right) const {
bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) {
const auto& left = l.entry;
const auto& right = r.entry;
int ret = memcmp(left.data, right.data, std::min(left.length, right.length));
if (ret != 0) {
return ret < 0;
Expand All @@ -920,9 +929,9 @@ namespace orc {
}
};

std::map<DictEntry, size_t, LessThan> dict;
std::vector<std::vector<char>> data;
uint64_t totalLength;
mutable std::vector<DictEntryWithIndex> flatDict_;
std::unordered_map<std::string, size_t> keyToIndex_;
uint64_t totalLength_;

// use friend class here to avoid being bothered by const function calls
friend class StringColumnWriter;
Expand All @@ -934,25 +943,24 @@ namespace orc {

// insert a new string into dictionary, return its insertion order
size_t SortedStringDictionary::insert(const char* str, size_t len) {
auto ret = dict.insert({DictEntry(str, len), dict.size()});
size_t index = flatDict_.size();
auto ret = keyToIndex_.emplace(std::string(str, len), index);
if (ret.second) {
// make a copy to internal storage
data.push_back(std::vector<char>(len));
memcpy(data.back().data(), str, len);
// update dictionary entry to link pointer to internal storage
DictEntry* entry = const_cast<DictEntry*>(&(ret.first->first));
entry->data = data.back().data();
totalLength += len;
flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(), index);
totalLength_ += len;
}
return ret.first->second;
}

// write dictionary data & length to output buffer
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
RleEncoder* lengthEncoder) const {
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
dataStream->write(it->first.data, it->first.length);
lengthEncoder->write(static_cast<int64_t>(it->first.length));
std::sort(flatDict_.begin(), flatDict_.end(), LessThan());

for (const auto& entryWithIndex : flatDict_) {
const auto& entry = entryWithIndex.entry;
dataStream->write(entry.data, entry.length);
lengthEncoder->write(static_cast<int64_t>(entry.length));
}
}

Expand All @@ -968,10 +976,9 @@ namespace orc {
*/
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
// iterate the dictionary to get mapping from insertion order to value order
std::vector<size_t> mapping(dict.size());
size_t dictIdx = 0;
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
mapping[it->second] = dictIdx++;
std::vector<size_t> mapping(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
mapping[flatDict_[i].index] = i;
}

// do the transformation
Expand All @@ -983,15 +990,20 @@ namespace orc {
// get dict entries in insertion order
void SortedStringDictionary::getEntriesInInsertionOrder(
std::vector<const DictEntry*>& entries) const {
entries.resize(dict.size());
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
entries[it->second] = &(it->first);
std::sort(flatDict_.begin(), flatDict_.end(),
[](const DictEntryWithIndex& left, const DictEntryWithIndex& right) {
return left.index < right.index;
});

entries.resize(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
entries[i] = &(flatDict_[i].entry);
}
}

// return count of entries
size_t SortedStringDictionary::size() const {
return dict.size();
return flatDict_.size();
}

// return total length of strings in the dictioanry
Expand All @@ -1000,9 +1012,9 @@ namespace orc {
}

void SortedStringDictionary::clear() {
totalLength = 0;
data.clear();
dict.clear();
totalLength_ = 0;
keyToIndex_.clear();
flatDict_.clear();
}

class StringColumnWriter : public ColumnWriter {
Expand Down
45 changes: 45 additions & 0 deletions c++/src/Vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace orc {
notNull(pool, cap),
hasNulls(false),
isEncoded(false),
dictionaryDecoded(false),
memoryPool(pool) {
std::memset(notNull.data(), 1, capacity);
}
Expand Down Expand Up @@ -60,6 +61,13 @@ namespace orc {
return false;
}

void ColumnVectorBatch::decodeDictionary() {
if (dictionaryDecoded) return;

decodeDictionaryImpl();
dictionaryDecoded = true;
}

StringDictionary::StringDictionary(MemoryPool& pool)
: dictionaryBlob(pool), dictionaryOffset(pool) {
// PASS
Expand Down Expand Up @@ -87,6 +95,17 @@ namespace orc {
}
}

void EncodedStringVectorBatch::decodeDictionaryImpl() {
size_t n = index.size();
resize(n);

for (size_t i = 0; i < n; ++i) {
if (!hasNulls || notNull[i]) {
dictionary->getValueByIndex(index[i], data[i], length[i]);
}
}
}

StringVectorBatch::StringVectorBatch(uint64_t _capacity, MemoryPool& pool)
: ColumnVectorBatch(_capacity, pool),
data(pool, _capacity),
Expand Down Expand Up @@ -173,6 +192,12 @@ namespace orc {
return false;
}

void StructVectorBatch::decodeDictionaryImpl() {
for (const auto& field : fields) {
field->decodeDictionary();
}
}

ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
// PASS
Expand Down Expand Up @@ -210,6 +235,10 @@ namespace orc {
return true;
}

void ListVectorBatch::decodeDictionaryImpl() {
elements->decodeDictionary();
}

MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
// PASS
Expand Down Expand Up @@ -250,6 +279,16 @@ namespace orc {
return true;
}

void MapVectorBatch::decodeDictionaryImpl() {
if (keys) {
keys->decodeDictionary();
}

if (elements) {
elements->decodeDictionary();
}
}

UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) {
// PASS
Expand Down Expand Up @@ -308,6 +347,12 @@ namespace orc {
return false;
}

void UnionVectorBatch::decodeDictionaryImpl() {
for (const auto& child : children) {
child->decodeDictionary();
}
}

Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool),
precision(0),
Expand Down
53 changes: 53 additions & 0 deletions c++/test/TestDictionaryEncoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,57 @@ namespace orc {
testDictionaryMultipleStripes(DICT_THRESHOLD, false);
testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false);
}

TEST(DictionaryEncoding, decodeDictionary) {
size_t rowCount = 8192;
size_t dictionarySize = 100;
auto* memoryPool = getDefaultPool();

auto encodedStringBatch = std::make_shared<EncodedStringVectorBatch>(rowCount, *memoryPool);
EXPECT_FALSE(encodedStringBatch->dictionaryDecoded);
encodedStringBatch->numElements = rowCount;
encodedStringBatch->hasNulls = true;
encodedStringBatch->isEncoded = true;
encodedStringBatch->dictionary = std::make_shared<StringDictionary>(*memoryPool);

auto& dictionary = *encodedStringBatch->dictionary;
dictionary.dictionaryBlob.resize(3 * dictionarySize);
dictionary.dictionaryOffset.resize(dictionarySize + 1);
dictionary.dictionaryOffset[0] = 0;
for (uint64_t i = 0; i < dictionarySize; ++i) {
std::ostringstream oss;
oss << std::setw(3) << std::setfill('0') << i;

auto str = oss.str();
memcpy(&dictionary.dictionaryBlob[3 * i], str.data(), str.size());
dictionary.dictionaryOffset[i + 1] = 3 * (i + 1);
}

for (uint64_t i = 0; i < rowCount; ++i) {
if (i % 10 == 0) {
encodedStringBatch->notNull[i] = 0;
encodedStringBatch->index[i] = 0;
} else {
encodedStringBatch->notNull[i] = 1;
encodedStringBatch->index[i] = i % dictionarySize;
}
}

encodedStringBatch->decodeDictionary();
EXPECT_TRUE(encodedStringBatch->dictionaryDecoded);
EXPECT_EQ(0, encodedStringBatch->blob.size());

for (uint64_t i = 0; i < rowCount; ++i) {
if (encodedStringBatch->notNull[i]) {
auto index = encodedStringBatch->index[i];
char* buf = nullptr;
int64_t buf_size = 0;
dictionary.getValueByIndex(index, buf, buf_size);

EXPECT_EQ(buf, encodedStringBatch->data[i]);
EXPECT_EQ(buf_size, encodedStringBatch->length[i]);
}
}
}

} // namespace orc

0 comments on commit 4db0581

Please sign in to comment.