Skip to content

Commit b07ac82

Browse files
committed
MB-70482: Add topkeys controller
And return an error if topkeys is already running including a UUID Change-Id: I311ff239eb8c854b540a3e9fc63919e680bd8566 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/239905 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Jim Walker <jim@couchbase.com>
1 parent 8967c0b commit b07ac82

File tree

12 files changed

+419
-82
lines changed

12 files changed

+419
-82
lines changed

daemon/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ add_library(memcached_daemon STATIC
118118
token_auth_data.h
119119
top_keys.cc
120120
top_keys.h
121+
top_keys_controller.cc
122+
top_keys_controller.h
121123
tracing.cc
122124
tracing.h
123125
yielding_limited_concurrency_task.cc

daemon/ioctl.cc

Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "external_auth_manager_thread.h"
1818
#include "front_end_thread.h"
1919
#include "settings.h"
20+
#include "top_keys_controller.h"
2021
#include "tracing.h"
2122
#include "utilities/string_utilities.h"
2223
#include <logger/logger.h>
@@ -118,20 +119,6 @@ static cb::engine_errc ioctlGetTopkeysStop(Cookie& cookie,
118119
const StrToStrMap& args,
119120
std::string& value,
120121
cb::mcbp::Datatype& datatype) {
121-
std::shared_ptr<cb::trace::topkeys::Collector> collector;
122-
FrontEndThread::forEach(
123-
[&collector](auto& t) {
124-
if (!t.keyTrace) {
125-
return;
126-
}
127-
128-
if (!collector) {
129-
collector = t.keyTrace;
130-
}
131-
t.keyTrace.reset();
132-
},
133-
true);
134-
135122
std::size_t limit = 100;
136123
if (args.contains("limit")) {
137124
try {
@@ -145,25 +132,39 @@ static cb::engine_errc ioctlGetTopkeysStop(Cookie& cookie,
145132
}
146133
}
147134

148-
if (!collector) {
149-
cookie.setErrorContext(
150-
"Failed to collect topkeys - tracing not active");
151-
LOG_WARNING_CTX("Failed to collect topkeys - tracing not active",
152-
{"conn_id", cookie.getConnection().getId()});
153-
return cb::engine_errc::failed;
135+
cb::uuid::uuid_t uuid;
136+
if (args.contains("uuid")) {
137+
try {
138+
uuid = cb::uuid::from_string(args.find("uuid")->second);
139+
} catch (const std::exception& exception) {
140+
LOG_ERROR_CTX("Failed to parse uuid",
141+
{"conn_id", cookie.getConnection().getId()},
142+
{"error", exception.what()});
143+
return cb::engine_errc::invalid_arguments;
144+
}
154145
}
155146

156-
try {
157-
nlohmann::json json = collector->getResults(limit);
158-
value = json.dump();
159-
} catch (const std::exception& exception) {
160-
LOG_ERROR_CTX("Failed to get trace data",
161-
{"conn_id", cookie.getConnection().getId()},
162-
{"error", exception.what()});
163-
return cb::engine_errc::failed;
147+
auto [status, json] =
148+
cb::trace::topkeys::Controller::instance().stop(uuid, limit);
149+
if (status == cb::engine_errc::success) {
150+
try {
151+
value = json.dump();
152+
} catch (const std::exception& exception) {
153+
LOG_ERROR_CTX(
154+
"Failed to get trace data. Trace data will be discarded",
155+
{"conn_id", cookie.getConnection().getId()},
156+
{"error", exception.what()});
157+
return cb::engine_errc::failed;
158+
}
159+
datatype = cb::mcbp::Datatype::JSON;
160+
return cb::engine_errc::success;
164161
}
165-
datatype = cb::mcbp::Datatype::JSON;
166-
return cb::engine_errc::success;
162+
163+
if (json.is_object()) {
164+
cookie.setErrorJsonExtras(json);
165+
}
166+
cookie.setErrorContext("Failed to start topkeys collection");
167+
return status;
167168
}
168169

169170
cb::engine_errc ioctl_get_property(Cookie& cookie,
@@ -325,7 +326,9 @@ static cb::engine_errc ioctlSetServerlessUnitSize(Cookie& cookie,
325326

326327
static cb::engine_errc ioctlSetTopkeysStart(Cookie& cookie,
327328
const StrToStrMap& args,
328-
const std::string&) {
329+
const std::string&,
330+
std::string& result,
331+
cb::mcbp::Datatype& datatype) {
329332
std::size_t limit = 10000;
330333
if (args.contains("limit")) {
331334
try {
@@ -402,27 +405,28 @@ static cb::engine_errc ioctlSetTopkeysStart(Cookie& cookie,
402405
}
403406
}
404407

405-
auto expiry_time = cb::time::steady_clock::now() +
406-
std::chrono::seconds(expected_duration);
407-
408-
auto collector = cb::trace::topkeys::Collector::create(
409-
limit, shards, expiry_time, bucket_filter);
410-
if (!collector) {
411-
cookie.setErrorContext("Failed to create topkeys collector");
412-
LOG_WARNING_CTX("Failed to create topkeys collector",
413-
{"conn_id", cookie.getConnection().getId()});
414-
return cb::engine_errc::failed;
408+
const auto [status, uuid] =
409+
cb::trace::topkeys::Controller::instance().create(
410+
limit,
411+
shards,
412+
std::chrono::seconds(expected_duration),
413+
bucket_filter);
414+
nlohmann::json json = {{"uuid", to_string(uuid)}};
415+
if (status == cb::engine_errc::success) {
416+
result = json.dump();
417+
datatype = cb::mcbp::Datatype::JSON;
418+
} else {
419+
cookie.setErrorJsonExtras(json);
420+
cookie.setErrorContext("Failed to start topkeys collection");
415421
}
416-
417-
FrontEndThread::forEach([&collector](auto& t) { t.keyTrace = collector; },
418-
true);
419-
420-
return cb::engine_errc::success;
422+
return status;
421423
}
422424

423425
cb::engine_errc ioctl_set_property(Cookie& cookie,
424426
const std::string& key,
425-
const std::string& value) {
427+
const std::string& value,
428+
std::string& result,
429+
cb::mcbp::Datatype& datatype) {
426430
std::pair<std::string, StrToStrMap> request;
427431

428432
try {
@@ -431,6 +435,9 @@ cb::engine_errc ioctl_set_property(Cookie& cookie,
431435
return cb::engine_errc::invalid_arguments;
432436
}
433437

438+
result.clear();
439+
datatype = cb::mcbp::Datatype::Raw;
440+
434441
auto& manager = cb::ioctl::Manager::getInstance();
435442
auto* id = manager.lookup(request.first);
436443
if (id) {
@@ -471,7 +478,8 @@ cb::engine_errc ioctl_set_property(Cookie& cookie,
471478
}
472479
return cb::engine_errc::not_supported;
473480
case cb::ioctl::Id::TopkeysStart:
474-
return ioctlSetTopkeysStart(cookie, request.second, value);
481+
return ioctlSetTopkeysStart(
482+
cookie, request.second, value, result, datatype);
475483

476484
case cb::ioctl::Id::TraceDumpBegin: // may only be used with Get
477485
case cb::ioctl::Id::TraceDumpGet: // may only be used with Get

daemon/ioctl.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,18 @@ cb::engine_errc ioctl_get_property(Cookie& cookie,
3232

3333
/**
3434
* Attempts to set property `key` to the value `value`.
35-
* If the property could be written, return cb::engine_errc::success.
36-
* Otherwise returns a status code indicating why the write failed.
35+
*
36+
* @param cookie represents the command
37+
* @param key is the name of the control
38+
* @param value is the value to set for the key
39+
* @param result is an out parameter to send back to the client
40+
* @param datatype is another out parameter describing the value in result
41+
*
42+
* @return cb::engine_errc::success if the property could be written,
43+
* otherwise a status code indicating why the write failed.
3744
*/
3845
cb::engine_errc ioctl_set_property(Cookie& cookie,
3946
const std::string& key,
40-
const std::string& value);
47+
const std::string& value,
48+
std::string& result,
49+
cb::mcbp::Datatype& datatype);

daemon/one_shot_task.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,18 @@ class OneShotTask : public GlobalTask {
2121
* @param id The task identifier for the task
2222
* @param name_ The name for the task
2323
* @param function_ The function to run on the executor
24+
* @param expectedRuntime_ The expected runtime of the task
25+
* @param initialSleepTime The initial sleep time before the task is first
26+
* run
2427
*/
2528
OneShotTask(TaskId id,
2629
std::string name_,
2730
std::function<void()> function_,
2831
std::chrono::microseconds expectedRuntime_ =
29-
std::chrono::milliseconds(100))
30-
: GlobalTask(NoBucketTaskable::instance(), id, 0, true),
32+
std::chrono::milliseconds(100),
33+
std::chrono::nanoseconds initialSleepTime =
34+
std::chrono::nanoseconds(0))
35+
: GlobalTask(NoBucketTaskable::instance(), id, initialSleepTime, true),
3136
name(std::move(name_)),
3237
function(std::move(function_)),
3338
expectedRuntime(std::move(expectedRuntime_)) {

daemon/protocol/mcbp/ioctl_command_context.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ cb::engine_errc IoctlCommandContext::execute() {
2828
const auto key = std::string{cookie.getRequest().getKeyString()};
2929
if (req.getClientOpcode() == cb::mcbp::ClientOpcode::IoctlSet) {
3030
const auto value = std::string{cookie.getRequest().getValueString()};
31-
return ioctl_set_property(cookie, key, value);
31+
return ioctl_set_property(cookie, key, value, response, datatype);
3232
}
3333
return ioctl_get_property(cookie, key, response, datatype);
3434
}

daemon/top_keys.cc

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
#include "top_keys.h"
1212

1313
#include "bucket_manager.h"
14+
#include "nobucket_taskable.h"
15+
#include "one_shot_task.h"
16+
#include "top_keys_controller.h"
1417

18+
#include <executor/executorpool.h>
1519
#include <memcached/dockey_view.h>
1620
#include <memcached/limits.h>
1721
#include <memcached/storeddockey.h>
@@ -73,12 +77,21 @@ void to_json(nlohmann::json& json, const Result& result) {
7377
}
7478
}
7579

80+
Collector::~Collector() {
81+
if (expiry_remover) {
82+
expiry_remover->cancel();
83+
}
84+
}
85+
7686
class CountingCollector : public Collector {
7787
public:
7888
CountingCollector(std::size_t max,
7989
std::size_t shards,
80-
cb::time::steady_clock::time_point expiry_time)
81-
: Collector(expiry_time), limit(max), shardmaps(shards) {
90+
std::chrono::seconds expiry_time,
91+
bool install_cleanup_task)
92+
: Collector(expiry_time, install_cleanup_task),
93+
limit(max),
94+
shardmaps(shards) {
8295
}
8396

8497
void access(const size_t bucket,
@@ -211,9 +224,10 @@ class FilteredCountingCollector : public CountingCollector {
211224
public:
212225
FilteredCountingCollector(std::size_t max,
213226
std::size_t shards,
214-
cb::time::steady_clock::time_point expiry_time,
215-
std::vector<std::size_t> buckets)
216-
: CountingCollector(max, shards, expiry_time),
227+
std::chrono::seconds expiry_time,
228+
std::vector<std::size_t> buckets,
229+
bool install_cleanup_task)
230+
: CountingCollector(max, shards, expiry_time, install_cleanup_task),
217231
bucketfilter(std::move(buckets)) {
218232
}
219233

@@ -229,20 +243,45 @@ class FilteredCountingCollector : public CountingCollector {
229243
std::vector<std::size_t> bucketfilter;
230244
};
231245

232-
std::shared_ptr<Collector> Collector::create(
233-
std::size_t num_keys,
234-
std::size_t shards,
235-
cb::time::steady_clock::time_point expiry_time,
236-
std::vector<std::size_t> buckets) {
246+
std::shared_ptr<Collector> Collector::create(std::size_t num_keys,
247+
std::size_t shards,
248+
std::chrono::seconds expiry_time,
249+
std::vector<std::size_t> buckets,
250+
bool install_cleanup_task) {
237251
if (num_keys) {
238252
if (buckets.empty()) {
239253
return std::make_unique<CountingCollector>(
240-
num_keys, shards, expiry_time);
254+
num_keys, shards, expiry_time, install_cleanup_task);
241255
}
242256
return std::make_unique<FilteredCountingCollector>(
243-
num_keys, shards, expiry_time, std::move(buckets));
257+
num_keys,
258+
shards,
259+
expiry_time,
260+
std::move(buckets),
261+
install_cleanup_task);
244262
}
245263
return {};
246264
}
247265

266+
Collector::Collector(std::chrono::seconds exp, bool install_cleanup_task)
267+
: expiry_time(cb::time::steady_clock::now() + exp) {
268+
if (install_cleanup_task) {
269+
// Install the cleanup handler to run 1 second *after* it should
270+
// expire as that would allow the front end threads to potentially
271+
// disconnect from the trace *before* we remove it (which means
272+
// we don't need to inject a task in each front end thread)
273+
// and can release the memory immediately in this thread context
274+
expiry_remover = std::make_shared<OneShotTask>(
275+
TaskId::Core_ExpiredTopKeysRemover,
276+
fmt::format("Expired TopKeys remover: {}", ::to_string(uuid)),
277+
[collector_uuid = uuid]() {
278+
Controller::instance().onExpiry(collector_uuid);
279+
},
280+
std::chrono::milliseconds(20),
281+
exp + std::chrono::seconds(1));
282+
283+
ExecutorPool::get()->schedule(expiry_remover);
284+
}
285+
}
286+
248287
} // namespace cb::trace::topkeys

daemon/top_keys.h

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010

1111
#pragma once
1212

13-
#include "memcached/dockey_view.h"
14-
13+
#include <executor/globaltask.h>
14+
#include <memcached/dockey_view.h>
1515
#include <nlohmann/json_fwd.hpp>
1616
#include <platform/cb_time.h>
17+
#include <platform/uuid.h>
1718
#include <memory>
1819
#include <unordered_map>
1920
#include <vector>
@@ -45,7 +46,7 @@ void to_json(nlohmann::json& json, const Result& result);
4546

4647
class Collector {
4748
public:
48-
virtual ~Collector() = default;
49+
virtual ~Collector();
4950

5051
/**
5152
* Create a Collector instance used to track key access.
@@ -59,13 +60,23 @@ class Collector {
5960
* bucket. For space efficiencty we use the bucket *id*
6061
* and not the bucket name (as buckets typically don't
6162
* come and go all the time)
63+
* @param install_cleanup_task If true, the collector will install a task
64+
* to remove itself after the expiry time
65+
* has passed. This parameter is set to true
66+
* in the normal case where the collector is
67+
* created by the front end thread, but we
68+
* would like to avoid that when running
69+
* isolated unit tests testing other
70+
* functionality of the collector.
71+
*
72+
* @return a shared pointer to the created Collector instance
6273
*/
6374
static std::shared_ptr<Collector> create(
6475
std::size_t num_keys,
6576
std::size_t shards,
66-
cb::time::steady_clock::time_point expiry_time =
67-
cb::time::steady_clock::now() + std::chrono::minutes(1),
68-
std::vector<std::size_t> buckets = {});
77+
std::chrono::seconds expiry_time = std::chrono::minutes(1),
78+
std::vector<std::size_t> buckets = {},
79+
bool install_cleanup_task = false);
6980

7081
/// Is this collector expired or not (e.g. should we discard the collected
7182
/// data). The time is passed in as an argument to avoid having to fetch the
@@ -75,6 +86,12 @@ class Collector {
7586
return now >= expiry_time;
7687
}
7788

89+
/// Get the uuid of this collector (used by the clients to identify the
90+
/// collector when they want to stop it and retrieve the collected data)
91+
auto get_uuid() const {
92+
return uuid;
93+
}
94+
7895
/**
7996
* Register access for a key in a given bucket
8097
*
@@ -94,8 +111,10 @@ class Collector {
94111
virtual Result getResults(size_t limit) const = 0;
95112

96113
protected:
97-
Collector(cb::time::steady_clock::time_point exp) : expiry_time(exp) {};
114+
Collector(std::chrono::seconds exp, bool install_cleanup_task);
98115
const cb::time::steady_clock::time_point expiry_time;
116+
const cb::uuid::uuid_t uuid = cb::uuid::random();
117+
ExTask expiry_remover;
99118
};
100119

101120
} // namespace cb::trace::topkeys

0 commit comments

Comments
 (0)