Support lgalloc for columnar #31230

Open: wants to merge 1 commit into base: main.
src/compute-types/src/dyncfgs.rs (10 changes: 9 additions & 1 deletion)

@@ -50,10 +50,17 @@ pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
/// Enable lgalloc for columnation.
pub const ENABLE_COLUMNATION_LGALLOC: Config<bool> = Config::new(
"enable_columnation_lgalloc",
-false,
+true,
"Enable allocating regions from lgalloc.",
);

+/// Enable lgalloc for columnar.
+pub const ENABLE_LGALLOC_COLUMNAR: Config<bool> = Config::new(
+"enable_lgalloc_columnar",
Review comment on lines +58 to +59 (Contributor): Nit: Call this `enable_columnar_lgalloc` for consistency with the columnation flag?

+true,
+"Enable allocating alignd regions in columnar from lgalloc.",
Review comment (Contributor), suggested change:
-"Enable allocating alignd regions in columnar from lgalloc.",
+"Enable allocating aligned regions in columnar from lgalloc.",

+);

/// Enable lgalloc's eager memory return/reclamation feature.
pub const ENABLE_LGALLOC_EAGER_RECLAMATION: Config<bool> = Config::new(
"enable_lgalloc_eager_reclamation",
@@ -184,6 +191,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&LINEAR_JOIN_YIELDING)
.add(&ENABLE_COLUMNATION_LGALLOC)
.add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
+.add(&ENABLE_LGALLOC_COLUMNAR)
.add(&ENABLE_CHUNKED_STACK)
.add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL)
.add(&DATAFLOW_MAX_INFLIGHT_BYTES)
src/compute/src/compute_state.rs (3 changes: 3 additions & 0 deletions)

@@ -307,6 +307,9 @@ impl ComputeState {
info!("using chunked stack: {chunked_stack}");
mz_timely_util::containers::stack::use_chunked_stack(chunked_stack);

+let enable_lgalloc_columnar = ENABLE_LGALLOC_COLUMNAR.get(config);
+mz_timely_util::containers::set_enable_lgalloc_columnar(enable_lgalloc_columnar);

// Remember the maintenance interval locally to avoid reading it from the config set on
// every server iteration.
self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
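
Note: `set_enable_lgalloc_columnar` writes a thread-local flag (see the `builder` module later in this diff), so the call above only takes effect on the thread that applies the config update. A minimal sketch of that assumption; `on_worker_config_update` is a hypothetical stand-in for however each timely worker observes config changes:

```rust
// Sketch: the toggle must be applied on every worker thread, because the
// flag is thread-local rather than process-wide.
fn on_worker_config_update(enable_lgalloc_columnar: bool) {
    // Takes effect for allocations made by this worker thread from here on.
    mz_timely_util::containers::set_enable_lgalloc_columnar(enable_lgalloc_columnar);
}
```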
src/ore/src/region.rs (44 changes: 44 additions & 0 deletions)

@@ -333,6 +333,8 @@ impl<T> Region<T> {
/// Otherwise, the vector representation could try to reallocate the underlying memory
/// using the global allocator, which would cause problems because the memory might not
/// have originated from it. This is undefined behavior.
+///
+/// Private because it is too dangerous to expose to the public.
#[inline]
unsafe fn as_mut_vec(&mut self) -> &mut Vec<T> {
match self {
@@ -342,6 +344,48 @@ impl<T> Region<T> {
}
}

+impl Region<u64> {
Review comment (Contributor): Does this exist because of columnar's u64 bias? If so, is this great news and we love it, or is this a workaround and we're not happy? Mostly trying to grok what's going on here and for what reason, which I imagine is clearer to you than it is to me.

+/// Create a new file-based mapped region of a specific capacity, initialized to 0. The
+/// capacity of the returned region can be larger than requested to accommodate page sizes.
+///
+/// # Errors
+///
+/// Returns an error if the memory allocation fails.
+#[inline(always)]
+pub fn new_mmap_zeroed(capacity: usize) -> Result<Region<u64>, lgalloc::AllocError> {
+lgalloc::allocate::<u64>(capacity).map(|(ptr, capacity, handle)| {
+// SAFETY: `allocate` returns a valid memory block
+unsafe { ptr.as_ptr().write_bytes(0, capacity) }
Review comment (Contributor): This seems like something that could introduce performance regressions once we start switching more things from columnation to columnar. Is the zeroing required for columnar to work?

+// SAFETY: `ptr` points to suitable memory.
+// It is UB to call `from_raw_parts` with a pointer not allocated from the global
+// allocator, but we accept this here because we promise never to reallocate the vector.
+let inner =
+ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), capacity, capacity) });
+let handle = Some(handle);
+Region::MMap(MMapRegion { inner, handle })
+})
+}

+/// Allocate a zeroed region on the heap.
+#[inline(always)]
+pub fn new_heap_zeroed(capacity: usize) -> Self {
+Region::Heap(vec![0; capacity])
+}
+
+/// Construct a new region with the specified capacity, initialized to 0.
+pub fn new_auto_zeroed(capacity: usize) -> Self {
+match Region::new_mmap_zeroed(capacity) {
+Ok(r) => return r,
+Err(lgalloc::AllocError::Disabled) | Err(lgalloc::AllocError::InvalidSizeClass(_)) => {}
+Err(e) => {
+eprintln!("lgalloc error: {e}, falling back to heap");
+}
+}
+// Fall-through
+Region::Heap(vec![0; capacity])
Review comment (Contributor), suggested change:
-Region::Heap(vec![0; capacity])
+Self::new_heap_zeroed(capacity)
+}
+}

impl<T: Clone> Region<T> {
/// Extend the region from a slice.
///
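
The new constructors give callers a zero-initialized `Region<u64>` with an lgalloc-first, heap-fallback policy. A minimal usage sketch, assuming `Region` exposes `len` and slice indexing as used elsewhere in this diff:

```rust
use mz_ore::region::Region;

// Request 2^17 u64s (1 MiB). If lgalloc is disabled or has no fitting size
// class, this silently falls back to a zeroed heap allocation.
let region: Region<u64> = Region::new_auto_zeroed(1 << 17);
assert!(region.len() >= 1 << 17); // mmap regions may round up to a size class
assert!(region[..].iter().all(|&w| w == 0));
```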
src/timely-util/src/containers.rs (88 changes: 71 additions & 17 deletions)

@@ -28,11 +28,14 @@ mod container {
use columnar::Container as _;
use columnar::{AsBytes, Clear, FromBytes, Index, Len};
use mz_ore::cast::CastFrom;
+use mz_ore::region::Region;
use timely::bytes::arc::Bytes;
use timely::container::PushInto;
use timely::dataflow::channels::ContainerBytes;
use timely::Container;

+use crate::containers::alloc_aligned_zeroed;

Review comment (Contributor): Isn't this import redundant, since it's importing from the current module?


/// A container based on a columnar store, encoded in aligned bytes.
///
/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
Expand All @@ -47,7 +50,7 @@ mod container {
///
/// Reasons could include misalignment, cloning of data, or wanting
/// to release the `Bytes` as a scarce resource.
-Align(Box<[u64]>),
+Align(Region<u64>),
}

impl<C: Columnar> Column<C> {
@@ -86,11 +89,15 @@ mod container {
Column::Typed(t) => Column::Typed(t.clone()),
Column::Bytes(b) => {
assert_eq!(b.len() % 8, 0);
-let mut alloc: Vec<u64> = vec![0; b.len() / 8];
+let mut alloc: Region<u64> = alloc_aligned_zeroed(b.len() / 8);
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]);
-Self::Align(alloc.into())
+Self::Align(alloc)
}
+Column::Align(a) => {
+let mut alloc = alloc_aligned_zeroed(a.len());
+alloc.extend_from_slice(&a[..]);
+Column::Align(alloc)
+}
-Column::Align(a) => Column::Align(a.clone()),
}
}
}
@@ -107,8 +114,7 @@ mod container {
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
-Column::Bytes(_) => *self = Column::Typed(Default::default()),
-Column::Align(_) => *self = Column::Typed(Default::default()),
+Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
}
}

@@ -155,9 +161,9 @@ mod container {
Self::Bytes(bytes)
} else {
// We failed to cast the slice, so we'll reallocate.
-let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
+let mut alloc: Region<u64> = alloc_aligned_zeroed(bytes.len() / 8);
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
-Self::Align(alloc.into())
+Self::Align(alloc)
}
}

@@ -194,16 +200,24 @@
}
}

-pub use builder::ColumnBuilder;
+pub(self) use builder::alloc_aligned_zeroed;
+pub use builder::{set_enable_lgalloc_columnar, ColumnBuilder};
mod builder {
use std::collections::VecDeque;
use std::io::Write;

use columnar::{AsBytes, Clear, Columnar, Len, Push};
use mz_ore::cast::CastFrom;
use mz_ore::region::Region;
use timely::container::PushInto;
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};

use super::Column;

+thread_local! {
+static ENABLE_LGALLOC_COLUMNAR: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
Review comment (Contributor), suggested change:
-static ENABLE_LGALLOC_COLUMNAR: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) };
+static ENABLE_LGALLOC_COLUMNAR: std::cell::RefCell<bool> = std::cell::RefCell::new(false);
Review comment (Contributor): You could also use an AtomicBool here, to rule out the possibility of panics.
+}
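
For reference, a minimal sketch of the `AtomicBool` alternative suggested above. It would make the flag process-wide rather than per-thread, which changes how per-worker config application behaves; treat it as an illustration, not the PR's design:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Process-wide flag; loads and stores cannot panic, unlike `RefCell`
// borrows under re-entrancy.
static ENABLE_LGALLOC_COLUMNAR: AtomicBool = AtomicBool::new(false);

pub fn enable_lgalloc_columnar() -> bool {
    ENABLE_LGALLOC_COLUMNAR.load(Ordering::Relaxed)
}

pub fn set_enable_lgalloc_columnar(enabled: bool) {
    ENABLE_LGALLOC_COLUMNAR.store(enabled, Ordering::Relaxed);
}
```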

/// A container builder for `Column<C>`.
pub struct ColumnBuilder<C: Columnar> {
/// Container that we're writing to.
@@ -217,6 +231,28 @@ mod builder {
pending: VecDeque<Column<C>>,
}

+/// Returns `true` if columnar allocations should come from lgalloc.
+#[inline]
+pub fn enable_lgalloc_columnar() -> bool {
+ENABLE_LGALLOC_COLUMNAR.with(|enabled| *enabled.borrow())
+}
+
+/// Set whether columnar allocations should come from lgalloc. Applies to future allocations.
+pub fn set_enable_lgalloc_columnar(enabled: bool) {
+ENABLE_LGALLOC_COLUMNAR.with(|enable| *enable.borrow_mut() = enabled);
+}
+
+/// Allocate a region of memory with a capacity of at least `len` that is aligned to 8 bytes
+/// and zeroed.
+#[inline]
+pub(crate) fn alloc_aligned_zeroed(len: usize) -> Region<u64> {
+if enable_lgalloc_columnar() {
+Region::new_auto_zeroed(len)
+} else {
+Region::new_heap_zeroed(len)
+}
+}
Review comment on lines +245 to +254 (Member): Mind adding to this comment that it's aligned to 8 bytes because it's a Region of u64s?

impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C>
where
C::Container: Push<T>,
@@ -229,14 +265,32 @@
let words = self.current.borrow().length_in_words();
// Round `words` up to the next multiple of 2^18 (256 Ki words, i.e., 2 MiB).
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
// Flush only if the data would fill at least 90% of the rounded allocation.
if round - words < round / 10 {
-let mut alloc = Vec::with_capacity(round);
-columnar::bytes::serialization::encode(
-&mut alloc,
-self.current.borrow().as_bytes(),
-);
-self.pending
-.push_back(Column::Align(alloc.into_boxed_slice()));
-self.current.clear();
+/// Move the contents from `current` to an aligned allocation, and push it to `pending`.
+/// The contents must fit in `round` words (u64).
+#[cold]
+fn outlined_align<C>(
+current: &mut C::Container,
+round: usize,
+pending: &mut VecDeque<Column<C>>,
+) where
+C: Columnar,
+{
+let mut alloc = alloc_aligned_zeroed(round);
+let mut writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
+for (align, bytes) in current.borrow().as_bytes() {
+assert!(align <= 8);
+let length = u64::cast_from(bytes.len());
+let length_slice = bytemuck::cast_slice(std::slice::from_ref(&length));
+writer.write_all(length_slice).unwrap();
+writer.write_all(bytes).unwrap();
+let padding = usize::cast_from((8 - (length % 8)) % 8);
+writer.write_all(&[0; 8][..padding]).unwrap();
Review comment on lines +281 to +287 (Member): I'm not totally up-to-date on the encoded layout of a Column, but my read is that this lays out data like:

<u64 encoded in native endianness><buffer whose length is the u64><optional padding>

If that's what it's supposed to do then it LGTM!
Review comment on lines +282 to +287 (Contributor): Is there a columnar method we could invoke so we don't have to worry about this code diverging from the columnar implementation?
+}
+pending.push_back(Column::Align(alloc));
+current.clear();
+}

+outlined_align(&mut self.current, round, &mut self.pending);
}
}
}
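
To make the framing above concrete: each `(align, bytes)` slice is written as a native-endian `u64` length, the payload, then zero padding to the next 8-byte boundary. A minimal decoding sketch of that layout (a standalone illustration, not the `columnar` decoder; it assumes a well-formed buffer, and trailing zeroed capacity decodes as empty frames):

```rust
/// Iterate over `<u64 length><payload><padding>` frames in an 8-byte aligned
/// buffer, yielding each payload. Lengths are native-endian, matching the
/// writer above.
fn frames(mut buf: &[u8]) -> impl Iterator<Item = &[u8]> {
    std::iter::from_fn(move || {
        if buf.len() < 8 {
            return None;
        }
        let (head, rest) = buf.split_at(8);
        let len = u64::from_ne_bytes(head.try_into().unwrap()) as usize;
        let padded = len.div_ceil(8) * 8; // payload rounded up to 8 bytes
        let payload = &rest[..len]; // panics on malformed input; sketch only
        buf = &rest[padded..];
        Some(payload)
    })
}
```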
src/timely-util/src/containers/array.rs (26 changes: 25 additions & 1 deletion)

@@ -6,7 +6,7 @@
//! An array of fixed length, allocated from lgalloc if possible.

use std::mem::{ManuallyDrop, MaybeUninit};
-use std::ops::Deref;
+use std::ops::{Deref, DerefMut};

/// A fixed-length region in memory, which is either allocated from heap or lgalloc.
pub struct Array<T> {
@@ -102,6 +102,14 @@ impl<T> Array<T> {
}
}

+impl<T: Clone> Clone for Array<T> {
+fn clone(&self) -> Self {
+let mut clone = Self::with_capacity(self.length);
+clone.deref_mut().clone_from_slice(&**self);
+clone
+}
+}

impl<T> Deref for Array<T> {
type Target = [T];

@@ -120,6 +128,22 @@ impl<T> Deref for Array<T> {
}
}

+impl<T> DerefMut for Array<T> {
+fn deref_mut(&mut self) -> &mut Self::Target {
+// TODO: Use `slice_assume_init_ref` once stable.
Review comment (Contributor): Probably `slice_assume_init_mut` rather than `_ref`, I think?

Review comment (Contributor): If nothing else, we'll want to have the unsafe justification for `_mut` instead, I think. Maybe a nit, but "have the right explanation when we use unsafe" seems like a great place to pick nits.

+// Context: https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.slice_assume_init_ref
+// The following safety argument is adapted from the source.
+// SAFETY: casting `elements` to a `*const [T]` is safe since the caller guarantees that
+// `slice` is initialized, and `MaybeUninit` is guaranteed to have the same layout as `T`.
+// The pointer obtained is valid since it refers to memory owned by `elements` which is a
+// reference and thus guaranteed to be valid for reads.
Review comment on lines +136 to +139 (Contributor): This safety argument talks about `*const` and reads, but we're casting to a `*mut` and dereferencing that for writes. I think a similar argument still holds, based on the fact that `elements` is mutably borrowed here, so we can produce a `&mut` into it.
+#[allow(clippy::as_conversions)]
+unsafe {
+&mut *(&mut self.elements[..self.length] as *mut [MaybeUninit<T>] as *mut [T])
+}
+}
+}
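
A sketch of what the two comments above ask for: the same body with the safety argument rewritten for the mutable case, mirroring the documented justification for `MaybeUninit::slice_assume_init_mut`. Suggested wording only, not the PR's final text:

```rust
impl<T> DerefMut for Array<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // TODO: Use `slice_assume_init_mut` once stable.
        // SAFETY: casting `elements` to a `*mut [T]` is sound because the first
        // `self.length` elements are initialized (this type's invariant), and
        // `MaybeUninit<T>` is guaranteed to have the same layout as `T`. The
        // pointer is valid for reads and writes since it comes from
        // `&mut self.elements[..self.length]`, an exclusive borrow, so the
        // returned `&mut [T]` cannot alias any other reference.
        #[allow(clippy::as_conversions)]
        unsafe {
            &mut *(&mut self.elements[..self.length] as *mut [MaybeUninit<T>] as *mut [T])
        }
    }
}
```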

impl<T> Drop for Array<T> {
fn drop(&mut self) {
self.clear();