use self::worker::PollResult;

use super::gc_work::ScheduleCollection;
use super::stat::SchedulerStat;
use super::work_bucket::*;
use super::worker::{GCWorker, ThreadId, WorkerGroup};
use super::worker_goals::{WorkerGoal, WorkerGoals};
use super::worker_monitor::{LastParkedResult, WorkerMonitor};
use super::*;
use crate::global_state::GcStatus;
use crate::mmtk::MMTK;
use crate::util::opaque_pointer::*;
use crate::util::options::AffinityKind;
use crate::vm::Collection;
use crate::vm::VMBinding;
use crate::Plan;
use crossbeam::deque::Steal;
use enum_map::{Enum, EnumMap};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

pub struct GCWorkScheduler<VM: VMBinding> {
    /// Work buckets, one per stage.
    pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
    /// The shared parts of the GC worker threads.
    pub(crate) worker_group: Arc<WorkerGroup<VM>>,
    /// Used by workers and mutators to park, wake up, and make requests.
    pub(crate) worker_monitor: Arc<WorkerMonitor>,
    /// How to assign the affinity of each GC thread. Specified by the user.
    affinity: AffinityKind,
}

// SAFETY: This impl manually asserts that it is safe to access a `GCWorkScheduler`
// concurrently from multiple GC threads.
unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}

impl<VM: VMBinding> GCWorkScheduler<VM> {
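    /// Create a `GCWorkScheduler` with the given number of workers and CPU affinity policy.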
    pub fn new(num_workers: usize, affinity: AffinityKind) -> Arc<Self> {
        let worker_monitor: Arc<WorkerMonitor> = Arc::new(WorkerMonitor::new(num_workers));
        let worker_group = WorkerGroup::new(num_workers);

        let mut work_buckets = EnumMap::from_fn(|stage: WorkBucketStage| {
            WorkBucket::new(stage, worker_monitor.clone())
        });

        {
            // Each sequentially-opened stage may only open after all previously
            // defined open stages have been drained.
            let mut open_stages: Vec<WorkBucketStage> = vec![WorkBucketStage::FIRST_STW_STAGE];
            let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize);
            for stage in stages {
                if stage.is_sequentially_opened() {
                    let cur_stages = open_stages.clone();
                    work_buckets[stage].set_open_condition(
                        move |scheduler: &GCWorkScheduler<VM>| {
                            debug!(
                                "Check if {:?} can be opened. These need to be drained: {:?}",
                                stage, &cur_stages
                            );
                            scheduler.are_buckets_drained(&cur_stages)
                        },
                    );
                    open_stages.push(stage);
                }
            }
        }

        Arc::new(Self {
            work_buckets,
            worker_group,
            worker_monitor,
            affinity,
        })
    }

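    /// Returns the number of GC worker threads.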
    pub fn num_workers(&self) -> usize {
        self.worker_group.worker_count()
    }

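    /// Spawn the GC worker threads for the first time.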
    pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
        self.worker_group.initial_spawn(tls, mmtk);
    }

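    /// Ask all GC worker threads to exit so that the VM can fork.
    /// Exiting workers surrender their `GCWorker` structs via `surrender_gc_worker`.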
    pub fn stop_gc_threads_for_forking(self: &Arc<Self>) {
        self.worker_group.prepare_surrender_buffer();

        debug!("A mutator is requesting GC threads to stop for forking...");
        self.worker_monitor.make_request(WorkerGoal::StopForFork);
    }

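    /// Surrender the `GCWorker` struct of a worker that is about to exit.
    /// The last worker to surrender notifies the monitor that all workers have exited.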
    pub fn surrender_gc_worker(&self, worker: Box<GCWorker<VM>>) {
        let all_surrendered = self.worker_group.surrender_gc_worker(worker);

        if all_surrendered {
            debug!(
                "All {} workers surrendered.",
                self.worker_group.worker_count()
            );
            self.worker_monitor.on_all_workers_exited();
        }
    }

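    /// Respawn GC worker threads after the VM has forked, reusing the surrendered
    /// `GCWorker` structs.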
    pub fn respawn_gc_threads_after_forking(self: &Arc<Self>, tls: VMThread) {
        self.worker_group.respawn(tls)
    }

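    /// Apply the user-specified `AffinityKind` to the given GC thread.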
    pub fn resolve_affinity(&self, thread: ThreadId) {
        self.affinity.resolve_affinity(thread);
    }

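    /// Request that a GC be scheduled. Called by a mutator thread; the last parked
    /// worker responds to the request in `respond_to_requests`.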
    pub(crate) fn request_schedule_collection(&self) {
        debug!("A mutator is sending GC-scheduling request to workers...");
        self.worker_monitor.make_request(WorkerGoal::Gc);
    }

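    /// Add the `ScheduleCollection` packet to start a GC.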
    fn add_schedule_collection_packet(&self) {
        probe!(mmtk, add_schedule_collection_packet);
        // Do not notify workers here: the caller (see `respond_to_requests`) is
        // responsible for waking them up.
        self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection);
    }

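    /// Schedule the work packets that are common to most plans: stopping mutators,
    /// prepare, release, and (depending on options and plan constraints) reference
    /// processing, finalization, and VM-specific weak reference handling.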
    pub fn schedule_common_work<C: GCWorkContext<VM = VM>>(&self, plan: &'static C::PlanType) {
        use crate::scheduler::gc_work::*;
        // Stop & scan mutators.
        self.work_buckets[WorkBucketStage::Unconstrained].add(StopMutators::<C>::new());

        // Prepare global/collectors/mutators.
        self.work_buckets[WorkBucketStage::Prepare].add(Prepare::<C>::new(plan));

        // Release global/collectors/mutators.
        self.work_buckets[WorkBucketStage::Release].add(Release::<C>::new(plan));

        // Analysis GC work.
        #[cfg(feature = "analysis")]
        {
            use crate::util::analysis::GcHookWork;
            self.work_buckets[WorkBucketStage::Unconstrained].add(GcHookWork);
        }

        // Sanity checking.
        #[cfg(feature = "sanity")]
        {
            use crate::util::sanity::sanity_checker::ScheduleSanityGC;
            self.work_buckets[WorkBucketStage::Final]
                .add(ScheduleSanityGC::<C::PlanType>::new(plan));
        }

        // Reference processing.
        if !*plan.base().options.no_reference_types {
            use crate::util::reference_processor::{
                PhantomRefProcessing, SoftRefProcessing, WeakRefProcessing,
            };
            self.work_buckets[WorkBucketStage::SoftRefClosure]
                .add(SoftRefProcessing::<C::DefaultProcessEdges>::new());
            self.work_buckets[WorkBucketStage::WeakRefClosure].add(WeakRefProcessing::<VM>::new());
            self.work_buckets[WorkBucketStage::PhantomRefClosure]
                .add(PhantomRefProcessing::<VM>::new());

            use crate::util::reference_processor::RefForwarding;
            if plan.constraints().needs_forward_after_liveness {
                self.work_buckets[WorkBucketStage::RefForwarding]
                    .add(RefForwarding::<C::DefaultProcessEdges>::new());
            }

            use crate::util::reference_processor::RefEnqueue;
            self.work_buckets[WorkBucketStage::Release].add(RefEnqueue::<VM>::new());
        }

        // Finalization.
        if !*plan.base().options.no_finalizer {
            use crate::util::finalizable_processor::{Finalization, ForwardFinalization};
            self.work_buckets[WorkBucketStage::FinalRefClosure]
                .add(Finalization::<C::DefaultProcessEdges>::new());
            // Forward finalizable objects if the plan moves objects after liveness.
            if plan.constraints().needs_forward_after_liveness {
                self.work_buckets[WorkBucketStage::FinalizableForwarding]
                    .add(ForwardFinalization::<C::DefaultProcessEdges>::new());
            }
        }

        // VM-specific weak reference processing. The sentinel is executed after all
        // other packets in the `VMRefClosure` bucket have been drained.
        self.work_buckets[WorkBucketStage::VMRefClosure]
            .set_sentinel(Box::new(VMProcessWeakRefs::<C::DefaultProcessEdges>::new()));

        // VM-specific weak reference forwarding.
        if plan.constraints().needs_forward_after_liveness {
            self.work_buckets[WorkBucketStage::VMRefForwarding]
                .add(VMForwardWeakRefs::<C::DefaultProcessEdges>::new());
        }

        self.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::<VM>::default());
    }

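    /// Returns true if every one of the given buckets is either disabled or drained.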
    fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool {
        buckets
            .iter()
            .all(|&b| !self.work_buckets[b].is_enabled() || self.work_buckets[b].is_drained())
    }

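    /// Assert (in debug builds) that all stop-the-world buckets are empty,
    /// logging the contents of any bucket that is not.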
    pub fn debug_assert_all_stw_buckets_empty(&self) {
        debug_assert!(self
            .work_buckets
            .values()
            .filter(|bucket| bucket.get_stage().is_stw())
            .all(|bucket| {
                if !bucket.is_empty() {
                    warn!(
                        "Work bucket {:?} is expected to be empty, but it is not!",
                        bucket.get_stage()
                    );
                    warn!("Queue: {:?}", bucket.get_queue().debug_dump_packets());
                    false
                } else {
                    true
                }
            }))
    }

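    /// Schedule sentinel packets for all open buckets that have one pending.
    /// Returns true if any sentinel was scheduled.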
    pub(crate) fn schedule_sentinels(&self) -> bool {
        let mut new_packets = false;
        for (id, work_bucket) in self.work_buckets.iter() {
            if work_bucket.is_open() && work_bucket.maybe_schedule_sentinel() {
                trace!("Scheduled sentinel packet into {:?}", id);
                new_packets = true;
            }
        }
        new_packets
    }

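    /// Open any closed, enabled buckets whose open conditions are now satisfied.
    /// Returns true if at least one bucket was opened *and* new packets (or a
    /// sentinel) became available as a result.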
    pub(crate) fn update_buckets(&self) -> bool {
        debug!("update_buckets");
        let mut buckets_updated = false;
        let mut new_packets = false;
        for i in 0..WorkBucketStage::LENGTH {
            let id = WorkBucketStage::from_usize(i);
            if id.is_always_open() {
                continue;
            }
            let bucket = &self.work_buckets[id];
            if !bucket.is_enabled() {
                debug!("Work bucket {:?} is disabled. Skip.", id);
                continue;
            }
            debug!("Checking if {:?} can be opened...", id);
            let bucket_opened = bucket.update(self);
            buckets_updated = buckets_updated || bucket_opened;
            if bucket_opened {
                probe!(mmtk, bucket_opened, id);
                new_packets = new_packets || !bucket.is_drained();
                if new_packets {
                    // The newly opened bucket already has packets. Quit the loop
                    // and let workers execute them.
                    trace!("Found new packets at stage {:?}. Break.", id);
                    break;
                }
                new_packets = new_packets || bucket.maybe_schedule_sentinel();
                if new_packets {
                    trace!("Sentinel is scheduled at stage {:?}. Break.", id);
                    break;
                }
            }
        }
        buckets_updated && new_packets
    }

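    /// Close all stop-the-world work buckets.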
    pub fn close_all_stw_buckets(&self) {
        self.work_buckets.iter().for_each(|(id, bkt)| {
            if id.is_stw() {
                bkt.close();
            }
        });
    }

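    /// Reset the scheduler state: close every stop-the-world bucket except the
    /// first STW stage, whose open state is managed separately (see
    /// `notify_mutators_paused`).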
    pub fn reset_state(&self) {
        self.work_buckets.iter().for_each(|(id, bkt)| {
            if id.is_stw() && !id.is_first_stw_stage() {
                bkt.close();
            }
        });
    }

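    /// Assert (in debug builds) that all stop-the-world buckets are closed.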
    pub fn debug_assert_all_stw_buckets_closed(&self) {
        if cfg!(debug_assertions) {
            self.work_buckets.iter().for_each(|(id, bkt)| {
                if id.is_stw() {
                    assert!(!bkt.is_open());
                }
            });
        }
    }

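    /// Check that every enabled, open bucket is empty, and panic (naming one
    /// offending bucket) if any is not.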
    pub(crate) fn assert_all_open_buckets_are_empty(&self) {
        // Check all buckets before panicking so that every offender is logged.
        let mut error_example = None;
        for (id, bucket) in self.work_buckets.iter() {
            if bucket.is_enabled() && bucket.is_open() && !bucket.is_empty() {
                error!("Work bucket {:?} is not drained!", id);
                error!("Queue: {:?}", bucket.get_queue().debug_dump_packets());
                error_example = Some(id);
            }
        }
        if let Some(id) = error_example {
            panic!("Some open buckets (such as {:?}) are not empty.", id);
        }
    }

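    /// Make one attempt to obtain a work packet: first the worker's designated
    /// work queue, then the work buckets, and finally stealing from other workers.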
    fn poll_schedulable_work_once(&self, worker: &GCWorker<VM>) -> Steal<Box<dyn GCWork<VM>>> {
        let mut should_retry = false;
        // Try to find a packet that can only be processed by this worker.
        if let Some(w) = worker.shared.designated_work.pop() {
            return Steal::Success(w);
        }
        // Try to get a packet from a work bucket.
        for work_bucket in self.work_buckets.values() {
            match work_bucket.poll(&worker.local_work_buffer) {
                Steal::Success(w) => return Steal::Success(w),
                Steal::Retry => should_retry = true,
                _ => {}
            }
        }
        // Try to steal a packet from another worker.
        for (id, worker_shared) in self.worker_group.workers_shared.iter().enumerate() {
            if id == worker.ordinal {
                continue;
            }
            match worker_shared.stealer.as_ref().unwrap().steal() {
                Steal::Success(w) => return Steal::Success(w),
                Steal::Retry => should_retry = true,
                _ => {}
            }
        }
        if should_retry {
            Steal::Retry
        } else {
            Steal::Empty
        }
    }

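    /// Poll for a work packet, yielding and retrying on contention, until a packet
    /// is obtained or all queues are found empty.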
    fn poll_schedulable_work(&self, worker: &GCWorker<VM>) -> Option<Box<dyn GCWork<VM>>> {
        loop {
            match self.poll_schedulable_work_once(worker) {
                Steal::Success(w) => {
                    return Some(w);
                }
                Steal::Retry => {
                    std::thread::yield_now();
                    continue;
                }
                Steal::Empty => {
                    return None;
                }
            }
        }
    }

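    /// Called by workers to get a packet to execute. Falls back to `poll_slow`
    /// (which parks the worker) when no work is immediately available.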
    pub(crate) fn poll(&self, worker: &GCWorker<VM>) -> PollResult<VM> {
        if let Some(work) = self.poll_schedulable_work(worker) {
            return Ok(work);
        }
        self.poll_slow(worker)
    }

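    /// Slow path of `poll`: park on the monitor and re-poll after each wake-up.
    /// The last worker to park runs `on_last_parked`.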
    fn poll_slow(&self, worker: &GCWorker<VM>) -> PollResult<VM> {
        loop {
            // Retry polling: work may have become available since the last check.
            if let Some(work) = self.poll_schedulable_work(worker) {
                return Ok(work);
            }

            let ordinal = worker.ordinal;
            self.worker_monitor
                .park_and_wait(ordinal, |goals| self.on_last_parked(worker, goals))?;
        }
    }

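    /// Called when the last worker parks. Depending on the current goal, this
    /// either finds more work for workers, finishes the current GC, or responds to
    /// pending requests.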
    fn on_last_parked(&self, worker: &GCWorker<VM>, goals: &mut WorkerGoals) -> LastParkedResult {
        let Some(goal) = goals.current() else {
            // There is no current goal. Find a pending request to respond to.
            return self.respond_to_requests(worker, goals);
        };

        match goal {
            WorkerGoal::Gc => {
                // A new GC must not be requested while a GC is still in progress.
                assert!(
                    !goals.debug_is_requested(WorkerGoal::Gc),
                    "GC request sent to WorkerMonitor while GC is still in progress."
                );

                trace!("The last worker parked during GC. Try to find more work to do...");

                // If all workers have parked, all open buckets must have been drained.
                self.assert_all_open_buckets_are_empty();

                let found_more_work = self.find_more_work_for_workers();

                if found_more_work {
                    LastParkedResult::WakeAll
                } else {
                    // GC has finished. This may schedule concurrent packets.
                    let concurrent_work_scheduled = self.on_gc_finished(worker);

                    goals.on_current_goal_completed();

                    if concurrent_work_scheduled {
                        // Wake all workers to execute the concurrent packets.
                        LastParkedResult::WakeAll
                    } else {
                        self.respond_to_requests(worker, goals)
                    }
                }
            }
            WorkerGoal::StopForFork => {
                panic!(
                    "Worker {} parked again when it is asked to exit.",
                    worker.ordinal
                )
            }
        }
    }

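    /// Respond to the next pending request, if any. Must only be called when there
    /// is no current goal. Returns how the last parked worker should proceed.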
    fn respond_to_requests(
        &self,
        worker: &GCWorker<VM>,
        goals: &mut WorkerGoals,
    ) -> LastParkedResult {
        assert!(goals.current().is_none());

        let Some(goal) = goals.poll_next_goal() else {
            // No requests are pending. Park this worker as well.
            return LastParkedResult::ParkSelf;
        };

        match goal {
            WorkerGoal::Gc => {
                trace!("A mutator requested a GC to be scheduled.");

                // Fire the trace point before adding the `ScheduleCollection` packet
                // so that tracing tools see the start of the GC first.
                probe!(mmtk, gc_start);

                // Record the start time of the GC.
                {
                    let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
                    assert!(gc_start_time.is_none(), "GC already started?");
                    *gc_start_time = Some(Instant::now());
                }

                self.add_schedule_collection_packet();
                LastParkedResult::WakeSelf
            }
            WorkerGoal::StopForFork => {
                trace!("A mutator wanted to fork.");
                LastParkedResult::WakeAll
            }
        }
    }

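    /// Try to make more work available, in order: designated work, sentinel packets
    /// for open buckets, then opening new buckets. Returns true on success.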
    fn find_more_work_for_workers(&self) -> bool {
        // See if any worker has designated work.
        if self.worker_group.has_designated_work() {
            trace!("Some workers have designated work.");
            return true;
        }

        // See if a sentinel packet can be scheduled into any open bucket.
        if self.schedule_sentinels() {
            trace!("Some sentinels are scheduled.");
            return true;
        }

        // Try to open new buckets.
        if self.update_buckets() {
            trace!("Some buckets are opened.");
            return true;
        }

        false
    }

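    /// Called when GC has finished, i.e. when all workers have parked and no more
    /// work can be found. Returns true if concurrent packets were scheduled, in
    /// which case workers should be woken up to execute them.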
    fn on_gc_finished(&self, worker: &GCWorker<VM>) -> bool {
        // All GC workers must have parked by now.
        debug_assert!(!self.worker_group.has_designated_work());
        self.debug_assert_all_stw_buckets_empty();

        self.close_all_stw_buckets();
        self.debug_assert_all_stw_buckets_closed();

        let mmtk = worker.mmtk;

        // Tell the GC trigger that the GC ended.
        mmtk.gc_trigger.policy.on_gc_end(mmtk);

        probe!(mmtk, plan_end_of_gc_begin);
        // All other workers are parked and mutators are stopped, so mutating the
        // plan here does not race.
        let plan_mut: &mut dyn Plan<VM = VM> = unsafe { mmtk.get_plan_mut() };
        plan_mut.end_of_gc(worker.tls);
        probe!(mmtk, plan_end_of_gc_end);

        // Compute the elapsed time of the GC.
        let start_time = {
            let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
            gc_start_time.take().expect("GC not started yet?")
        };
        let elapsed = start_time.elapsed();

        info!(
            "End of GC ({}/{} pages, took {} ms)",
            mmtk.get_plan().get_reserved_pages(),
            mmtk.get_plan().get_total_pages(),
            elapsed.as_millis()
        );

        probe!(mmtk, gc_end);

        // Aggregate per-worker live-bytes statistics and report per-space usage.
        if *mmtk.get_options().count_live_bytes_in_gc {
            let live_bytes = mmtk
                .scheduler
                .worker_group
                .get_and_clear_worker_live_bytes();
            let mut live_bytes_in_last_gc = mmtk.state.live_bytes_in_last_gc.borrow_mut();
            *live_bytes_in_last_gc = mmtk.aggregate_live_bytes_in_last_gc(live_bytes);
            for (space_name, &stats) in live_bytes_in_last_gc.iter() {
                info!(
                    "{} = {} pages ({:.1}% live)",
                    space_name,
                    stats.used_pages,
                    stats.live_bytes as f64 * 100.0 / stats.used_bytes as f64,
                );
            }
        }

        mmtk.state
            .set_used_pages_after_last_gc(mmtk.get_plan().get_used_pages());

        // Reset the slot logger at the end of each GC.
        #[cfg(feature = "extreme_assertions")]
        if crate::util::slot_logger::should_check_duplicate_slots(mmtk.get_plan()) {
            mmtk.slot_logger.reset();
        }

        // Reset the collection-triggering state.
        mmtk.state.reset_collection_trigger();

        // Schedule any pending concurrent packets before resuming mutators.
        let concurrent_work_scheduled = self.schedule_concurrent_packets();
        self.debug_assert_all_stw_buckets_closed();

        // Set the GC status to NotInGC right before resuming mutators.
        mmtk.set_gc_status(GcStatus::NotInGC);
        <VM as VMBinding>::VMCollection::resume_mutators(worker.tls);

        concurrent_work_scheduled
    }

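    /// Enable statistics gathering for every worker.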
    pub fn enable_stat(&self) {
        for worker in &self.worker_group.workers_shared {
            let worker_stat = worker.borrow_stat();
            worker_stat.enable();
        }
    }

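    /// Merge the statistics of all workers into a single name-value map.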
    pub fn statistics(&self) -> HashMap<String, String> {
        let mut summary = SchedulerStat::default();
        for worker in &self.worker_group.workers_shared {
            let worker_stat = worker.borrow_stat();
            summary.merge(&worker_stat);
        }
        summary.harness_stat()
    }

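    /// Called when all mutators have stopped: clear the GC request, open the first
    /// stop-the-world bucket, and notify workers that work is available.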
    pub fn notify_mutators_paused(&self, mmtk: &'static MMTK<VM>) {
        mmtk.gc_trigger.clear_request();
        let first_stw_bucket = &self.work_buckets[WorkBucketStage::FIRST_STW_STAGE];
        debug_assert!(!first_stw_bucket.is_open());
        first_stw_bucket.open();
        // Notify workers that packets are now available in the first STW bucket.
        self.worker_monitor.notify_work_available(true);
    }

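    /// Enable and open the `Concurrent` bucket if it has packets; otherwise disable
    /// and close it. Returns true if concurrent work was scheduled.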
    pub(super) fn schedule_concurrent_packets(&self) -> bool {
        let concurrent_bucket = &self.work_buckets[WorkBucketStage::Concurrent];
        if !concurrent_bucket.is_empty() {
            // There are concurrent packets. Enable and open the bucket so that
            // workers can execute them after mutators resume.
            concurrent_bucket.set_enabled(true);
            concurrent_bucket.open();
            true
        } else {
            concurrent_bucket.set_enabled(false);
            concurrent_bucket.close();
            false
        }
    }
}