1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
use crate::global_state::GlobalState;
use crate::plan::PlanConstraints;
use crate::scheduler::GCWorkScheduler;
use crate::util::conversions::*;
use crate::util::metadata::side_metadata::{
    SideMetadataContext, SideMetadataSanity, SideMetadataSpec,
};
use crate::util::object_enum::ObjectEnumerator;
use crate::util::Address;
use crate::util::ObjectReference;

use crate::util::heap::layout::vm_layout::{vm_layout, LOG_BYTES_IN_CHUNK};
use crate::util::heap::{PageResource, VMRequest};
use crate::util::options::Options;
use crate::vm::{ActivePlan, Collection};

use crate::util::constants::{LOG_BYTES_IN_MBYTE, LOG_BYTES_IN_PAGE};
use crate::util::conversions;
use crate::util::opaque_pointer::*;

use crate::mmtk::SFT_MAP;
#[cfg(debug_assertions)]
use crate::policy::sft::EMPTY_SFT_NAME;
use crate::policy::sft::SFT;
use crate::util::copy::*;
use crate::util::heap::gc_trigger::GCTrigger;
use crate::util::heap::layout::vm_layout::BYTES_IN_CHUNK;
use crate::util::heap::layout::Mmapper;
use crate::util::heap::layout::VMMap;
use crate::util::heap::space_descriptor::SpaceDescriptor;
use crate::util::heap::HeapMeta;
use crate::util::memory::{self, HugePageSupport, MmapProtection, MmapStrategy};
use crate::vm::VMBinding;

use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex;

use downcast_rs::Downcast;

