Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1767: [C++] Improve writing performance of encoded string column and support EncodedStringVectorBatch for StringColumnWriter #2010

Closed
wants to merge 13 commits into from
26 changes: 26 additions & 0 deletions c++/include/orc/Vector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ namespace orc {
bool hasNulls;
// whether the vector batch is encoded
bool isEncoded;
// whether the dictionary is decoded into vector batch
bool dictionaryDecoded;

// custom memory pool
MemoryPool& memoryPool;
Expand Down Expand Up @@ -88,6 +90,14 @@ namespace orc {
*/
virtual bool hasVariableLength();

/**
* Decode possible dictionary into vector batch.
*/
void decodeDictionary();

protected:
virtual void decodeDictionaryImpl() {}

private:
ColumnVectorBatch(const ColumnVectorBatch&);
ColumnVectorBatch& operator=(const ColumnVectorBatch&);
Expand Down Expand Up @@ -248,6 +258,10 @@ namespace orc {
~EncodedStringVectorBatch() override;
std::string toString() const override;
void resize(uint64_t capacity) override;

// Calculate data and length in StringVectorBatch from dictionary and index
void decodeDictionaryImpl() override;

std::shared_ptr<StringDictionary> dictionary;

// index for dictionary entry
Expand All @@ -264,6 +278,9 @@ namespace orc {
bool hasVariableLength() override;

std::vector<ColumnVectorBatch*> fields;

protected:
void decodeDictionaryImpl() override;
};

struct ListVectorBatch : public ColumnVectorBatch {
Expand All @@ -283,6 +300,9 @@ namespace orc {

// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;

protected:
void decodeDictionaryImpl() override;
};

struct MapVectorBatch : public ColumnVectorBatch {
Expand All @@ -304,6 +324,9 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> keys;
// the concatenated elements
std::unique_ptr<ColumnVectorBatch> elements;

protected:
void decodeDictionaryImpl() override;
};

struct UnionVectorBatch : public ColumnVectorBatch {
Expand All @@ -327,6 +350,9 @@ namespace orc {

// the sub-columns
std::vector<ColumnVectorBatch*> children;

protected:
void decodeDictionaryImpl() override;
};

struct Decimal {
Expand Down
60 changes: 36 additions & 24 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,10 +889,17 @@ namespace orc {
size_t length;
};

struct DictEntryWithIndex {
DictEntryWithIndex(const char* str, size_t len, size_t index)
: entry(str, len), index(index) {}
DictEntry entry;
size_t index;
};

SortedStringDictionary() : totalLength_(0) {}

// insert a new string into dictionary, return its insertion order
size_t insert(const char* data, size_t len);
size_t insert(const char* str, size_t len);

// write dictionary data & length to output buffer
void flush(AppendOnlyBufferedStream* dataStream, RleEncoder* lengthEncoder) const;
Expand All @@ -913,7 +920,9 @@ namespace orc {

private:
struct LessThan {
bool operator()(const DictEntry& left, const DictEntry& right) const {
bool operator()(const DictEntryWithIndex& l, const DictEntryWithIndex& r) {
const auto& left = l.entry;
const auto& right = r.entry;
int ret = memcmp(left.data, right.data, std::min(left.length, right.length));
if (ret != 0) {
return ret < 0;
Expand All @@ -922,8 +931,8 @@ namespace orc {
}
};

std::map<DictEntry, size_t, LessThan> dict_;
std::vector<std::vector<char>> data_;
mutable std::vector<DictEntryWithIndex> flatDict_;
std::unordered_map<std::string, size_t> keyToIndex_;
uint64_t totalLength_;

// use friend class here to avoid being bothered by const function calls
Expand All @@ -936,14 +945,10 @@ namespace orc {

// insert a new string into dictionary, return its insertion order
size_t SortedStringDictionary::insert(const char* str, size_t len) {
auto ret = dict_.insert({DictEntry(str, len), dict_.size()});
size_t index = flatDict_.size();
auto ret = keyToIndex_.emplace(std::string(str, len), index);
if (ret.second) {
// make a copy to internal storage
data_.push_back(std::vector<char>(len));
memcpy(data_.back().data(), str, len);
// update dictionary entry to link pointer to internal storage
DictEntry* entry = const_cast<DictEntry*>(&(ret.first->first));
entry->data = data_.back().data();
flatDict_.emplace_back(ret.first->first.data(), ret.first->first.size(), index);
totalLength_ += len;
}
return ret.first->second;
Expand All @@ -952,9 +957,12 @@ namespace orc {
// write dictionary data & length to output buffer
void SortedStringDictionary::flush(AppendOnlyBufferedStream* dataStream,
RleEncoder* lengthEncoder) const {
for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
dataStream->write(it->first.data, it->first.length);
lengthEncoder->write(static_cast<int64_t>(it->first.length));
std::sort(flatDict_.begin(), flatDict_.end(), LessThan());

for (const auto& entryWithIndex : flatDict_) {
const auto& entry = entryWithIndex.entry;
dataStream->write(entry.data, entry.length);
lengthEncoder->write(static_cast<int64_t>(entry.length));
}
}

Expand All @@ -970,10 +978,9 @@ namespace orc {
*/
void SortedStringDictionary::reorder(std::vector<int64_t>& idxBuffer) const {
// iterate the dictionary to get mapping from insertion order to value order
std::vector<size_t> mapping(dict_.size());
size_t dictIdx = 0;
for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
mapping[it->second] = dictIdx++;
std::vector<size_t> mapping(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
mapping[flatDict_[i].index] = i;
}

// do the transformation
Expand All @@ -985,15 +992,20 @@ namespace orc {
// get dict entries in insertion order
void SortedStringDictionary::getEntriesInInsertionOrder(
std::vector<const DictEntry*>& entries) const {
entries.resize(dict_.size());
for (auto it = dict_.cbegin(); it != dict_.cend(); ++it) {
entries[it->second] = &(it->first);
std::sort(flatDict_.begin(), flatDict_.end(),
[](const DictEntryWithIndex& left, const DictEntryWithIndex& right) {
return left.index < right.index;
});

entries.resize(flatDict_.size());
for (size_t i = 0; i < flatDict_.size(); ++i) {
entries[i] = &(flatDict_[i].entry);
}
}

// return count of entries
size_t SortedStringDictionary::size() const {
return dict_.size();
return flatDict_.size();
}

// return total length of strings in the dictioanry
Expand All @@ -1003,8 +1015,8 @@ namespace orc {

void SortedStringDictionary::clear() {
totalLength_ = 0;
data_.clear();
dict_.clear();
keyToIndex_.clear();
flatDict_.clear();
}

class StringColumnWriter : public ColumnWriter {
Expand Down
45 changes: 45 additions & 0 deletions c++/src/Vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace orc {
notNull(pool, cap),
hasNulls(false),
isEncoded(false),
dictionaryDecoded(false),
memoryPool(pool) {
std::memset(notNull.data(), 1, capacity);
}
Expand Down Expand Up @@ -61,6 +62,13 @@ namespace orc {
return false;
}

void ColumnVectorBatch::decodeDictionary() {
if (dictionaryDecoded) return;

decodeDictionaryImpl();
dictionaryDecoded = true;
}

StringDictionary::StringDictionary(MemoryPool& pool)
: dictionaryBlob(pool), dictionaryOffset(pool) {
// PASS
Expand Down Expand Up @@ -88,6 +96,17 @@ namespace orc {
}
}

void EncodedStringVectorBatch::decodeDictionaryImpl() {
size_t n = index.size();
resize(n);

for (size_t i = 0; i < n; ++i) {
if (!hasNulls || notNull[i]) {
dictionary->getValueByIndex(index[i], data[i], length[i]);
}
}
}

StringVectorBatch::StringVectorBatch(uint64_t capacity, MemoryPool& pool)
: ColumnVectorBatch(capacity, pool),
data(pool, capacity),
Expand Down Expand Up @@ -174,6 +193,12 @@ namespace orc {
return false;
}

void StructVectorBatch::decodeDictionaryImpl() {
for (const auto& field : fields) {
field->decodeDictionary();
}
}

ListVectorBatch::ListVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
offsets.zeroOut();
Expand Down Expand Up @@ -211,6 +236,10 @@ namespace orc {
return true;
}

void ListVectorBatch::decodeDictionaryImpl() {
elements->decodeDictionary();
}

MapVectorBatch::MapVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), offsets(pool, cap + 1) {
offsets.zeroOut();
Expand Down Expand Up @@ -251,6 +280,16 @@ namespace orc {
return true;
}

void MapVectorBatch::decodeDictionaryImpl() {
if (keys) {
keys->decodeDictionary();
}

if (elements) {
elements->decodeDictionary();
}
}

UnionVectorBatch::UnionVectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool), tags(pool, cap), offsets(pool, cap) {
tags.zeroOut();
Expand Down Expand Up @@ -310,6 +349,12 @@ namespace orc {
return false;
}

void UnionVectorBatch::decodeDictionaryImpl() {
for (const auto& child : children) {
child->decodeDictionary();
}
}

Decimal64VectorBatch::Decimal64VectorBatch(uint64_t cap, MemoryPool& pool)
: ColumnVectorBatch(cap, pool),
precision(0),
Expand Down
53 changes: 53 additions & 0 deletions c++/test/TestDictionaryEncoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,4 +434,57 @@ namespace orc {
testDictionaryMultipleStripes(DICT_THRESHOLD, false);
testDictionaryMultipleStripes(FALLBACK_THRESHOLD, false);
}

TEST(DictionaryEncoding, decodeDictionary) {
size_t rowCount = 8192;
size_t dictionarySize = 100;
auto* memoryPool = getDefaultPool();

auto encodedStringBatch = std::make_shared<EncodedStringVectorBatch>(rowCount, *memoryPool);
EXPECT_FALSE(encodedStringBatch->dictionaryDecoded);
encodedStringBatch->numElements = rowCount;
encodedStringBatch->hasNulls = true;
encodedStringBatch->isEncoded = true;
encodedStringBatch->dictionary = std::make_shared<StringDictionary>(*memoryPool);

auto& dictionary = *encodedStringBatch->dictionary;
dictionary.dictionaryBlob.resize(3 * dictionarySize);
dictionary.dictionaryOffset.resize(dictionarySize + 1);
dictionary.dictionaryOffset[0] = 0;
for (uint64_t i = 0; i < dictionarySize; ++i) {
std::ostringstream oss;
oss << std::setw(3) << std::setfill('0') << i;

auto str = oss.str();
memcpy(&dictionary.dictionaryBlob[3 * i], str.data(), str.size());
dictionary.dictionaryOffset[i + 1] = 3 * (i + 1);
}

for (uint64_t i = 0; i < rowCount; ++i) {
if (i % 10 == 0) {
encodedStringBatch->notNull[i] = 0;
encodedStringBatch->index[i] = 0;
} else {
encodedStringBatch->notNull[i] = 1;
encodedStringBatch->index[i] = i % dictionarySize;
}
}

encodedStringBatch->decodeDictionary();
EXPECT_TRUE(encodedStringBatch->dictionaryDecoded);
EXPECT_EQ(0, encodedStringBatch->blob.size());

for (uint64_t i = 0; i < rowCount; ++i) {
if (encodedStringBatch->notNull[i]) {
auto index = encodedStringBatch->index[i];
char* buf = nullptr;
int64_t buf_size = 0;
dictionary.getValueByIndex(index, buf, buf_size);

EXPECT_EQ(buf, encodedStringBatch->data[i]);
EXPECT_EQ(buf_size, encodedStringBatch->length[i]);
}
}
}

} // namespace orc
Loading