mmtk/scheduler/work_bucket.rs
1use super::worker_monitor::WorkerMonitor;
2use super::*;
3use crate::vm::VMBinding;
4use crossbeam::deque::{Injector, Steal, Worker};
5use enum_map::Enum;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
9pub(super) struct BucketQueue<VM: VMBinding> {
10 queue: Injector<Box<dyn GCWork<VM>>>,
11}
12
13impl<VM: VMBinding> BucketQueue<VM> {
14 fn new() -> Self {
15 Self {
16 queue: Injector::new(),
17 }
18 }
19
20 fn is_empty(&self) -> bool {
21 self.queue.is_empty()
22 }
23
24 fn steal_batch_and_pop(
25 &self,
26 dest: &Worker<Box<dyn GCWork<VM>>>,
27 ) -> Steal<Box<dyn GCWork<VM>>> {
28 self.queue.steal_batch_and_pop(dest)
29 }
30
31 fn push(&self, w: Box<dyn GCWork<VM>>) {
32 self.queue.push(w);
33 }
34
35 fn push_all(&self, ws: Vec<Box<dyn GCWork<VM>>>) {
36 for w in ws {
37 self.queue.push(w);
38 }
39 }
40
41 /// Dump all the packets in this queue for debugging purpose.
42 /// This function may dump items from the queue temporarily, thus should only be called when it is safe to do so
43 /// (e.g. when the execution has failed already and the system is going to panic).
44 pub fn debug_dump_packets(&self) -> Vec<String> {
45 let mut items = Vec::new();
46
47 {
48 // Drain queue by stealing until empty
49 loop {
50 match self.queue.steal() {
51 crossbeam::deque::Steal::Success(work) => {
52 items.push(work);
53 }
54 crossbeam::deque::Steal::Retry => continue,
55 crossbeam::deque::Steal::Empty => break,
56 }
57 }
58 }
59
60 // Format collected items (just type names or Debug, depending on GCWork)
61 let debug_items: Vec<String> = items
62 .iter()
63 .map(|i| i.get_type_name().to_string()) // placeholder since GCWork isn’t Debug
64 .collect();
65
66 // Push items back into the queue
67 {
68 for work in items {
69 self.queue.push(work);
70 }
71 }
72
73 debug_items
74 }
75}
76
77pub type BucketOpenCondition<VM> = Box<dyn (Fn(&GCWorkScheduler<VM>) -> bool) + Send>;
78
79pub struct WorkBucket<VM: VMBinding> {
80 /// Whether this bucket has been opened. Work from an open bucket can be fetched by workers.
81 open: AtomicBool,
82 /// Whether this bucket is enabled.
83 /// A disabled work bucket will behave as if it does not exist in terms of scheduling,
84 /// except that users can add work to a disabled bucket, and enable it later to allow those
85 /// work to be scheduled.
86 enabled: AtomicBool,
87 /// The stage name of this bucket.
88 stage: WorkBucketStage,
89 queue: BucketQueue<VM>,
90 prioritized_queue: Option<BucketQueue<VM>>,
91 monitor: Arc<WorkerMonitor>,
92 /// The open condition for a bucket. If this is `Some`, the bucket will be open
93 /// when the condition is met. If this is `None`, the bucket needs to be open manually.
94 can_open: Option<BucketOpenCondition<VM>>,
95 /// After this bucket is open and all pending work packets (including the packets in this
96 /// bucket) are drained, this work packet, if exists, will be added to this bucket. When this
97 /// happens, it will prevent opening subsequent work packets.
98 ///
99 /// The sentinel work packet may set another work packet as the new sentinel which will be
100 /// added to this bucket again after all pending work packets are drained. This may happend
101 /// again and again, causing the GC to stay at the same stage and drain work packets in a loop.
102 ///
103 /// This is useful for handling weak references that may expand the transitive closure
104 /// recursively, such as ephemerons and Java-style SoftReference and finalizers. Sentinels
105 /// can be used repeatedly to discover and process more such objects.
106 sentinel: Mutex<Option<Box<dyn GCWork<VM>>>>,
107}
108
109impl<VM: VMBinding> WorkBucket<VM> {
110 pub(crate) fn new(stage: WorkBucketStage, monitor: Arc<WorkerMonitor>) -> Self {
111 Self {
112 open: AtomicBool::new(stage.is_open_by_default()),
113 enabled: AtomicBool::new(stage.is_enabled_by_default()),
114 stage,
115 queue: BucketQueue::new(),
116 prioritized_queue: None,
117 monitor,
118 can_open: None,
119 sentinel: Mutex::new(None),
120 }
121 }
122
123 pub fn set_enabled(&self, enabled: bool) {
124 self.enabled.store(enabled, Ordering::SeqCst)
125 }
126
127 pub fn is_enabled(&self) -> bool {
128 self.enabled.load(Ordering::Relaxed)
129 }
130
131 pub fn enable_prioritized_queue(&mut self) {
132 self.prioritized_queue = Some(BucketQueue::new());
133 }
134
135 fn notify_one_worker(&self) {
136 // If the bucket is not open, don't notify anyone.
137 if !self.is_open() || !self.is_enabled() {
138 return;
139 }
140 // Notify one if there're any parked workers.
141 self.monitor.notify_work_available(false);
142 }
143
144 pub fn notify_all_workers(&self) {
145 // If the bucket is not open, don't notify anyone.
146 if !self.is_open() || !self.is_enabled() {
147 return;
148 }
149 // Notify all if there're any parked workers.
150 self.monitor.notify_work_available(true);
151 }
152
153 pub fn is_open(&self) -> bool {
154 self.open.load(Ordering::SeqCst)
155 }
156
157 /// Open the bucket
158 pub fn open(&self) {
159 self.open.store(true, Ordering::SeqCst);
160 }
161
162 /// Test if the bucket is drained
163 pub fn is_empty(&self) -> bool {
164 self.queue.is_empty()
165 && self
166 .prioritized_queue
167 .as_ref()
168 .map(|q| q.is_empty())
169 .unwrap_or(true)
170 }
171
172 pub fn is_drained(&self) -> bool {
173 !self.is_enabled() || (self.is_open() && self.is_empty())
174 }
175
176 /// Close the bucket
177 pub fn close(&self) {
178 debug_assert!(
179 self.queue.is_empty(),
180 "Bucket {:?} not drained before close",
181 self.stage
182 );
183 self.open.store(false, Ordering::Relaxed);
184 }
185
186 /// Add a work packet to this bucket
187 /// Panic if this bucket cannot receive prioritized packets.
188 pub fn add_prioritized(&self, work: Box<dyn GCWork<VM>>) {
189 self.prioritized_queue.as_ref().unwrap().push(work);
190 self.notify_one_worker();
191 }
192
193 /// Add a work packet to this bucket
194 pub fn add<W: GCWork<VM>>(&self, work: W) {
195 self.queue.push(Box::new(work));
196 self.notify_one_worker();
197 }
198
199 /// Add a work packet to this bucket
200 pub fn add_boxed(&self, work: Box<dyn GCWork<VM>>) {
201 self.queue.push(work);
202 self.notify_one_worker();
203 }
204
205 /// Add a work packet to this bucket, but do not notify any workers.
206 /// This is useful when the current thread is holding the mutex of `WorkerMonitor` which is
207 /// used for notifying workers. This usually happens if the current thread is the last worker
208 /// parked.
209 pub(crate) fn add_no_notify<W: GCWork<VM>>(&self, work: W) {
210 self.queue.push(Box::new(work));
211 }
212
213 /// Like [`WorkBucket::add_no_notify`], but the work is boxed.
214 pub(crate) fn add_boxed_no_notify(&self, work: Box<dyn GCWork<VM>>) {
215 self.queue.push(work);
216 }
217
218 /// Add multiple packets with a higher priority.
219 /// Panic if this bucket cannot receive prioritized packets.
220 pub fn bulk_add_prioritized(&self, work_vec: Vec<Box<dyn GCWork<VM>>>) {
221 self.prioritized_queue.as_ref().unwrap().push_all(work_vec);
222 self.notify_all_workers();
223 }
224
225 /// Add multiple packets
226 pub fn bulk_add(&self, work_vec: Vec<Box<dyn GCWork<VM>>>) {
227 if work_vec.is_empty() {
228 return;
229 }
230 self.queue.push_all(work_vec);
231 self.notify_all_workers();
232 }
233
234 /// Get a work packet from this bucket
235 pub fn poll(&self, worker: &Worker<Box<dyn GCWork<VM>>>) -> Steal<Box<dyn GCWork<VM>>> {
236 if !self.is_enabled() || !self.is_open() || self.is_empty() {
237 return Steal::Empty;
238 }
239 if let Some(prioritized_queue) = self.prioritized_queue.as_ref() {
240 prioritized_queue
241 .steal_batch_and_pop(worker)
242 .or_else(|| self.queue.steal_batch_and_pop(worker))
243 } else {
244 self.queue.steal_batch_and_pop(worker)
245 }
246 }
247
248 pub fn set_open_condition(
249 &mut self,
250 pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
251 ) {
252 self.can_open = Some(Box::new(pred));
253 }
254
255 pub fn set_sentinel(&self, new_sentinel: Box<dyn GCWork<VM>>) {
256 let mut sentinel = self.sentinel.lock().unwrap();
257 *sentinel = Some(new_sentinel);
258 }
259
260 pub fn has_sentinel(&self) -> bool {
261 let sentinel = self.sentinel.lock().unwrap();
262 sentinel.is_some()
263 }
264
265 pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
266 if let Some(can_open) = self.can_open.as_ref() {
267 if !self.is_open() && can_open(scheduler) {
268 debug!("Opening work bucket: {:?}", self.stage);
269 self.open();
270 return true;
271 }
272 }
273 false
274 }
275
276 pub fn maybe_schedule_sentinel(&self) -> bool {
277 debug_assert!(
278 self.is_open(),
279 "Attempted to schedule sentinel work while bucket is not open"
280 );
281 let maybe_sentinel = {
282 let mut sentinel = self.sentinel.lock().unwrap();
283 sentinel.take()
284 };
285 if let Some(work) = maybe_sentinel {
286 // We don't need to notify other workers because this function is called by the last
287 // parked worker. After this function returns, the caller will notify workers because
288 // more work packets become available.
289 self.add_boxed_no_notify(work);
290 true
291 } else {
292 false
293 }
294 }
295
296 pub(super) fn get_queue(&self) -> &BucketQueue<VM> {
297 &self.queue
298 }
299
300 pub(super) fn get_stage(&self) -> WorkBucketStage {
301 self.stage
302 }
303}
304
305/// This enum defines all the work bucket types. The scheduler
306/// will instantiate a work bucket for each stage defined here.
307#[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)]
308pub enum WorkBucketStage {
309 /// This bucket is always open.
310 Unconstrained,
311 /// This bucket is intended for concurrent work. Though some concurrent work may be put and executed in the unconstrained bucket,
312 /// work in the unconstrained bucket will always be consumed during STW. Users can disable this bucket
313 /// and cache some concurrent work during STW, and only enable this bucket and allow concurrent execution once a STW is done.
314 Concurrent,
315 /// Preparation work. Plans, spaces, GC workers, mutators, etc. should be prepared for GC at
316 /// this stage.
317 Prepare,
318 /// Clear the VO bit metadata. Mainly used by ImmixSpace.
319 #[cfg(feature = "vo_bit")]
320 ClearVOBits,
321 /// Compute the transtive closure starting from transitively pinning (TP) roots following only strong references.
322 /// No objects in this closure are allow to move.
323 TPinningClosure,
324 /// Trace (non-transitively) pinning roots. Objects pointed by those roots must not move, but their children may. To ensure correctness, these must be processed after TPinningClosure
325 PinningRootsTrace,
326 /// Compute the transtive closure following only strong references.
327 Closure,
328 /// Handle Java-style soft references, and potentially expand the transitive closure.
329 SoftRefClosure,
330 /// Handle Java-style weak references.
331 WeakRefClosure,
332 /// Resurrect Java-style finalizable objects, and potentially expand the transitive closure.
333 FinalRefClosure,
334 /// Handle Java-style phantom references.
335 PhantomRefClosure,
336 /// Let the VM handle VM-specific weak data structures, including weak references, weak
337 /// collections, table of finalizable objects, ephemerons, etc. Potentially expand the
338 /// transitive closure.
339 ///
340 /// NOTE: This stage is intended to replace the Java-specific weak reference handling stages
341 /// above.
342 VMRefClosure,
343 /// Compute the forwarding addresses of objects (mark-compact-only).
344 CalculateForwarding,
345 /// Scan roots again to initiate another transitive closure to update roots and reference
346 /// after computing the forwarding addresses (mark-compact-only).
347 SecondRoots,
348 /// Update Java-style weak references after computing forwarding addresses (mark-compact-only).
349 ///
350 /// NOTE: This stage should be updated to adapt to the VM-side reference handling. It shall
351 /// be kept after removing `{Soft,Weak,Final,Phantom}RefClosure`.
352 RefForwarding,
353 /// Update the list of Java-style finalization cadidates and finalizable objects after
354 /// computing forwarding addresses (mark-compact-only).
355 FinalizableForwarding,
356 /// Let the VM handle the forwarding of reference fields in any VM-specific weak data
357 /// structures, including weak references, weak collections, table of finalizable objects,
358 /// ephemerons, etc., after computing forwarding addresses (mark-compact-only).
359 ///
360 /// NOTE: This stage is intended to replace Java-specific forwarding phases above.
361 VMRefForwarding,
362 /// Compact objects (mark-compact-only).
363 Compact,
364 /// Work packets that should be done just before GC shall go here. This includes releasing
365 /// resources and setting states in plans, spaces, GC workers, mutators, etc.
366 Release,
367 /// Resume mutators and end GC.
368 Final,
369}
370
371impl WorkBucketStage {
372 /// The first stop-the-world stage. This stage has no open condition, and will be opened manually
373 /// once all the mutators threads are stopped.
374 pub const FIRST_STW_STAGE: Self = WorkBucketStage::Prepare;
375
376 /// Is this the first stop-the-world stage? See [`Self::FIRST_STW_STAGE`].
377 pub const fn is_first_stw_stage(&self) -> bool {
378 matches!(self, &WorkBucketStage::FIRST_STW_STAGE)
379 }
380
381 /// Is this stage always open?
382 pub const fn is_always_open(&self) -> bool {
383 matches!(self, WorkBucketStage::Unconstrained)
384 }
385
386 /// Is this stage open by default?
387 pub const fn is_open_by_default(&self) -> bool {
388 matches!(
389 self,
390 WorkBucketStage::Unconstrained | WorkBucketStage::Concurrent
391 )
392 }
393
394 /// Is this stage enabled by default?
395 pub const fn is_enabled_by_default(&self) -> bool {
396 !matches!(self, WorkBucketStage::Concurrent)
397 }
398
399 /// Is this stage sequentially opened? All the stop-the-world stages, except the first one, are sequentially opened.
400 pub const fn is_sequentially_opened(&self) -> bool {
401 self.is_stw() && !self.is_first_stw_stage()
402 }
403
404 /// Is this stage a stop-the-world stage?
405 pub const fn is_stw(&self) -> bool {
406 !self.is_concurrent()
407 }
408
409 /// Is this stage concurrent (which may be executed during mutator time)?
410 pub const fn is_concurrent(&self) -> bool {
411 matches!(
412 self,
413 WorkBucketStage::Unconstrained | WorkBucketStage::Concurrent
414 )
415 }
416}