pub trait Space<VM: VMBinding>: 'static + SFT + Sync + Downcast {
    fn as_space(&self) -> &dyn Space<VM>;
    fn as_sft(&self) -> &(dyn SFT + Sync + 'static);
    fn get_page_resource(&self) -> &dyn PageResource<VM>;

    /// Get a mutable reference to the underlying page resource, or `None` if the space does not
    /// have a page resource.
    fn maybe_get_page_resource_mut(&mut self) -> Option<&mut dyn PageResource<VM>>;

    /// Initialize entires in SFT map for the space. This is called when the Space object
    /// has a non-moving address, as we will use the address to set sft.
    /// Currently after we create a boxed plan, spaces in the plan have a non-moving address.
    fn initialize_sft(&self, sft_map: &mut dyn crate::policy::sft_map::SFTMap);

    /// A check for the obvious out-of-memory case: if the requested size is larger than
    /// the heap size, it is definitely an OOM. We would like to identify that, and
    /// allows the binding to deal with OOM. Without this check, we will attempt
    /// to allocate from the page resource. If the requested size is unrealistically large
    /// (such as `usize::MAX`), it breaks the assumptions of our implementation of
    /// page resource, vm map, etc. This check prevents that, and allows us to
    /// handle the OOM case.
    /// Each allocator that may request an arbitrary size should call this method before
    /// acquring memory from the space. For example, bump pointer allocator and large object
    /// allocator need to call this method. On the other hand, allocators that only allocate
    /// memory in fixed size blocks do not need to call this method.
    /// An allocator should call this method before doing any computation on the size to
    /// avoid arithmatic overflow. If we have to do computation in the allocation fastpath and
    /// overflow happens there, there is nothing we can do about it.
    /// Return a boolean to indicate if we will be out of memory, determined by the check.
    fn will_oom_on_acquire(&self, tls: VMThread, size: usize) -> bool {
        let max_pages = self.get_gc_trigger().policy.get_max_heap_size_in_pages();
        let requested_pages = size >> LOG_BYTES_IN_PAGE;
        if requested_pages > max_pages {
            VM::VMCollection::out_of_memory(
                tls,
                crate::util::alloc::AllocationError::HeapOutOfMemory,
            );
            return true;
        }
        false
    }

    fn acquire(&self, tls: VMThread, pages: usize) -> Address {
        trace!("Space.acquire, tls={:?}", tls);

        debug_assert!(
            !self.will_oom_on_acquire(tls, pages << LOG_BYTES_IN_PAGE),
            "The requested pages is larger than the max heap size. Is will_go_oom_on_acquire used before acquring memory?"
        );

        // Should we poll to attempt to GC?
        // - If tls is collector, we cannot attempt a GC.
        // - If gc is disabled, we cannot attempt a GC.
        let should_poll =
            VM::VMActivePlan::is_mutator(tls) && VM::VMCollection::is_collection_enabled();
        // Is a GC allowed here? If we should poll but are not allowed to poll, we will panic.
        // initialize_collection() has to be called so we know GC is initialized.
        let allow_gc = should_poll && self.common().global_state.is_initialized();

        trace!("Reserving pages");
        let pr = self.get_page_resource();
        let pages_reserved = pr.reserve_pages(pages);
        trace!("Pages reserved");
        trace!("Polling ..");

        if should_poll && self.get_gc_trigger().poll(false, Some(self.as_space())) {
            debug!("Collection required");
            assert!(allow_gc, "GC is not allowed here: collection is not initialized (did you call initialize_collection()?).");

            // Clear the request, and inform GC trigger about the pending allocation.
            pr.clear_request(pages_reserved);
            self.get_gc_trigger()
                .policy
                .on_pending_allocation(pages_reserved);

            VM::VMCollection::block_for_gc(VMMutatorThread(tls)); // We have checked that this is mutator
            unsafe { Address::zero() }
        } else {
            debug!("Collection not required");

            // We need this lock: Othrewise, it is possible that one thread acquires pages in a new chunk, but not yet
            // set SFT for it (in grow_space()), and another thread acquires pages in the same chunk, which is not
            // a new chunk so grow_space() won't be called on it. The second thread could return a result in the chunk before
            // its SFT is properly set.
            // We need to minimize the scope of this lock for performance when we have many threads (mutator threads, or GC threads with copying allocators).
            // See: https://github.com/mmtk/mmtk-core/issues/610
            let lock = self.common().acquire_lock.lock().unwrap();

            match pr.get_new_pages(self.common().descriptor, pages_reserved, pages, tls) {
                Ok(res) => {
                    debug!(
                        "Got new pages {} ({} pages) for {} in chunk {}, new_chunk? {}",
                        res.start,
                        res.pages,
                        self.get_name(),
                        conversions::chunk_align_down(res.start),
                        res.new_chunk
                    );
                    let bytes = conversions::pages_to_bytes(res.pages);

                    let mmap = || {
                        // Mmap the pages and the side metadata, and handle error. In case of any error,
                        // we will either call back to the VM for OOM, or simply panic.
                        if let Err(mmap_error) = self
                            .common()
                            .mmapper
                            .ensure_mapped(
                                res.start,
                                res.pages,
                                self.common().mmap_strategy(),
                                &memory::MmapAnnotation::Space {
                                    name: self.get_name(),
                                },
                            )
                            .and(self.common().metadata.try_map_metadata_space(
                                res.start,
                                bytes,
                                self.get_name(),
                            ))
                        {
                            memory::handle_mmap_error::<VM>(mmap_error, tls, res.start, bytes);
                        }
                    };
                    let grow_space = || {
                        self.grow_space(res.start, bytes, res.new_chunk);
                    };

                    // The scope of the lock is important in terms of performance when we have many allocator threads.
                    if SFT_MAP.get_side_metadata().is_some() {
                        // If the SFT map uses side metadata, so we have to initialize side metadata first.
                        mmap();
                        // then grow space, which will use the side metadata we mapped above
                        grow_space();
                        // then we can drop the lock after grow_space()
                        drop(lock);
                    } else {
                        // In normal cases, we can drop lock immediately after grow_space()
                        grow_space();
                        drop(lock);
                        // and map side metadata without holding the lock
                        mmap();
                    }

                    // TODO: Concurrent zeroing
                    if self.common().zeroed {
                        memory::zero(res.start, bytes);
                    }

                    // Some assertions
                    {
                        // --- Assert the start of the allocated region ---
                        // The start address SFT should be correct.
                        debug_assert_eq!(SFT_MAP.get_checked(res.start).name(), self.get_name());
                        // The start address is in our space.
                        debug_assert!(self.address_in_space(res.start));
                        // The descriptor should be correct.
                        debug_assert_eq!(
                            self.common().vm_map().get_descriptor_for_address(res.start),
                            self.common().descriptor
                        );

                        // --- Assert the last byte in the allocated region ---
                        let last_byte = res.start + bytes - 1;
                        // The SFT for the last byte in the allocated memory should be correct.
                        debug_assert_eq!(SFT_MAP.get_checked(last_byte).name(), self.get_name());
                        // The last byte in the allocated memory should be in this space.
                        debug_assert!(self.address_in_space(last_byte));
                        // The descriptor for the last byte should be correct.
                        debug_assert_eq!(
                            self.common().vm_map().get_descriptor_for_address(last_byte),
                            self.common().descriptor
                        );
                    }

                    debug!("Space.acquire(), returned = {}", res.start);
                    res.start
                }
                Err(_) => {
                    drop(lock); // drop the lock immediately

                    // We thought we had memory to allocate, but somehow failed the allocation. Will force a GC.
                    assert!(
                        allow_gc,
                        "Physical allocation failed when GC is not allowed!"
                    );

                    let gc_performed = self.get_gc_trigger().poll(true, Some(self.as_space()));
                    debug_assert!(gc_performed, "GC not performed when forced.");

                    // Clear the request, and inform GC trigger about the pending allocation.
                    pr.clear_request(pages_reserved);
                    self.get_gc_trigger()
                        .policy
                        .on_pending_allocation(pages_reserved);

                    VM::VMCollection::block_for_gc(VMMutatorThread(tls)); // We asserted that this is mutator.
                    unsafe { Address::zero() }
                }
            }
        }
    }

    fn address_in_space(&self, start: Address) -> bool {
        if !self.common().descriptor.is_contiguous() {
            self.common().vm_map().get_descriptor_for_address(start) == self.common().descriptor
        } else {
            start >= self.common().start && start < self.common().start + self.common().extent
        }
    }

    fn in_space(&self, object: ObjectReference) -> bool {
        self.address_in_space(object.to_raw_address())
    }

    /**
     * This is called after we get result from page resources.  The space may
     * tap into the hook to monitor heap growth.  The call is made from within the
     * page resources' critical region, immediately before yielding the lock.
     *
     * @param start The start of the newly allocated space
     * @param bytes The size of the newly allocated space
     * @param new_chunk {@code true} if the new space encroached upon or started a new chunk or chunks.
     */
    fn grow_space(&self, start: Address, bytes: usize, new_chunk: bool) {
        trace!(
            "Grow space from {} for {} bytes (new chunk = {})",
            start,
            bytes,
            new_chunk
        );

        // If this is not a new chunk, the SFT for [start, start + bytes) should alreayd be initialized.
        #[cfg(debug_assertions)]
        if !new_chunk {
            debug_assert!(
                SFT_MAP.get_checked(start).name() != EMPTY_SFT_NAME,
                "In grow_space(start = {}, bytes = {}, new_chunk = {}), we have empty SFT entries (chunk for {} = {})",
                start,
                bytes,
                new_chunk,
                start,
                SFT_MAP.get_checked(start).name()
            );
            debug_assert!(
                SFT_MAP.get_checked(start + bytes - 1).name() != EMPTY_SFT_NAME,
                "In grow_space(start = {}, bytes = {}, new_chunk = {}), we have empty SFT entries (chunk for {} = {})",
                start,
                bytes,
                new_chunk,
                start + bytes - 1,
                SFT_MAP.get_checked(start + bytes - 1).name()
            );
        }

        if new_chunk {
            unsafe { SFT_MAP.update(self.as_sft(), start, bytes) };
        }
    }

    /// Ensure this space is marked as mapped -- used when the space is already
    /// mapped (e.g. for a vm image which is externally mmapped.)
    fn ensure_mapped(&self) {
        self.common()
            .metadata
            .try_map_metadata_space(self.common().start, self.common().extent, self.get_name())
            .unwrap_or_else(|e| {
                // TODO(Javad): handle meta space allocation failure
                panic!("failed to mmap meta memory: {e}");
            });

        self.common()
            .mmapper
            .mark_as_mapped(self.common().start, self.common().extent);
    }

    fn reserved_pages(&self) -> usize {
        let data_pages = self.get_page_resource().reserved_pages();
        let meta_pages = self.common().metadata.calculate_reserved_pages(data_pages);
        data_pages + meta_pages
    }

    /// Return the number of physical pages available.
    fn available_physical_pages(&self) -> usize {
        self.get_page_resource().get_available_physical_pages()
    }

    fn get_name(&self) -> &'static str {
        self.common().name
    }

    fn get_descriptor(&self) -> SpaceDescriptor {
        self.common().descriptor
    }

    fn common(&self) -> &CommonSpace<VM>;
    fn get_gc_trigger(&self) -> &GCTrigger<VM> {
        self.common().gc_trigger.as_ref()
    }

    fn release_multiple_pages(&mut self, start: Address);

    /// What copy semantic we should use for this space if we copy objects from this space.
    /// This is only needed for plans that use SFTProcessEdges
    fn set_copy_for_sft_trace(&mut self, _semantics: Option<CopySemantics>) {
        panic!("A copying space should override this method")
    }

    /// Ensure that the current space's metadata context does not have any issues.
    /// Panics with a suitable message if any issue is detected.
    /// It also initialises the sanity maps which will then be used if the `extreme_assertions` feature is active.
    /// Internally this calls verify_metadata_context() from `util::metadata::sanity`
    ///
    /// This function is called once per space by its parent plan but may be called multiple times per policy.
    ///
    /// Arguments:
    /// * `side_metadata_sanity_checker`: The `SideMetadataSanity` object instantiated in the calling plan.
    fn verify_side_metadata_sanity(&self, side_metadata_sanity_checker: &mut SideMetadataSanity) {
        side_metadata_sanity_checker
            .verify_metadata_context(std::any::type_name::<Self>(), &self.common().metadata)
    }

    /// Enumerate objects in the current space.
    ///
    /// Implementers can use the `enumerator` to report
    ///
    /// -   individual objects within the space using `enumerator.visit_object`, and
    /// -   ranges of address that may contain objects using `enumerator.visit_address_range`. The
    ///     caller will then enumerate objects in the range using the VO bits metadata.
    ///
    /// Each object in the space shall be covered by one of the two methods above.
    ///
    /// # Implementation considerations
    ///
    /// **Skipping empty ranges**: When enumerating address ranges, spaces can skip ranges (blocks,
    /// chunks, etc.) that are guarenteed not to contain objects.
    ///
    /// **Dynamic dispatch**: Because `Space` is a trait object type and `enumerator` is a `dyn`
    /// reference, invoking methods of `enumerator` involves a dynamic dispatching.  But the
    /// overhead is OK if we call it a block at a time because scanning the VO bits will dominate
    /// the execution time.  For LOS, it will be cheaper to enumerate individual objects than
    /// scanning VO bits because it is sparse.
    fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator);
}

