mmtk/scheduler/work_bucket.rs

use super::worker_monitor::WorkerMonitor;
use super::*;
use crate::vm::VMBinding;
use crossbeam::deque::{Injector, Steal, Worker};
use enum_map::Enum;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

pub(super) struct BucketQueue<VM: VMBinding> {
    queue: Injector<Box<dyn GCWork<VM>>>,
}

impl<VM: VMBinding> BucketQueue<VM> {
    fn new() -> Self {
        Self {
            queue: Injector::new(),
        }
    }

    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    fn steal_batch_and_pop(
        &self,
        dest: &Worker<Box<dyn GCWork<VM>>>,
    ) -> Steal<Box<dyn GCWork<VM>>> {
        self.queue.steal_batch_and_pop(dest)
    }

    fn push(&self, w: Box<dyn GCWork<VM>>) {
        self.queue.push(w);
    }

    fn push_all(&self, ws: Vec<Box<dyn GCWork<VM>>>) {
        for w in ws {
            self.queue.push(w);
        }
    }

    /// Dump all the packets in this queue for debugging purposes.
    /// This function temporarily removes items from the queue, so it should only be called when
    /// it is safe to do so (e.g. when execution has already failed and the system is about to panic).
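    ///
    /// An illustrative sketch of how the returned names might be consumed; the surrounding
    /// panic-handling context is an assumption, not part of this module:
    ///
    /// ```ignore
    /// for type_name in bucket_queue.debug_dump_packets() {
    ///     eprintln!("pending packet: {}", type_name);
    /// }
    /// ```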
    pub fn debug_dump_packets(&self) -> Vec<String> {
        let mut items = Vec::new();

        // Drain the queue by stealing until it is empty.
        loop {
            match self.queue.steal() {
                Steal::Success(work) => {
                    items.push(work);
                }
                Steal::Retry => continue,
                Steal::Empty => break,
            }
        }

        // Format the collected items by type name, since `GCWork` does not implement `Debug`.
        let debug_items: Vec<String> = items
            .iter()
            .map(|i| i.get_type_name().to_string())
            .collect();

        // Push the items back into the queue.
        for work in items {
            self.queue.push(work);
        }

        debug_items
    }
}

pub type BucketOpenCondition<VM> = Box<dyn (Fn(&GCWorkScheduler<VM>) -> bool) + Send>;

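/// A `WorkBucket` holds the work packets of one [`WorkBucketStage`] and hands them out to GC
/// workers once the bucket is open and enabled.
///
/// A minimal, illustrative sketch of how a bucket is typically used from elsewhere in the
/// scheduler; `scheduler`, `MyPacket` and the `work_buckets` indexing are assumptions for the
/// example rather than items defined in this module:
///
/// ```ignore
/// // Queue a packet for the Prepare stage; workers will pick it up once the bucket is open.
/// scheduler.work_buckets[WorkBucketStage::Prepare].add(MyPacket::new());
/// ```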
pub struct WorkBucket<VM: VMBinding> {
    /// Whether this bucket has been opened. Work from an open bucket can be fetched by workers.
    open: AtomicBool,
    /// Whether this bucket is enabled.
    /// A disabled work bucket will behave as if it does not exist in terms of scheduling,
    /// except that users can add work to a disabled bucket, and enable it later to allow that
    /// work to be scheduled.
    enabled: AtomicBool,
    /// The stage name of this bucket.
    stage: WorkBucketStage,
    queue: BucketQueue<VM>,
    prioritized_queue: Option<BucketQueue<VM>>,
    monitor: Arc<WorkerMonitor>,
    /// The open condition for a bucket. If this is `Some`, the bucket will be opened
    /// when the condition is met. If this is `None`, the bucket needs to be opened manually.
    can_open: Option<BucketOpenCondition<VM>>,
    /// After this bucket is open and all pending work packets (including the packets in this
    /// bucket) are drained, this work packet, if it exists, will be added to this bucket.  When
    /// this happens, it will prevent subsequent work buckets from being opened.
    ///
    /// The sentinel work packet may set another work packet as the new sentinel, which will be
    /// added to this bucket again after all pending work packets are drained.  This may happen
    /// again and again, causing the GC to stay at the same stage and drain work packets in a loop.
    ///
    /// This is useful for handling weak references that may expand the transitive closure
    /// recursively, such as ephemerons and Java-style SoftReference and finalizers.  Sentinels
    /// can be used repeatedly to discover and process more such objects.
    sentinel: Mutex<Option<Box<dyn GCWork<VM>>>>,
}

impl<VM: VMBinding> WorkBucket<VM> {
    pub(crate) fn new(stage: WorkBucketStage, monitor: Arc<WorkerMonitor>) -> Self {
        Self {
            open: AtomicBool::new(stage.is_open_by_default()),
            enabled: AtomicBool::new(stage.is_enabled_by_default()),
            stage,
            queue: BucketQueue::new(),
            prioritized_queue: None,
            monitor,
            can_open: None,
            sentinel: Mutex::new(None),
        }
    }

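    /// Enable or disable this bucket.  See the documentation of [`WorkBucketStage::Concurrent`]
    /// for the intended use: a disabled bucket can still receive work packets, but that work only
    /// becomes schedulable once the bucket is enabled again.
    ///
    /// An illustrative sketch of that workflow; `scheduler`, `ConcurrentPacket` and the
    /// `work_buckets` indexing are assumed names rather than APIs defined in this module:
    ///
    /// ```ignore
    /// let bucket = &scheduler.work_buckets[WorkBucketStage::Concurrent];
    /// bucket.set_enabled(false);               // cache concurrent work during the STW pause
    /// bucket.add(ConcurrentPacket::new());
    /// // ... later, after mutators have been resumed ...
    /// bucket.set_enabled(true);                // allow the cached work to be scheduled
    /// bucket.notify_all_workers();
    /// ```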
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, Ordering::SeqCst)
    }

    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
    }

    pub fn enable_prioritized_queue(&mut self) {
        self.prioritized_queue = Some(BucketQueue::new());
    }

    fn notify_one_worker(&self) {
        // If the bucket is not open or not enabled, don't notify anyone.
        if !self.is_open() || !self.is_enabled() {
            return;
        }
        // Notify one worker if there are any parked workers.
        self.monitor.notify_work_available(false);
    }

    pub fn notify_all_workers(&self) {
        // If the bucket is not open or not enabled, don't notify anyone.
        if !self.is_open() || !self.is_enabled() {
            return;
        }
        // Notify all workers if there are any parked workers.
        self.monitor.notify_work_available(true);
    }

    pub fn is_open(&self) -> bool {
        self.open.load(Ordering::SeqCst)
    }

    /// Open the bucket
    pub fn open(&self) {
        self.open.store(true, Ordering::SeqCst);
    }

    /// Test if the bucket has no pending work packets
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
            && self
                .prioritized_queue
                .as_ref()
                .map(|q| q.is_empty())
                .unwrap_or(true)
    }

    /// Test if the bucket is drained: a disabled bucket is trivially drained; an enabled bucket
    /// is drained once it is open and empty.
    pub fn is_drained(&self) -> bool {
        !self.is_enabled() || (self.is_open() && self.is_empty())
    }

    /// Close the bucket
    pub fn close(&self) {
        debug_assert!(
            self.queue.is_empty(),
            "Bucket {:?} not drained before close",
            self.stage
        );
        self.open.store(false, Ordering::Relaxed);
    }

    /// Add a work packet to this bucket with a higher priority.
    /// Panics if this bucket cannot receive prioritized packets.
    pub fn add_prioritized(&self, work: Box<dyn GCWork<VM>>) {
        self.prioritized_queue.as_ref().unwrap().push(work);
        self.notify_one_worker();
    }

    /// Add a work packet to this bucket
    pub fn add<W: GCWork<VM>>(&self, work: W) {
        self.queue.push(Box::new(work));
        self.notify_one_worker();
    }

    /// Add a boxed work packet to this bucket
    pub fn add_boxed(&self, work: Box<dyn GCWork<VM>>) {
        self.queue.push(work);
        self.notify_one_worker();
    }

    /// Add a work packet to this bucket, but do not notify any workers.
    /// This is useful when the current thread is holding the mutex of `WorkerMonitor` which is
    /// used for notifying workers.  This usually happens if the current thread is the last worker
    /// parked.
    pub(crate) fn add_no_notify<W: GCWork<VM>>(&self, work: W) {
        self.queue.push(Box::new(work));
    }

    /// Like [`WorkBucket::add_no_notify`], but the work is boxed.
    pub(crate) fn add_boxed_no_notify(&self, work: Box<dyn GCWork<VM>>) {
        self.queue.push(work);
    }

    /// Add multiple packets with a higher priority.
    /// Panics if this bucket cannot receive prioritized packets.
    pub fn bulk_add_prioritized(&self, work_vec: Vec<Box<dyn GCWork<VM>>>) {
        self.prioritized_queue.as_ref().unwrap().push_all(work_vec);
        self.notify_all_workers();
    }

    /// Add multiple packets
    pub fn bulk_add(&self, work_vec: Vec<Box<dyn GCWork<VM>>>) {
        if work_vec.is_empty() {
            return;
        }
        self.queue.push_all(work_vec);
        self.notify_all_workers();
    }

    /// Get a work packet from this bucket, preferring the prioritized queue if one exists.
    pub fn poll(&self, worker: &Worker<Box<dyn GCWork<VM>>>) -> Steal<Box<dyn GCWork<VM>>> {
        if !self.is_enabled() || !self.is_open() || self.is_empty() {
            return Steal::Empty;
        }
        if let Some(prioritized_queue) = self.prioritized_queue.as_ref() {
            prioritized_queue
                .steal_batch_and_pop(worker)
                .or_else(|| self.queue.steal_batch_and_pop(worker))
        } else {
            self.queue.steal_batch_and_pop(worker)
        }
    }

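    /// Set the open condition for this bucket.  The scheduler calls [`WorkBucket::update`] with
    /// the predicate to decide when the bucket may be opened.
    ///
    /// An illustrative sketch; indexing `scheduler.work_buckets` is an assumption borrowed from
    /// the wider scheduler, not something defined in this module:
    ///
    /// ```ignore
    /// // Open this bucket only after the Closure bucket has been drained.
    /// bucket.set_open_condition(|scheduler: &GCWorkScheduler<VM>| {
    ///     scheduler.work_buckets[WorkBucketStage::Closure].is_drained()
    /// });
    /// ```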
    pub fn set_open_condition(
        &mut self,
        pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
    ) {
        self.can_open = Some(Box::new(pred));
    }

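    /// Set the sentinel work packet for this bucket.  See the documentation of the `sentinel`
    /// field for how sentinels interact with bucket draining.
    ///
    /// An illustrative sketch; `ProcessEphemerons` is a hypothetical packet and the
    /// `scheduler.work_buckets` indexing is an assumption borrowed from the wider scheduler:
    ///
    /// ```ignore
    /// // From a hypothetical weak-reference packet that may expand the transitive closure:
    /// let bucket = &scheduler.work_buckets[WorkBucketStage::VMRefClosure];
    /// bucket.set_sentinel(Box::new(ProcessEphemerons::new()));
    /// // Once all pending work packets (including those in this bucket) are drained, the
    /// // sentinel is added back to this bucket, keeping the GC in this stage until no further
    /// // objects are discovered.
    /// ```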
    pub fn set_sentinel(&self, new_sentinel: Box<dyn GCWork<VM>>) {
        let mut sentinel = self.sentinel.lock().unwrap();
        *sentinel = Some(new_sentinel);
    }

    /// Test if this bucket currently has a sentinel work packet.
    pub fn has_sentinel(&self) -> bool {
        let sentinel = self.sentinel.lock().unwrap();
        sentinel.is_some()
    }

    /// Open the bucket if its open condition is met. Return `true` if the bucket was newly opened.
    pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
        if let Some(can_open) = self.can_open.as_ref() {
            if !self.is_open() && can_open(scheduler) {
                debug!("Opening work bucket: {:?}", self.stage);
                self.open();
                return true;
            }
        }
        false
    }

    /// Add the sentinel work packet, if any, to this bucket. Return `true` if a sentinel was added.
    pub fn maybe_schedule_sentinel(&self) -> bool {
        debug_assert!(
            self.is_open(),
            "Attempted to schedule sentinel work while bucket is not open"
        );
        let maybe_sentinel = {
            let mut sentinel = self.sentinel.lock().unwrap();
            sentinel.take()
        };
        if let Some(work) = maybe_sentinel {
            // We don't need to notify other workers because this function is called by the last
            // parked worker.  After this function returns, the caller will notify workers because
            // more work packets become available.
            self.add_boxed_no_notify(work);
            true
        } else {
            false
        }
    }

    pub(super) fn get_queue(&self) -> &BucketQueue<VM> {
        &self.queue
    }

    pub(super) fn get_stage(&self) -> WorkBucketStage {
        self.stage
    }
}

/// This enum defines all the work bucket types. The scheduler
/// will instantiate a work bucket for each stage defined here.
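///
/// A sketch of how such a per-stage map of buckets might be built using `enum_map`; the
/// `monitor` handle is an assumption for the example, and this is not necessarily the exact
/// code the scheduler uses:
///
/// ```ignore
/// let buckets: EnumMap<WorkBucketStage, WorkBucket<VM>> =
///     enum_map::enum_map! { stage => WorkBucket::new(stage, monitor.clone()) };
/// ```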
#[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)]
pub enum WorkBucketStage {
    /// This bucket is always open.
    Unconstrained,
    /// This bucket is intended for concurrent work. Although some concurrent work may be put in
    /// and executed from the unconstrained bucket, work in the unconstrained bucket will always
    /// be consumed during STW. Users can disable this bucket to cache concurrent work during STW,
    /// and enable it again once the STW pause is over to allow concurrent execution.
    Concurrent,
    /// Preparation work.  Plans, spaces, GC workers, mutators, etc. should be prepared for GC at
    /// this stage.
    Prepare,
    /// Clear the VO bit metadata.  Mainly used by ImmixSpace.
    #[cfg(feature = "vo_bit")]
    ClearVOBits,
    /// Compute the transitive closure starting from transitively pinning (TP) roots following
    /// only strong references.  No objects in this closure are allowed to move.
    TPinningClosure,
    /// Trace (non-transitively) pinning roots.  Objects pointed to by these roots must not move,
    /// but their children may.  To ensure correctness, these must be processed after
    /// `TPinningClosure`.
    PinningRootsTrace,
    /// Compute the transitive closure following only strong references.
    Closure,
    /// Handle Java-style soft references, and potentially expand the transitive closure.
    SoftRefClosure,
    /// Handle Java-style weak references.
    WeakRefClosure,
    /// Resurrect Java-style finalizable objects, and potentially expand the transitive closure.
    FinalRefClosure,
    /// Handle Java-style phantom references.
    PhantomRefClosure,
    /// Let the VM handle VM-specific weak data structures, including weak references, weak
    /// collections, table of finalizable objects, ephemerons, etc.  Potentially expand the
    /// transitive closure.
    ///
    /// NOTE: This stage is intended to replace the Java-specific weak reference handling stages
    /// above.
    VMRefClosure,
    /// Compute the forwarding addresses of objects (mark-compact-only).
    CalculateForwarding,
    /// Scan roots again to initiate another transitive closure to update roots and references
    /// after computing the forwarding addresses (mark-compact-only).
    SecondRoots,
    /// Update Java-style weak references after computing forwarding addresses (mark-compact-only).
    ///
    /// NOTE: This stage should be updated to adapt to the VM-side reference handling.  It shall
    /// be kept after removing `{Soft,Weak,Final,Phantom}RefClosure`.
    RefForwarding,
    /// Update the list of Java-style finalization candidates and finalizable objects after
    /// computing forwarding addresses (mark-compact-only).
    FinalizableForwarding,
    /// Let the VM handle the forwarding of reference fields in any VM-specific weak data
    /// structures, including weak references, weak collections, table of finalizable objects,
    /// ephemerons, etc., after computing forwarding addresses (mark-compact-only).
    ///
    /// NOTE: This stage is intended to replace the Java-specific forwarding phases above.
    VMRefForwarding,
    /// Compact objects (mark-compact-only).
    Compact,
    /// Work packets that should be done just before GC ends shall go here.  This includes
    /// releasing resources and setting states in plans, spaces, GC workers, mutators, etc.
    Release,
    /// Resume mutators and end GC.
    Final,
}

impl WorkBucketStage {
    /// The first stop-the-world stage. This stage has no open condition, and will be opened
    /// manually once all the mutator threads are stopped.
    pub const FIRST_STW_STAGE: Self = WorkBucketStage::Prepare;

    /// Is this the first stop-the-world stage? See [`Self::FIRST_STW_STAGE`].
    pub const fn is_first_stw_stage(&self) -> bool {
        matches!(self, &WorkBucketStage::FIRST_STW_STAGE)
    }

    /// Is this stage always open?
    pub const fn is_always_open(&self) -> bool {
        matches!(self, WorkBucketStage::Unconstrained)
    }

    /// Is this stage open by default?
    pub const fn is_open_by_default(&self) -> bool {
        matches!(
            self,
            WorkBucketStage::Unconstrained | WorkBucketStage::Concurrent
        )
    }

    /// Is this stage enabled by default?
    pub const fn is_enabled_by_default(&self) -> bool {
        !matches!(self, WorkBucketStage::Concurrent)
    }

    /// Is this stage sequentially opened? All the stop-the-world stages, except the first one,
    /// are sequentially opened.
    pub const fn is_sequentially_opened(&self) -> bool {
        self.is_stw() && !self.is_first_stw_stage()
    }

    /// Is this stage a stop-the-world stage?
    pub const fn is_stw(&self) -> bool {
        !self.is_concurrent()
    }

    /// Is this stage concurrent, i.e. may it be executed during mutator time?
    pub const fn is_concurrent(&self) -> bool {
        matches!(
            self,
            WorkBucketStage::Unconstrained | WorkBucketStage::Concurrent
        )
    }
}