Skip to content

Commit a3c28c7

Browse files
committed
ASYNC_RUNTIME should not be borrowed while polling futures
This resolves the panic in nested godot_tasks. All async tasks are also dropped when the bindings are deinitialized.
1 parent 2e7d419 commit a3c28c7

File tree

3 files changed

+104
-28
lines changed

3 files changed

+104
-28
lines changed

godot-core/src/meta/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,7 @@ pub use property_info::{PropertyHintInfo, PropertyInfo};
8989
/// Must not use meta facilities (e.g. `ClassName`) after this call.
9090
pub(crate) unsafe fn cleanup() {
9191
class_name::cleanup();
92+
// drop all pending tasks from async runtime. Futures have to be dropped before deinit.
93+
// Should be moved somewhere more appropriate.
94+
crate::tools::ASYNC_RUNTIME.take();
9295
}

godot-core/src/tools/async_support.rs

+87-27
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use std::thread::{self, ThreadId};
88
use crate::builtin::{Callable, Signal, Variant};
99
use crate::classes::object::ConnectFlags;
1010
use crate::classes::Os;
11-
use crate::godot_error;
1211
use crate::meta::{FromGodot, ToGodot};
1312
use crate::obj::EngineEnum;
1413

@@ -34,10 +33,50 @@ pub fn godot_task(future: impl Future<Output = ()> + 'static) {
3433
waker.wake();
3534
}
3635

37-
thread_local! { static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new()); }
36+
thread_local! { pub(crate) static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new()); }
3837

39-
struct AsyncRuntime {
40-
tasks: Vec<Option<Pin<Box<dyn Future<Output = ()>>>>>,
38+
#[derive(Default)]
39+
enum FutureSlot<T> {
40+
#[default]
41+
Empty,
42+
Pending(T),
43+
Polling,
44+
}
45+
46+
impl<T> FutureSlot<T> {
47+
fn is_empty(&self) -> bool {
48+
matches!(self, Self::Empty)
49+
}
50+
51+
fn clear(&mut self) {
52+
*self = Self::Empty;
53+
}
54+
55+
fn take(&mut self) -> Self {
56+
match self {
57+
Self::Empty => Self::Empty,
58+
Self::Pending(_) => std::mem::replace(self, Self::Polling),
59+
Self::Polling => Self::Polling,
60+
}
61+
}
62+
63+
fn park(&mut self, value: T) {
64+
match self {
65+
Self::Empty => {
66+
panic!("Future slot is currently unoccupied, future can not be parked here!");
67+
}
68+
69+
Self::Pending(_) => panic!("Future slot is already occupied by a different future!"),
70+
Self::Polling => {
71+
*self = Self::Pending(value);
72+
}
73+
}
74+
}
75+
}
76+
77+
#[derive(Default)]
78+
pub(crate) struct AsyncRuntime {
79+
tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
4180
}
4281

4382
impl AsyncRuntime {
@@ -52,35 +91,41 @@ impl AsyncRuntime {
5291
.tasks
5392
.iter_mut()
5493
.enumerate()
55-
.find(|(_, slot)| slot.is_none());
94+
.find(|(_, slot)| slot.is_empty());
5695

5796
let boxed = Box::pin(future);
5897

5998
match slot {
6099
Some((index, slot)) => {
61-
*slot = Some(boxed);
100+
*slot = FutureSlot::Pending(boxed);
62101
index
63102
}
64103
None => {
65-
self.tasks.push(Some(boxed));
104+
self.tasks.push(FutureSlot::Pending(boxed));
66105
self.tasks.len() - 1
67106
}
68107
}
69108
}
70109

71-
fn get_task(&mut self, index: usize) -> Option<Pin<&mut (dyn Future<Output = ()> + 'static)>> {
110+
fn get_task(
111+
&mut self,
112+
index: usize,
113+
) -> FutureSlot<Pin<Box<dyn Future<Output = ()> + 'static>>> {
72114
let slot = self.tasks.get_mut(index);
73115

74-
slot.and_then(|inner| inner.as_mut())
75-
.map(|fut| fut.as_mut())
116+
slot.map(|inner| inner.take()).unwrap_or_default()
76117
}
77118

78119
fn clear_task(&mut self, index: usize) {
79120
if index >= self.tasks.len() {
80121
return;
81122
}
82123

83-
self.tasks[0] = None;
124+
self.tasks[0].clear();
125+
}
126+
127+
fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
128+
self.tasks[index].park(future);
84129
}
85130
}
86131