/// Print the VM map for a space.
/// Space needs to be object-safe, so it cannot have methods that use extra generic type paramters. So this method is placed outside the Space trait.
/// This method can be invoked on a &dyn Space (space.as_space() will return &dyn Space).
#[allow(unused)]
pub(crate) fn print_vm_map<VM: VMBinding>(
    space: &dyn Space<VM>,
    out: &mut impl std::fmt::Write,
) -> Result<(), std::fmt::Error> {
    let common = space.common();
    write!(out, "{} ", common.name)?;
    if common.immortal {
        write!(out, "I")?;
    } else {
        write!(out, " ")?;
    }
    if common.movable {
        write!(out, " ")?;
    } else {
        write!(out, "N")?;
    }
    write!(out, " ")?;
    if common.contiguous {
        write!(
            out,
            "{}->{}",
            common.start,
            common.start + common.extent - 1
        )?;
        match common.vmrequest {
            VMRequest::Extent { extent, .. } => {
                write!(out, " E {}", extent)?;
            }
            VMRequest::Fraction { frac, .. } => {
                write!(out, " F {}", frac)?;
            }
            _ => {}
        }
    } else {
        let mut a = space
            .get_page_resource()
            .common()
            .get_head_discontiguous_region();
        while !a.is_zero() {
            write!(
                out,
                "{}->{}",
                a,
                a + space.common().vm_map().get_contiguous_region_size(a) - 1
            )?;
            a = space.common().vm_map().get_next_contiguous_region(a);
            if !a.is_zero() {
                write!(out, " ")?;
            }
        }
    }
    writeln!(out)?;

    Ok(())
}

