mmtk/scheduler/
scheduler.rs

1use self::worker::PollResult;
2
3use super::gc_work::ScheduleCollection;
4use super::stat::SchedulerStat;
5use super::work_bucket::*;
6use super::worker::{GCWorker, ThreadId, WorkerGroup};
7use super::worker_goals::{WorkerGoal, WorkerGoals};
8use super::worker_monitor::{LastParkedResult, WorkerMonitor};
9use super::*;
10use crate::global_state::GcStatus;
11use crate::mmtk::MMTK;
12use crate::util::opaque_pointer::*;
13use crate::util::options::AffinityKind;
14use crate::vm::Collection;
15use crate::vm::VMBinding;
16use crate::Plan;
17use crossbeam::deque::Steal;
18use enum_map::{Enum, EnumMap};
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Instant;
22
/// The central coordinator of GC work.  It owns the work buckets (queues of work packets
/// grouped by [`WorkBucketStage`]) and the worker group, and communicates with workers
/// (and requesting mutators) through the worker monitor.
pub struct GCWorkScheduler<VM: VMBinding> {
    /// Work buckets, one per `WorkBucketStage`.  Later stages only open after the
    /// prerequisite stages are drained (see the open conditions set in `new`).
    pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
    /// Workers.  Shared (via `Arc`) with the worker threads themselves.
    pub(crate) worker_group: Arc<WorkerGroup<VM>>,
    /// For synchronized communication between workers and with mutators.
    pub(crate) worker_monitor: Arc<WorkerMonitor>,
    /// How to assign the affinity of each GC thread. Specified by the user.
    affinity: AffinityKind,
}
33
// FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet.
// Some subtle interaction between ObjectRememberingBarrier, Mutator and some GCWork instances
// makes the compiler think WorkBucket is not Sync.
// SAFETY: per the FIXME above, the auto-trait inference is believed to be overly
// conservative here.  NOTE(review): this claim should be re-verified whenever
// WorkBucket or the barrier types change — TODO confirm.
unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}
38
39impl<VM: VMBinding> GCWorkScheduler<VM> {
    /// Create a scheduler with `num_workers` workers and the given thread-affinity policy.
    ///
    /// This creates all work buckets and wires up the open condition of every
    /// sequentially-opened bucket: such a bucket may only open once the first STW stage
    /// and all earlier sequential stages are drained.
    pub fn new(num_workers: usize, affinity: AffinityKind) -> Arc<Self> {
        let worker_monitor: Arc<WorkerMonitor> = Arc::new(WorkerMonitor::new(num_workers));
        let worker_group = WorkerGroup::new(num_workers);

        // Create work buckets for workers.
        let mut work_buckets = EnumMap::from_fn(|stage: WorkBucketStage| {
            WorkBucket::new(stage, worker_monitor.clone())
        });

        // Set the open condition of each bucket.
        {
            // The stages that must be drained before the *next* sequential stage may open.
            // Grows as we walk the stages in order.
            let mut open_stages: Vec<WorkBucketStage> = vec![WorkBucketStage::FIRST_STW_STAGE];
            let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize);
            for stage in stages {
                if stage.is_sequentially_opened() {
                    // Snapshot the prerequisite list for this stage's closure; the
                    // original `open_stages` keeps growing for later stages.
                    let cur_stages = open_stages.clone();
                    // Other work packets will be opened after previous stages are done
                    // (i.e their buckets are drained and all workers parked).
                    work_buckets[stage].set_open_condition(
                        move |scheduler: &GCWorkScheduler<VM>| {
                            debug!(
                                "Check if {:?} can be opened? These needs to be drained: {:?}",
                                stage, &cur_stages
                            );
                            scheduler.are_buckets_drained(&cur_stages)
                        },
                    );
                    open_stages.push(stage);
                }
            }
        }

        Arc::new(Self {
            work_buckets,
            worker_group,
            worker_monitor,
            affinity,
        })
    }
