Skip to content

Commit bb260e8

Browse files
authored
Rollup merge of #92555 - m-ou-se:scoped-threads, r=Amanieu
Implement RFC 3151: Scoped threads. This implements rust-lang/rfcs#3151 r? `@Amanieu`
2 parents 89baf0f + 465c405 commit bb260e8

File tree

2 files changed

+382
-27
lines changed

2 files changed

+382
-27
lines changed

library/std/src/thread/mod.rs

+66-27
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ use crate::time::Duration;
180180
#[macro_use]
181181
mod local;
182182

183+
#[unstable(feature = "scoped_threads", issue = "93203")]
184+
mod scoped;
185+
186+
#[unstable(feature = "scoped_threads", issue = "93203")]
187+
pub use scoped::{scope, Scope, ScopedJoinHandle};
188+
183189
#[stable(feature = "rust1", since = "1.0.0")]
184190
pub use self::local::{AccessError, LocalKey};
185191

@@ -446,6 +452,20 @@ impl Builder {
446452
F: FnOnce() -> T,
447453
F: Send + 'a,
448454
T: Send + 'a,
455+
{
456+
Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None) }?))
457+
}
458+
459+
unsafe fn spawn_unchecked_<'a, 'scope, F, T>(
460+
self,
461+
f: F,
462+
scope_data: Option<&'scope scoped::ScopeData>,
463+
) -> io::Result<JoinInner<'scope, T>>
464+
where
465+
F: FnOnce() -> T,
466+
F: Send + 'a,
467+
T: Send + 'a,
468+
'scope: 'a,
449469
{
450470
let Builder { name, stack_size } = self;
451471

@@ -456,7 +476,8 @@ impl Builder {
456476
}));
457477
let their_thread = my_thread.clone();
458478

459-
let my_packet: Arc<UnsafeCell<Option<Result<T>>>> = Arc::new(UnsafeCell::new(None));
479+
let my_packet: Arc<Packet<'scope, T>> =
480+
Arc::new(Packet { scope: scope_data, result: UnsafeCell::new(None) });
460481
let their_packet = my_packet.clone();
461482

462483
let output_capture = crate::io::set_output_capture(None);
@@ -480,10 +501,14 @@ impl Builder {
480501
// closure (it is an Arc<...>) and `my_packet` will be stored in the
481502
// same `JoinInner` as this closure meaning the mutation will be
482503
// safe (not modify it and affect a value far away).
483-
unsafe { *their_packet.get() = Some(try_result) };
504+
unsafe { *their_packet.result.get() = Some(try_result) };
484505
};
485506

486-
Ok(JoinHandle(JoinInner {
507+
if let Some(scope_data) = scope_data {
508+
scope_data.increment_num_running_threads();
509+
}
510+
511+
Ok(JoinInner {
487512
// SAFETY:
488513
//
489514
// `imp::Thread::new` takes a closure with a `'static` lifetime, since it's passed
@@ -506,8 +531,8 @@ impl Builder {
506531
)?
507532
},
508533
thread: my_thread,
509-
packet: Packet(my_packet),
510-
}))
534+
packet: my_packet,
535+
})
511536
}
512537
}
513538

