diff --git a/src/engine/AddCombinedRowToTable.h b/src/engine/AddCombinedRowToTable.h index 75c2bcd848..8c8939f64c 100644 --- a/src/engine/AddCombinedRowToTable.h +++ b/src/engine/AddCombinedRowToTable.h @@ -26,6 +26,8 @@ class AddCombinedRowToIdTable { size_t numJoinColumns_; std::optional, 2>> inputLeftAndRight_; IdTable resultTable_; + LocalVocab mergedVocab_{}; + std::array currentVocabs_{nullptr, nullptr}; // This struct stores the information, which row indices from the input are // combined into a given row index in the output, i.e. "To obtain the @@ -62,7 +64,7 @@ class AddCombinedRowToIdTable { // This callback is called with the result as an argument each time `flush()` // is called. It can be used to consume parts of the result early, before the // complete operation has finished. - using BlockwiseCallback = std::function; + using BlockwiseCallback = std::function; [[no_unique_address]] BlockwiseCallback blockwiseCallback_{ad_utility::noop}; using CancellationHandle = ad_utility::SharedCancellationHandle; @@ -124,6 +126,41 @@ class AddCombinedRowToIdTable { } } + // Unwrap type `T` to get an `IdTableView<0>`, even if it's not an + // `IdTableView<0>`. Identity for `IdTableView<0>`. + template + static IdTableView<0> toView(const T& table) { + if constexpr (requires { table.template asStaticView<0>(); }) { + return table.template asStaticView<0>(); + } else { + return table; + } + } + + // Merge the local vocab contained in `T` with the `mergedVocab_` and set the + // passed pointer reference to that vocab. + template + void mergeVocab(const T& table, const LocalVocab*& currentVocab) { + AD_CORRECTNESS_CHECK(currentVocab == nullptr); + if constexpr (requires { table.getLocalVocab(); }) { + currentVocab = &table.getLocalVocab(); + mergedVocab_.mergeWith(std::span{&table.getLocalVocab(), 1}); + } + } + + // Flush remaining pending entries before changing the input. + void flushBeforeInputChange() { + // Clear to avoid unnecessary merge. 
+ currentVocabs_ = {nullptr, nullptr}; + if (nextIndex_ != 0) { + AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value()); + flush(); + } else { + // Clear vocab when no rows were written. + mergedVocab_ = LocalVocab{}; + } + } + // Set or reset the input. All following calls to `addRow` then refer to // indices in the new input. Before resetting, `flush()` is called, so all the // rows from the previous inputs get materialized before deleting the old @@ -131,17 +168,9 @@ class AddCombinedRowToIdTable { // `IdTable` or `IdTableView<0>`, or any other type that has a // `asStaticView<0>` method that returns an `IdTableView<0>`. void setInput(const auto& inputLeft, const auto& inputRight) { - auto toView = [](const T& table) { - if constexpr (requires { table.template asStaticView<0>(); }) { - return table.template asStaticView<0>(); - } else { - return table; - } - }; - if (nextIndex_ != 0) { - AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value()); - flush(); - } + flushBeforeInputChange(); + mergeVocab(inputLeft, currentVocabs_.at(0)); + mergeVocab(inputRight, currentVocabs_.at(1)); inputLeftAndRight_ = std::array{toView(inputLeft), toView(inputRight)}; checkNumColumns(); } @@ -149,17 +178,8 @@ class AddCombinedRowToIdTable { // Only set the left input. After this it is only allowed to call // `addOptionalRow` and not `addRow` until `setInput` has been called again. void setOnlyLeftInputForOptionalJoin(const auto& inputLeft) { - auto toView = [](const T& table) { - if constexpr (requires { table.template asStaticView<0>(); }) { - return table.template asStaticView<0>(); - } else { - return table; - } - }; - if (nextIndex_ != 0) { - AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value()); - flush(); - } + flushBeforeInputChange(); + mergeVocab(inputLeft, currentVocabs_.at(0)); // The right input will be empty, but with the correct number of columns. 
inputLeftAndRight_ = std::array{ toView(inputLeft), @@ -188,6 +208,8 @@ class AddCombinedRowToIdTable { return std::move(resultTable_); } + LocalVocab& localVocab() { return mergedVocab_; } + // Disable copying and moving, it is currently not needed and makes it harder // to reason about AddCombinedRowToIdTable(const AddCombinedRowToIdTable&) = delete; @@ -320,7 +342,17 @@ class AddCombinedRowToIdTable { indexBuffer_.clear(); optionalIndexBuffer_.clear(); nextIndex_ = 0; - std::invoke(blockwiseCallback_, result); + std::invoke(blockwiseCallback_, result, mergedVocab_); + // The current `IdTable`s might still be active, so we have to merge the + // local vocabs again if all other sets were moved-out. + if (resultTable_.empty()) { + // Make sure to reset `mergedVocab_` so it is in a valid state again. + mergedVocab_ = LocalVocab{}; + // Only merge non-null vocabs. + auto range = currentVocabs_ | std::views::filter(toBool) | + std::views::transform(dereference); + mergedVocab_.mergeWith(std::ranges::ref_view{range}); + } } const IdTableView<0>& inputLeft() const { return inputLeftAndRight_.value()[0]; diff --git a/src/engine/Join.cpp b/src/engine/Join.cpp index 184df75128..6d2376bb9c 100644 --- a/src/engine/Join.cpp +++ b/src/engine/Join.cpp @@ -20,10 +20,61 @@ #include "global/RuntimeParameters.h" #include "util/Exception.h" #include "util/HashMap.h" +#include "util/JoinAlgorithms/JoinAlgorithms.h" using std::endl; using std::string; +namespace { +using OptionalPermutation = Join::OptionalPermutation; +void applyPermutation(IdTable& idTable, + const OptionalPermutation& permutation) { + if (permutation.has_value()) { + idTable.setColumnSubset(permutation.value()); + } +} + +using LazyInputView = + cppcoro::generator>; +// Convert a `generator` to a `generator` for +// more efficient access in the join columns below and apply the given +// permutation to each table. 
+LazyInputView convertGenerator(Result::Generator gen, + OptionalPermutation permutation = {}) { + for (auto& [table, localVocab] : gen) { + applyPermutation(table, permutation); + // Make sure to actually move the table into the wrapper so that the tables + // live as long as the wrapper. + ad_utility::IdTableAndFirstCol t{std::move(table), std::move(localVocab)}; + co_yield t; + } +} + +using MaterializedInputView = + std::array>, 1>; +// Wrap a fully materialized result in a `IdTableAndFirstCol` and an array. It +// then fulfills the concept `view` which is required by the +// lazy join algorithms. Note: The `convertGenerator` function above +// conceptually does exactly the same for lazy inputs. +MaterializedInputView asSingleTableView( + const Result& result, const std::vector& permutation) { + return std::array{ad_utility::IdTableAndFirstCol{ + result.idTable().asColumnSubsetView(permutation), + result.getCopyOfLocalVocab()}}; +} + +// Wrap a result either in an array with a single element or in a range wrapping +// the lazy result generator. Note that the lifetime of the view is coupled to +// the lifetime of the result. 
+std::variant resultToView( + const Result& result, const std::vector& permutation) { + if (result.isFullyMaterialized()) { + return asSingleTableView(result, permutation); + } + return convertGenerator(std::move(result.idTables()), permutation); +} +} // namespace + // _____________________________________________________________________________ Join::Join(QueryExecutionContext* qec, std::shared_ptr t1, std::shared_ptr t2, ColumnIndex t1JoinCol, @@ -90,44 +141,34 @@ string Join::getCacheKeyImpl() const { string Join::getDescriptor() const { return "Join on " + _joinVar.name(); } // _____________________________________________________________________________ -ProtoResult Join::computeResult([[maybe_unused]] bool requestLaziness) { +ProtoResult Join::computeResult(bool requestLaziness) { LOG(DEBUG) << "Getting sub-results for join result computation..." << endl; - IdTable idTable{getResultWidth(), getExecutionContext()->getAllocator()}; - if (_left->knownEmptyResult() || _right->knownEmptyResult()) { _left->getRootOperation()->updateRuntimeInformationWhenOptimizedOut(); _right->getRootOperation()->updateRuntimeInformationWhenOptimizedOut(); - return {std::move(idTable), resultSortedOn(), LocalVocab()}; + return createEmptyResult(); } // Always materialize results that meet one of the following criteria: // * They are already present in the cache // * Their result is small - // * They might contain UNDEF values in the join column - // The first two conditions are for performance reasons, the last one is - // because we currently cannot perform the optimized lazy joins when UNDEF - // values are involved. - auto getCachedOrSmallResult = [](QueryExecutionTree& tree, - ColumnIndex joinCol) { + // This is purely for performance reasons. 
+ auto getCachedOrSmallResult = [](const QueryExecutionTree& tree) { bool isSmall = tree.getRootOperation()->getSizeEstimate() < RuntimeParameters().get<"lazy-index-scan-max-size-materialization">(); - auto undefStatus = - tree.getVariableAndInfoByColumnIndex(joinCol).second.mightContainUndef_; - bool containsUndef = - undefStatus == ColumnIndexAndTypeInfo::UndefStatus::PossiblyUndefined; // The third argument means "only get the result if it can be read from the - // cache". So effectively, this returns the result if it is small, contains - // UNDEF values, or is contained in the cache, otherwise `nullptr`. + // cache". So effectively, this returns the result if it is small, or is + // contained in the cache, otherwise `nullptr`. // TODO Add a unit test that checks the correct conditions return tree.getRootOperation()->getResult( - false, (isSmall || containsUndef) ? ComputationMode::FULLY_MATERIALIZED - : ComputationMode::ONLY_IF_CACHED); + false, isSmall ? ComputationMode::FULLY_MATERIALIZED + : ComputationMode::ONLY_IF_CACHED); }; - auto leftResIfCached = getCachedOrSmallResult(*_left, _leftJoinCol); + auto leftResIfCached = getCachedOrSmallResult(*_left); checkCancellation(); - auto rightResIfCached = getCachedOrSmallResult(*_right, _rightJoinCol); + auto rightResIfCached = getCachedOrSmallResult(*_right); checkCancellation(); auto leftIndexScan = @@ -135,20 +176,13 @@ ProtoResult Join::computeResult([[maybe_unused]] bool requestLaziness) { if (leftIndexScan && std::dynamic_pointer_cast(_right->getRootOperation())) { if (rightResIfCached && !leftResIfCached) { - idTable = computeResultForIndexScanAndIdTable( - rightResIfCached->idTable(), _rightJoinCol, *leftIndexScan, - _leftJoinCol); - checkCancellation(); - return {std::move(idTable), resultSortedOn(), LocalVocab{}}; + AD_CORRECTNESS_CHECK(rightResIfCached->isFullyMaterialized()); + return computeResultForIndexScanAndIdTable( + requestLaziness, std::move(rightResIfCached), _rightJoinCol, + leftIndexScan, 
_leftJoinCol); } else if (!leftResIfCached) { - idTable = computeResultForTwoIndexScans(); - checkCancellation(); - // TODO When we add triples to the - // index, the vocabularies of index scans will not necessarily be empty - // and we need a mechanism to still retrieve them when using the lazy - // scan. - return {std::move(idTable), resultSortedOn(), LocalVocab{}}; + return computeResultForTwoIndexScans(requestLaziness); } } @@ -157,46 +191,33 @@ ProtoResult Join::computeResult([[maybe_unused]] bool requestLaziness) { Service::precomputeSiblingResult(_left->getRootOperation(), _right->getRootOperation(), false, requestLaziness); - std::shared_ptr leftRes = - leftResIfCached ? leftResIfCached : _left->getResult(); + leftResIfCached ? leftResIfCached : _left->getResult(true); checkCancellation(); - if (leftRes->idTable().empty()) { + if (leftRes->isFullyMaterialized() && leftRes->idTable().empty()) { _right->getRootOperation()->updateRuntimeInformationWhenOptimizedOut(); - // TODO When we add triples to the - // index, the vocabularies of index scans will not necessarily be empty and - // we need a mechanism to still retrieve them when using the lazy scan. - return {std::move(idTable), resultSortedOn(), LocalVocab()}; + return createEmptyResult(); } // Note: If only one of the children is a scan, then we have made sure in the // constructor that it is the right child. - // We currently cannot use this optimized lazy scan if the result from `_left` - // contains UNDEF values. 
- const auto& leftIdTable = leftRes->idTable(); - auto leftHasUndef = - !leftIdTable.empty() && leftIdTable.at(0, _leftJoinCol).isUndefined(); auto rightIndexScan = std::dynamic_pointer_cast(_right->getRootOperation()); - if (rightIndexScan && !rightResIfCached && !leftHasUndef) { - idTable = computeResultForIndexScanAndIdTable( - leftRes->idTable(), _leftJoinCol, *rightIndexScan, _rightJoinCol); - checkCancellation(); - return {std::move(idTable), resultSortedOn(), - leftRes->getSharedLocalVocab()}; + if (rightIndexScan && !rightResIfCached && leftRes->isFullyMaterialized()) { + return computeResultForIndexScanAndIdTable( + requestLaziness, std::move(leftRes), _leftJoinCol, rightIndexScan, + _rightJoinCol); } std::shared_ptr rightRes = - rightResIfCached ? rightResIfCached : _right->getResult(); - checkCancellation(); - join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol, - &idTable); + rightResIfCached ? rightResIfCached : _right->getResult(true); checkCancellation(); - - // If only one of the two operands has a non-empty local vocabulary, share - // with that one (otherwise, throws an exception). 
- return {std::move(idTable), resultSortedOn(), - Result::getMergedLocalVocab(*leftRes, *rightRes)}; + if (leftRes->isFullyMaterialized() && rightRes->isFullyMaterialized()) { + return computeResultForTwoMaterializedInputs(std::move(leftRes), + std::move(rightRes)); + } + return lazyJoin(std::move(leftRes), _leftJoinCol, std::move(rightRes), + _rightJoinCol, requestLaziness); } // _____________________________________________________________________________ @@ -408,6 +429,115 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b, << ", size = " << result->size() << "\n"; } +// _____________________________________________________________________________ +Result::Generator Join::runLazyJoinAndConvertToGenerator( + ad_utility::InvocableWithExactReturnType< + Result::IdTableVocabPair, + std::function> auto action, + OptionalPermutation permutation) const { + std::atomic_flag write = true; + std::variant + storage; + ad_utility::JThread thread{[&write, &storage, &action, &permutation]() { + auto writeValue = [&write, &storage](auto value) noexcept { + storage = std::move(value); + write.clear(); + write.notify_one(); + }; + auto writeValueAndWait = [&permutation, &write, + &writeValue](Result::IdTableVocabPair value) { + AD_CORRECTNESS_CHECK(write.test()); + applyPermutation(value.idTable_, permutation); + writeValue(std::move(value)); + // Wait until we are allowed to write again. + write.wait(false); + }; + auto addValue = [&writeValueAndWait](IdTable& idTable, + LocalVocab& localVocab) { + if (idTable.size() < CHUNK_SIZE) { + return; + } + writeValueAndWait({std::move(idTable), std::move(localVocab)}); + }; + try { + auto finalValue = action(addValue); + if (!finalValue.idTable_.empty()) { + writeValueAndWait(std::move(finalValue)); + } + writeValue(std::monostate{}); + } catch (...) { + writeValue(std::current_exception()); + } + }}; + while (true) { + // Wait for read phase. 
+ write.wait(true); + if (std::holds_alternative(storage)) { + break; + } + if (std::holds_alternative(storage)) { + std::rethrow_exception(std::get(storage)); + } + co_yield std::get(storage); + // Initiate write phase. + write.test_and_set(); + write.notify_one(); + } +} + +// _____________________________________________________________________________ +ProtoResult Join::createResult( + bool requestedLaziness, + ad_utility::InvocableWithExactReturnType< + Result::IdTableVocabPair, + std::function> auto action, + std::optional> permutation) const { + if (requestedLaziness) { + return {runLazyJoinAndConvertToGenerator(std::move(action), + std::move(permutation)), + resultSortedOn()}; + } else { + auto [idTable, localVocab] = action(ad_utility::noop); + applyPermutation(idTable, permutation); + return {std::move(idTable), resultSortedOn(), std::move(localVocab)}; + } +} + +// ______________________________________________________________________________ +ProtoResult Join::lazyJoin(std::shared_ptr a, ColumnIndex jc1, + std::shared_ptr b, ColumnIndex jc2, + bool requestLaziness) const { + // If both inputs are fully materialized, we can join them more + // efficiently. 
+ AD_CONTRACT_CHECK(!a->isFullyMaterialized() || !b->isFullyMaterialized()); + ad_utility::JoinColumnMapping joinColMap{ + {{jc1, jc2}}, + _left->getRootOperation()->getResultWidth(), + _right->getRootOperation()->getResultWidth()}; + auto resultPermutation = joinColMap.permutationResult(); + return createResult( + requestLaziness, + [this, a = std::move(a), b = std::move(b), + joinColMap = std::move(joinColMap)]( + std::function yieldTable) { + ad_utility::AddCombinedRowToIdTable rowAdder{ + 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, + CHUNK_SIZE, std::move(yieldTable)}; + auto leftRange = resultToView(*a, joinColMap.permutationLeft()); + auto rightRange = resultToView(*b, joinColMap.permutationRight()); + std::visit( + [&rowAdder](auto& leftBlocks, auto& rightBlocks) { + ad_utility::zipperJoinForBlocksWithPotentialUndef( + leftBlocks, rightBlocks, std::less{}, rowAdder); + }, + leftRange, rightRange); + auto localVocab = std::move(rowAdder.localVocab()); + return Result::IdTableVocabPair{std::move(rowAdder).resultTable(), + std::move(localVocab)}; + }, + std::move(resultPermutation)); +} + // ______________________________________________________________________________ template void Join::hashJoinImpl(const IdTable& dynA, ColumnIndex jc1, @@ -517,7 +647,7 @@ void Join::hashJoin(const IdTable& dynA, ColumnIndex jc1, const IdTable& dynB, // ___________________________________________________________________________ template void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB, - const ColumnIndex jcRowB, + ColumnIndex jcRowB, IdTableStatic* table) { // Add a new, empty row. const size_t backIndex = table->size(); @@ -540,15 +670,17 @@ void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB, } namespace { +using GeneratorWithDetails = + cppcoro::generator, + CompressedRelationReader::LazyScanMetadata>; // Convert a `generator` for more // efficient access in the join columns below. 
-cppcoro::generator, - CompressedRelationReader::LazyScanMetadata> -convertGenerator(Permutation::IdTableGenerator gen) { +GeneratorWithDetails convertGenerator(Permutation::IdTableGenerator gen) { co_await cppcoro::getDetails = gen.details(); gen.setDetailsPointer(&co_await cppcoro::getDetails); for (auto& table : gen) { - ad_utility::IdTableAndFirstCol t{std::move(table)}; + // IndexScans don't have a local vocabulary, so we can just use an empty one + ad_utility::IdTableAndFirstCol t{std::move(table), LocalVocab{}}; co_yield t; } } @@ -582,89 +714,153 @@ void updateRuntimeInfoForLazyScan( } // namespace // ______________________________________________________________________________________________________ -IdTable Join::computeResultForTwoIndexScans() { - auto leftScan = - std::dynamic_pointer_cast(_left->getRootOperation()); - auto rightScan = - std::dynamic_pointer_cast(_right->getRootOperation()); - AD_CORRECTNESS_CHECK(leftScan && rightScan); - // The join column already is the first column in both inputs, so we don't - // have to permute the inputs and results for the `AddCombinedRowToIdTable` - // class to work correctly. 
- AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0); - ad_utility::AddCombinedRowToIdTable rowAdder{ - 1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()}, - cancellationHandle_}; - - ad_utility::Timer timer{ad_utility::timer::Timer::InitialStatus::Started}; - auto [leftBlocksInternal, rightBlocksInternal] = - IndexScan::lazyScanForJoinOfTwoScans(*leftScan, *rightScan); - runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs()); - - auto leftBlocks = convertGenerator(std::move(leftBlocksInternal)); - auto rightBlocks = convertGenerator(std::move(rightBlocksInternal)); - - ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks, - std::less{}, rowAdder); - - updateRuntimeInfoForLazyScan(*leftScan, leftBlocks.details()); - updateRuntimeInfoForLazyScan(*rightScan, rightBlocks.details()); - - AD_CORRECTNESS_CHECK(leftBlocks.details().numBlocksRead_ <= - rightBlocks.details().numElementsRead_); - AD_CORRECTNESS_CHECK(rightBlocks.details().numBlocksRead_ <= - leftBlocks.details().numElementsRead_); - - return std::move(rowAdder).resultTable(); +ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const { + return createResult( + requestLaziness, + [this](std::function yieldTable) { + auto leftScan = + std::dynamic_pointer_cast(_left->getRootOperation()); + auto rightScan = + std::dynamic_pointer_cast(_right->getRootOperation()); + AD_CORRECTNESS_CHECK(leftScan && rightScan); + // The join column already is the first column in both inputs, so we + // don't have to permute the inputs and results for the + // `AddCombinedRowToIdTable` class to work correctly. 
+ AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0); + ad_utility::AddCombinedRowToIdTable rowAdder{ + 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, + CHUNK_SIZE, std::move(yieldTable)}; + + ad_utility::Timer timer{ + ad_utility::timer::Timer::InitialStatus::Started}; + auto [leftBlocksInternal, rightBlocksInternal] = + IndexScan::lazyScanForJoinOfTwoScans(*leftScan, *rightScan); + runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs()); + + auto leftBlocks = convertGenerator(std::move(leftBlocksInternal)); + auto rightBlocks = convertGenerator(std::move(rightBlocksInternal)); + + ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks, + std::less{}, rowAdder); + + updateRuntimeInfoForLazyScan(*leftScan, leftBlocks.details()); + updateRuntimeInfoForLazyScan(*rightScan, rightBlocks.details()); + + AD_CORRECTNESS_CHECK(leftBlocks.details().numBlocksRead_ <= + rightBlocks.details().numElementsRead_); + AD_CORRECTNESS_CHECK(rightBlocks.details().numBlocksRead_ <= + leftBlocks.details().numElementsRead_); + auto localVocab = std::move(rowAdder.localVocab()); + return Result::IdTableVocabPair{std::move(rowAdder).resultTable(), + std::move(localVocab)}; + }); } // ______________________________________________________________________________________________________ template -IdTable Join::computeResultForIndexScanAndIdTable(const IdTable& idTable, - ColumnIndex joinColTable, - IndexScan& scan, - ColumnIndex joinColScan) { +ProtoResult Join::computeResultForIndexScanAndIdTable( + bool requestLaziness, std::shared_ptr resultWithIdTable, + ColumnIndex joinColTable, std::shared_ptr scan, + ColumnIndex joinColScan) const { + const IdTable& idTable = resultWithIdTable->idTable(); // We first have to permute the columns. auto [jcLeft, jcRight, numColsLeft, numColsRight] = [&]() { return idTableIsRightInput - ? std::tuple{joinColScan, joinColTable, scan.getResultWidth(), + ? 
std::tuple{joinColScan, joinColTable, scan->getResultWidth(), idTable.numColumns()} : std::tuple{joinColTable, joinColScan, idTable.numColumns(), - scan.getResultWidth()}; + scan->getResultWidth()}; }(); - auto joinColMap = ad_utility::JoinColumnMapping{ + ad_utility::JoinColumnMapping joinColMap{ {{jcLeft, jcRight}}, numColsLeft, numColsRight}; - ad_utility::AddCombinedRowToIdTable rowAdder{ - 1, IdTable{getResultWidth(), getExecutionContext()->getAllocator()}, - cancellationHandle_}; - - AD_CORRECTNESS_CHECK(joinColScan == 0); - auto permutationIdTable = - ad_utility::IdTableAndFirstCol{idTable.asColumnSubsetView( - idTableIsRightInput ? joinColMap.permutationRight() - : joinColMap.permutationLeft())}; - - ad_utility::Timer timer{ad_utility::timer::Timer::InitialStatus::Started}; - auto rightBlocksInternal = - scan.lazyScanForJoinOfColumnWithScan(permutationIdTable.col()); - auto rightBlocks = convertGenerator(std::move(rightBlocksInternal)); - - runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs()); - - auto doJoin = [&rowAdder](auto& left, auto& right) mutable { - ad_utility::zipperJoinForBlocksWithoutUndef(left, right, std::less{}, - rowAdder); - }; - auto blockForIdTable = std::span{&permutationIdTable, 1}; - if (idTableIsRightInput) { - doJoin(rightBlocks, blockForIdTable); - } else { - doJoin(blockForIdTable, rightBlocks); - } - auto result = std::move(rowAdder).resultTable(); - result.setColumnSubset(joinColMap.permutationResult()); + auto resultPermutation = joinColMap.permutationResult(); + return createResult( + requestLaziness, + [this, joinColTable, scan = std::move(scan), joinColScan, + resultWithIdTable = std::move(resultWithIdTable), + joinColMap = std::move(joinColMap)]( + std::function yieldTable) { + const IdTable& idTable = resultWithIdTable->idTable(); + ad_utility::AddCombinedRowToIdTable rowAdder{ + 1, IdTable{getResultWidth(), allocator()}, cancellationHandle_, + CHUNK_SIZE, std::move(yieldTable)}; + + 
AD_CORRECTNESS_CHECK(joinColScan == 0); + auto permutationIdTable = ad_utility::IdTableAndFirstCol{ + idTable.asColumnSubsetView(idTableIsRightInput + ? joinColMap.permutationRight() + : joinColMap.permutationLeft()), + resultWithIdTable->getCopyOfLocalVocab()}; + + ad_utility::Timer timer{ + ad_utility::timer::Timer::InitialStatus::Started}; + bool idTableHasUndef = + !idTable.empty() && idTable.at(0, joinColTable).isUndefined(); + std::optional> indexScanResult = + std::nullopt; + auto rightBlocks = [&scan, idTableHasUndef, &permutationIdTable, + &indexScanResult]() + -> std::variant { + if (idTableHasUndef) { + indexScanResult = + scan->getResult(false, ComputationMode::LAZY_IF_SUPPORTED); + AD_CORRECTNESS_CHECK( + !indexScanResult.value()->isFullyMaterialized()); + return convertGenerator( + std::move(indexScanResult.value()->idTables())); + } else { + auto rightBlocksInternal = + scan->lazyScanForJoinOfColumnWithScan(permutationIdTable.col()); + return convertGenerator(std::move(rightBlocksInternal)); + } + }(); + + runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs()); + auto doJoin = [&rowAdder](auto& left, auto& right) mutable { + // Note: The `zipperJoinForBlocksWithPotentialUndef` automatically + // switches to a more efficient implementation if there are no UNDEF + // values in any of the inputs. 
+ ad_utility::zipperJoinForBlocksWithPotentialUndef( + left, right, std::less{}, rowAdder); + }; + auto blockForIdTable = std::array{std::move(permutationIdTable)}; + std::visit( + [&doJoin, &blockForIdTable](auto& blocks) { + if constexpr (idTableIsRightInput) { + doJoin(blocks, blockForIdTable); + } else { + doJoin(blockForIdTable, blocks); + } + }, + rightBlocks); + + if (std::holds_alternative(rightBlocks)) { + updateRuntimeInfoForLazyScan( + *scan, std::get(rightBlocks).details()); + } - updateRuntimeInfoForLazyScan(scan, rightBlocks.details()); - return result; + auto localVocab = std::move(rowAdder.localVocab()); + return Result::IdTableVocabPair{std::move(rowAdder).resultTable(), + std::move(localVocab)}; + }, + std::move(resultPermutation)); +} +// _____________________________________________________________________________ +ProtoResult Join::computeResultForTwoMaterializedInputs( + std::shared_ptr leftRes, + std::shared_ptr rightRes) const { + IdTable idTable{getResultWidth(), allocator()}; + join(leftRes->idTable(), _leftJoinCol, rightRes->idTable(), _rightJoinCol, + &idTable); + checkCancellation(); + + return {std::move(idTable), resultSortedOn(), + Result::getMergedLocalVocab(*leftRes, *rightRes)}; +} + +// _____________________________________________________________________________ +ProtoResult Join::createEmptyResult() const { + return {IdTable{getResultWidth(), allocator()}, resultSortedOn(), + LocalVocab{}}; } diff --git a/src/engine/Join.h b/src/engine/Join.h index d23a9e4f5d..a65d10f87b 100644 --- a/src/engine/Join.h +++ b/src/engine/Join.h @@ -11,7 +11,7 @@ #include "engine/QueryExecutionTree.h" #include "util/HashMap.h" #include "util/HashSet.h" -#include "util/JoinAlgorithms/JoinAlgorithms.h" +#include "util/TypeTraits.h" class Join : public Operation { private: @@ -33,6 +33,9 @@ class Join : public Operation { std::shared_ptr t2, ColumnIndex t1JoinCol, ColumnIndex t2JoinCol); + using OptionalPermutation = std::optional>; + + static 
constexpr size_t CHUNK_SIZE = 100'000; // A very explicit constructor, which initializes an invalid join object (it // has no subtrees, which violates class invariants). These invalid Join // objects can be used for unit tests that only test member functions which @@ -93,6 +96,46 @@ class Join : public Operation { void join(const IdTable& a, ColumnIndex jc1, const IdTable& b, ColumnIndex jc2, IdTable* result) const; + private: + // Part of the implementation of `createResult`. This function is called when + // the result should be yielded lazily. + // Action is a lambda that itself runs the join operation in a blocking + // manner. It is passed a special function that is supposed to be the callback + // being passed to the `AddCombinedRowToIdTable` so that the partial results + // can be yielded during execution. This is achieved by spawning a separate + // thread. + Result::Generator runLazyJoinAndConvertToGenerator( + ad_utility::InvocableWithExactReturnType< + Result::IdTableVocabPair, + std::function> auto action, + OptionalPermutation permutation) const; + + public: + // Helper function to compute the result of a join operation and conditionally + // return a lazy or fully materialized result depending on `requestedLaziness`. + // If `requestedLaziness` is true, this is achieved by running the `action` + // lambda in a separate thread and returning a lazy result that yields the + // chunks handed over by that thread. If `requestedLaziness` is false, the + // result is fully materialized and returned directly. + // `permutation` indicates a permutation to apply to the result columns before + // yielding/returning them. An empty optional (`std::nullopt`) means no + // permutation is applied. + // `action` is a lambda that can be used to send partial chunks to a consumer + // in addition to returning the remaining result. If laziness is not required, + // the callback passed to `action` is a no-op. 
+ ProtoResult createResult( + bool requestedLaziness, + ad_utility::InvocableWithExactReturnType< + Result::IdTableVocabPair, + std::function> auto action, + OptionalPermutation permutation = {}) const; + + // Fallback implementation of a join that is used when at least one of the two + // inputs is not fully materialized. This represents the general case where we + // don't have any optimization left to try. + ProtoResult lazyJoin(std::shared_ptr a, ColumnIndex jc1, + std::shared_ptr b, ColumnIndex jc2, + bool requestLaziness) const; + /** * @brief Joins IdTables dynA and dynB on join column jc2, returning * the result in dynRes. Creates a cross product for matching rows by putting @@ -113,24 +156,29 @@ class Join : public Operation { virtual string getCacheKeyImpl() const override; private: - ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override; + ProtoResult computeResult(bool requestLaziness) override; VariableToColumnMap computeVariableToColumnMap() const override; // A special implementation that is called when both children are // `IndexScan`s. Uses the lazy scans to only retrieve the subset of the // `IndexScan`s that is actually needed without fully materializing them. - IdTable computeResultForTwoIndexScans(); + ProtoResult computeResultForTwoIndexScans(bool requestLaziness) const; // A special implementation that is called when one of the children is an // `IndexScan`. The argument `scanIsLeft` determines whether the `IndexScan` // is the left or the right child of this `Join`. This needs to be known to // determine the correct order of the columns in the result. 
template - IdTable computeResultForIndexScanAndIdTable(const IdTable& idTable, - ColumnIndex joinColTable, - IndexScan& scan, - ColumnIndex joinColScan); + ProtoResult computeResultForIndexScanAndIdTable( + bool requestLaziness, std::shared_ptr resultWithIdTable, + ColumnIndex joinColTable, std::shared_ptr scan, + ColumnIndex joinColScan) const; + + // Default case where both inputs are fully materialized. + ProtoResult computeResultForTwoMaterializedInputs( + std::shared_ptr leftRes, + std::shared_ptr rightRes) const; /* * @brief Combines 2 rows like in a join and inserts the result in the @@ -146,7 +194,7 @@ class Join : public Operation { */ template static void addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB, - const ColumnIndex jcRowB, + ColumnIndex jcRowB, IdTableStatic* table); /* @@ -156,4 +204,7 @@ class Join : public Operation { static void hashJoinImpl(const IdTable& dynA, ColumnIndex jc1, const IdTable& dynB, ColumnIndex jc2, IdTable* dynRes); + + // Commonly used code for the various known-to-be-empty cases. + ProtoResult createEmptyResult() const; }; diff --git a/src/engine/LocalVocab.h b/src/engine/LocalVocab.h index f61982400d..4cc47c5e55 100644 --- a/src/engine/LocalVocab.h +++ b/src/engine/LocalVocab.h @@ -99,6 +99,9 @@ class LocalVocab { // Return true if and only if the local vocabulary is empty. bool empty() const { return size() == 0; } + // The number of set stores (primary set and other sets). + size_t numSets() const { return 1 + otherWordSets_.size(); } + // Get the `LocalVocabEntry` corresponding to the given `LocalVocabIndex`. 
// // NOTE: This used to be a more complex function but is now a simple diff --git a/src/engine/Operation.cpp b/src/engine/Operation.cpp index 9c6ae814aa..21bfecc411 100644 --- a/src/engine/Operation.cpp +++ b/src/engine/Operation.cpp @@ -120,6 +120,10 @@ ProtoResult Operation::runComputation(const ad_utility::Timer& timer, // correctly because the result was computed, so we can pass `nullopt` as // the last argument. if (result.isFullyMaterialized()) { + size_t numLocalVocabs = result.localVocab().numSets(); + if (numLocalVocabs > 1) { + runtimeInfo().addDetail("num-local-vocabs", numLocalVocabs); + } updateRuntimeInformationOnSuccess(result.idTable().size(), ad_utility::CacheStatus::computed, timer.msecs(), std::nullopt); diff --git a/src/index/IndexImpl.cpp b/src/index/IndexImpl.cpp index 27ae9491ff..c1db24b264 100644 --- a/src/index/IndexImpl.cpp +++ b/src/index/IndexImpl.cpp @@ -192,24 +192,24 @@ IndexImpl::buildOspWithPatterns( // them to the queue. IdTable outputBufferTable{NumColumnsIndexBuilding + 2, ad_utility::makeUnlimitedAllocator()}; - auto pushToQueue = - [&, bufferSize = - BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP.load()](IdTable& table) { - if (table.numRows() >= bufferSize) { - if (!outputBufferTable.empty()) { - queue.push(std::move(outputBufferTable)); - outputBufferTable.clear(); - } - queue.push(std::move(table)); - } else { - outputBufferTable.insertAtEnd(table.begin(), table.end()); - if (outputBufferTable.size() >= bufferSize) { - queue.push(std::move(outputBufferTable)); - outputBufferTable.clear(); - } - } - table.clear(); - }; + auto pushToQueue = [&, bufferSize = + BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP.load()]( + IdTable& table, LocalVocab&) { + if (table.numRows() >= bufferSize) { + if (!outputBufferTable.empty()) { + queue.push(std::move(outputBufferTable)); + outputBufferTable.clear(); + } + queue.push(std::move(table)); + } else { + outputBufferTable.insertAtEnd(table.begin(), table.end()); + if (outputBufferTable.size() >= bufferSize) { + 
queue.push(std::move(outputBufferTable)); + outputBufferTable.clear(); + } + } + table.clear(); + }; lazyOptionalJoinOnFirstColumn(ospAsBlocksTransformed, lazyPatternScan, pushToQueue); diff --git a/src/util/JoinAlgorithms/JoinColumnMapping.h b/src/util/JoinAlgorithms/JoinColumnMapping.h index bb01c24a9b..26d9be15db 100644 --- a/src/util/JoinAlgorithms/JoinColumnMapping.h +++ b/src/util/JoinAlgorithms/JoinColumnMapping.h @@ -8,6 +8,7 @@ #include #include +#include "engine/LocalVocab.h" #include "util/Algorithm.h" namespace ad_utility { @@ -108,6 +109,7 @@ template struct IdTableAndFirstCol { private: Table table_; + LocalVocab localVocab_; public: // Typedef needed for generic interfaces. @@ -116,7 +118,8 @@ struct IdTableAndFirstCol { std::decay_t; // Construct by taking ownership of the table. - explicit IdTableAndFirstCol(Table t) : table_{std::move(t)} {} + IdTableAndFirstCol(Table t, LocalVocab localVocab) + : table_{std::move(t)}, localVocab_{std::move(localVocab)} {} // Get access to the first column. 
decltype(auto) col() { return table_.getColumn(0); } @@ -131,6 +134,8 @@ struct IdTableAndFirstCol { bool empty() const { return col().empty(); } const Id& operator[](size_t idx) const { return col()[idx]; } + const Id& front() const { return col().front(); } + const Id& back() const { return col().back(); } size_t size() const { return col().size(); } @@ -141,5 +146,7 @@ struct IdTableAndFirstCol { IdTableView asStaticView() const { return table_.template asStaticView(); } + + const LocalVocab& getLocalVocab() const { return localVocab_; } }; } // namespace ad_utility diff --git a/test/AddCombinedRowToTableTest.cpp b/test/AddCombinedRowToTableTest.cpp index 63d3d5a8be..c4a6b1c874 100644 --- a/test/AddCombinedRowToTableTest.cpp +++ b/test/AddCombinedRowToTableTest.cpp @@ -198,3 +198,128 @@ TEST(AddCombinedRowToTable, flushDoesCheckCancellation) { cancellationHandle->cancel(ad_utility::CancellationState::MANUAL); EXPECT_THROW(adder.flush(), ad_utility::CancellationException); } + +namespace { +struct IdTableWithVocab { + IdTable idTable_; + LocalVocab localVocab_; + + const LocalVocab& getLocalVocab() const { return localVocab_; } + + template + IdTableView asStaticView() const { + return idTable_.asStaticView(); + } +}; + +using ad_utility::triple_component::Literal; + +Literal fromString(std::string_view string) { + return Literal::fromStringRepresentation(absl::StrCat("\"", string, "\"")); +} + +// _____________________________________________________________________________ +LocalVocab createVocabWithSingleString(std::string_view string) { + LocalVocab localVocab; + localVocab.getIndexAndAddIfNotContained(LocalVocabEntry{fromString(string)}); + return localVocab; +} + +// _____________________________________________________________________________ +bool vocabContainsString(const LocalVocab& vocab, std::string_view string) { + return ad_utility::contains(vocab.getAllWordsForTesting(), + LocalVocabEntry{fromString(string)}); +} +} // namespace + +// 
_____________________________________________________________________________ +TEST(AddCombinedRowToTable, verifyLocalVocabIsUpdatedCorrectly) { + auto outputTable = makeIdTableFromVector({}); + outputTable.setNumColumns(3); + std::vector localVocabs; + ad_utility::AddCombinedRowToIdTable adder{ + 1, std::move(outputTable), + std::make_shared>(), 1, + [&localVocabs](IdTable& idTable, LocalVocab& localVocab) { + localVocabs.push_back(std::move(localVocab)); + // Clear to trigger new merging of local vocabs, in practice + // `localVocab` is not altered without altering `idTable` as well. + idTable.clear(); + }}; + + IdTableWithVocab input1{makeIdTableFromVector({{0, 1}}), + createVocabWithSingleString("a")}; + IdTableWithVocab input2{makeIdTableFromVector({{0, 2}}), + createVocabWithSingleString("b")}; + IdTableWithVocab input3{makeIdTableFromVector({{0, 3}}), + createVocabWithSingleString("c")}; + + using ::testing::SizeIs; + + adder.setInput(input1, input2); + adder.addRow(0, 0); + adder.addRow(0, 0); + EXPECT_THAT(localVocabs, SizeIs(1)); + adder.addRow(0, 0); + + adder.setInput(input2, input3); + EXPECT_THAT(localVocabs, SizeIs(2)); + adder.addRow(0, 0); + adder.flush(); + EXPECT_THAT(localVocabs, SizeIs(3)); + + adder.setOnlyLeftInputForOptionalJoin(input1); + adder.addOptionalRow(0); + adder.addOptionalRow(0); + EXPECT_THAT(localVocabs, SizeIs(4)); + adder.addOptionalRow(0); + + localVocabs.push_back(std::move(adder.localVocab())); + + ASSERT_THAT(localVocabs, SizeIs(5)); + + EXPECT_TRUE(vocabContainsString(localVocabs[0], "a")); + EXPECT_TRUE(vocabContainsString(localVocabs[0], "b")); + EXPECT_FALSE(vocabContainsString(localVocabs[0], "c")); + + EXPECT_TRUE(vocabContainsString(localVocabs[1], "a")); + EXPECT_TRUE(vocabContainsString(localVocabs[1], "b")); + EXPECT_FALSE(vocabContainsString(localVocabs[1], "c")); + + EXPECT_FALSE(vocabContainsString(localVocabs[2], "a")); + EXPECT_TRUE(vocabContainsString(localVocabs[2], "b")); + 
EXPECT_TRUE(vocabContainsString(localVocabs[2], "c")); + + EXPECT_TRUE(vocabContainsString(localVocabs[3], "a")); + EXPECT_FALSE(vocabContainsString(localVocabs[3], "b")); + EXPECT_FALSE(vocabContainsString(localVocabs[3], "c")); + + EXPECT_TRUE(vocabContainsString(localVocabs[4], "a")); + EXPECT_FALSE(vocabContainsString(localVocabs[4], "b")); + EXPECT_FALSE(vocabContainsString(localVocabs[4], "c")); +} + +// _____________________________________________________________________________ +TEST(AddCombinedRowToTable, verifyLocalVocabIsRetainedWhenNotMoving) { + auto outputTable = makeIdTableFromVector({}); + outputTable.setNumColumns(3); + ad_utility::AddCombinedRowToIdTable adder{ + 1, std::move(outputTable), + std::make_shared>(), 1}; + + IdTableWithVocab input1{makeIdTableFromVector({{0, 1}}), + createVocabWithSingleString("a")}; + IdTableWithVocab input2{makeIdTableFromVector({{0, 2}}), + createVocabWithSingleString("b")}; + + adder.setInput(input1, input2); + adder.addRow(0, 0); + adder.flush(); + adder.addRow(0, 0); + + LocalVocab localVocab = std::move(adder.localVocab()); + + EXPECT_TRUE(vocabContainsString(localVocab, "a")); + EXPECT_TRUE(vocabContainsString(localVocab, "b")); + EXPECT_THAT(localVocab.getAllWordsForTesting(), ::testing::SizeIs(2)); +} diff --git a/test/JoinTest.cpp b/test/JoinTest.cpp index c64c03d5d2..f1b24c2eb4 100644 --- a/test/JoinTest.cpp +++ b/test/JoinTest.cpp @@ -25,10 +25,11 @@ #include "engine/Values.h" #include "engine/ValuesForTesting.h" #include "engine/idTable/IdTable.h" -#include "global/RuntimeParameters.h" #include "util/Forward.h" #include "util/IndexTestHelpers.h" +#include "util/OperationTestHelpers.h" #include "util/Random.h" +#include "util/RuntimeParametersTestHelpers.h" #include "util/SourceLocation.h" using ad_utility::testing::makeAllocator; @@ -208,6 +209,13 @@ std::vector createJoinTestSet() { return myTestSet; } + +IdTable createIdTableOfSizeWithValue(size_t size, Id value) { + IdTable idTable{1, 
ad_utility::testing::makeAllocator()}; + idTable.resize(size); + std::ranges::fill(idTable.getColumn(0), value); + return idTable; +} } // namespace TEST(JoinTest, joinTest) { @@ -230,11 +238,29 @@ using ExpectedColumns = ad_utility::HashMap< std::pair, ColumnIndexAndTypeInfo::UndefStatus>>; // Test that the result of the `join` matches the `expected` outcome. -void testJoinOperation(Join& join, const ExpectedColumns& expected) { - auto res = join.getResult(); +// If `requestLaziness` is true, the join is requested to be lazy. If +// `expectLazinessParityWhenNonEmpty` is true, the laziness of the result is +// expected to be the same as `requestLaziness` if the result is not empty. +void testJoinOperation(Join& join, const ExpectedColumns& expected, + bool requestLaziness = false, + bool expectLazinessParityWhenNonEmpty = false, + ad_utility::source_location location = + ad_utility::source_location::current()) { + auto lt = generateLocationTrace(location); + auto res = join.getResult(false, requestLaziness + ? ComputationMode::LAZY_IF_SUPPORTED + : ComputationMode::FULLY_MATERIALIZED); const auto& varToCols = join.getExternallyVisibleVariableColumns(); EXPECT_EQ(varToCols.size(), expected.size()); - const auto& table = res->idTable(); + if (expectLazinessParityWhenNonEmpty && + (!res->isFullyMaterialized() || !res->idTable().empty())) { + EXPECT_EQ(res->isFullyMaterialized(), !requestLaziness); + } + IdTable table = + res->isFullyMaterialized() + ? res->idTable().clone() + : aggregateTables(std::move(res->idTables()), join.getResultWidth()) + .first; ASSERT_EQ(table.numColumns(), expected.size()); for (const auto& [var, columnAndStatus] : expected) { const auto& [colIndex, undefStatus] = varToCols.at(var); @@ -333,8 +359,9 @@ TEST(JoinTest, joinWithFullScanPSO) { TEST(JoinTest, joinWithColumnAndScan) { auto test = [](size_t materializationThreshold) { auto qec = ad_utility::testing::getQec("

