@@ -161,17 +161,18 @@ mod tests;
161
161
use crate :: any:: Any ;
162
162
use crate :: cell:: UnsafeCell ;
163
163
use crate :: ffi:: CStr ;
164
+ use crate :: future:: Future ;
164
165
use crate :: marker:: PhantomData ;
165
166
use crate :: mem:: { self , ManuallyDrop , forget} ;
166
167
use crate :: num:: NonZero ;
167
168
use crate :: pin:: Pin ;
168
- use crate :: sync:: Arc ;
169
169
use crate :: sync:: atomic:: { AtomicUsize , Ordering } ;
170
+ use crate :: sync:: { Arc , Mutex , PoisonError } ;
170
171
use crate :: sys:: sync:: Parker ;
171
172
use crate :: sys:: thread as imp;
172
173
use crate :: sys_common:: { AsInner , IntoInner } ;
173
174
use crate :: time:: { Duration , Instant } ;
174
- use crate :: { env, fmt, io, panic, panicking, str} ;
175
+ use crate :: { env, fmt, io, panic, panicking, str, task } ;
175
176
176
177
#[ stable( feature = "scoped_threads" , since = "1.63.0" ) ]
177
178
mod scoped;
@@ -487,6 +488,7 @@ impl Builder {
487
488
let my_packet: Arc < Packet < ' scope , T > > = Arc :: new ( Packet {
488
489
scope : scope_data,
489
490
result : UnsafeCell :: new ( None ) ,
491
+ waker : Mutex :: new ( task:: Waker :: noop ( ) . clone ( ) ) ,
490
492
_marker : PhantomData ,
491
493
} ) ;
492
494
let their_packet = my_packet. clone ( ) ;
@@ -532,15 +534,35 @@ impl Builder {
532
534
let try_result = panic:: catch_unwind ( panic:: AssertUnwindSafe ( || {
533
535
crate :: sys:: backtrace:: __rust_begin_short_backtrace ( f)
534
536
} ) ) ;
537
+
538
+ // Store the `Result` of the thread that the `JoinHandle` can retrieve.
539
+ //
535
540
// SAFETY: `their_packet` as been built just above and moved by the
536
541
// closure (it is an Arc<...>) and `my_packet` will be stored in the
537
542
// same `JoinInner` as this closure meaning the mutation will be
538
543
// safe (not modify it and affect a value far away).
539
544
unsafe { * their_packet. result . get ( ) = Some ( try_result) } ;
540
- // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that
541
- // will call `decrement_num_running_threads` and therefore signal that this thread is
542
- // done.
545
+
546
+ // Fetch the `Waker` from the packet; this is needed to support `.into_join_future()`.
547
+ // If unused, this just returns `Waker::noop()` which will do nothing.
548
+ let waker: task:: Waker = {
549
+ let placeholder = task:: Waker :: noop ( ) . clone ( ) ;
550
+ let mut guard = their_packet. waker . lock ( ) . unwrap_or_else ( PoisonError :: into_inner) ;
551
+ mem:: replace ( & mut * guard, placeholder)
552
+ } ;
553
+
554
+ // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet
555
+ // (which happens if the `JoinHandle` has been dropped) that will call
556
+ // `decrement_num_running_threads` and therefore signal to the scope (if there is one)
557
+ // that this thread is done.
543
558
drop ( their_packet) ;
559
+
560
+ // Now that we have become visibly “finished” by dropping the packet
561
+ // (`JoinInner::is_finished` will return true), we can use the `Waker` to signal
562
+ // any waiting `JoinFuture`. If instead we are being waited for by
563
+ // `JoinHandle::join()`, the actual platform thread termination will be the wakeup.
564
+ waker. wake ( ) ;
565
+
544
566
// Here, the lifetime `'scope` can end. `main` keeps running for a bit
545
567
// after that before returning itself.
546
568
} ;
@@ -1187,8 +1209,6 @@ impl ThreadId {
1187
1209
}
1188
1210
}
1189
1211
} else {
1190
- use crate :: sync:: { Mutex , PoisonError } ;
1191
-
1192
1212
static COUNTER : Mutex <u64 > = Mutex :: new( 1 ) ;
1193
1213
1194
1214
let mut counter = COUNTER . lock( ) . unwrap_or_else( PoisonError :: into_inner) ;
@@ -1569,16 +1589,30 @@ impl fmt::Debug for Thread {
1569
1589
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
1570
1590
pub type Result < T > = crate :: result:: Result < T , Box < dyn Any + Send + ' static > > ;
1571
1591
1572
- // This packet is used to communicate the return value between the spawned
1573
- // thread and the rest of the program. It is shared through an `Arc` and
1574
- // there's no need for a mutex here because synchronization happens with `join()`
1575
- // (the caller will never read this packet until the thread has exited).
1576
- //
1577
- // An Arc to the packet is stored into a `JoinInner` which in turns is placed
1578
- // in `JoinHandle`.
1592
+ /// This packet is used to communicate the return value between the spawned
1593
+ /// thread and the rest of the program. It is shared through an [`Arc`].
1594
+ ///
1595
+ /// An Arc to the packet is stored into a [`JoinInner`] which in turn is placed
1596
+ /// in [`JoinHandle`] or [`ScopedJoinHandle`].
1579
1597
struct Packet < ' scope , T > {
1598
+ /// Communication with the enclosing thread scope if there is one.
1580
1599
scope : Option < Arc < scoped:: ScopeData > > ,
1600
+
1601
+ /// Holds the return value.
1602
+ ///
1603
+ /// Synchronization happens via reference counting: as long as the `Arc<Packet>`
1604
+ /// has two or more references, this field is never read, and will only be written
1605
+ /// once as the thread terminates. After that happens, either the packet is dropped,
1606
+ /// or [`JoinInner::join()`] will `take()` the result value from here.
1581
1607
result : UnsafeCell < Option < Result < T > > > ,
1608
+
1609
+ /// If a [`JoinFuture`] for this thread exists and has been polled,
1610
+ /// this is the waker from that poll. If it does not exist or has not
1611
+ /// been polled yet, this is [`task::Waker::noop()`].
1612
+ // FIXME: This should be an `AtomicWaker` instead of a `Mutex`,
1613
+ // to be cheaper and impossible to deadlock.
1614
+ waker : Mutex < task:: Waker > ,
1615
+
1582
1616
_marker : PhantomData < Option < & ' scope scoped:: ScopeData > > ,
1583
1617
}
1584
1618
@@ -1632,6 +1666,10 @@ impl<'scope, T> JoinInner<'scope, T> {
1632
1666
self . native . join ( ) ;
1633
1667
Arc :: get_mut ( & mut self . packet ) . unwrap ( ) . result . get_mut ( ) . take ( ) . unwrap ( )
1634
1668
}
1669
+
1670
+ fn is_finished ( & self ) -> bool {
1671
+ Arc :: strong_count ( & self . packet ) == 1
1672
+ }
1635
1673
}
1636
1674
1637
1675
/// An owned permission to join on a thread (block on its termination).
@@ -1778,6 +1816,45 @@ impl<T> JoinHandle<T> {
1778
1816
self . 0 . join ( )
1779
1817
}
1780
1818
1819
+ /// Returns a [`Future`] that resolves when the thread has finished.
1820
+ ///
1821
+ /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`];
1822
+ /// this is the `async` equivalent of that blocking function.
1823
+ ///
1824
+ /// If the returned future is dropped (cancelled), the thread will become *detached*;
1825
+ /// there will be no way to observe or wait for the thread’s termination.
1826
+ /// This is identical to the behavior of `JoinHandle` itself.
1827
+ ///
1828
+ /// # Example
1829
+ ///
1830
+ // FIXME: ideally we would actually run this example, with the help of a trivial async executor
1831
+ /// ```no_run
1832
+ /// #![feature(thread_join_future)]
1833
+ /// use std::thread;
1834
+ ///
1835
+ /// async fn do_some_heavy_tasks_in_parallel() -> thread::Result<()> {
1836
+ /// let future_1 = thread::spawn(|| {
1837
+ /// // ... do something ...
1838
+ /// }).into_join_future();
1839
+ /// let future_2 = thread::spawn(|| {
1840
+ /// // ... do something else ...
1841
+ /// }).into_join_future();
1842
+ ///
1843
+ /// // Both threads have been started; now await the completion of both.
1844
+ /// future_1.await?;
1845
+ /// future_2.await?;
1846
+ /// Ok(())
1847
+ /// }
1848
+ /// ```
1849
+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1850
+ pub fn into_join_future ( self ) -> JoinFuture < ' static , T > {
1851
+ // The method is not named `into_future()` to avoid overlapping with the stable
1852
+ // `IntoFuture::into_future()`. We're not implementing `IntoFuture` in order to
1853
+ // keep this unstable and preserve the *option* of compatibly making this obey structured
1854
+ // concurrency via an async-Drop that waits for the thread to end.
1855
+ JoinFuture :: new ( self . 0 )
1856
+ }
1857
+
1781
1858
/// Checks if the associated thread has finished running its main function.
1782
1859
///
1783
1860
/// `is_finished` supports implementing a non-blocking join operation, by checking
@@ -1790,7 +1867,7 @@ impl<T> JoinHandle<T> {
1790
1867
/// to return quickly, without blocking for any significant amount of time.
1791
1868
#[ stable( feature = "thread_is_running" , since = "1.61.0" ) ]
1792
1869
pub fn is_finished ( & self ) -> bool {
1793
- Arc :: strong_count ( & self . 0 . packet ) == 1
1870
+ self . 0 . is_finished ( )
1794
1871
}
1795
1872
}
1796
1873
@@ -1816,9 +1893,88 @@ impl<T> fmt::Debug for JoinHandle<T> {
1816
1893
fn _assert_sync_and_send ( ) {
1817
1894
fn _assert_both < T : Send + Sync > ( ) { }
1818
1895
_assert_both :: < JoinHandle < ( ) > > ( ) ;
1896
+ _assert_both :: < JoinFuture < ' static , ( ) > > ( ) ;
1819
1897
_assert_both :: < Thread > ( ) ;
1820
1898
}
1821
1899
1900
+ /// A [`Future`] that resolves when a thread has finished.
1901
+ ///
1902
+ /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`];
1903
+ /// this is the `async` equivalent of that blocking function.
1904
+ /// Obtain it by calling [`JoinHandle::into_join_future()`] or
1905
+ /// [`ScopedJoinHandle::into_join_future()`].
1906
+ ///
1907
+ /// If a `JoinFuture` is dropped (cancelled), and the thread does not belong to a [scope],
1908
+ /// the associated thread will become *detached*;
1909
+ /// there will be no way to observe or wait for the thread’s termination.
1910
+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1911
+ pub struct JoinFuture < ' scope , T > ( Option < JoinInner < ' scope , T > > ) ;
1912
+
1913
+ impl < ' scope , T > JoinFuture < ' scope , T > {
1914
+ fn new ( inner : JoinInner < ' scope , T > ) -> Self {
1915
+ Self ( Some ( inner) )
1916
+ }
1917
+
1918
+ /// Implements the “getting a result” part of joining/polling, without blocking or changing
1919
+ /// the `Waker`. Part of the implementation of `poll()`.
1920
+ ///
1921
+ /// If this returns `Some`, then `self.0` is now `None` and the future will panic
1922
+ /// if polled again.
1923
+ fn take_result ( & mut self ) -> Option < Result < T > > {
1924
+ self . 0 . take_if ( |i| i. is_finished ( ) ) . map ( JoinInner :: join)
1925
+ }
1926
+ }
1927
+
1928
+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1929
+ impl < T > Future for JoinFuture < ' _ , T > {
1930
+ type Output = Result < T > ;
1931
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut task:: Context < ' _ > ) -> task:: Poll < Self :: Output > {
1932
+ if let Some ( result) = self . take_result ( ) {
1933
+ return task:: Poll :: Ready ( result) ;
1934
+ }
1935
+
1936
+ // Update the `Waker` the thread should wake when it completes.
1937
+ {
1938
+ let Some ( inner) = & mut self . 0 else {
1939
+ panic ! ( "polled after complete" ) ;
1940
+ } ;
1941
+
1942
+ let new_waker = cx. waker ( ) ;
1943
+
1944
+ // Lock the mutex, and ignore the poison state because there are no meaningful ways
1945
+ // the existing contents can be corrupted; they will be overwritten completely and the
1946
+ // overwrite is atomic-in-the-database-sense.
1947
+ let mut current_waker_guard =
1948
+ inner. packet . waker . lock ( ) . unwrap_or_else ( PoisonError :: into_inner) ;
1949
+
1950
+ // Overwrite the waker. Note that we are executing the new waker’s clone and the old
1951
+ // waker’s destructor; these could panic (which will merely poison the lock) or hang,
1952
+ // which will hold the lock, but the most that can do is prevent the thread from
1953
+ // exiting because it's trying to acquire `packet.waker`, which it won't do while
1954
+ // holding any *other* locks (...unless the thread’s data includes a lock guard that
1955
+ // the waker also wants).
1956
+ if !new_waker. will_wake ( & * current_waker_guard) {
1957
+ * current_waker_guard = new_waker. clone ( ) ;
1958
+ }
1959
+ }
1960
+
1961
+ // Check for completion again in case the thread finished while we were busy
1962
+ // setting the waker, to prevent a lost wakeup in that case.
1963
+ if let Some ( result) = self . take_result ( ) {
1964
+ task:: Poll :: Ready ( result)
1965
+ } else {
1966
+ task:: Poll :: Pending
1967
+ }
1968
+ }
1969
+ }
1970
+
1971
+ #[ unstable( feature = "thread_join_future" , issue = "none" ) ]
1972
+ impl < T > fmt:: Debug for JoinFuture < ' _ , T > {
1973
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
1974
+ f. debug_struct ( "JoinHandle" ) . finish_non_exhaustive ( )
1975
+ }
1976
+ }
1977
+
1822
1978
/// Returns an estimate of the default amount of parallelism a program should use.
1823
1979
///
1824
1980
/// Parallelism is a resource. A given machine provides a certain capacity for
0 commit comments