@@ -1242,34 +1267,48 @@ impl fmt::Debug for Thread {
12421267
#[stable(feature = "rust1", since = "1.0.0")]
12431268
pub type Result<T> = crate::result::Result<T, Box<dyn Any + Send + 'static>>;
12441269

1245-
// This packet is used to communicate the return value between the spawned thread
1246-
// and the rest of the program. Memory is shared through the `Arc` within and there's
1247-
// no need for a mutex here because synchronization happens with `join()` (the
1248-
// caller will never read this packet until the thread has exited).
1270+
// This packet is used to communicate the return value between the spawned
1271+
// thread and the rest of the program. It is shared through an `Arc` and
1272+
// there's no need for a mutex here because synchronization happens with `join()`
1273+
// (the caller will never read this packet until the thread has exited).
12491274
//
1250-
// This packet itself is then stored into a `JoinInner` which in turns is placed
1251-
// in `JoinHandle` and `JoinGuard`. Due to the usage of `UnsafeCell` we need to
1252-
// manually worry about impls like Send and Sync. The type `T` should
1253-
// already always be Send (otherwise the thread could not have been created) and
1254-
// this type is inherently Sync because no methods take &self. Regardless,
1255-
// however, we add inheriting impls for Send/Sync to this type to ensure it's
1256-
// Send/Sync and that future modifications will still appropriately classify it.
1257-
struct Packet<T>(Arc<UnsafeCell<Option<Result<T>>>>);
1258-
1259-
unsafe impl<T: Send> Send for Packet<T> {}
1260-
unsafe impl<T: Sync> Sync for Packet<T> {}
1275+
// An Arc to the packet is stored into a `JoinInner` which in turns is placed
1276+
// in `JoinHandle`.
1277+
struct Packet<'scope, T> {
1278+
scope: Option<&'scope scoped::ScopeData>,
1279+
result: UnsafeCell<Option<Result<T>>>,
1280+
}
1281+
1282+
// Due to the usage of `UnsafeCell` we need to manually implement Sync.
1283+
// The type `T` should already always be Send (otherwise the thread could not
1284+
// have been created) and the Packet is Sync because all access to the
1285+
// `UnsafeCell` synchronized (by the `join()` boundary), and `ScopeData` is Sync.
1286+
unsafe impl<'scope, T: Sync> Sync for Packet<'scope, T> {}
1287+
1288+
impl<'scope, T> Drop for Packet<'scope, T> {
1289+
fn drop(&mut self) {
1290+
// Book-keeping so the scope knows when it's done.
1291+
if let Some(scope) = self.scope {
1292+
// If this packet was for a thread that ran in a scope, the thread
1293+
// panicked, and nobody consumed the panic payload, we make sure
1294+
// the scope function will panic.
1295+
let unhandled_panic = matches!(self.result.get_mut(), Some(Err(_)));
1296+
scope.decrement_num_running_threads(unhandled_panic);
1297+
}
1298+
}
1299+
}
12611300

12621301
/// Inner representation for JoinHandle
1263-
struct JoinInner<T> {
1302+
struct JoinInner<'scope, T> {
12641303
native: imp::Thread,
12651304
thread: Thread,
1266-
packet: Packet<T>,
1305+
packet: Arc<Packet<'scope, T>>,
12671306
}
12681307

1269-
impl<T> JoinInner<T> {
1308+
impl<'scope, T> JoinInner<'scope, T> {
12701309
fn join(mut self) -> Result<T> {
12711310
self.native.join();
1272-
Arc::get_mut(&mut self.packet.0).unwrap().get_mut().take().unwrap()
1311+
Arc::get_mut(&mut self.packet).unwrap().result.get_mut().take().unwrap()
12731312
}
12741313
}
12751314

@@ -1336,7 +1375,7 @@ impl<T> JoinInner<T> {
13361375
/// [`thread::Builder::spawn`]: Builder::spawn
13371376
/// [`thread::spawn`]: spawn
13381377
#[stable(feature = "rust1", since = "1.0.0")]
1339-
pub struct JoinHandle<T>(JoinInner<T>);
1378+
pub struct JoinHandle<T>(JoinInner<'static, T>);
13401379

13411380
#[stable(feature = "joinhandle_impl_send_sync", since = "1.29.0")]
13421381
unsafe impl<T> Send for JoinHandle<T> {}
@@ -1404,13 +1443,13 @@ impl<T> JoinHandle<T> {
14041443
self.0.join()
14051444
}
14061445

1407-
/// Checks if the the associated thread is still running its main function.
1446+
/// Checks if the associated thread is still running its main function.
14081447
///
14091448
/// This might return `false` for a brief moment after the thread's main
14101449
/// function has returned, but before the thread itself has stopped running.
14111450
#[unstable(feature = "thread_is_running", issue = "90470")]
14121451
pub fn is_running(&self) -> bool {
1413-
Arc::strong_count(&self.0.packet.0) > 1
1452+
Arc::strong_count(&self.0.packet) > 1
14141453
}
14151454
}
14161455

0 commit comments

Comments
 (0)