1.

2. 3."); - RuntimeParameters().set<"lazy-index-scan-max-size-materialization">( - materializationThreshold); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( + materializationThreshold); qec->getQueryTreeCache().clearAll(); auto fullScanPSO = ad_utility::makeExecutionTree( qec, PSO, SparqlTriple{Var{"?s"}, "

", Var{"?o"}}); @@ -365,8 +392,9 @@ TEST(JoinTest, joinWithColumnAndScan) { TEST(JoinTest, joinWithColumnAndScanEmptyInput) { auto test = [](size_t materializationThreshold) { auto qec = ad_utility::testing::getQec("

1.

2. 3."); - RuntimeParameters().set<"lazy-index-scan-max-size-materialization">( - materializationThreshold); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( + materializationThreshold); qec->getQueryTreeCache().clearAll(); auto fullScanPSO = ad_utility::makeExecutionTree( qec, PSO, SparqlTriple{Var{"?s"}, "

", Var{"?o"}}); @@ -396,8 +424,9 @@ TEST(JoinTest, joinWithColumnAndScanEmptyInput) { TEST(JoinTest, joinWithColumnAndScanUndefValues) { auto test = [](size_t materializationThreshold) { auto qec = ad_utility::testing::getQec("

1.

2. 3."); - RuntimeParameters().set<"lazy-index-scan-max-size-materialization">( - materializationThreshold); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( + materializationThreshold); qec->getQueryTreeCache().clearAll(); auto fullScanPSO = ad_utility::makeExecutionTree( qec, PSO, SparqlTriple{Var{"?s"}, "

