Skip to content

Commit

Permalink
Smaller block size and add _col2LastId to block metadata (#917)
Browse files Browse the repository at this point in the history
1. The block size used to be 1 << 23 (over 8M), which is too large, since we always need to decompress at least one whole block, even when reading only few triples. It's now 500'000, which still has a relatively small overall space consumption.

2. Add member _col2LastId to block data because we will need it for #916 and it's nice if our live indexes already have this information so that we can play around with this PR without having to rebuild indexes.

3. Renamed the data members in `CompressedRelation.h` such that they have `trailingUnderscores_`.

4. Unrelated fix in IndexTestHelpers.h: The test TTL file was not deleted after the test, now it is.
  • Loading branch information
hannahbast authored Apr 4, 2023
1 parent 8b65e4d commit 499c612
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 151 deletions.
132 changes: 67 additions & 65 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ void CompressedRelationReader::scan(
ad_utility::SharedConcurrentTimeoutTimer timer) const {
AD_CONTRACT_CHECK(result->numColumns() == NumColumns);

// get all the blocks where _col0FirstId <= col0Id <= _col0LastId
// get all the blocks where col0FirstId_ <= col0Id <= col0LastId_
struct KeyLhs {
Id _col0FirstId;
Id _col0LastId;
Id col0FirstId_;
Id col0LastId_;
};
Id col0Id = metadata._col0Id;
Id col0Id = metadata.col0Id_;
// TODO<joka921, Clang16> Use a structured binding. Structured bindings are
// currently not supported by clang when using OpenMP because clang internally
// transforms the `#pragma`s into lambdas, and capturing structured bindings
Expand All @@ -39,7 +39,7 @@ void CompressedRelationReader::scan(
// we use clang 16.
blockMetadata.begin(), blockMetadata.end(), KeyLhs{col0Id, col0Id},
[](const auto& a, const auto& b) {
return a._col0FirstId < b._col0FirstId && a._col0LastId < b._col0LastId;
return a.col0FirstId_ < b.col0FirstId_ && a.col0LastId_ < b.col0LastId_;
});

// The total size of the result is now known.
Expand All @@ -57,19 +57,19 @@ void CompressedRelationReader::scan(
// actual scan result.
bool firstBlockIsIncomplete =
beginBlock < endBlock &&
(beginBlock->_col0FirstId < col0Id || beginBlock->_col0LastId > col0Id);
(beginBlock->col0FirstId_ < col0Id || beginBlock->col0LastId_ > col0Id);
auto lastBlock = endBlock - 1;

bool lastBlockIsIncomplete =
beginBlock < lastBlock &&
(lastBlock->_col0FirstId < col0Id || lastBlock->_col0LastId > col0Id);
(lastBlock->col0FirstId_ < col0Id || lastBlock->col0LastId_ > col0Id);

// Invariant: A relation spans multiple blocks exclusively or several
// entities are stored completely in the same Block.
AD_CORRECTNESS_CHECK(!firstBlockIsIncomplete || (beginBlock == lastBlock));
AD_CORRECTNESS_CHECK(!lastBlockIsIncomplete);
if (firstBlockIsIncomplete) {
AD_CORRECTNESS_CHECK(metadata._offsetInBlock !=
AD_CORRECTNESS_CHECK(metadata.offsetInBlock_ !=
std::numeric_limits<uint64_t>::max());
}

Expand All @@ -78,7 +78,7 @@ void CompressedRelationReader::scan(
// the result.
auto readIncompleteBlock = [&](const auto& block) {
// A block is uniquely identified by its start position in the file.
auto cacheKey = block._offsetsAndCompressedSize.at(0)._offsetInFile;
auto cacheKey = block.offsetsAndCompressedSize_.at(0).offsetInFile_;
auto uncompressedBuffer = blockCache_
.computeOnce(cacheKey,
[&]() {
Expand All @@ -88,12 +88,12 @@ void CompressedRelationReader::scan(
._resultPointer;

// Extract the part of the block that actually belongs to the relation
auto numElements = metadata._numRows;
auto numElements = metadata.numRows_;
AD_CORRECTNESS_CHECK(uncompressedBuffer->numColumns() ==
metadata.numColumns());
for (size_t i = 0; i < uncompressedBuffer->numColumns(); ++i) {
const auto& inputCol = uncompressedBuffer->getColumn(i);
auto begin = inputCol.begin() + metadata._offsetInBlock;
auto begin = inputCol.begin() + metadata.offsetInBlock_;
auto resultColumn = result->getColumn(i);
AD_CORRECTNESS_CHECK(numElements <= spaceLeft);
std::copy(begin, begin + numElements, resultColumn.begin());
Expand Down Expand Up @@ -130,7 +130,7 @@ void CompressedRelationReader::scan(
std::move(compressedBuffer)]() {
ad_utility::TimeBlockAndLog tbl{"Decompressing a block"};

decompressBlockToExistingIdTable(compressedBuffer, block._numRows,
decompressBlockToExistingIdTable(compressedBuffer, block.numRows_,
*result, rowIndexOfNextBlock);
};

Expand All @@ -144,8 +144,8 @@ void CompressedRelationReader::scan(

// this is again serial code, set up the correct pointers
// for the next block;
spaceLeft -= block._numRows;
rowIndexOfNextBlock += block._numRows;
spaceLeft -= block.numRows_;
rowIndexOfNextBlock += block.numRows_;
}
AD_CORRECTNESS_CHECK(spaceLeft == 0);
} // End of omp parallel region, all the decompression was handled now.
Expand All @@ -162,20 +162,20 @@ void CompressedRelationReader::scan(
// Get all the blocks that possibly might contain our pair of col0Id and
// col1Id
struct KeyLhs {
Id _col0FirstId;
Id _col0LastId;
Id _col1FirstId;
Id _col1LastId;
Id col0FirstId_;
Id col0LastId_;
Id col1FirstId_;
Id col1LastId_;
};

auto comp = [](const auto& a, const auto& b) {
bool endBeforeBegin = a._col0LastId < b._col0FirstId;
bool endBeforeBegin = a.col0LastId_ < b.col0FirstId_;
endBeforeBegin |=
(a._col0LastId == b._col0FirstId && a._col1LastId < b._col1FirstId);
(a.col0LastId_ == b.col0FirstId_ && a.col1LastId_ < b.col1FirstId_);
return endBeforeBegin;
};

Id col0Id = metaData._col0Id;
Id col0Id = metaData.col0Id_;

// Note: See the comment in the other overload for `scan` above for the
// reason why we (currently) can't use a structured binding here.
Expand All @@ -187,7 +187,7 @@ void CompressedRelationReader::scan(
// Invariant: The col0Id is completely stored in a single block, or it is
// contained in multiple blocks that only contain this col0Id,
bool col0IdHasExclusiveBlocks =
metaData._offsetInBlock == std::numeric_limits<uint64_t>::max();
metaData.offsetInBlock_ == std::numeric_limits<uint64_t>::max();
if (!col0IdHasExclusiveBlocks) {
// This might also be zero if no block was found at all.
AD_CORRECTNESS_CHECK(endBlock - beginBlock <= 1);
Expand All @@ -207,13 +207,13 @@ void CompressedRelationReader::scan(

// Find the range in the block, that belongs to the same relation `col0Id`
bool containedInOnlyOneBlock =
metaData._offsetInBlock != std::numeric_limits<uint64_t>::max();
metaData.offsetInBlock_ != std::numeric_limits<uint64_t>::max();
auto begin = col1Column.begin();
if (containedInOnlyOneBlock) {
begin += metaData._offsetInBlock;
begin += metaData.offsetInBlock_;
}
auto end =
containedInOnlyOneBlock ? begin + metaData._numRows : col1Column.end();
containedInOnlyOneBlock ? begin + metaData.numRows_ : col1Column.end();

// Find the range in the block, where also the col1Id matches (the second
// ID in the `std::array` does not matter).
Expand Down Expand Up @@ -250,7 +250,7 @@ void CompressedRelationReader::scan(
// First accumulate the complete blocks in the "middle"
auto totalResultSize = std::accumulate(
beginBlock, endBlock, 0ul, [](const auto& count, const auto& block) {
return count + block._numRows;
return count + block.numRows_;
});
// Add the possibly incomplete blocks from the beginning and end;
totalResultSize += firstBlockResult.size() + lastBlockResult.size();
Expand All @@ -270,7 +270,7 @@ void CompressedRelationReader::scan(
const auto& block = *beginBlock;

// Read the block serially, only read the second column.
AD_CORRECTNESS_CHECK(block._offsetsAndCompressedSize.size() == 2);
AD_CORRECTNESS_CHECK(block.offsetsAndCompressedSize_.size() == 2);
CompressedBlock compressedBuffer =
readCompressedBlockFromFile(block, file, std::vector{1ul});

Expand All @@ -281,7 +281,7 @@ void CompressedRelationReader::scan(
std::move(compressedBuffer)]() mutable {
ad_utility::TimeBlockAndLog tbl{"Decompression a block"};

decompressBlockToExistingIdTable(compressedBuffer, block._numRows,
decompressBlockToExistingIdTable(compressedBuffer, block.numRows_,
*result, rowIndexOfNextBlockStart);
};

Expand All @@ -295,7 +295,7 @@ void CompressedRelationReader::scan(
}

// update the pointers
rowIndexOfNextBlockStart += block._numRows;
rowIndexOfNextBlockStart += block.numRows_;
} // end of parallel region
}
// Add the last block.
Expand Down Expand Up @@ -328,7 +328,7 @@ void CompressedRelationWriter::addRelation(Id col0Id,
float multC1 = computeMultiplicity(col1And2Ids.numRows(), numDistinctCol1);
// Dummy value that will be overwritten later
float multC2 = 42.42;
// This sets everything except the _offsetInBlock, which will be set
// This sets everything except the offsetInBlock_, which will be set
// explicitly below.
CompressedRelationMetadata metaData{col0Id, col1And2Ids.numRows(), multC1,
multC2};
Expand All @@ -345,43 +345,44 @@ void CompressedRelationWriter::addRelation(Id col0Id,
// this relation are too large, we will write the buffered relations to file
// and start a new block.
bool relationHasExclusiveBlocks =
sizeInBytes(col1And2Ids) > 0.8 * static_cast<double>(_numBytesPerBlock);
sizeInBytes(col1And2Ids) > 0.8 * static_cast<double>(numBytesPerBlock_);
if (relationHasExclusiveBlocks ||
sizeInBytes(col1And2Ids) + sizeInBytes(_buffer) >
static_cast<double>(_numBytesPerBlock) * 1.5) {
sizeInBytes(col1And2Ids) + sizeInBytes(buffer_) >
static_cast<double>(numBytesPerBlock_) * 1.5) {
writeBufferedRelationsToSingleBlock();
}

if (relationHasExclusiveBlocks) {
// The relation is large, immediately write the relation to a set of
// exclusive blocks.
writeRelationToExclusiveBlocks(col0Id, col1And2Ids);
metaData._offsetInBlock = std::numeric_limits<uint64_t>::max();
metaData.offsetInBlock_ = std::numeric_limits<uint64_t>::max();
} else {
// Append to the current buffered block.
metaData._offsetInBlock = _buffer.numRows();
metaData.offsetInBlock_ = buffer_.numRows();
static_assert(sizeof(col1And2Ids[0][0]) == sizeof(Id));
if (_buffer.numRows() == 0) {
_currentBlockData._col0FirstId = col0Id;
_currentBlockData._col1FirstId = col1And2Ids(0, 0);
if (buffer_.numRows() == 0) {
currentBlockData_.col0FirstId_ = col0Id;
currentBlockData_.col1FirstId_ = col1And2Ids(0, 0);
}
_currentBlockData._col0LastId = col0Id;
_currentBlockData._col1LastId = col1And2Ids(col1And2Ids.numRows() - 1, 0);
AD_CORRECTNESS_CHECK(_buffer.numColumns() == col1And2Ids.numColumns());
auto bufferOldSize = _buffer.numRows();
_buffer.resize(_buffer.numRows() + col1And2Ids.numRows());
currentBlockData_.col0LastId_ = col0Id;
currentBlockData_.col1LastId_ = col1And2Ids(col1And2Ids.numRows() - 1, 0);
currentBlockData_.col2LastId_ = col1And2Ids(col1And2Ids.numRows() - 1, 1);
AD_CORRECTNESS_CHECK(buffer_.numColumns() == col1And2Ids.numColumns());
auto bufferOldSize = buffer_.numRows();
buffer_.resize(buffer_.numRows() + col1And2Ids.numRows());
for (size_t i = 0; i < col1And2Ids.numColumns(); ++i) {
const auto& column = col1And2Ids.getColumn(i);
std::ranges::copy(column, _buffer.getColumn(i).begin() + bufferOldSize);
std::ranges::copy(column, buffer_.getColumn(i).begin() + bufferOldSize);
}
}
_metaDataBuffer.push_back(metaData);
metaDataBuffer_.push_back(metaData);
}

// _____________________________________________________________________________
void CompressedRelationWriter::writeRelationToExclusiveBlocks(
Id col0Id, const BufferedIdTable& data) {
const size_t numRowsPerBlock = _numBytesPerBlock / (NumColumns * sizeof(Id));
const size_t numRowsPerBlock = numBytesPerBlock_ / (NumColumns * sizeof(Id));
AD_CORRECTNESS_CHECK(numRowsPerBlock > 0);
AD_CORRECTNESS_CHECK(data.numColumns() == NumColumns);
const auto totalSize = data.numRows();
Expand All @@ -394,38 +395,39 @@ void CompressedRelationWriter::writeRelationToExclusiveBlocks(
{column.begin() + i, column.begin() + i + actualNumRowsPerBlock}));
}

_blockBuffer.push_back(CompressedBlockMetadata{
blockBuffer_.push_back(CompressedBlockMetadata{
std::move(offsets), actualNumRowsPerBlock, col0Id, col0Id, data[i][0],
data[i + actualNumRowsPerBlock - 1][0]});
data[i + actualNumRowsPerBlock - 1][0],
data[i + actualNumRowsPerBlock - 1][1]});
}
}

// ___________________________________________________________________________
void CompressedRelationWriter::writeBufferedRelationsToSingleBlock() {
if (_buffer.empty()) {
if (buffer_.empty()) {
return;
}

AD_CORRECTNESS_CHECK(_buffer.numColumns() == NumColumns);
AD_CORRECTNESS_CHECK(buffer_.numColumns() == NumColumns);
// Convert from bytes to number of ID pairs.
size_t numRows = _buffer.numRows();
size_t numRows = buffer_.numRows();

// TODO<joka921, C++23> This is
// `ranges::to<vector>(ranges::transform_view(_buffer.getColumns(),
// `ranges::to<vector>(ranges::transform_view(buffer_.getColumns(),
// compressAndWriteColumn))`;
std::ranges::for_each(_buffer.getColumns(),
std::ranges::for_each(buffer_.getColumns(),
[this](const auto& column) mutable {
_currentBlockData._offsetsAndCompressedSize.push_back(
currentBlockData_.offsetsAndCompressedSize_.push_back(
compressAndWriteColumn(column));
});

_currentBlockData._numRows = numRows;
// The `firstId` and `lastId` of `_currentBlockData` were already set
currentBlockData_.numRows_ = numRows;
// The `firstId` and `lastId` of `currentBlockData_` were already set
// correctly by `addRelation()`.
_blockBuffer.push_back(_currentBlockData);
blockBuffer_.push_back(currentBlockData_);
// Reset the data of the current block.
_currentBlockData = CompressedBlockMetadata{};
_buffer.clear();
currentBlockData_ = CompressedBlockMetadata{};
buffer_.clear();
}

// _____________________________________________________________________________
Expand All @@ -448,10 +450,10 @@ CompressedBlock CompressedRelationReader::readCompressedBlockFromFile(
// TODO<C++23> Use `std::views::zip`
for (size_t i = 0; i < compressedBuffer.size(); ++i) {
const auto& offset =
blockMetaData._offsetsAndCompressedSize.at(columnIndices->at(i));
blockMetaData.offsetsAndCompressedSize_.at(columnIndices->at(i));
auto& currentCol = compressedBuffer[i];
currentCol.resize(offset._compressedSize);
file.read(currentCol.data(), offset._compressedSize, offset._offsetInFile);
currentCol.resize(offset.compressedSize_);
file.read(currentCol.data(), offset.compressedSize_, offset.offsetInFile_);
}
return compressedBuffer;
}
Expand Down Expand Up @@ -500,7 +502,7 @@ DecompressedBlock CompressedRelationReader::readAndDecompressBlock(
std::optional<std::vector<size_t>> columnIndices) {
CompressedBlock compressedColumns = readCompressedBlockFromFile(
blockMetaData, file, std::move(columnIndices));
const auto numRowsToRead = blockMetaData._numRows;
const auto numRowsToRead = blockMetaData.numRows_;
return decompressBlock(compressedColumns, numRowsToRead);
}

Expand All @@ -509,8 +511,8 @@ CompressedBlockMetadata::OffsetAndCompressedSize
CompressedRelationWriter::compressAndWriteColumn(std::span<const Id> column) {
std::vector<char> compressedBlock = ZstdWrapper::compress(
(void*)(column.data()), column.size() * sizeof(column[0]));
auto offsetInFile = _outfile.tell();
auto offsetInFile = outfile_.tell();
auto compressedSize = compressedBlock.size();
_outfile.write(compressedBlock.data(), compressedBlock.size());
outfile_.write(compressedBlock.data(), compressedBlock.size());
return {offsetInFile, compressedSize};
};
Loading

0 comments on commit 499c612

Please sign in to comment.