Skip to content

Commit 0924911

Browse files
author
Stjepan Glavina
committed
Implement simple work stealing
1 parent dd92d8d commit 0924911

File tree

9 files changed

+306
-148
lines changed

9 files changed

+306
-148
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ unstable = []
2828
async-task = "1.0.0"
2929
cfg-if = "0.1.9"
3030
crossbeam-channel = "0.3.9"
31+
crossbeam-deque = "0.7.1"
3132
futures-core-preview = "0.3.0-alpha.18"
3233
futures-io-preview = "0.3.0-alpha.18"
3334
futures-timer = "0.4.0"

src/task/block_on.rs

+7-10
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use std::sync::Arc;
66
use std::task::{RawWaker, RawWakerVTable};
77
use std::thread::{self, Thread};
88

9-
use super::pool;
9+
use super::local;
1010
use super::task;
11+
use super::worker;
1112
use crate::future::Future;
1213
use crate::task::{Context, Poll, Waker};
13-
use crate::utils::abort_on_panic;
1414

1515
use kv_log_macro::trace;
1616

@@ -58,39 +58,36 @@ where
5858

5959
// Log this `block_on` operation.
6060
let child_id = tag.task_id().as_u64();
61-
let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0);
61+
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
6262

6363
trace!("block_on", {
6464
parent_id: parent_id,
6565
child_id: child_id,
6666
});
6767

6868
// Wrap the future into one that drops task-local variables on exit.
69+
let future = local::add_finalizer(future);
70+
6971
let future = async move {
7072
let res = future.await;
71-
72-
// Abort on panic because thread-local variables behave the same way.
73-
abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear()));
74-
7573
trace!("block_on completed", {
7674
parent_id: parent_id,
7775
child_id: child_id,
7876
});
79-
8077
res
8178
};
8279

8380
// Pin the future onto the stack.
8481
pin_utils::pin_mut!(future);
8582

86-
// Transmute the future into one that is static.
83+
// Transmute the future into one that is futurestatic.
8784
let future = mem::transmute::<
8885
Pin<&'_ mut dyn Future<Output = ()>>,
8986
Pin<&'static mut dyn Future<Output = ()>>,
9087
>(future);
9188

9289
// Block on the future and and wait for it to complete.
93-
pool::set_tag(&tag, || block(future));
90+
worker::set_tag(&tag, || block(future));
9491

9592
// Take out the result.
9693
match (*out.get()).take().unwrap() {

src/task/builder.rs

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use super::pool;
2+
use super::JoinHandle;
3+
use crate::future::Future;
4+
use crate::io;
5+
6+
/// Task builder that configures the settings of a new task.
7+
#[derive(Debug)]
8+
pub struct Builder {
9+
pub(crate) name: Option<String>,
10+
}
11+
12+
impl Builder {
13+
/// Creates a new builder.
14+
pub fn new() -> Builder {
15+
Builder { name: None }
16+
}
17+
18+
/// Configures the name of the task.
19+
pub fn name(mut self, name: String) -> Builder {
20+
self.name = Some(name);
21+
self
22+
}
23+
24+
/// Spawns a task with the configured settings.
25+
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
26+
where
27+
F: Future<Output = T> + Send + 'static,
28+
T: Send + 'static,
29+
{
30+
Ok(pool::get().spawn(future, self))
31+
}
32+
}

src/task/local.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::sync::Mutex;
66

77
use lazy_static::lazy_static;
88

9-
use super::pool;
9+
use super::worker;
10+
use crate::future::Future;
11+
use crate::utils::abort_on_panic;
1012

1113
/// Declares task-local values.
1214
///
@@ -152,7 +154,7 @@ impl<T: Send + 'static> LocalKey<T> {
152154
where
153155
F: FnOnce(&T) -> R,
154156
{
155-
pool::get_task(|task| unsafe {
157+
worker::get_task(|task| unsafe {
156158
// Prepare the numeric key, initialization function, and the map of task-locals.
157159
let key = self.key();
158160
let init = || Box::new((self.__init)()) as Box<dyn Send>;
@@ -250,3 +252,15 @@ impl Map {
250252
entries.clear();
251253
}
252254
}
255+
256+
// Wrap the future into one that drops task-local variables on exit.
257+
pub(crate) unsafe fn add_finalizer<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
258+
async move {
259+
let res = f.await;
260+
261+
// Abort on panic because thread-local variables behave the same way.
262+
abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear()));
263+
264+
res
265+
}
266+
}

src/task/mod.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,20 @@
2525
pub use std::task::{Context, Poll, Waker};
2626

2727
pub use block_on::block_on;
28+
pub use builder::Builder;
2829
pub use local::{AccessError, LocalKey};
29-
pub use pool::{current, spawn, Builder};
30+
pub use pool::spawn;
3031
pub use sleep::sleep;
3132
pub use task::{JoinHandle, Task, TaskId};
33+
pub use worker::current;
3234

3335
mod block_on;
36+
mod builder;
3437
mod local;
3538
mod pool;
3639
mod sleep;
40+
mod sleepers;
3741
mod task;
42+
mod worker;
3843

3944
pub(crate) mod blocking;

0 commit comments

Comments
 (0)