79
80    pub fn num_workers(&self) -> usize {
81        self.worker_group.as_ref().worker_count()
82    }
83
    /// Create GC threads for the first time.  It will also create the `GCWorker` instances.
    ///
    /// Currently GC threads only include worker threads, and we currently have only one worker
    /// group.  We may add more worker groups in the future.
    ///
    /// `tls` identifies the VM thread requesting the spawn; it is handed to the worker
    /// group, which uses it when creating the threads.
    pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
        self.worker_group.initial_spawn(tls, mmtk);
    }
91
    /// Ask all GC workers to exit for forking.
    ///
    /// This first prepares the buffer into which exiting workers surrender their
    /// `GCWorker` structs (see [`GCWorkScheduler::surrender_gc_worker`]), and then posts
    /// a `StopForFork` request to the worker monitor.
    pub fn stop_gc_threads_for_forking(self: &Arc<Self>) {
        // Prepare the surrender buffer *before* asking workers to stop.
        self.worker_group.prepare_surrender_buffer();

        debug!("A mutator is requesting GC threads to stop for forking...");
        self.worker_monitor.make_request(WorkerGoal::StopForFork);
    }
99
100    /// Surrender the `GCWorker` struct of a GC worker when it exits.
101    pub fn surrender_gc_worker(&self, worker: Box<GCWorker<VM>>) {
102        let all_surrendered = self.worker_group.surrender_gc_worker(worker);
103
104        if all_surrendered {
105            debug!(
106                "All {} workers surrendered.",
107                self.worker_group.worker_count()
108            );
109            self.worker_monitor.on_all_workers_exited();
110        }
111    }
112
    /// Respawn GC threads after forking.  This will reuse the `GCWorker` instances of stopped
    /// workers.  `tls` is the VM thread that requests GC threads to be re-spawn, and will be
    /// passed down to [`crate::vm::Collection::spawn_gc_thread`].
    ///
    /// Workers are expected to have surrendered their `GCWorker` structs (see
    /// [`GCWorkScheduler::surrender_gc_worker`]) before this is called.
    pub fn respawn_gc_threads_after_forking(self: &Arc<Self>, tls: VMThread) {
        self.worker_group.respawn(tls)
    }
119
    /// Resolve the affinity of a thread.
    ///
    /// Delegates to the user-specified [`AffinityKind`] for the thread with the given id.
    pub fn resolve_affinity(&self, thread: ThreadId) {
        self.affinity.resolve_affinity(thread);
    }
124
    /// Request a GC to be scheduled.  Called by mutator via `GCTrigger`.
    ///
    /// This only posts a `WorkerGoal::Gc` request to the worker monitor; the last parked
    /// worker picks it up and schedules the collection (see `respond_to_requests`).
    pub(crate) fn request_schedule_collection(&self) {
        debug!("A mutator is sending GC-scheduling request to workers...");
        self.worker_monitor.make_request(WorkerGoal::Gc);
    }
130
    /// Add the `ScheduleCollection` packet.  Called by the last parked worker.
    fn add_schedule_collection_packet(&self) {
        // We are still holding the mutex `WorkerMonitor::sync`.  Do not notify now.
        probe!(mmtk, add_schedule_collection_packet);
        // `add_no_notify` skips waking workers; the caller wakes the worker itself
        // (see `LastParkedResult::WakeSelf` in `respond_to_requests`).
        self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection);
    }
