Skip to content

Commit

Permalink
Changes from a review.
Browse files Browse the repository at this point in the history
  • Loading branch information
joka921 committed Jan 11, 2024
1 parent 4bcbb6c commit 53d954a
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 124 deletions.
50 changes: 29 additions & 21 deletions src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ template <std::invocable<IdTable&> BlockwiseCallback = ad_utility::Noop>
class AddCombinedRowToIdTable {
std::vector<size_t> numUndefinedPerColumn_;
size_t numJoinColumns_;
std::optional<std::array<IdTableView<0>, 2>> inputs_;
std::optional<std::array<IdTableView<0>, 2>> inputLeftAndRight_;
IdTable resultTable_;

// This struct stores the information, which row indices from the input are
Expand All @@ -38,9 +38,9 @@ class AddCombinedRowToIdTable {
// Store the indices that have not yet been written.
std::vector<TargetIndexAndRowIndices> indexBuffer_;

// Store the information, which row index from the first input is written to a
// Store the information, which row index from the left input is written to a
// given index in the output. This is used for OPTIONAL joins where there are
// rows that have no counterpart in the second input.
// rows that have no counterpart in the right input.
struct TargetIndexAndRowIndex {
size_t targetIndex_;
size_t rowIndex_;
Expand All @@ -59,7 +59,9 @@ class AddCombinedRowToIdTable {
// materialized and written to the result in one go.
size_t bufferSize_ = 100'000;

// TODO<joka921> Comment
// This callback is called with the result as an argument each time `flush()`
// is called. It can be used to consume parts of the result early, before the
// complete operation has finished.
[[no_unique_address]] BlockwiseCallback blockwiseCallback_{};

public:
Expand All @@ -71,7 +73,7 @@ class AddCombinedRowToIdTable {
BlockwiseCallback blockwiseCallback = {})
: numUndefinedPerColumn_(output.numColumns()),
numJoinColumns_{numJoinColumns},
inputs_{std::array{std::move(input1), std::move(input2)}},
inputLeftAndRight_{std::array{std::move(input1), std::move(input2)}},
resultTable_{std::move(output)},
bufferSize_{bufferSize},
blockwiseCallback_{std::move(blockwiseCallback)} {
Expand All @@ -87,7 +89,7 @@ class AddCombinedRowToIdTable {
BlockwiseCallback blockwiseCallback = {})
: numUndefinedPerColumn_(output.numColumns()),
numJoinColumns_{numJoinColumns},
inputs_{std::nullopt},
inputLeftAndRight_{std::nullopt},
resultTable_{std::move(output)},
bufferSize_{bufferSize},
blockwiseCallback_{std::move(blockwiseCallback)} {
Expand All @@ -103,7 +105,7 @@ class AddCombinedRowToIdTable {
// The next free row in the output will be created from
// `inputLeft_[rowIndexA]` and `inputRight_[rowIndexB]`.
void addRow(size_t rowIndexA, size_t rowIndexB) {
AD_EXPENSIVE_CHECK(inputs_.has_value());
AD_EXPENSIVE_CHECK(inputLeftAndRight_.has_value());
indexBuffer_.push_back(
TargetIndexAndRowIndices{nextIndex_, {rowIndexA, rowIndexB}});
++nextIndex_;
Expand All @@ -127,14 +129,16 @@ class AddCombinedRowToIdTable {
}
};
if (nextIndex_ != 0) {
AD_CORRECTNESS_CHECK(inputs_.has_value());
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
}
inputs_ = std::array{toView(inputLeft), toView(inputRight)};
inputLeftAndRight_ = std::array{toView(inputLeft), toView(inputRight)};
checkNumColumns();
}

void setLeftInput(const auto& inputLeft) {
// Only set the left input. After this it is only allowed to call
// `addOptionalRow` and not `addRow` until `setInput` has been called again.
void setOnlyLeftInputForOptionalJoin(const auto& inputLeft) {
auto toView = []<typename T>(const T& table) {
if constexpr (requires { table.template asStaticView<0>(); }) {
return table.template asStaticView<0>();
Expand All @@ -143,11 +147,11 @@ class AddCombinedRowToIdTable {
}
};
if (nextIndex_ != 0) {
AD_CORRECTNESS_CHECK(inputs_.has_value());
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
flush();
}

Check warning on line 152 in src/engine/AddCombinedRowToTable.h

View check run for this annotation

Codecov / codecov/patch

src/engine/AddCombinedRowToTable.h#L150-L152

Added lines #L150 - L152 were not covered by tests
// TODO<joka921> This is rather unsafe, we should think of something better.
inputs_ = std::array{
// The right input will be empty, but with the correct number of columns.
inputLeftAndRight_ = std::array{
toView(inputLeft),
IdTableView<0>{resultTable_.numColumns() -
toView(inputLeft).numColumns() + numJoinColumns_,
Expand All @@ -158,7 +162,7 @@ class AddCombinedRowToIdTable {
// `inputLeft_[rowIndexA]`. The columns from `inputRight_` will all be set to
// UNDEF
void addOptionalRow(size_t rowIndexA) {
AD_EXPENSIVE_CHECK(inputs_.has_value());
AD_EXPENSIVE_CHECK(inputLeftAndRight_.has_value());
optionalIndexBuffer_.push_back(
TargetIndexAndRowIndex{nextIndex_, rowIndexA});
++nextIndex_;
Expand Down Expand Up @@ -200,7 +204,7 @@ class AddCombinedRowToIdTable {
if (nextIndex_ == 0) {
return;
}
AD_CORRECTNESS_CHECK(inputs_.has_value());
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
result.resize(oldSize + nextIndex_);

// Sometimes columns are combined where one value is UNDEF and the other one
Expand Down Expand Up @@ -233,7 +237,7 @@ class AddCombinedRowToIdTable {
resultCol[oldSize + targetIndex] = resultId;
}

// Write the optional rows. For the second input those are always
// Write the optional rows. For the right input those are always
// undefined.
for (const auto& [targetIndex, sourceIndex] : optionalIndexBuffer_) {
Id id = colLeft[sourceIndex];
Expand Down Expand Up @@ -264,7 +268,7 @@ class AddCombinedRowToIdTable {
resultCol[oldSize + targetIndex] = resultId;
}

// Write the optional rows. For the second input those are always
// Write the optional rows. For the right input those are always
// undefined.
for (const auto& [targetIndex, sourceIndex] : optionalIndexBuffer_) {
Id id = [&col, sourceIndex = sourceIndex]() {
Expand All @@ -288,13 +292,13 @@ class AddCombinedRowToIdTable {
++nextResultColIdx;
}

// Then the remaining columns from the first input.
// Then the remaining columns from the left input.
for (size_t col = numJoinColumns_; col < inputLeft().numColumns(); ++col) {
writeNonJoinColumn.template operator()<true>(col, nextResultColIdx);
++nextResultColIdx;
}

// Then the remaining columns from the second input.
// Then the remaining columns from the right input.
for (size_t col = numJoinColumns_; col < inputRight().numColumns(); col++) {
writeNonJoinColumn.template operator()<false>(col, nextResultColIdx);
++nextResultColIdx;
Expand All @@ -305,9 +309,13 @@ class AddCombinedRowToIdTable {
nextIndex_ = 0;
std::invoke(blockwiseCallback_, result);
}
const IdTableView<0>& inputLeft() const { return inputs_.value()[0]; }
const IdTableView<0>& inputLeft() const {
return inputLeftAndRight_.value()[0];
}

const IdTableView<0>& inputRight() const { return inputs_.value()[1]; }
const IdTableView<0>& inputRight() const {
return inputLeftAndRight_.value()[1];
}

void checkNumColumns() const {
AD_CONTRACT_CHECK(inputLeft().numColumns() >= numJoinColumns_);
Expand Down
127 changes: 71 additions & 56 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,61 +125,82 @@ std::unique_ptr<TurtleParserBase> IndexImpl::makeTurtleParser(
}
}

// Several helper functions for joining the OSP with the patterns.
namespace {
static auto lazyScanWithPermutedPatterns(auto& sorterPtr, auto columnIndices) {
auto setSubset = [columnIndices](auto& idTable) {
idTable.setColumnSubset(columnIndices);
};
return ad_utility::repeatedTransformView(
sorterPtr->template getSortedBlocks<0>(), setSubset);
}

static auto lazyOptionalJoinOnFirstColumn(auto&& leftInput, auto&& rightInput,
auto resultCallback) {
auto projection = [](const auto& row) -> Id { return row[0]; };
auto compareProjection = []<typename T>(const T& row) {
if constexpr (ad_utility::SimilarTo<T, Id>) {
return row;
} else {
return row[0];
}
};
auto comparator = [&compareProjection](const auto& l, const auto& r) {
return compareProjection(l) < compareProjection(r);
};

IdTable outputTable{5, ad_utility::makeUnlimitedAllocator<Id>()};
auto rowAdder = ad_utility::AddCombinedRowToIdTable<decltype(resultCallback)>{
1, std::move(outputTable), 100'000, resultCallback};

ad_utility::zipperJoinForBlocksWithoutUndef(leftInput, rightInput, comparator,
rowAdder, projection, projection,
std::true_type{});
rowAdder.flush();
}

auto fixBlockAfterPatternJoin(auto block) {
block.value().setColumnSubset(std::array<ColumnIndex, 5>{2, 1, 0, 3, 4});
std::ranges::for_each(block.value().getColumn(4), [](Id& id) {
id = id.isUndefined() ? Id::makeFromInt(NO_PATTERN) : id;
});
return std::move(block.value()).template toStatic<0>();
}
} // namespace

// ____________________________________________________________________________
std::unique_ptr<ExternalSorter<SortByPSO, 5>> IndexImpl::buildOspWithPatterns(
PatternCreatorNew::TripleOutput patternOutput, auto isQleverInternalId) {
PatternCreatorNew::TripleSorter patternOutput, auto isQleverInternalId) {
auto&& [patternsPSO, secondSorter] = patternOutput;
auto setSubset = [](auto& idTable) {
idTable.setColumnSubset(std::array<ColumnIndex, 2>{0, 2});
};
auto lazyPatternScan = ad_utility::repeatedTransformView(
ad_utility::OwningView{patternsPSO->template getSortedBlocks<0>()},
setSubset);
auto lazyPatternScan = lazyScanWithPermutedPatterns(
patternsPSO, std::array<ColumnIndex, 2>{0, 2});
ad_utility::data_structures::ThreadSafeQueue<IdTable> queue{4};
ad_utility::JThread joinWithPatternThread{[&] {
IdTable outputBufferTable{5, ad_utility::makeUnlimitedAllocator<Id>()};

auto setOspSubset = [](auto& idTable) {
idTable.setColumnSubset(std::array<ColumnIndex, 4>{2, 1, 0, 3});
};
auto ospAsBlocksTransformed = ad_utility::repeatedTransformView(
secondSorter->template getSortedBlocks<0>(), setOspSubset);
auto projection = [](const auto& row) -> Id { return row[0]; };
auto compareProjection = []<typename T>(const T& row) {
if constexpr (ad_utility::SimilarTo<T, Id>) {
return row;
} else {
return row[0];
}
};
auto comparator = [&compareProjection](const auto& l, const auto& r) {
return compareProjection(l) < compareProjection(r);
};
auto pushToQueue = [&, bufferSize = BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP().load()](IdTable& table) {
if (table.numRows() >= bufferSize) {
if (!outputBufferTable.empty()) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
queue.push(std::move(table));
} else {
outputBufferTable.insertAtEnd(table.begin(), table.end());
if (outputBufferTable.size() >= bufferSize) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
}
table.clear();
};

IdTable outputTable{5, ad_utility::makeUnlimitedAllocator<Id>()};
auto rowAdder = ad_utility::AddCombinedRowToIdTable<decltype(pushToQueue)>{
1, std::move(outputTable), 100'000, pushToQueue};
auto ospAsBlocksTransformed = lazyScanWithPermutedPatterns(
secondSorter, std::array<ColumnIndex, 4>{2, 1, 0, 3});
auto pushToQueue =
[&, bufferSize =
BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP().load()](IdTable& table) {
if (table.numRows() >= bufferSize) {
if (!outputBufferTable.empty()) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
queue.push(std::move(table));
} else {
outputBufferTable.insertAtEnd(table.begin(), table.end());
if (outputBufferTable.size() >= bufferSize) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
}
}
table.clear();
};

ad_utility::zipperJoinForBlocksWithoutUndef(
ospAsBlocksTransformed, lazyPatternScan, comparator, rowAdder,
projection, projection, std::true_type{});
rowAdder.flush();
lazyOptionalJoinOnFirstColumn(ospAsBlocksTransformed, lazyPatternScan,
pushToQueue);
if (!outputBufferTable.empty()) {
queue.push(std::move(outputBufferTable));
outputBufferTable.clear();
Expand All @@ -190,13 +211,7 @@ std::unique_ptr<ExternalSorter<SortByPSO, 5>> IndexImpl::buildOspWithPatterns(
auto blockGenerator =
[](auto& queue) -> cppcoro::generator<IdTableStatic<0>> {
while (auto block = queue.pop()) {
block.value().setColumnSubset(std::array<ColumnIndex, 5>{2, 1, 0, 3, 4});
std::ranges::for_each(block.value().getColumn(4), [](Id& id) {
id = id.isUndefined() ? Id::makeFromInt(NO_PATTERN) : id;
});
IdTableStatic<0> staticBlock =
std::move(block.value()).template toStatic<0>();
co_yield staticBlock;
co_yield fixBlockAfterPatternJoin(std::move(block));
}
}(queue);

Expand Down Expand Up @@ -1492,13 +1507,13 @@ void IndexImpl::createPSOAndPOS(size_t numColumns, auto& isInternalId,
// _____________________________________________________________________________
template <typename... NextSorter>
requires(sizeof...(NextSorter) <= 1)
std::optional<PatternCreatorNew::TripleOutput> IndexImpl::createSPOAndSOP(
std::optional<PatternCreatorNew::TripleSorter> IndexImpl::createSPOAndSOP(
size_t numColumns, auto& isInternalId, BlocksOfTriples sortedTriples,
NextSorter&&... nextSorter) {
size_t numSubjectsNormal = 0;
auto numSubjectCounter =
makeNumDistinctIdsCounter<0>(numSubjectsNormal, isInternalId);
std::optional<PatternCreatorNew::TripleOutput> result;
std::optional<PatternCreatorNew::TripleSorter> result;
if (usePatterns_) {
// We will return the next sorter.
AD_CORRECTNESS_CHECK(sizeof...(nextSorter) == 0);
Expand Down
6 changes: 3 additions & 3 deletions src/index/IndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ class IndexImpl {
// metadata. Also builds the patterns if specified.
template <typename... NextSorter>
requires(sizeof...(NextSorter) <= 1)
std::optional<PatternCreatorNew::TripleOutput> createSPOAndSOP(
std::optional<PatternCreatorNew::TripleSorter> createSPOAndSOP(
size_t numColumns, auto& isInternalId, BlocksOfTriples sortedTriples,
NextSorter&&... nextSorter);
// Create the OSP and OPS permutations. Additionally, count the number of
Expand Down Expand Up @@ -777,7 +777,7 @@ class IndexImpl {
// of only two permutations (where we have to build the Pxx permutations). In
// all other cases the Sxx permutations are built first because we need the
// patterns.
std::optional<PatternCreatorNew::TripleOutput> createFirstPermutationPair(
std::optional<PatternCreatorNew::TripleSorter> createFirstPermutationPair(
auto&&... args) {
static_assert(std::is_same_v<FirstPermutation, SortBySPO>);
static_assert(std::is_same_v<SecondPermutation, SortByOSP>);
Expand All @@ -800,5 +800,5 @@ class IndexImpl {
}

std::unique_ptr<ExternalSorter<SortByPSO, 5>> buildOspWithPatterns(
PatternCreatorNew::TripleOutput patternOutput, auto isQLeverInternalId);
PatternCreatorNew::TripleSorter patternOutput, auto isQLeverInternalId);
};
2 changes: 1 addition & 1 deletion src/index/PatternCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void PatternCreatorNew::finishSubject(VocabIndex subjectIndex,

auto additionalTriple = std::array{Id::makeFromVocabIndex(subjectIndex),
hasPatternId, Id::makeFromInt(patternId)};
tripleOutput_.hasPatternAsPSO_->push(additionalTriple);
tripleOutput_.hasPatternPredicateSortedByPSO_->push(additionalTriple);
auto curSubject = Id::makeFromVocabIndex(currentSubjectIndex_.value());
std::ranges::for_each(tripleBuffer_, [this, patternId,
&curSubject](const auto& t) {
Expand Down
12 changes: 6 additions & 6 deletions src/index/PatternCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class PatternCreatorNew {
ad_utility::CompressedExternalIdTableSorter<SortByOSP, 4>;

// Combine all the triples that this pattern creator creates.
struct TripleOutput {
std::unique_ptr<PSOSorter> hasPatternAsPSO_;
std::unique_ptr<OSPSorter4Cols> ospSorterWithSubjectPatterns_;
struct TripleSorter {
std::unique_ptr<PSOSorter> hasPatternPredicateSortedByPSO_;
std::unique_ptr<OSPSorter4Cols> triplesWithSubjectPatternsSortedByOsp_;
};

private:
Expand Down Expand Up @@ -110,7 +110,7 @@ class PatternCreatorNew {
bool isInternal_;
};
ad_utility::BufferedVector<TripleAndIsInternal> tripleBuffer_;
TripleOutput tripleOutput_;
TripleSorter tripleOutput_;

// The predicates which have already occured in one of the patterns. Needed to
// count the number of distinct predicates.
Expand Down Expand Up @@ -169,7 +169,7 @@ class PatternCreatorNew {
CompactVectorOfStrings<Id>& patterns);

// Move out the sorted triples after finishing creating the patterns.
TripleOutput&& getTripleOutput() && {
TripleSorter&& getTripleOutput() && {
finish();
return std::move(tripleOutput_);
}
Expand All @@ -180,7 +180,7 @@ class PatternCreatorNew {
void printStatistics(PatternStatistics patternStatistics) const;

auto& ospSorterTriplesWithPattern() {
return *tripleOutput_.ospSorterWithSubjectPatterns_;
return *tripleOutput_.triplesWithSubjectPatternsSortedByOsp_;
}
};

Expand Down
Loading

0 comments on commit 53d954a

Please sign in to comment.