Skip to content

Commit 8cf1047

Browse files
authored
ORC-1280: [C++] Implement block-based buffer(Part I)
This closes #1271
1 parent 7d18634 commit 8cf1047

File tree

5 files changed

+284
-0
lines changed

5 files changed

+284
-0
lines changed

c++/src/BlockBuffer.cc

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#include "BlockBuffer.hh"
20+
21+
#include <algorithm>
22+
23+
namespace orc {
24+
25+
/**
 * Create a block buffer that draws its memory from the given pool.
 * After validating the block size, capacity for one block is requested
 * through reserve().
 * @param pool the memory pool used to allocate and free blocks
 * @param _blockSize size in bytes of every block, must not be zero
 * @throws std::logic_error if _blockSize is zero
 */
BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize)
    : memoryPool(pool), currentSize(0), currentCapacity(0), blockSize(_blockSize) {
  // a zero-sized block could never satisfy any reservation
  if (_blockSize == 0) {
    throw std::logic_error("Block size cannot be zero");
  }
  // request the first block right away
  reserve(_blockSize);
}
35+
36+
/**
 * Give every block back to the memory pool and reset the
 * size and capacity counters.
 */
BlockBuffer::~BlockBuffer() {
  for (char* blockPtr : blocks) {
    memoryPool.free(blockPtr);
  }
  blocks.clear();
  currentSize = 0;
  currentCapacity = 0;
}
43+
44+
/**
 * Look up an occupied block by index.
 * @param blockIndex index of the block, must be less than getBlockNumber()
 * @return a Block pointing at the start of the block; its size is the number
 *         of buffer bytes stored in that block, at most blockSize
 * @throws std::out_of_range if blockIndex is not an occupied block
 */
BlockBuffer::Block BlockBuffer::getBlock(uint64_t blockIndex) const {
  if (!(blockIndex < getBlockNumber())) {
    throw std::out_of_range("Block index out of range");
  }
  // number of buffer bytes located at or after the start of this block
  const uint64_t bytesFromBlockStart = currentSize - blockIndex * blockSize;
  // only the last occupied block can hold fewer than blockSize bytes
  const uint64_t bytesInBlock =
      bytesFromBlockStart < blockSize ? bytesFromBlockStart : blockSize;
  return Block(blocks[blockIndex], bytesInBlock);
}
51+
52+
/**
 * Hand out the next writable region and mark it as used.
 * When reserved capacity remains, the region is the unused tail of the
 * block containing currentSize, and currentSize advances to the next block
 * boundary. Otherwise the buffer is resized by one block and that block
 * is returned.
 * @return the writable region
 */
BlockBuffer::Block BlockBuffer::getNextBlock() {
  if (currentSize >= currentCapacity) {
    // nothing reserved is left, so grow the buffer by one block
    resize(currentSize + blockSize);
    return Block(blocks.back(), blockSize);
  }

  const uint64_t blockIndex = currentSize / blockSize;
  const uint64_t usedInBlock = currentSize % blockSize;
  Block freeRegion(blocks[blockIndex] + usedInBlock, blockSize - usedInBlock);
  // the whole block now counts as occupied
  currentSize = (blockIndex + 1) * blockSize;
  return freeRegion;
}
64+
65+
void BlockBuffer::resize(uint64_t size) {
66+
reserve(size);
67+
if (currentCapacity >= size) {
68+
currentSize = size;
69+
} else {
70+
throw std::logic_error("Block buffer resize error");
71+
}
72+
}
73+
74+
void BlockBuffer::reserve(uint64_t newCapacity) {
75+
while (currentCapacity < newCapacity) {
76+
char* newBlockPtr = memoryPool.malloc(blockSize);
77+
if (newBlockPtr != nullptr) {
78+
blocks.push_back(newBlockPtr);
79+
currentCapacity += blockSize;
80+
} else {
81+
break;
82+
}
83+
}
84+
}
85+
} // namespace orc