137
    /// Schedule all the common work packets
    ///
    /// This populates the buckets with the packets shared by all plans: stopping/scanning
    /// mutators, prepare/release, reference processing, finalization, and VM-specific
    /// weak reference handling.  Plan-specific packets are scheduled elsewhere.
    pub fn schedule_common_work<C: GCWorkContext<VM = VM>>(&self, plan: &'static C::PlanType) {
        use crate::scheduler::gc_work::*;
        // Stop & scan mutators (mutator scanning can happen before STW)
        self.work_buckets[WorkBucketStage::Unconstrained].add(StopMutators::<C>::new());

        // Prepare global/collectors/mutators
        self.work_buckets[WorkBucketStage::Prepare].add(Prepare::<C>::new(plan));

        // Release global/collectors/mutators
        self.work_buckets[WorkBucketStage::Release].add(Release::<C>::new(plan));

        // Analysis GC work
        #[cfg(feature = "analysis")]
        {
            use crate::util::analysis::GcHookWork;
            self.work_buckets[WorkBucketStage::Unconstrained].add(GcHookWork);
        }

        // Sanity
        #[cfg(feature = "sanity")]
        {
            use crate::util::sanity::sanity_checker::ScheduleSanityGC;
            self.work_buckets[WorkBucketStage::Final]
                .add(ScheduleSanityGC::<C::PlanType>::new(plan));
        }

        // Reference processing
        if !*plan.base().options.no_reference_types {
            use crate::util::reference_processor::{
                PhantomRefProcessing, SoftRefProcessing, WeakRefProcessing,
            };
            self.work_buckets[WorkBucketStage::SoftRefClosure]
                .add(SoftRefProcessing::<C::DefaultProcessEdges>::new());
            self.work_buckets[WorkBucketStage::WeakRefClosure].add(WeakRefProcessing::<VM>::new());
            self.work_buckets[WorkBucketStage::PhantomRefClosure]
                .add(PhantomRefProcessing::<VM>::new());

            use crate::util::reference_processor::RefForwarding;
            if plan.constraints().needs_forward_after_liveness {
                self.work_buckets[WorkBucketStage::RefForwarding]
                    .add(RefForwarding::<C::DefaultProcessEdges>::new());
            }

            use crate::util::reference_processor::RefEnqueue;
            self.work_buckets[WorkBucketStage::Release].add(RefEnqueue::<VM>::new());
        }

        // Finalization
        if !*plan.base().options.no_finalizer {
            use crate::util::finalizable_processor::{Finalization, ForwardFinalization};
            // finalization
            self.work_buckets[WorkBucketStage::FinalRefClosure]
                .add(Finalization::<C::DefaultProcessEdges>::new());
            // forward refs
            if plan.constraints().needs_forward_after_liveness {
                self.work_buckets[WorkBucketStage::FinalizableForwarding]
                    .add(ForwardFinalization::<C::DefaultProcessEdges>::new());
            }
        }

        // We add the VM-specific weak ref processing work regardless of MMTK-side options,
        // including Options::no_finalizer and Options::no_reference_types.
        //
        // VMs need weak reference handling to function properly.  The VM may treat weak references
        // as strong references, but it is not appropriate to simply disable weak reference
        // handling from MMTk's side.  The VM, however, may choose to do nothing in
        // `Collection::process_weak_refs` if appropriate.
        //
        // It is also not sound for MMTk core to turn off weak
        // reference processing or finalization alone, because (1) not all VMs have the notion of
        // weak references or finalizers, so it may not make sense, and (2) the VM may need to
        // process them together.

        // VM-specific weak ref processing
        // The `VMProcessWeakRefs` work packet is set as the sentinel so that it is executed when
        // the `VMRefClosure` bucket is drained.  The VM binding may spawn new work packets into
        // the `VMRefClosure` bucket, and request another `VMProcessWeakRefs` work packet to be
        // executed again after this bucket is drained again.  Strictly speaking, the first
        // `VMProcessWeakRefs` packet can be an ordinary packet (doesn't have to be a sentinel)
        // because there are no other packets in the bucket.  We set it as sentinel for
        // consistency.
        self.work_buckets[WorkBucketStage::VMRefClosure]
            .set_sentinel(Box::new(VMProcessWeakRefs::<C::DefaultProcessEdges>::new()));

        if plan.constraints().needs_forward_after_liveness {
            // VM-specific weak ref forwarding
            self.work_buckets[WorkBucketStage::VMRefForwarding]
                .add(VMForwardWeakRefs::<C::DefaultProcessEdges>::new());
        }

        self.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::<VM>::default());
    }
