Skip to content

Commit 1220fd0

Browse files
committed
Async Runtime
1 parent 565d898 commit 1220fd0

File tree

3 files changed

+313
-0
lines changed

3 files changed

+313
-0
lines changed
+305
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/*
2+
* Copyright (c) godot-rust; Bromeon and contributors.
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
*/
7+
8+
use std::cell::RefCell;
9+
use std::future::Future;
10+
use std::marker::PhantomData;
11+
use std::pin::Pin;
12+
use std::sync::Arc;
13+
use std::task::{Context, Poll, Wake, Waker};
14+
use std::thread::{self, ThreadId};
15+
16+
use crate::builtin::{Callable, Variant};
17+
use crate::classes::Os;
18+
use crate::meta::ToGodot;
19+
20+
// ----------------------------------------------------------------------------------------------------------------------------------------------
21+
// Public interface
22+
23+
pub fn godot_task(future: impl Future<Output = ()> + 'static) -> TaskHandle {
24+
let os = Os::singleton();
25+
26+
// Spawning new tasks is only allowed on the main thread for now.
27+
// We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
28+
// same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
29+
// a future will be polled.
30+
// By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
31+
//
32+
// Once thread-safe futures are possible the restriction can be lifted.
33+
if os.get_thread_caller_id() != os.get_main_thread_id() {
34+
panic!("godot_task can only be used on the main thread!");
35+
}
36+
37+
let (task_handle, waker) = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
38+
let task_handle = rt.add_task(Box::pin(future));
39+
let godot_waker = Arc::new(GodotWaker::new(
40+
task_handle.index,
41+
task_handle.id,
42+
thread::current().id(),
43+
));
44+
45+
(task_handle, Waker::from(godot_waker))
46+
});
47+
48+
waker.wake();
49+
task_handle
50+
}
51+
52+
// ----------------------------------------------------------------------------------------------------------------------------------------------
53+
// Async Runtime
54+
55+
thread_local! {
56+
pub(crate) static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new());
57+
}
58+
59+
#[derive(Default)]
60+
enum FutureSlotState<T> {
61+
/// Slot is currently empty.
62+
#[default]
63+
Empty,
64+
/// Slot was previously occupied but the future has been canceled or the slot reused.
65+
Gone,
66+
/// Slot contains a pending future.
67+
Pending(T),
68+
/// Slot contains a future which is currently being polled.
69+
Polling,
70+
}
71+
72+
struct FutureSlot<T> {
73+
value: FutureSlotState<T>,
74+
id: u64,
75+
}
76+
77+
impl<T> FutureSlot<T> {
78+
fn pending(id: u64, value: T) -> Self {
79+
Self {
80+
value: FutureSlotState::Pending(value),
81+
id,
82+
}
83+
}
84+
85+
fn is_empty(&self) -> bool {
86+
matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
87+
}
88+
89+
fn clear(&mut self) {
90+
self.value = FutureSlotState::Empty;
91+
}
92+
93+
fn cancel(&mut self) {
94+
self.value = FutureSlotState::Gone;
95+
}
96+
97+
fn take(&mut self, id: u64) -> FutureSlotState<T> {
98+
match self.value {
99+
FutureSlotState::Empty => FutureSlotState::Empty,
100+
FutureSlotState::Polling => FutureSlotState::Polling,
101+
FutureSlotState::Gone => FutureSlotState::Gone,
102+
FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
103+
FutureSlotState::Pending(_) => {
104+
std::mem::replace(&mut self.value, FutureSlotState::Polling)
105+
}
106+
}
107+
}
108+
109+
fn park(&mut self, value: T) {
110+
match self.value {
111+
FutureSlotState::Empty | FutureSlotState::Gone => {
112+
panic!("Future slot is currently unoccupied, future can not be parked here!");
113+
}
114+
FutureSlotState::Pending(_) => {
115+
panic!("Future slot is already occupied by a different future!")
116+
}
117+
FutureSlotState::Polling => {
118+
self.value = FutureSlotState::Pending(value);
119+
}
120+
}
121+
}
122+
}
123+
124+
pub struct TaskHandle {
125+
index: usize,
126+
id: u64,
127+
_pd: PhantomData<*const ()>,
128+
}
129+
130+
impl TaskHandle {
131+
fn new(index: usize, id: u64) -> Self {
132+
Self {
133+
index,
134+
id,
135+
_pd: PhantomData,
136+
}
137+
}
138+
139+
pub fn cancel(self) {
140+
ASYNC_RUNTIME.with_borrow_mut(|rt| {
141+
let Some(task) = rt.tasks.get(self.index) else {
142+
return;
143+
};
144+
145+
let alive = match task.value {
146+
FutureSlotState::Empty | FutureSlotState::Gone => false,
147+
FutureSlotState::Pending(_) => task.id == self.id,
148+
FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
149+
};
150+
151+
if !alive {
152+
return;
153+
}
154+
155+
rt.cancel_task(self.index);
156+
})
157+
}
158+
159+
pub fn is_pending(&self) -> bool {
160+
ASYNC_RUNTIME.with_borrow(|rt| {
161+
let slot = rt.tasks.get(self.index).expect("Slot at index must exist!");
162+
163+
if slot.id != self.id {
164+
return false;
165+
}
166+
167+
matches!(slot.value, FutureSlotState::Pending(_))
168+
})
169+
}
170+
}
171+
172+
#[derive(Default)]
173+
pub(crate) struct AsyncRuntime {
174+
tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
175+
task_counter: u64,
176+
}
177+
178+
impl AsyncRuntime {
179+
fn new() -> Self {
180+
Self {
181+
tasks: Vec::with_capacity(10),
182+
task_counter: 0,
183+
}
184+
}
185+
186+
fn next_id(&mut self) -> u64 {
187+
let id = self.task_counter;
188+
self.task_counter += 1;
189+
id
190+
}
191+
192+
fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
193+
let id = self.next_id();
194+
let slot = self
195+
.tasks
196+
.iter_mut()
197+
.enumerate()
198+
.find(|(_, slot)| slot.is_empty());
199+
200+
let boxed = Box::pin(future);
201+
202+
let index = match slot {
203+
Some((index, slot)) => {
204+
*slot = FutureSlot::pending(id, boxed);
205+
index
206+
}
207+
None => {
208+
self.tasks.push(FutureSlot::pending(id, boxed));
209+
self.tasks.len() - 1
210+
}
211+
};
212+
213+
TaskHandle::new(index, id)
214+
}
215+
216+
fn get_task(
217+
&mut self,
218+
index: usize,
219+
id: u64,
220+
) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
221+
let slot = self.tasks.get_mut(index);
222+
223+
slot.map(|inner| inner.take(id)).unwrap_or_default()
224+
}
225+
226+
fn clear_task(&mut self, index: usize) {
227+
self.tasks[index].clear();
228+
}
229+
230+
fn cancel_task(&mut self, index: usize) {
231+
self.tasks[index].cancel();
232+
}
233+
234+
fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
235+
self.tasks[index].park(future);
236+
}
237+
}
238+
239+
struct GodotWaker {
240+
runtime_index: usize,
241+
task_id: u64,
242+
thread_id: ThreadId,
243+
}
244+
245+
impl GodotWaker {
246+
fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
247+
Self {
248+
runtime_index: index,
249+
thread_id,
250+
task_id,
251+
}
252+
}
253+
}
254+
255+
impl Wake for GodotWaker {
256+
fn wake(self: std::sync::Arc<Self>) {
257+
let callable = Callable::from_local_fn("GodotWaker::wake", move |_args| {
258+
let current_thread = thread::current().id();
259+
260+
if self.thread_id != current_thread {
261+
panic!("trying to poll future on a different thread!\nCurrent Thread: {:?}, Future Thread: {:?}", current_thread, self.thread_id);
262+
}
263+
264+
let waker = Waker::from(self.clone());
265+
let mut ctx = Context::from_waker(&waker);
266+
267+
// take future out of the runtime.
268+
let future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
269+
match rt.get_task(self.runtime_index, self.task_id) {
270+
FutureSlotState::Empty => {
271+
panic!("Future no longer exists when waking it! This is a bug!");
272+
},
273+
274+
FutureSlotState::Gone => {
275+
None
276+
}
277+
278+
FutureSlotState::Polling => {
279+
panic!("The same GodotWaker has been called recursively, this is not expected!");
280+
}
281+
282+
FutureSlotState::Pending(future) => Some(future),
283+
}
284+
});
285+
286+
let Some(mut future) = future else {
287+
// future has been canceled while the waker was already triggered.
288+
return Ok(Variant::nil());
289+
};
290+
291+
let result = future.as_mut().poll(&mut ctx);
292+
293+
// update runtime.
294+
ASYNC_RUNTIME.with_borrow_mut(|rt| match result {
295+
Poll::Pending => rt.park_task(self.runtime_index, future),
296+
Poll::Ready(()) => rt.clear_task(self.runtime_index),
297+
});
298+
299+
Ok(Variant::nil())
300+
});
301+
302+
// shedule waker to poll the future on the end of the frame.
303+
callable.to_variant().call("call_deferred", &[]);
304+
}
305+
}

godot-core/src/builtin/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pub mod __prelude_reexport {
155155
use super::*;
156156

157157
pub use aabb::*;
158+
#[cfg(since_api = "4.2")]
159+
pub use async_runtime::*;
158160
pub use basis::*;
159161
pub use callable::*;
160162
pub use collections::containers::*;
@@ -203,6 +205,8 @@ mod macros;
203205

204206
// Other modules
205207
mod aabb;
208+
#[cfg(since_api = "4.2")]
209+
mod async_runtime;
206210
mod basis;
207211
mod callable;
208212
mod collections;

godot-core/src/meta/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,8 @@ pub use property_info::{PropertyHintInfo, PropertyInfo};
8989
/// Must not use meta facilities (e.g. `ClassName`) after this call.
9090
pub(crate) unsafe fn cleanup() {
9191
class_name::cleanup();
92+
// drop all pending tasks from async runtime. Futures have to be dropped before deinit.
93+
// Should be moved somewhere more appropriate.
94+
#[cfg(since_api = "4.2")]
95+
crate::builtin::ASYNC_RUNTIME.take();
9296
}

0 commit comments

Comments
 (0)