Skip to content

Commit 9c3ad23

Browse files
committed
ORC-1767: [C++] Improve writing performance of encoded string column and support EncodedStringVectorBatch for StringColumnWriter
Improve writing performance of encoded string column and support EncodedStringVectorBatch for StringColumnWriter. Performance was measured in ClickHouse#15 original tests. Closes apache#2010 from taiyang-li/apache_improve_dict_write. Lead-authored-by: taiyang-li <[email protected]> Co-authored-by: 李扬 <[email protected]> Signed-off-by: ffacs <[email protected]>
1 parent bcc025c commit 9c3ad23

File tree

4 files changed

+165
-29
lines changed

4 files changed

+165
-29
lines changed

c++/include/orc/Vector.hh

+26
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ namespace orc {
5757
bool hasNulls;
5858
// whether the vector batch is encoded
5959
bool isEncoded;
60+
// whether the dictionary is decoded into vector batch
61+
bool dictionaryDecoded;
6062

6163
// custom memory pool
6264
MemoryPool& memoryPool;
@@ -88,6 +90,14 @@ namespace orc {
8890
*/
8991
virtual bool hasVariableLength();
9092

93+
/**
94+
* Decode possible dictionary into vector batch.
95+
*/
96+
void decodeDictionary();
97+
98+
protected:
99+
virtual void decodeDictionaryImpl() {}
100+
91101
private:
92102
ColumnVectorBatch(const ColumnVectorBatch&);
93103
ColumnVectorBatch& operator=(const ColumnVectorBatch&);
@@ -248,6 +258,10 @@ namespace orc {
248258
~EncodedStringVectorBatch() override;
249259
std::string toString() const override;
250260
void resize(uint64_t capacity) override;
261+
262+
// Calculate data and length in StringVectorBatch from dictionary and index
263+
void decodeDictionaryImpl() override;
264+
251265
std::shared_ptr<StringDictionary> dictionary;
252266

253267
// index for dictionary entry
@@ -264,6 +278,9 @@ namespace orc {
264278
bool hasVariableLength() override;
265279

266280
std::vector<ColumnVectorBatch*> fields;
281+
282+
protected:
283+
void decodeDictionaryImpl() override;
267284
};
268285

269286
struct ListVectorBatch : public ColumnVectorBatch {
@@ -283,6 +300,9 @@ namespace orc {
283300

284301
// the concatenated elements
285302
std::unique_ptr<ColumnVectorBatch> elements;
303+
304+
protected:
305+
void decodeDictionaryImpl() override;
286306
};
287307

288308
struct MapVectorBatch : public ColumnVectorBatch {
@@ -304,6 +324,9 @@ namespace orc {
304324
std::unique_ptr<ColumnVectorBatch> keys;
305325
// the concatenated elements
306326
std::unique_ptr<ColumnVectorBatch> elements;
327+
328+
protected:
329+
void decodeDictionaryImpl() override;
307330
};
308331

309332
struct UnionVectorBatch : public ColumnVectorBatch {
@@ -327,6 +350,9 @@ namespace orc {
327350

328351
// the sub-columns
329352
std::vector<ColumnVectorBatch*> children;
353+
354+
protected:
355+
void decodeDictionaryImpl() override;
330356
};
331357

332358
struct Decimal {

c++/src/ColumnWriter.cc

+41-29
Original file line numberDiff line numberDiff line change
@@ -887,10 +887,17 @@ namespace orc {
887887
size_t length;
888888
};
889889

890-
SortedStringDictionary() : totalLength(0) {}
890+
struct DictEntryWithIndex {
891+
DictEntryWithIndex(const char* str, size_t len, size_t index)
892+
: entry(str, len), index(index) {}
893+
DictEntry entry;
894+
size_t index;
895+
};
896+
897+
SortedStringDictionary() : totalLength_(0) {}
891898

892899
// insert a new string into dictionary, return its insertion order
893-
size_t insert(const char* data, size_t len);
900+
size_t insert(const char* str, size_t len);
894901

895902
// write dictionary data & length to output buffer
896903
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;
@@ -911,7 +918,9 @@ namespace orc {
911918

912919
private:
913920
struct LessThan {
914-
bool operator()(const DictEntry& left, const DictEntry& right) const {
921+
bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) {
922+
const auto& left = l.entry;
923+
const auto& right = r.entry;
915924
int ret = memcmp(left.data, right.data, std::min(left.length, right.length));
916925
if (ret != 0) {
917926
return ret < 0;
@@ -920,9 +929,9 @@ namespace orc {
920929
}
921930
};
922931

923-
std::map<DictEntry, size_t, LessThan> dict;
924-
std::vector<std::vector<char>> data;
925-
uint64_t totalLength;
932+
mutable std::vector<DictEntryWithIndex> flatDict_;
933+
std::unordered_map<std::string, size_t> keyToIndex_;
934+
uint64_t totalLength_;
926935

927936
// use friend class here to avoid being bothered by const function calls
928937
friend class StringColumnWriter;
@@ -934,25 +943,24 @@ namespace orc {
934943

935944
// insert a new string into dictionary, return its insertion order
936945
size_t SortedStringDictionary::insert(const char* str, size_t len) {
937-
auto ret = dict.insert({DictEntry(str, len), dict.size()});
946+
size_t index = flatDict_.size();
947+
auto ret = keyToIndex_.emplace(std::string(str, len), index);
938948
if (ret.second) {
939-
// make a copy to internal storage
940-
data.push_back(std::vector<char>(len));
941-
memcpy(data.back().data(), str, len);
942-
// update dictionary entry to link pointer to internal storage
943-
DictEntry* entry = const_cast<DictEntry*>(&(ret.first->first));
944-
entry->data = data.back().data();
945-
totalLength += len;
949+
flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(), index);
950+
totalLength_ += len;
946951
}
947952
return ret.first->second;
948953
}
949954

950955
// write dictionary data & length to output buffer
951956
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
952957
RleEncoder* lengthEncoder) const {
953-
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
954-
dataStream->write(it->first.data, it->first.length);
955-
lengthEncoder->write(static_cast<int64_t>(it->first.length));
958+
std::sort(flatDict_.begin(), flatDict_.end(), LessThan());
959+
960+
for (const auto& entryWithIndex : flatDict_) {
961+
const auto& entry = entryWithIndex.entry;
962+
dataStream->write(entry.data, entry.length);
963+
lengthEncoder->write(static_cast<int64_t>(entry.length));
956964
}
957965
}
958966

@@ -968,10 +976,9 @@ namespace orc {
968976
*/
969977
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
970978
// iterate the dictionary to get mapping from insertion order to value order
971-
std::vector<size_t> mapping(dict.size());
972-
size_t dictIdx = 0;
973-
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
974-
mapping[it->second] = dictIdx++;
979+
std::vector<size_t> mapping(flatDict_.size());
980+
for (size_t i = 0; i < flatDict_.size(); ++i) {
981+
mapping[flatDict_[i].index] = i;
975982
}
976983

977984
// do the transformation
@@ -983,26 +990,31 @@ namespace orc {
983990
// get dict entries in insertion order
984991
void SortedStringDictionary::getEntriesInInsertionOrder(
985992
std::vector<const DictEntry*>& entries) const {
986-
entries.resize(dict.size());
987-
for (auto it = dict.cbegin(); it != dict.cend(); ++it) {
988-
entries[it->second] = &(it->first);
993+
std::sort(flatDict_.begin(), flatDict_.end(),
994+
[](const DictEntryWithIndex& left, const DictEntryWithIndex& right) {
995+
return left.index < right.index;
996+
});
997+
998+
entries.resize(flatDict_.size());
999+
for (size_t i = 0; i < flatDict_.size(); ++i) {
1000+
entries[i] = &(flatDict_[i].entry);
9891001
}
9901002
}
9911003

9921004
// return count of entries
9931005
size_t SortedStringDictionary::size() const {
994-
return dict.size();
1006+
return flatDict_.size();
9951007
}
9961008

9971009
// return total length of strings in the dictioanry
9981010
uint64_t SortedStringDictionary::length() const {
999-
return totalLength;
1011+
return totalLength_;
10001012
}
10011013

10021014
void SortedStringDictionary::clear() {
1003-
totalLength = 0;
1004-
data.clear();
1005-
dict.clear();
1015+
totalLength_ = 0;
1016+
keyToIndex_.clear();
1017+
flatDict_.clear();
10061018
}
10071019

10081020
class StringColumnWriter : public ColumnWriter {

c++/src/Vector.cc

+45
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace orc {
3333
notNull(pool, cap),
3434
hasNulls(false),
3535
isEncoded(false),
36+
dictionaryDecoded(false),
3637
memoryPool(pool) {
3738
std::memset(notNull.data(), 1, capacity);
3839
}
@@ -60,6 +61,13 @@ namespace orc {
6061
return false;
6162
}
6263

64+
void ColumnVectorBatch::decodeDictionary() {
65+
if (dictionaryDecoded) return;
66+
67+
decodeDictionaryImpl();
68+
dictionaryDecoded = true;
69+
}
70+
6371
StringDictionary::StringDictionary(MemoryPool& pool)
6472
: dictionaryBlob(pool), dictionaryOffset(pool) {
6573
// PASS
@@ -87,6 +95,17 @@ namespace orc {
8795
}
8896
}
8997

98+
void EncodedStringVectorBatch::decodeDictionaryImpl() {
99+
size_t n = index.size();
100+
resize(n);
101+
102+
for (size_t i = 0; i < n; ++i) {
103+
if (!hasNulls || notNull[i]) {
104+
dictionary->getValueByIndex(index[i], data[i], length[i]);
105+
}
106+
}
107+
}
108+
90109
StringVectorBatch::StringVectorBatch(uint64_t _capacity, MemoryPool& pool)
91110
: ColumnVectorBatch(_capacity, pool),
92111
data(pool, _capacity),
@@ -173,6 +192,12 @@ namespace orc {
173192
return false;
174193
}
175194

195+
void StructVectorBatch::decodeDictionaryImpl() {
196+
for (const auto& field : fields) {
197+
field->decodeDictionary();
198+
}
199+
}
200+
176201
ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool)
177202
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
178203
// PASS
@@ -210,6 +235,10 @@ namespace orc {
210235
return true;
211236
}
212237

238+
void ListVectorBatch::decodeDictionaryImpl() {
239+
elements->decodeDictionary();
240+
}
241+
213242
MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool)
214243
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
215244
// PASS
@@ -250,6 +279,16 @@ namespace orc {
250279
return true;
251280
}
252281

282+
void MapVectorBatch::decodeDictionaryImpl() {
283+
if (keys) {
284+
keys->decodeDictionary();
285+
}
286+
287+
if (elements) {
288+
elements->decodeDictionary();
289+
}
290+
}
291+
253292
UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool)
254293
: ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) {
255294
// PASS
@@ -308,6 +347,12 @@ namespace orc {
308347
return false;
309348
}
310349

350+
void UnionVectorBatch::decodeDictionaryImpl() {
351+
for (const auto& child : children) {
352+
child->decodeDictionary();
353+
}
354+
}
355+
311356
Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool)
312357
: ColumnVectorBatch(cap, pool),
313358
precision(0),

c++/test/TestDictionaryEncoding.cc

+53
Original file line numberDiff line numberDiff line change
@@ -429,4 +429,57 @@ namespace orc {
429429
testDictionaryMultipleStripes(DICT_THRESHOLD, false);
430430
testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false);
431431
}
432+
433+
TEST(DictionaryEncoding, decodeDictionary) {
434+
size_t rowCount = 8192;
435+
size_t dictionarySize = 100;
436+
auto* memoryPool = getDefaultPool();
437+
438+
auto encodedStringBatch = std::make_shared<EncodedStringVectorBatch>(rowCount, *memoryPool);
439+
EXPECT_FALSE(encodedStringBatch->dictionaryDecoded);
440+
encodedStringBatch->numElements = rowCount;
441+
encodedStringBatch->hasNulls = true;
442+
encodedStringBatch->isEncoded = true;
443+
encodedStringBatch->dictionary = std::make_shared<StringDictionary>(*memoryPool);
444+
445+
auto& dictionary = *encodedStringBatch->dictionary;
446+
dictionary.dictionaryBlob.resize(3 * dictionarySize);
447+
dictionary.dictionaryOffset.resize(dictionarySize + 1);
448+
dictionary.dictionaryOffset[0] = 0;
449+
for (uint64_t i = 0; i < dictionarySize; ++i) {
450+
std::ostringstream oss;
451+
oss << std::setw(3) << std::setfill('0') << i;
452+
453+
auto str = oss.str();
454+
memcpy(&dictionary.dictionaryBlob[3 * i], str.data(), str.size());
455+
dictionary.dictionaryOffset[i + 1] = 3 * (i + 1);
456+
}
457+
458+
for (uint64_t i = 0; i < rowCount; ++i) {
459+
if (i % 10 == 0) {
460+
encodedStringBatch->notNull[i] = 0;
461+
encodedStringBatch->index[i] = 0;
462+
} else {
463+
encodedStringBatch->notNull[i] = 1;
464+
encodedStringBatch->index[i] = i % dictionarySize;
465+
}
466+
}
467+
468+
encodedStringBatch->decodeDictionary();
469+
EXPECT_TRUE(encodedStringBatch->dictionaryDecoded);
470+
EXPECT_EQ(0, encodedStringBatch->blob.size());
471+
472+
for (uint64_t i = 0; i < rowCount; ++i) {
473+
if (encodedStringBatch->notNull[i]) {
474+
auto index = encodedStringBatch->index[i];
475+
char* buf = nullptr;
476+
int64_t buf_size = 0;
477+
dictionary.getValueByIndex(index, buf, buf_size);
478+
479+
EXPECT_EQ(buf, encodedStringBatch->data[i]);
480+
EXPECT_EQ(buf_size, encodedStringBatch->length[i]);
481+
}
482+
}
483+
}
484+
432485
} // namespace orc

0 commit comments

Comments
 (0)