Skip to content

Commit 619fd6f

Browse files
authored
refactor: use Weak ref to break cyclic references (#17743)
MetaServiceImpl is not dropped while there is a live connection, so it must not hold a strong reference to MetaNode. Otherwise, `MetaNode` is not dropped when stopping, which might prevent the release of other resources held by MetaNode.
1 parent c038715 commit 619fd6f

File tree

9 files changed

+169
-30
lines changed

9 files changed

+169
-30
lines changed

src/meta/client/src/grpc_client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,8 @@ impl MetaGrpcClient {
988988

989989
let mut client = self.get_established_client().await?;
990990

991+
// Since 1.2.677, initial_flush is added to WatchRequest.
992+
// If the server is not up to date enough to support this feature, return an error.
991993
if watch_request.initial_flush {
992994
let server_version = client.server_protocol_version();
993995
let least_server_version = 1002677;

src/meta/raft-store/src/applier.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ where SM: StateMachineApi + 'static
128128
// Send queued change events to subscriber
129129
if let Some(sender) = self.sm.event_sender() {
130130
for event in self.changes.drain(..) {
131+
debug!("send to EventSender: {:?}", event);
131132
sender.send(event);
132133
}
133134
}

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 80 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::io;
1616
use std::pin::Pin;
1717
use std::sync::Arc;
18+
use std::sync::Weak;
1819

1920
use arrow_flight::BasicAuth;
2021
use databend_common_base::base::tokio::sync::mpsc;
@@ -74,9 +75,11 @@ use watcher::key_range::build_key_range;
7475
use watcher::util::new_watch_sink;
7576
use watcher::util::try_forward;
7677
use watcher::watch_stream::WatchStream;
78+
use watcher::watch_stream::WatchStreamSender;
7779

7880
use crate::message::ForwardRequest;
7981
use crate::message::ForwardRequestBody;
82+
use crate::meta_service::watcher::DispatcherHandle;
8083
use crate::meta_service::watcher::WatchTypes;
8184
use crate::meta_service::MetaNode;
8285
use crate::metrics::network_metrics;
@@ -88,17 +91,36 @@ use crate::version::MIN_METACLI_SEMVER;
8891

8992
pub struct MetaServiceImpl {
9093
token: GrpcToken,
91-
pub(crate) meta_node: Arc<MetaNode>,
94+
/// MetaServiceImpl is not dropped if there is an alive connection.
95+
///
96+
/// Thus the reference to [`MetaNode`] is made a Weak reference, so that it does not prevent [`MetaNode`] from being dropped.
97+
pub(crate) meta_node: Weak<MetaNode>,
98+
}
99+
100+
impl Drop for MetaServiceImpl {
101+
fn drop(&mut self) {
102+
if let Some(meta_node) = self.meta_node.upgrade() {
103+
debug!("MetaServiceImpl::drop: id={}", meta_node.raft_store.id);
104+
} else {
105+
debug!("MetaServiceImpl::drop: inner MetaNode already dropped");
106+
}
107+
}
92108
}
93109

94110
impl MetaServiceImpl {
95-
pub fn create(meta_node: Arc<MetaNode>) -> Self {
111+
pub fn create(meta_node: Weak<MetaNode>) -> Self {
96112
Self {
97113
token: GrpcToken::create(),
98114
meta_node,
99115
}
100116
}
101117

118+
pub fn try_get_meta_node(&self) -> Result<Arc<MetaNode>, Status> {
119+
self.meta_node.upgrade().ok_or_else(|| {
120+
Status::internal("MetaNode is already dropped, can not serve new requests")
121+
})
122+
}
123+
102124
fn check_token(&self, metadata: &MetadataMap) -> Result<GrpcClaim, Status> {
103125
let token = metadata
104126
.get_bin("auth-token-bin")
@@ -117,7 +139,7 @@ impl MetaServiceImpl {
117139
let req: MetaGrpcReq = request.try_into()?;
118140
debug!("{}: Received MetaGrpcReq: {:?}", func_name!(), req);
119141

120-
let m = &self.meta_node;
142+
let m = self.try_get_meta_node()?;
121143
let reply = match &req {
122144
MetaGrpcReq::UpsertKV(a) => {
123145
let res = m
@@ -144,8 +166,9 @@ impl MetaServiceImpl {
144166

145167
let req = ForwardRequest::new(1, req);
146168

147-
let res = self
148-
.meta_node
169+
let meta_node = self.try_get_meta_node()?;
170+
171+
let res = meta_node
149172
.handle_forwardable_request::<MetaGrpcReadReq>(req.clone())
150173
.log_elapsed_info(format!("ReadRequest: {:?}", req))
151174
.await
@@ -168,8 +191,9 @@ impl MetaServiceImpl {
168191

169192
let forward_req = ForwardRequest::new(1, ForwardRequestBody::Write(ent));
170193

171-
let forward_res = self
172-
.meta_node
194+
let meta_node = self.try_get_meta_node()?;
195+
196+
let forward_res = meta_node
173197
.handle_forwardable_request(forward_req)
174198
.log_elapsed_info(format!("TxnRequest: {}", txn))
175199
.await;
@@ -342,7 +366,8 @@ impl MetaService for MetaServiceImpl {
342366
) -> Result<Response<Self::ExportStream>, Status> {
343367
let _guard = RequestInFlight::guard();
344368

345-
let meta_node = &self.meta_node;
369+
let meta_node = self.try_get_meta_node()?;
370+
346371
let strm = meta_node.raft_store.inner().export();
347372

348373
let chunk_size = 32;
@@ -370,7 +395,8 @@ impl MetaService for MetaServiceImpl {
370395
) -> Result<Response<Self::ExportV1Stream>, Status> {
371396
let _guard = RequestInFlight::guard();
372397

373-
let meta_node = &self.meta_node;
398+
let meta_node = self.try_get_meta_node()?;
399+
374400
let strm = meta_node.raft_store.inner().export();
375401

376402
let chunk_size = request.get_ref().chunk_size.unwrap_or(32) as usize;
@@ -400,18 +426,13 @@ impl MetaService for MetaServiceImpl {
400426

401427
let (tx, rx) = mpsc::channel(4);
402428

403-
let mn = &self.meta_node;
429+
let mn = self.try_get_meta_node()?;
404430

405-
let sender = mn.add_watcher(watch, tx.clone()).await?;
431+
let weak_sender = mn.add_watcher(watch, tx.clone()).await?;
406432

407-
let dispatcher_handle = mn.dispatcher_handle.clone();
433+
let weak_handle = Arc::downgrade(&mn.dispatcher_handle);
408434
let on_drop = move || {
409-
let Some(sender) = sender.upgrade() else {
410-
return;
411-
};
412-
dispatcher_handle.request(move |dispatcher| {
413-
dispatcher.remove_watcher(sender);
414-
})
435+
try_remove_sender(weak_sender, weak_handle, "on-drop-WatchStream");
415436
};
416437

417438
let stream = WatchStream::new(rx, Box::new(on_drop));
@@ -453,7 +474,8 @@ impl MetaService for MetaServiceImpl {
453474

454475
let _guard = RequestInFlight::guard();
455476

456-
let meta_node = &self.meta_node;
477+
let meta_node = self.try_get_meta_node()?;
478+
457479
let members = meta_node.get_grpc_advertise_addrs().await;
458480

459481
let resp = MemberListReply { data: members };
@@ -467,8 +489,10 @@ impl MetaService for MetaServiceImpl {
467489
_request: Request<Empty>,
468490
) -> Result<Response<ClusterStatus>, Status> {
469491
let _guard = RequestInFlight::guard();
470-
let status = self
471-
.meta_node
492+
493+
let meta_node = self.try_get_meta_node()?;
494+
495+
let status = meta_node
472496
.get_status()
473497
.await
474498
.map_err(|e| Status::internal(format!("get meta node status failed: {}", e)))?;
@@ -543,3 +567,38 @@ fn thread_tracking_guard<T>(req: &tonic::Request<T>) -> Option<TrackingGuard> {
543567

544568
None
545569
}
570+
571+
/// Try to remove a [`WatchStream`] from the subscriber.
572+
///
573+
/// This function receives two weak references: one to the sender and one to the dispatcher handle.
574+
/// Using weak references prevents memory leaks by avoiding cyclic references when these are captured in closures.
575+
/// If either reference can't be upgraded, the function logs the situation and returns early.
576+
fn try_remove_sender(
577+
weak_sender: Weak<WatchStreamSender<WatchTypes>>,
578+
weak_handle: Weak<DispatcherHandle>,
579+
ctx: &str,
580+
) {
581+
debug!("{ctx}: try removing: {:?}", weak_sender);
582+
583+
let Some(d) = weak_handle.upgrade() else {
584+
debug!(
585+
"{ctx}: when try removing {:?}; dispatcher is already dropped",
586+
weak_sender
587+
);
588+
return;
589+
};
590+
591+
let Some(sender) = weak_sender.upgrade() else {
592+
debug!(
593+
"on_drop is called for WatchStream {:?}; but sender is already dropped",
594+
weak_sender
595+
);
596+
return;
597+
};
598+
599+
debug!("{ctx}: request is sent to remove watcher: {}", sender);
600+
601+
d.request(move |dispatcher| {
602+
dispatcher.remove_watcher(sender);
603+
})
604+
}

src/meta/service/src/api/grpc_server.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use databend_common_meta_types::protobuf::FILE_DESCRIPTOR_SET;
2626
use databend_common_meta_types::GrpcConfig;
2727
use databend_common_meta_types::MetaNetworkError;
2828
use fastrace::prelude::*;
29+
use futures::future::select;
30+
use futures::future::Either;
2931
use log::info;
3032
use tonic::transport::Identity;
3133
use tonic::transport::Server;
@@ -34,14 +36,24 @@ use tonic::transport::ServerTlsConfig;
3436
use crate::api::grpc::grpc_service::MetaServiceImpl;
3537
use crate::configs::Config;
3638
use crate::meta_service::MetaNode;
39+
use crate::util::DropDebug;
3740

3841
pub struct GrpcServer {
3942
conf: Config,
43+
/// GrpcServer is the main container of the gRPC service.
44+
/// [`MetaNode`] should never be dropped while [`GrpcServer`] is alive.
45+
/// Therefore, it is held by a strong reference (Arc) to ensure proper lifetime management.
4046
pub(crate) meta_node: Arc<MetaNode>,
4147
join_handle: Option<JoinHandle<()>>,
4248
stop_grpc_tx: Option<Sender<()>>,
4349
}
4450

51+
impl Drop for GrpcServer {
52+
fn drop(&mut self) {
53+
info!("GrpcServer::drop: id={}", self.conf.raft_config.id);
54+
}
55+
}
56+
4557
impl GrpcServer {
4658
pub fn create(conf: Config, meta_node: Arc<MetaNode>) -> Self {
4759
Self {
@@ -88,13 +100,17 @@ impl GrpcServer {
88100

89101
info!("start gRPC listening: {}", addr);
90102

91-
let grpc_impl = MetaServiceImpl::create(meta_node.clone());
103+
let grpc_impl = MetaServiceImpl::create(Arc::downgrade(&meta_node));
92104
let grpc_srv = MetaServiceServer::new(grpc_impl)
93105
.max_decoding_message_size(GrpcConfig::MAX_DECODING_SIZE)
94106
.max_encoding_message_size(GrpcConfig::MAX_ENCODING_SIZE);
95107

108+
let id = conf.raft_config.id;
109+
96110
let j = databend_common_base::runtime::spawn(
97111
async move {
112+
let _d = DropDebug::new(format!("GrpcServer(id={}) spawned service task", id));
113+
98114
let res = builder
99115
.add_service(reflect_srv)
100116
.add_service(grpc_srv)
@@ -128,20 +144,38 @@ impl GrpcServer {
128144
}
129145

130146
async fn do_stop(&mut self, _force: Option<tokio::sync::broadcast::Receiver<()>>) {
147+
let id = self.meta_node.raft_store.id;
148+
let ctx = format!("gRPC-task(id={id})");
149+
131150
if let Some(stop_grpc_tx) = self.stop_grpc_tx.take() {
132-
info!("Sending stop signal to gRPC server");
151+
info!("Sending stop signal to {ctx}");
133152
let _ = stop_grpc_tx.send(());
134153
}
135154

136-
if let Some(j) = self.join_handle.take() {
137-
info!("Waiting for gRPC server stop");
138-
let x = tokio::time::timeout(Duration::from_millis(1_000), j).await;
139-
info!("Done: waiting for grpc stop: res: {:?}", x);
155+
if let Some(jh) = self.join_handle.take() {
156+
info!("Waiting for {ctx} to stop");
157+
158+
let timeout = Duration::from_millis(1_000);
159+
let sleep = tokio::time::sleep(timeout);
160+
161+
let slp = std::pin::pin!(sleep);
162+
163+
match select(slp, jh).await {
164+
Either::Left((_v1, jh)) => {
165+
info!("{ctx} stop timeout after {:?}; force abort", timeout);
166+
jh.abort();
167+
jh.await.ok();
168+
info!("Done: waiting for {ctx} force stop");
169+
}
170+
Either::Right((_v2, _slp)) => {
171+
info!("Done: waiting for {ctx} normal stop");
172+
}
173+
}
140174
}
141175

142-
info!("Waiting for meta_node stop");
176+
info!("Waiting for meta_node(id={id}) to stop");
143177
let x = tokio::time::timeout(Duration::from_millis(1_000), self.meta_node.stop()).await;
144-
info!("Done: waiting for meta_node stop: res: {:?}", x);
178+
info!("Done: waiting for meta_node(id={id}) stop: res: {:?}", x);
145179
}
146180

147181
async fn tls_config(conf: &Config) -> Result<Option<ServerTlsConfig>, std::io::Error> {

src/meta/service/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ pub mod raft_client;
2626
pub(crate) mod request_handling;
2727
pub mod store;
2828
pub mod version;
29+
30+
pub mod util;

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ pub type MetaRaft = Raft<TypeConfig>;
111111
/// MetaNode is the container of metadata related components and threads, such as storage, the raft node and a raft-state monitor.
112112
pub struct MetaNode {
113113
pub raft_store: RaftStore,
114+
/// MetaNode holds a strong reference to the dispatcher handle.
115+
///
116+
/// Other components should keep a weak one.
114117
pub dispatcher_handle: Arc<DispatcherHandle>,
115118
pub raft: MetaRaft,
116119
pub running_tx: watch::Sender<()>,

src/meta/service/src/util.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use log::debug;
16+
17+
/// A struct that implements the Drop trait to log a message when it is dropped.
18+
///
19+
/// It can be used to track a lifetime. For example, use it as a struct field or as a local variable of a closure.
20+
pub struct DropDebug {
21+
message: String,
22+
}
23+
24+
impl Drop for DropDebug {
25+
fn drop(&mut self) {
26+
debug!("DropDebug: {}", self.message);
27+
}
28+
}
29+
30+
impl DropDebug {
31+
pub fn new(m: impl ToString) -> Self {
32+
Self {
33+
message: m.to_string(),
34+
}
35+
}
36+
}

src/meta/service/tests/it/grpc/metasrv_grpc_kv_api_restart_cluster.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ async fn test_kv_api_restart_cluster_token_expired() -> anyhow::Result<()> {
190190
stopped_tcs
191191
};
192192

193+
sleep(Duration::from_secs(1)).await;
194+
193195
info!("--- restart the cluster");
194196
let tcs = {
195197
let mut tcs = vec![];

src/meta/service/tests/it/tests/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pub fn make_grpc_client(addresses: Vec<String>) -> Result<Arc<ClientHandle>, Met
9696
addresses,
9797
"root",
9898
"xxx",
99-
None,
99+
Some(Duration::from_secs(2)), // timeout
100100
Some(Duration::from_secs(10)),
101101
None,
102102
)?;

0 commit comments

Comments
 (0)