231
232    fn are_buckets_drained(&self, buckets: &[WorkBucketStage]) -> bool {
233        buckets
234            .iter()
235            .all(|&b| !self.work_buckets[b].is_enabled() || self.work_buckets[b].is_drained())
236    }
237
238    pub fn debug_assert_all_stw_buckets_empty(&self) {
239        debug_assert!(self
240            .work_buckets
241            .values()
242            .filter(|bucket| bucket.get_stage().is_stw())
243            .all(|bucket| {
244                if !bucket.is_empty() {
245                    warn!(
246                        "Work bucket {:?} is not empty but it is expected to be empty!",
247                        bucket.get_stage()
248                    );
249                    warn!("Queue: {:?}", bucket.get_queue().debug_dump_packets());
250                    false
251                } else {
252                    true
253                }
254            }))
255    }
256
257    /// Schedule "sentinel" work packets for all open buckets.
258    pub(crate) fn schedule_sentinels(&self) -> bool {
259        let mut new_packets = false;
260        for (id, work_bucket) in self.work_buckets.iter() {
261            if work_bucket.is_open() && work_bucket.maybe_schedule_sentinel() {
262                trace!("Scheduled sentinel packet into {:?}", id);
263                new_packets = true;
264            }
265        }
266        new_packets
267    }
268
    /// Open buckets if their conditions are met.
    ///
    /// This function should only be called after all the workers are parked.
    /// No workers will be waked up by this function. The caller is responsible for that.
    ///
    /// Return true if there're any non-empty buckets updated.
    pub(crate) fn update_buckets(&self) -> bool {
        debug!("update_buckets");
        // Whether any bucket was newly opened by this call.
        let mut buckets_updated = false;
        // Whether a newly opened bucket actually made packets available.
        let mut new_packets = false;
        for i in 0..WorkBucketStage::LENGTH {
            let id = WorkBucketStage::from_usize(i);
            if id.is_always_open() {
                continue;
            }
            let bucket = &self.work_buckets[id];
            if !bucket.is_enabled() {
                debug!("Work bucket {:?} is disabled. Skip.", id);
                continue;
            }
            debug!("Checking if {:?} can be opened...", id);
            // `update` opens the bucket if its open condition is satisfied.
            let bucket_opened = bucket.update(self);
            buckets_updated = buckets_updated || bucket_opened;
            if bucket_opened {
                probe!(mmtk, bucket_opened, id);
                new_packets = new_packets || !bucket.is_drained();
                if new_packets {
                    // Quit the loop. There are already new packets in the newly opened buckets.
                    trace!("Found new packets at stage {:?}.  Break.", id);
                    break;
                }
                new_packets = new_packets || bucket.maybe_schedule_sentinel();
                if new_packets {
                    // Quit the loop. A sentinel packet is added to the newly opened buckets.
                    trace!("Sentinel is scheduled at stage {:?}.  Break.", id);
                    break;
                }
            }
        }
        // Only report success when opening a bucket produced actual work.
        buckets_updated && new_packets
    }
