From df1fccdc6d8414db352fb5802a07ffa9912f2eea Mon Sep 17 00:00:00 2001
From: Cassy343
Date: Wed, 20 Jul 2022 14:46:17 -0400
Subject: [PATCH] Internal code improvements

---
 Cargo.toml         |   4 +
 src/core/handle.rs | 223 -------------------------------------------
 src/core/mod.rs    | 232 ++++++++++++++++++++++++++++++++++++++++++++-
 src/core/store.rs  |  11 +--
 src/lib.rs         |   4 +-
 src/read.rs        |  20 ++--
 src/util/mod.rs    |  14 +++
 src/view.rs        |   1 +
 src/write.rs       |  27 +++---
 9 files changed, 280 insertions(+), 256 deletions(-)
 delete mode 100644 src/core/handle.rs

diff --git a/Cargo.toml b/Cargo.toml
index 2cb283e..28e26d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,10 @@ features = ["inline-more"]
 [target.'cfg(loom)'.dependencies]
 loom = { version = "0.5.6", features = ["checkpoint"] }
 
+[features]
+default = []
+nightly = []
+
 [profile.loomtest]
 inherits = "release"
 debug = 2
diff --git a/src/core/handle.rs b/src/core/handle.rs
deleted file mode 100644
index a2635ee..0000000
--- a/src/core/handle.rs
+++ /dev/null
@@ -1,223 +0,0 @@
-use hashbrown::hash_map::DefaultHashBuilder;
-use slab::Slab;
-
-use crate::{
-    core::MapIndex,
-    loom::{
-        cell::{Cell, UnsafeCell},
-        sync::{
-            atomic::{fence, AtomicIsize, AtomicU8, Ordering},
-            Arc, Mutex,
-        },
-        thread::{self, Thread},
-    },
-    util::{cold, lock, Alias},
-};
-use crate::{util::CachePadded, Map, ReadHandle, WriteHandle};
-use crate::{Builder, BuilderArgs};
-use std::hash::{BuildHasher, Hash};
-use std::marker::PhantomData;
-use std::process::abort;
-use std::ptr::{self, NonNull};
-
-use super::{OwnedMapAccess, RefCount};
-
-const WRITABLE: u8 = 0;
-const NOT_WRITABLE: u8 = 1;
-const WAITING_ON_READERS: u8 = 2;
-
-pub struct Handle<K, V, S = DefaultHashBuilder> {
-    residual: AtomicIsize,
-    // All readers need to be dropped before we're dropped, so we don't need to worry about
-    // freeing any refcounts.
-    refcounts: Mutex<Slab<NonNull<RefCount>>>,
-    writer_thread: UnsafeCell<Option<Thread>>,
-    writer_state: AtomicU8,
-    writer_map: Cell<MapIndex>,
-    maps: OwnedMapAccess<K, V, S>,
-    _not_send_sync: PhantomData<*const u8>,
-}
-
-impl<K, V, S> Handle<K, V, S>
-where
-    K: Eq + Hash,
-    S: BuildHasher,
-{
-    // TODO: maybe find a better name but handles are still being returned
-    #[allow(clippy::new_ret_no_self)]
-    pub fn new(options: Builder<K, V, S>) -> (WriteHandle<K, V, S>, ReadHandle<K, V, S>) {
-        let BuilderArgs { capacity, h1, h2 } = options.into_args();
-
-        let maps = Box::new([
-            CachePadded::new(UnsafeCell::new(Map::with_capacity_and_hasher(capacity, h1))),
-            CachePadded::new(UnsafeCell::new(Map::with_capacity_and_hasher(capacity, h2))),
-        ]);
-
-        #[cfg(not(miri))]
-        let init_refcount_capacity = num_cpus::get();
-
-        #[cfg(miri)]
-        let init_refcount_capacity = 1;
-
-        let me = Arc::new(Self {
-            residual: AtomicIsize::new(0),
-            refcounts: Mutex::new(Slab::with_capacity(init_refcount_capacity)),
-            writer_thread: UnsafeCell::new(None),
-            writer_state: AtomicU8::new(WRITABLE),
-            writer_map: Cell::new(MapIndex::Second),
-            maps: OwnedMapAccess::new(maps),
-            _not_send_sync: PhantomData,
-        });
-
-        let write_handle = WriteHandle::new(Arc::clone(&me));
-        let read_handle = Self::new_reader(me);
-
-        (write_handle, read_handle)
-    }
-}
-
-impl<K, V, S> Handle<K, V, S> {
-    #[inline]
-    pub fn new_reader(me: Arc<Self>) -> ReadHandle<K, V, S> {
-        let mut guard = lock(&me.refcounts);
-        let refcount = RefCount::new(me.writer_map.get().other());
-        let refcount = NonNull::new(Box::into_raw(Box::new(refcount))).unwrap();
-        let key = guard.insert(refcount);
-        drop(guard);
-
-        let map_access = me.maps.share();
-        ReadHandle::new(me, map_access, refcount, key)
-    }
-
-    #[inline]
-    pub unsafe fn release_refcount(&self, key: usize) {
-        let refcount = lock(&self.refcounts).remove(key);
-
-        drop(unsafe { Box::from_raw(refcount.as_ptr()) });
-    }
-
-    #[inline]
-    #[allow(clippy::collapsible_if)] // Nested if makes logic more clear here
-    pub unsafe fn release_residual(&self) {
-        // TODO: why does loom fail if either of these are anything weaker than AcqRel?
-
-        if self.residual.fetch_sub(1, Ordering::AcqRel) == 1 {
-            if self.writer_state.swap(WRITABLE, Ordering::AcqRel) == WAITING_ON_READERS {
-                let thread = self
-                    .writer_thread
-                    .with_mut(|ptr| unsafe { &mut *ptr }.take());
-
-                match thread {
-                    Some(thread) => thread.unpark(),
-                    None => {
-                        if cfg!(debug_assertions) {
-                            unreachable!(
-                                "WAITING_ON_READERS state observed when writer_thread is None"
-                            );
-                        } else {
-                            cold();
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    #[inline]
-    pub fn synchronize(&self) {
-        let writer_state = self.writer_state.load(Ordering::Acquire);
-
-        if writer_state == NOT_WRITABLE {
-            let current = Some(thread::current());
-            let old = self
-                .writer_thread
-                .with_mut(|ptr| unsafe { ptr::replace(ptr, current) });
-            drop(old);
-
-            let exchange_result = self.writer_state.compare_exchange(
-                NOT_WRITABLE,
-                WAITING_ON_READERS,
-                Ordering::AcqRel,
-                Ordering::Acquire,
-            );
-
-            if exchange_result == Ok(NOT_WRITABLE) {
-                loop {
-                    // Wait for the next writable map to become available
-                    thread::park();
-
-                    let writer_state = self.writer_state.load(Ordering::Acquire);
-                    if writer_state == WRITABLE {
-                        break;
-                    } else {
-                        debug_assert_eq!(writer_state, WAITING_ON_READERS);
-                    }
-                }
-            } else {
-                debug_assert_eq!(exchange_result, Err(WRITABLE));
-            }
-        } else {
-            debug_assert_eq!(writer_state, WRITABLE);
-        }
-    }
-
-    #[inline]
-    pub fn writer_map(&self) -> &UnsafeCell<Map<K, V, S>> {
-        self.maps.get(self.writer_map.get())
-    }
-
-    #[inline]
-    pub unsafe fn finish_write(&self) {
-        debug_assert_eq!(self.residual.load(Ordering::Relaxed), 0);
-        debug_assert_eq!(self.writer_state.load(Ordering::Relaxed), WRITABLE);
-
-        self.writer_state.store(NOT_WRITABLE, Ordering::Relaxed);
-
-        let guard = lock(&self.refcounts);
-
-        // This needs to be within the mutex
-        self.writer_map.set(self.writer_map.get().other());
-
-        fence(Ordering::Release);
-
-        let mut initial_residual = 0isize;
-
-        // Clippy doesn't like that we're iterating over something in a mutex apparently
-        #[allow(clippy::significant_drop_in_scrutinee)]
-        for (_, refcount) in guard.iter() {
-            let refcount = unsafe { refcount.as_ref() };
-
-            // Because the highest bit is used in the refcount, this cast will not be lossy
-            initial_residual += refcount.swap_maps() as isize;
-
-            // If we overflowed, then abort.
-            if initial_residual < 0 {
-                abort();
-            }
-        }
-
-        fence(Ordering::Acquire);
-
-        drop(guard);
-
-        let latest_residual = self.residual.fetch_add(initial_residual, Ordering::AcqRel);
-        let residual = initial_residual.wrapping_add(latest_residual);
-        if residual == 0 {
-            self.writer_state.store(WRITABLE, Ordering::Relaxed);
-        } else {
-            debug_assert!(residual > 0);
-        }
-    }
-}
-
-impl<K, V, S> Drop for Handle<K, V, S> {
-    fn drop(&mut self) {
-        let reader_map_index = self.writer_map.get().other();
-        self.maps.get(reader_map_index).with_mut(|ptr| unsafe {
-            (*ptr).drain().for_each(|(ref mut key, ref mut value)| {
-                Alias::drop(key);
-                Alias::drop(value);
-            });
-        });
-    }
-}
diff --git a/src/core/mod.rs b/src/core/mod.rs
index 48318df..e61c14b 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -1,7 +1,235 @@
-mod handle;
 mod refcount;
 mod store;
 
-pub use handle::*;
 pub use refcount::*;
 pub use store::*;
+
+use hashbrown::hash_map::DefaultHashBuilder;
+use slab::Slab;
+
+use crate::{
+    loom::{
+        cell::{Cell, UnsafeCell},
+        sync::{
+            atomic::{fence, AtomicIsize, AtomicU8, Ordering},
+            Arc, Mutex,
+        },
+        thread::{self, Thread},
+    },
+    util::{likely, lock, Alias},
+};
+use crate::{util::CachePadded, Map, ReadHandle, WriteHandle};
+use crate::{Builder, BuilderArgs};
+use std::hash::{BuildHasher, Hash};
+use std::marker::PhantomData;
+use std::process::abort;
+use std::ptr::{self, NonNull};
+
+const WRITABLE: u8 = 0;
+const NOT_WRITABLE: u8 = 1;
+const WAITING_ON_READERS: u8 = 2;
+
+pub struct Core<K, V, S = DefaultHashBuilder> {
+    residual: AtomicIsize,
+    // All readers need to be dropped before we're dropped, so we don't need to worry about
+    // freeing any refcounts.
+    refcounts: Mutex<Slab<NonNull<RefCount>>>,
+    writer_thread: UnsafeCell<Option<Thread>>,
+    writer_state: AtomicU8,
+    writer_map: Cell<MapIndex>,
+    maps: OwnedMapAccess<K, V, S>,
+    // TODO: figure out if core can implement send or sync
+    _not_send_sync: PhantomData<*const u8>,
+}
+
+impl<K, V, S> Core<K, V, S>
+where
+    K: Eq + Hash,
+    S: BuildHasher,
+{
+    pub fn build_map(options: Builder<K, V, S>) -> (WriteHandle<K, V, S>, ReadHandle<K, V, S>) {
+        let BuilderArgs { capacity, h1, h2 } = options.into_args();
+
+        let maps = Box::new([
+            CachePadded::new(UnsafeCell::new(Map::with_capacity_and_hasher(capacity, h1))),
+            CachePadded::new(UnsafeCell::new(Map::with_capacity_and_hasher(capacity, h2))),
+        ]);
+
+        #[cfg(not(miri))]
+        let init_refcount_capacity = num_cpus::get();
+
+        #[cfg(miri)]
+        let init_refcount_capacity = 1;
+
+        let me = Arc::new(Self {
+            residual: AtomicIsize::new(0),
+            refcounts: Mutex::new(Slab::with_capacity(init_refcount_capacity)),
+            writer_thread: UnsafeCell::new(None),
+            writer_state: AtomicU8::new(WRITABLE),
+            writer_map: Cell::new(MapIndex::Second),
+            maps: OwnedMapAccess::new(maps),
+            _not_send_sync: PhantomData,
+        });
+
+        let write_handle = WriteHandle::new(Arc::clone(&me));
+        let read_handle = Self::new_reader(me);
+
+        (write_handle, read_handle)
+    }
+}
+
+impl<K, V, S> Core<K, V, S> {
+    pub fn new_reader(me: Arc<Self>) -> ReadHandle<K, V, S> {
+        let mut guard = lock(&me.refcounts);
+        let refcount = RefCount::new(me.writer_map.get().other());
+        let refcount = NonNull::new(Box::into_raw(Box::new(refcount))).unwrap();
+        let key = guard.insert(refcount);
+        drop(guard);
+
+        let map_access = me.maps.share();
+        ReadHandle::new(me, map_access, refcount, key)
+    }
+
+    pub unsafe fn release_refcount(&self, key: usize) {
+        let refcount = lock(&self.refcounts).remove(key);
+
+        drop(unsafe { Box::from_raw(refcount.as_ptr()) });
+    }
+
+    #[inline]
+    pub unsafe fn release_residual(&self) {
+        // TODO: why does loom fail if either of these are anything weaker than AcqRel?
+
+        // If we were not the last residual reader, we do nothing.
+        if self.residual.fetch_sub(1, Ordering::AcqRel) != 1 {
+            return;
+        }
+
+        // If we were the last residual reader, but the writer is not waiting on us, we do nothing.
+        if self.writer_state.swap(WRITABLE, Ordering::AcqRel) != WAITING_ON_READERS {
+            return;
+        }
+
+        // Since we were the last reader, and the writer was waiting on us, it's our job to wake it
+        // up.
+        let thread = self
+            .writer_thread
+            .with_mut(|ptr| unsafe { &mut *ptr }.take());
+
+        match thread {
+            Some(thread) => thread.unpark(),
+            // This branch is entirely unreachable (assuming this library is coded correctly),
+            // however I'd like to keep the additional code around reading as small as possible,
+            // so in release mode we currently do nothing on this branch.
+            None => {
+                #[cfg(debug_assertions)]
+                {
+                    unreachable!("WAITING_ON_READERS state observed when writer_thread is None");
+                }
+
+                #[cfg(not(debug_assertions))]
+                {
+                    crate::util::cold();
+                }
+            }
+        }
+    }
+
+    #[inline]
+    pub fn synchronize(&self) {
+        let writer_state = self.writer_state.load(Ordering::Acquire);
+
+        if writer_state == NOT_WRITABLE {
+            let current = Some(thread::current());
+            let old = self
+                .writer_thread
+                .with_mut(|ptr| unsafe { ptr::replace(ptr, current) });
+            drop(old);
+
+            let exchange_result = self.writer_state.compare_exchange(
+                NOT_WRITABLE,
+                WAITING_ON_READERS,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            );
+
+            if likely(exchange_result == Ok(NOT_WRITABLE)) {
+                loop {
+                    // Wait for the next writable map to become available
+                    thread::park();
+
+                    let writer_state = self.writer_state.load(Ordering::Acquire);
+                    if likely(writer_state == WRITABLE) {
+                        break;
+                    } else {
+                        debug_assert_eq!(writer_state, WAITING_ON_READERS);
+                    }
+                }
+            } else {
+                debug_assert_eq!(exchange_result, Err(WRITABLE));
+            }
+        } else {
+            debug_assert_eq!(writer_state, WRITABLE);
+        }
+    }
+
+    #[inline]
+    pub fn writer_map(&self) -> &UnsafeCell<Map<K, V, S>> {
+        self.maps.get(self.writer_map.get())
+    }
+
+    #[inline]
+    pub unsafe fn finish_write(&self) {
+        debug_assert_eq!(self.residual.load(Ordering::Relaxed), 0);
+        debug_assert_eq!(self.writer_state.load(Ordering::Relaxed), WRITABLE);
+
+        self.writer_state.store(NOT_WRITABLE, Ordering::Relaxed);
+
+        let guard = lock(&self.refcounts);
+
+        // This needs to be within the mutex
+        self.writer_map.set(self.writer_map.get().other());
+
+        fence(Ordering::Release);
+
+        let mut initial_residual = 0isize;
+
+        // Clippy doesn't like that we're iterating over something in a mutex apparently
+        #[allow(clippy::significant_drop_in_scrutinee)]
+        for (_, refcount) in guard.iter() {
+            let refcount = unsafe { refcount.as_ref() };
+
+            // Because the highest bit is used in the refcount, this cast will not be lossy
+            initial_residual += refcount.swap_maps() as isize;
+
+            // If we overflowed, then abort.
+            if initial_residual < 0 {
+                abort();
+            }
+        }
+
+        fence(Ordering::Acquire);
+
+        drop(guard);
+
+        let latest_residual = self.residual.fetch_add(initial_residual, Ordering::AcqRel);
+        let residual = initial_residual.wrapping_add(latest_residual);
+        if residual == 0 {
+            self.writer_state.store(WRITABLE, Ordering::Relaxed);
+        } else {
+            debug_assert!(residual > 0);
+        }
+    }
+}
+
+impl<K, V, S> Drop for Core<K, V, S> {
+    fn drop(&mut self) {
+        let reader_map_index = self.writer_map.get().other();
+        self.maps.get(reader_map_index).with_mut(|ptr| unsafe {
+            (*ptr).drain().for_each(|(ref mut key, ref mut value)| {
+                Alias::drop(key);
+                Alias::drop(value);
+            });
+        });
+    }
+}
diff --git a/src/core/store.rs b/src/core/store.rs
index 1cbaeea..1fe6b90 100644
--- a/src/core/store.rs
+++ b/src/core/store.rs
@@ -29,14 +29,14 @@ impl MapIndex {
 type MapArray<K, V, S> = [CachePadded<UnsafeCell<Map<K, V, S>>>; 2];
 
 pub struct OwnedMapAccess<K, V, S> {
-    access: MapAccess<K, V, S>,
+    access: SharedMapAccess<K, V, S>,
     _dropck: PhantomData<Box<MapArray<K, V, S>>>,
 }
 
 impl<K, V, S> OwnedMapAccess<K, V, S> {
     pub fn new(boxed: Box<MapArray<K, V, S>>) -> Self {
         Self {
-            access: MapAccess::new(NonNull::new(Box::into_raw(boxed)).unwrap()),
+            access: SharedMapAccess::new(NonNull::new(Box::into_raw(boxed)).unwrap()),
             _dropck: PhantomData,
         }
     }
@@ -46,8 +46,7 @@ impl<K, V, S> OwnedMapAccess<K, V, S> {
         unsafe { self.access.get(map_index) }
     }
 
-    #[inline]
-    pub fn share(&self) -> MapAccess<K, V, S> {
+    pub fn share(&self) -> SharedMapAccess<K, V, S> {
         self.access.clone()
     }
 }
@@ -58,11 +57,11 @@ impl<K, V, S> Drop for OwnedMapAccess<K, V, S> {
     }
 }
 
-pub struct MapAccess<K, V, S> {
+pub struct SharedMapAccess<K, V, S> {
    maps: NonNull<MapArray<K, V, S>>,
 }
 
-impl<K, V, S> MapAccess<K, V, S> {
+impl<K, V, S> SharedMapAccess<K, V, S> {
     fn new(maps: NonNull<MapArray<K, V, S>>) -> Self {
         Self { maps }
     }
diff --git a/src/lib.rs b/src/lib.rs
index ad6fff1..bad5902 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+#![cfg_attr(feature = "nightly", feature(core_intrinsics))]
 #![deny(rust_2018_idioms, unsafe_op_in_unsafe_fn)]
 #![warn(missing_docs)]
 #![doc = include_str!("../README.md")]
@@ -15,6 +16,7 @@ pub use util::Alias;
 pub use view::View;
 pub use write::*;
 
+use self::core::Core;
 use std::{
     collections::hash_map::RandomState,
     fmt::{self, Debug, Formatter},
@@ -149,7 +151,7 @@ impl<K, V, S> Builder<K, V, S> {
         K: Eq + Hash,
         S: BuildHasher,
     {
-        core::Handle::new(self)
+        Core::build_map(self)
     }
 
     pub(crate) fn into_args(self) -> BuilderArgs<S> {
diff --git a/src/read.rs b/src/read.rs
index 75becb8..9031e9c 100644
--- a/src/read.rs
+++ b/src/read.rs
@@ -1,7 +1,7 @@
 use std::{collections::hash_map::RandomState, ptr::NonNull};
 
 use crate::{
-    core::{Handle, MapAccess, MapIndex, RefCount},
+    core::{Core, MapIndex, RefCount, SharedMapAccess},
     loom::cell::UnsafeCell,
     loom::sync::Arc,
     util::unlikely,
@@ -14,8 +14,8 @@ use crate::{
 /// This type allows for the creation of [`ReadGuard`s](crate::ReadGuard), which provide immutable
 /// access to the underlying data.
 pub struct ReadHandle<K, V, S = RandomState> {
-    inner: Arc<Handle<K, V, S>>,
-    map_access: MapAccess<K, V, S>,
+    core: Arc<Core<K, V, S>>,
+    map_access: SharedMapAccess<K, V, S>,
     refcount: NonNull<RefCount>,
     refcount_key: usize,
 }
@@ -37,15 +37,15 @@ where
 
 impl<K, V, S> ReadHandle<K, V, S> {
     pub(crate) fn new(
-        inner: Arc<Handle<K, V, S>>,
-        map_access: MapAccess<K, V, S>,
+        core: Arc<Core<K, V, S>>,
+        map_access: SharedMapAccess<K, V, S>,
         refcount: NonNull<RefCount>,
         refcount_key: usize,
     ) -> Self {
         Self {
             refcount,
             map_access,
-            inner,
+            core,
             refcount_key,
         }
     }
@@ -106,13 +106,13 @@ impl<K, V, S> ReadHandle<K, V, S> {
 
 impl<K, V, S> Clone for ReadHandle<K, V, S> {
     fn clone(&self) -> Self {
-        Handle::new_reader(Arc::clone(&self.inner))
+        Core::new_reader(Arc::clone(&self.core))
     }
 }
 
 impl<K, V, S> Drop for ReadHandle<K, V, S> {
     fn drop(&mut self) {
-        unsafe { self.inner.release_refcount(self.refcount_key) };
+        unsafe { self.core.release_refcount(self.refcount_key) };
     }
 }
 
@@ -146,6 +146,7 @@ where
 impl<'guard, K, V, S> ReadAccess for ReadGuard<'guard, K, V, S> {
     type Map = Map<K, V, S>;
 
+    #[inline]
     fn with_map<'read, F, R>(&'read self, op: F) -> R
     where
         F: FnOnce(&'read Self::Map) -> R,
@@ -155,11 +156,12 @@ impl<'guard, K, V, S> ReadAccess for ReadGuard<'guard, K, V, S> {
 }
 
 impl<'guard, K, V, S> Drop for ReadGuard<'guard, K, V, S> {
+    #[inline]
     fn drop(&mut self) {
         let current_reader_map = unsafe { self.handle.refcount.as_ref() }.decrement();
 
         if unlikely(current_reader_map != self.map_index) {
-            unsafe { self.handle.inner.release_residual() };
+            unsafe { self.handle.core.release_residual() };
         }
     }
 }
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 65a25ce..311e4d3 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -21,8 +21,22 @@ pub(crate) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
 
 #[cold]
 #[inline]
+#[allow(dead_code)]
 pub(crate) fn cold() {}
 
+#[cfg(feature = "nightly")]
+pub(crate) use ::core::intrinsics::{likely, unlikely};
+
+#[cfg(not(feature = "nightly"))]
+#[inline]
+pub(crate) fn likely(b: bool) -> bool {
+    if !b {
+        cold();
+    }
+    b
+}
+
+#[cfg(not(feature = "nightly"))]
 #[inline]
 pub(crate) fn unlikely(b: bool) -> bool {
     if b {
diff --git a/src/view.rs b/src/view.rs
index f4c3c4b..854f0f0 100644
--- a/src/view.rs
+++ b/src/view.rs
@@ -23,6 +23,7 @@ pub struct View<G> {
 }
 
 impl<G> View<G> {
+    #[inline]
     pub(crate) fn new(guard: G) -> Self {
         Self { guard }
     }
diff --git a/src/write.rs b/src/write.rs
index 2382e09..cb579a4 100644
--- a/src/write.rs
+++ b/src/write.rs
@@ -10,7 +10,7 @@ use std::{
 use hashbrown::hash_map::RawEntryMut;
 
 use crate::{
-    core::Handle,
+    core::Core,
     loom::cell::UnsafeCell,
     loom::sync::Arc,
     util::{Alias, BorrowHelper},
@@ -19,13 +19,13 @@ use crate::{
 };
 
 static NEXT_WRITER_UID: AtomicUsize = AtomicUsize::new(1);
+const LEAKED_VALUE_MISMATCH: &str = "Leaked value is not from this map";
 
 #[repr(transparent)]
 #[derive(PartialEq, Eq, Clone, Copy)]
 struct WriterUid(NonZeroUsize);
 
 impl WriterUid {
-    #[inline]
     fn next() -> Self {
         Self(
             NonZeroUsize::new(NEXT_WRITER_UID.fetch_add(1, Ordering::Relaxed))
@@ -43,7 +43,7 @@ where
     K: Hash + Eq,
     S: BuildHasher,
 {
-    inner: Arc<Handle<K, V, S>>,
+    core: Arc<Core<K, V, S>>,
     operations: UnsafeCell<Vec<Operation<K, V>>>,
     uid: WriterUid,
 }
@@ -61,9 +61,9 @@ where
     K: Hash + Eq,
     S: BuildHasher,
 {
-    pub(crate) fn new(inner: Arc<Handle<K, V, S>>) -> Self {
+    pub(crate) fn new(core: Arc<Core<K, V, S>>) -> Self {
         Self {
-            inner,
+            core,
             operations: UnsafeCell::new(Vec::new()),
             uid: WriterUid::next(),
         }
     }
@@ -78,7 +78,7 @@ where
     /// `Leaked::`[`into_inner`](crate::Leaked::into_inner) for an example use-case.
     #[inline]
     pub fn synchronize(&self) {
-        self.inner.synchronize();
+        self.core.synchronize();
     }
 
     /// Creates a new [`WriteGuard`](crate::WriteGuard) wrapped in a [`View`](crate::View),
@@ -124,10 +124,9 @@ where
     /// assert_eq!(&*guard.remove("apple".to_owned()).unwrap(), "red");
     /// assert!(!guard.contains_key("apple"));
     /// ```
-    #[inline]
     pub fn guard(&mut self) -> View<WriteGuard<'_, K, V, S>> {
         self.synchronize();
-        let map = self.inner.writer_map();
+        let map = self.core.writer_map();
         map.with_mut(|map_ptr| {
             self.operations.with_mut(|ops_ptr| {
                 let operations = unsafe { &mut *ops_ptr };
@@ -213,10 +212,7 @@ where
         self.synchronize();
         let uid = self.uid;
         move |leaked| {
-            assert!(
-                uid == leaked.handle_uid,
-                "Leaked value is not from this map"
-            );
+            assert!(uid == leaked.handle_uid, "{LEAKED_VALUE_MISMATCH}");
             unsafe { Alias::into_owned(leaked.value) }
         }
     }
@@ -266,7 +262,7 @@ where
 {
     fn drop(&mut self) {
         self.synchronize();
-        let map = self.inner.writer_map();
+        let map = self.core.writer_map();
         map.with_mut(|map_ptr| {
             self.operations.with_mut(|ops_ptr| unsafe {
                 Self::flush_operations(&mut *ops_ptr, &mut *map_ptr)
@@ -293,6 +289,7 @@ where
 {
     type Map = Map<K, V, S>;
 
+    #[inline]
     fn with_map<'read, F, R>(&'read self, op: F) -> R
     where
         F: FnOnce(&'read Self::Map) -> R,
@@ -390,7 +387,7 @@ where
     pub(crate) fn drop_lazily(&self, leaked: Leaked<V>) {
         assert!(
             self.handle_uid == leaked.handle_uid,
-            "Leaked value is not from this map"
+            "{LEAKED_VALUE_MISMATCH}"
        );
         self.handle.operations.with_mut(|ops_ptr| {
             unsafe { &mut *ops_ptr }.push(Operation::new(RawOperation::Drop(Leaked::into_inner(
@@ -412,7 +409,7 @@ where
     S: BuildHasher,
 {
     fn drop(&mut self) {
-        unsafe { self.handle.inner.finish_write() };
+        unsafe { self.handle.core.finish_write() };
     }
 }
 