use super::stat::WorkerLocalStat;
use super::work_bucket::*;
use super::*;
use crate::mmtk::MMTK;
use crate::util::copy::GCWorkerCopyContext;
use crate::util::heap::layout::heap_parameters::MAX_SPACES;
use crate::util::opaque_pointer::*;
use crate::util::ObjectReference;
use crate::vm::{Collection, GCThreadContext, VMBinding};
use atomic::Atomic;
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
use crossbeam::deque::{self, Stealer};
use crossbeam::queue::ArrayQueue;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

/// The identifier (ordinal) of a GC worker thread.
pub type ThreadId = usize;

thread_local! {
    /// The ordinal of the current worker thread, or `ThreadId::MAX` if the thread is not a GC worker.
    static WORKER_ORDINAL: Atomic<ThreadId> = const { Atomic::new(ThreadId::MAX) };
}

/// Get the ordinal of the current GC worker thread.
/// In debug builds, this panics if called from a thread that is not a GC worker
/// (i.e. before `WORKER_ORDINAL` has been set by `GCWorker::run`).
pub fn current_worker_ordinal() -> ThreadId {
    let ordinal = WORKER_ORDINAL.with(|x| x.load(Ordering::Relaxed));
    debug_assert_ne!(
        ordinal,
        ThreadId::MAX,
        "Thread-local variable WORKER_ORDINAL not set yet."
    );
    ordinal
}
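// A minimal sketch (hypothetical helper, not part of the original module) of how code running on
// a GC worker thread can use `current_worker_ordinal()`: the ordinal is unique per worker, so it
// can index per-worker data without further synchronisation.
#[allow(dead_code)]
fn example_per_worker_slot<T>(per_worker: &[T]) -> &T {
    // Debug builds assert that the caller is actually a GC worker thread.
    &per_worker[current_worker_ordinal()]
}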
/// The part of a GC worker that is shared with other threads, e.g. for adding designated work,
/// stealing work packets, and collecting per-worker statistics.
pub struct GCWorkerShared<VM: VMBinding> {
    /// Worker-local statistics.
    stat: AtomicRefCell<WorkerLocalStat<VM>>,
    /// Accumulated live bytes per space, recorded while this worker scans objects.
    /// The values are collected and reset at the end of each GC.
    pub live_bytes_per_space: AtomicRefCell<[usize; MAX_SPACES]>,
    /// Work packets that can only be executed by this particular worker.
    pub designated_work: ArrayQueue<Box<dyn GCWork<VM>>>,
    /// Handle for other workers to steal packets from this worker's local queue.
    pub stealer: Option<Stealer<Box<dyn GCWork<VM>>>>,
}

impl<VM: VMBinding> GCWorkerShared<VM> {
    pub fn new(stealer: Option<Stealer<Box<dyn GCWork<VM>>>>) -> Self {
        Self {
            stat: Default::default(),
            live_bytes_per_space: AtomicRefCell::new([0; MAX_SPACES]),
            designated_work: ArrayQueue::new(16),
            stealer,
        }
    }

    /// Add the size of `object` to the live-bytes counter of the space it resides in.
    /// Objects whose address does not map to a known space descriptor are not counted.
    pub(crate) fn increase_live_bytes(
        live_bytes_per_space: &mut [usize; MAX_SPACES],
        object: ObjectReference,
    ) {
        use crate::mmtk::VM_MAP;
        use crate::vm::object_model::ObjectModel;

        let bytes = VM::VMObjectModel::get_current_size(object);
        let space_descriptor = VM_MAP.get_descriptor_for_address(object.to_raw_address());
        if space_descriptor != crate::util::heap::space_descriptor::SpaceDescriptor::UNINITIALIZED {
            let space_index = space_descriptor.get_index();
            debug_assert!(
                space_index < MAX_SPACES,
                "Space index {} is not in the range of [0, {})",
                space_index,
                MAX_SPACES
            );
            live_bytes_per_space[space_index] += bytes;
        }
    }
}
/// A GC worker. This part is privately owned by the worker thread.
pub struct GCWorker<VM: VMBinding> {
    /// The VM-specific thread-local state of this GC thread.
    pub tls: VMWorkerThread,
    /// The ordinal of this worker, in the range `[0, number of workers)`.
    pub ordinal: ThreadId,
    /// The scheduler this worker polls work from.
    scheduler: Arc<GCWorkScheduler<VM>>,
    /// The copy context, used by copying GC plans.
    copy: GCWorkerCopyContext<VM>,
    /// A reference to the MMTk instance.
    pub mmtk: &'static MMTK<VM>,
    /// The state of this worker that is shared with other threads.
    pub shared: Arc<GCWorkerShared<VM>>,
    /// The local work packet queue.
    pub local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
}

unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}

const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This may happen if \
    the mutator calls harness_begin or harness_end while the GC is running.";

impl<VM: VMBinding> GCWorkerShared<VM> {
    /// Borrow the worker-local statistics immutably, panicking if they are already
    /// mutably borrowed.
    pub fn borrow_stat(&self) -> AtomicRef<'_, WorkerLocalStat<VM>> {
        self.stat.try_borrow().expect(STAT_BORROWED_MSG)
    }

    /// Borrow the worker-local statistics mutably, panicking if they are already borrowed.
    pub fn borrow_stat_mut(&self) -> AtomicRefMut<'_, WorkerLocalStat<VM>> {
        self.stat.try_borrow_mut().expect(STAT_BORROWED_MSG)
    }
}
/// An error type indicating that a GC worker should exit instead of polling for more work.
/// This can happen, for example, when the VM asks GC threads to stop before forking.
#[derive(Debug)]
pub(crate) struct WorkerShouldExit;

/// The result of polling for work: either the next work packet to execute,
/// or `WorkerShouldExit` if the worker should exit immediately.
pub(crate) type PollResult<VM> = Result<Box<dyn GCWork<VM>>, WorkerShouldExit>;
impl<VM: VMBinding> GCWorker<VM> {
    pub(crate) fn new(
        mmtk: &'static MMTK<VM>,
        ordinal: ThreadId,
        scheduler: Arc<GCWorkScheduler<VM>>,
        shared: Arc<GCWorkerShared<VM>>,
        local_work_buffer: deque::Worker<Box<dyn GCWork<VM>>>,
    ) -> Self {
        Self {
            tls: VMWorkerThread(VMThread::UNINITIALIZED),
            ordinal,
            // A placeholder copy context. The real copy context is created in `run`,
            // once the worker thread has started.
            copy: GCWorkerCopyContext::new_non_copy(),
            scheduler,
            mmtk,
            shared,
            local_work_buffer,
        }
    }

    /// The maximum number of work packets a worker caches in its local queue
    /// before spilling new packets to the global work bucket.
    const LOCALLY_CACHED_WORK_PACKETS: usize = 16;

    /// Add a work packet to the given bucket with high priority.
    ///
    /// If the bucket is open and the local queue still has room, the packet is cached in the
    /// worker's local queue instead; otherwise it is added to the global bucket as a
    /// prioritized packet.
    pub fn add_work_prioritized(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if !self.scheduler().work_buckets[bucket].is_open()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
        {
            self.scheduler.work_buckets[bucket].add_prioritized(Box::new(work));
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Add a work packet to the given bucket.
    ///
    /// If the bucket is open and the local queue still has room, the packet is cached in the
    /// worker's local queue instead; otherwise it is added to the global bucket.
    pub fn add_work(&mut self, bucket: WorkBucketStage, work: impl GCWork<VM>) {
        if !self.scheduler().work_buckets[bucket].is_open()
            || self.local_work_buffer.len() >= Self::LOCALLY_CACHED_WORK_PACKETS
        {
            self.scheduler.work_buckets[bucket].add(work);
            return;
        }
        self.local_work_buffer.push(Box::new(work));
    }

    /// Get a reference to the scheduler this worker belongs to.
    pub fn scheduler(&self) -> &GCWorkScheduler<VM> {
        &self.scheduler
    }

    /// Get a mutable reference to the copy context of this worker.
    pub fn get_copy_context_mut(&mut self) -> &mut GCWorkerCopyContext<VM> {
        &mut self.copy
    }

    /// Poll for the next work packet: designated work first, then the local queue,
    /// then the scheduler (which may block, or ask the worker to exit).
    fn poll(&mut self) -> PollResult<VM> {
        if let Some(work) = self.shared.designated_work.pop() {
            return Ok(work);
        }

        if let Some(work) = self.local_work_buffer.pop() {
            return Ok(work);
        }

        self.scheduler().poll(self)
    }
    /// The main loop of a GC worker thread.
    ///
    /// The worker polls and executes work packets until the scheduler asks it to exit,
    /// then surrenders its `GCWorker` struct back to the scheduler so it can be respawned later.
    pub fn run(mut self: Box<Self>, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
        probe!(mmtk, gcworker_run);
        debug!(
            "Worker started. ordinal: {}, {}",
            self.ordinal,
            crate::util::rust_util::debug_process_thread_id(),
        );
        WORKER_ORDINAL.with(|x| x.store(self.ordinal, Ordering::SeqCst));
        self.scheduler.resolve_affinity(self.ordinal);
        self.tls = tls;
        // Replace the placeholder copy context with the real one for the current plan.
        self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
        loop {
            probe!(mmtk, work_poll);
            let Ok(mut work) = self.poll() else {
                // The scheduler asked this worker to exit.
                break;
            };
            #[allow(unused_variables)]
            let typename = work.get_type_name();

            // With the `bpftrace_workaround` feature enabled, touch the first byte of the type
            // name so the string is resident in memory before the tracepoint below reads it.
            #[cfg(feature = "bpftrace_workaround")]
            std::hint::black_box(unsafe { *(typename.as_ptr()) });

            probe!(mmtk, work, typename.as_ptr(), typename.len());
            work.do_work_with_stat(&mut self, mmtk);
        }
        debug!(
            "Worker exiting. ordinal: {}, {}",
            self.ordinal,
            crate::util::rust_util::debug_process_thread_id(),
        );
        probe!(mmtk, gcworker_exit);

        mmtk.scheduler.surrender_gc_worker(self);
    }
}
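// A minimal sketch of a custom work packet (a hypothetical type, for illustration only) showing
// the intended use of `GCWorker::add_work` from inside `do_work`: follow-up packets are pushed
// through the executing worker so they can be cached in its local queue rather than always going
// through the global bucket.
#[allow(dead_code)]
struct ExampleFanOut {
    remaining: usize,
}

impl<VM: VMBinding> GCWork<VM> for ExampleFanOut {
    fn do_work(&mut self, worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
        debug!(
            "ExampleFanOut on worker {}: {} follow-up packets left",
            worker.ordinal, self.remaining
        );
        if self.remaining > 0 {
            // If the Closure bucket is not yet open, or the local queue is full,
            // `add_work` automatically falls back to the global bucket.
            worker.add_work(
                WorkBucketStage::Closure,
                ExampleFanOut {
                    remaining: self.remaining - 1,
                },
            );
        }
    }
}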
/// Tracks where the boxed `GCWorker` structs are during the lifetime of a worker group.
enum WorkerCreationState<VM: VMBinding> {
    /// The `GCWorker` structs have not been created yet; only their local work queues exist.
    Initial {
        /// The local work queues to be given to the workers when they are created.
        local_work_queues: Vec<deque::Worker<Box<dyn GCWork<VM>>>>,
    },
    /// Worker threads have been spawned and own their `GCWorker` structs.
    Spawned,
    /// Worker threads have exited and returned their `GCWorker` structs,
    /// ready to be respawned later (e.g. after the VM forks).
    Surrendered {
        /// The `GCWorker` structs returned by exiting workers.
        #[allow(clippy::vec_box)]
        workers: Vec<Box<GCWorker<VM>>>,
    },
}

/// A group of GC workers, and the shared parts of their state.
pub(crate) struct WorkerGroup<VM: VMBinding> {
    /// Shared worker data, one entry per worker.
    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
    /// Where the boxed `GCWorker` structs currently are.
    state: Mutex<Option<WorkerCreationState<VM>>>,
}

unsafe impl<VM: VMBinding> Sync for WorkerGroup<VM> {}
impl<VM: VMBinding> WorkerGroup<VM> {
    /// Create a worker group with `num_workers` workers, allocating the local work queues
    /// and the shared part of each worker.
    pub fn new(num_workers: usize) -> Arc<Self> {
        let local_work_queues = (0..num_workers)
            .map(|_| deque::Worker::new_fifo())
            .collect::<Vec<_>>();

        let workers_shared = (0..num_workers)
            .map(|i| {
                Arc::new(GCWorkerShared::<VM>::new(Some(
                    local_work_queues[i].stealer(),
                )))
            })
            .collect::<Vec<_>>();

        Arc::new(Self {
            workers_shared,
            state: Mutex::new(Some(WorkerCreationState::Initial { local_work_queues })),
        })
    }

    /// Create the `GCWorker` structs and spawn the worker threads for the first time.
    pub fn initial_spawn(&self, tls: VMThread, mmtk: &'static MMTK<VM>) {
        let mut state = self.state.lock().unwrap();

        let WorkerCreationState::Initial { local_work_queues } = state.take().unwrap() else {
            panic!("GCWorker structs have already been created");
        };

        let workers = self.create_workers(local_work_queues, mmtk);
        self.spawn(workers, tls);

        *state = Some(WorkerCreationState::Spawned);
    }

    /// Respawn worker threads using the previously surrendered `GCWorker` structs
    /// (e.g. after the VM has forked).
    pub fn respawn(&self, tls: VMThread) {
        let mut state = self.state.lock().unwrap();

        let WorkerCreationState::Surrendered { workers } = state.take().unwrap() else {
            panic!("GCWorker structs have not been created yet.");
        };

        self.spawn(workers, tls);

        *state = Some(WorkerCreationState::Spawned);
    }
    #[allow(clippy::vec_box)]
    fn create_workers(
        &self,
        local_work_queues: Vec<deque::Worker<Box<dyn GCWork<VM>>>>,
        mmtk: &'static MMTK<VM>,
    ) -> Vec<Box<GCWorker<VM>>> {
        debug!("Creating GCWorker instances...");

        assert_eq!(self.workers_shared.len(), local_work_queues.len());

        let workers = (local_work_queues.into_iter())
            .zip(self.workers_shared.iter())
            .enumerate()
            .map(|(ordinal, (queue, shared))| {
                Box::new(GCWorker::new(
                    mmtk,
                    ordinal,
                    mmtk.scheduler.clone(),
                    shared.clone(),
                    queue,
                ))
            })
            .collect::<Vec<_>>();

        debug!("Created {} GCWorker instances.", workers.len());
        workers
    }

    #[allow(clippy::vec_box)]
    fn spawn(&self, workers: Vec<Box<GCWorker<VM>>>, tls: VMThread) {
        debug!(
            "Spawning GC workers. {}",
            crate::util::rust_util::debug_process_thread_id(),
        );

        for worker in workers {
            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
        }

        debug!(
            "Spawned {} worker threads. {}",
            self.worker_count(),
            crate::util::rust_util::debug_process_thread_id(),
        );
    }
    /// Prepare the buffer into which exiting workers will surrender their `GCWorker` structs.
    pub fn prepare_surrender_buffer(&self) {
        let mut state = self.state.lock().unwrap();
        assert!(matches!(*state, Some(WorkerCreationState::Spawned)));

        *state = Some(WorkerCreationState::Surrendered {
            workers: Vec::with_capacity(self.worker_count()),
        });
    }

    /// Return the `GCWorker` struct of an exiting worker to the group.
    /// Returns `true` if this was the last worker to surrender.
    pub fn surrender_gc_worker(&self, worker: Box<GCWorker<VM>>) -> bool {
        let mut state = self.state.lock().unwrap();
        let WorkerCreationState::Surrendered { ref mut workers } = state.as_mut().unwrap() else {
            panic!("GCWorker structs have not been created yet.");
        };
        let ordinal = worker.ordinal;
        workers.push(worker);
        trace!(
            "Worker {} surrendered. ({}/{})",
            ordinal,
            workers.len(),
            self.worker_count()
        );
        workers.len() == self.worker_count()
    }

    /// The number of workers in this group.
    pub fn worker_count(&self) -> usize {
        self.workers_shared.len()
    }

    /// Return `true` if any worker still has designated work pending.
    pub fn has_designated_work(&self) -> bool {
        self.workers_shared
            .iter()
            .any(|w| !w.designated_work.is_empty())
    }

    /// Sum up and reset the per-space live bytes recorded by all workers.
    pub fn get_and_clear_worker_live_bytes(&self) -> [usize; MAX_SPACES] {
        let mut ret = [0; MAX_SPACES];
        self.workers_shared.iter().for_each(|w| {
            let mut live_bytes_per_space = w.live_bytes_per_space.borrow_mut();
            for (idx, val) in live_bytes_per_space.iter_mut().enumerate() {
                ret[idx] += *val;
                *val = 0;
            }
        });
        ret
    }
}
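// A minimal sketch (hypothetical free function, for illustration) of what the thread spawned by a
// binding's `Collection::spawn_gc_thread` does with the `GCThreadContext::Worker` it receives:
// it hands control to `GCWorker::run`, which loops polling and executing work packets and only
// returns once the scheduler asks the worker to exit. Real bindings normally reach this through
// MMTk's public entry point for starting a worker rather than calling `run` directly.
#[allow(dead_code)]
fn example_gc_thread_entry<VM: VMBinding>(
    mmtk: &'static MMTK<VM>,
    tls: VMWorkerThread,
    ctx: GCThreadContext<VM>,
) {
    match ctx {
        GCThreadContext::Worker(worker) => worker.run(tls, mmtk),
    }
}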