@@ -101,27 +146,36 @@ impl GodotWaker {
101146
impl Wake for GodotWaker {
102147
fn wake(self: std::sync::Arc<Self>) {
103148
let callable = Callable::from_fn("GodotWaker::wake", move |_args| {
149+
let current_thread = thread::current().id();
150+
151+
if self.thread_id != current_thread {
152+
panic!("trying to poll future on a different thread!\nCurrent Thread: {:?}, Future Thread: {:?}", current_thread, self.thread_id);
153+
}
154+
104155
let waker: Waker = self.clone().into();
105156
let mut ctx = Context::from_waker(&waker);
106157

107-
ASYNC_RUNTIME.with_borrow_mut(|rt| {
108-
let current_thread = thread::current().id();
158+
// take future out of the runtime.
159+
let mut future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
160+
match rt.get_task(self.runtime_index) {
161+
FutureSlot::Empty => {
162+
panic!("Future no longer exists when waking it! This is a bug!");
163+
},
109164

110-
if self.thread_id != current_thread {
111-
panic!("trying to poll future on a different thread!\nCurrent Thread: {:?}, Future Thread: {:?}", current_thread, self.thread_id);
165+
FutureSlot::Polling => {
166+
panic!("The same GodotWaker has been called recursively, this is not expected!");
167+
}
168+
169+
FutureSlot::Pending(future) => future
112170
}
171+
});
113172

114-
let Some(future) = rt.get_task(self.runtime_index) else {
115-
godot_error!("Future no longer exists! This is a bug!");
116-
return;
117-
};
173+
let result = future.as_mut().poll(&mut ctx);
118174

119-
// this does currently not support nested tasks.
120-
let result = future.poll(&mut ctx);
121-
match result {
122-
Poll::Pending => (),
123-
Poll::Ready(()) => rt.clear_task(self.runtime_index),
124-
}
175+
// update runtime.
176+
ASYNC_RUNTIME.with_borrow_mut(|rt| match result {
177+
Poll::Pending => rt.park_task(self.runtime_index, future),
178+
Poll::Ready(()) => rt.clear_task(self.runtime_index),
125179
});
126180

127181
Ok(Variant::nil())
@@ -186,11 +240,17 @@ impl<R: FromSignalArgs> Future for SignalFuture<R> {
186240

187241
impl<R: FromSignalArgs> Drop for SignalFuture<R> {
188242
fn drop(&mut self) {
189-
if !self.signal.is_connected(self.callable.clone()) {
243+
if !self.callable.is_valid() {
190244
return;
191245
}
192246

193-
self.signal.disconnect(self.callable.clone());
247+
if self.signal.object().is_none() {
248+
return;
249+
}
250+
251+
if self.signal.is_connected(self.callable.clone()) {
252+
self.signal.disconnect(self.callable.clone());
253+
}
194254
}
195255
}
196256

itest/rust/src/engine_tests/async_test.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,20 @@ fn start_async_task() {
2525
godot_print!("starting godot_task...");
2626
godot_task(async move {
2727
godot_print!("running async task...");
28-
let result = call_async_fn(signal).await;
28+
29+
godot_print!("starting nested task...");
30+
31+
let inner_signal = signal.clone();
32+
godot_task(async move {
33+
godot_print!("inside nested task...");
34+
35+
let _: () = inner_signal.to_future().await;
36+
37+
godot_print!("nested task after await...");
38+
godot_print!("nested task done!");
39+
});
40+
41+
let result = call_async_fn(signal.clone()).await;
2942
godot_print!("got async result...");
3043

3144
assert_eq!(result, 10);

0 commit comments

Comments
 (0)