mmtk/scheduler/
worker_monitor.rs

//! This module contains `WorkerMonitor` and related types.  Its purposes include:
2//!
3//! -   allowing workers to park,
4//! -   letting the last parked worker take action, and
5//! -   letting workers and mutators notify workers when workers are given things to do.
6
7use std::sync::{Condvar, Mutex};
8
9use super::{
10    worker::WorkerShouldExit,
11    worker_goals::{WorkerGoal, WorkerGoals},
12};
13
/// The result type of the `on_last_parked` call-back in `WorkerMonitor::park_and_wait`.
/// It decides how many workers should wake up after `on_last_parked`.
pub(crate) enum LastParkedResult {
    /// The last parked worker should wait, too, until more work packets are added.
    /// `park_and_wait` will block this worker on the condition variable like any other worker.
    ParkSelf,
    /// The last parked worker should unpark and find work packets to do.
    /// `park_and_wait` will skip waiting for this worker only; no other worker is notified.
    WakeSelf,
    /// Wake up all parked GC workers.
    /// `park_and_wait` notifies all waiters, and the last parked worker continues without
    /// waiting, too.
    WakeAll,
}
24
/// A data structure for synchronizing workers with each other and with mutators.
///
/// Unlike `GCWorkerShared`, there is only one instance of `WorkerMonitor`.
///
/// -   It allows workers to park and unpark.
/// -   It allows mutators to notify workers to schedule a GC.
pub(crate) struct WorkerMonitor {
    /// The synchronized part.  All parked-worker counting and goal bookkeeping happens while
    /// holding this mutex.
    sync: Mutex<WorkerMonitorSync>,
    /// Workers wait on this when idle.  Notified if workers have things to do.  That includes:
    /// -   any work packets available, and
    /// -   any field in `sync.goals.requests` set to true.
    workers_have_anything_to_do: Condvar,
}
39
/// The synchronized part of `WorkerMonitor`.
///
/// It is only ever accessed through the mutex `WorkerMonitor::sync`.
struct WorkerMonitorSync {
    /// Count parked workers.
    parker: WorkerParker,
    /// Current and requested goals.
    goals: WorkerGoals,
}
47
/// Bookkeeping for parked workers.  It tracks how many workers are currently parked and lets the
/// caller detect the moment the last worker parks.
struct WorkerParker {
    /// The total number of workers.
    worker_count: usize,
    /// Number of workers that are currently parked.
    parked_workers: usize,
}

impl WorkerParker {
    /// Create a parker for `worker_count` workers, none of which is parked initially.
    fn new(worker_count: usize) -> Self {
        Self {
            worker_count,
            parked_workers: 0,
        }
    }

    /// Increase the parked-workers counter.
    /// Called before a worker is parked.
    ///
    /// Return true if all the workers are parked after this increment.
    fn inc_parked_workers(&mut self) -> bool {
        // At least one worker (the caller) must not have been parked yet.
        debug_assert!(self.parked_workers < self.worker_count);
        self.parked_workers += 1;
        self.parked_workers == self.worker_count
    }

