mmtk/scheduler/worker_monitor.rs
//! This module contains `WorkerMonitor` and related types. Its purposes include:
//!
//! - allowing workers to park,
//! - letting the last parked worker take action, and
//! - letting workers and mutators notify workers when workers are given things to do.
6
7use std::sync::{Condvar, Mutex};
8
9use super::{
10 worker::WorkerShouldExit,
11 worker_goals::{WorkerGoal, WorkerGoals},
12};
13
/// The result type of the `on_last_parked` callback in `WorkerMonitor::park_and_wait`.
/// It decides how many workers should wake up after `on_last_parked`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LastParkedResult {
    /// The last parked worker should wait, too, until more work packets are added.
    ParkSelf,
    /// The last parked worker should unpark and find work packets to do.
    /// Other parked workers are not notified.
    WakeSelf,
    /// Wake up all parked GC workers. The last parked worker continues without waiting.
    WakeAll,
}
24
25/// A data structure for synchronizing workers with each other and with mutators.
26///
27/// Unlike `GCWorkerShared`, there is only one instance of `WorkerMonitor`.
28///
29/// - It allows workers to park and unpark.
30/// - It allows mutators to notify workers to schedule a GC.
31pub(crate) struct WorkerMonitor {
32 /// The synchronized part.
33 sync: Mutex<WorkerMonitorSync>,
34 /// Workers wait on this when idle. Notified if workers have things to do. That include:
35 /// - any work packets available, and
36 /// - any field in `sync.goals.requests` set to true.
37 workers_have_anything_to_do: Condvar,
38}
39
40/// The synchronized part of `WorkerMonitor`.
41struct WorkerMonitorSync {
42 /// Count parked workers.
43 parker: WorkerParker,
44 /// Current and requested goals.
45 goals: WorkerGoals,
46}
47
/// This struct counts the number of workers parked and identifies the last parked worker.
struct WorkerParker {
    /// The total number of workers.
    worker_count: usize,
    /// Number of parked workers.
    parked_workers: usize,
}

impl WorkerParker {
    /// Create a parker for `worker_count` workers, none of which is parked yet.
    fn new(worker_count: usize) -> Self {
        Self {
            worker_count,
            parked_workers: 0,
        }
    }

    /// Increase the parked-workers counter.
    /// Called before a worker is parked.
    ///
    /// Returns true if all the workers are parked after this increment, i.e. the caller is
    /// the last worker to park.
    fn inc_parked_workers(&mut self) -> bool {
        // A worker must not park while every worker is already counted as parked.
        debug_assert!(self.parked_workers < self.worker_count);
        self.parked_workers += 1;
        self.parked_workers == self.worker_count
    }

