Skip to content

Commit

Permalink
Refactor scheduler and worker creation (#539)
Browse files Browse the repository at this point in the history
This commit eliminates the unnecessary Option<T> fields, the two-phase
initialization and the unsafe Arc::get_mut_unchecked call in GCWorkScheduler.

We consider MMTK, GCWorkScheduler and GCWorkerShared as shared data
between threads, and GCWorker and GCController as private data of the
workers and the controller.  We now create shared data, including all
GCWorkerShared structs, before spawning any GC threads.  This means we
no longer spawn GC threads before GCWorkScheduler is fully initialized,
and we have eliminated some unsafe operations.

We temporarily make Options::threads settable only via an environment
variable, because we now create GCWorkerShared instances when
GCWorkScheduler is created, which is usually static.
  • Loading branch information
wks authored Feb 11, 2022
1 parent c38365b commit 307c63a
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 169 deletions.
2 changes: 1 addition & 1 deletion src/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThre
!mmtk.plan.is_initialized(),
"MMTk collection has been initialized (was initialize_collection() already called before?)"
);
mmtk.scheduler.initialize(*mmtk.options.threads, mmtk, tls);
mmtk.scheduler.spawn_gc_threads(mmtk, tls);
mmtk.plan.base().initialized.store(true, Ordering::SeqCst);
}