impl_downcast!(Space<VM> where VM: VMBinding);

pub struct CommonSpace<VM: VMBinding> {
    pub name: &'static str,
    pub descriptor: SpaceDescriptor,
    pub vmrequest: VMRequest,

    /// For a copying space that allows sft_trace_object(), this should be set before each GC so we know
    // the copy semantics for the space.
    pub copy: Option<CopySemantics>,

    pub immortal: bool,
    pub movable: bool,
    pub contiguous: bool,
    pub zeroed: bool,

    pub permission_exec: bool,

    pub start: Address,
    pub extent: usize,

    pub vm_map: &'static dyn VMMap,
    pub mmapper: &'static dyn Mmapper,

    pub(crate) metadata: SideMetadataContext,

    /// This field equals to needs_log_bit in the plan constraints.
    // TODO: This should be a constant for performance.
    pub needs_log_bit: bool,

    /// A lock used during acquire() to make sure only one thread can allocate.
    pub acquire_lock: Mutex<()>,

    pub gc_trigger: Arc<GCTrigger<VM>>,
    pub global_state: Arc<GlobalState>,
    pub options: Arc<Options>,

    p: PhantomData<VM>,
}

/// Arguments passed from a policy to create a space. This includes policy specific args.
pub struct PolicyCreateSpaceArgs<'a, VM: VMBinding> {
    pub plan_args: PlanCreateSpaceArgs<'a, VM>,
    pub movable: bool,
    pub immortal: bool,
    pub local_side_metadata_specs: Vec<SideMetadataSpec>,
}

