Skip to content

Commit 3adfdfc

Browse files
committed
c/partition_manager: introduced partition shutdown watchdog timer
Introduced a watchdog that is tracking partition shutdown state. The watchdog is intended to provide a mechanism that will ease debugging of partition shutdown issues. The watchdog tracks state of partitions that were requested to be stopped or removed. When there was no state update for the time longer than the configurable threshold (by default 30 seconds) the watchdog will emit error log entry which will inform user about the problem. Signed-off-by: Michal Maslanka <[email protected]>
1 parent f66bf5b commit 3adfdfc

File tree

3 files changed

+134
-9
lines changed

3 files changed

+134
-9
lines changed

src/v/cluster/partition_manager.cc

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "cluster/archival_metadata_stm.h"
2020
#include "cluster/fwd.h"
2121
#include "cluster/logger.h"
22+
#include "cluster/partition.h"
2223
#include "cluster/partition_recovery_manager.h"
2324
#include "cluster/types.h"
2425
#include "config/configuration.h"
@@ -38,6 +39,7 @@
3839

3940
#include <seastar/core/coroutine.hh>
4041
#include <seastar/core/io_priority_class.hh>
42+
#include <seastar/core/lowres_clock.hh>
4143
#include <seastar/core/reactor.hh>
4244
#include <seastar/core/shared_ptr.hh>
4345
#include <seastar/core/smp.hh>
@@ -57,15 +59,17 @@ partition_manager::partition_manager(
5759
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
5860
ss::lw_shared_ptr<const archival::configuration> archival_conf,
5961
ss::sharded<features::feature_table>& feature_table,
60-
ss::sharded<archival::upload_housekeeping_service>& upload_hks)
62+
ss::sharded<archival::upload_housekeeping_service>& upload_hks,
63+
config::binding<std::chrono::milliseconds> partition_shutdown_timeout)
6164
: _storage(storage.local())
6265
, _raft_manager(raft)
6366
, _partition_recovery_mgr(recovery_mgr)
6467
, _cloud_storage_api(cloud_storage_api)
6568
, _cloud_storage_cache(cloud_storage_cache)
6669
, _archival_conf(std::move(archival_conf))
6770
, _feature_table(feature_table)
68-
, _upload_hks(upload_hks) {
71+
, _upload_hks(upload_hks)
72+
, _partition_shutdown_timeout(std::move(partition_shutdown_timeout)) {
6973
_leader_notify_handle
7074
= _raft_manager.local().register_leadership_notification(
7175
[this](
@@ -80,6 +84,8 @@ partition_manager::partition_manager(
8084
}
8185
}
8286
});
87+
_shutdown_watchdog.set_callback(
88+
[this] { check_partitions_shutdown_state(); });
8389
}
8490

8591
partition_manager::~partition_manager() {
@@ -101,6 +107,11 @@ partition_manager::get_topic_partition_table(
101107
return rs;
102108
}
103109

110+
ss::future<> partition_manager::start() {
111+
maybe_arm_shutdown_watchdog();
112+
co_return;
113+
}
114+
104115
ss::future<consensus_ptr> partition_manager::manage(
105116
storage::ntp_config ntp_cfg,
106117
raft::group_id group,
@@ -290,11 +301,17 @@ ss::future<> partition_manager::stop_partitions() {
290301

291302
ss::future<>
292303
partition_manager::do_shutdown(ss::lw_shared_ptr<partition> partition) {
304+
partition_shutdown_state shutdown_state(partition);
305+
_partitions_shutting_down.push_back(shutdown_state);
306+
293307
try {
294308
auto ntp = partition->ntp();
309+
shutdown_state.update(partition_shutdown_stage::stopping_raft);
295310
co_await _raft_manager.local().shutdown(partition->raft());
296311
_unmanage_watchers.notify(ntp, model::topic_partition_view(ntp.tp));
312+
shutdown_state.update(partition_shutdown_stage::stopping_partition);
297313
co_await partition->stop();
314+
shutdown_state.update(partition_shutdown_stage::stopping_storage);
298315
co_await _storage.log_mgr().shutdown(partition->ntp());
299316
} catch (...) {
300317
vassert(
@@ -319,22 +336,27 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
319336
"manager",
320337
ntp));
321338
}
339+
partition_shutdown_state shutdown_state(partition);
340+
_partitions_shutting_down.push_back(shutdown_state);
322341
auto group_id = partition->group();
323342

324343
// remove partition from ntp & raft tables
325344
_ntp_table.erase(ntp);
326345
_raft_table.erase(group_id);
327-
346+
shutdown_state.update(partition_shutdown_stage::stopping_raft);
328347
co_await _raft_manager.local().remove(partition->raft());
329-
330348
_unmanage_watchers.notify(
331349
ntp, model::topic_partition_view(partition->ntp().tp));
332-
350+
shutdown_state.update(partition_shutdown_stage::stopping_partition);
333351
co_await partition->stop();
352+
shutdown_state.update(partition_shutdown_stage::removing_persistent_state);
334353
co_await partition->remove_persistent_state();
354+
shutdown_state.update(partition_shutdown_stage::removing_storage);
335355
co_await _storage.log_mgr().remove(partition->ntp());
336356

337357
if (mode == partition_removal_mode::global) {
358+
shutdown_state.update(
359+
partition_shutdown_stage::finalizing_remote_storage);
338360
co_await partition->finalize_remote_partition(_as);
339361
}
340362
}
@@ -350,14 +372,46 @@ ss::future<> partition_manager::shutdown(const model::ntp& ntp) {
350372
"manager",
351373
ntp)));
352374
}
353-
354375
// remove partition from ntp & raft tables
355376
_ntp_table.erase(ntp);
356377
_raft_table.erase(partition->group());
357378