310
311    pub fn close_all_stw_buckets(&self) {
312        self.work_buckets.iter().for_each(|(id, bkt)| {
313            if id.is_stw() {
314                bkt.close();
315            }
316        });
317    }
318
319    pub fn reset_state(&self) {
320        self.work_buckets.iter().for_each(|(id, bkt)| {
321            if id.is_stw() && !id.is_first_stw_stage() {
322                bkt.close();
323            }
324        });
325    }
326
327    pub fn debug_assert_all_stw_buckets_closed(&self) {
328        if cfg!(debug_assertions) {
329            self.work_buckets.iter().for_each(|(id, bkt)| {
330                if id.is_stw() {
331                    assert!(!bkt.is_open());
332                }
333            });
334        }
335    }
336
337    /// Check if all the work buckets are empty
338    pub(crate) fn assert_all_open_buckets_are_empty(&self) {
339        let mut error_example = None;
340        for (id, bucket) in self.work_buckets.iter() {
341            if bucket.is_enabled() && bucket.is_open() && !bucket.is_empty() {
342                error!("Work bucket {:?} is not drained!", id);
343                error!("Queue: {:?}", bucket.get_queue().debug_dump_packets());
344                // This error can be hard to reproduce.
345                // If an error happens in the release build where logs are turned off,
346                // we should show at least one abnormal bucket in the panic message
347                // so that we still have some information for debugging.
348                error_example = Some(id);
349            }
350        }
351        if let Some(id) = error_example {
352            panic!("Some open buckets (such as {:?}) are not empty.", id);
353        }
354    }
355
356    /// Get a schedulable work packet without retry.
357    fn poll_schedulable_work_once(&self, worker: &GCWorker<VM>) -> Steal<Box<dyn GCWork<VM>>> {
358        let mut should_retry = false;
359        // Try find a packet that can be processed only by this worker.
360        if let Some(w) = worker.shared.designated_work.pop() {
361            return Steal::Success(w);
362        }
363        // Try get a packet from a work bucket.
364        for work_bucket in self.work_buckets.values() {
365            match work_bucket.poll(&worker.local_work_buffer) {
366                Steal::Success(w) => return Steal::Success(w),
367                Steal::Retry => should_retry = true,
368                _ => {}
369            }
370        }
371        // Try steal some packets from any worker
372        for (id, worker_shared) in self.worker_group.workers_shared.iter().enumerate() {
373            if id == worker.ordinal {
374                continue;
375            }
376            match worker_shared.stealer.as_ref().unwrap().steal() {
377                Steal::Success(w) => return Steal::Success(w),
378                Steal::Retry => should_retry = true,
379                _ => {}
380            }
381        }
382        if should_retry {
383            Steal::Retry
384        } else {
385            Steal::Empty
386        }
387    }
388
389    /// Get a schedulable work packet.
390    fn poll_schedulable_work(&self, worker: &GCWorker<VM>) -> Option<Box<dyn GCWork<VM>>> {
391        // Loop until we successfully get a packet.
392        loop {
393            match self.poll_schedulable_work_once(worker) {
394                Steal::Success(w) => {
395                    return Some(w);
396                }
397                Steal::Retry => {
398                    std::thread::yield_now();
399                    continue;
400                }
401                Steal::Empty => {
402                    return None;
403                }
404            }
405        }
406    }
407
408    /// Called by workers to get a schedulable work packet.
409    /// Park the worker if there're no available packets.
410    pub(crate) fn poll(&self, worker: &GCWorker<VM>) -> PollResult<VM> {
411        if let Some(work) = self.poll_schedulable_work(worker) {
412            return Ok(work);
413        }
414        self.poll_slow(worker)
415    }
416
    /// Slow path of [`GCWorkScheduler::poll`]: repeatedly park this worker and retry
    /// polling when it is woken up.
    fn poll_slow(&self, worker: &GCWorker<VM>) -> PollResult<VM> {
        loop {
            // Retry polling
            if let Some(work) = self.poll_schedulable_work(worker) {
                return Ok(work);
            }

            let ordinal = worker.ordinal;
            // Park until woken.  If this is the last worker to park, `on_last_parked`
            // decides what happens next.  The `?` propagates the error case, which
            // presumably means the worker is asked to stop (see `WorkerGoal::StopForFork`)
            // — TODO confirm against `WorkerMonitor::park_and_wait`.
            self.worker_monitor
                .park_and_wait(ordinal, |goals| self.on_last_parked(worker, goals))?;
        }
    }
429
    /// Called when the last worker parked.  `goal` allows this function to inspect and change the
    /// current goal.
    ///
    /// Returns what this last parked worker should do next: park itself, wake itself, or
    /// wake all workers (see [`LastParkedResult`]).
    fn on_last_parked(&self, worker: &GCWorker<VM>, goals: &mut WorkerGoals) -> LastParkedResult {
        let Some(ref current_goal) = goals.current() else {
            // There is no goal.  Find a request to respond to.
            return self.respond_to_requests(worker, goals);
        };

        match current_goal {
            WorkerGoal::Gc => {
                // We are in the progress of GC.

                // In stop-the-world GC, mutators cannot request for GC while GC is in progress.
                // When we support concurrent GC, we should remove this assertion.
                assert!(
                    !goals.debug_is_requested(WorkerGoal::Gc),
                    "GC request sent to WorkerMonitor while GC is still in progress."
                );

                // We are in the middle of GC, and the last GC worker parked.
                trace!("The last worker parked during GC.  Try to find more work to do...");

                // During GC, if all workers parked, all open buckets must have been drained.
                self.assert_all_open_buckets_are_empty();

                // Find more work for workers to do.
                let found_more_work = self.find_more_work_for_workers();

                if found_more_work {
                    LastParkedResult::WakeAll
                } else {
                    // GC finished.
                    let concurrent_work_scheduled = self.on_gc_finished(worker);

                    // Clear the current goal
                    goals.on_current_goal_completed();

                    if concurrent_work_scheduled {
                        // It was the initial mark pause and scheduled concurrent work.
                        // Wake up all GC workers to do concurrent work.
                        LastParkedResult::WakeAll
                    } else {
                        // It was an STW GC or the final mark pause of a concurrent GC.
                        // Respond to another goal.
                        self.respond_to_requests(worker, goals)
                    }
                }
            }
            WorkerGoal::StopForFork => {
                // Workers asked to stop should exit instead of parking; parking again
                // while this goal is active indicates a bug.
                panic!(
                    "Worker {} parked again when it is asked to exit.",
                    worker.ordinal
                )
            }
        }
    }