    /// Decrease the parked-workers counter.
    /// Called after a worker is resumed from the parked state.
    fn dec_parked_workers(&mut self) {
        // The caller itself must have been counted as parked.
        debug_assert!(self.parked_workers <= self.worker_count);
        debug_assert!(self.parked_workers > 0);
        self.parked_workers -= 1;
    }
}
86
87impl WorkerMonitor {
88    pub fn new(worker_count: usize) -> Self {
89        Self {
90            sync: Mutex::new(WorkerMonitorSync {
91                parker: WorkerParker::new(worker_count),
92                goals: Default::default(),
93            }),
94            workers_have_anything_to_do: Default::default(),
95        }
96    }
97
98    /// Make a request.  Can be called by a mutator to request the workers to work towards the
99    /// given `goal`.
100    pub fn make_request(&self, goal: WorkerGoal) {
101        let mut guard = self.sync.lock().unwrap();
102        let newly_requested = guard.goals.set_request(goal);
103        if newly_requested {
104            self.notify_work_available(false);
105        }
106    }
107
108    /// Wake up workers when more work packets are made available for workers,
109    /// or a mutator has requested the GC workers to schedule a GC.
110    pub fn notify_work_available(&self, all: bool) {
111        if all {
112            self.workers_have_anything_to_do.notify_all();
113        } else {
114            self.workers_have_anything_to_do.notify_one();
115        }
116    }
117
118    /// Park a worker and wait on the CondVar `workers_have_anything_to_do`.
119    ///
120    /// If it is the last worker parked, `on_last_parked` will be called.
121    /// The argument of `on_last_parked` is true if `sync.gc_requested` is `true`.
122    /// The return value of `on_last_parked` will determine whether this worker and other workers
123    /// will wake up or block waiting.
124    ///
125    /// This function returns `Ok(())` if the current worker should continue working,
126    /// or `Err(WorkerShouldExit)` if the current worker should exit now.
127    pub fn park_and_wait<F>(
128        &self,
129        ordinal: usize,
130        on_last_parked: F,
131    ) -> Result<(), WorkerShouldExit>
132    where
133        F: FnOnce(&mut WorkerGoals) -> LastParkedResult,
134    {
135        let mut sync = self.sync.lock().unwrap();
136
137        // Park this worker
138        let all_parked = sync.parker.inc_parked_workers();
139        trace!(
140            "Worker {} parked.  parked/total: {}/{}.  All parked: {}",
141            ordinal,
142            sync.parker.parked_workers,
143            sync.parker.worker_count,
144            all_parked
145        );
146
147        let mut should_wait = false;
148
149        if all_parked {
150            trace!("Worker {} is the last worker parked.", ordinal);
151            let result = on_last_parked(&mut sync.goals);
152            match result {
153                LastParkedResult::ParkSelf => {
154                    should_wait = true;
155                }
156                LastParkedResult::WakeSelf => {
157                    // Continue without waiting.
158                }
159                LastParkedResult::WakeAll => {
160                    self.notify_work_available(true);
161                }
162            }
163        } else {
164            should_wait = true;
165        }
166
167        if should_wait {
168            // Notes on CondVar usage:
169            //
170            // Conditional variables are usually tested in a loop while holding a mutex
171            //
172            //      lock();
173            //      while condition() {
174            //          condvar.wait();
175            //      }
176            //      unlock();
177            //
178            // The actual condition for this `self.workers_have_anything_to_do.wait(sync)` is:
179            //
180            // 1.  any work packet is available, or
181            // 2.  a goal (such as doing GC) is requested
182            //
183            // But it is not used like the typical use pattern shown above, mainly because work
184            // packets can be added without holding the mutex `self.sync`.  This means one worker
185            // can add a new work packet (no mutex needed) right after another worker finds no work
186            // packets are available and then park.  In other words, condition (1) can suddenly
187            // become true after a worker sees it is false but before the worker blocks waiting on
188            // the CondVar.  If this happens, the last parked worker will block forever and never
189            // get notified.  This may happen if mutators or the previously existing "coordinator
190            // thread" can add work packets.
191            //
192            // However, after the "coordinator thread" was removed, only GC worker threads can add
193            // work packets during GC.  Parked workers (except the last parked worker) cannot make
194            // more work packets availble (by adding new packets or opening buckets).  For this
195            // reason, the **last** parked worker can be sure that after it finds no packets
196            // available, no other workers can add another work packet (because they all parked).
197            // So the **last** parked worker can open more buckets or declare GC finished.
198            //
199            // Condition (2), i.e. goals added to `sync.goals`, is guarded by the monitor `sync`.
200            // When a mutator adds a goal via `WorkerMonitor::make_request`, it will notify a
201            // worker; and the last parked worker always checks it before waiting.  So this
202            // condition will not be set without any worker noticing.
203            //
204            // Note that generational barriers may add `ProcessModBuf` work packets when not in GC.
205            // This is benign because those work packets are not executed immediately, and are
206            // guaranteed to be executed in the next GC.
207
208            // Notes on spurious wake-up:
209            //
210            // 1.  The condition variable `workers_have_anything_to_do` is guarded by `self.sync`.
211            //     Because the last parked worker is holding the mutex `self.sync` when executing
212            //     `on_last_parked`, no workers can unpark (even if they spuriously wake up) during
213            //     `on_last_parked` because they cannot re-acquire the mutex `self.sync`.
214            //
215            // 2.  Workers may spuriously wake up and unpark when `on_last_parked` is not being
216            //     executed (including the case when the last parked worker is waiting here, too).
217            //     If one or more GC workers spuriously wake up, they will check for work packets,
218            //     and park again if not available.  The last parked worker will ensure the two
219            //     conditions listed above are both false before blocking.  If either condition is
220            //     true, the last parked worker will take action.
221            sync = self.workers_have_anything_to_do.wait(sync).unwrap();
222        }
223
224        // Unpark this worker.
225        sync.parker.dec_parked_workers();
226        trace!(
227            "Worker {} unparked.  parked/total: {}/{}.",
228            ordinal,
229            sync.parker.parked_workers,
230            sync.parker.worker_count,
231        );
232
233        // If the current goal is `StopForFork`, the worker thread should exit.
234        if matches!(sync.goals.current(), Some(WorkerGoal::StopForFork)) {
235            return Err(WorkerShouldExit);
236        }
237
238        Ok(())
239    }
240
241    /// Called when all workers have exited.
242    pub fn on_all_workers_exited(&self) {
243        let mut sync = self.sync.try_lock().unwrap();
244        sync.goals.on_current_goal_completed();
245    }
246}
247
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc,
    };

    use super::WorkerMonitor;

    /// Test if the `WorkerMonitor::park_and_wait` method calls the `on_last_parked` callback
    /// properly.
    ///
    /// Four threads park on a monitor created for four workers.  The last one to park sets
    /// `should_unpark` and returns `WakeAll`, so every thread should leave its loop.
    #[test]
    fn test_last_worker_park_wake_all() {
        let number_threads = 4;
        let worker_monitor = Arc::new(WorkerMonitor::new(number_threads));
        // Counts how many times the `on_last_parked` callback has run.
        let on_last_parked_called = AtomicUsize::new(0);
        // The emulated "work is available" condition checked by each thread.
        let should_unpark = AtomicBool::new(false);

        // Scoped threads are joined before `scope` returns, so the assertion below runs after
        // all threads have finished.
        std::thread::scope(|scope| {
            for ordinal in 0..number_threads {
                let worker_monitor = worker_monitor.clone();
                let on_last_parked_called = &on_last_parked_called;
                let should_unpark = &should_unpark;
                scope.spawn(move || {
                    // This emulates the use pattern in the scheduler, i.e. checking the condition
                    // ("Is there any work packets available") without holding a mutex.
                    while !should_unpark.load(Ordering::SeqCst) {
                        println!("Thread {} parking...", ordinal);
                        worker_monitor
                            .park_and_wait(ordinal, |_goals| {
                                println!("Thread {} is the last thread parked.", ordinal);
                                on_last_parked_called.fetch_add(1, Ordering::SeqCst);
                                should_unpark.store(true, Ordering::SeqCst);
                                super::LastParkedResult::WakeAll
                            })
                            .unwrap();
                        println!("Thread {} unparked.", ordinal);
                    }
                });
            }
        });

        // `on_last_parked` should only be called once.
        assert_eq!(on_last_parked_called.load(Ordering::SeqCst), 1);
    }

    /// Like `test_last_worker_park_wake_all`, but only wake up the last parked worker when it
    /// parked.
    ///
    /// The last parked worker returns `WakeSelf`, checks that it is the only thread that has
    /// left the loop, and then explicitly wakes the other threads.
    #[test]
    fn test_last_worker_park_wake_self() {
        let number_threads = 4;
        let worker_monitor = Arc::new(WorkerMonitor::new(number_threads));
        // Counts how many times the `on_last_parked` callback has run.
        let on_last_parked_called = AtomicUsize::new(0);
        // Number of threads that have entered, but not yet left, the `while` loop.
        let threads_running = AtomicUsize::new(0);
        // The emulated "work is available" condition checked by each thread.
        let should_unpark = AtomicBool::new(false);

        std::thread::scope(|scope| {
            for ordinal in 0..number_threads {
                let worker_monitor = worker_monitor.clone();
                let on_last_parked_called = &on_last_parked_called;
                let threads_running = &threads_running;
                let should_unpark = &should_unpark;
                scope.spawn(move || {
                    let mut i_am_the_last_parked_worker = false;
                    // Record the number of threads entering the following `while` loop.
                    threads_running.fetch_add(1, Ordering::SeqCst);
                    while !should_unpark.load(Ordering::SeqCst) {
                        println!("Thread {} parking...", ordinal);
                        worker_monitor
                            .park_and_wait(ordinal, |_goals| {
                                println!("Thread {} is the last thread parked.", ordinal);
                                on_last_parked_called.fetch_add(1, Ordering::SeqCst);
                                should_unpark.store(true, Ordering::SeqCst);
                                i_am_the_last_parked_worker = true;
                                super::LastParkedResult::WakeSelf
                            })
                            .unwrap();
                        println!("Thread {} unparked.", ordinal);
                    }
                    threads_running.fetch_sub(1, Ordering::SeqCst);

                    if i_am_the_last_parked_worker {
                        println!("The last parked worker woke up");
                        // Only the current worker should wake and leave the `while` loop above.
                        // NOTE(review): this presumably assumes no other thread wakes up
                        // spuriously before this assertion runs -- confirm.
                        assert_eq!(threads_running.load(Ordering::SeqCst), number_threads - 1);
                        // `should_unpark` was already set to true in the callback above; it is
                        // stored again here before waking the remaining threads.
                        should_unpark.store(true, Ordering::SeqCst);
                        worker_monitor.notify_work_available(true);
                    }
                });
            }
        });

        // `on_last_parked` should only be called once.
        assert_eq!(on_last_parked_called.load(Ordering::SeqCst), 1);
    }
}
343}