diff --git a/src/engine/NamedQueryCache.cpp b/src/engine/NamedQueryCache.cpp index 7acc9515e5..276f9b0c17 100644 --- a/src/engine/NamedQueryCache.cpp +++ b/src/engine/NamedQueryCache.cpp @@ -6,29 +6,35 @@ // _____________________________________________________________________________ std::shared_ptr NamedQueryCache ::getOperation( - const Key& key, QueryExecutionContext* ctx) const { - const auto& [table, map, sortedOn] = get(key); - // TODO we should get rid of the copies for the IdTable (and - // probably the other members) especially for larger results). - return std::make_shared(ctx, table.clone(), map, sortedOn); + const Key& key, QueryExecutionContext* ctx) { + const auto& ptr = get(key); + const auto& [table, map, sortedOn] = *ptr; + // TODO Add a local vocab, and consider also passing a shared_ptr for + // the local vocab. + return std::make_shared(ctx, table, map, sortedOn); } // _____________________________________________________________________________ -auto NamedQueryCache::get(const Key& key) const -> const Value& { +auto NamedQueryCache::get(const Key& key) -> std::shared_ptr { auto l = cache_.wlock(); - auto it = l->find(key); - if (it == l->end()) { + if (!l->contains(key)) { throw std::runtime_error{ absl::StrCat("The named query with the name \"", key, "\" was not pinned to the named query cache")}; } - return it->second; + return (*l)[key]; } // _____________________________________________________________________________ void NamedQueryCache::store(const Key& key, Value value) { - (*cache_.wlock()).insert_or_assign(key, std::move(value)); + // TODO Check the overwrite semantics of the cache class. + cache_.wlock()->insert(key, std::move(value)); } // _____________________________________________________________________________ -void NamedQueryCache::clear() { cache_.wlock()->clear(); } +void NamedQueryCache::clear() { cache_.wlock()->clearAll(); } + +// _____________________________________________________________________________ +size_t NamedQueryCache::numEntries() const { + return cache_.rlock()->numNonPinnedEntries(); +} diff --git a/src/engine/NamedQueryCache.h b/src/engine/NamedQueryCache.h index 4d9775b1c5..23a65e0a57 100644 --- a/src/engine/NamedQueryCache.h +++ b/src/engine/NamedQueryCache.h @@ -4,21 +4,30 @@ #pragma once #include "engine/ValuesForTesting.h" +#include "util/Cache.h" #include "util/Synchronized.h" -// A simple threadsafe cache that associates query results with an explicit +// A simple thread-safe cache that associates query results with an explicit // name. class NamedQueryCache { public: // The cache value. It stores all the information required to construct a // proper `QueryExecutionTree` later on. + // TODO We definitely need the local vocab here... struct Value { - IdTable result_; + std::shared_ptr result_; VariableToColumnMap varToColMap_; std::vector resultSortedOn_; }; + + // TODO Use a better size getter for better statistics. + struct ValueSizeGetter { + ad_utility::MemorySize operator()(const Value&) { + return ad_utility::MemorySize::bytes(1); + } + }; using Key = std::string; - using Cache = ad_utility::HashMap; + using Cache = ad_utility::LRUCache; private: ad_utility::Synchronized cache_; @@ -31,16 +40,16 @@ class NamedQueryCache { // Clear the cache. void clear(); + // Get the number of cached queries. + size_t numEntries() const; + // Retrieve the query result that is associated with the `key`. // Throw an exception if the `key` doesn't exist. - const Value& get(const Key& key) const; + std::shared_ptr get(const Key& key); // Retrieve the query result with the given `key` and convert it into an // explicit `ValuesForTesting` operation that can be used as part of a // `QueryExecutionTree`. - // TODO This can be done more efficiently if we implement a dedicated - // operation for this use case, `ValuesForTesting` currently incurs one - // (unneeded) copy per query execution. - std::shared_ptr getOperation( - const Key& key, QueryExecutionContext* ctx) const; + std::shared_ptr getOperation(const Key& key, + QueryExecutionContext* ctx); }; diff --git a/src/engine/Operation.cpp b/src/engine/Operation.cpp index 865a37e6b0..0b72a672ec 100644 --- a/src/engine/Operation.cpp +++ b/src/engine/Operation.cpp @@ -350,9 +350,10 @@ std::shared_ptr Operation::getResult( // return the result, but only pin it. const auto& actualResult = result._resultPointer->resultTable(); AD_CORRECTNESS_CHECK(actualResult.isFullyMaterialized()); - auto t = NamedQueryCache::Value(actualResult.idTable().clone(), - getExternallyVisibleVariableColumns(), - actualResult.sortedBy()); + // TODO probably we don't need to `clone()` the IdTable here. + auto t = NamedQueryCache::Value( + std::make_shared(actualResult.idTable().clone()), + getExternallyVisibleVariableColumns(), actualResult.sortedBy()); _executionContext->namedQueryCache().store(name, std::move(t)); runtimeInfo().addDetail("pinned-with-explicit-name", name); diff --git a/src/engine/Server.cpp b/src/engine/Server.cpp index d5d223b3a9..e96f65bf04 100644 --- a/src/engine/Server.cpp +++ b/src/engine/Server.cpp @@ -287,6 +287,11 @@ CPP_template_2(typename RequestT, typename ResponseT)( logCommand(cmd, "clear cache completely (including unpinned elements)"); cache_.clearAll(); response = createJsonResponse(composeCacheStatsJson(), request); + } else if (auto cmd = checkParameter("cmd", "clear-named-cache")) { + requireValidAccessToken("clear-named-cache"); + logCommand(cmd, "clear the cache for named queries"); + namedQueryCache_.clear(); + response = createJsonResponse(composeCacheStatsJson(), request); } else if (auto cmd = checkParameter("cmd", "clear-delta-triples")) { requireValidAccessToken("clear-delta-triples"); logCommand(cmd, "clear delta triples"); @@ -380,7 +385,8 @@ CPP_template_2(typename RequestT, typename ResponseT)( queryHub_, request, std::invoke(opFieldString, op)); auto [parsedOperation, qec, cancellationHandle, cancelTimeoutOnDestruction] = - parseOperation(messageSender, parameters, op, timeLimit.value()); + parseOperation(messageSender, parameters, op, timeLimit.value(), + accessTokenOk); if (pred(parsedOperation)) { throw std::runtime_error( absl::StrCat(msg, parsedOperation._originalString)); @@ -491,7 +497,8 @@ CPP_template_2(typename Operation)( requires QueryOrUpdate) auto Server:: parseOperation(ad_utility::websocket::MessageSender& messageSender, const ad_utility::url_parser::ParamValueMap& params, - const Operation& operation, TimeLimit timeLimit) { + const Operation& operation, TimeLimit timeLimit, + bool accessTokenOk) { // The operation string was to be copied, do it here at the beginning. const auto [operationName, operationSPARQL] = [&operation]() -> std::pair { @@ -509,13 +516,24 @@ CPP_template_2(typename Operation)( // Do the query planning. This creates a `QueryExecutionTree`, which will // then be used to process the query. auto [pinSubtrees, pinResult] = determineResultPinning(params); + std::optional pinNamed = + ad_utility::url_parser::checkParameter(params, "pin-named-query", {}); LOG(INFO) << "Processing the following " << operationName << ":" << (pinResult ? " [pin result]" : "") << (pinSubtrees ? " [pin subresults]" : "") << "\n" + << (pinNamed ? absl::StrCat(" [pin named as ]", pinNamed.value()) + : "") << operationSPARQL << std::endl; QueryExecutionContext qec(index_, &cache_, allocator_, - sortPerformanceEstimator_, std::ref(messageSender), - pinSubtrees, pinResult); + sortPerformanceEstimator_, &namedQueryCache_, + std::ref(messageSender), pinSubtrees, pinResult); + if (pinNamed.has_value()) { + if (!accessTokenOk) { + throw std::runtime_error( + "The pinning of named queries requires a valid access token"); + } + qec.pinWithExplicitName() = std::move(pinNamed); + } ParsedQuery parsedQuery = SparqlParser::parseQuery(std::move(operationSPARQL)); // SPARQL Protocol 2.1.4 specifies that the dataset from the query @@ -624,6 +642,7 @@ nlohmann::json Server::composeCacheStatsJson() const { nlohmann::json result; result["num-non-pinned-entries"] = cache_.numNonPinnedEntries(); result["num-pinned-entries"] = cache_.numPinnedEntries(); + result["num-named-queries"] = namedQueryCache_.numEntries(); // TODO Get rid of the `getByte()`, once `MemorySize` has it's own json // converter. @@ -764,45 +783,6 @@ CPP_template_2(typename RequestT, typename ResponseT)( PlannedQuery plannedQuery = co_await planQuery(queryThreadPool_, std::move(query), requestTimer, timeLimit, qec, cancellationHandle); - ad_utility::websocket::MessageSender messageSender = - createMessageSender(queryHub_, request, query.query_); - auto [cancellationHandle, cancelTimeoutOnDestruction] = - setupCancellationHandle(messageSender.getQueryId(), timeLimit); - - // Figure out, whether the query is to be pinned in the cache (either - // implicitly, or explicitly as a named query). - auto [pinSubtrees, pinResult] = determineResultPinning(params); - std::optional pinNamed = - ad_utility::url_parser::checkParameter(params, "pin-named-query", {}); - LOG(INFO) << "Processing the following SPARQL query:" - << (pinResult ? " [pin result]" : "") - << (pinSubtrees ? " [pin subresults]" : "") << "\n" - << (pinNamed ? absl::StrCat(" [pin named as ]", pinNamed.value()) - : "") - << "\n" - << query.query_ << std::endl; - QueryExecutionContext qec(index_, &cache_, allocator_, - sortPerformanceEstimator_, &namedQueryCache_, - std::ref(messageSender), pinSubtrees, pinResult); - - // The usage of an `optional` here is required because of a limitation in - // Boost::Asio which forces us to use default-constructible result types with - // `computeInNewThread`. We also can't unwrap the optional directly in this - // function, because then the conan build fails in a very strange way, - // probably related to issues in GCC's coroutine implementation. - // For the same reason (crashes in the conanbuild) we store the coroutine in - // an explicit variable instead of directly `co_await`-ing it. - auto coroutine = computeInNewThread( - queryThreadPool_, - [this, &query, &qec, cancellationHandle, &timeLimit, - &requestTimer]() -> std::optional { - return setupPlannedQuery(query.datasetClauses_, query.query_, qec, - cancellationHandle, timeLimit, requestTimer); - }, - cancellationHandle); - auto plannedQueryOpt = co_await std::move(coroutine); - AD_CORRECTNESS_CHECK(plannedQueryOpt.has_value()); - auto plannedQuery = std::move(plannedQueryOpt).value(); auto qet = plannedQuery.queryExecutionTree_; // Read the export limit from the send` parameter (historical name). This @@ -824,11 +804,6 @@ CPP_template_2(typename RequestT, typename ResponseT)( qet.getRootOperation()->getLimit()._offset); limitOffset._offset -= qet.getRootOperation()->getLimit()._offset; - if (pinNamed.has_value()) { - // TODO 1. Make this require a valid access token. 2. also allow - // for clearing the cache. - qec.pinWithExplicitName() = pinNamed.value(); - } // This actually processes the query and sends the result in the requested // format. co_await sendStreamableResponse(request, send, mediaType, plannedQuery, qet, @@ -935,6 +910,7 @@ json Server::processUpdateImpl( // update anyway (The index of the located triples snapshot is part of the // cache key). cache_.clearAll(); + namedQueryCache_.clear(); return createResponseMetadataForUpdate(requestTimer, index_, deltaTriples, plannedUpdate, qet, countBefore, diff --git a/src/engine/Server.h b/src/engine/Server.h index a72fdf5f8b..a8452218d9 100644 --- a/src/engine/Server.h +++ b/src/engine/Server.h @@ -185,7 +185,8 @@ class Server { const ad_utility::url_parser:: ParamValueMap& params, const Operation& operation, - TimeLimit timeLimit); + TimeLimit timeLimit, + bool accessTokenOk); // Plan a parsed query. Awaitable planQuery(net::static_thread_pool& thread_pool, diff --git a/src/engine/ValuesForTesting.h b/src/engine/ValuesForTesting.h index a74ea360da..23e9fbda57 100644 --- a/src/engine/ValuesForTesting.h +++ b/src/engine/ValuesForTesting.h @@ -57,21 +57,31 @@ class ValuesForTesting : public Operation { variables_ = computeVarMapFromVector(variables); } - ValuesForTesting(QueryExecutionContext* ctx, IdTable table, + ValuesForTesting(QueryExecutionContext* ctx, + std::shared_ptr table, VariableToColumnMap variables, std::vector sortedColumns = {}, LocalVocab localVocab = LocalVocab{}) : Operation{ctx}, variables_{std::move(variables)}, supportsLimit_{false}, - sizeEstimate_{table.numRows()}, + sizeEstimate_{table->numRows()}, costEstimate_{0}, resultSortedColumns_{std::move(sortedColumns)}, localVocab_{std::move(localVocab)}, multiplicity_{}, forceFullyMaterialized_{false} { - tables_.push_back(std::make_shared(std::move(table))); + tables_.push_back(std::move(table)); } + + ValuesForTesting(QueryExecutionContext* ctx, IdTable table, + VariableToColumnMap variables, + std::vector sortedColumns = {}, + LocalVocab localVocab = LocalVocab{}) + : ValuesForTesting{ctx, std::make_shared(std::move(table)), + std::move(variables), std::move(sortedColumns), + std::move(localVocab)} {} + explicit ValuesForTesting(QueryExecutionContext* ctx, std::vector tables, VarVector variables, bool unlikelyToFitInCache = false, diff --git a/test/engine/CMakeLists.txt b/test/engine/CMakeLists.txt index 41b2b463ad..2fdd14ad20 100644 --- a/test/engine/CMakeLists.txt +++ b/test/engine/CMakeLists.txt @@ -13,3 +13,4 @@ addLinkAndRunAsSingleTest(SpatialJoinAlgorithmsTest engine) addLinkAndDiscoverTestSerial(QueryExecutionTreeTest engine) addLinkAndDiscoverTestSerial(DescribeTest engine) addLinkAndDiscoverTestSerial(ExistsJoinTest engine) +addLinkAndDiscoverTestSerial(NamedQueryCacheTest) diff --git a/test/engine/NamedQueryCacheTest.cpp b/test/engine/NamedQueryCacheTest.cpp index 669758e208..1522a64237 100644 --- a/test/engine/NamedQueryCacheTest.cpp +++ b/test/engine/NamedQueryCacheTest.cpp @@ -1,3 +1,21 @@ // // Created by kalmbacj on 2/6/25. // + +#include + +#include "../util/IdTableHelpers.h" +#include "engine/NamedQueryCache.h" + +TEST(NamedQueryCache, basicWorkflow) { + NamedQueryCache cache; + EXPECT_EQ(cache.numEntries(), 0); + auto table = makeIdTableFromVector({{3, 7}, {9, 11}}); + using V = Variable; + VariableToColumnMap varColMap{{V{"?x"}, makeAlwaysDefinedColumn(0)}, + {V{"?y"}, makeAlwaysDefinedColumn(1)}}; + NamedQueryCache::Value value{ + std::make_shared(table.clone()), varColMap, {1, 0}}; + cache.store("query-1", std::move(value)); + EXPECT_EQ(cache.numEntries(), 1); +}