486
    /// Respond to a worker request.
    ///
    /// Polls the next pending goal (if any) and starts working towards it.  Returns what
    /// the last parked worker should do next.
    fn respond_to_requests(
        &self,
        worker: &GCWorker<VM>,
        goals: &mut WorkerGoals,
    ) -> LastParkedResult {
        // There must be no goal in progress when we pick up a new request.
        assert!(goals.current().is_none());

        let Some(goal) = goals.poll_next_goal() else {
            // No requests.  Park this worker, too.
            return LastParkedResult::ParkSelf;
        };

        match goal {
            WorkerGoal::Gc => {
                trace!("A mutator requested a GC to be scheduled.");

                // We set the eBPF trace point here so that bpftrace scripts can start recording
                // work packet events before the `ScheduleCollection` work packet starts.
                probe!(mmtk, gc_start);

                {
                    // Record the GC start time; it is taken again in `on_gc_finished`
                    // to compute the elapsed GC time.
                    let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
                    assert!(gc_start_time.is_none(), "GC already started?");
                    *gc_start_time = Some(Instant::now());
                }

                self.add_schedule_collection_packet();
                // Only this worker needs to wake up to execute `ScheduleCollection`.
                LastParkedResult::WakeSelf
            }
            WorkerGoal::StopForFork => {
                trace!("A mutator wanted to fork.");
                // Wake all workers so they all see the stop-for-fork goal.
                LastParkedResult::WakeAll
            }
        }
    }
