Skip to content

Commit

Permalink
Get rid of another unused copy.
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Kalmbach <[email protected]>
  • Loading branch information
joka921 committed Feb 19, 2025
1 parent 7fed85e commit 50b7dc4
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 75 deletions.
28 changes: 17 additions & 11 deletions src/engine/NamedQueryCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,35 @@

// _____________________________________________________________________________
std::shared_ptr<ValuesForTesting> NamedQueryCache ::getOperation(
const Key& key, QueryExecutionContext* ctx) const {
const auto& [table, map, sortedOn] = get(key);
// TODO<joka921> we should get rid of the copies for the IdTable (and
// probably the other members) especially for larger results).
return std::make_shared<ValuesForTesting>(ctx, table.clone(), map, sortedOn);
const Key& key, QueryExecutionContext* ctx) {
const auto& ptr = get(key);
const auto& [table, map, sortedOn] = *ptr;
// TODO<joka921> Add a local vocab, and consider also passing a shared_ptr for
// the local vocab.
return std::make_shared<ValuesForTesting>(ctx, table, map, sortedOn);
}

// _____________________________________________________________________________
auto NamedQueryCache::get(const Key& key) const -> const Value& {
auto NamedQueryCache::get(const Key& key) -> std::shared_ptr<const Value> {
auto l = cache_.wlock();
auto it = l->find(key);
if (it == l->end()) {
if (!l->contains(key)) {
throw std::runtime_error{
absl::StrCat("The named query with the name \"", key,
"\" was not pinned to the named query cache")};
}
return it->second;
return (*l)[key];
}

// _____________________________________________________________________________
void NamedQueryCache::store(const Key& key, Value value) {
(*cache_.wlock()).insert_or_assign(key, std::move(value));
// TODO<joka921> Check the overwrite semantics of the cache class.
cache_.wlock()->insert(key, std::move(value));
}

// _____________________________________________________________________________
void NamedQueryCache::clear() { cache_.wlock()->clear(); }
void NamedQueryCache::clear() { cache_.wlock()->clearAll(); }

// _____________________________________________________________________________
// Return the number of entries that the underlying cache reports as
// non-pinned. The read lock is held for the duration of the query.
size_t NamedQueryCache::numEntries() const {
  auto lockedCache = cache_.rlock();
  return lockedCache->numNonPinnedEntries();
}
27 changes: 18 additions & 9 deletions src/engine/NamedQueryCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@
#pragma once

#include "engine/ValuesForTesting.h"
#include "util/Cache.h"
#include "util/Synchronized.h"

// A simple threadsafe cache that associates query results with an explicit
// A simple thread-safe cache that associates query results with an explicit
// name.
class NamedQueryCache {
public:
// The cache value. It stores all the information required to construct a
// proper `QueryExecutionTree` later on.
// TODO<joka921> We definitely need the local vocab here...
struct Value {
IdTable result_;
std::shared_ptr<const IdTable> result_;
VariableToColumnMap varToColMap_;
std::vector<ColumnIndex> resultSortedOn_;
};

// Size getter for the cache: reports the size of a cached `Value`.
// Currently a placeholder that reports one byte for every value, independent
// of the actual size of the stored result.
// TODO<joka921> Use a better size getter for better statistics.
struct ValueSizeGetter {
  // `const`, because the functor is stateless; this way it can also be
  // invoked on a const instance.
  ad_utility::MemorySize operator()(const Value&) const {
    return ad_utility::MemorySize::bytes(1);
  }
};
using Key = std::string;
using Cache = ad_utility::HashMap<std::string, Value>;
using Cache = ad_utility::LRUCache<Key, Value, ValueSizeGetter>;

private:
ad_utility::Synchronized<Cache> cache_;
Expand All @@ -31,16 +40,16 @@ class NamedQueryCache {
// Clear the cache.
void clear();

// Get the number of cached queries.
size_t numEntries() const;

// Retrieve the query result that is associated with the `key`.
// Throw an exception if the `key` doesn't exist.
const Value& get(const Key& key) const;
std::shared_ptr<const Value> get(const Key& key);

// Retrieve the query result with the given `key` and convert it into an
// explicit `ValuesForTesting` operation that can be used as part of a
// `QueryExecutionTree`.
// TODO<joka921> This can be done more efficiently if we implement a dedicated
// operation for this use case, `ValuesForTesting` currently incurs one
// (unneeded) copy per query execution.
std::shared_ptr<ValuesForTesting> getOperation(
const Key& key, QueryExecutionContext* ctx) const;
std::shared_ptr<ValuesForTesting> getOperation(const Key& key,
QueryExecutionContext* ctx);
};
7 changes: 4 additions & 3 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,10 @@ std::shared_ptr<const Result> Operation::getResult(
// return the result, but only pin it.
const auto& actualResult = result._resultPointer->resultTable();
AD_CORRECTNESS_CHECK(actualResult.isFullyMaterialized());
auto t = NamedQueryCache::Value(actualResult.idTable().clone(),
getExternallyVisibleVariableColumns(),
actualResult.sortedBy());
// TODO<joka921> probably we don't need to `clone()` the IdTable here.
auto t = NamedQueryCache::Value(
std::make_shared<const IdTable>(actualResult.idTable().clone()),
getExternallyVisibleVariableColumns(), actualResult.sortedBy());
_executionContext->namedQueryCache().store(name, std::move(t));

runtimeInfo().addDetail("pinned-with-explicit-name", name);
Expand Down
72 changes: 24 additions & 48 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ CPP_template_2(typename RequestT, typename ResponseT)(
logCommand(cmd, "clear cache completely (including unpinned elements)");
cache_.clearAll();
response = createJsonResponse(composeCacheStatsJson(), request);
} else if (auto cmd = checkParameter("cmd", "clear-named-cache")) {
requireValidAccessToken("clear-named-cache");
logCommand(cmd, "clear the cache for named queries");
namedQueryCache_.clear();
response = createJsonResponse(composeCacheStatsJson(), request);
} else if (auto cmd = checkParameter("cmd", "clear-delta-triples")) {
requireValidAccessToken("clear-delta-triples");
logCommand(cmd, "clear delta triples");
Expand Down Expand Up @@ -380,7 +385,8 @@ CPP_template_2(typename RequestT, typename ResponseT)(
queryHub_, request, std::invoke(opFieldString, op));
auto [parsedOperation, qec, cancellationHandle,
cancelTimeoutOnDestruction] =
parseOperation(messageSender, parameters, op, timeLimit.value());
parseOperation(messageSender, parameters, op, timeLimit.value(),
accessTokenOk);
if (pred(parsedOperation)) {
throw std::runtime_error(
absl::StrCat(msg, parsedOperation._originalString));
Expand Down Expand Up @@ -491,7 +497,8 @@ CPP_template_2(typename Operation)(
requires QueryOrUpdate<Operation>) auto Server::
parseOperation(ad_utility::websocket::MessageSender& messageSender,
const ad_utility::url_parser::ParamValueMap& params,
const Operation& operation, TimeLimit timeLimit) {
const Operation& operation, TimeLimit timeLimit,
bool accessTokenOk) {
// The operation string was to be copied, do it here at the beginning.
const auto [operationName, operationSPARQL] =
[&operation]() -> std::pair<std::string_view, std::string> {
Expand All @@ -509,13 +516,24 @@ CPP_template_2(typename Operation)(
// Do the query planning. This creates a `QueryExecutionTree`, which will
// then be used to process the query.
auto [pinSubtrees, pinResult] = determineResultPinning(params);
std::optional<std::string> pinNamed =
ad_utility::url_parser::checkParameter(params, "pin-named-query", {});
LOG(INFO) << "Processing the following " << operationName << ":"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< (pinNamed ? absl::StrCat(" [pin named as ]", pinNamed.value())
: "")
<< operationSPARQL << std::endl;
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
sortPerformanceEstimator_, &namedQueryCache_,
std::ref(messageSender), pinSubtrees, pinResult);
if (pinNamed.has_value()) {
if (!accessTokenOk) {
throw std::runtime_error(
"The pinning of named queries requires a valid access token");
}
qec.pinWithExplicitName() = std::move(pinNamed);
}
ParsedQuery parsedQuery =
SparqlParser::parseQuery(std::move(operationSPARQL));
// SPARQL Protocol 2.1.4 specifies that the dataset from the query
Expand Down Expand Up @@ -624,6 +642,7 @@ nlohmann::json Server::composeCacheStatsJson() const {
nlohmann::json result;
result["num-non-pinned-entries"] = cache_.numNonPinnedEntries();
result["num-pinned-entries"] = cache_.numPinnedEntries();
result["num-named-queries"] = namedQueryCache_.numEntries();

// TODO Get rid of the `getByte()`, once `MemorySize` has its own json
// converter.
Expand Down Expand Up @@ -764,45 +783,6 @@ CPP_template_2(typename RequestT, typename ResponseT)(
PlannedQuery plannedQuery =
co_await planQuery(queryThreadPool_, std::move(query), requestTimer,
timeLimit, qec, cancellationHandle);
ad_utility::websocket::MessageSender messageSender =
createMessageSender(queryHub_, request, query.query_);
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

// Figure out, whether the query is to be pinned in the cache (either
// implicitly, or explicitly as a named query).
auto [pinSubtrees, pinResult] = determineResultPinning(params);
std::optional<std::string> pinNamed =
ad_utility::url_parser::checkParameter(params, "pin-named-query", {});
LOG(INFO) << "Processing the following SPARQL query:"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< (pinNamed ? absl::StrCat(" [pin named as ]", pinNamed.value())
: "")
<< "\n"
<< query.query_ << std::endl;
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, &namedQueryCache_,
std::ref(messageSender), pinSubtrees, pinResult);

// The usage of an `optional` here is required because of a limitation in
// Boost::Asio which forces us to use default-constructible result types with
// `computeInNewThread`. We also can't unwrap the optional directly in this
// function, because then the conan build fails in a very strange way,
// probably related to issues in GCC's coroutine implementation.
// For the same reason (crashes in the conanbuild) we store the coroutine in
// an explicit variable instead of directly `co_await`-ing it.
auto coroutine = computeInNewThread(
queryThreadPool_,
[this, &query, &qec, cancellationHandle, &timeLimit,
&requestTimer]() -> std::optional<PlannedQuery> {
return setupPlannedQuery(query.datasetClauses_, query.query_, qec,
cancellationHandle, timeLimit, requestTimer);
},
cancellationHandle);
auto plannedQueryOpt = co_await std::move(coroutine);
AD_CORRECTNESS_CHECK(plannedQueryOpt.has_value());
auto plannedQuery = std::move(plannedQueryOpt).value();
auto qet = plannedQuery.queryExecutionTree_;

// Read the export limit from the `send` parameter (historical name). This
Expand All @@ -824,11 +804,6 @@ CPP_template_2(typename RequestT, typename ResponseT)(
qet.getRootOperation()->getLimit()._offset);
limitOffset._offset -= qet.getRootOperation()->getLimit()._offset;

if (pinNamed.has_value()) {
// TODO<joka921> 1. Make this require a valid access token. 2. also allow
// for clearing the cache.
qec.pinWithExplicitName() = pinNamed.value();
}
// This actually processes the query and sends the result in the requested
// format.
co_await sendStreamableResponse(request, send, mediaType, plannedQuery, qet,
Expand Down Expand Up @@ -935,6 +910,7 @@ json Server::processUpdateImpl(
// update anyway (The index of the located triples snapshot is part of the
// cache key).
cache_.clearAll();
namedQueryCache_.clear();

return createResponseMetadataForUpdate(requestTimer, index_, deltaTriples,
plannedUpdate, qet, countBefore,
Expand Down
3 changes: 2 additions & 1 deletion src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ class Server {
const ad_utility::url_parser::
ParamValueMap& params,
const Operation& operation,
TimeLimit timeLimit);
TimeLimit timeLimit,
bool accessTokenOk);

// Plan a parsed query.
Awaitable<PlannedQuery> planQuery(net::static_thread_pool& thread_pool,
Expand Down
16 changes: 13 additions & 3 deletions src/engine/ValuesForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,31 @@ class ValuesForTesting : public Operation {
variables_ = computeVarMapFromVector(variables);
}

ValuesForTesting(QueryExecutionContext* ctx, IdTable table,
ValuesForTesting(QueryExecutionContext* ctx,
std::shared_ptr<const IdTable> table,
VariableToColumnMap variables,
std::vector<ColumnIndex> sortedColumns = {},
LocalVocab localVocab = LocalVocab{})
: Operation{ctx},
variables_{std::move(variables)},
supportsLimit_{false},
sizeEstimate_{table.numRows()},
sizeEstimate_{table->numRows()},
costEstimate_{0},
resultSortedColumns_{std::move(sortedColumns)},
localVocab_{std::move(localVocab)},
multiplicity_{},
forceFullyMaterialized_{false} {
tables_.push_back(std::make_shared<const IdTable>(std::move(table)));
tables_.push_back(std::move(table));
}

// Convenience overload that takes the `IdTable` by value. The table is moved
// into a freshly created `std::shared_ptr<const IdTable>`, and all arguments
// are then forwarded to the overload that takes the table as a shared
// pointer.
ValuesForTesting(QueryExecutionContext* ctx, IdTable table,
VariableToColumnMap variables,
std::vector<ColumnIndex> sortedColumns = {},
LocalVocab localVocab = LocalVocab{})
: ValuesForTesting{ctx, std::make_shared<const IdTable>(std::move(table)),
std::move(variables), std::move(sortedColumns),
std::move(localVocab)} {}

explicit ValuesForTesting(QueryExecutionContext* ctx,
std::vector<IdTable> tables, VarVector variables,
bool unlikelyToFitInCache = false,
Expand Down
1 change: 1 addition & 0 deletions test/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ addLinkAndRunAsSingleTest(SpatialJoinAlgorithmsTest engine)
addLinkAndDiscoverTestSerial(QueryExecutionTreeTest engine)
addLinkAndDiscoverTestSerial(DescribeTest engine)
addLinkAndDiscoverTestSerial(ExistsJoinTest engine)
addLinkAndDiscoverTestSerial(NamedQueryCacheTest)
18 changes: 18 additions & 0 deletions test/engine/NamedQueryCacheTest.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
//
// Created by kalmbacj on 2/6/25.
//

#include <gmock/gmock.h>

#include "../util/IdTableHelpers.h"
#include "engine/NamedQueryCache.h"

// Store a single named result in an empty cache and check that the reported
// number of entries goes from 0 to 1.
TEST(NamedQueryCache, basicWorkflow) {
  NamedQueryCache cache;
  EXPECT_EQ(cache.numEntries(), 0);

  // A table with two rows and two columns, bound to `?x` and `?y`.
  auto idTable = makeIdTableFromVector({{3, 7}, {9, 11}});
  VariableToColumnMap variableColumns{
      {Variable{"?x"}, makeAlwaysDefinedColumn(0)},
      {Variable{"?y"}, makeAlwaysDefinedColumn(1)}};
  auto sharedTable = std::make_shared<const IdTable>(idTable.clone());

  NamedQueryCache::Value entry{std::move(sharedTable), variableColumns, {1, 0}};
  cache.store("query-1", std::move(entry));
  EXPECT_EQ(cache.numEntries(), 1);
}

0 comments on commit 50b7dc4

Please sign in to comment.