    /// Decrease the parked-workers counter.
    /// Called after a worker is resumed from the parked state.
    fn dec_parked_workers(&mut self) {
        debug_assert!(self.parked_workers <= self.worker_count);
        // Every unpark must be paired with an earlier park.
        debug_assert!(self.parked_workers > 0);
        self.parked_workers -= 1;
    }
}
86
87impl WorkerMonitor {
88 pub fn new(worker_count: usize) -> Self {
89 Self {
90 sync: Mutex::new(WorkerMonitorSync {
91 parker: WorkerParker::new(worker_count),
92 goals: Default::default(),
93 }),
94 workers_have_anything_to_do: Default::default(),
95 }
96 }
97
98 /// Make a request. Can be called by a mutator to request the workers to work towards the
99 /// given `goal`.
100 pub fn make_request(&self, goal: WorkerGoal) {
101 let mut guard = self.sync.lock().unwrap();
102 let newly_requested = guard.goals.set_request(goal);
103 if newly_requested {
104 self.notify_work_available(false);
105 }
106 }
107
108 /// Wake up workers when more work packets are made available for workers,
109 /// or a mutator has requested the GC workers to schedule a GC.
110 pub fn notify_work_available(&self, all: bool) {
111 if all {
112 self.workers_have_anything_to_do.notify_all();
113 } else {
114 self.workers_have_anything_to_do.notify_one();
115 }
116 }
117
118 /// Park a worker and wait on the CondVar `workers_have_anything_to_do`.
119 ///
120 /// If it is the last worker parked, `on_last_parked` will be called.
121 /// The argument of `on_last_parked` is true if `sync.gc_requested` is `true`.
122 /// The return value of `on_last_parked` will determine whether this worker and other workers
123 /// will wake up or block waiting.
124 ///
125 /// This function returns `Ok(())` if the current worker should continue working,
126 /// or `Err(WorkerShouldExit)` if the current worker should exit now.
127 pub fn park_and_wait<F>(
128 &self,
129 ordinal: usize,
130 on_last_parked: F,
131 ) -> Result<(), WorkerShouldExit>
132 where
133 F: FnOnce(&mut WorkerGoals) -> LastParkedResult,
134 {
135 let mut sync = self.sync.lock().unwrap();
136
137 // Park this worker
138 let all_parked = sync.parker.inc_parked_workers();
139 trace!(
140 "Worker {} parked. parked/total: {}/{}. All parked: {}",
141 ordinal,
142 sync.parker.parked_workers,
143 sync.parker.worker_count,
144 all_parked
145 );
146
147 let mut should_wait = false;
148
149 if all_parked {
150 trace!("Worker {} is the last worker parked.", ordinal);
151 let result = on_last_parked(&mut sync.goals);
152 match result {
153 LastParkedResult::ParkSelf => {
154 should_wait = true;
155 }
156 LastParkedResult::WakeSelf => {
157 // Continue without waiting.
158 }
159 LastParkedResult::WakeAll => {
160 self.notify_work_available(true);
161 }
162 }
163 } else {
164 should_wait = true;
165 }
166
167 if should_wait {
168 // Notes on CondVar usage:
169 //
170 // Conditional variables are usually tested in a loop while holding a mutex
171 //
172 // lock();
173 // while condition() {
174 // condvar.wait();
175 // }
176 // unlock();
177 //
178 // The actual condition for this `self.workers_have_anything_to_do.wait(sync)` is:
179 //
180 // 1. any work packet is available, or
181 // 2. a goal (such as doing GC) is requested
182 //
183 // But it is not used like the typical use pattern shown above, mainly because work
184 // packets can be added without holding the mutex `self.sync`. This means one worker
185 // can add a new work packet (no mutex needed) right after another worker finds no work
186 // packets are available and then park. In other words, condition (1) can suddenly
187 // become true after a worker sees it is false but before the worker blocks waiting on
188 // the CondVar. If this happens, the last parked worker will block forever and never
189 // get notified. This may happen if mutators or the previously existing "coordinator
190 // thread" can add work packets.
191 //
192 // However, after the "coordinator thread" was removed, only GC worker threads can add
193 // work packets during GC. Parked workers (except the last parked worker) cannot make
194 // more work packets availble (by adding new packets or opening buckets). For this
195 // reason, the **last** parked worker can be sure that after it finds no packets
196 // available, no other workers can add another work packet (because they all parked).
197 // So the **last** parked worker can open more buckets or declare GC finished.
198 //
199 // Condition (2), i.e. goals added to `sync.goals`, is guarded by the monitor `sync`.
200 // When a mutator adds a goal via `WorkerMonitor::make_request`, it will notify a
201 // worker; and the last parked worker always checks it before waiting. So this
202 // condition will not be set without any worker noticing.
203 //
204 // Note that generational barriers may add `ProcessModBuf` work packets when not in GC.
205 // This is benign because those work packets are not executed immediately, and are
206 // guaranteed to be executed in the next GC.
207
208 // Notes on spurious wake-up:
209 //
210 // 1. The condition variable `workers_have_anything_to_do` is guarded by `self.sync`.
211 // Because the last parked worker is holding the mutex `self.sync` when executing
212 // `on_last_parked`, no workers can unpark (even if they spuriously wake up) during
213 // `on_last_parked` because they cannot re-acquire the mutex `self.sync`.
214 //
215 // 2. Workers may spuriously wake up and unpark when `on_last_parked` is not being
216 // executed (including the case when the last parked worker is waiting here, too).
217 // If one or more GC workers spuriously wake up, they will check for work packets,
218 // and park again if not available. The last parked worker will ensure the two
219 // conditions listed above are both false before blocking. If either condition is
220 // true, the last parked worker will take action.
221 sync = self.workers_have_anything_to_do.wait(sync).unwrap();
222 }
223
224 // Unpark this worker.
225 sync.parker.dec_parked_workers();
226 trace!(
227 "Worker {} unparked. parked/total: {}/{}.",
228 ordinal,
229 sync.parker.parked_workers,
230 sync.parker.worker_count,
231 );
232
233 // If the current goal is `StopForFork`, the worker thread should exit.
234 if matches!(sync.goals.current(), Some(WorkerGoal::StopForFork)) {
235 return Err(WorkerShouldExit);
236 }
237
238 Ok(())
239 }
240
241 /// Called when all workers have exited.
242 pub fn on_all_workers_exited(&self) {
243 let mut sync = self.sync.try_lock().unwrap();
244 sync.goals.on_current_goal_completed();
245 }
246}
247
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc,
    };

    use super::{LastParkedResult, WorkerMonitor};

    /// Checks that `WorkerMonitor::park_and_wait` runs the `on_last_parked` callback exactly
    /// once when the last parked worker answers `WakeAll`.
    #[test]
    fn test_last_worker_park_wake_all() {
        const WORKERS: usize = 4;
        let monitor = Arc::new(WorkerMonitor::new(WORKERS));
        let callback_calls = AtomicUsize::new(0);
        let stop = AtomicBool::new(false);

        std::thread::scope(|s| {
            for ordinal in 0..WORKERS {
                let monitor = Arc::clone(&monitor);
                let (callback_calls, stop) = (&callback_calls, &stop);
                s.spawn(move || {
                    // Mirror the scheduler: the "is there anything to do?" check is made
                    // without holding the monitor's mutex.
                    loop {
                        if stop.load(Ordering::SeqCst) {
                            break;
                        }
                        println!("Thread {} parking...", ordinal);
                        monitor
                            .park_and_wait(ordinal, |_goals| {
                                println!("Thread {} is the last thread parked.", ordinal);
                                callback_calls.fetch_add(1, Ordering::SeqCst);
                                stop.store(true, Ordering::SeqCst);
                                LastParkedResult::WakeAll
                            })
                            .unwrap();
                        println!("Thread {} unparked.", ordinal);
                    }
                });
            }
        });

        // Exactly one worker should have run the callback, exactly once.
        assert_eq!(callback_calls.load(Ordering::SeqCst), 1);
    }

    /// Same scenario as `test_last_worker_park_wake_all`, except that the last parked worker
    /// answers `WakeSelf`, so only it resumes until it explicitly wakes the others.
    #[test]
    fn test_last_worker_park_wake_self() {
        const WORKERS: usize = 4;
        let monitor = Arc::new(WorkerMonitor::new(WORKERS));
        let callback_calls = AtomicUsize::new(0);
        let running = AtomicUsize::new(0);
        let stop = AtomicBool::new(false);

        std::thread::scope(|s| {
            for ordinal in 0..WORKERS {
                let monitor = Arc::clone(&monitor);
                let (callback_calls, running, stop) = (&callback_calls, &running, &stop);
                s.spawn(move || {
                    let mut was_last_parked = false;
                    // Count the threads that are inside the parking loop below.
                    running.fetch_add(1, Ordering::SeqCst);
                    loop {
                        if stop.load(Ordering::SeqCst) {
                            break;
                        }
                        println!("Thread {} parking...", ordinal);
                        monitor
                            .park_and_wait(ordinal, |_goals| {
                                println!("Thread {} is the last thread parked.", ordinal);
                                callback_calls.fetch_add(1, Ordering::SeqCst);
                                stop.store(true, Ordering::SeqCst);
                                was_last_parked = true;
                                LastParkedResult::WakeSelf
                            })
                            .unwrap();
                        println!("Thread {} unparked.", ordinal);
                    }
                    running.fetch_sub(1, Ordering::SeqCst);

                    if !was_last_parked {
                        return;
                    }

                    println!("The last parked worker woke up");
                    // Only this worker should have woken up and left the loop above.
                    assert_eq!(running.load(Ordering::SeqCst), WORKERS - 1);
                    stop.store(true, Ordering::SeqCst);
                    monitor.notify_work_available(true);
                });
            }
        });

        // Exactly one worker should have run the callback, exactly once.
        assert_eq!(callback_calls.load(Ordering::SeqCst), 1);
    }
}