Skip to content

Commit 9d197dd

Browse files
mrnuggetbennetbo
andauthored
ssh remoting: Fix SSH connection not being closed (#18329)
This fixes the `SshSession` being leaked. There were two leaks: 1. `Arc<SshSession>` itself got leaked into the `SettingsObserver` that lives as long as the application. Fixed with a weak reference. 2. The two tasks spawned by an `SshSession` had a circular dependency and didn't exit while the other one was running. Fixed by fixing (1) and then attaching one of the tasks to the `SshSession`, which means it gets dropped with the session itself, which leads the other task to error and exit. Co-authored-by: Bennet <[email protected]> Release Notes: - N/A --------- Co-authored-by: Bennet <[email protected]>
1 parent 623a6ec commit 9d197dd

File tree

5 files changed

+53
-15
lines changed

5 files changed

+53
-15
lines changed

crates/project/src/project_settings.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,17 +334,20 @@ impl SettingsObserver {
334334
.log_err();
335335
}
336336

337+
let weak_client = ssh.downgrade();
337338
cx.observe_global::<SettingsStore>(move |_, cx| {
338339
let new_settings = cx.global::<SettingsStore>().raw_user_settings();
339340
if &settings != new_settings {
340341
settings = new_settings.clone()
341342
}
342343
if let Some(content) = serde_json::to_string(&settings).log_err() {
343-
ssh.send(proto::UpdateUserSettings {
344-
project_id: 0,
345-
content,
346-
})
347-
.log_err();
344+
if let Some(ssh) = weak_client.upgrade() {
345+
ssh.send(proto::UpdateUserSettings {
346+
project_id: 0,
347+
content,
348+
})
349+
.log_err();
350+
}
348351
}
349352
})
350353
.detach();

crates/recent_projects/src/recent_projects.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ impl PickerDelegate for RecentProjectsDelegate {
509509
.color(Color::Muted)
510510
.into_any_element()
511511
}
512-
SerializedWorkspaceLocation::Ssh(_) => Icon::new(IconName::Screen)
512+
SerializedWorkspaceLocation::Ssh(_) => Icon::new(IconName::Server)
513513
.color(Color::Muted)
514514
.into_any_element(),
515515
SerializedWorkspaceLocation::DevServer(_) => {

crates/remote/src/ssh_session.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use futures::{
1111
future::BoxFuture,
1212
select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, StreamExt as _,
1313
};
14-
use gpui::{AppContext, AsyncAppContext, Model, SemanticVersion};
14+
use gpui::{AppContext, AsyncAppContext, Model, SemanticVersion, Task};
1515
use parking_lot::Mutex;
1616
use rpc::{
1717
proto::{self, build_typed_envelope, Envelope, EnvelopedMessage, PeerId, RequestMessage},
@@ -51,6 +51,7 @@ pub struct SshSession {
5151
spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
5252
client_socket: Option<SshSocket>,
5353
state: Mutex<ProtoMessageHandlerSet>, // Lock
54+
_io_task: Option<Task<Result<()>>>,
5455
}
5556

5657
struct SshClientState {
@@ -173,8 +174,7 @@ impl SshSession {
173174
let mut child_stdout = remote_server_child.stdout.take().unwrap();
174175
let mut child_stdin = remote_server_child.stdin.take().unwrap();
175176

176-
let executor = cx.background_executor().clone();
177-
executor.clone().spawn(async move {
177+
let io_task = cx.background_executor().spawn(async move {
178178
let mut stdin_buffer = Vec::new();
179179
let mut stdout_buffer = Vec::new();
180180
let mut stderr_buffer = Vec::new();
@@ -264,9 +264,18 @@ impl SshSession {
264264
}
265265
}
266266
}
267-
}).detach();
267+
});
268268

269-
cx.update(|cx| Self::new(incoming_rx, outgoing_tx, spawn_process_tx, Some(socket), cx))
269+
cx.update(|cx| {
270+
Self::new(
271+
incoming_rx,
272+
outgoing_tx,
273+
spawn_process_tx,
274+
Some(socket),
275+
Some(io_task),
276+
cx,
277+
)
278+
})
270279
}
271280

272281
pub fn server(
@@ -275,7 +284,7 @@ impl SshSession {
275284
cx: &AppContext,
276285
) -> Arc<SshSession> {
277286
let (tx, _rx) = mpsc::unbounded();
278-
Self::new(incoming_rx, outgoing_tx, tx, None, cx)
287+
Self::new(incoming_rx, outgoing_tx, tx, None, None, cx)
279288
}
280289

281290
#[cfg(any(test, feature = "test-support"))]
@@ -293,6 +302,7 @@ impl SshSession {
293302
client_to_server_tx,
294303
tx.clone(),
295304
None, // todo()
305+
None,
296306
cx,
297307
)
298308
}),
@@ -302,6 +312,7 @@ impl SshSession {
302312
server_to_client_tx,
303313
tx.clone(),
304314
None,
315+
None,
305316
cx,
306317
)
307318
}),
@@ -313,6 +324,7 @@ impl SshSession {
313324
outgoing_tx: mpsc::UnboundedSender<Envelope>,
314325
spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
315326
client_socket: Option<SshSocket>,
327+
io_task: Option<Task<Result<()>>>,
316328
cx: &AppContext,
317329
) -> Arc<SshSession> {
318330
let this = Arc::new(Self {
@@ -322,13 +334,18 @@ impl SshSession {
322334
spawn_process_tx,
323335
client_socket,
324336
state: Default::default(),
337+
_io_task: io_task,
325338
});
326339

327340
cx.spawn(|cx| {
328-
let this = this.clone();
341+
let this = Arc::downgrade(&this);
329342
async move {
330343
let peer_id = PeerId { owner_id: 0, id: 0 };
331344
while let Some(incoming) = incoming_rx.next().await {
345+
let Some(this) = this.upgrade() else {
346+
return anyhow::Ok(());
347+
};
348+
332349
if let Some(request_id) = incoming.responding_to {
333350
let request_id = MessageId(request_id);
334351
let sender = this.response_channels.lock().remove(&request_id);

crates/rpc/src/proto_client.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,29 @@ use proto::{
1010
error::ErrorExt as _, AnyTypedEnvelope, EntityMessage, Envelope, EnvelopedMessage,
1111
RequestMessage, TypedEnvelope,
1212
};
13-
use std::{any::TypeId, sync::Arc};
13+
use std::{
14+
any::TypeId,
15+
sync::{Arc, Weak},
16+
};
1417

1518
#[derive(Clone)]
1619
pub struct AnyProtoClient(Arc<dyn ProtoClient>);
1720

21+
impl AnyProtoClient {
22+
pub fn downgrade(&self) -> AnyWeakProtoClient {
23+
AnyWeakProtoClient(Arc::downgrade(&self.0))
24+
}
25+
}
26+
27+
#[derive(Clone)]
28+
pub struct AnyWeakProtoClient(Weak<dyn ProtoClient>);
29+
30+
impl AnyWeakProtoClient {
31+
pub fn upgrade(&self) -> Option<AnyProtoClient> {
32+
self.0.upgrade().map(AnyProtoClient)
33+
}
34+
}
35+
1836
pub trait ProtoClient: Send + Sync {
1937
fn request(
2038
&self,

crates/worktree/src/worktree.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ impl Worktree {
472472
disconnected: false,
473473
};
474474

475-
// Apply updates to a separate snapshto in a background task, then
475+
// Apply updates to a separate snapshot in a background task, then
476476
// send them to a foreground task which updates the model.
477477
cx.background_executor()
478478
.spawn(async move {

0 commit comments

Comments
 (0)