358379
return do_shutdown(partition);
359380
}
360381

382+
partition_manager::partition_shutdown_state::partition_shutdown_state(
383+
ss::lw_shared_ptr<cluster::partition> p)
384+
: partition(std::move(p))
385+
, stage(partition_manager::partition_shutdown_stage::shutdown_requested)
386+
, last_update_timestamp(ss::lowres_clock::now()) {}
387+
388+
void partition_manager::partition_shutdown_state::update(
389+
partition_shutdown_stage s) {
390+
stage = s;
391+
last_update_timestamp = ss::lowres_clock::now();
392+
}
393+
394+
void partition_manager::check_partitions_shutdown_state() {
395+
const auto now = ss::lowres_clock::now();
396+
for (auto& state : _partitions_shutting_down) {
397+
if (state.last_update_timestamp < now - _partition_shutdown_timeout()) {
398+
clusterlog.error(
399+
"partition {} shutdown takes longer than expected, current "
400+
"shutdown stage: {} time since last update: {} seconds",
401+
state.partition->ntp(),
402+
state.stage,
403+
(now - state.last_update_timestamp) / 1s);
404+
}
405+
}
406+
maybe_arm_shutdown_watchdog();
407+
}
408+
409+
void partition_manager::maybe_arm_shutdown_watchdog() {
410+
if (!_as.abort_requested()) {
411+
_shutdown_watchdog.arm(_partition_shutdown_timeout() / 5);
412+
}
413+
}
414+
361415
uint64_t partition_manager::upload_backlog_size() const {
362416
uint64_t size = 0;
363417
for (const auto& [_, partition] : _ntp_table) {
@@ -411,4 +465,26 @@ partition_manager::get_cloud_cache_disk_usage_target() const {
411465
[](auto acc, auto update) { return acc + update; });
412466
}
413467

468+
std::ostream& operator<<(
469+
std::ostream& o, const partition_manager::partition_shutdown_stage& stage) {
470+
switch (stage) {
471+
case partition_manager::partition_shutdown_stage::shutdown_requested:
472+
return o << "shutdown_requested";
473+
case partition_manager::partition_shutdown_stage::stopping_raft:
474+
return o << "stopping_raft";
475+
case partition_manager::partition_shutdown_stage::removing_raft:
476+
return o << "removing_raft";
477+
case partition_manager::partition_shutdown_stage::stopping_partition:
478+
return o << "stopping_partition";
479+
case partition_manager::partition_shutdown_stage::removing_persistent_state:
480+
return o << "removing_persistent_state";
481+
case partition_manager::partition_shutdown_stage::stopping_storage:
482+
return o << "stopping_storage";
483+
case partition_manager::partition_shutdown_stage::removing_storage:
484+
return o << "removing_storage";
485+
case partition_manager::partition_shutdown_stage::finalizing_remote_storage:
486+
return o << "finalizing_remote_storage";
487+
}
488+
}
489+
414490
} // namespace cluster

src/v/cluster/partition_manager.h

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "cluster/ntp_callbacks.h"
1818
#include "cluster/partition.h"
1919
#include "cluster/types.h"
20+
#include "config/property.h"
2021
#include "features/feature_table.h"
2122
#include "model/fundamental.h"
2223
#include "model/ktp.h"
@@ -25,10 +26,13 @@
2526
#include "raft/group_manager.h"
2627
#include "raft/heartbeat_manager.h"
2728
#include "storage/api.h"
29+
#include "utils/intrusive_list_helpers.h"
2830
#include "utils/named_type.h"
2931

