mmtk/util/heap/blockpageresource.rs

use super::pageresource::{PRAllocFail, PRAllocResult};
use super::{FreeListPageResource, PageResource};
use crate::util::address::Address;
use crate::util::constants::*;
use crate::util::heap::layout::vm_layout::*;
use crate::util::heap::layout::VMMap;
use crate::util::heap::pageresource::CommonPageResource;
use crate::util::heap::space_descriptor::SpaceDescriptor;
use crate::util::linear_scan::Region;
use crate::util::opaque_pointer::*;
use crate::util::rust_util::zeroed_alloc::new_zeroed_vec;
use crate::vm::*;
use atomic::Ordering;
use spin::RwLock;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicUsize;
use std::sync::Mutex;

const UNINITIALIZED_WATER_MARK: i32 = -1;
const LOCAL_BUFFER_SIZE: usize = 128;

/// A fast PageResource for fixed-size block allocation only.
pub struct BlockPageResource<VM: VMBinding, B: Region + 'static> {
    flpr: FreeListPageResource<VM>,
    /// A buffer for storing all the free blocks
    block_queue: BlockPool<B>,
    /// Slow-path allocation synchronization
    sync: Mutex<()>,
}
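// A minimal usage sketch (not from this file; `Block` is assumed to be a `Region`
// implementation, and `descriptor`, `vm_map`, `num_workers`, `tls`, `LOG_PAGES` and
// `PAGES_PER_BLOCK` are assumed to be supplied by the embedding space):
//
//     let pr = BlockPageResource::<VM, Block>::new_discontiguous(LOG_PAGES, vm_map, num_workers);
//     if let Ok(result) = pr.alloc_pages(descriptor, PAGES_PER_BLOCK, PAGES_PER_BLOCK, tls) {
//         let block = Block::from_aligned_address(result.start);
//         // ... the space uses the block, and hands it back once it is freed:
//         pr.release_block(block);
//     }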

impl<VM: VMBinding, B: Region> PageResource<VM> for BlockPageResource<VM, B> {
    fn common(&self) -> &CommonPageResource {
        self.flpr.common()
    }

    fn common_mut(&mut self) -> &mut CommonPageResource {
        self.flpr.common_mut()
    }

    fn update_discontiguous_start(&mut self, start: Address) {
        self.flpr.update_discontiguous_start(start)
    }

    fn alloc_pages(
        &self,
        space_descriptor: SpaceDescriptor,
        reserved_pages: usize,
        required_pages: usize,
        tls: VMThread,
    ) -> Result<PRAllocResult, PRAllocFail> {
        self.alloc_pages_fast(space_descriptor, reserved_pages, required_pages, tls)
    }

    fn get_available_physical_pages(&self) -> usize {
        let _sync = self.sync.lock().unwrap();
        self.flpr.get_available_physical_pages()
    }
}

impl<VM: VMBinding, B: Region> BlockPageResource<VM, B> {
    /// Log2 of the number of pages in a block (the block granularity of this page resource)
    const LOG_PAGES: usize = B::LOG_BYTES - LOG_BYTES_IN_PAGE as usize;
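    // For example, assuming 4 KiB pages (LOG_BYTES_IN_PAGE == 12) and 32 KiB blocks
    // (B::LOG_BYTES == 15), LOG_PAGES == 3, i.e. each block covers 8 pages.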

    pub fn new_contiguous(
        log_pages: usize,
        start: Address,
        bytes: usize,
        vm_map: &'static dyn VMMap,
        num_workers: usize,
    ) -> Self {
        assert!((1 << log_pages) <= PAGES_IN_CHUNK);
        Self {
            flpr: FreeListPageResource::new_contiguous(start, bytes, vm_map),
            block_queue: BlockPool::new(num_workers),
            sync: Mutex::new(()),
        }
    }

    pub fn new_discontiguous(
        log_pages: usize,
        vm_map: &'static dyn VMMap,
        num_workers: usize,
    ) -> Self {
        assert!((1 << log_pages) <= PAGES_IN_CHUNK);
        Self {
            flpr: FreeListPageResource::new_discontiguous(vm_map),
            block_queue: BlockPool::new(num_workers),
            sync: Mutex::new(()),
        }
    }

    /// Grow contiguous space
    fn alloc_pages_slow_sync(
        &self,
        space_descriptor: SpaceDescriptor,
        reserved_pages: usize,
        required_pages: usize,
        tls: VMThread,
    ) -> Result<PRAllocResult, PRAllocFail> {
        let _guard = self.sync.lock().unwrap();
        // Retry fast allocation
        if let Some(block) = self.block_queue.pop() {
            self.commit_pages(reserved_pages, required_pages, tls);
            return Result::Ok(PRAllocResult {
                start: block.start(),
                pages: required_pages,
                new_chunk: false,
            });
        }
        // Grow space (a chunk at a time)
        // 1. Grow space
        let start: Address = match self.flpr.allocate_one_chunk_no_commit(space_descriptor) {
            Ok(result) => result.start,
            err => return err,
        };
        assert!(start.is_aligned_to(BYTES_IN_CHUNK));
        // 2. Take the first block in the chunk as the allocation result
        let first_block = start;
        // 3. Push all remaining blocks to one or more block lists
        let last_block = start + BYTES_IN_CHUNK;
        let mut array = BlockQueue::new();
        let mut cursor = start + B::BYTES;
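        // A chunk provides BYTES_IN_CHUNK / B::BYTES blocks in total. The first block is
        // returned to the caller; every remaining block is pushed into fixed-capacity
        // BlockQueue arrays, which are published to the global pool as they fill up.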
        while cursor < last_block {
            let result = unsafe { array.push_relaxed(B::from_aligned_address(cursor)) };
            if let Err(block) = result {
                self.block_queue.add_global_array(array);
                array = BlockQueue::new();
                let result2 = unsafe { array.push_relaxed(block) };
                debug_assert!(result2.is_ok());
            }
            cursor += B::BYTES;
        }
        debug_assert!(!array.is_empty());
        // 4. Push the block list to the global pool
        self.block_queue.add_global_array(array);
        // Finish the slow-path allocation
        self.commit_pages(reserved_pages, required_pages, tls);
        Result::Ok(PRAllocResult {
            start: first_block,
            pages: required_pages,
            new_chunk: true,
        })
    }

    /// Allocate a block
    fn alloc_pages_fast(
        &self,
        space_descriptor: SpaceDescriptor,
        reserved_pages: usize,
        required_pages: usize,
        tls: VMThread,
    ) -> Result<PRAllocResult, PRAllocFail> {
        debug_assert_eq!(reserved_pages, required_pages);
        debug_assert_eq!(reserved_pages, 1 << Self::LOG_PAGES);
        // Fast path: allocate from the block list
        if let Some(block) = self.block_queue.pop() {
            self.commit_pages(reserved_pages, required_pages, tls);
            return Result::Ok(PRAllocResult {
                start: block.start(),
                pages: required_pages,
                new_chunk: false,
            });
        }
        // Slow path: we need to grow the space
        self.alloc_pages_slow_sync(space_descriptor, reserved_pages, required_pages, tls)
    }

    /// Release a block, decrementing the page accounting and returning the block to the pool.
    pub fn release_block(&self, block: B) {
        let pages = 1 << Self::LOG_PAGES;
        debug_assert!(pages as usize <= self.common().accounting.get_committed_pages());
        self.common().accounting.release(pages as _);
        self.block_queue.push(block)
    }

    /// Flush all worker-local free-block queues to the global pool.
    pub fn flush_all(&self) {
        self.block_queue.flush_all()
        // TODO: For 32-bit space, we may want to free some contiguous chunks.
    }
}

/// A block list that supports fast lock-free push/pop operations
struct BlockQueue<B: Region> {
    /// The number of elements in the queue.
    cursor: AtomicUsize,
    /// The underlying data storage.
    ///
    /// -   `UnsafeCell<T>`: It may be accessed by multiple threads.
    /// -   `Box<[T]>`: It holds an array allocated on the heap. It cannot be resized, but can be
    ///     replaced with another array as a whole.
    /// -   `MaybeUninit<T>`: It may contain uninitialized elements.
    ///
    /// The implementation of `BlockQueue` must ensure there is no data race, and it never reads
    /// uninitialized elements.
    data: UnsafeCell<Box<[MaybeUninit<B>]>>,
}

impl<B: Region> BlockQueue<B> {
    /// Create an array
    fn new() -> Self {
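        // Allocate the backing store as zeroed heap memory. Since the elements are
        // `MaybeUninit<B>`, zero-filled bytes are a valid "uninitialized" placeholder;
        // `cursor` controls which entries may actually be read.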
        let zeroed_vec = new_zeroed_vec(Self::CAPACITY);
        let boxed_slice = zeroed_vec.into_boxed_slice();
        let data = UnsafeCell::new(boxed_slice);
        Self {
            cursor: AtomicUsize::new(0),
            data,
        }
    }
}

impl<B: Region> BlockQueue<B> {
    const CAPACITY: usize = 256;

    /// Get an entry
    fn get_entry(&self, i: usize) -> B {
        unsafe { (*self.data.get())[i].assume_init() }
    }

    /// Set an entry.
    ///
    /// It's unsafe unless the array is accessed by only one thread (i.e. used as a thread-local array).
    unsafe fn set_entry(&self, i: usize, block: B) {
        (*self.data.get())[i].write(block);
    }

    /// Non-atomically push an element.
    ///
    /// It's unsafe unless the array is accessed by only one thread (i.e. used as a thread-local array).
    unsafe fn push_relaxed(&self, block: B) -> Result<(), B> {
        let i = self.cursor.load(Ordering::Relaxed);
        if i < Self::CAPACITY {
            self.set_entry(i, block);
            self.cursor.store(i + 1, Ordering::Relaxed);
            Ok(())
        } else {
            Err(block)
        }
    }

    /// Atomically pop an element from the array.
    fn pop(&self) -> Option<B> {
        let i = self
            .cursor
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |i| {
                if i > 0 {
                    Some(i - 1)
                } else {
                    None
                }
            });
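        // On success, `fetch_update` returns the cursor value *before* the decrement, so the
        // element we just claimed is at index `i - 1`.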
        if let Ok(i) = i {
            Some(self.get_entry(i - 1))
        } else {
            None
        }
    }

    /// Get array size
    fn len(&self) -> usize {
        self.cursor.load(Ordering::SeqCst)
    }

    /// Test if the array is empty
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterate all elements in the array
    fn iterate_blocks(&self, f: &mut impl FnMut(B)) {
        let len = self.len();
        for i in 0..len {
            f(self.get_entry(i))
        }
    }

    /// Replace the array with a new array.
    ///
    /// Return the old array
    fn replace(&self, new_array: Self) -> Self {
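        // Note: despite the name, this is a full swap. `self` takes over `new_array`'s
        // (typically empty) cursor and storage, while the caller gets the old contents back
        // in the returned value.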
        // Swap cursor
        let temp = self.cursor.load(Ordering::Relaxed);
        self.cursor
            .store(new_array.cursor.load(Ordering::Relaxed), Ordering::Relaxed);
        new_array.cursor.store(temp, Ordering::Relaxed);
        // Swap data
        unsafe {
            core::ptr::swap(self.data.get(), new_array.data.get());
        }
        // Return old array
        new_array
    }
}

/// A block queue which contains a global pool and a set of thread-local queues.
///
/// Mutator or collector threads always allocate blocks by popping from the global pool.
///
/// Collector threads free blocks to their thread-local queues, and then flush them to the global pool before GC ends.
pub struct BlockPool<B: Region> {
    /// First global BlockQueue for fast allocation
    head_global_freed_blocks: RwLock<Option<BlockQueue<B>>>,
    /// A list of BlockQueues that have been flushed to the global pool
    global_freed_blocks: RwLock<Vec<BlockQueue<B>>>,
    /// Thread-local block queues
    worker_local_freed_blocks: Vec<BlockQueue<B>>,
    /// Total number of blocks in the whole BlockPool
    count: AtomicUsize,
}
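// Intended flow, as implemented by the methods below: GC workers return freed blocks via
// `push`, which appends to their worker-local queue; `flush_all` publishes the local queues
// to the global pool before GC ends; allocation calls `pop`, which only ever takes blocks
// from the global pool.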

impl<B: Region> BlockPool<B> {
    /// Create a BlockPool
    pub fn new(num_workers: usize) -> Self {
        Self {
            head_global_freed_blocks: RwLock::new(None),
            global_freed_blocks: RwLock::new(vec![]),
            worker_local_freed_blocks: (0..num_workers).map(|_| BlockQueue::new()).collect(),
            count: AtomicUsize::new(0),
        }
    }

    /// Add a BlockQueue to the global pool
    fn add_global_array(&self, array: BlockQueue<B>) {
        self.count.fetch_add(array.len(), Ordering::SeqCst);
        self.global_freed_blocks.write().push(array);
    }

    /// Push a block to the thread-local queue
    pub fn push(&self, block: B) {
        self.count.fetch_add(1, Ordering::SeqCst);
        let id = crate::scheduler::current_worker_ordinal();
        let failed = unsafe {
            self.worker_local_freed_blocks[id]
                .push_relaxed(block)
                .is_err()
        };
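        // If the worker-local queue was full, swap in a fresh queue seeded with this block
        // and publish the full one to the global pool.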
        if failed {
            let queue = BlockQueue::new();
            let result = unsafe { queue.push_relaxed(block) };
            debug_assert!(result.is_ok());
            let old_queue = self.worker_local_freed_blocks[id].replace(queue);
            assert!(!old_queue.is_empty());
            self.global_freed_blocks.write().push(old_queue);
        }
    }

    /// Pop a block from the global pool
    pub fn pop(&self) -> Option<B> {
        if self.len() == 0 {
            return None;
        }
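        // Fast path: pop from the head array while only holding an upgradeable read lock.
        // The write lock on the overflow list is taken only when the head array is empty.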
        let head_global_freed_blocks = self.head_global_freed_blocks.upgradeable_read();
        if let Some(block) = head_global_freed_blocks.as_ref().and_then(|q| q.pop()) {
            self.count.fetch_sub(1, Ordering::SeqCst);
            Some(block)
        } else {
            let mut global_freed_blocks = self.global_freed_blocks.write();
            // Retry the fast path
            if let Some(block) = head_global_freed_blocks.as_ref().and_then(|q| q.pop()) {
                self.count.fetch_sub(1, Ordering::SeqCst);
                return Some(block);
            }
            // Get a new list of blocks for allocation
            let blocks = global_freed_blocks.pop()?;
            let block = blocks.pop().unwrap();
            if !blocks.is_empty() {
                let mut head_global_freed_blocks = head_global_freed_blocks.upgrade();
                debug_assert!(head_global_freed_blocks
                    .as_ref()
                    .map(|blocks| blocks.is_empty())
                    .unwrap_or(true));
                *head_global_freed_blocks = Some(blocks);
            }
            self.count.fetch_sub(1, Ordering::SeqCst);
            Some(block)
        }
    }

    /// Flush a given thread-local queue to the global pool
    fn flush(&self, id: usize) {
        if !self.worker_local_freed_blocks[id].is_empty() {
            let queue = self.worker_local_freed_blocks[id].replace(BlockQueue::new());
            if !queue.is_empty() {
                self.global_freed_blocks.write().push(queue)
            }
        }
    }

    /// Flush all thread-local queues to the global pool
    pub fn flush_all(&self) {
        if self.len() == 0 {
            return;
        }
        for i in 0..self.worker_local_freed_blocks.len() {
            self.flush(i)
        }
    }

    /// Get the total number of blocks in the whole BlockPool
    pub fn len(&self) -> usize {
        self.count.load(Ordering::SeqCst)
    }

    /// Iterate all the blocks in the BlockPool
    pub fn iterate_blocks(&self, f: &mut impl FnMut(B)) {
        if let Some(array) = &*self.head_global_freed_blocks.read() {
            array.iterate_blocks(f);
        }
        for array in &*self.global_freed_blocks.read() {
            array.iterate_blocks(f);
        }
        for array in &self.worker_local_freed_blocks {
            array.iterate_blocks(f);
        }
    }
}