", Var{"?o"}}); @@ -414,11 +443,20 @@ TEST(JoinTest, joinWithColumnAndScanUndefValues) { VariableToColumnMap expectedVariables{ {Variable{"?s"}, makeAlwaysDefinedColumn(0)}, {Variable{"?o"}, makeAlwaysDefinedColumn(1)}}; - testJoinOperation(join, makeExpectedColumns(expectedVariables, expected)); + auto expectedColumns = makeExpectedColumns(expectedVariables, expected); + + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, true, + materializationThreshold < 3); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, false); auto joinSwitched = Join{qec, valuesTree, fullScanPSO, 0, 0}; - testJoinOperation(joinSwitched, - makeExpectedColumns(expectedVariables, expected)); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, true, + materializationThreshold < 3); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, false); }; test(0); test(1); @@ -431,9 +469,9 @@ TEST(JoinTest, joinTwoScans) { auto test = [](size_t materializationThreshold) { auto qec = ad_utility::testing::getQec( "

1.

2. 3 . 4. 7. "); - RuntimeParameters().set<"lazy-index-scan-max-size-materialization">( - materializationThreshold); - qec->getQueryTreeCache().clearAll(); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( + materializationThreshold); auto scanP = ad_utility::makeExecutionTree( qec, PSO, SparqlTriple{Var{"?s"}, "

