19
19
#include " cluster/archival_metadata_stm.h"
20
20
#include " cluster/fwd.h"
21
21
#include " cluster/logger.h"
22
+ #include " cluster/partition.h"
22
23
#include " cluster/partition_recovery_manager.h"
23
24
#include " cluster/types.h"
24
25
#include " config/configuration.h"
38
39
39
40
#include < seastar/core/coroutine.hh>
40
41
#include < seastar/core/io_priority_class.hh>
42
+ #include < seastar/core/lowres_clock.hh>
41
43
#include < seastar/core/reactor.hh>
42
44
#include < seastar/core/shared_ptr.hh>
43
45
#include < seastar/core/smp.hh>
@@ -57,15 +59,17 @@ partition_manager::partition_manager(
57
59
ss::sharded<cloud_storage::cache>& cloud_storage_cache,
58
60
ss::lw_shared_ptr<const archival::configuration> archival_conf,
59
61
ss::sharded<features::feature_table>& feature_table,
60
- ss::sharded<archival::upload_housekeeping_service>& upload_hks)
62
+ ss::sharded<archival::upload_housekeeping_service>& upload_hks,
63
+ config::binding<std::chrono::milliseconds> partition_shutdown_timeout)
61
64
: _storage(storage.local())
62
65
, _raft_manager(raft)
63
66
, _partition_recovery_mgr(recovery_mgr)
64
67
, _cloud_storage_api(cloud_storage_api)
65
68
, _cloud_storage_cache(cloud_storage_cache)
66
69
, _archival_conf(std::move(archival_conf))
67
70
, _feature_table(feature_table)
68
- , _upload_hks(upload_hks) {
71
+ , _upload_hks(upload_hks)
72
+ , _partition_shutdown_timeout(std::move(partition_shutdown_timeout)) {
69
73
_leader_notify_handle
70
74
= _raft_manager.local ().register_leadership_notification (
71
75
[this ](
@@ -80,6 +84,8 @@ partition_manager::partition_manager(
80
84
}
81
85
}
82
86
});
87
+ _shutdown_watchdog.set_callback (
88
+ [this ] { check_partitions_shutdown_state (); });
83
89
}
84
90
85
91
partition_manager::~partition_manager () {
@@ -101,6 +107,11 @@ partition_manager::get_topic_partition_table(
101
107
return rs;
102
108
}
103
109
110
+ ss::future<> partition_manager::start () {
111
+ maybe_arm_shutdown_watchdog ();
112
+ co_return ;
113
+ }
114
+
104
115
ss::future<consensus_ptr> partition_manager::manage (
105
116
storage::ntp_config ntp_cfg,
106
117
raft::group_id group,
@@ -290,11 +301,17 @@ ss::future<> partition_manager::stop_partitions() {
290
301
291
302
ss::future<>
292
303
partition_manager::do_shutdown (ss::lw_shared_ptr<partition> partition) {
304
+ partition_shutdown_state shutdown_state (partition);
305
+ _partitions_shutting_down.push_back (shutdown_state);
306
+
293
307
try {
294
308
auto ntp = partition->ntp ();
309
+ shutdown_state.update (partition_shutdown_stage::stopping_raft);
295
310
co_await _raft_manager.local ().shutdown (partition->raft ());
296
311
_unmanage_watchers.notify (ntp, model::topic_partition_view (ntp.tp ));
312
+ shutdown_state.update (partition_shutdown_stage::stopping_partition);
297
313
co_await partition->stop ();
314
+ shutdown_state.update (partition_shutdown_stage::stopping_storage);
298
315
co_await _storage.log_mgr ().shutdown (partition->ntp ());
299
316
} catch (...) {
300
317
vassert (
@@ -314,33 +331,34 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
314
331
auto partition = get (ntp);
315
332
316
333
if (!partition) {
317
- return ss::make_exception_future<>( std::invalid_argument (fmt::format (
334
+ throw std::invalid_argument (fmt::format (
318
335
" Can not remove partition. NTP {} is not present in partition "
319
336
" manager" ,
320
- ntp))) ;
337
+ ntp));
321
338
}
339
+ partition_shutdown_state shutdown_state (partition);
340
+ _partitions_shutting_down.push_back (shutdown_state);
322
341
auto group_id = partition->group ();
323
342
324
343
// remove partition from ntp & raft tables
325
344
_ntp_table.erase (ntp);
326
345
_raft_table.erase (group_id);
327
-
328
- return _raft_manager.local ()
329
- .remove (partition->raft ())
330
- .then ([this , ntp] {
331
- _unmanage_watchers.notify (ntp, model::topic_partition_view (ntp.tp ));
332
- })
333
- .then ([partition] { return partition->stop (); })
334
- .then ([partition] { return partition->remove_persistent_state (); })
335
- .then ([this , ntp] { return _storage.log_mgr ().remove (ntp); })
336
- .then ([this , partition, mode] {
337
- if (mode == partition_removal_mode::global) {
338
- return partition->finalize_remote_partition (_as);
339
- } else {
340
- return ss::now ();
341
- }
342
- })
343
- .finally ([partition] {}); // in the end remove partition
346
+ shutdown_state.update (partition_shutdown_stage::stopping_raft);
347
+ co_await _raft_manager.local ().remove (partition->raft ());
348
+ _unmanage_watchers.notify (
349
+ ntp, model::topic_partition_view (partition->ntp ().tp ));
350
+ shutdown_state.update (partition_shutdown_stage::stopping_partition);
351
+ co_await partition->stop ();
352
+ shutdown_state.update (partition_shutdown_stage::removing_persistent_state);
353
+ co_await partition->remove_persistent_state ();
354
+ shutdown_state.update (partition_shutdown_stage::removing_storage);
355
+ co_await _storage.log_mgr ().remove (partition->ntp ());
356
+
357
+ if (mode == partition_removal_mode::global) {
358
+ shutdown_state.update (
359
+ partition_shutdown_stage::finalizing_remote_storage);
360
+ co_await partition->finalize_remote_partition (_as);
361
+ }
344
362
}
345
363
346
364
ss::future<> partition_manager::shutdown (const model::ntp& ntp) {
@@ -354,14 +372,46 @@ ss::future<> partition_manager::shutdown(const model::ntp& ntp) {
354
372
" manager" ,
355
373
ntp)));
356
374
}
357
-
358
375
// remove partition from ntp & raft tables
359
376
_ntp_table.erase (ntp);
360
377
_raft_table.erase (partition->group ());
361
378
362
379
return do_shutdown (partition);
363
380
}
364
381
382
+ partition_manager::partition_shutdown_state::partition_shutdown_state (
383
+ ss::lw_shared_ptr<cluster::partition> p)
384
+ : partition(std::move(p))
385
+ , stage(partition_manager::partition_shutdown_stage::shutdown_requested)
386
+ , last_update_timestamp(ss::lowres_clock::now()) {}
387
+
388
+ void partition_manager::partition_shutdown_state::update (
389
+ partition_shutdown_stage s) {
390
+ stage = s;
391
+ last_update_timestamp = ss::lowres_clock::now ();
392
+ }
393
+
394
+ void partition_manager::check_partitions_shutdown_state () {
395
+ const auto now = ss::lowres_clock::now ();
396
+ for (auto & state : _partitions_shutting_down) {
397
+ if (state.last_update_timestamp < now - _partition_shutdown_timeout ()) {
398
+ clusterlog.error (
399
+ " partition {} shutdown takes longer than expected, current "
400
+ " shutdown stage: {} time since last update: {} seconds" ,
401
+ state.partition ->ntp (),
402
+ state.stage ,
403
+ (now - state.last_update_timestamp ) / 1s);
404
+ }
405
+ }
406
+ maybe_arm_shutdown_watchdog ();
407
+ }
408
+
409
+ void partition_manager::maybe_arm_shutdown_watchdog () {
410
+ if (!_as.abort_requested ()) {
411
+ _shutdown_watchdog.arm (_partition_shutdown_timeout () / 5 );
412
+ }
413
+ }
414
+
365
415
uint64_t partition_manager::upload_backlog_size () const {
366
416
uint64_t size = 0 ;
367
417
for (const auto & [_, partition] : _ntp_table) {
@@ -415,4 +465,26 @@ partition_manager::get_cloud_cache_disk_usage_target() const {
415
465
[](auto acc, auto update) { return acc + update; });
416
466
}
417
467
468
+ std::ostream& operator <<(
469
+ std::ostream& o, const partition_manager::partition_shutdown_stage& stage) {
470
+ switch (stage) {
471
+ case partition_manager::partition_shutdown_stage::shutdown_requested:
472
+ return o << " shutdown_requested" ;
473
+ case partition_manager::partition_shutdown_stage::stopping_raft:
474
+ return o << " stopping_raft" ;
475
+ case partition_manager::partition_shutdown_stage::removing_raft:
476
+ return o << " removing_raft" ;
477
+ case partition_manager::partition_shutdown_stage::stopping_partition:
478
+ return o << " stopping_partition" ;
479
+ case partition_manager::partition_shutdown_stage::removing_persistent_state:
480
+ return o << " removing_persistent_state" ;
481
+ case partition_manager::partition_shutdown_stage::stopping_storage:
482
+ return o << " stopping_storage" ;
483
+ case partition_manager::partition_shutdown_stage::removing_storage:
484
+ return o << " removing_storage" ;
485
+ case partition_manager::partition_shutdown_stage::finalizing_remote_storage:
486
+ return o << " finalizing_remote_storage" ;
487
+ }
488
+ }
489
+
418
490
} // namespace cluster
0 commit comments