/// Arguments passed from a plan to create a space.
pub struct PlanCreateSpaceArgs<'a, VM: VMBinding> {
    pub name: &'static str,
    pub zeroed: bool,
    pub permission_exec: bool,
    pub vmrequest: VMRequest,
    pub global_side_metadata_specs: Vec<SideMetadataSpec>,
    pub vm_map: &'static dyn VMMap,
    pub mmapper: &'static dyn Mmapper,
    pub heap: &'a mut HeapMeta,
    pub constraints: &'a PlanConstraints,
    pub gc_trigger: Arc<GCTrigger<VM>>,
    pub scheduler: Arc<GCWorkScheduler<VM>>,
    pub options: Arc<Options>,
    pub global_state: Arc<GlobalState>,
}

impl<'a, VM: VMBinding> PlanCreateSpaceArgs<'a, VM> {
    /// Turning PlanCreateSpaceArgs into a PolicyCreateSpaceArgs
    pub fn into_policy_args(
        self,
        movable: bool,
        immortal: bool,
        policy_metadata_specs: Vec<SideMetadataSpec>,
    ) -> PolicyCreateSpaceArgs<'a, VM> {
        PolicyCreateSpaceArgs {
            movable,
            immortal,
            local_side_metadata_specs: policy_metadata_specs,
            plan_args: self,
        }
    }
}