", Var{"?o"}}); auto scanP2 = ad_utility::makeExecutionTree( @@ -448,11 +486,20 @@ TEST(JoinTest, joinTwoScans) { {Variable{"?s"}, makeAlwaysDefinedColumn(0)}, {Variable{"?q"}, makeAlwaysDefinedColumn(1)}, {Variable{"?o"}, makeAlwaysDefinedColumn(2)}}; - testJoinOperation(join, makeExpectedColumns(expectedVariables, expected)); + auto expectedColumns = makeExpectedColumns(expectedVariables, expected); + + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, true, + materializationThreshold <= 3); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, false); auto joinSwitched = Join{qec, scanP2, scanP, 0, 0}; - testJoinOperation(joinSwitched, - makeExpectedColumns(expectedVariables, expected)); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, true, + materializationThreshold <= 3); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, false); }; test(0); test(1); @@ -469,3 +516,243 @@ TEST(JoinTest, invalidJoinVariable) { ASSERT_ANY_THROW(Join(qec, valuesTree2, valuesTree, 0, 0)); } + +// _____________________________________________________________________________ +TEST(JoinTest, joinTwoLazyOperationsWithAndWithoutUndefValues) { + auto performJoin = [](std::vector leftTables, + std::vector rightTables, + const IdTable& expected, + bool expectPossiblyUndefinedResult, + ad_utility::source_location loc = + ad_utility::source_location::current()) { + auto l = generateLocationTrace(loc); + auto qec = ad_utility::testing::getQec(); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( + 0); + auto leftTree = ad_utility::makeExecutionTree( + qec, std::move(leftTables), Vars{Variable{"?s"}}, false, + std::vector{0}); + auto rightTree = ad_utility::makeExecutionTree( + qec, std::move(rightTables), Vars{Variable{"?s"}}, false, + std::vector{0}); + VariableToColumnMap expectedVariables{ + {Variable{"?s"}, 
expectPossiblyUndefinedResult + ? makePossiblyUndefinedColumn(0) + : makeAlwaysDefinedColumn(0)}}; + auto expectedColumns = makeExpectedColumns(expectedVariables, expected); + auto join = Join{qec, leftTree, rightTree, 0, 0}; + EXPECT_EQ(join.getDescriptor(), "Join on ?s"); + + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, true, true); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, false); + + auto joinSwitched = Join{qec, rightTree, leftTree, 0, 0}; + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, true, true); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, false); + }; + auto U = Id::makeUndefined(); + std::vector leftTables; + std::vector rightTables; + IdTable expected1{1, ad_utility::makeUnlimitedAllocator()}; + performJoin(std::move(leftTables), std::move(rightTables), expected1, false); + + leftTables.push_back(makeIdTableFromVector({{U}})); + rightTables.push_back(makeIdTableFromVector({{U}})); + auto expected2 = makeIdTableFromVector({{U}}); + performJoin(std::move(leftTables), std::move(rightTables), expected2, true); + + leftTables.push_back(makeIdTableFromVector({{U}, {I(0)}})); + rightTables.push_back(makeIdTableFromVector({{U}})); + auto expected3 = makeIdTableFromVector({{U}, {I(0)}}); + performJoin(std::move(leftTables), std::move(rightTables), expected3, true); + + leftTables.push_back(makeIdTableFromVector({{U}, {I(0)}})); + leftTables.push_back(makeIdTableFromVector({{I(1)}})); + rightTables.push_back(makeIdTableFromVector({{I(0)}})); + auto expected4 = makeIdTableFromVector({{I(0)}, {I(0)}}); + performJoin(std::move(leftTables), std::move(rightTables), expected4, false); + + leftTables.push_back(makeIdTableFromVector({{U}, {I(0)}})); + leftTables.push_back(makeIdTableFromVector({{I(1)}})); + rightTables.push_back(IdTable{1, ad_utility::makeUnlimitedAllocator()}); + IdTable expected5{1, 
ad_utility::makeUnlimitedAllocator()}; + performJoin(std::move(leftTables), std::move(rightTables), expected5, false); + + leftTables.push_back(makeIdTableFromVector({{I(0)}})); + leftTables.push_back(makeIdTableFromVector({{I(1)}})); + rightTables.push_back(makeIdTableFromVector({{I(1)}})); + rightTables.push_back(makeIdTableFromVector({{I(2)}})); + auto expected6 = makeIdTableFromVector({{I(1)}}); + performJoin(std::move(leftTables), std::move(rightTables), expected6, false); + + leftTables.push_back(makeIdTableFromVector({{U}})); + leftTables.push_back(makeIdTableFromVector({{I(2)}})); + rightTables.push_back(createIdTableOfSizeWithValue(Join::CHUNK_SIZE, I(1))); + auto expected7 = createIdTableOfSizeWithValue(Join::CHUNK_SIZE, I(1)); + performJoin(std::move(leftTables), std::move(rightTables), expected7, false); +} + +// _____________________________________________________________________________ +TEST(JoinTest, joinLazyAndNonLazyOperationWithAndWithoutUndefValues) { + auto performJoin = [](IdTable leftTable, std::vector rightTables, + const IdTable& expected, + bool expectPossiblyUndefinedResult, + ad_utility::source_location loc = + ad_utility::source_location::current()) { + auto l = generateLocationTrace(loc); + auto qec = ad_utility::testing::getQec(); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">( + 0); + auto leftTree = + ad_utility::makeExecutionTree( + qec, std::move(leftTable), Vars{Variable{"?s"}}, false, + std::vector{0}, LocalVocab{}, std::nullopt, true); + auto rightTree = ad_utility::makeExecutionTree( + qec, std::move(rightTables), Vars{Variable{"?s"}}, false, + std::vector{0}); + VariableToColumnMap expectedVariables{ + {Variable{"?s"}, expectPossiblyUndefinedResult + ? 
makePossiblyUndefinedColumn(0) + : makeAlwaysDefinedColumn(0)}}; + auto expectedColumns = makeExpectedColumns(expectedVariables, expected); + auto join = Join{qec, leftTree, rightTree, 0, 0}; + EXPECT_EQ(join.getDescriptor(), "Join on ?s"); + + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, true); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, false); + + auto joinSwitched = Join{qec, rightTree, leftTree, 0, 0}; + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, true); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(joinSwitched, expectedColumns, false); + }; + auto U = Id::makeUndefined(); + std::vector rightTables; + rightTables.push_back(makeIdTableFromVector({{U}})); + auto expected1 = makeIdTableFromVector({{U}}); + performJoin(makeIdTableFromVector({{U}}), std::move(rightTables), expected1, + true); + + rightTables.push_back(makeIdTableFromVector({{U}})); + auto expected2 = makeIdTableFromVector({{U}, {I(0)}}); + performJoin(makeIdTableFromVector({{U}, {I(0)}}), std::move(rightTables), + expected2, true); + + rightTables.push_back(makeIdTableFromVector({{I(0)}})); + auto expected3 = makeIdTableFromVector({{I(0)}, {I(0)}}); + performJoin(makeIdTableFromVector({{U}, {I(0)}, {I(1)}}), + std::move(rightTables), expected3, false); + + rightTables.push_back(makeIdTableFromVector({{U}, {I(0)}})); + auto expected4 = makeIdTableFromVector({{I(0)}, {I(0)}, {I(1)}}); + performJoin(makeIdTableFromVector({{I(0)}, {I(1)}}), std::move(rightTables), + expected4, false); + + rightTables.push_back(IdTable{1, ad_utility::makeUnlimitedAllocator()}); + IdTable expected5{1, ad_utility::makeUnlimitedAllocator()}; + performJoin(makeIdTableFromVector({{U}, {I(0)}, {I(1)}}), + std::move(rightTables), expected5, false); + + rightTables.push_back(makeIdTableFromVector({{I(1)}})); + rightTables.push_back(makeIdTableFromVector({{I(2)}})); + auto expected6 = 
makeIdTableFromVector({{I(1)}}); + performJoin(makeIdTableFromVector({{I(0)}, {I(1)}}), std::move(rightTables), + expected6, false); + + rightTables.push_back(makeIdTableFromVector({{U}})); + rightTables.push_back(makeIdTableFromVector({{I(2)}})); + auto expected7 = createIdTableOfSizeWithValue(Join::CHUNK_SIZE, I(1)); + performJoin(createIdTableOfSizeWithValue(Join::CHUNK_SIZE, I(1)), + std::move(rightTables), expected7, false); +} + +// _____________________________________________________________________________ +TEST(JoinTest, errorInSeparateThreadIsPropagatedCorrectly) { + auto qec = ad_utility::testing::getQec(); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">(0); + auto leftTree = + ad_utility::makeExecutionTree(qec, Variable{"?s"}); + auto rightTree = ad_utility::makeExecutionTree( + qec, makeIdTableFromVector({{I(1)}}), Vars{Variable{"?s"}}, false, + std::vector{0}); + VariableToColumnMap expectedVariables{ + {Variable{"?s"}, makeAlwaysDefinedColumn(0)}}; + Join join{qec, leftTree, rightTree, 0, 0}; + + auto result = join.getResult(false, ComputationMode::LAZY_IF_SUPPORTED); + ASSERT_FALSE(result->isFullyMaterialized()); + + auto& idTables = result->idTables(); + AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(idTables.begin(), + testing::StrEq("AlwaysFailOperation"), + std::runtime_error); +} + +// _____________________________________________________________________________ +TEST(JoinTest, verifyColumnPermutationsAreAppliedCorrectly) { + auto qec = + ad_utility::testing::getQec("

