Skip to content

Commit 993079d

Browse files
committed
Async Runtime
1 parent daf1d62 commit 993079d

File tree

3 files changed

+321
-0
lines changed

3 files changed

+321
-0
lines changed
+313
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
/*
2+
* Copyright (c) godot-rust; Bromeon and contributors.
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
*/
7+
8+
use std::cell::RefCell;
9+
use std::future::Future;
10+
use std::marker::PhantomData;
11+
use std::pin::Pin;
12+
use std::sync::Arc;
13+
use std::task::{Context, Poll, Wake, Waker};
14+
use std::thread::{self, ThreadId};
15+
16+
use crate::builtin::{Callable, Variant};
17+
use crate::meta::ToGodot;
18+
19+
// ----------------------------------------------------------------------------------------------------------------------------------------------
20+
// Public interface
21+
22+
pub fn godot_task(future: impl Future<Output = ()> + 'static) -> TaskHandle {
23+
// Spawning new tasks is only allowed on the main thread for now.
24+
// We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
25+
// same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
26+
// a future will be polled.
27+
// By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
28+
//
29+
// Once thread-safe futures are possible the restriction can be lifted.
30+
assert!(
31+
!crate::init::is_main_thread(),
32+
"godot_task can only be used on the main thread!"
33+
);
34+
35+
let (task_handle, waker) = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
36+
let task_handle = rt.add_task(Box::pin(future));
37+
let godot_waker = Arc::new(GodotWaker::new(
38+
task_handle.index,
39+
task_handle.id,
40+
thread::current().id(),
41+
));
42+
43+
(task_handle, Waker::from(godot_waker))
44+
});
45+
46+
waker.wake();
47+
task_handle
48+
}
49+
50+
// ----------------------------------------------------------------------------------------------------------------------------------------------
51+
// Async Runtime
52+
53+
thread_local! {
54+
pub(crate) static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new());
55+
}
56+
57+
#[derive(Default)]
58+
enum FutureSlotState<T> {
59+
/// Slot is currently empty.
60+
#[default]
61+
Empty,
62+
/// Slot was previously occupied but the future has been canceled or the slot reused.
63+
Gone,
64+
/// Slot contains a pending future.
65+
Pending(T),
66+
/// Slot contains a future which is currently being polled.
67+
Polling,
68+
}
69+
70+
struct FutureSlot<T> {
71+
value: FutureSlotState<T>,
72+
id: u64,
73+
}
74+
75+
impl<T> FutureSlot<T> {
76+
fn pending(id: u64, value: T) -> Self {
77+
Self {
78+
value: FutureSlotState::Pending(value),
79+
id,
80+
}
81+
}
82+
83+
fn is_empty(&self) -> bool {
84+
matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
85+
}
86+
87+
fn clear(&mut self) {
88+
self.value = FutureSlotState::Empty;
89+
}
90+
91+
fn cancel(&mut self) {
92+
self.value = FutureSlotState::Gone;
93+
}
94+
95+
fn take(&mut self, id: u64) -> FutureSlotState<T> {
96+
match self.value {
97+
FutureSlotState::Empty => FutureSlotState::Empty,
98+
FutureSlotState::Polling => FutureSlotState::Polling,
99+
FutureSlotState::Gone => FutureSlotState::Gone,
100+
FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
101+
FutureSlotState::Pending(_) => {
102+
std::mem::replace(&mut self.value, FutureSlotState::Polling)
103+
}
104+
}
105+
}
106+
107+
fn park(&mut self, value: T) {
108+
match self.value {
109+
FutureSlotState::Empty | FutureSlotState::Gone => {
110+
panic!("Future slot is currently unoccupied, future can not be parked here!");
111+
}
112+
FutureSlotState::Pending(_) => {
113+
panic!("Future slot is already occupied by a different future!")
114+
}
115+
FutureSlotState::Polling => {
116+
self.value = FutureSlotState::Pending(value);
117+
}
118+
}
119+
}
120+
}
121+
122+
pub struct TaskHandle {
123+
index: usize,
124+
id: u64,
125+
_pd: PhantomData<*const ()>,
126+
}
127+
128+
impl TaskHandle {
129+
fn new(index: usize, id: u64) -> Self {
130+
Self {
131+
index,
132+
id,
133+
_pd: PhantomData,
134+
}
135+
}
136+
137+
pub fn cancel(self) {
138+
ASYNC_RUNTIME.with_borrow_mut(|rt| {
139+
let Some(task) = rt.tasks.get(self.index) else {
140+
return;
141+
};
142+
143+
let alive = match task.value {
144+
FutureSlotState::Empty | FutureSlotState::Gone => false,
145+
FutureSlotState::Pending(_) => task.id == self.id,
146+
FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
147+
};
148+
149+
if !alive {
150+
return;
151+
}
152+
153+
rt.cancel_task(self.index);
154+
})
155+
}
156+
157+
pub fn is_pending(&self) -> bool {
158+
ASYNC_RUNTIME.with_borrow(|rt| {
159+
let slot = rt.tasks.get(self.index).expect("Slot at index must exist!");
160+
161+
if slot.id != self.id {
162+
return false;
163+
}
164+
165+
matches!(
166+
slot.value,
167+
FutureSlotState::Pending(_) | FutureSlotState::Polling
168+
)
169+
})
170+
}
171+
}
172+
173+
#[derive(Default)]
174+
pub(crate) struct AsyncRuntime {
175+
tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
176+
task_counter: u64,
177+
}
178+
179+
impl AsyncRuntime {
180+
fn new() -> Self {
181+
Self {
182+
tasks: Vec::with_capacity(10),
183+
task_counter: 0,
184+
}
185+
}
186+
187+
fn next_id(&mut self) -> u64 {
188+
let id = self.task_counter;
189+
self.task_counter += 1;
190+
id
191+
}
192+
193+
fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
194+
let id = self.next_id();
195+
let index_slot = self
196+
.tasks
197+
.iter_mut()
198+
.enumerate()
199+
.find(|(_, slot)| slot.is_empty());
200+
201+
let boxed = Box::pin(future);
202+
203+
let index = match index_slot {
204+
Some((index, slot)) => {
205+
*slot = FutureSlot::pending(id, boxed);
206+
index
207+
}
208+
None => {
209+
self.tasks.push(FutureSlot::pending(id, boxed));
210+
self.tasks.len() - 1
211+
}
212+
};
213+
214+
TaskHandle::new(index, id)
215+
}
216+
217+
fn get_task(
218+
&mut self,
219+
index: usize,
220+
id: u64,
221+
) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
222+
let slot = self.tasks.get_mut(index);
223+
224+
slot.map(|inner| inner.take(id)).unwrap_or_default()
225+
}
226+
227+
fn clear_task(&mut self, index: usize) {
228+
self.tasks[index].clear();
229+
}
230+
231+
fn cancel_task(&mut self, index: usize) {
232+
self.tasks[index].cancel();
233+
}
234+
235+
fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
236+
self.tasks[index].park(future);
237+
}
238+
}
239+
240+
struct GodotWaker {
241+
runtime_index: usize,
242+
task_id: u64,
243+
thread_id: ThreadId,
244+
}
245+
246+
impl GodotWaker {
247+
fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
248+
Self {
249+
runtime_index: index,
250+
thread_id,
251+
task_id,
252+
}
253+
}
254+
}
255+
256+
impl Wake for GodotWaker {
257+
fn wake(self: std::sync::Arc<Self>) {
258+
let callable = Callable::from_local_fn("GodotWaker::wake", move |_args| {
259+
let current_thread = thread::current().id();
260+
261+
assert_ne!(
262+
self.thread_id,
263+
current_thread,
264+
"trying to poll future on a different thread!\n Current thread: {:?}\n Future thread: {:?}",
265+
current_thread,
266+
self.thread_id,
267+
);
268+
269+
let waker = Waker::from(self.clone());
270+
let mut ctx = Context::from_waker(&waker);
271+
272+
// Take future out of the runtime.
273+
let future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
274+
match rt.get_task(self.runtime_index, self.task_id) {
275+
FutureSlotState::Empty => {
276+
panic!("Future no longer exists when waking it! This is a bug!");
277+
},
278+
279+
FutureSlotState::Gone => {
280+
None
281+
}
282+
283+
FutureSlotState::Polling => {
284+
panic!("The same GodotWaker has been called recursively, this is not expected!");
285+
}
286+
287+
FutureSlotState::Pending(future) => Some(future),
288+
}
289+
});
290+
291+
let Some(mut future) = future else {
292+
// Future has been canceled while the waker was already triggered.
293+
return Ok(Variant::nil());
294+
};
295+
296+
let result = future.as_mut().poll(&mut ctx);
297+
298+
// Update the state of the Future in the runtime.
299+
ASYNC_RUNTIME.with_borrow_mut(|rt| match result {
300+
// Future is still pending, so we park it again.
301+
Poll::Pending => rt.park_task(self.runtime_index, future),
302+
303+
// Future has resolved, so we remove it from the runtime.
304+
Poll::Ready(()) => rt.clear_task(self.runtime_index),
305+
});
306+
307+
Ok(Variant::nil())
308+
});
309+
310+
// Schedule waker to poll the Future at the end of the frame.
311+
callable.to_variant().call("call_deferred", &[]);
312+
}
313+
}

godot-core/src/builtin/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pub mod __prelude_reexport {
155155
use super::*;
156156

157157
pub use aabb::*;
158+
#[cfg(since_api = "4.2")]
159+
pub use async_runtime::*;
158160
pub use basis::*;
159161
pub use callable::*;
160162
pub use collections::containers::*;
@@ -203,6 +205,8 @@ mod macros;
203205

204206
// Other modules
205207
mod aabb;
208+
#[cfg(since_api = "4.2")]
209+
mod async_runtime;
206210
mod basis;
207211
mod callable;
208212
mod collections;

godot-core/src/meta/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,8 @@ pub use property_info::{PropertyHintInfo, PropertyInfo};
8989
/// Must not use meta facilities (e.g. `ClassName`) after this call.
9090
pub(crate) unsafe fn cleanup() {
9191
class_name::cleanup();
92+
// drop all pending tasks from async runtime. Futures have to be dropped before deinit.
93+
// Should be moved somewhere more appropriate.
94+
#[cfg(since_api = "4.2")]
95+
crate::builtin::ASYNC_RUNTIME.take();
9296
}

0 commit comments

Comments
 (0)