c++/src/BlockBuffer.hh

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#ifndef ORC_BLOCK_BUFFER_HH
20+
#define ORC_BLOCK_BUFFER_HH
21+
22+
#include "orc/MemoryPool.hh"
23+
24+
#include <vector>
25+
26+
namespace orc {
27+
28+
/**
29+
* BlockBuffer implements a memory allocation policy based on
30+
* equal-length blocks. BlockBuffer will reserve multiple blocks
31+
* for allocation.
32+
*/
33+
class BlockBuffer {
34+
private:
35+
MemoryPool& memoryPool;
36+
// current buffer size
37+
uint64_t currentSize;
38+
// maximal capacity (actual allocated memory)
39+
uint64_t currentCapacity;
40+
// unit for buffer expansion
41+
const uint64_t blockSize;
42+
// pointers to the start of each block
43+
std::vector<char*> blocks;
44+
45+
// non-copy-constructible
46+
BlockBuffer(BlockBuffer& buffer) = delete;
47+
BlockBuffer& operator=(BlockBuffer& buffer) = delete;
48+
BlockBuffer(BlockBuffer&& buffer) = delete;
49+
BlockBuffer& operator=(BlockBuffer&& buffer) = delete;
50+
51+
public:
52+
BlockBuffer(MemoryPool& pool, uint64_t blockSize);
53+
54+
~BlockBuffer();
55+
56+
/**
57+
* Block points to a section of memory allocated by BlockBuffer,
58+
* containing the corresponding physical memory address and available size.
59+
*/
60+
struct Block {
61+
// the start of block
62+
char* data;
63+
// number of bytes available at data
64+
uint64_t size;
65+
66+
Block() : data(nullptr), size(0) {}
67+
Block(char* _data, uint64_t _size) : data(_data), size(_size) {}
68+
Block(const Block& block) = default;
69+
~Block() = default;
70+
};
71+
72+
/**
73+
* Get the allocated block object.
74+
* The last allocated block size may be less than blockSize,
75+
* and the rest of the blocks are all of size blockSize.
76+
* @param blockIndex the index of blocks
77+
* @return the allocated block object
78+
*/
79+
Block getBlock(uint64_t blockIndex) const;
80+
81+
/**
82+
* Get a empty block or allocate a new block to write.
83+
* If the last allocated block size is less than blockSize,
84+
* the size of empty block is equal to blockSize minus the size of
85+
* the last allocated block size. Otherwise, the size of
86+
* the empty block is equal to blockSize.
87+
* @return a empty block object
88+
*/
89+
Block getNextBlock();
90+
91+
/**
92+
* Get the number of blocks that are fully or partially occupied
93+
*/
94+
uint64_t getBlockNumber() const {
95+
return (currentSize + blockSize - 1) / blockSize;
96+
}
97+
98+
uint64_t size() const {
99+
return currentSize;
100+
}
101+
102+
uint64_t capacity() const {
103+
return currentCapacity;
104+
}
105+
106+
void resize(uint64_t size);
107+
/**
108+
* Requests the BlockBuffer to contain at least newCapacity bytes.
109+
* Reallocation happens if there is need of more space.
110+
* @param newCapacity new capacity of BlockBuffer
111+
*/
112+
void reserve(uint64_t newCapacity);
113+
};
114+
} // namespace orc
115+
116+
#endif

c++/src/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ set(SOURCE_FILES
205205
sargs/TruthValue.cc
206206
wrap/orc-proto-wrapper.cc
207207
Adaptor.cc
208+
BlockBuffer.cc
208209
BloomFilter.cc
209210
ByteRLE.cc
210211
ColumnPrinter.cc

c++/test/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ add_executable (orc-test
2222
MemoryInputStream.cc
2323
MemoryOutputStream.cc
2424
TestAttributes.cc
25+
TestBlockBuffer.cc
2526
TestBufferedOutputStream.cc
2627
TestBloomFilter.cc
2728
TestByteRle.cc

c++/test/TestBlockBuffer.cc

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#include "BlockBuffer.hh"
20+
#include "orc/OrcFile.hh"
21+
#include "wrap/gtest-wrapper.h"
22+
23+
namespace orc {
24+
25+
// Checks how reserve() and resize() affect the block count, size and capacity.
TEST(TestBlockBuffer, size_and_capacity) {
  const uint64_t kBlockSize = 1024;
  BlockBuffer buffer(*getDefaultPool(), kBlockSize);

  // one block is preallocated by the constructor, but none is occupied yet
  EXPECT_EQ(buffer.getBlockNumber(), 0);
  EXPECT_EQ(buffer.size(), 0);
  EXPECT_EQ(buffer.capacity(), kBlockSize);

  // reserving only changes the capacity
  buffer.reserve(128 * kBlockSize);
  EXPECT_EQ(buffer.getBlockNumber(), 0);
  EXPECT_EQ(buffer.size(), 0);
  EXPECT_EQ(buffer.capacity(), 128 * kBlockSize);

  // resizing within the reserved capacity keeps the capacity unchanged
  buffer.resize(64 * kBlockSize);
  EXPECT_EQ(buffer.getBlockNumber(), 64);
  EXPECT_EQ(buffer.size(), 64 * kBlockSize);
  EXPECT_EQ(buffer.capacity(), 128 * kBlockSize);

  // resizing beyond the reserved capacity grows the capacity as well
  buffer.resize(256 * kBlockSize);
  EXPECT_EQ(buffer.getBlockNumber(), 256);
  EXPECT_EQ(buffer.size(), 256 * kBlockSize);
  EXPECT_EQ(buffer.capacity(), 256 * kBlockSize);
}
51+
52+
// Fills ten blocks obtained from getNextBlock() with a known pattern and
// reads the pattern back through getBlock().
TEST(TestBlockBuffer, get_block) {
  BlockBuffer buffer(*getDefaultPool(), 1024);

  // even blocks hold upper-case letters, odd blocks hold lower-case letters
  auto patternAt = [](uint64_t blockIdx, uint64_t offset) {
    const char base = (blockIdx % 2 == 0) ? 'A' : 'a';
    return static_cast<char>(base + (blockIdx + offset) % 26);
  };

  EXPECT_EQ(buffer.getBlockNumber(), 0);
  for (uint64_t blockIdx = 0; blockIdx < 10; ++blockIdx) {
    BlockBuffer::Block block = buffer.getNextBlock();
    EXPECT_EQ(buffer.getBlockNumber(), blockIdx + 1);
    for (uint64_t offset = 0; offset < block.size; ++offset) {
      block.data[offset] = patternAt(blockIdx, offset);
    }
  }

  // verify the block data
  for (uint64_t blockIdx = 0; blockIdx < buffer.getBlockNumber(); ++blockIdx) {
    BlockBuffer::Block block = buffer.getBlock(blockIdx);
    for (uint64_t offset = 0; offset < block.size; ++offset) {
      EXPECT_EQ(block.data[offset], patternAt(blockIdx, offset));
    }
  }
}
81+
}

0 commit comments

Comments
 (0)