.

. ."); + auto cleanup = + setRuntimeParameterForTest<"lazy-index-scan-max-size-materialization">(0); + auto U = Id::makeUndefined(); + { + auto leftTree = ad_utility::makeExecutionTree( + qec, makeIdTableFromVector({{U, I(1), U}, {U, I(3), U}}), + Vars{Variable{"?t"}, Variable{"?s"}, Variable{"?u"}}, false, + std::vector{1}); + auto rightTree = ad_utility::makeExecutionTree( + qec, makeIdTableFromVector({{U, I(10), I(1)}, {U, U, I(2)}}), + Vars{Variable{"?v"}, Variable{"?w"}, Variable{"?s"}}, false, + std::vector{2}); + VariableToColumnMap expectedVariables{ + {Variable{"?s"}, makeAlwaysDefinedColumn(0)}, + {Variable{"?t"}, makePossiblyUndefinedColumn(1)}, + {Variable{"?u"}, makePossiblyUndefinedColumn(2)}, + {Variable{"?v"}, makePossiblyUndefinedColumn(3)}, + {Variable{"?w"}, makePossiblyUndefinedColumn(4)}}; + auto expected = makeIdTableFromVector({{I(1), U, U, U, I(10)}}); + auto expectedColumns = makeExpectedColumns(expectedVariables, expected); + auto join = Join{qec, leftTree, rightTree, 1, 2}; + EXPECT_EQ(join.getDescriptor(), "Join on ?s"); + + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, true, true); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, false); + } + { + auto leftTree = ad_utility::makeExecutionTree( + qec, makeIdTableFromVector({{I(1), I(2), U}}), + Vars{Variable{"?p"}, Variable{"?q"}, Variable{"?s"}}, false, + std::vector{2}, LocalVocab{}, std::nullopt, true); + auto fullScanPSO = ad_utility::makeExecutionTree( + qec, PSO, SparqlTriple{Variable{"?s"}, "

