mmtk/plan/concurrent/
concurrent_marking_work.rs

1use crate::plan::concurrent::global::ConcurrentPlan;
2use crate::plan::concurrent::Pause;
3use crate::plan::tracing::{PlanTrace, Trace};
4use crate::plan::PlanTraceObject;
5use crate::policy::gc_work::TraceKind;
6use crate::scheduler::{GCWork, GCWorker, WorkBucketStage};
7use crate::util::{scanning_helper, ObjectReference};
8use crate::vm::slot::Slot;
9use crate::vm::{RootsKind, RootsWorkFactory, VMBinding};
10use crate::MMTK;
11
12use std::collections::VecDeque;
13use std::marker::PhantomData;
14
15pub struct ConcurrentTraceObjects<
16    VM: VMBinding,
17    P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>,
18    const KIND: TraceKind,
19> {
20    /// initial objects to mark and scan
21    initial_objects: Vec<ObjectReference>,
22    /// `true` if the `initial_objects` are already marked.
23    already_marked: bool,
24    phantom_data: PhantomData<(VM, P)>,
25}
26
27impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
28    ConcurrentTraceObjects<VM, P, KIND>
29{
30    const SATB_BUFFER_SIZE: usize = 8192;
31    const CONCURRENT_TRACE_OVERFLOW: usize = Self::SATB_BUFFER_SIZE * 2;
32
33    pub fn new(initial_objects: Vec<ObjectReference>, already_marked: bool) -> Self {
34        Self {
35            initial_objects,
36            already_marked,
37            phantom_data: PhantomData,
38        }
39    }
40}
41
42unsafe impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
43    Send for ConcurrentTraceObjects<VM, P, KIND>
44{
45}
46
47impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
48    GCWork<VM> for ConcurrentTraceObjects<VM, P, KIND>
49{
50    fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
51        let tls = worker.tls;
52        let trace = PlanTrace::<P, KIND>::from_mmtk(mmtk);
53
54        // These are initial objects.  They may not have been marked.
55        let initial_objects = std::mem::take(&mut self.initial_objects);
56        let num_initial_objects = initial_objects.len();
57        let mut num_queued_objects = 0;
58
59        // This queue contains marked but not scanned objects.
60        let mut queue = VecDeque::new();
61        if self.already_marked {
62            // The initial objects are already marked.  Put them in the queue.
63            queue.extend(initial_objects);
64        } else {
65            // We scan each object and only enqueue newly visited objects.
66            for object in initial_objects {
67                trace.trace_object(worker, object, &mut |enqueued_object| {
68                    debug_assert_eq!(enqueued_object, object);
69                    queue.push_back(enqueued_object);
70                    num_queued_objects += 1;
71                });
72            }
73        }
74
75        // Loop until the queue is drained.
76        while let Some(object) = queue.pop_back() {
77            scanning_helper::visit_children_non_moving::<VM>(tls, object, &mut |child| {
78                trace.trace_object(worker, child, &mut |enqueued_child| {
79                    debug_assert_eq!(enqueued_child, child);
80                    queue.push_back(enqueued_child);
81                    num_queued_objects += 1;
82                })
83            });
84            trace.post_scan_object(object);
85
86            if queue.len() >= Self::CONCURRENT_TRACE_OVERFLOW {
87                let offloaded_objects = queue.drain(..Self::SATB_BUFFER_SIZE).collect();
88                let w = Self::new(offloaded_objects, true);
89                worker.add_work(WorkBucketStage::Concurrent, w);
90            }
91        }
92
93        probe!(
94            mmtk,
95            concurrent_trace_objects,
96            num_initial_objects,
97            num_queued_objects
98        );
99    }
100}
101
102pub struct ProcessModBufSATB<
103    VM: VMBinding,
104    P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>,
105    const KIND: TraceKind,
106> {
107    nodes: Option<Vec<ObjectReference>>,
108    _p: std::marker::PhantomData<(VM, P)>,
109}
110
111unsafe impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
112    Send for ProcessModBufSATB<VM, P, KIND>
113{
114}
115
116impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
117    ProcessModBufSATB<VM, P, KIND>
118{
119    pub fn new(nodes: Vec<ObjectReference>) -> Self {
120        Self {
121            nodes: Some(nodes),
122            _p: std::marker::PhantomData,
123        }
124    }
125}
126
127impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
128    GCWork<VM> for ProcessModBufSATB<VM, P, KIND>
129{
130    fn do_work(&mut self, worker: &mut GCWorker<VM>, mmtk: &'static MMTK<VM>) {
131        let mut w = if let Some(nodes) = self.nodes.take() {
132            if nodes.is_empty() {
133                return;
134            }
135
136            ConcurrentTraceObjects::<VM, P, KIND>::new(
137                nodes, false, // These objects are not marked, yet.
138            )
139        } else {
140            return;
141        };
142        GCWork::do_work(&mut w, worker, mmtk);
143    }
144}
145
146/// A custom implementation of [`RootsWorkFactory`] for concurrent marking.
147///
148/// Slot roots are loaded immediately and represented as root nodes, just like pinning roots.  All
149/// roots are handled by the [`ConcurrentTraceObjects`] work packets.
150pub(crate) struct ConcurrentMarkingRootsWorkFactory<
151    VM: VMBinding,
152    P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>,
153    const KIND: TraceKind,
154> {
155    pub(crate) mmtk: &'static MMTK<VM>,
156    phantom_data: PhantomData<P>,
157}
158
159impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind> Clone
160    for ConcurrentMarkingRootsWorkFactory<VM, P, KIND>
161{
162    fn clone(&self) -> Self {
163        Self {
164            mmtk: self.mmtk,
165            phantom_data: PhantomData,
166        }
167    }
168}
169
170unsafe impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
171    Send for ConcurrentMarkingRootsWorkFactory<VM, P, KIND>
172{
173}
174
175impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
176    ConcurrentMarkingRootsWorkFactory<VM, P, KIND>
177{
178    pub(crate) fn new(mmtk: &'static MMTK<VM>) -> Self {
179        Self {
180            mmtk,
181            phantom_data: PhantomData,
182        }
183    }
184
185    fn debug_assert_initial_mark(&self) {
186        let pause = self.mmtk.get_plan().concurrent().unwrap().current_pause();
187
188        debug_assert_eq!(
189            pause,
190            Some(Pause::InitialMark),
191            "Concurrent marking only scans roots during InitialMark."
192        );
193    }
194
195    fn create_and_schedule_root_nodes_work(&mut self, nodes: Vec<ObjectReference>) {
196        let mmtk = self.mmtk;
197        let work_packet = ConcurrentTraceObjects::<VM, P, KIND>::new(nodes, false);
198        mmtk.scheduler.work_buckets[WorkBucketStage::Concurrent].add_no_notify(work_packet);
199    }
200}
201
202impl<VM: VMBinding, P: ConcurrentPlan<VM = VM> + PlanTraceObject<VM>, const KIND: TraceKind>
203    RootsWorkFactory<VM::VMSlot> for ConcurrentMarkingRootsWorkFactory<VM, P, KIND>
204{
205    fn create_process_roots_work(&mut self, slots: Vec<VM::VMSlot>) {
206        probe!(mmtk, roots, RootsKind::NORMAL, slots.len());
207
208        self.debug_assert_initial_mark();
209
210        // We don't divide the `slots` vector into smaller chunks here.  We assume the VM binding
211        // respects the constant `EDGES_WORK_BUFFER_SIZE` and provides lists of slots in reasonable
212        // lengths.  Even if a single `ConcurrentTraceObjects` work packet is too large, it can
213        // still break up the list during tracing using the constant `CONCURRENT_TRACE_OVERFLOW`.
214        let nodes = slots
215            .iter()
216            .flat_map(|slot| slot.load())
217            .collect::<Vec<_>>();
218
219        // Note: During concurrent marking, mutators can overwrite the root slots and make the roots unstable.
220        // Therefore, instead of recording the root slots, we record the loaded root nodes.
221        #[cfg(feature = "sanity")]
222        self.mmtk
223            .sanity_checker
224            .lock()
225            .unwrap()
226            .add_root_nodes(nodes.clone());
227
228        self.create_and_schedule_root_nodes_work(nodes);
229    }
230
231    fn create_process_pinning_roots_work(&mut self, nodes: Vec<ObjectReference>) {
232        probe!(mmtk, roots, RootsKind::PINNING, nodes.len());
233
234        self.debug_assert_initial_mark();
235
236        #[cfg(feature = "sanity")]
237        self.mmtk
238            .sanity_checker
239            .lock()
240            .unwrap()
241            .add_root_nodes(nodes.clone());
242
243        self.create_and_schedule_root_nodes_work(nodes);
244    }
245
246    fn create_process_tpinning_roots_work(&mut self, nodes: Vec<ObjectReference>) {
247        probe!(mmtk, roots, RootsKind::TPINNING, nodes.len());
248
249        self.debug_assert_initial_mark();
250
251        #[cfg(feature = "sanity")]
252        self.mmtk
253            .sanity_checker
254            .lock()
255            .unwrap()
256            .add_root_nodes(nodes.clone());
257
258        self.create_and_schedule_root_nodes_work(nodes);
259    }
260}