11use std:: cell:: RefCell ;
22use std:: future:: Future ;
3+ use std:: marker:: PhantomData ;
34use std:: pin:: Pin ;
45use std:: sync:: { Arc , Mutex } ;
56use std:: task:: { Context , Poll , Wake , Waker } ;
@@ -8,10 +9,11 @@ use std::thread::{self, ThreadId};
89use crate :: builtin:: { Callable , Signal , Variant } ;
910use crate :: classes:: object:: ConnectFlags ;
1011use crate :: classes:: Os ;
12+ use crate :: godot_warn;
1113use crate :: meta:: { FromGodot , ToGodot } ;
1214use crate :: obj:: EngineEnum ;
1315
14- pub fn godot_task ( future : impl Future < Output = ( ) > + ' static ) {
16+ pub fn godot_task ( future : impl Future < Output = ( ) > + ' static ) -> TaskHandle {
1517 let os = Os :: singleton ( ) ;
1618
1719 // Spawning new tasks is only allowed on the main thread for now.
@@ -22,71 +24,162 @@ pub fn godot_task(future: impl Future<Output = ()> + 'static) {
2224 //
2325 // Once thread-safe futures are possible the restriction can be lifted.
2426 if os. get_thread_caller_id ( ) != os. get_main_thread_id ( ) {
25- return ;
27+ panic ! ( "godot_task can only be used on the main thread!" ) ;
2628 }
2729
28- let waker: Waker = ASYNC_RUNTIME . with_borrow_mut ( move |rt| {
29- let task_index = rt. add_task ( Box :: pin ( future) ) ;
30- Arc :: new ( GodotWaker :: new ( task_index, thread:: current ( ) . id ( ) ) ) . into ( )
30+ let ( task_handle, waker) : ( _ , Waker ) = ASYNC_RUNTIME . with_borrow_mut ( move |rt| {
31+ let task_handle = rt. add_task ( Box :: pin ( future) ) ;
32+ let waker = Arc :: new ( GodotWaker :: new (
33+ task_handle. index ,
34+ task_handle. id ,
35+ thread:: current ( ) . id ( ) ,
36+ ) )
37+ . into ( ) ;
38+
39+ ( task_handle, waker)
3140 } ) ;
3241
3342 waker. wake ( ) ;
43+ task_handle
3444}
3545
3646thread_local ! { pub ( crate ) static ASYNC_RUNTIME : RefCell <AsyncRuntime > = RefCell :: new( AsyncRuntime :: new( ) ) ; }
3747
3848#[ derive( Default ) ]
39- enum FutureSlot < T > {
49+ enum FutureSlotState < T > {
50+ /// Slot is currently empty.
4051 #[ default]
4152 Empty ,
53+ /// Slot was previously occupied but the future has been canceled or the slot reused.
54+ Gone ,
55+ /// Slot contains a pending future.
4256 Pending ( T ) ,
57+ /// slot contains a future which is currently being polled.
4358 Polling ,
4459}
4560
61+ struct FutureSlot < T > {
62+ value : FutureSlotState < T > ,
63+ id : u64 ,
64+ }
65+
4666impl < T > FutureSlot < T > {
67+ fn pending ( id : u64 , value : T ) -> Self {
68+ Self {
69+ value : FutureSlotState :: Pending ( value) ,
70+ id,
71+ }
72+ }
73+
4774 fn is_empty ( & self ) -> bool {
48- matches ! ( self , Self :: Empty )
75+ matches ! ( self . value , FutureSlotState :: Empty | FutureSlotState :: Gone )
4976 }
5077
5178 fn clear ( & mut self ) {
52- * self = Self :: Empty ;
79+ self . value = FutureSlotState :: Empty ;
5380 }
5481
55- fn take ( & mut self ) -> Self {
56- match self {
57- Self :: Empty => Self :: Empty ,
58- Self :: Pending ( _) => std:: mem:: replace ( self , Self :: Polling ) ,
59- Self :: Polling => Self :: Polling ,
82+ fn cancel ( & mut self ) {
83+ self . value = FutureSlotState :: Gone ;
84+ }
85+
86+ fn take ( & mut self , id : u64 ) -> FutureSlotState < T > {
87+ match self . value {
88+ FutureSlotState :: Empty => FutureSlotState :: Empty ,
89+ FutureSlotState :: Polling => FutureSlotState :: Polling ,
90+ FutureSlotState :: Gone => FutureSlotState :: Gone ,
91+ FutureSlotState :: Pending ( _) if self . id != id => FutureSlotState :: Gone ,
92+ FutureSlotState :: Pending ( _) => {
93+ std:: mem:: replace ( & mut self . value , FutureSlotState :: Polling )
94+ }
6095 }
6196 }
6297
6398 fn park ( & mut self , value : T ) {
64- match self {
65- Self :: Empty => {
99+ match self . value {
100+ FutureSlotState :: Empty | FutureSlotState :: Gone => {
66101 panic ! ( "Future slot is currently unoccupied, future can not be parked here!" ) ;
67102 }
68-
69- Self :: Pending ( _) => panic ! ( "Future slot is already occupied by a different future!" ) ,
70- Self :: Polling => {
71- * self = Self :: Pending ( value) ;
103+ FutureSlotState :: Pending ( _) => {
104+ panic ! ( "Future slot is already occupied by a different future!" )
105+ }
106+ FutureSlotState :: Polling => {
107+ self . value = FutureSlotState :: Pending ( value) ;
72108 }
73109 }
74110 }
75111}
76112
113+ pub struct TaskHandle {
114+ index : usize ,
115+ id : u64 ,
116+ _pd : PhantomData < * const ( ) > ,
117+ }
118+
119+ impl TaskHandle {
120+ fn new ( index : usize , id : u64 ) -> Self {
121+ Self {
122+ index,
123+ id,
124+ _pd : PhantomData ,
125+ }
126+ }
127+
128+ pub fn cancel ( self ) {
129+ ASYNC_RUNTIME . with_borrow_mut ( |rt| {
130+ let Some ( task) = rt. tasks . get ( self . index ) else {
131+ return ;
132+ } ;
133+
134+ let alive = match task. value {
135+ FutureSlotState :: Empty | FutureSlotState :: Gone => false ,
136+ FutureSlotState :: Pending ( _) => task. id == self . id ,
137+ FutureSlotState :: Polling => panic ! ( "Can not cancel future from inside it!" ) ,
138+ } ;
139+
140+ if !alive {
141+ return ;
142+ }
143+
144+ rt. cancel_task ( self . index ) ;
145+ } )
146+ }
147+
148+ pub fn is_pending ( & self ) -> bool {
149+ ASYNC_RUNTIME . with_borrow ( |rt| {
150+ let slot = rt. tasks . get ( self . index ) . expect ( "Slot at index must exist!" ) ;
151+
152+ if slot. id != self . id {
153+ return false ;
154+ }
155+
156+ matches ! ( slot. value, FutureSlotState :: Pending ( _) )
157+ } )
158+ }
159+ }
160+
77161#[ derive( Default ) ]
78162pub ( crate ) struct AsyncRuntime {
79163 tasks : Vec < FutureSlot < Pin < Box < dyn Future < Output = ( ) > > > > > ,
164+ task_counter : u64 ,
80165}
81166
82167impl AsyncRuntime {
83168 fn new ( ) -> Self {
84169 Self {
85170 tasks : Vec :: with_capacity ( 10 ) ,
171+ task_counter : 0 ,
86172 }
87173 }
88174
89- fn add_task < F : Future < Output = ( ) > + ' static > ( & mut self , future : F ) -> usize {
175+ fn next_id ( & mut self ) -> u64 {
176+ let id = self . task_counter ;
177+ self . task_counter += 1 ;
178+ id
179+ }
180+
181+ fn add_task < F : Future < Output = ( ) > + ' static > ( & mut self , future : F ) -> TaskHandle {
182+ let id = self . next_id ( ) ;
90183 let slot = self
91184 . tasks
92185 . iter_mut ( )
@@ -95,33 +188,36 @@ impl AsyncRuntime {
95188
96189 let boxed = Box :: pin ( future) ;
97190
98- match slot {
191+ let index = match slot {
99192 Some ( ( index, slot) ) => {
100- * slot = FutureSlot :: Pending ( boxed) ;
193+ * slot = FutureSlot :: pending ( id , boxed) ;
101194 index
102195 }
103196 None => {
104- self . tasks . push ( FutureSlot :: Pending ( boxed) ) ;
197+ self . tasks . push ( FutureSlot :: pending ( id , boxed) ) ;
105198 self . tasks . len ( ) - 1
106199 }
107- }
200+ } ;
201+
202+ TaskHandle :: new ( index, id)
108203 }
109204
110205 fn get_task (
111206 & mut self ,
112207 index : usize ,
113- ) -> FutureSlot < Pin < Box < dyn Future < Output = ( ) > + ' static > > > {
208+ id : u64 ,
209+ ) -> FutureSlotState < Pin < Box < dyn Future < Output = ( ) > + ' static > > > {
114210 let slot = self . tasks . get_mut ( index) ;
115211
116- slot. map ( |inner| inner. take ( ) ) . unwrap_or_default ( )
212+ slot. map ( |inner| inner. take ( id ) ) . unwrap_or_default ( )
117213 }
118214
119215 fn clear_task ( & mut self , index : usize ) {
120- if index >= self . tasks . len ( ) {
121- return ;
122- }
216+ self . tasks [ index] . clear ( ) ;
217+ }
123218
124- self . tasks [ 0 ] . clear ( ) ;
219+ fn cancel_task ( & mut self , index : usize ) {
220+ self . tasks [ index] . cancel ( ) ;
125221 }
126222
127223 fn park_task ( & mut self , index : usize , future : Pin < Box < dyn Future < Output = ( ) > > > ) {
@@ -131,14 +227,16 @@ impl AsyncRuntime {
131227
132228struct GodotWaker {
133229 runtime_index : usize ,
230+ task_id : u64 ,
134231 thread_id : ThreadId ,
135232}
136233
137234impl GodotWaker {
138- fn new ( index : usize , thread_id : ThreadId ) -> Self {
235+ fn new ( index : usize , task_id : u64 , thread_id : ThreadId ) -> Self {
139236 Self {
140237 runtime_index : index,
141238 thread_id,
239+ task_id,
142240 }
143241 }
144242}
@@ -156,20 +254,29 @@ impl Wake for GodotWaker {
156254 let mut ctx = Context :: from_waker ( & waker) ;
157255
158256 // take future out of the runtime.
159- let mut future = ASYNC_RUNTIME . with_borrow_mut ( |rt| {
160- match rt. get_task ( self . runtime_index ) {
161- FutureSlot :: Empty => {
257+ let future = ASYNC_RUNTIME . with_borrow_mut ( |rt| {
258+ match rt. get_task ( self . runtime_index , self . task_id ) {
259+ FutureSlotState :: Empty => {
162260 panic ! ( "Future no longer exists when waking it! This is a bug!" ) ;
163261 } ,
164262
165- FutureSlot :: Polling => {
263+ FutureSlotState :: Gone => {
264+ None
265+ }
266+
267+ FutureSlotState :: Polling => {
166268 panic ! ( "The same GodotWaker has been called recursively, this is not expected!" ) ;
167269 }
168270
169- FutureSlot :: Pending ( future) => future
271+ FutureSlotState :: Pending ( future) => Some ( future) ,
170272 }
171273 } ) ;
172274
275+ let Some ( mut future) = future else {
276+ // future has been canceled while the waker was already triggered.
277+ return Ok ( Variant :: nil ( ) ) ;
278+ } ;
279+
173280 let result = future. as_mut ( ) . poll ( & mut ctx) ;
174281
175282 // update runtime.
@@ -241,14 +348,17 @@ impl<R: FromSignalArgs> Future for SignalFuture<R> {
241348impl < R : FromSignalArgs > Drop for SignalFuture < R > {
242349 fn drop ( & mut self ) {
243350 if !self . callable . is_valid ( ) {
351+ godot_warn ! ( "dropping furure but callable no longer exists!" ) ;
244352 return ;
245353 }
246354
247355 if self . signal . object ( ) . is_none ( ) {
356+ godot_warn ! ( "dropping furure but signal owner no longer exists!" ) ;
248357 return ;
249358 }
250359
251360 if self . signal . is_connected ( self . callable . clone ( ) ) {
361+ godot_warn ! ( "dropping furure but signal still connected!" ) ;
252362 self . signal . disconnect ( self . callable . clone ( ) ) ;
253363 }
254364 }
0 commit comments