3032
#include <absl/container/flat_hash_map.h>
3133

34+
#include <chrono>
35+
3236
namespace cluster {
3337
class partition_manager
3438
: public ss::peering_sharded_service<partition_manager> {
@@ -44,7 +48,8 @@ class partition_manager
4448
ss::sharded<cloud_storage::cache>&,
4549
ss::lw_shared_ptr<const archival::configuration>,
4650
ss::sharded<features::feature_table>&,
47-
ss::sharded<archival::upload_housekeeping_service>&);
51+
ss::sharded<archival::upload_housekeeping_service>&,
52+
config::binding<std::chrono::milliseconds>);
4853

4954
~partition_manager();
5055

@@ -81,7 +86,7 @@ class partition_manager
8186
return nullptr;
8287
}
8388

84-
ss::future<> start() { return ss::now(); }
89+
ss::future<> start();
8590
ss::future<> stop_partitions();
8691
ss::future<consensus_ptr> manage(
8792
storage::ntp_config,
@@ -206,6 +211,37 @@ class partition_manager
206211
}
207212

208213
private:
214+
enum class partition_shutdown_stage {
215+
shutdown_requested,
216+
stopping_raft,
217+
removing_raft,
218+
stopping_partition,
219+
removing_persistent_state,
220+
stopping_storage,
221+
removing_storage,
222+
finalizing_remote_storage
223+
};
224+
225+
struct partition_shutdown_state {
226+
explicit partition_shutdown_state(ss::lw_shared_ptr<partition>);
227+
228+
partition_shutdown_state(partition_shutdown_state&&) = delete;
229+
partition_shutdown_state(const partition_shutdown_state&) = delete;
230+
partition_shutdown_state& operator=(partition_shutdown_state&&)
231+
= delete;
232+
partition_shutdown_state& operator=(const partition_shutdown_state&)
233+
= delete;
234+
~partition_shutdown_state() = default;
235+
236+
void update(partition_shutdown_stage);
237+
// it is more convenient to keep the pointer to partition than an ntp
238+
// copy.
239+
ss::lw_shared_ptr<partition> partition;
240+
partition_shutdown_stage stage;
241+
ss::lowres_clock::time_point last_update_timestamp;
242+
intrusive_list_hook hook;
243+
};
244+
209245
/// Download log if partition_recovery_manager is initialized.
210246
///
211247
/// It might not be initialized if cloud storage is disable.
@@ -217,6 +253,9 @@ class partition_manager
217253

218254
ss::future<> do_shutdown(ss::lw_shared_ptr<partition>);
219255

256+
void check_partitions_shutdown_state();
257+
258+
void maybe_arm_shutdown_watchdog();
220259
storage::api& _storage;
221260
/// used to wait for concurrent recoveries
222261
ss::sharded<raft::group_manager>& _raft_manager;
@@ -235,7 +274,11 @@ class partition_manager
235274
ss::lw_shared_ptr<const archival::configuration> _archival_conf;
236275
ss::sharded<features::feature_table>& _feature_table;
237276
ss::sharded<archival::upload_housekeeping_service>& _upload_hks;
277+
intrusive_list<partition_shutdown_state, &partition_shutdown_state::hook>
278+
_partitions_shutting_down;
238279
ss::gate _gate;
280+
config::binding<std::chrono::milliseconds> _partition_shutdown_timeout;
281+
ss::timer<> _shutdown_watchdog;
239282

240283
// In general, all our background work is in partition objects which
241284
// have their own abort source. This abort source is only for work that
@@ -250,5 +293,7 @@ class partition_manager
250293
state_machine_registry _stm_registry;
251294

252295
friend std::ostream& operator<<(std::ostream&, const partition_manager&);
296+
friend std::ostream& operator<<(
297+
std::ostream&, const partition_manager::partition_shutdown_stage&);
253298
};
254299
} // namespace cluster

src/v/redpanda/application.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1441,7 +1441,11 @@ void application::wire_up_redpanda_services(
14411441
}
14421442
}),
14431443
std::ref(feature_table),
1444-
std::ref(_archival_upload_housekeeping))
1444+
std::ref(_archival_upload_housekeeping),
1445+
ss::sharded_parameter([] {
1446+
return config::shard_local_cfg()
1447+
.partition_manager_shutdown_watchdog_timeout.bind();
1448+
}))
14451449
.get();
14461450
vlog(_log.info, "Partition manager started");
14471451
construct_service(

0 commit comments

Comments
 (0)