mmtk/plan/concurrent/
concurrent_marking_work.rs
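//! Work packets for concurrent (SATB) marking: tracing objects outside the
//! pause, draining the write barrier's modified-object buffer, and turning
//! root slots into concurrent tracing work.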

use crate::plan::concurrent::global::ConcurrentPlan;
use crate::plan::concurrent::Pause;
use crate::plan::PlanTraceObject;
use crate::plan::VectorQueue;
use crate::policy::gc_work::TraceKind;
use crate::scheduler::gc_work::{ScanObjects, SlotOf};
use crate::util::ObjectReference;
use crate::vm::slot::Slot;
use crate::{
    plan::ObjectQueue,
    scheduler::{gc_work::ProcessEdgesBase, GCWork, GCWorker, ProcessEdgesWork, WorkBucketStage},
    vm::*,
    MMTK,
};
use std::ops::{Deref, DerefMut};

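/// Trace and mark a batch of objects, transitively scanning whatever they
/// reach. Newly discovered objects are buffered in `next_objects` and either
/// drained in place or flushed to a new packet in the `Concurrent` bucket.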
pub struct ConcurrentTraceObjects<
    VM: VMBinding,
    P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>,
    const KIND: TraceKind,
> {
    plan: &'static P,
    /// Objects to mark and scan.
    objects: Option<Vec<ObjectReference>>,
    /// Objects discovered while scanning, pending their own trace.
    next_objects: VectorQueue<ObjectReference>,
    /// The worker executing this packet; set at the start of `do_work`.
    worker: *mut GCWorker<VM>,
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    ConcurrentTraceObjects<VM, P, KIND>
{
    /// Flush `next_objects` to a new packet once it grows past this size.
    const SATB_BUFFER_SIZE: usize = 8192;

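    /// Create a packet for the given batch of objects. The worker pointer is
    /// filled in when the packet is executed.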
    pub fn new(objects: Vec<ObjectReference>, mmtk: &'static MMTK<VM>) -> Self {
        let plan = mmtk.get_plan().downcast_ref::<P>().unwrap();

        Self {
            plan,
            objects: Some(objects),
            next_objects: VectorQueue::default(),
            worker: std::ptr::null_mut(),
        }
    }

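    /// Get the worker executing this packet. Valid only after `do_work` has
    /// stored the worker pointer.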
    pub fn worker(&self) -> &'static mut GCWorker<VM> {
        debug_assert!(!self.worker.is_null());
        unsafe { &mut *self.worker }
    }

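    /// Repackage any buffered `next_objects` as a new packet in the
    /// `Concurrent` bucket.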
    #[cold]
    fn flush(&mut self) {
        if !self.next_objects.is_empty() {
            let objects = self.next_objects.take();
            let worker = self.worker();
            let w = Self::new(objects, worker.mmtk);
            worker.add_work(WorkBucketStage::Concurrent, w);
        }
    }

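    /// Trace a single object via the plan. Concurrent marking never copies,
    /// so the returned reference must equal the input.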
    fn trace_object(&mut self, object: ObjectReference) -> ObjectReference {
        let new_object = self
            .plan
            .trace_object::<Self, KIND>(self, object, self.worker());
        // No copying should happen.
        debug_assert_eq!(object, new_object);
        object
    }

    fn trace_objects(&mut self, objects: &[ObjectReference]) {
        for o in objects.iter() {
            self.trace_object(*o);
        }
    }

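    /// Scan the fields of `object`, buffering every referent for tracing and
    /// flushing the buffer whenever it outgrows `SATB_BUFFER_SIZE`.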
    fn scan_and_enqueue(&mut self, object: ObjectReference) {
        crate::plan::tracing::SlotIterator::<VM>::iterate_fields(
            object,
            self.worker().tls.0,
            |s| {
                let Some(t) = s.load() else {
                    return;
                };

                self.next_objects.push(t);
                if self.next_objects.len() > Self::SATB_BUFFER_SIZE {
                    self.flush();
                }
            },
        );
        self.plan.post_scan_object(object);
    }
}

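/// The plan's `trace_object` enqueues each newly marked object here, so
/// enqueueing an object means scanning it right away.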
impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    ObjectQueue for ConcurrentTraceObjects<VM, P, KIND>
{
    fn enqueue(&mut self, object: ObjectReference) {
        debug_assert!(
            object.to_raw_address().is_mapped(),
            "Invalid obj {:?}: address is not mapped",
            object
        );
        self.scan_and_enqueue(object);
    }
}

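// SAFETY: the raw `worker` pointer is only set and dereferenced inside
// `do_work`, i.e. by the single worker thread executing this packet.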
unsafe impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    Send for ConcurrentTraceObjects<VM, P, KIND>
{
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    GCWork<VM> for ConcurrentTraceObjects<VM, P, KIND>
{
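    /// Trace the initial batch of objects, then keep draining recursively
    /// discovered objects while concurrent tracing is still permitted; any
    /// leftovers are flushed to a new packet.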
    fn do_work(&mut self, worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
        self.worker = worker;
        let mut num_objects = 0;
        let mut num_next_objects = 0;
        let mut iterations = 0;
        // Mark and scan the initial batch of objects.
        if let Some(objects) = self.objects.take() {
            self.trace_objects(&objects);
            num_objects = objects.len();
        }
        // Drain recursively discovered objects in place, but only while the
        // plan is still marking concurrently or in the final-mark pause;
        // otherwise leave them for `flush` to repackage.
        let pause_opt = self.plan.current_pause();
        if pause_opt == Some(Pause::FinalMark) || pause_opt.is_none() {
            while !self.next_objects.is_empty() {
                let pause_opt = self.plan.current_pause();
                if !(pause_opt == Some(Pause::FinalMark) || pause_opt.is_none()) {
                    break;
                }
                let next_objects = self.next_objects.take();
                self.trace_objects(&next_objects);
                num_next_objects += next_objects.len();
                iterations += 1;
            }
        }
        probe!(
            mmtk,
            concurrent_trace_objects,
            num_objects,
            num_next_objects,
            iterations
        );
        // Repackage anything left in the buffer.
        self.flush();
    }
}

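/// Process a batch of objects logged by the SATB write barrier (the "mod
/// buffer") by handing them to a `ConcurrentTraceObjects` packet.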
pub struct ProcessModBufSATB<
    VM: VMBinding,
    P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>,
    const KIND: TraceKind,
> {
    nodes: Option<Vec<ObjectReference>>,
    _p: std::marker::PhantomData<(VM, P)>,
}

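// SAFETY: the packet only owns a vector of `ObjectReference`s; the
// `PhantomData` ties it to `VM` and `P` but carries no data.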
unsafe impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    Send for ProcessModBufSATB<VM, P, KIND>
{
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    ProcessModBufSATB<VM, P, KIND>
{
    pub fn new(nodes: Vec<ObjectReference>) -> Self {
        Self {
            nodes: Some(nodes),
            _p: std::marker::PhantomData,
        }
    }
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    GCWork<VM> for ProcessModBufSATB<VM, P, KIND>
{
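    /// Hand the logged objects to a `ConcurrentTraceObjects` packet and run
    /// it inline on this worker.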
    fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
        let Some(nodes) = self.nodes.take() else {
            return;
        };
        if nodes.is_empty() {
            return;
        }
        let mut w = ConcurrentTraceObjects::<VM, P, KIND>::new(nodes, mmtk);
        GCWork::do_work(&mut w, worker, mmtk);
    }
}

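/// Root-scanning work for concurrent marking. During the initial-mark pause,
/// root slots are loaded and the resulting root objects are repackaged as
/// `ConcurrentTraceObjects` packets for concurrent tracing.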
pub struct ProcessRootSlots<
    VM: VMBinding,
    P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>,
    const KIND: TraceKind,
> {
    base: ProcessEdgesBase<VM>,
    _p: std::marker::PhantomData<P>,
}

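// SAFETY: `ProcessEdgesBase` holds a raw worker pointer, but it is only set
// and used by the worker that executes this packet.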
unsafe impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    Send for ProcessRootSlots<VM, P, KIND>
{
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    ProcessRootSlots<VM, P, KIND>
{
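    /// Package the given root objects as a `ConcurrentTraceObjects` packet
    /// and queue it in the `Concurrent` bucket without notifying workers.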
    fn create_and_schedule_concurrent_trace_objects_work(&self, objects: Vec<ObjectReference>) {
        let worker = self.worker();
        let mmtk = self.mmtk();
        let w = ConcurrentTraceObjects::<VM, P, KIND>::new(objects, mmtk);

        worker.scheduler().work_buckets[WorkBucketStage::Concurrent].add_no_notify(w);
    }
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    ProcessEdgesWork for ProcessRootSlots<VM, P, KIND>
{
    type VM = VM;
    type ScanObjectsWorkType = ScanObjects<Self>;
    // Concurrent marking never copies, so root slots never need updating.
    const OVERWRITE_REFERENCE: bool = false;
    const SCAN_OBJECTS_IMMEDIATELY: bool = true;

    fn new(
        slots: Vec<SlotOf<Self>>,
        roots: bool,
        mmtk: &'static MMTK<VM>,
        bucket: WorkBucketStage,
    ) -> Self {
        // This packet is only ever created for root slots.
        debug_assert!(roots);
        let base = ProcessEdgesBase::new(slots, roots, mmtk, bucket);
        Self {
            base,
            _p: std::marker::PhantomData,
        }
    }

    // Nothing to flush: this packet never buffers nodes of its own.
    fn flush(&mut self) {}

    fn trace_object(&mut self, _object: ObjectReference) -> ObjectReference {
        // Roots are repackaged in `process_slots`, so per-slot tracing is
        // never used.
        unreachable!()
    }

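    /// Turn root slots into batches of root objects and schedule them for
    /// concurrent tracing. Roots only need processing in the initial mark.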
    fn process_slots(&mut self) {
        let pause = self
            .base
            .plan()
            .concurrent()
            .unwrap()
            .current_pause()
            .unwrap();
        // No need to scan roots in the final mark.
        if pause == Pause::FinalMark {
            return;
        }
        debug_assert_eq!(pause, Pause::InitialMark);
        // Load the root objects out of the slots and batch them into
        // `ConcurrentTraceObjects` packets of at most `CAPACITY` objects.
        let mut root_objects = Vec::with_capacity(Self::CAPACITY);
        if !self.slots.is_empty() {
            let slots = std::mem::take(&mut self.slots);
            for slot in slots {
                if let Some(object) = slot.load() {
                    root_objects.push(object);
                    if root_objects.len() == Self::CAPACITY {
                        let mut buffer = Vec::with_capacity(Self::CAPACITY);
                        std::mem::swap(&mut buffer, &mut root_objects);
                        self.create_and_schedule_concurrent_trace_objects_work(buffer);
                    }
                }
            }
            if !root_objects.is_empty() {
                self.create_and_schedule_concurrent_trace_objects_work(root_objects);
            }
        }
    }

    fn create_scan_work(&self, _nodes: Vec<ObjectReference>) -> Self::ScanObjectsWorkType {
        // Not used: nodes are handed to `ConcurrentTraceObjects` packets
        // instead of `ScanObjects`.
        unimplemented!()
    }
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind> Deref
    for ProcessRootSlots<VM, P, KIND>
{
    type Target = ProcessEdgesBase<VM>;
    fn deref(&self) -> &Self::Target {
        &self.base
    }
}

impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
    DerefMut for ProcessRootSlots<VM, P, KIND>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.base
    }
}