Expand Down
9 changes: 8 additions & 1 deletion src/mmtk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ impl<VM: VMBinding> MMTK<VM> {
// The first call will initialize SFT map. Other calls will be blocked until SFT map is initialized.
SFT_MAP.initialize_once();

let scheduler = GCWorkScheduler::new();
let options = Arc::new(UnsafeOptionsWrapper::new(Options::default()));

let num_workers = if cfg!(feature = "single_worker") {
1
} else {
*options.threads
};

let scheduler = GCWorkScheduler::new(num_workers);
let plan = crate::plan::create_plan(
*options.plan,
&VM_MAP,
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<VM: VMBinding> GCController<VM> {
}
}
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
if self.scheduler.worker_group().all_parked() && self.scheduler.all_buckets_empty() {
if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/gc_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
.add(PrepareMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers_shared {
for w in &mmtk.scheduler.workers_shared {
w.local_work_bucket.add(PrepareCollector);
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
.add(ReleaseMutator::<C::VM>::new(mutator));
}
for w in &mmtk.scheduler.worker_group().workers_shared {
for w in &mmtk.scheduler.workers_shared {
w.local_work_bucket.add(ReleaseCollector);
}
// TODO: Process weak references properly
Expand Down
177 changes: 90 additions & 87 deletions src/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::stat::SchedulerStat;
use super::work_bucket::WorkBucketStage::*;
use super::work_bucket::*;
use super::worker::{GCWorker, GCWorkerShared, WorkerGroup};
use super::worker::{GCWorker, GCWorkerShared};
use super::*;
use crate::mmtk::MMTK;
use crate::util::opaque_pointer::*;
Expand All @@ -10,23 +11,24 @@ use enum_map::{enum_map, EnumMap};
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::{Arc, Condvar, Mutex};

pub enum CoordinatorMessage<VM: VMBinding> {
Work(Box<dyn CoordinatorWork<VM>>),
AllWorkerParked,
BucketDrained,
}

/// The shared data structure for distributing work packets between worker threads and the coordinator thread.
pub struct GCWorkScheduler<VM: VMBinding> {
/// Work buckets
/// Work buckets for worker threads
pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
/// Work for the coordinator thread
pub coordinator_work: WorkBucket<VM>,
/// The shared parts of GC workers
worker_group: Option<WorkerGroup<VM>>,
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
/// The shared part of the GC worker object of the controller thread
coordinator_worker_shared: Option<RwLock<Arc<GCWorkerShared<VM>>>>,
coordinator_worker_shared: Arc<GCWorkerShared<VM>>,
/// Condition Variable for worker synchronization
pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
/// A callback to be fired after the `Closure` bucket is drained.
Expand All @@ -41,79 +43,42 @@ pub struct GCWorkScheduler<VM: VMBinding> {
closure_end: Mutex<Option<Box<dyn Send + Fn() -> bool>>>,
}

// The 'channel' inside Scheduler disallows Sync for Scheduler. We have to make sure we use channel properly:
// 1. We should never directly use Sender. We clone the sender and let each worker have their own copy.
// 2. Only the coordinator can use Receiver.
// TODO: We should remove channel from Scheduler, and directly send Sender/Receiver when creating the coordinator and
// the workers.
// FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet.
// Some subtle interaction between ObjectRememberingBarrier, Mutator and some GCWork instances
// makes the compiler think WorkBucket is not Sync.
unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}

impl<VM: VMBinding> GCWorkScheduler<VM> {
pub fn new() -> Arc<Self> {
pub fn new(num_workers: usize) -> Arc<Self> {
let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
Arc::new(Self {
work_buckets: enum_map! {
WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
},
coordinator_work: WorkBucket::new(true, worker_monitor.clone()),
worker_group: None,
coordinator_worker_shared: None,
worker_monitor,
closure_end: Mutex::new(None),
})
}

#[inline]
pub fn num_workers(&self) -> usize {
self.worker_group.as_ref().unwrap().worker_count()
}

pub fn initialize(
self: &'static Arc<Self>,
num_workers: usize,
mmtk: &'static MMTK<VM>,
tls: VMThread,
) {
use crate::scheduler::work_bucket::WorkBucketStage::*;
let num_workers = if cfg!(feature = "single_worker") {
1
} else {
num_workers
// Create work buckets for workers.
let mut work_buckets = enum_map! {
WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
};

let (sender, receiver) = channel::<CoordinatorMessage<VM>>();

let mut self_mut = self.clone();
let self_mut = unsafe { Arc::get_mut_unchecked(&mut self_mut) };

let coordinator_worker = GCWorker::new(mmtk, 0, self.clone(), true, sender.clone());
self_mut.coordinator_worker_shared = Some(RwLock::new(coordinator_worker.shared.clone()));

let (worker_group, spawn_workers) =
WorkerGroup::new(mmtk, num_workers, self.clone(), sender);
self_mut.worker_group = Some(worker_group);

// Set the open condition of each bucket.
{
// Unconstrained is always open. Prepare will be opened at the beginning of a GC.
// This vec will grow for each stage we call with open_next()
let mut open_stages: Vec<WorkBucketStage> = vec![Unconstrained, Prepare];
// The rest will open after the previous stage is done.
let mut open_next = |s: WorkBucketStage| {
let cur_stages = open_stages.clone();
self_mut.work_buckets[s].set_open_condition(move || {
let should_open =
self.are_buckets_drained(&cur_stages) && self.worker_group().all_parked();
work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler<VM>| {
let should_open = scheduler.are_buckets_drained(&cur_stages)
&& scheduler.all_workers_parked();
// Additional check before the `RefClosure` bucket opens.
if should_open && s == WorkBucketStage::RefClosure {
if let Some(closure_end) = self.closure_end.lock().unwrap().as_ref() {
if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() {
if closure_end() {
// Don't open `RefClosure` if `closure_end` added more works to `Closure`.
return false;
Expand All @@ -134,18 +99,72 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
open_next(Final);
}

// Now that the scheduler is initialized, we spawn the worker threads and the controller thread.
spawn_workers(tls);
// Create the work bucket for the controller.
let coordinator_work = WorkBucket::new(true, worker_monitor.clone());

// We prepare the shared part of workers, but do not create the actual workers now.
// The shared parts of workers are communication hubs between controller and workers.
let workers_shared = (0..num_workers)
.map(|_| Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone())))
.collect::<Vec<_>>();

// Similarly, we create the shared part of the worker of the controller, but not the controller itself.
let coordinator_worker_shared = Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone()));

Arc::new(Self {
work_buckets,
coordinator_work,
workers_shared,
coordinator_worker_shared,
worker_monitor,
closure_end: Mutex::new(None),
})
}

/// Returns the number of GC workers, i.e. the length of `workers_shared`.
///
/// The coordinator's shared struct is stored in the separate
/// `coordinator_worker_shared` field and is not counted here.
#[inline]
pub fn num_workers(&self) -> usize {
self.workers_shared.len()
}

/// Returns `true` if every entry of `workers_shared` reports `is_parked()`.
///
/// Only the workers are inspected; `coordinator_worker_shared` is not consulted.
/// Because `Iterator::all` is `true` for an empty iterator, this also returns
/// `true` when there are no workers.
pub fn all_workers_parked(&self) -> bool {
self.workers_shared.iter().all(|w| w.is_parked())
}

/// Create GC threads, including the controller thread and all workers.
///
/// A single `CoordinatorMessage` channel is created here. The receiving end is
/// moved into the `GCController`; the coordinator's `GCWorker` and every worker's
/// `GCWorker` each get their own clone of the sending end.
///
/// Each thread is requested from the VM binding through
/// `VM::VMCollection::spawn_gc_thread`, controller first, then one call per
/// entry of `workers_shared`.
///
/// Arguments:
/// * `mmtk`: the MMTk instance passed on to the controller and to every worker.
/// * `tls`: the VM thread handle forwarded to `spawn_gc_thread`.
pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
// Create the communication channel.
let (sender, receiver) = channel::<CoordinatorMessage<VM>>();

// Spawn the controller thread.
// The coordinator's worker uses ordinal 0 and the pre-built `coordinator_worker_shared`.
// NOTE(review): the `true`/`false` argument appears to mark whether the worker
// belongs to the coordinator -- confirm against `GCWorker::new`.
let coordinator_worker = GCWorker::new(
mmtk,
0,
self.clone(),
true,
sender.clone(),
self.coordinator_worker_shared.clone(),
);
// The controller takes ownership of the receiver and of the coordinator's worker.
let gc_controller = GCController::new(
mmtk,
mmtk.plan.base().gc_requester.clone(),
self.clone(),
receiver,
coordinator_worker,
);

VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Controller(gc_controller));

// Spawn each worker thread.
// Worker ordinals come from `enumerate()`, so they also start at 0, and each worker
// is paired with the `GCWorkerShared` at the same index of `workers_shared`.
for (ordinal, shared) in self.workers_shared.iter().enumerate() {
let worker = Box::new(GCWorker::new(
mmtk,
ordinal,
self.clone(),
false,
sender.clone(),
shared.clone(),
));
VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
}
}

/// Schedule all the common work packets
Expand Down Expand Up @@ -206,10 +225,6 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
*self.closure_end.lock().unwrap() = Some(f);
}

pub fn worker_group(&self) -> &WorkerGroup<VM> {
self.worker_group.as_ref().unwrap()
}

pub fn all_buckets_empty(&self) -> bool {
self.work_buckets.values().all(|bucket| bucket.is_empty())
}
Expand All @@ -221,7 +236,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
if id == WorkBucketStage::Unconstrained {
continue;
}
buckets_updated |= bucket.update();
buckets_updated |= bucket.update(self);
}
if buckets_updated {
// Notify the workers for new work
Expand Down Expand Up @@ -317,7 +332,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
}
// Park this worker
worker.shared.parked.store(true, Ordering::SeqCst);
if self.worker_group().all_parked() {
if self.all_workers_parked() {
worker
.sender
.send(CoordinatorMessage::AllWorkerParked)
Expand All @@ -331,33 +346,21 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
}

pub fn enable_stat(&self) {
for worker in &self.worker_group().workers_shared {
for worker in &self.workers_shared {
let worker_stat = worker.borrow_stat();
worker_stat.enable();
}
let coordinator_worker_shared = self
.coordinator_worker_shared
.as_ref()
.unwrap()
.read()
.unwrap();
let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
coordinator_worker_stat.enable();
}

pub fn statistics(&self) -> HashMap<String, String> {
let mut summary = SchedulerStat::default();
for worker in &self.worker_group().workers_shared {
for worker in &self.workers_shared {
let worker_stat = worker.borrow_stat();
summary.merge(&worker_stat);
}
let coordinator_worker_shared = self
.coordinator_worker_shared
.as_ref()
.unwrap()
.read()
.unwrap();
let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
summary.merge(&coordinator_worker_stat);
summary.harness_stat()
}
Expand Down
11 changes: 7 additions & 4 deletions src/scheduler/work_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct WorkBucket<VM: VMBinding> {
/// A priority queue
queue: RwLock<BinaryHeap<PrioritizedWork<VM>>>,
monitor: Arc<(Mutex<()>, Condvar)>,
can_open: Option<Box<dyn (Fn() -> bool) + Send>>,
can_open: Option<Box<dyn (Fn(&GCWorkScheduler<VM>) -> bool) + Send>>,
}

impl<VM: VMBinding> WorkBucket<VM> {
Expand Down Expand Up @@ -132,12 +132,15 @@ impl<VM: VMBinding> WorkBucket<VM> {
}
self.queue.write().pop().map(|v| v.work)
}
pub fn set_open_condition(&mut self, pred: impl Fn() -> bool + Send + 'static) {
pub fn set_open_condition(
&mut self,
pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
) {
self.can_open = Some(box pred);
}
pub fn update(&self) -> bool {
pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
if let Some(can_open) = self.can_open.as_ref() {
if !self.is_activated() && can_open() {
if !self.is_activated() && can_open(scheduler) {
self.activate();
return true;
}
Expand Down
Loading

0 comments on commit 307c63a

Please sign in to comment.