", Var{"?o"}}); + VariableToColumnMap expectedVariables{ + {Variable{"?s"}, makeAlwaysDefinedColumn(0)}, + {Variable{"?p"}, makeAlwaysDefinedColumn(1)}, + {Variable{"?q"}, makeAlwaysDefinedColumn(2)}, + {Variable{"?o"}, makeAlwaysDefinedColumn(3)}}; + auto id = ad_utility::testing::makeGetId(qec->getIndex()); + auto expected = + makeIdTableFromVector({{id(""), I(1), I(2), id("")}, + {id(""), I(1), I(2), id("")}}); + auto expectedColumns = makeExpectedColumns(expectedVariables, expected); + auto join = Join{qec, leftTree, fullScanPSO, 2, 0}; + EXPECT_EQ(join.getDescriptor(), "Join on ?s"); + + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, true, true); + qec->getQueryTreeCache().clearAll(); + testJoinOperation(join, expectedColumns, false); + } +} diff --git a/test/util/OperationTestHelpers.h b/test/util/OperationTestHelpers.h index a2d0fdc816..14c455960c 100644 --- a/test/util/OperationTestHelpers.h +++ b/test/util/OperationTestHelpers.h @@ -87,21 +87,35 @@ class ShallowParentOperation : public Operation { // Operation that will throw on `computeResult` for testing. class AlwaysFailOperation : public Operation { + std::optional variable_ = std::nullopt; + std::vector getChildren() override { return {}; } - string getCacheKeyImpl() const override { AD_FAIL(); } + string getCacheKeyImpl() const override { + // Because this operation always fails, it should never be cached. 
+ return "AlwaysFailOperationCacheKey"; + } string getDescriptor() const override { return "AlwaysFailOperationDescriptor"; } - size_t getResultWidth() const override { return 0; } + size_t getResultWidth() const override { return 1; } size_t getCostEstimate() override { return 0; } uint64_t getSizeEstimateBeforeLimit() override { return 0; } float getMultiplicity([[maybe_unused]] size_t) override { return 0; } bool knownEmptyResult() override { return false; } - vector resultSortedOn() const override { return {}; } - VariableToColumnMap computeVariableToColumnMap() const override { return {}; } + vector resultSortedOn() const override { return {0}; } + VariableToColumnMap computeVariableToColumnMap() const override { + if (!variable_.has_value()) { + return {}; + } + return {{variable_.value(), + ColumnIndexAndTypeInfo{ + 0, ColumnIndexAndTypeInfo::UndefStatus::AlwaysDefined}}}; + } public: using Operation::Operation; + AlwaysFailOperation(QueryExecutionContext* qec, Variable variable) + : Operation{qec}, variable_{std::move(variable)} {} ProtoResult computeResult(bool requestLaziness) override { if (!requestLaziness) { throw std::runtime_error{"AlwaysFailOperation"}; diff --git a/test/util/RuntimeParametersTestHelpers.h b/test/util/RuntimeParametersTestHelpers.h new file mode 100644 index 0000000000..9b75aaa350 --- /dev/null +++ b/test/util/RuntimeParametersTestHelpers.h @@ -0,0 +1,20 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Robin Textor-Falconi + +#pragma once + +#include + +#include "global/RuntimeParameters.h" + +// Set a runtime parameter to a specific value for the duration of the current +// scope. The original value is restored when the scope is left. 
+template +[[nodiscard]] auto setRuntimeParameterForTest(Value&& value) { + auto originalValue = RuntimeParameters().get(); + RuntimeParameters().set(AD_FWD(value)); + return absl::Cleanup{[originalValue = std::move(originalValue)]() { + RuntimeParameters().set(originalValue); + }}; +}