impl<VM: VMBinding> CommonSpace<VM> {
    pub fn new(args: PolicyCreateSpaceArgs<VM>) -> Self {
        let mut rtn = CommonSpace {
            name: args.plan_args.name,
            descriptor: SpaceDescriptor::UNINITIALIZED,
            vmrequest: args.plan_args.vmrequest,
            copy: None,
            immortal: args.immortal,
            movable: args.movable,
            contiguous: true,
            permission_exec: args.plan_args.permission_exec,
            zeroed: args.plan_args.zeroed,
            start: unsafe { Address::zero() },
            extent: 0,
            vm_map: args.plan_args.vm_map,
            mmapper: args.plan_args.mmapper,
            needs_log_bit: args.plan_args.constraints.needs_log_bit,
            gc_trigger: args.plan_args.gc_trigger,
            metadata: SideMetadataContext {
                global: args.plan_args.global_side_metadata_specs,
                local: args.local_side_metadata_specs,
            },
            acquire_lock: Mutex::new(()),
            global_state: args.plan_args.global_state,
            options: args.plan_args.options.clone(),
            p: PhantomData,
        };

        let vmrequest = args.plan_args.vmrequest;
        if vmrequest.is_discontiguous() {
            rtn.contiguous = false;
            // FIXME
            rtn.descriptor = SpaceDescriptor::create_descriptor();
            // VM.memory.setHeapRange(index, HEAP_START, HEAP_END);
            return rtn;
        }

        let (extent, top) = match vmrequest {
            VMRequest::Fraction { frac, top: _top } => (get_frac_available(frac), _top),
            VMRequest::Extent {
                extent: _extent,
                top: _top,
            } => (_extent, _top),
            VMRequest::Fixed {
                extent: _extent, ..
            } => (_extent, false),
            _ => unreachable!(),
        };

        assert!(
            extent == raw_align_up(extent, BYTES_IN_CHUNK),
            "{} requested non-aligned extent: {} bytes",
            rtn.name,
            extent
        );

        let start = if let VMRequest::Fixed { start: _start, .. } = vmrequest {
            _start
        } else {
            // FIXME
            //if (HeapLayout.vmMap.isFinalized()) VM.assertions.fail("heap is narrowed after regionMap is finalized: " + name);
            args.plan_args.heap.reserve(extent, top)
        };
        assert!(
            start == chunk_align_up(start),
            "{} starting on non-aligned boundary: {}",
            rtn.name,
            start
        );

        rtn.contiguous = true;
        rtn.start = start;
        rtn.extent = extent;
        // FIXME
        rtn.descriptor = SpaceDescriptor::create_descriptor_from_heap_range(start, start + extent);
        // VM.memory.setHeapRange(index, start, start.plus(extent));

        // We only initialize our vm map if the range of the space is in our available heap range. For normally spaces,
        // they are definitely in our heap range. But for VM space, a runtime could give us an arbitrary range. We only
        // insert into our vm map if the range overlaps with our heap.
        {
            use crate::util::heap::layout;
            let overlap =
                Address::range_intersection(&(start..start + extent), &layout::available_range());
            if !overlap.is_empty() {
                args.plan_args.vm_map.insert(
                    overlap.start,
                    overlap.end - overlap.start,
                    rtn.descriptor,
                );
            }
        }

        // For contiguous space, we know its address range so we reserve metadata memory for its range.
        rtn.metadata
            .try_map_metadata_address_range(rtn.start, rtn.extent, rtn.name)
            .unwrap_or_else(|e| {
                // TODO(Javad): handle meta space allocation failure
                panic!("failed to mmap meta memory: {e}");
            });

        debug!(
            "Created space {} [{}, {}) for {} bytes",
            rtn.name,
            start,
            start + extent,
            extent
        );

        rtn
    }

    pub fn initialize_sft(
        &self,
        sft: &(dyn SFT + Sync + 'static),
        sft_map: &mut dyn crate::policy::sft_map::SFTMap,
    ) {
        // We have to keep this for now: if a space is contiguous, our page resource will NOT consider newly allocated chunks
        // as new chunks (new_chunks = true). In that case, in grow_space(), we do not set SFT when new_chunks = false.
        // We can fix this by either of these:
        // * fix page resource, so it propelry returns new_chunk
        // * change grow_space() so it sets SFT no matter what the new_chunks value is.
        // FIXME: eagerly initializing SFT is not a good idea.
        if self.contiguous {
            unsafe { sft_map.eager_initialize(sft, self.start, self.extent) };
        }
    }

    pub fn vm_map(&self) -> &'static dyn VMMap {
        self.vm_map
    }

    pub fn mmap_strategy(&self) -> MmapStrategy {
        MmapStrategy {
            huge_page: if *self.options.transparent_hugepages {
                HugePageSupport::TransparentHugePages
            } else {
                HugePageSupport::No
            },
            prot: if self.permission_exec || cfg!(feature = "exec_permission_on_all_spaces") {
                MmapProtection::ReadWriteExec
            } else {
                MmapProtection::ReadWrite
            },
        }
    }
}

fn get_frac_available(frac: f32) -> usize {
    trace!("AVAILABLE_START={}", vm_layout().available_start());
    trace!("AVAILABLE_END={}", vm_layout().available_end());
    let bytes = (frac * vm_layout().available_bytes() as f32) as usize;
    trace!("bytes={}*{}={}", frac, vm_layout().available_bytes(), bytes);
    let mb = bytes >> LOG_BYTES_IN_MBYTE;
    let rtn = mb << LOG_BYTES_IN_MBYTE;
    trace!("rtn={}", rtn);
    let aligned_rtn = raw_align_up(rtn, BYTES_IN_CHUNK);
    trace!("aligned_rtn={}", aligned_rtn);
    aligned_rtn
}

pub fn required_chunks(pages: usize) -> usize {
    let extent = raw_align_up(pages_to_bytes(pages), BYTES_IN_CHUNK);
    extent >> LOG_BYTES_IN_CHUNK
}