523
524    /// Find more work for workers to do.  Return true if more work is available.
525    fn find_more_work_for_workers(&self) -> bool {
526        if self.worker_group.has_designated_work() {
527            trace!("Some workers have designated work.");
528            return true;
529        }
530
531        // See if any bucket has a sentinel.
532        if self.schedule_sentinels() {
533            trace!("Some sentinels are scheduled.");
534            return true;
535        }
536
537        // Try to open new buckets.
538        if self.update_buckets() {
539            trace!("Some buckets are opened.");
540            return true;
541        }
542
543        // If all of the above failed, it means GC has finished.
544        false
545    }
546
    /// Called when GC has finished, i.e. when all work packets have been executed.
    ///
    /// Runs the end-of-GC protocol in order: close buckets, notify the GC trigger,
    /// call the plan's `end_of_gc`, report statistics, reset per-GC state, schedule
    /// concurrent packets (if any), and finally resume mutators.
    ///
    /// Return `true` if any concurrent work packets have been scheduled.
    fn on_gc_finished(&self, worker: &GCWorker<VM>) -> bool {
        // All GC workers must have parked by now.
        debug_assert!(!self.worker_group.has_designated_work());
        self.debug_assert_all_stw_buckets_empty();

        // Close all work buckets to prepare for the next GC.
        self.close_all_stw_buckets();
        self.debug_assert_all_stw_buckets_closed();

        let mmtk = worker.mmtk;

        // Tell GC trigger that GC ended - this happens before we resume mutators.
        mmtk.gc_trigger.policy.on_gc_end(mmtk);

        // All other workers are parked, so it is safe to access the Plan instance mutably.
        probe!(mmtk, plan_end_of_gc_begin);
        let plan_mut: &mut dyn Plan<VM = VM> = unsafe { mmtk.get_plan_mut() };
        plan_mut.end_of_gc(worker.tls);
        probe!(mmtk, plan_end_of_gc_end);

        // Compute the elapsed time of the GC.
        // The start time was recorded in `respond_to_requests` when the GC was scheduled.
        let start_time = {
            let mut gc_start_time = worker.mmtk.state.gc_start_time.borrow_mut();
            gc_start_time.take().expect("GC not started yet?")
        };
        let elapsed = start_time.elapsed();

        info!(
            "End of GC ({}/{} pages, took {} ms)",
            mmtk.get_plan().get_reserved_pages(),
            mmtk.get_plan().get_total_pages(),
            elapsed.as_millis()
        );

        // USDT tracepoint for the end of GC.
        probe!(mmtk, gc_end);

        if *mmtk.get_options().count_live_bytes_in_gc {
            // Aggregate the live bytes
            let live_bytes = mmtk
                .scheduler
                .worker_group
                .get_and_clear_worker_live_bytes();
            let mut live_bytes_in_last_gc = mmtk.state.live_bytes_in_last_gc.borrow_mut();
            *live_bytes_in_last_gc = mmtk.aggregate_live_bytes_in_last_gc(live_bytes);
            // Logging
            for (space_name, &stats) in live_bytes_in_last_gc.iter() {
                info!(
                    "{} = {} pages ({:.1}% live)",
                    space_name,
                    stats.used_pages,
                    stats.live_bytes as f64 * 100.0 / stats.used_bytes as f64,
                );
            }
        }

        mmtk.state
            .set_used_pages_after_last_gc(mmtk.get_plan().get_used_pages());

        #[cfg(feature = "extreme_assertions")]
        if crate::util::slot_logger::should_check_duplicate_slots(mmtk.get_plan()) {
            // reset the logging info at the end of each GC
            mmtk.slot_logger.reset();
        }

        // Reset the triggering information.
        mmtk.state.reset_collection_trigger();

        let concurrent_work_scheduled = self.schedule_concurrent_packets();
        self.debug_assert_all_stw_buckets_closed();

        // Set to NotInGC after everything, and right before resuming mutators.
        mmtk.set_gc_status(GcStatus::NotInGC);
        <VM as VMBinding>::VMCollection::resume_mutators(worker.tls);

        concurrent_work_scheduled
    }
627
628    pub fn enable_stat(&self) {
629        for worker in &self.worker_group.workers_shared {
630            let worker_stat = worker.borrow_stat();
631            worker_stat.enable();
632        }
633    }
634
635    pub fn statistics(&self) -> HashMap<String, String> {
636        let mut summary = SchedulerStat::default();
637        for worker in &self.worker_group.workers_shared {
638            let worker_stat = worker.borrow_stat();
639            summary.merge(&worker_stat);
640        }
641        summary.harness_stat()
642    }
643
    /// Called when all mutators have been paused: clear the pending GC request and open
    /// the first STW bucket so workers can start executing STW packets.
    pub fn notify_mutators_paused(&self, mmtk: &'static MMTK<VM>) {
        mmtk.gc_trigger.clear_request();
        let first_stw_bucket = &self.work_buckets[WorkBucketStage::FIRST_STW_STAGE];
        debug_assert!(!first_stw_bucket.is_open());
        // Note: This is the only place where a bucket is opened without having all workers parked.
        // We usually require all workers to park before opening new buckets because otherwise
        // packets will be executed out of order.  However, since `Prepare` is the first STW
        // bucket, and all subsequent buckets require all workers to park before opening, workers
        // cannot execute work packets out of order.  This is not generally true if we are not
        // opening the first STW bucket.  In the future, we should redesign the opening condition
        // of work buckets to make the synchronization more robust.
        first_stw_bucket.open();
        self.worker_monitor.notify_work_available(true);
    }
658
659    pub(super) fn schedule_concurrent_packets(&self) -> bool {
660        let concurrent_bucket = &self.work_buckets[WorkBucketStage::Concurrent];
661        if !concurrent_bucket.is_empty() {
662            concurrent_bucket.set_enabled(true);
663            concurrent_bucket.open();
664            true
665        } else {
666            concurrent_bucket.set_enabled(false);
667            concurrent_bucket.close();
668            false
669        }
670    }
671}