mmtk/scheduler/worker.rs

use super::stat::WorkerLocalStat;
use super::work_bucket::*;
use super::*;
use crate::mmtk::MMTK;
use crate::util::copy::GCWorkerCopyContext;
use crate::util::heap::layout::heap_parameters::MAX_SPACES;
use crate::util::opaque_pointer::*;
use crate::util::ObjectReference;
use crate::vm::{Collection, GCThreadContext, VMBinding};
use atomic::Atomic;
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
use crossbeam::deque::{self, Stealer};
use crossbeam::queue::ArrayQueue;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

/// Represents the ID of a GC worker thread.
pub type ThreadId = usize;

thread_local! {
    /// Current worker's ordinal
    static WORKER_ORDINAL: Atomic<ThreadId> = const { Atomic::new(ThreadId::MAX) };
}

/// Get the ordinal of the current worker thread. In debug builds, this asserts that the current
/// thread is a GC worker.
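///
/// Illustrative sketch of how the ordinal might be used to index per-worker data;
/// `per_worker_data` and `record_event` are hypothetical, not part of this module:
/// ```ignore
/// let ordinal = current_worker_ordinal();
/// per_worker_data[ordinal].record_event();
/// ```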
pub fn current_worker_ordinal() -> ThreadId {
    let ordinal = WORKER_ORDINAL.with(|x| x.load(Ordering::Relaxed));
    debug_assert_ne!(
        ordinal,
        ThreadId::MAX,
        "Thread-local variable WORKER_ORDINAL not set yet."
    );
    ordinal
}

/// The struct has one instance per worker, but is shared between workers via the scheduler
/// instance.  This structure is used for communication between workers, e.g. adding designated
/// work packets, stealing work packets from other workers, and collecting per-worker statistics.
pub struct GCWorkerShared<VM: VMBinding> {
    /// Worker-local statistics data.
    stat: AtomicRefCell<WorkerLocalStat<VM>>,
    /// Accumulated live bytes for objects scanned in the current GC.  Each worker increases the
    /// live bytes as it scans objects.  We collect this value from each worker at the end of a
    /// GC, and then reset the counter.
    /// The live bytes are stored in an array, indexed by the space index from the space
    /// descriptor.
    pub live_bytes_per_space: AtomicRefCell<[usize; MAX_SPACES]>,
    /// A queue of GCWork that can only be processed by the owning worker thread.
    pub designated_work: ArrayQueue<Box<dyn GCWork<VM>>>,
    /// Handle for stealing packets from the current worker.
    pub stealer: Option<Stealer<Box<dyn GCWork<VM>>>>,
}

impl<VM: VMBinding> GCWorkerShared<VM> {
    pub fn new(stealer: Option<Stealer<Box<dyn GCWork<VM>>>>) -> Self {
        Self {
            stat: Default::default(),
            live_bytes_per_space: AtomicRefCell::new([0; MAX_SPACES]),
            designated_work: ArrayQueue::new(16),
            stealer,
        }
    }
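    /// Accumulate the live bytes of `object` into `live_bytes_per_space`, indexed by the
    /// descriptor of the space that contains the object.  Objects in spaces whose descriptor is
    /// uninitialized are ignored.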
    pub(crate) fn increase_live_bytes(
        live_bytes_per_space: &mut [usize; MAX_SPACES],
        object: ObjectReference,
    ) {
        use crate::mmtk::VM_MAP;
        use crate::vm::object_model::ObjectModel;

        // The live bytes of the object
        let bytes = VM::VMObjectModel::get_current_size(object);
        // Get the space index from the descriptor
        let space_descriptor = VM_MAP.get_descriptor_for_address(object.to_raw_address());
        if space_descriptor != crate::util::heap::space_descriptor::SpaceDescriptor::UNINITIALIZED {
            let space_index = space_descriptor.get_index();
            debug_assert!(
                space_index < MAX_SPACES,
                "Space index {} is not in the range of [0, {})",
                space_index,
                MAX_SPACES
            );
            // Accumulate the live bytes for the index
            live_bytes_per_space[space_index] += bytes;
        }
    }
}

/// A GC worker.  This part is privately owned by a worker thread.
pub struct GCWorker<VM: VMBinding> {
    /// The VM-specific thread-local state of the GC thread.
    pub tls: VMWorkerThread,
    /// The ordinal of the worker, numbered from 0 to the number of workers minus one.
    pub ordinal: ThreadId,
    /// The reference to the scheduler.
    scheduler: Arc<GCWorkScheduler<VM>>,
    /// The copy context, used to implement copying GC.
    copy: GCWorkerCopyContext<VM>,
    /// The reference to the MMTk instance.
    pub mmtk: &'static MMTK<VM>,
    /// Reference to the shared part of the GC worker.  It is used for synchronization.
    pub shared: Arc<GCWorkerShared<VM>>,
    /// Local work packet queue.
    pub local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
}

unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}

// Error message for borrowing `GCWorkerShared::stat`.
const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed.  This may happen if \
    the mutator calls harness_begin or harness_end while the GC is running.";

impl<VM: VMBinding> GCWorkerShared<VM> {
    pub fn borrow_stat(&self) -> AtomicRef<'_, WorkerLocalStat<VM>> {
        self.stat.try_borrow().expect(STAT_BORROWED_MSG)
    }

    pub fn borrow_stat_mut(&self) -> AtomicRefMut<'_, WorkerLocalStat<VM>> {
        self.stat.try_borrow_mut().expect(STAT_BORROWED_MSG)
    }
}

/// A special error type that indicates a worker should exit.
/// This may happen if the VM needs to fork and asks workers to exit.
#[derive(Debug)]
pub(crate) struct WorkerShouldExit;

/// The result type of `GCWorker::poll`.
/// Too many functions return `Option<Box<dyn GCWork<VM>>>`.  In most cases, when `None` is
/// returned, the caller should try getting work packets from another place.  To avoid confusion,
/// we use `Err(WorkerShouldExit)` to clearly indicate that the worker should exit immediately.
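///
/// A minimal sketch of the intended consumption pattern (mirroring the loop in `GCWorker::run`
/// below):
/// ```ignore
/// let Ok(mut work) = self.poll() else {
///     break; // Err(WorkerShouldExit): leave the worker loop.
/// };
/// ```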
pub(crate) type PollResult<VM> = Result<Box<dyn GCWork<VM>>, WorkerShouldExit>;

impl<VM: VMBinding> GCWorker<VM> {
    pub(crate) fn new(
        mmtk: &'static MMTK<VM>,
        ordinal: ThreadId,
        scheduler: Arc<GCWorkScheduler<VM>>,
        shared: Arc<GCWorkerShared<VM>>,
        local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
    ) -> Self {
        Self {
            tls: VMWorkerThread(VMThread::UNINITIALIZED),
            ordinal,
            // We will set this later
            copy: GCWorkerCopyContext::new_non_copy(),
            scheduler,
            mmtk,
            shared,
            local_work_buffer,
        }
    }

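    /// The maximum number of work packets cached in the local work queue.  When the local queue
    /// reaches this length, `add_work` and `add_work_prioritized` push new packets to the global
    /// bucket instead, so that other workers can pick them up.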
    const LOCALLY_CACHED_WORK_PACKETS: usize = 16;

    /// Add a work packet to the work queue and mark it with a higher priority.
    /// If the bucket is open and the local queue is not full, the packet is pushed to the local
    /// queue; otherwise it is added to the global bucket with a higher priority.
    pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if !self.scheduler().work_buckets[bucket].is_open()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
        {
            self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work));
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Add a work packet to the work queue.
    /// If the bucket is open and the local queue is not full, the packet is pushed to the local
    /// queue; otherwise it is added to the global bucket.
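    ///
    /// Illustrative sketch (hedged; `ScanStack` is a hypothetical packet type and `mutator_tls`
    /// a hypothetical argument):
    /// ```ignore
    /// worker.add_work(WorkBucketStage::Closure, ScanStack::new(mutator_tls));
    /// ```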
    pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if !self.scheduler().work_buckets[bucket].is_open()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
        {
            self.scheduler.work_buckets[bucket].add(work);
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Get the scheduler. There is only one scheduler per MMTk instance.
    pub fn scheduler(&self) -> &GCWorkScheduler<VM> {
        &self.scheduler
    }

    /// Get a mutable reference to the copy context for this worker.
    pub fn get_copy_context_mut(&mut self) -> &mut GCWorkerCopyContext<VM> {
        &mut self.copy
    }

    /// Poll a ready-to-execute work packet in the following order:
    ///
    /// 1. Any packet that should be processed only by this worker.
    /// 2. Poll from the local work queue.
    /// 3. Poll from open global work buckets.
    /// 4. Steal from other workers.
    fn poll(&mut self) -> PollResult<VM> {
        if let Some(work) = self.shared.designated_work.pop() {
            return Ok(work);
        }

        if let Some(work) = self.local_work_buffer.pop() {
            return Ok(work);
        }

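        // Steps 3 and 4 (polling open global buckets and stealing from other workers)
        // are delegated to the scheduler.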
        self.scheduler().poll(self)
    }

    /// Entry point of the worker thread.
    ///
    /// This function will resolve thread affinity, if it has been specified by the user.
    ///
    /// Each worker will keep polling and executing work packets in a loop.  It runs until the
    /// worker is requested to exit.  Currently a worker may exit after
    /// [`crate::mmtk::MMTK::prepare_to_fork`] is called.
    ///
    /// Arguments:
    /// * `tls`: The VM-specific thread-local storage for this GC worker thread.
    /// * `mmtk`: A reference to an MMTk instance.
    pub fn run(mut self: Box<Self>, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
        probe!(mmtk, gcworker_run);
        debug!(
            "Worker started. ordinal: {}, {}",
            self.ordinal,
            crate::util::rust_util::debug_process_thread_id(),
        );
        WORKER_ORDINAL.with(|x| x.store(self.ordinal, Ordering::SeqCst));
        self.scheduler.resolve_affinity(self.ordinal);
        self.tls = tls;
        self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
        loop {
            // Instead of having work_start and work_end tracepoints, we have
            // one tracepoint before polling for more work and one tracepoint
            // before executing the work.
            // This allows measuring the distribution of both the time needed
            // to poll work (between work_poll and work), and the time needed
            // to execute work (between work and the next work_poll).
            // If we had work_start and work_end, we could not measure the
            // first poll.
            probe!(mmtk, work_poll);
            let Ok(mut work) = self.poll() else {
                // The worker is asked to exit.  Break from the loop.
                break;
            };
            // probe! expands to an empty block on unsupported platforms
            #[allow(unused_variables)]
            let typename = work.get_type_name();

            #[cfg(feature = "bpftrace_workaround")]
            // Work around a problem where the bpftrace script cannot see the work packet names,
            // by forcing a load from the packet name.
            // See the "Known issues" section in `tools/tracing/timeline/README.md`
            std::hint::black_box(unsafe { *(typename.as_ptr()) });

            probe!(mmtk, work, typename.as_ptr(), typename.len());
            work.do_work_with_stat(&mut self, mmtk);
        }
        debug!(
            "Worker exiting. ordinal: {}, {}",
            self.ordinal,
            crate::util::rust_util::debug_process_thread_id(),
        );
        probe!(mmtk, gcworker_exit);

        mmtk.scheduler.surrender_gc_worker(self);
    }
}

/// Stateful part of [`WorkerGroup`].
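///
/// States move as follows: `Initial` (created by `WorkerGroup::new`) becomes `Spawned` after
/// `initial_spawn`; `prepare_surrender_buffer` moves it to `Surrendered`; and `respawn` moves it
/// back to `Spawned`.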
enum WorkerCreationState<VM: VMBinding> {
    /// The initial state.  `GCWorker` structs have not been created and GC worker threads have
    /// not been spawned.
    Initial {
        /// The local work queues for the to-be-created workers.
        local_work_queues: Vec<deque::Worker<Box<dyn GCWork<VM>>>>,
    },
    /// All worker threads are spawned and running.  The `GCWorker` structs have been transferred
    /// to the worker threads.
    Spawned,
    /// Worker threads are stopping, or have already stopped, for forking.  Instances of the
    /// `GCWorker` struct are collected here to be reused when GC workers are respawned.
    Surrendered {
        /// `GCWorker` instances not currently owned by active GC worker threads.  Once GC workers
        /// are respawned, they will take ownership of these `GCWorker` instances.
        // Note: Clippy warns about `Vec<Box<T>>` because the elements of a `Vec<T>` are already
        // heap-allocated.  However, the purpose of this `Vec` is to allow GC worker threads to
        // give their `Box<GCWorker<VM>>` instances back to this pool.  Therefore, the `Box` is
        // necessary.
        #[allow(clippy::vec_box)]
        workers: Vec<Box<GCWorker<VM>>>,
    },
}

/// A worker group to manage all the GC workers.
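///
/// Illustrative lifecycle sketch (hedged; `tls`, `mmtk`, and `num_workers` are supplied by the
/// embedding VM and the scheduler, not by this snippet):
/// ```ignore
/// let group = WorkerGroup::<VM>::new(num_workers);
/// group.initial_spawn(tls, mmtk);    // Initial -> Spawned
/// group.prepare_surrender_buffer();  // Spawned -> Surrendered (before forking)
/// // ... each exiting worker calls group.surrender_gc_worker(worker) ...
/// group.respawn(tls);                // Surrendered -> Spawned (after forking)
/// ```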
pub(crate) struct WorkerGroup<VM: VMBinding> {
    /// Shared worker data
    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
    /// The stateful part.  `None` means a state transition is underway.
    state: Mutex<Option<WorkerCreationState<VM>>>,
}

/// We have to persuade Rust that `WorkerGroup` is safe to share, because the compiler thinks one
/// worker can refer to another worker via the path "worker -> scheduler -> worker_group ->
/// `Surrendered::workers` -> worker", which is a cyclic reference that the compiler cannot prove
/// safe on its own.
unsafe impl<VM: VMBinding> Sync for WorkerGroup<VM> {}

impl<VM: VMBinding> WorkerGroup<VM> {
    /// Create a WorkerGroup
    pub fn new(num_workers: usize) -> Arc<Self> {
        let local_work_queues = (0..num_workers)
            .map(|_| deque::Worker::new_fifo())
            .collect::<Vec<_>>();

        let workers_shared = (0..num_workers)
            .map(|i| {
                Arc::new(GCWorkerShared::<VM>::new(Some(
                    local_work_queues[i].stealer(),
                )))
            })
            .collect::<Vec<_>>();

        Arc::new(Self {
            workers_shared,
            state: Mutex::new(Some(WorkerCreationState::Initial { local_work_queues })),
        })
    }

    /// Spawn GC worker threads for the first time.
    pub fn initial_spawn(&self, tls: VMThread, mmtk: &'static MMTK<VM>) {
        let mut state = self.state.lock().unwrap();

        let WorkerCreationState::Initial { local_work_queues } = state.take().unwrap() else {
            panic!("GCWorker structs have already been created");
        };

        let workers = self.create_workers(local_work_queues, mmtk);
        self.spawn(workers, tls);

        *state = Some(WorkerCreationState::Spawned);
    }

    /// Respawn GC worker threads after they stopped for forking.
    pub fn respawn(&self, tls: VMThread) {
        let mut state = self.state.lock().unwrap();

        let WorkerCreationState::Surrendered { workers } = state.take().unwrap() else {
            panic!("GCWorker structs have not been created yet.");
        };

        self.spawn(workers, tls);

        *state = Some(WorkerCreationState::Spawned);
    }

    /// Create `GCWorker` instances.
    #[allow(clippy::vec_box)] // See `WorkerCreationState::Surrendered`.
    fn create_workers(
        &self,
        local_work_queues: Vec<deque::Worker<Box<dyn GCWork<VM>>>>,
        mmtk: &'static MMTK<VM>,
    ) -> Vec<Box<GCWorker<VM>>> {
        debug!("Creating GCWorker instances...");

        assert_eq!(self.workers_shared.len(), local_work_queues.len());

        // Each `GCWorker` instance corresponds to the `GCWorkerShared` at the same index.
        let workers = (local_work_queues.into_iter())
            .zip(self.workers_shared.iter())
            .enumerate()
            .map(|(ordinal, (queue, shared))| {
                Box::new(GCWorker::new(
                    mmtk,
                    ordinal,
                    mmtk.scheduler.clone(),
                    shared.clone(),
                    queue,
                ))
            })
            .collect::<Vec<_>>();

        debug!("Created {} GCWorker instances.", workers.len());
        workers
    }

    /// Spawn all the worker threads
    #[allow(clippy::vec_box)] // See `WorkerCreationState::Surrendered`.
    fn spawn(&self, workers: Vec<Box<GCWorker<VM>>>, tls: VMThread) {
        debug!(
            "Spawning GC workers.  {}",
            crate::util::rust_util::debug_process_thread_id(),
        );

        // We transfer the ownership of each `GCWorker` instance to a GC thread.
        for worker in workers {
            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
        }

        debug!(
            "Spawned {} worker threads.  {}",
            self.worker_count(),
            crate::util::rust_util::debug_process_thread_id(),
        );
    }

    /// Prepare the buffer for workers to surrender their `GCWorker` structs.
    pub fn prepare_surrender_buffer(&self) {
        let mut state = self.state.lock().unwrap();
        assert!(matches!(*state, Some(WorkerCreationState::Spawned)));

        *state = Some(WorkerCreationState::Surrendered {
            workers: Vec::with_capacity(self.worker_count()),
        })
    }

    /// Return the `GCWorker` struct of a stopped worker to the worker group.
    /// This function returns `true` if all workers have returned their `GCWorker` structs.
    pub fn surrender_gc_worker(&self, worker: Box<GCWorker<VM>>) -> bool {
        let mut state = self.state.lock().unwrap();
        let WorkerCreationState::Surrendered { ref mut workers } = state.as_mut().unwrap() else {
            panic!("GCWorker structs have not been created yet.");
        };
        let ordinal = worker.ordinal;
        workers.push(worker);
        trace!(
            "Worker {} surrendered. ({}/{})",
            ordinal,
            workers.len(),
            self.worker_count()
        );
        workers.len() == self.worker_count()
    }

    /// Get the number of workers in the group
    pub fn worker_count(&self) -> usize {
        self.workers_shared.len()
    }

    /// Return true if there is any pending designated work.
    pub fn has_designated_work(&self) -> bool {
        self.workers_shared
            .iter()
            .any(|w| !w.designated_work.is_empty())
    }

    /// Get the live bytes data from all workers, and clear each worker's local counters.
    pub fn get_and_clear_worker_live_bytes(&self) -> [usize; MAX_SPACES] {
        let mut ret = [0; MAX_SPACES];
        self.workers_shared.iter().for_each(|w| {
            let mut live_bytes_per_space = w.live_bytes_per_space.borrow_mut();
            for (idx, val) in live_bytes_per_space.iter_mut().enumerate() {
                ret[idx] += *val;
                *val = 0;
            }
        });
        ret
    }
}