Skip to content

Commit

Permalink
Some review stuff and the bugfix.
Browse files Browse the repository at this point in the history
  • Loading branch information
joka921 committed Jan 10, 2024
1 parent 152ecec commit 91446c6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 76 deletions.
2 changes: 1 addition & 1 deletion src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class AddCombinedRowToIdTable {
size_t bufferSize_ = 100'000;

// TODO<joka921> Comment
BlockwiseCallback blockwiseCallback_{};
[[no_unique_address]] BlockwiseCallback blockwiseCallback_{};

public:
// Construct from the number of join columns, the two inputs, and the output.
Expand Down
4 changes: 4 additions & 0 deletions src/index/MetaDataHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class MetaDataWrapperDense {
// ____________________________________________________________
void set(Id id, const value_type& value) {
// Assert that the ids are ascending.
if (_vec.size() != 0 && _vec.back().col0Id_ >= id) {
LOG(ERROR) << "out of bounds " << id << " " << _vec.back().col0Id_
<< std::endl;
}

Check warning on line 95 in src/index/MetaDataHandler.h

View check run for this annotation

Codecov / codecov/patch

src/index/MetaDataHandler.h#L93-L95

Added lines #L93 - L95 were not covered by tests
AD_CONTRACT_CHECK(_vec.size() == 0 || _vec.back().col0Id_ < id);
_vec.push_back(value);
}
Expand Down
157 changes: 82 additions & 75 deletions src/util/JoinAlgorithms/JoinAlgorithms.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,6 @@ class BlockAndSubrange {
using reference = std::iterator_traits<typename Block::iterator>::reference;
using const_reference =
std::iterator_traits<typename Block::const_iterator>::reference;
// using reference = std::iterator_traits<typename
// Block::iterator>::value_type;

// Construct from a container object, where the initial subrange will
// represent the whole container.
Expand All @@ -568,11 +566,11 @@ class BlockAndSubrange {

// Return a reference to the last element of the currently specified subrange.
reference back() {
AD_CORRECTNESS_CHECK(subrange_.second - 1 < block_->size());
AD_CORRECTNESS_CHECK(!empty() && subrange_.second <= block_->size());
return (*block_)[subrange_.second - 1];
}
const_reference back() const {
AD_CORRECTNESS_CHECK(subrange_.second - 1 < block_->size());
AD_CORRECTNESS_CHECK(!empty() && subrange_.second <= block_->size());
return std::as_const(*block_)[subrange_.second - 1];
}

Expand All @@ -591,6 +589,7 @@ class BlockAndSubrange {
fullBlock().begin() + subrange_.second};
}

// Get a view that iterates over all the indices that belong to the subrange.
auto getIndexRange() const {
return std::views::iota(subrange_.first, subrange_.second);
}
Expand Down Expand Up @@ -640,12 +639,12 @@ class BlockAndSubrange {
// currently required by the join algorithm for one side of the join.
template <typename Iterator, typename End, typename Projection>
struct JoinSide {
using SameBlocks =
using CurrentBlocks =
std::vector<detail::BlockAndSubrange<typename Iterator::value_type>>;
Iterator it_;
const End end_;
const Projection& projection_;
SameBlocks sameBlocks_{};
CurrentBlocks currentBlocks_{};

// Type aliases for a single element from a block from the left/right input.
using value_type = std::ranges::range_value_t<typename Iterator::value_type>;
Expand All @@ -658,9 +657,9 @@ struct JoinSide {
template <typename It, typename End, typename Projection>
JoinSide(It, End, const Projection&) -> JoinSide<It, End, Projection>;

// Create a `JoinSide` object from a range of `blocks` and a projection. Note
// Create a `JoinSide` object from a range of `blocks` and a `projection`. Note
// that the `blocks` are stored as a reference, so the caller is responsible for
// keeping them valid until the
// keeping them valid until the join is completed.
template <typename Blocks>
auto makeJoinSide(Blocks& blocks, const auto& projection) {
return JoinSide{blocks.begin(), blocks.end(), projection};
Expand All @@ -670,17 +669,17 @@ auto makeJoinSide(Blocks& blocks, const auto& projection) {
template <typename T>
concept IsJoinSide = ad_utility::isInstantiation<T, JoinSide>;

// The class that actually performs the zipper join for blocks without undef.
// The class that actually performs the zipper join for blocks without UNDEF.
// See the public `zipperJoinForBlocksWithoutUndef` function below for details.
template <IsJoinSide LeftSide, IsJoinSide RightSide, typename LessThan,
typename CompatibleRowAction>
struct BlockZipperJoinImpl {
// The left and right inputs of the join.
// The left and right inputs of the join
LeftSide leftSide_;
RightSide rightSide_;
// The used comparison.
const LessThan& lessThan_;
// The callback that is called for the matching rows.
// The callback that is called for each pair of matching rows.
CompatibleRowAction& compatibleRowAction_;

// Type alias for the result of the projection. Elements from the left and
Expand All @@ -697,51 +696,52 @@ struct BlockZipperJoinImpl {
return !lessThan_(el1, el2) && !lessThan_(el2, el1);
};

// Recompute the `minEl`. It is the minimum of the last element in the first
// block of either of the join sides.
ProjectedEl getMinEl() {
// Recompute the `currentEl`. It is the minimum of the last element in the
// first block of either of the join sides.
ProjectedEl getCurrentEl() {
auto getFirst = [](const auto& side) {
return side.projection_(side.sameBlocks_.front().back());
return side.projection_(side.currentBlocks_.front().back());
};
return std::min(getFirst(leftSide_), getFirst(rightSide_), lessThan_);
};

// Fill `side.sameBlocks_` with blocks from the range `[side.it_, sidde.end_)`
// and advance `side.it_` for each read buffer until all elements <= `minEl`
// are added or until three blocks have been added to the targetBuffer.
// Calling this function requires that all blocks that contain elements `<
// minEl` have already been consumed. Returns `true` if all blocks that
// contain elements <= `minEl` have been added, and `false` if the function
// returned because 3 blocks were added without fulfilling the condition.
bool fillEqualToMinimum(IsJoinSide auto& side, const auto& minEl) {
// Fill `side.sameBlocks_` with blocks from the range `[side.it_, side.end_)`
// and advance `side.it_` for each read buffer until all elements <=
// `currentEl` are added or until three blocks have been added to the
// targetBuffer. Calling this function requires that all blocks that contain
// elements `< currentEl` have already been consumed. Returns `true` if all
// blocks that contain elements <= `currentEl` have been added, and `false` if
// the function returned because 3 blocks were added without fulfilling the
// condition.
bool fillEqualToMinimum(IsJoinSide auto& side, const auto& currentEl) {
auto& it = side.it_;
auto& end = side.end_;
for (size_t numBlocksRead = 0; it != end && numBlocksRead < 3;
++it, ++numBlocksRead) {
if (std::ranges::empty(*it)) {
continue;
}

Check warning on line 723 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L722-L723

Added lines #L722 - L723 were not covered by tests
if (!eq((*it)[0], minEl)) {
AD_CORRECTNESS_CHECK(lessThan_(minEl, (*it)[0]));
if (!eq((*it)[0], currentEl)) {
AD_CORRECTNESS_CHECK(lessThan_(currentEl, (*it)[0]));
return true;
}
AD_CORRECTNESS_CHECK(std::ranges::is_sorted(*it, lessThan_));
side.sameBlocks_.emplace_back(std::move(*it));
side.currentBlocks_.emplace_back(std::move(*it));
}
return it == end;
}

// Fill the buffers in `leftSide_` and rightSide_` until they both contain at
// least one block and at least one of them contains all the blocks with
// elements `<= minEl`. The returned `BlockStatus` reports which of the sides
// contain all the relevant blocks.
// elements `<= currentEl`. The returned `BlockStatus` reports which of the
// sides contain all the relevant blocks.
enum struct BlockStatus { leftMissing, rightMissing, allFilled };
BlockStatus fillEqualToMinimumBothSides(const auto& minEl) {
BlockStatus fillEqualToMinimumBothSides(const auto& currentEl) {
bool allBlocksFromLeft = false;
bool allBlocksFromRight = false;
while (!(allBlocksFromLeft || allBlocksFromRight)) {
allBlocksFromLeft = fillEqualToMinimum(leftSide_, minEl);
allBlocksFromRight = fillEqualToMinimum(rightSide_, minEl);
allBlocksFromLeft = fillEqualToMinimum(leftSide_, currentEl);
allBlocksFromRight = fillEqualToMinimum(rightSide_, currentEl);
}
if (!allBlocksFromRight) {
return BlockStatus::rightMissing;

Check warning on line 747 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L747

Added line #L747 was not covered by tests
Expand Down Expand Up @@ -780,15 +780,15 @@ struct BlockZipperJoinImpl {
// obtain a tuple of the following elements:
// * A reference to the first full block
// * The currently active subrange of that block
// * An iterator pointing to the position of the `minEl` in the block.
auto getFirstBlock(auto& sameBlocks, const auto& minEl) {
// * An iterator pointing to the first element ` >= currentEl` in the block.
auto getFirstBlock(auto& sameBlocks, const auto& currentEl) {
AD_CORRECTNESS_CHECK(!sameBlocks.empty());
const auto& first = sameBlocks.at(0);
auto it = std::ranges::lower_bound(first.subrange(), minEl, lessThan_);
auto it = std::ranges::lower_bound(first.subrange(), currentEl, lessThan_);
return std::tuple{std::ref(first.fullBlock()), first.subrange(), it};
};

// Call `compatibleRowAction` for all pairs of elements in the cartesian
// Call `compatibleRowAction` for all pairs of elements in the Cartesian
// product of the blocks in `blocksLeft` and `blocksRight`.
template <bool DoOptionalJoin>
void addAll(const auto& blocksLeft, const auto& blocksRight) {
Expand Down Expand Up @@ -819,18 +819,18 @@ struct BlockZipperJoinImpl {
compatibleRowAction_.flush();
};

// Return a vector of subranges of all elements in `input` that are equal to
// the last element that we can safely join (this is the `minEl`).
// Return a vector of subranges of all elements in `blocks` that are equal to
// the last element that we can safely join (this is the `currentEl`).
// Effectively, these subranges cover all the blocks completely except maybe
// the last one, which might contain elements `> minEl` at the end.
auto pushRelevantSubranges(const auto& input, const auto& minEl) {
auto result = input;
// the last one, which might contain elements `> currentEl` at the end.
auto pushRelevantSubranges(const auto& blocks, const auto& currentEl) {
auto result = blocks;
// If one of the inputs is empty, this function shouldn't have been called
// in the first place.
AD_CORRECTNESS_CHECK(!result.empty());
auto& last = result.back();
last.setSubrange(
std::ranges::equal_range(last.subrange(), minEl, lessThan_));
std::ranges::equal_range(last.subrange(), currentEl, lessThan_));
return result;
};

Expand All @@ -841,12 +841,12 @@ struct BlockZipperJoinImpl {
// `sameBlocksLeft/Right`, as they are not needed anymore.
template <bool DoOptionalJoin>
void joinAndRemoveBeginning(auto& sameBlocksLeft, auto& sameBlocksRight,
const auto& minEl) {
const auto& currentEl) {
// Get the first blocks.
auto [fullBlockLeft, subrangeLeft, minElItL] =
getFirstBlock(sameBlocksLeft, minEl);
auto [fullBlockRight, subrangeRight, minElItR] =
getFirstBlock(sameBlocksRight, minEl);
auto [fullBlockLeft, subrangeLeft, currentElItL] =
getFirstBlock(sameBlocksLeft, currentEl);
auto [fullBlockRight, subrangeRight, currentElItR] =
getFirstBlock(sameBlocksRight, currentEl);

compatibleRowAction_.setInput(fullBlockLeft.get(), fullBlockRight.get());
auto addRowIndex = [begL = fullBlockLeft.get().begin(),
Expand All @@ -866,24 +866,24 @@ struct BlockZipperJoinImpl {
}
}();
[[maybe_unused]] auto res = zipperJoinWithUndef(
std::ranges::subrange{subrangeLeft.begin(), minElItL},
std::ranges::subrange{subrangeRight.begin(), minElItR}, lessThan_,
std::ranges::subrange{subrangeLeft.begin(), currentElItL},
std::ranges::subrange{subrangeRight.begin(), currentElItR}, lessThan_,
addRowIndex, noop, noop, addNotFoundRowIndex);
compatibleRowAction_.flush();

// Remove the joined elements.
sameBlocksLeft.at(0).setSubrange(minElItL, subrangeLeft.end());
sameBlocksRight.at(0).setSubrange(minElItR, subrangeRight.end());
sameBlocksLeft.at(0).setSubrange(currentElItL, subrangeLeft.end());
sameBlocksRight.at(0).setSubrange(currentElItR, subrangeRight.end());
};

// If the `targetBuffer` is empty, read the next nonempty block from `[it,
// end)` if there is one.
void fillWithAtLeastOne(auto& side) {
auto& targetBuffer = side.sameBlocks_;
auto& targetBuffer = side.currentBlocks_;
auto& it = side.it_;
const auto& end = side.end_;
while (targetBuffer.empty() && it != end) {
auto&& el = *it;
auto& el = *it;
if (!el.empty()) {
AD_CORRECTNESS_CHECK(std::ranges::is_sorted(el, lessThan_));
targetBuffer.emplace_back(std::move(el));
Expand Down Expand Up @@ -918,42 +918,45 @@ struct BlockZipperJoinImpl {
// `sameBlocksRight` is empty after calling this function. Then we have
// finished processing all blocks and can finish the overall algorithm.
void fillBuffer(auto& blockStatus) {
AD_CORRECTNESS_CHECK(leftSide_.sameBlocks_.size() <= 1);
AD_CORRECTNESS_CHECK(rightSide_.sameBlocks_.size() <= 1);
AD_CORRECTNESS_CHECK(leftSide_.currentBlocks_.size() <= 1);
AD_CORRECTNESS_CHECK(rightSide_.currentBlocks_.size() <= 1);

fillWithAtLeastOne(leftSide_);
fillWithAtLeastOne(rightSide_);

if (leftSide_.sameBlocks_.empty() || rightSide_.sameBlocks_.empty()) {
if (leftSide_.currentBlocks_.empty() || rightSide_.currentBlocks_.empty()) {
// One of the inputs was exhausted, we are done.
// If the left side is not empty and this is an optional join, then we
// will add the remaining elements from the `leftSide_` later in the
// `fillWithAllFromLeft` function.
return;
}

// Add the remaining blocks such that condition 3 from above is fulfilled.
blockStatus = fillEqualToMinimumBothSides(getMinEl());
currentMinEl_ = getMinEl();
blockStatus = fillEqualToMinimumBothSides(getCurrentEl());
currentMinEl_ = getCurrentEl();
}

// Combine the above functionality and perform one round of joining.
template <bool DoOptionalJoin, typename ProjectedEl>
void joinBuffers(auto& blockStatus) {
auto& sameBlocksLeft = leftSide_.sameBlocks_;
auto& sameBlocksRight = rightSide_.sameBlocks_;
auto& sameBlocksLeft = leftSide_.currentBlocks_;
auto& sameBlocksRight = rightSide_.currentBlocks_;
joinAndRemoveBeginning<DoOptionalJoin>(sameBlocksLeft, sameBlocksRight,
getMinEl());
getCurrentEl());

ProjectedEl minEl = getMinEl();
auto l = pushRelevantSubranges(sameBlocksLeft, minEl);
auto r = pushRelevantSubranges(sameBlocksRight, minEl);
ProjectedEl currentEl = getCurrentEl();
auto l = pushRelevantSubranges(sameBlocksLeft, currentEl);
auto r = pushRelevantSubranges(sameBlocksRight, currentEl);

auto getNextBlocks = [&minEl, self = this, &blockStatus](auto& target,
auto& side) {
self->removeAllButUnjoined(side.sameBlocks_, minEl);
bool allBlocksWereFilled = self->fillEqualToMinimum(side, minEl);
if (side.sameBlocks_.empty()) {
auto getNextBlocks = [&currentEl, self = this, &blockStatus](auto& target,
auto& side) {
self->removeAllButUnjoined(side.currentBlocks_, currentEl);
bool allBlocksWereFilled = self->fillEqualToMinimum(side, currentEl);

Check warning on line 955 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L954-L955

Added lines #L954 - L955 were not covered by tests
if (side.currentBlocks_.empty()) {
AD_CORRECTNESS_CHECK(allBlocksWereFilled);
}
target = self->pushRelevantSubranges(side.sameBlocks_, minEl);
target = self->pushRelevantSubranges(side.currentBlocks_, currentEl);

Check warning on line 959 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L957-L959

Added lines #L957 - L959 were not covered by tests
if (allBlocksWereFilled) {
blockStatus = BlockStatus::allFilled;

Check warning on line 961 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L961

Added line #L961 was not covered by tests
}
Expand All @@ -964,8 +967,8 @@ struct BlockZipperJoinImpl {
addAll<DoOptionalJoin>(l, r);
switch (blockStatus.value()) {
case BlockStatus::allFilled: {
removeAllButUnjoined(sameBlocksLeft, minEl);
removeAllButUnjoined(sameBlocksRight, minEl);
removeAllButUnjoined(sameBlocksLeft, currentEl);
removeAllButUnjoined(sameBlocksRight, currentEl);
return;
}

Check warning on line 973 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L973

Added line #L973 was not covered by tests
case BlockStatus::rightMissing: {
Expand All @@ -986,9 +989,7 @@ struct BlockZipperJoinImpl {
// elements from the left input after the right input has been completely
// processed.
auto fillWithAllFromLeft() {
auto& sameBlocksLeft = leftSide_.sameBlocks_;
auto& it1 = leftSide_.it_;
const auto& end1 = leftSide_.end_;
auto& sameBlocksLeft = leftSide_.currentBlocks_;
for (auto& block : sameBlocksLeft) {
compatibleRowAction_.setLeftInput(block.fullBlock());

Expand All @@ -997,12 +998,17 @@ struct BlockZipperJoinImpl {
compatibleRowAction_.addOptionalRow(idx);
}
}
auto& it1 = leftSide_.it_;
const auto& end1 = leftSide_.end_;
while (it1 != end1) {
auto& block = *it1;
compatibleRowAction_.setLeftInput(block);

Check warning on line 1005 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L1004-L1005

Added lines #L1004 - L1005 were not covered by tests
for (size_t idx : ad_utility::integerRange(block.size())) {
compatibleRowAction_.addOptionalRow(idx);
}

Check warning on line 1008 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L1007-L1008

Added lines #L1007 - L1008 were not covered by tests
// We need to manually flush, because the `block` is captured by reference
// and not valid anymore after increasing the iterator.
compatibleRowAction_.flush();
++it1;
}

Check warning on line 1013 in src/util/JoinAlgorithms/JoinAlgorithms.h

View check run for this annotation

Codecov / codecov/patch

src/util/JoinAlgorithms/JoinAlgorithms.h#L1011-L1013

Added lines #L1011 - L1013 were not covered by tests
compatibleRowAction_.flush();
Expand All @@ -1014,7 +1020,8 @@ struct BlockZipperJoinImpl {
std::optional<BlockStatus> blockStatus;
while (true) {
fillBuffer(blockStatus);
if (leftSide_.sameBlocks_.empty() || rightSide_.sameBlocks_.empty()) {
if (leftSide_.currentBlocks_.empty() ||
rightSide_.currentBlocks_.empty()) {
if constexpr (DoOptionalJoin) {
fillWithAllFromLeft();
}
Expand Down

0 comments on commit 91446c6

Please sign in to comment.