Skip to content

Commit 729c710

Browse files
committed
Async Runtime
1 parent aa826e2 commit 729c710

File tree

3 files changed

+316
-0
lines changed

3 files changed

+316
-0
lines changed
+308
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Copyright (c) godot-rust; Bromeon and contributors.
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
*/
7+
8+
use std::cell::RefCell;
9+
use std::future::Future;
10+
use std::marker::PhantomData;
11+
use std::pin::Pin;
12+
use std::sync::Arc;
13+
use std::task::{Context, Poll, Wake, Waker};
14+
use std::thread::{self, ThreadId};
15+
16+
use crate::builtin::{Callable, Variant};
17+
use crate::classes::Os;
18+
use crate::meta::ToGodot;
19+
20+
// ----------------------------------------------------------------------------------------------------------------------------------------------
21+
// Public interface
22+
23+
pub fn godot_task(future: impl Future<Output = ()> + 'static) -> TaskHandle {
24+
let os = Os::singleton();
25+
26+
// Spawning new tasks is only allowed on the main thread for now.
27+
// We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
28+
// same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
29+
// a future will be polled.
30+
// By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
31+
//
32+
// Once thread-safe futures are possible the restriction can be lifted.
33+
if os.get_thread_caller_id() != os.get_main_thread_id() {
34+
panic!("godot_task can only be used on the main thread!");
35+
}
36+
37+
let (task_handle, waker) = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
38+
let task_handle = rt.add_task(Box::pin(future));
39+
let godot_waker = Arc::new(GodotWaker::new(
40+
task_handle.index,
41+
task_handle.id,
42+
thread::current().id(),
43+
));
44+
45+
(task_handle, Waker::from(godot_waker))
46+
});
47+
48+
waker.wake();
49+
task_handle
50+
}
51+
52+
// ----------------------------------------------------------------------------------------------------------------------------------------------
53+
// Async Runtime
54+
55+
thread_local! {
56+
pub(crate) static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new());
57+
}
58+
59+
#[derive(Default)]
60+
enum FutureSlotState<T> {
61+
/// Slot is currently empty.
62+
#[default]
63+
Empty,
64+
/// Slot was previously occupied but the future has been canceled or the slot reused.
65+
Gone,
66+
/// Slot contains a pending future.
67+
Pending(T),
68+
/// Slot contains a future which is currently being polled.
69+
Polling,
70+
}
71+
72+
struct FutureSlot<T> {
73+
value: FutureSlotState<T>,
74+
id: u64,
75+
}
76+
77+
impl<T> FutureSlot<T> {
78+
fn pending(id: u64, value: T) -> Self {
79+
Self {
80+
value: FutureSlotState::Pending(value),
81+
id,
82+
}
83+
}
84+
85+
fn is_empty(&self) -> bool {
86+
matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
87+
}
88+
89+
fn clear(&mut self) {
90+
self.value = FutureSlotState::Empty;
91+
}
92+
93+
fn cancel(&mut self) {
94+
self.value = FutureSlotState::Gone;
95+
}
96+
97+
fn take(&mut self, id: u64) -> FutureSlotState<T> {
98+
match self.value {
99+
FutureSlotState::Empty => FutureSlotState::Empty,
100+
FutureSlotState::Polling => FutureSlotState::Polling,
101+
FutureSlotState::Gone => FutureSlotState::Gone,
102+
FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
103+
FutureSlotState::Pending(_) => {
104+
std::mem::replace(&mut self.value, FutureSlotState::Polling)
105+
}
106+
}
107+
}
108+
109+
fn park(&mut self, value: T) {
110+
match self.value {
111+
FutureSlotState::Empty | FutureSlotState::Gone => {
112+
panic!("Future slot is currently unoccupied, future can not be parked here!");
113+
}
114+
FutureSlotState::Pending(_) => {
115+
panic!("Future slot is already occupied by a different future!")
116+
}
117+
FutureSlotState::Polling => {
118+
self.value = FutureSlotState::Pending(value);
119+
}
120+
}
121+
}
122+
}
123+
124+
pub struct TaskHandle {
125+
index: usize,
126+
id: u64,
127+
_pd: PhantomData<*const ()>,
128+
}
129+
130+
impl TaskHandle {
131+
fn new(index: usize, id: u64) -> Self {
132+
Self {
133+
index,
134+
id,
135+
_pd: PhantomData,
136+
}
137+
}
138+
139+
pub fn cancel(self) {
140+
ASYNC_RUNTIME.with_borrow_mut(|rt| {
141+
let Some(task) = rt.tasks.get(self.index) else {
142+
return;
143+
};
144+
145+
let alive = match task.value {
146+
FutureSlotState::Empty | FutureSlotState::Gone => false,
147+
FutureSlotState::Pending(_) => task.id == self.id,
148+
FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
149+
};
150+
151+
if !alive {
152+
return;
153+
}
154+
155+
rt.cancel_task(self.index);
156+
})
157+
}
158+
159+
pub fn is_pending(&self) -> bool {
160+
ASYNC_RUNTIME.with_borrow(|rt| {
161+
let slot = rt.tasks.get(self.index).expect("Slot at index must exist!");
162+
163+
if slot.id != self.id {
164+
return false;
165+
}
166+
167+
matches!(
168+
slot.value,
169+
FutureSlotState::Pending(_) | FutureSlotState::Polling
170+
)
171+
})
172+
}
173+
}
174+
175+
#[derive(Default)]
176+
pub(crate) struct AsyncRuntime {
177+
tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
178+
task_counter: u64,
179+
}
180+
181+
impl AsyncRuntime {
182+
fn new() -> Self {
183+
Self {
184+
tasks: Vec::with_capacity(10),
185+
task_counter: 0,
186+
}
187+
}
188+
189+
fn next_id(&mut self) -> u64 {
190+
let id = self.task_counter;
191+
self.task_counter += 1;
192+
id
193+
}
194+
195+
fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
196+
let id = self.next_id();
197+
let slot = self
198+
.tasks
199+
.iter_mut()
200+
.enumerate()
201+
.find(|(_, slot)| slot.is_empty());
202+
203+
let boxed = Box::pin(future);
204+
205+
let index = match slot {
206+
Some((index, slot)) => {
207+
*slot = FutureSlot::pending(id, boxed);
208+
index
209+
}
210+
None => {
211+
self.tasks.push(FutureSlot::pending(id, boxed));
212+
self.tasks.len() - 1
213+
}
214+
};
215+
216+
TaskHandle::new(index, id)
217+
}
218+
219+
fn get_task(
220+
&mut self,
221+
index: usize,
222+
id: u64,
223+
) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
224+
let slot = self.tasks.get_mut(index);
225+
226+
slot.map(|inner| inner.take(id)).unwrap_or_default()
227+
}
228+
229+
fn clear_task(&mut self, index: usize) {
230+
self.tasks[index].clear();
231+
}
232+
233+
fn cancel_task(&mut self, index: usize) {
234+
self.tasks[index].cancel();
235+
}
236+
237+
fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
238+
self.tasks[index].park(future);
239+
}
240+
}
241+
242+
struct GodotWaker {
243+
runtime_index: usize,
244+
task_id: u64,
245+
thread_id: ThreadId,
246+
}
247+
248+
impl GodotWaker {
249+
fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
250+
Self {
251+
runtime_index: index,
252+
thread_id,
253+
task_id,
254+
}
255+
}
256+
}
257+
258+
impl Wake for GodotWaker {
259+
fn wake(self: std::sync::Arc<Self>) {
260+
let callable = Callable::from_local_fn("GodotWaker::wake", move |_args| {
261+
let current_thread = thread::current().id();
262+
263+
if self.thread_id != current_thread {
264+
panic!("trying to poll future on a different thread!\nCurrent Thread: {:?}, Future Thread: {:?}", current_thread, self.thread_id);
265+
}
266+
267+
let waker = Waker::from(self.clone());
268+
let mut ctx = Context::from_waker(&waker);
269+
270+
// take future out of the runtime.
271+
let future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
272+
match rt.get_task(self.runtime_index, self.task_id) {
273+
FutureSlotState::Empty => {
274+
panic!("Future no longer exists when waking it! This is a bug!");
275+
},
276+
277+
FutureSlotState::Gone => {
278+
None
279+
}
280+
281+
FutureSlotState::Polling => {
282+
panic!("The same GodotWaker has been called recursively, this is not expected!");
283+
}
284+
285+
FutureSlotState::Pending(future) => Some(future),
286+
}
287+
});
288+
289+
let Some(mut future) = future else {
290+
// future has been canceled while the waker was already triggered.
291+
return Ok(Variant::nil());
292+
};
293+
294+
let result = future.as_mut().poll(&mut ctx);
295+
296+
// update runtime.
297+
ASYNC_RUNTIME.with_borrow_mut(|rt| match result {
298+
Poll::Pending => rt.park_task(self.runtime_index, future),
299+
Poll::Ready(()) => rt.clear_task(self.runtime_index),
300+
});
301+
302+
Ok(Variant::nil())
303+
});
304+
305+
// shedule waker to poll the future on the end of the frame.
306+
callable.to_variant().call("call_deferred", &[]);
307+
}
308+
}

godot-core/src/builtin/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pub mod __prelude_reexport {
155155
use super::*;
156156

157157
pub use aabb::*;
158+
#[cfg(since_api = "4.2")]
159+
pub use async_runtime::*;
158160
pub use basis::*;
159161
pub use callable::*;
160162
pub use collections::containers::*;
@@ -203,6 +205,8 @@ mod macros;
203205

204206
// Other modules
205207
mod aabb;
208+
#[cfg(since_api = "4.2")]
209+
mod async_runtime;
206210
mod basis;
207211
mod callable;
208212
mod collections;

godot-core/src/meta/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,8 @@ pub use property_info::{PropertyHintInfo, PropertyInfo};
8989
/// Must not use meta facilities (e.g. `ClassName`) after this call.
9090
pub(crate) unsafe fn cleanup() {
9191
class_name::cleanup();
92+
// drop all pending tasks from async runtime. Futures have to be dropped before deinit.
93+
// Should be moved somewhere more appropriate.
94+
#[cfg(since_api = "4.2")]
95+
crate::builtin::ASYNC_RUNTIME.take();
9296
}

0 commit comments

Comments
 (0)