From e02ae4afa03d9cf29fa139519ab863f179f7c0f8 Mon Sep 17 00:00:00 2001 From: jiangzhe Date: Sun, 24 Nov 2024 13:12:01 +0000 Subject: [PATCH] move index to storage --- doradb-bench/benches/memcmp_sort.rs | 10 +- doradb-index/Cargo.toml | 20 -- doradb-index/src/epoch/atomic/inline.rs | 83 ----- doradb-index/src/epoch/atomic/mod.rs | 254 -------------- doradb-index/src/epoch/atomic/owned.rs | 111 ------- doradb-index/src/epoch/atomic/shared.rs | 144 -------- doradb-index/src/epoch/collector.rs | 69 ---- doradb-index/src/epoch/guard.rs | 97 ------ doradb-index/src/epoch/internal.rs | 405 ----------------------- doradb-index/src/epoch/list.rs | 197 ----------- doradb-index/src/epoch/macros.rs | 46 --- doradb-index/src/epoch/mod.rs | 131 -------- doradb-index/src/epoch/queue.rs | 180 ---------- doradb-index/src/hot/key.rs | 95 ------ doradb-index/src/hot/mod.rs | 420 ------------------------ doradb-index/src/hot/node.rs | 412 ----------------------- doradb-index/src/hot/node_impl.rs | 353 -------------------- doradb-index/src/hot/partial_key.rs | 149 --------- doradb-index/src/hot/value.rs | 20 -- doradb-index/src/lib.rs | 2 - doradb-storage/Cargo.toml | 2 + doradb-storage/src/buffer/mod.rs | 254 ++++++++++++++ doradb-storage/src/buffer/page.rs | 20 ++ doradb-storage/src/buffer/ptr.rs | 81 +++++ doradb-storage/src/error.rs | 10 + doradb-storage/src/index/block_index.rs | 179 ++++++++++ doradb-storage/src/index/mod.rs | 1 + doradb-storage/src/latch.rs | 278 ++++++++++++++++ doradb-storage/src/lib.rs | 3 + 29 files changed, 833 insertions(+), 3193 deletions(-) delete mode 100644 doradb-index/Cargo.toml delete mode 100644 doradb-index/src/epoch/atomic/inline.rs delete mode 100644 doradb-index/src/epoch/atomic/mod.rs delete mode 100644 doradb-index/src/epoch/atomic/owned.rs delete mode 100644 doradb-index/src/epoch/atomic/shared.rs delete mode 100644 doradb-index/src/epoch/collector.rs delete mode 100644 doradb-index/src/epoch/guard.rs delete mode 100644 doradb-index/src/epoch/internal.rs delete mode 100644 doradb-index/src/epoch/list.rs delete mode 100644 doradb-index/src/epoch/macros.rs delete mode 100644 doradb-index/src/epoch/mod.rs delete mode 100644 doradb-index/src/epoch/queue.rs delete mode 100644 doradb-index/src/hot/key.rs delete mode 100644 doradb-index/src/hot/mod.rs delete mode 100644 doradb-index/src/hot/node.rs delete mode 100644 doradb-index/src/hot/node_impl.rs delete mode 100644 doradb-index/src/hot/partial_key.rs delete mode 100644 doradb-index/src/hot/value.rs delete mode 100644 doradb-index/src/lib.rs create mode 100644 doradb-storage/src/buffer/mod.rs create mode 100644 doradb-storage/src/buffer/page.rs create mode 100644 doradb-storage/src/buffer/ptr.rs create mode 100644 doradb-storage/src/index/block_index.rs create mode 100644 doradb-storage/src/index/mod.rs create mode 100644 doradb-storage/src/latch.rs diff --git a/doradb-bench/benches/memcmp_sort.rs b/doradb-bench/benches/memcmp_sort.rs index 3fc6ea5..9e09218 100644 --- a/doradb-bench/benches/memcmp_sort.rs +++ b/doradb-bench/benches/memcmp_sort.rs @@ -26,11 +26,11 @@ fn bench_sort(c: &mut Criterion) { })) }); - let mut buf = vec![0u8; 4]; - for (d1, d2) in data1.iter().zip(data2) { - u32::write_mcf(d1, &mut buf); - assert_eq!(buf, d2); - } + // let mut buf = vec![0u8; 4]; + // for (d1, d2) in data1.iter().zip(data2) { + // u32::write_mcf(d1, &mut buf); + // assert_eq!(buf, d2); + // } } group.finish() } diff --git a/doradb-index/Cargo.toml b/doradb-index/Cargo.toml deleted file mode 100644 index 7380891..0000000 --- 
a/doradb-index/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "doradb-index" -version = "0.1.0" -edition = "2021" -authors = ["jiangzhe "] -license = "MIT OR Apache-2.0" -description = "Index module of X-Engine" -keywords = ["index"] -categories = ["database-implementations"] -repository = "https://github.com/jiangzhe/doradb/doradb-index/" - -[dependencies] -scopeguard = {version = "1.1", default-features = false} -memoffset = "0.6" -once_cell = "1" -crossbeam-utils = "0.8.11" -parking_lot = "0.12" - -[dev-dependencies] -rand = "0.8" diff --git a/doradb-index/src/epoch/atomic/inline.rs b/doradb-index/src/epoch/atomic/inline.rs deleted file mode 100644 index 584f0cd..0000000 --- a/doradb-index/src/epoch/atomic/inline.rs +++ /dev/null @@ -1,83 +0,0 @@ -use super::{compose_inline_tag, decompose_inline_tag, Guard, Pointable, PointerOrInline, Shared}; -use std::fmt; -use std::marker::PhantomData; - -pub struct Inline { - pub(super) data: *mut (), - _marker: PhantomData, -} - -impl crate::epoch::sealed::Sealed for Inline {} -impl PointerOrInline for Inline { - const MUST_BE_PTR: bool = false; - - #[inline] - fn into_ptr(self) -> *mut () { - self.data - } - - #[inline] - unsafe fn from_ptr(ptr: *mut ()) -> Self { - Self { - data: ptr, - _marker: PhantomData, - } - } -} - -impl Inline { - pub fn new(data: usize) -> Self { - debug_assert!(data.leading_zeros() >= T::ALIGN.trailing_zeros()); - Inline { - data: compose_inline_tag::((data << T::ALIGN.trailing_zeros()) as *mut (), 0) - as *mut (), - _marker: PhantomData, - } - } - - pub fn into_shared(self, _: &Guard) -> Shared<'_, T> { - unsafe { Shared::from_ptr(self.into_ptr()) } - } - - pub fn tag(&self) -> usize { - let (_, tag) = decompose_inline_tag::(self.data); - tag - } - - pub fn with_tag(self, tag: usize) -> Self { - let (ptr, _) = decompose_inline_tag::(self.data); - Inline { - data: compose_inline_tag::(ptr as *mut (), tag) as *mut (), - _marker: PhantomData, - } - } - - pub(crate) fn decompose_value(&self) -> (usize, usize) { - let (ptr, tag) = decompose_inline_tag::(self.data); - (ptr >> T::ALIGN.trailing_zeros(), tag) - } - - pub fn value(&self) -> usize { - let (data, _) = self.decompose_value(); - data - } -} - -impl fmt::Debug for Inline { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let (data, tag) = self.decompose_value(); - f.debug_struct("Inline") - .field("data", &data) - .field("tag", &tag) - .finish() - } -} - -impl Clone for Inline { - fn clone(&self) -> Self { - Inline { - data: self.data, - _marker: PhantomData, - } - } -} diff --git a/doradb-index/src/epoch/atomic/mod.rs b/doradb-index/src/epoch/atomic/mod.rs deleted file mode 100644 index e79a1a7..0000000 --- a/doradb-index/src/epoch/atomic/mod.rs +++ /dev/null @@ -1,254 +0,0 @@ -mod inline; -mod owned; -mod shared; - -use super::guard::Guard; -pub use inline::Inline; -pub use owned::Owned; -pub use shared::Shared; -use std::fmt; -use std::marker::PhantomData; -use std::mem; -use std::ptr; -use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; - -pub struct CompareExchangeError<'g, T: Pointable, P: PointerOrInline> { - pub current: Shared<'g, T>, - pub new: P, -} - -impl + fmt::Debug> fmt::Debug - for CompareExchangeError<'_, T, P> -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CompareExchangeError") - .field("current", &self.current) - .field("new", &self.new) - .finish() - } -} - -/// Types that are pointed to by a single word. 
-/// -/// This trait differs from crossbeam-epoch, as it -/// only supports sized type. -/// -/// If user wants to use dynamic sized type, he/she can -/// compose a sized struct with header and 0-length u8 -/// array at end, embed the total length inside header -/// and handle alloc/dealloc accordingly. -/// In this way, user have more flexibilities to design -/// a compact header. -pub trait Pointable { - const ALIGN: usize; - type Init; - - unsafe fn init(init: Self::Init) -> *mut (); - - unsafe fn deref<'a>(ptr: *mut ()) -> &'a Self; - - unsafe fn deref_mut<'a>(ptr: *mut ()) -> &'a mut Self; - - unsafe fn drop(ptr: *mut (), tag: usize); -} - -pub struct Atomic { - data: AtomicPtr<()>, - _marker: PhantomData<*mut T>, -} - -unsafe impl Send for Atomic {} -unsafe impl Sync for Atomic {} - -impl Atomic -where - T: Pointable, -{ - pub fn new(init: T) -> Atomic { - Self::from(Owned::new(init)) - } -} - -impl Atomic { - fn from_ptr(data: *mut ()) -> Self { - Self { - data: AtomicPtr::new(data), - _marker: PhantomData, - } - } - - pub const fn null() -> Atomic { - Self { - data: AtomicPtr::new(ptr::null_mut()), - _marker: PhantomData, - } - } - - pub fn load<'g>(&self, ord: Ordering, _: &'g Guard) -> Shared<'g, T> { - unsafe { Shared::from_ptr(self.data.load(ord)) } - } - - pub fn store>(&self, new: P, ord: Ordering) { - self.data.store(new.into_ptr(), ord); - } - - pub fn swap<'g, P: PointerOrInline>( - &self, - new: P, - ord: Ordering, - _: &'g Guard, - ) -> Shared<'g, T> { - unsafe { Shared::from_ptr(self.data.swap(new.into_ptr(), ord)) } - } - - pub fn fetch_or<'g>(&self, val: usize, ord: Ordering, _: &'g Guard) -> Shared<'g, T> { - unsafe { - Shared::from_ptr( - (*(&self.data as *const AtomicPtr<_> as *const AtomicUsize)) - .fetch_or(val & low_bits::(), ord) as *mut (), - ) - } - } - - pub fn compare_exchange<'g, P>( - &self, - current: Shared<'_, T>, - new: P, - success: Ordering, - failure: Ordering, - _: &'g Guard, - ) -> Result, CompareExchangeError<'g, T, P>> - where - P: PointerOrInline, - { - let new = new.into_ptr(); - self.data - .compare_exchange(current.into_ptr(), new, success, failure) - .map(|_| unsafe { Shared::from_ptr(new) }) - .map_err(|current| unsafe { - CompareExchangeError { - current: Shared::from_ptr(current), - new: P::from_ptr(new), - } - }) - } - - pub fn compare_exchange_weak<'g, P>( - &self, - current: Shared<'_, T>, - new: P, - success: Ordering, - failure: Ordering, - _: &'g Guard, - ) -> Result, CompareExchangeError<'g, T, P>> - where - P: PointerOrInline, - { - let new = new.into_ptr(); - self.data - .compare_exchange_weak(current.into_ptr(), new, success, failure) - .map(|_| unsafe { Shared::from_ptr(new) }) - .map_err(|current| unsafe { - CompareExchangeError { - current: Shared::from_ptr(current), - new: P::from_ptr(new), - } - }) - } -} - -impl From> for Atomic { - fn from(owned: Owned) -> Self { - let data = owned.data; - mem::forget(owned); - Self::from_ptr(data) - } -} - -impl From> for Atomic { - fn from(inline: Inline) -> Self { - Self::from_ptr(inline.data) - } -} - -impl fmt::Debug for Atomic { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let data = self.data.load(Ordering::SeqCst); - let (raw, tag) = decompose_tag::(data); - f.debug_struct("Atomic") - .field("raw", &raw) - .field("tag", &tag) - .finish() - } -} - -pub trait PointerOrInline: super::sealed::Sealed { - const MUST_BE_PTR: bool; - - fn into_ptr(self) -> *mut (); - - unsafe fn from_ptr(data: *mut ()) -> Self; -} - -#[inline] -pub(self) fn ensure_aligned(raw: *mut ()) 
{ - assert_eq!(raw as usize & low_bits::(), 0, "unaligned pointer"); -} - -#[inline] -pub fn low_bits() -> usize { - (1 << T::ALIGN.trailing_zeros()) - 1 -} - -#[inline] -pub(self) fn compose_tag(ptr: *mut (), tag: usize) -> *mut () { - int_to_ptr_with_provenance( - (ptr as usize & !low_bits::()) | (tag & low_bits::()), - ptr, - ) -} - -#[inline] -pub(self) fn decompose_tag(ptr: *mut ()) -> (*mut (), usize) { - ( - int_to_ptr_with_provenance(ptr as usize & !low_bits::(), ptr), - ptr as usize & low_bits::(), - ) -} - -#[inline] -pub(self) fn compose_inline_tag(ptr: *mut (), tag: usize) -> usize { - (ptr as usize & !low_bits::()) | (tag & low_bits::()) -} - -#[inline] -pub(self) fn decompose_inline_tag(ptr: *mut ()) -> (usize, usize) { - ( - ptr as usize & !low_bits::(), - ptr as usize & low_bits::(), - ) -} - -#[inline] -fn int_to_ptr_with_provenance(addr: usize, prov: *mut T) -> *mut T { - let ptr = prov.cast::(); - ptr.wrapping_add(addr.wrapping_sub(ptr as usize)).cast() -} - -#[cfg(test)] -mod tests { - #[test] - fn test_align() { - #[repr(C, align(16))] - struct A { - data: u16, - } - - let size = std::mem::size_of::(); - let align = std::mem::align_of::(); - println!("size={}, align={}", size, align); - let a = A { data: 256 }; - let arr = unsafe { std::mem::transmute::<_, [u8; 16]>(a) }; - println!("arr={:?}", arr); - } -} diff --git a/doradb-index/src/epoch/atomic/owned.rs b/doradb-index/src/epoch/atomic/owned.rs deleted file mode 100644 index 078296a..0000000 --- a/doradb-index/src/epoch/atomic/owned.rs +++ /dev/null @@ -1,111 +0,0 @@ -use super::{ - compose_tag, decompose_tag, ensure_aligned, Guard, Pointable, PointerOrInline, Shared, -}; -use std::fmt; -use std::marker::PhantomData; -use std::mem; -use std::ops::{Deref, DerefMut}; - -pub struct Owned { - pub(super) data: *mut (), - _marker: PhantomData>, -} - -impl crate::epoch::sealed::Sealed for Owned {} -impl PointerOrInline for Owned { - const MUST_BE_PTR: bool = true; - - #[inline] - fn into_ptr(self) -> *mut () { - let data = self.data; - mem::forget(self); - data - } - - #[inline] - unsafe fn from_ptr(data: *mut ()) -> Self { - debug_assert!(!data.is_null(), "converting null into `Owned`"); - Self { - data, - _marker: PhantomData, - } - } -} - -impl> Owned { - pub fn new(init: T) -> Owned { - unsafe { Self::from_ptr(T::init(init)) } - } -} - -impl Owned { - pub fn new_dyn(init: T::Init) -> Owned { - unsafe { Self::from_ptr(T::init(init)) } - } -} - -impl Owned { - pub fn into_box(self) -> Box { - let (raw, _) = decompose_tag::(self.data); - mem::forget(self); - unsafe { Box::from_raw(raw.cast::()) } - } - - pub fn into_shared(self, _: &Guard) -> Shared<'_, T> { - unsafe { Shared::from_ptr(self.into_ptr()) } - } - - pub fn tag(&self) -> usize { - let (_, tag) = decompose_tag::(self.data); - tag - } - - pub fn with_tag(self, tag: usize) -> Owned { - let data = self.into_ptr(); - unsafe { Self::from_ptr(compose_tag::(data, tag)) } - } - - pub(crate) fn decompose(&self) -> (*mut (), usize) { - decompose_tag::(self.data) - } -} - -impl Drop for Owned { - fn drop(&mut self) { - let (raw, tag) = decompose_tag::(self.data); - unsafe { T::drop(raw, tag) } - } -} - -impl fmt::Debug for Owned { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let (raw, tag) = decompose_tag::(self.data); - f.debug_struct("Owned") - .field("raw", &raw) - .field("tag", &tag) - .finish() - } -} - -impl + Clone> Clone for Owned { - fn clone(&self) -> Self { - let owned: Owned = Owned::new((**self).clone()); - owned.with_tag(self.tag()) - } -} 
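As a rough illustration of the pattern the `Pointable` doc comment above describes — handling dynamically sized data by composing a sized struct whose header records the total length, followed by a zero-length u8 tail array, with alloc/dealloc handled manually — a minimal standalone sketch might look like the following. `VarBytes` is a hypothetical type invented for this sketch, not part of the codebase; the deleted `NodeTemplate` further down in this patch applies the same idea with its `data_len` header field, `data: UnsafeCell<[u8; 0]>` tail, and `Layout`-based `Pointable::init`/`drop`.

use std::alloc::{self, Layout};
use std::{mem, ptr, slice};

/// Hypothetical example: a length-prefixed byte buffer.
/// The sized header records the payload length; the payload itself is
/// allocated immediately after the header and addressed via the
/// zero-length tail array.
#[repr(C, align(8))]
struct VarBytes {
    len: u16,       // total payload length, embedded in the header
    _tail: [u8; 0], // payload bytes start here
}

impl VarBytes {
    fn layout(len: u16) -> Layout {
        Layout::from_size_align(mem::size_of::<Self>() + len as usize, mem::align_of::<Self>())
            .unwrap()
    }

    /// Allocate header + payload in one block (the `Pointable::init` side).
    unsafe fn alloc(payload: &[u8]) -> *mut VarBytes {
        let len = u16::try_from(payload.len()).unwrap();
        let layout = Self::layout(len);
        let p = alloc::alloc(layout).cast::<VarBytes>();
        if p.is_null() {
            alloc::handle_alloc_error(layout);
        }
        ptr::addr_of_mut!((*p).len).write(len);
        let tail = ptr::addr_of_mut!((*p)._tail).cast::<u8>();
        ptr::copy_nonoverlapping(payload.as_ptr(), tail, payload.len());
        p
    }

    unsafe fn payload<'a>(p: *const VarBytes) -> &'a [u8] {
        slice::from_raw_parts(ptr::addr_of!((*p)._tail).cast::<u8>(), (*p).len as usize)
    }

    /// Rebuild the layout from the header and free it (the `Pointable::drop` side).
    unsafe fn dealloc(p: *mut VarBytes) {
        let layout = Self::layout((*p).len);
        alloc::dealloc(p.cast::<u8>(), layout);
    }
}

fn main() {
    unsafe {
        let p = VarBytes::alloc(b"hello");
        assert_eq!(VarBytes::payload(p), b"hello");
        VarBytes::dealloc(p);
    }
}

Because the total length lives in the header, deallocation can reconstruct the allocation `Layout` from the pointer alone, which is why the `Pointable::drop` implementations in this patch need no extra size argument.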
- -impl Deref for Owned { - type Target = T; - - fn deref(&self) -> &T { - let (raw, _) = decompose_tag::(self.data); - unsafe { T::deref(raw) } - } -} - -impl DerefMut for Owned { - fn deref_mut(&mut self) -> &mut T { - let (raw, _) = decompose_tag::(self.data); - unsafe { T::deref_mut(raw) } - } -} diff --git a/doradb-index/src/epoch/atomic/shared.rs b/doradb-index/src/epoch/atomic/shared.rs deleted file mode 100644 index 588c5b8..0000000 --- a/doradb-index/src/epoch/atomic/shared.rs +++ /dev/null @@ -1,144 +0,0 @@ -use super::{ - compose_tag, decompose_tag, ensure_aligned, Inline, Owned, Pointable, PointerOrInline, -}; -use std::fmt; -use std::marker::PhantomData; -use std::ptr; - -pub struct Shared<'g, T: 'g + ?Sized + Pointable> { - data: *mut (), - _marker: PhantomData<(&'g (), *const T)>, -} - -impl Clone for Shared<'_, T> { - fn clone(&self) -> Self { - Self { - data: self.data, - _marker: PhantomData, - } - } -} - -impl Copy for Shared<'_, T> {} - -impl crate::epoch::sealed::Sealed for Shared<'_, T> {} - -impl PointerOrInline for Shared<'_, T> { - const MUST_BE_PTR: bool = false; - - #[inline] - fn into_ptr(self) -> *mut () { - self.data - } - - #[inline] - unsafe fn from_ptr(data: *mut ()) -> Self { - Shared { - data, - _marker: PhantomData, - } - } -} - -impl<'g, T: Pointable> Shared<'g, T> { - pub fn as_raw(&self) -> *const T { - let (raw, _) = decompose_tag::(self.data); - raw as *const _ - } -} - -impl<'g, T: 'g + Pointable> Shared<'g, T> { - pub fn null() -> Self { - Shared { - data: ptr::null_mut(), - _marker: PhantomData, - } - } - - pub fn is_null(&self) -> bool { - let (raw, _) = decompose_tag::(self.data); - raw.is_null() - } - - pub unsafe fn deref(&self) -> &'g T { - let (raw, _) = decompose_tag::(self.data); - T::deref(raw) - } - - pub unsafe fn deref_mut(&mut self) -> &'g T { - let (raw, _) = decompose_tag::(self.data); - T::deref_mut(raw) - } - - pub unsafe fn as_ref(&self) -> Option<&'g T> { - let (raw, _) = decompose_tag::(self.data); - if raw.is_null() { - None - } else { - Some(T::deref(raw)) - } - } - - pub unsafe fn try_into_owned(self) -> Option> { - if self.is_null() { - None - } else { - Some(Owned::from_ptr(self.data)) - } - } - - pub unsafe fn into_inline(self) -> Inline { - Inline::from_ptr(self.data) - } - - pub fn tag(&self) -> usize { - let (_, tag) = decompose_tag::(self.data); - tag - } - - pub fn with_tag(&self, tag: usize) -> Shared<'g, T> { - unsafe { Self::from_ptr(compose_tag::(self.data, tag)) } - } - - pub(crate) fn decompose(&self) -> (*mut (), usize) { - decompose_tag::(self.data) - } -} - -impl Default for Shared<'_, T> { - fn default() -> Self { - Shared::null() - } -} - -impl fmt::Debug for Shared<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let (raw, tag) = decompose_tag::(self.data); - f.debug_struct("Shared") - .field("raw", &raw) - .field("tag", &tag) - .finish() - } -} - -impl fmt::Pointer for Shared<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Pointer::fmt(&(unsafe { self.deref() as *const _ }), f) - } -} - -impl From<*const T> for Shared<'_, T> { - fn from(raw: *const T) -> Self { - let raw = raw as *mut (); - ensure_aligned::(raw); - unsafe { Self::from_ptr(raw) } - } -} - -impl<'g, T: ?Sized + Pointable> PartialEq> for Shared<'g, T> { - fn eq(&self, other: &Self) -> bool { - self.data == other.data - } -} - -impl Eq for Shared<'_, T> {} diff --git a/doradb-index/src/epoch/collector.rs b/doradb-index/src/epoch/collector.rs deleted file mode 100644 index a8fa7ac..0000000 
--- a/doradb-index/src/epoch/collector.rs +++ /dev/null @@ -1,69 +0,0 @@ -use super::guard::Guard; -use super::internal::{Global, Local}; -use std::fmt; -use std::sync::Arc; - -pub struct Collector { - pub(crate) global: Arc, -} - -unsafe impl Send for Collector {} -unsafe impl Sync for Collector {} - -impl Default for Collector { - fn default() -> Self { - Self { - global: Arc::new(Global::new()), - } - } -} - -impl Collector { - pub fn register(&self) -> LocalHandle { - Local::register(self) - } -} - -impl Clone for Collector { - fn clone(&self) -> Self { - Collector { - global: self.global.clone(), - } - } -} - -pub struct LocalHandle { - pub(crate) local: *const Local, -} - -impl LocalHandle { - #[inline] - pub fn pin(&self) -> Guard { - unsafe { (*self.local).pin() } - } - - #[inline] - pub fn is_pinned(&self) -> bool { - unsafe { (*self.local).is_pinned() } - } - - #[inline] - pub fn collector(&self) -> &Collector { - unsafe { (*self.local).collector() } - } -} - -impl Drop for LocalHandle { - #[inline] - fn drop(&mut self) { - unsafe { - Local::release_handle(&*self.local); - } - } -} - -impl fmt::Debug for LocalHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("LocalHandle { .. }") - } -} diff --git a/doradb-index/src/epoch/guard.rs b/doradb-index/src/epoch/guard.rs deleted file mode 100644 index 1025c32..0000000 --- a/doradb-index/src/epoch/guard.rs +++ /dev/null @@ -1,97 +0,0 @@ -use super::atomic::{Pointable, Shared}; -use super::collector::Collector; -use super::internal::Deferred; -use super::internal::Local; -use scopeguard::defer; -use std::fmt; -use std::mem; -use std::ptr; - -pub struct Guard { - pub(crate) local: *const Local, -} - -impl Guard { - pub fn defer(&self, f: F) - where - F: FnOnce() -> R, - F: Send + 'static, - { - unsafe { - self.defer_unchecked(f); - } - } - - pub unsafe fn defer_unchecked(&self, f: F) - where - F: FnOnce() -> R, - { - if let Some(local) = self.local.as_ref() { - local.defer(Deferred::new(move || drop(f())), self); - } else { - drop(f()); - } - } - - pub unsafe fn defer_destroy(&self, ptr: Shared<'_, T>) { - self.defer_unchecked(move || ptr.try_into_owned()); - } - - pub fn flush(&self) { - if let Some(local) = unsafe { self.local.as_ref() } { - local.flush(self); - } - } - - pub fn repin(&mut self) { - if let Some(local) = unsafe { self.local.as_ref() } { - local.repin(); - } - } - - pub fn repin_after(&mut self, f: F) -> R - where - F: FnOnce() -> R, - { - if let Some(local) = unsafe { self.local.as_ref() } { - local.acquire_handle(); - local.unpin(); - } - - defer! { - if let Some(local) = unsafe { self.local.as_ref() } { - mem::forget(local.pin()); - local.release_handle(); - } - } - - f() - } - - pub fn collector(&self) -> Option<&Collector> { - unsafe { self.local.as_ref().map(|local| local.collector()) } - } -} - -impl Drop for Guard { - #[inline] - fn drop(&mut self) { - if let Some(local) = unsafe { self.local.as_ref() } { - local.unpin(); - } - } -} - -impl fmt::Debug for Guard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Guard { .. 
}") - } -} - -#[inline] -pub unsafe fn unprotected() -> &'static Guard { - struct GuardWrapper(Guard); - unsafe impl Sync for GuardWrapper {} - static UNPROTECTED: GuardWrapper = GuardWrapper(Guard { local: ptr::null() }); - &UNPROTECTED.0 -} diff --git a/doradb-index/src/epoch/internal.rs b/doradb-index/src/epoch/internal.rs deleted file mode 100644 index e30d014..0000000 --- a/doradb-index/src/epoch/internal.rs +++ /dev/null @@ -1,405 +0,0 @@ -use super::atomic::{Owned, Shared}; -use super::collector::{Collector, LocalHandle}; -use super::guard::{unprotected, Guard}; -use super::list::{Entry, IsElement}; -use super::list::{IterError, List}; -use super::queue::Queue; -use super::{AtomicEpoch, Epoch}; -use crossbeam_utils::CachePadded; -use memoffset::offset_of; -use std::cell::{Cell, UnsafeCell}; -use std::fmt; -use std::marker::PhantomData; -use std::mem::{self, ManuallyDrop, MaybeUninit}; -use std::num::Wrapping; -use std::ptr; -use std::sync::atomic::{self, Ordering}; - -const MAX_OBJECTS: usize = 64; - -pub(crate) struct Bag { - deferreds: [Deferred; MAX_OBJECTS], - len: usize, -} - -unsafe impl Send for Bag {} - -impl Bag { - pub(crate) fn new() -> Self { - Self::default() - } - - pub(crate) fn is_empty(&self) -> bool { - self.len == 0 - } - - pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> { - if self.len < MAX_OBJECTS { - self.deferreds[self.len] = deferred; - self.len += 1; - Ok(()) - } else { - Err(deferred) - } - } - - fn seal(self, epoch: Epoch) -> SealedBag { - SealedBag { epoch, _bag: self } - } -} - -impl Default for Bag { - fn default() -> Self { - Bag { - len: 0, - deferreds: [Deferred::NO_OP; MAX_OBJECTS], - } - } -} - -impl Drop for Bag { - fn drop(&mut self) { - for deferred in &mut self.deferreds[..self.len] { - let no_op = Deferred::NO_OP; - let owned_deferred = mem::replace(deferred, no_op); - owned_deferred.call(); - } - } -} - -impl fmt::Debug for Bag { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Bag") - .field("deferreds", &&self.deferreds[..self.len]) - .finish() - } -} - -#[derive(Default, Debug)] -struct SealedBag { - epoch: Epoch, - _bag: Bag, -} - -unsafe impl Sync for SealedBag {} - -impl SealedBag { - fn is_expired(&self, global_epoch: Epoch) -> bool { - global_epoch.wrapping_sub(self.epoch) >= 2 - } -} - -pub(crate) struct Global { - locals: List, - queue: Queue, - pub(crate) epoch: CachePadded, -} - -impl Global { - const COLLECT_STEPS: usize = 8; - - #[inline] - pub(crate) fn new() -> Self { - Self { - locals: List::new(), - queue: Queue::new(), - epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), - } - } - - pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) { - let bag = mem::replace(bag, Bag::new()); - atomic::fence(Ordering::SeqCst); - let epoch = self.epoch.load(Ordering::Relaxed); - self.queue.push(bag.seal(epoch), guard); - } - - #[cold] - pub(crate) fn collect(&self, guard: &Guard) { - let global_epoch = self.try_advance(guard); - let steps = Self::COLLECT_STEPS; - for _ in 0..steps { - match self.queue.try_pop_if( - |sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch), - guard, - ) { - None => break, - Some(sealed_bag) => drop(sealed_bag), - } - } - } - - #[cold] - pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch { - let global_epoch = self.epoch.load(Ordering::Relaxed); - atomic::fence(Ordering::SeqCst); - for local in self.locals.iter(guard) { - match local { - Err(IterError::Stalled) => { - return global_epoch; - } - Ok(local) => { - 
let local_epoch = local.epoch.load(Ordering::Relaxed); - if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { - return global_epoch; - } - } - } - } - atomic::fence(Ordering::Acquire); - - let new_epoch = global_epoch.successor(); - self.epoch.store(new_epoch, Ordering::Release); - new_epoch - } -} - -pub(crate) struct Local { - entry: Entry, - epoch: AtomicEpoch, - collector: UnsafeCell>, - pub(crate) bag: UnsafeCell, - guard_count: Cell, - handle_count: Cell, - pin_count: Cell>, -} - -impl Local { - const PINNINGS_BETWEEN_COLLECT: usize = 128; - - pub(crate) fn register(collector: &Collector) -> LocalHandle { - unsafe { - let local = Owned::new(Local { - entry: Entry::default(), - epoch: AtomicEpoch::new(Epoch::starting()), - collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())), - bag: UnsafeCell::new(Bag::new()), - guard_count: Cell::new(0), - handle_count: Cell::new(1), - pin_count: Cell::new(Wrapping(0)), - }) - .into_shared(unprotected()); - collector.global.locals.insert(local, unprotected()); - LocalHandle { - local: local.as_raw(), - } - } - } - - #[inline] - pub(crate) fn collector(&self) -> &Collector { - let c = self.collector.get(); - unsafe { &*c } - } - - #[inline] - pub(crate) fn global(&self) -> &Global { - &self.collector().global - } - - #[inline] - pub(crate) fn is_pinned(&self) -> bool { - self.guard_count.get() > 0 - } - - pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) { - let bag = &mut *self.bag.get(); - while let Err(d) = bag.try_push(deferred) { - self.global().push_bag(bag, guard); - deferred = d; - } - } - - pub(crate) fn flush(&self, guard: &Guard) { - let bag = unsafe { &mut *self.bag.get() }; - if !bag.is_empty() { - self.global().push_bag(bag, guard); - } - self.global().collect(guard); - } - - #[inline] - pub(crate) fn pin(&self) -> Guard { - let guard = Guard { local: self }; - let guard_count = self.guard_count.get(); - self.guard_count.set(guard_count.checked_add(1).unwrap()); - if guard_count == 0 { - let global_epoch = self.global().epoch.load(Ordering::Relaxed); - let new_epoch = global_epoch.pinned(); - if cfg!(any(target_arch = "x86", target_arch = "x86_64")) { - let current = Epoch::starting(); - let res = self.epoch.compare_exchange( - current, - new_epoch, - Ordering::SeqCst, - Ordering::SeqCst, - ); - debug_assert!(res.is_ok(), "participant was expected to be unpinned"); - atomic::compiler_fence(Ordering::SeqCst); - } else { - self.epoch.store(new_epoch, Ordering::Relaxed); - atomic::fence(Ordering::SeqCst); - } - let count = self.pin_count.get(); - self.pin_count.set(count + Wrapping(1)); - if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 { - self.global().collect(&guard); - } - } - guard - } - - #[inline] - pub(crate) fn unpin(&self) { - let guard_count = self.guard_count.get(); - self.guard_count.set(guard_count - 1); - if guard_count == 1 { - self.epoch.store(Epoch::starting(), Ordering::Release); - if self.handle_count.get() == 0 { - self.finalize(); - } - } - } - - #[inline] - pub(crate) fn repin(&self) { - let guard_count = self.guard_count.get(); - if guard_count == 1 { - let epoch = self.epoch.load(Ordering::Relaxed); - let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned(); - if epoch != global_epoch { - self.epoch.store(global_epoch, Ordering::Release); - } - } - } - - #[inline] - pub(crate) fn acquire_handle(&self) { - let handle_count = self.handle_count.get(); - debug_assert!(handle_count >= 1); - self.handle_count.set(handle_count + 1); - } - - #[inline] - 
pub(crate) fn release_handle(&self) { - let guard_count = self.guard_count.get(); - let handle_count = self.handle_count.get(); - debug_assert!(handle_count >= 1); - self.handle_count.set(handle_count - 1); - if guard_count == 0 && handle_count == 1 { - self.finalize(); - } - } - - #[cold] - fn finalize(&self) { - debug_assert_eq!(self.guard_count.get(), 0); - debug_assert_eq!(self.handle_count.get(), 0); - self.handle_count.set(1); - unsafe { - let guard = &self.pin(); - let bag = &mut *self.bag.get(); - self.global().push_bag(bag, guard); - } - self.handle_count.set(0); - unsafe { - let c = self.collector.get(); - let collector: Collector = ptr::read(&*(*c)); - self.entry.delete(unprotected()); - drop(collector); - } - } -} - -impl_sized_pointable!(Local); - -impl IsElement for Local { - fn entry_of(local: &Local) -> &Entry { - unsafe { - let entry_ptr = (local as *const Local as *const u8) - .add(offset_of!(Local, entry)) - .cast::(); - &*entry_ptr - } - } - - unsafe fn element_of(entry: &Entry) -> &Local { - let local_ptr = (entry as *const Entry as *const u8) - .sub(offset_of!(Local, entry)) - .cast::(); - &*local_ptr - } - - unsafe fn finalize(entry: &Entry, guard: &Guard) { - guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _)) - } -} - -const DATA_WORDS: usize = 3; -type Data = [usize; DATA_WORDS]; - -pub(crate) struct Deferred { - call: unsafe fn(*mut u8), - data: MaybeUninit, - _marker: PhantomData<*mut ()>, -} - -impl fmt::Debug for Deferred { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Deferred { .. }") - } -} - -impl Deferred { - pub(crate) const NO_OP: Self = { - fn no_op_call(_raw: *mut u8) {} - Self { - call: no_op_call, - data: MaybeUninit::uninit(), - _marker: PhantomData, - } - }; - - pub(crate) fn new(f: F) -> Self { - let size = mem::size_of::(); - let align = mem::align_of::(); - unsafe { - if size <= mem::size_of::() && align <= mem::align_of::() { - let mut data = MaybeUninit::::uninit(); - ptr::write(data.as_mut_ptr().cast::(), f); - - unsafe fn call(raw: *mut u8) { - let f: F = ptr::read(raw.cast::()); - f(); - } - - Deferred { - call: call::, - data, - _marker: PhantomData, - } - } else { - let b: Box = Box::new(f); - let mut data = MaybeUninit::::uninit(); - ptr::write(data.as_mut_ptr().cast::>(), b); - - unsafe fn call(raw: *mut u8) { - let b: Box = ptr::read(raw.cast::>()); - (*b)(); - } - - Deferred { - call: call::, - data, - _marker: PhantomData, - } - } - } - } - - #[inline] - pub(crate) fn call(mut self) { - let call = self.call; - unsafe { call(self.data.as_mut_ptr().cast::()) }; - } -} diff --git a/doradb-index/src/epoch/list.rs b/doradb-index/src/epoch/list.rs deleted file mode 100644 index fa52391..0000000 --- a/doradb-index/src/epoch/list.rs +++ /dev/null @@ -1,197 +0,0 @@ -use super::atomic::{Atomic, Pointable, Shared}; -use super::guard::{unprotected, Guard}; -use std::marker::PhantomData; -use std::sync::atomic::Ordering; - -#[derive(Debug)] -pub(crate) struct Entry { - next: Atomic, -} - -impl_sized_pointable!(Entry); - -pub(crate) trait IsElement { - fn entry_of(_: &T) -> &Entry; - - unsafe fn element_of(_: &Entry) -> &T; - - unsafe fn finalize(_: &Entry, _: &Guard); -} - -#[derive(Debug)] -pub(crate) struct List = T> { - head: Atomic, - _marker: PhantomData<(T, C)>, -} - -pub(crate) struct Iter<'g, T, C: IsElement> { - guard: &'g Guard, - pred: &'g Atomic, - curr: Shared<'g, Entry>, - head: &'g Atomic, - _marker: PhantomData<(&'g T, C)>, -} - -#[derive(PartialEq, Debug)] -pub(crate) enum IterError { - 
Stalled, -} - -impl Default for Entry { - fn default() -> Self { - Self { - next: Atomic::null(), - } - } -} - -impl Entry { - pub(crate) unsafe fn delete(&self, guard: &Guard) { - self.next.fetch_or(1, Ordering::Release, guard); - } -} - -impl> List { - pub(crate) fn new() -> Self { - Self { - head: Atomic::null(), - _marker: PhantomData, - } - } - - pub(crate) unsafe fn insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard) { - let to = &self.head; - let entry: &Entry = C::entry_of(container.deref()); - let entry_ptr = Shared::from(entry as *const _); - let mut next = to.load(Ordering::Relaxed, guard); - loop { - entry.next.store(next, Ordering::Relaxed); - match to.compare_exchange_weak( - next, - entry_ptr, - Ordering::Release, - Ordering::Relaxed, - guard, - ) { - Ok(_) => break, - Err(err) => next = err.current, - } - } - } - - pub(crate) fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> { - Iter { - guard, - pred: &self.head, - curr: self.head.load(Ordering::Acquire, guard), - head: &self.head, - _marker: PhantomData, - } - } -} - -impl> Drop for List { - fn drop(&mut self) { - unsafe { - let guard = unprotected(); - let mut curr = self.head.load(Ordering::Relaxed, guard); - while let Some(c) = curr.as_ref() { - let succ = c.next.load(Ordering::Relaxed, guard); - assert_eq!(succ.tag(), 1); - C::finalize(curr.deref(), guard); - curr = succ; - } - } - } -} - -impl<'g, T: 'g, C: IsElement> Iterator for Iter<'g, T, C> { - type Item = Result<&'g T, IterError>; - - fn next(&mut self) -> Option { - while let Some(c) = unsafe { self.curr.as_ref() } { - let succ = c.next.load(Ordering::Acquire, self.guard); - if succ.tag() == 1 { - let succ = succ.with_tag(0); - debug_assert!(self.curr.tag() == 0); - let succ = match self.pred.compare_exchange( - self.curr, - succ, - Ordering::Acquire, - Ordering::Acquire, - self.guard, - ) { - Ok(_) => { - unsafe { - C::finalize(self.curr.deref(), self.guard); - } - succ - } - Err(e) => e.current, - }; - if succ.tag() != 0 { - self.pred = self.head; - self.curr = self.head.load(Ordering::Acquire, self.guard); - return Some(Err(IterError::Stalled)); - } - self.curr = succ; - continue; - } - self.pred = &c.next; - self.curr = succ; - return Some(Ok(unsafe { C::element_of(c) })); - } - None - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::epoch::{self, Collector, Owned}; - use memoffset::offset_of; - - impl_sized_pointable!(A); - - #[derive(Default)] - struct A { - entry: Entry, - data: usize, - } - - impl IsElement for A { - fn entry_of(a: &A) -> &Entry { - let entry_ptr = ((a as *const A as usize) + offset_of!(A, entry)) as *const Entry; - unsafe { &*entry_ptr } - } - - unsafe fn element_of(entry: &Entry) -> &A { - let elem_ptr = ((entry as *const Entry as usize) - offset_of!(A, entry)) as *const A; - &*elem_ptr - } - - unsafe fn finalize(entry: &Entry, guard: &Guard) { - guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _)); - } - } - - #[test] - fn test_list() { - let collector = Collector::default(); - let handle = collector.register(); - let ls: List = List::new(); - let guard = handle.pin(); - - let elem1 = Owned::new(A::default()).into_shared(&guard); - let elem2 = Owned::new(A::default()).into_shared(&guard); - unsafe { - ls.insert(elem1, &guard); - ls.insert(elem2, &guard); - } - - unsafe { - A::entry_of(elem1.as_ref().unwrap()).delete(&guard); - A::entry_of(elem2.as_ref().unwrap()).delete(&guard); - } - } -} diff --git a/doradb-index/src/epoch/macros.rs b/doradb-index/src/epoch/macros.rs deleted 
file mode 100644 index dbf5f20..0000000 --- a/doradb-index/src/epoch/macros.rs +++ /dev/null @@ -1,46 +0,0 @@ -macro_rules! impl_sized_pointable { - ($ty:ident < $($pty:tt),+ >) => { - impl<$($pty),+> $crate::epoch::atomic::Pointable for $ty<$($pty),+> { - const ALIGN: usize = std::mem::align_of::(); - type Init = Self; - - unsafe fn init(init: Self) -> *mut () { - Box::into_raw(Box::new(init)).cast::<()>() - } - - unsafe fn deref<'a>(ptr: *mut ()) -> &'a Self { - &*(ptr as *const Self) - } - - unsafe fn deref_mut<'a>(ptr: *mut ()) -> &'a mut Self { - &mut *ptr.cast::() - } - - unsafe fn drop(ptr: *mut (), _: usize) { - drop(Box::from_raw(ptr.cast::())); - } - } - }; - ($ty:ty) => { - impl $crate::epoch::atomic::Pointable for $ty { - const ALIGN: usize = std::mem::align_of::<$ty>(); - type Init = Self; - - unsafe fn init(init: Self) -> *mut () { - Box::into_raw(Box::new(init)).cast::<()>() - } - - unsafe fn deref<'a>(ptr: *mut ()) -> &'a Self { - &*(ptr as *const Self) - } - - unsafe fn deref_mut<'a>(ptr: *mut ()) -> &'a mut Self { - &mut *ptr.cast::() - } - - unsafe fn drop(ptr: *mut (), _: usize) { - drop(Box::from_raw(ptr.cast::())); - } - } - } -} diff --git a/doradb-index/src/epoch/mod.rs b/doradb-index/src/epoch/mod.rs deleted file mode 100644 index 1c3d9c2..0000000 --- a/doradb-index/src/epoch/mod.rs +++ /dev/null @@ -1,131 +0,0 @@ -#[macro_use] -mod macros; -mod atomic; -mod collector; -mod guard; -mod internal; -mod list; -mod queue; - -mod sealed { - pub trait Sealed {} -} - -pub use atomic::{low_bits, Atomic, Inline, Owned, Pointable, PointerOrInline, Shared}; -pub use guard::{unprotected, Guard}; - -use collector::{Collector, LocalHandle}; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use once_cell::sync::Lazy; - -static COLLECTOR: Lazy = Lazy::new(Collector::default); - -thread_local! 
{ - static HANDLE: LocalHandle = COLLECTOR.register(); -} - -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] -pub struct Epoch { - data: usize, -} - -impl Epoch { - #[inline] - pub(crate) fn starting() -> Self { - Self::default() - } - - #[inline] - pub(crate) fn wrapping_sub(self, rhs: Self) -> isize { - self.data.wrapping_sub(rhs.data & !1) as isize >> 1 - } - - #[inline] - pub(crate) fn is_pinned(self) -> bool { - (self.data & 1) == 1 - } - - #[inline] - pub(crate) fn pinned(self) -> Self { - Epoch { - data: self.data | 1, - } - } - - #[inline] - pub(crate) fn unpinned(self) -> Self { - Epoch { - data: self.data & !1, - } - } - - #[inline] - pub(crate) fn successor(self) -> Self { - Epoch { - data: self.data.wrapping_add(2), - } - } -} - -#[derive(Debug, Default)] -pub(crate) struct AtomicEpoch { - data: AtomicUsize, -} - -impl AtomicEpoch { - #[inline] - pub(crate) fn new(epoch: Epoch) -> Self { - let data = AtomicUsize::new(epoch.data); - AtomicEpoch { data } - } - - #[inline] - pub(crate) fn load(&self, ord: Ordering) -> Epoch { - Epoch { - data: self.data.load(ord), - } - } - - #[inline] - pub(crate) fn store(&self, epoch: Epoch, ord: Ordering) { - self.data.store(epoch.data, ord) - } - - #[inline] - pub(crate) fn compare_exchange( - &self, - current: Epoch, - new: Epoch, - success: Ordering, - failure: Ordering, - ) -> Result { - match self - .data - .compare_exchange(current.data, new.data, success, failure) - { - Ok(data) => Ok(Epoch { data }), - Err(data) => Err(Epoch { data }), - } - } -} - -#[inline] -pub fn pin() -> Guard { - with_handle(|h| h.pin()) -} - -#[inline] -pub fn is_pinned() -> bool { - with_handle(|h| h.is_pinned()) -} - -#[inline] -fn with_handle(mut f: F) -> R -where - F: FnMut(&LocalHandle) -> R, -{ - HANDLE - .try_with(|h| f(h)) - .unwrap_or_else(|_| f(&COLLECTOR.register())) -} diff --git a/doradb-index/src/epoch/queue.rs b/doradb-index/src/epoch/queue.rs deleted file mode 100644 index 970f98c..0000000 --- a/doradb-index/src/epoch/queue.rs +++ /dev/null @@ -1,180 +0,0 @@ -use super::atomic::{Atomic, Owned, Shared}; -use super::guard::{unprotected, Guard}; -use crossbeam_utils::CachePadded; -use std::mem::MaybeUninit; -use std::sync::atomic::Ordering; - -pub(crate) struct Queue { - head: CachePadded>>, - tail: CachePadded>>, -} - -unsafe impl Sync for Queue {} -unsafe impl Send for Queue {} - -impl Queue { - pub(crate) fn new() -> Queue { - let q = Queue { - head: CachePadded::new(Atomic::null()), - tail: CachePadded::new(Atomic::null()), - }; - let sentinel = Owned::new(Node { - data: MaybeUninit::uninit(), - next: Atomic::null(), - }); - unsafe { - let guard = unprotected(); - let sentinel = sentinel.into_shared(guard); - q.head.store(sentinel, Ordering::Relaxed); - q.tail.store(sentinel, Ordering::Relaxed); - q - } - } - - #[inline(always)] - fn push_internal( - &self, - onto: Shared<'_, Node>, - new: Shared<'_, Node>, - guard: &Guard, - ) -> bool { - let o = unsafe { onto.deref() }; - let next = o.next.load(Ordering::Acquire, guard); - if unsafe { next.as_ref().is_some() } { - let _ = - self.tail - .compare_exchange(onto, next, Ordering::Release, Ordering::Relaxed, guard); - false - } else { - let result = o - .next - .compare_exchange( - Shared::null(), - new, - Ordering::Release, - Ordering::Relaxed, - guard, - ) - .is_ok(); - if result { - let _ = self.tail.compare_exchange( - onto, - new, - Ordering::Release, - Ordering::Relaxed, - guard, - ); - } - result - } - } - - pub(crate) fn push(&self, t: T, guard: &Guard) { - let new = Owned::new(Node { 
- data: MaybeUninit::new(t), - next: Atomic::null(), - }); - let new = new.into_shared(guard); - loop { - let tail = self.tail.load(Ordering::Acquire, guard); - if self.push_internal(tail, new, guard) { - break; - } - } - } - - #[inline(always)] - fn pop_if_internal(&self, condition: F, guard: &Guard) -> Result, ()> - where - // T: Sync, - F: Fn(&T) -> bool, - { - let head = self.head.load(Ordering::Acquire, guard); - let h = unsafe { head.deref() }; - let next = h.next.load(Ordering::Acquire, guard); - match unsafe { next.as_ref() } { - Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { - self.head - .compare_exchange(head, next, Ordering::Release, Ordering::Relaxed, guard) - .map(|_| { - let tail = self.tail.load(Ordering::Relaxed, guard); - if head == tail { - let _ = self.tail.compare_exchange( - tail, - next, - Ordering::Release, - Ordering::Relaxed, - guard, - ); - } - guard.defer_destroy(head); - Some(n.data.as_ptr().read()) - }) - .map_err(|_| ()) - }, - None | Some(_) => Ok(None), - } - } - - pub(crate) fn try_pop_if(&self, condition: F, guard: &Guard) -> Option - where - T: Sync, - F: Fn(&T) -> bool, - { - loop { - if let Ok(head) = self.pop_if_internal(&condition, guard) { - return head; - } - } - } - - pub(crate) fn try_pop(&self, guard: &Guard) -> Option { - loop { - if let Ok(head) = self.pop_if_internal(|_| true, guard) { - return head; - } - } - } -} - -impl Drop for Queue { - fn drop(&mut self) { - unsafe { - let guard = unprotected(); - - while self.try_pop(guard).is_some() {} - - // Destroy the remaining sentinel node. - let sentinel = self.head.load(Ordering::Relaxed, guard); - if let Some(node) = sentinel.try_into_owned() { - drop(node); - } - } - } -} - -struct Node { - data: MaybeUninit, - next: Atomic>, -} - -impl_sized_pointable!(Node); - -#[cfg(test)] -mod tests { - use super::Queue; - use crate::epoch; - - #[test] - fn test_queue() { - let queue = Queue::new(); - let guard = epoch::pin(); - queue.push(1, &guard); - queue.push(2, &guard); - drop(guard); - - let g2 = epoch::pin(); - assert!(queue.try_pop_if(|elem| *elem < 10, &g2).is_some()); - assert!(queue.try_pop_if(|elem| *elem > 10, &g2).is_none()); - } -} diff --git a/doradb-index/src/hot/key.rs b/doradb-index/src/hot/key.rs deleted file mode 100644 index 01d57a7..0000000 --- a/doradb-index/src/hot/key.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::mem; -use std::ops::Deref; - -const MAX_OWNED_KEY_LENGTH: usize = 15; - -/// Key can be either a reference of byte slice, -/// or an owned byte array with specified length. -#[derive(Clone, Copy)] -pub enum Key<'a> { - Ref(&'a [u8]), - Owned([u8; MAX_OWNED_KEY_LENGTH], u8), -} - -impl Deref for Key<'_> { - type Target = [u8]; - #[inline] - fn deref(&self) -> &[u8] { - match self { - Key::Ref(r) => r, - Key::Owned(data, len) => &data[..*len as usize], - } - } -} - -/// Specify how to extract key. -pub trait ExtractKey { - fn extract_key(&self) -> Key<'_>; -} - -impl ExtractKey for str { - #[inline] - fn extract_key(&self) -> Key<'_> { - Key::Ref(self.as_bytes()) - } -} - -impl ExtractKey for [u8] { - #[inline] - fn extract_key(&self) -> Key<'_> { - Key::Ref(self) - } -} - -macro_rules! 
impl_extract_key_for_int { - ($ty:ty) => { - impl ExtractKey for $ty { - #[inline] - fn extract_key(&self) -> Key<'_> { - let mut data = [0u8; MAX_OWNED_KEY_LENGTH]; - let len = mem::size_of::<$ty>(); - data[..len].copy_from_slice(&self.to_be_bytes()); - Key::Owned(data, len as u8) - } - } - }; -} - -impl_extract_key_for_int!(i64); -impl_extract_key_for_int!(u64); -impl_extract_key_for_int!(i32); -impl_extract_key_for_int!(u32); - -/// Specify how to extract TID. -/// TID stands for tuple id, which is unique identifier -/// of a record. -/// Sometimes, if the tuple contains only small values -/// that can be fit in 8 bytes, it can be directly -/// embedded in TID. -pub trait ExtractTID { - fn extract_tid(&self) -> u64; -} - -macro_rules! impl_extract_tid_for_int { - ($ty:ty) => { - impl ExtractTID for $ty { - #[inline] - fn extract_tid(&self) -> u64 { - *self as u64 - } - } - }; -} - -impl_extract_tid_for_int!(i32); -impl_extract_tid_for_int!(u32); - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_key_size() { - println!("size of Key is {}", std::mem::size_of::()); - } -} diff --git a/doradb-index/src/hot/mod.rs b/doradb-index/src/hot/mod.rs deleted file mode 100644 index bae4571..0000000 --- a/doradb-index/src/hot/mod.rs +++ /dev/null @@ -1,420 +0,0 @@ -#[macro_use] -mod node; -mod key; -mod node_impl; -mod partial_key; -mod value; -// mod insert; - -use self::key::{ExtractKey, ExtractTID, Key}; -use self::node::{NodeOps, NodeReadDataOps, NodeSearchOps, NodeSyncOps, NodeWriteDataOps}; -use self::value::ValueLoader; -use crate::epoch::{self, Atomic, Guard, Inline, Owned, Shared}; -use node::{NodeKind, NodeTemplate}; -use node_impl::SP8NodeMut; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::Ordering; - -#[allow(clippy::upper_case_acronyms)] -pub struct HOT { - root: Atomic, -} - -impl HOT { - /// Create an empty HOT. - #[inline] - pub fn new() -> Self { - HOT { - root: Atomic::from(NodeTemplate::empty()), - } - } - - #[inline] - pub fn lookup<'k, L: ValueLoader>( - &self, - key: Key<'k>, - loader: &L, - guard: &epoch::Guard, - ) -> Option { - let mut node = self.root.load(Ordering::Acquire, guard); - loop { - match node.kind() { - NodeKind::Empty => return None, // trie is empty. - NodeKind::Leaf => { - let value = loader.load_leaf(node); - let k = value.extract_key(); - return if *k == *key { - Some(value) - } else { - // key does not match - None - }; - } - _ => { - if let Some(res) = try_opt_read_node!(node, |_, n| { - n.search_partial_key(&key) - .map(|idx| n.value(idx).load(Ordering::Acquire, guard)) - }) { - if let Some(new) = res { - node = new; - } else { - return None; - } - } else { - // optimistic read failed: retry from root. 
- self.root.load(Ordering::Acquire, guard); - } - } - } - } - } - - #[inline] - pub fn insert>( - &self, - value: V, - loader: &L, - guard: &epoch::Guard, - ) -> bool { - let new_key = &*value.extract_key(); - let new_tid = value.extract_tid(); - loop { - match self.insert_internal(new_key, new_tid, loader, guard) { - InsertResult::Stalled => (), - InsertResult::Ok => return true, - InsertResult::Duplicated => return false, - } - } - } - - #[inline] - fn insert_internal( - &self, - new_key: &[u8], - new_tid: u64, - loader: &L, - guard: &epoch::Guard, - ) -> InsertResult { - let root = &self.root; - let node = root.load(Ordering::Acquire, guard); - match node.kind() { - NodeKind::Empty => { - // empty tree, just replace - let new = NodeTemplate::tid(new_tid); - match root.compare_exchange_weak( - node, - new, - Ordering::SeqCst, - Ordering::Relaxed, - guard, - ) { - Ok(_) => InsertResult::Ok, - Err(_) => InsertResult::Stalled, - } - } - NodeKind::Leaf => { - // single value - let new = - match BiNodeBuilder::from_leaf_and_new_key(node, new_key, 0, new_tid, loader) { - Some(bn) => bn.build(), - None => return InsertResult::Duplicated, - }; - match root.compare_exchange_weak( - node, - new, - Ordering::SeqCst, - Ordering::Relaxed, - guard, - ) { - Ok(_) => InsertResult::Ok, - Err(_) => InsertResult::Stalled, - } - } - _ => { - let mut stack = InsertStack::new(); // todo: thread local cache - self.insert_with_stack(node, new_key, new_tid, loader, &mut stack, guard) - } - } - } - - #[inline] - fn insert_with_stack<'g, L: ValueLoader>( - &self, - node: Shared<'g, NodeTemplate>, - new_key: &[u8], - new_tid: u64, - loader: &L, - stack: &mut InsertStack<'g>, - guard: &'g epoch::Guard, - ) -> InsertResult { - self.search_for_insert(new_key, stack, guard); - let depth = stack.len(); - let entry = &stack[depth - 1]; - debug_assert!(entry.node.kind() == NodeKind::Leaf); - // depth always greater than 0, because root leaf is examined before this method - let parent = &stack[depth - 2]; - let height = parent.node.height(); - if height > 1 { - // perform leaf push down because it won't increase tree height. - let new = match BiNodeBuilder::from_leaf_and_new_key( - node, - new_key, - parent.msb_absolute_idx, - new_tid, - loader, - ) { - Some(bn) => bn.build(), - None => return InsertResult::Duplicated, - }; - let node = parent.node; - with_write_lock_node!(node, |mut n| { - // it's ok to overwrite the original value. - n.value_mut(parent.index as usize) - .store(new, Ordering::Relaxed); - }); - return InsertResult::Ok; - } - // todo: normal insert, parent pull up, or create intermediate node. - todo!() - } - - #[inline] - fn search_for_insert<'g>(&self, key: &[u8], stack: &mut InsertStack<'g>, guard: &'g Guard) { - let mut curr = self.root.load(Ordering::Acquire, guard); - loop { - match curr.kind() { - NodeKind::Empty => unreachable!(), - NodeKind::Leaf => break, - _ => { - if let Some((new, entry)) = try_opt_read_node!(curr, |version, n| { - let index = n.search_partial_key(key).unwrap(); - let entry = InsertStackEntry { - version, - node: curr, - index: index as u8, - msb_absolute_idx: n.msb_offset(), - }; - let shared = n.value(index).load(Ordering::Acquire, guard); - (shared, entry) - }) { - curr = new; - stack.push(entry); - } else { - // retry from root - curr = self.root.load(Ordering::Acquire, guard); - stack.clear(); - } - } - } - } - stack.push(InsertStackEntry { - version: 0, // leaf does not have version. 
- node: curr, - index: 0, - msb_absolute_idx: !0, - }); - } -} - -// const U8_WITH_MOST_SIGNIFICANT_BIT: u8 = 0b1000_0000; -const U64_WITH_MOST_SIGNIFICANT_BIT: u64 = 0x8000_0000_0000_0000; - -enum InsertResult { - Ok, - Stalled, - Duplicated, -} - -struct BiNodeBuilder<'g> { - height: u8, - msb_absolute_idx: u16, - curr_byte: u8, - new_byte: u8, - curr_node: Shared<'g, NodeTemplate>, - new_node: Inline, -} - -impl<'g> BiNodeBuilder<'g> { - /// Create a new node by given leaf and new key. - /// The returned node contains only two values, so called BiNode. - #[inline] - fn from_leaf_and_new_key( - leaf: Shared<'g, NodeTemplate>, - new_key: &[u8], - msb_absolute_idx: u16, - new_tid: u64, - loader: &L, - ) -> Option { - let curr = loader.load_leaf(leaf); - let curr_key = &*curr.extract_key(); - let start_byte_idx = msb_absolute_idx as usize / 8; - - // todo: two keys may have different length, - // we always extend the shorter key with sequence of 0x00 at the end. - // There are some consequences: - // 1. We cannot identify keys followed by different number of 0x00. - // 2. key may have not enough bytes, according to msb_absolute_idx, we - // also have to extend it. - for (i, (nb, cb)) in new_key[start_byte_idx..] - .iter() - .zip(&curr_key[start_byte_idx..]) - .enumerate() - { - if nb != cb { - let new_msb_absolute_idx = - (start_byte_idx + i) * 8 + (nb ^ cb).leading_zeros() as usize; - let bi_node = BiNodeBuilder { - height: 1, - msb_absolute_idx: new_msb_absolute_idx as u16, - curr_byte: *cb, - new_byte: *nb, - curr_node: leaf, - new_node: NodeTemplate::tid(new_tid), - }; - return Some(bi_node); - } - } - None - } - - #[inline] - fn build(self) -> Owned { - unsafe { - let mut node = NodeTemplate::new(self.height, 2, NodeKind::SP8); - let mut sp8node = SP8NodeMut::from(&mut node); - // initialize offset and single mask. - sp8node.set_msb_offset(self.msb_absolute_idx); - sp8node.set_mask(self.bit_mask()); - // initialize partial keys - // for two-entry node, the first partial key must be 0, - // and second must be 1. - sp8node.set_partial_key(0, 0); - sp8node.set_partial_key(1, 1); - // initialize values - sp8node - .value_mut_unchecked(0) - .write(Atomic::from(self.curr_node.into_inline())); - sp8node - .value_mut_unchecked(1) - .write(Atomic::from(self.new_node)); - node - } - } - - #[inline] - fn bit_mask(&self) -> u64 { - U64_WITH_MOST_SIGNIFICANT_BIT >> (self.curr_byte ^ self.new_byte).leading_zeros() - } -} - -pub struct InsertStack<'g> { - inner: Vec>, -} - -impl<'g> Deref for InsertStack<'g> { - type Target = Vec>; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl<'g> DerefMut for InsertStack<'g> { - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -impl<'g> InsertStack<'g> { - #[inline] - fn new() -> Self { - InsertStack { inner: Vec::new() } - } -} - -impl Drop for HOT { - fn drop(&mut self) { - unsafe { - let guard = epoch::unprotected(); - drop_subtree(&self.root, guard); - } - } -} - -unsafe fn drop_subtree(root: &Atomic, guard: &Guard) { - let node = root.load(Ordering::Relaxed, guard); - match node.kind() { - NodeKind::Empty | NodeKind::Leaf => (), - _ => { - unchecked_read_node!(node, |n| { - for i in 0..n.n_values() { - let c = n.value(i); - drop_subtree(c, guard); - } - }); - } - } - guard.defer_destroy(node); -} - -#[derive(Clone, Copy)] -pub struct InsertStackEntry<'g> { - // version of current node. - version: u64, - // node along the insert path. 
- // This is the shared copy of original node pointer, - // we if we want to change its content, we cannot perform CAS. - // we have to find its parent, perform write lock, and update its - // value at specific position. - node: Shared<'g, NodeTemplate>, - // index of the return entry. - index: u8, - // absolute position of most significant bit. - // discriminative byte index. - msb_absolute_idx: u16, -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::hot::node::{NodeReadDataOps, NodeSyncOps}; - use crate::hot::value::EmbeddedU32; - - #[test] - fn test_leading_zeros() { - for (a, b) in [(1u8, 2u8), (128, 0), (10, 10)] { - let v = a ^ b; - println!("a={}, b={}, a^b={}, clz={}", a, b, v, v.leading_zeros()); - } - } - - #[test] - fn test_hot_insert_ops() { - let hot = HOT::new(); - let guard = unsafe { epoch::unprotected() }; - let node = hot.root.load(Ordering::Acquire, &guard); - assert_eq!(NodeKind::Empty, node.kind()); - // test insert empty - hot.insert(1u32, &EmbeddedU32, &guard); - let node2 = hot.root.load(Ordering::Acquire, &guard); - assert_eq!(NodeKind::Empty, node.kind()); - assert_eq!(NodeKind::Leaf, node2.kind()); - // test insert leaf - hot.insert(2u32, &EmbeddedU32, &guard); - let node3 = hot.root.load(Ordering::Acquire, &guard); - assert_eq!(NodeKind::SP8, node3.kind()); - - try_opt_read_node!(node3, |_, n| { - assert_eq!(0, n.partial_key(0)); - assert_eq!(1, n.partial_key(1)); - }); - assert_eq!(Some(1), hot.lookup(1u32.extract_key(), &EmbeddedU32, guard)); - assert_eq!(Some(2), hot.lookup(2u32.extract_key(), &EmbeddedU32, guard)); - assert_eq!(None, hot.lookup(4u32.extract_key(), &EmbeddedU32, guard)); - assert_eq!(None, hot.lookup(5u32.extract_key(), &EmbeddedU32, guard)); - // test insert sp8 - // hot.insert(3u32, &EmbeddedU32, &guard); - } -} diff --git a/doradb-index/src/hot/node.rs b/doradb-index/src/hot/node.rs deleted file mode 100644 index 078a18e..0000000 --- a/doradb-index/src/hot/node.rs +++ /dev/null @@ -1,412 +0,0 @@ -use super::node_impl::{SP16Node, SP32Node, SP8NodeRef}; -use crate::epoch::{self, low_bits, Atomic, Inline, Owned, Pointable, Shared}; -// use super::partial_key::NodePartialKeyOps; -// use super::value::NodeValueOps; -use parking_lot::lock_api::RawRwLock as RawRwLockAPI; -use parking_lot::RawRwLock; -use scopeguard::defer; -use std::alloc; -use std::cell::UnsafeCell; -use std::fmt; -use std::mem::{self, MaybeUninit}; -use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; - -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum NodeKind { - Empty = 0, - Leaf = 1, - SP8 = 2, - SP16 = 3, - SP32 = 4, - MP8 = 5, - MP16 = 6, - MP32 = 7, -} - -impl NodeKind { - #[inline] - pub fn mask_size(self) -> usize { - match self { - NodeKind::Empty | NodeKind::Leaf => 0, - NodeKind::SP8 | NodeKind::SP16 | NodeKind::SP32 => mem::size_of::(), // u16 offset(embedded in header) + u64 mask - NodeKind::MP8 => 8 * mem::size_of::() + 8 * mem::size_of::(), // 8 * u16 offset + 8 * u8 mask - NodeKind::MP16 => 16 * mem::size_of::() + 16 * mem::size_of::(), // 16 * u16 offset + 16 * u16 mask - NodeKind::MP32 => 32 * mem::size_of::() + 32 * mem::size_of::(), // 32 * u16 offset + 32 * u32 mask - } - } -} - -impl From for NodeKind { - #[inline] - fn from(src: u8) -> Self { - match src { - 0 => NodeKind::Empty, - 1 => NodeKind::Leaf, - 2 => NodeKind::SP8, - 3 => NodeKind::SP16, - 4 => NodeKind::SP32, - 5 => NodeKind::MP8, - 6 => NodeKind::MP16, - 7 => NodeKind::MP32, - v => panic!("invalid node kind {}", v), - } - } -} - -/// Node with header and data. 
-/// The actual data is dynamic sized and allocated via construction. -/// Convert [`Node`] to [`super::node_impl::NodeImpl`] to gain full functionality. -/// align is set to 8 bytes, because we use 3 lowest bits as node type, -/// and allow plain value shifted to store in the pointer. -#[repr(C, align(8))] -pub struct NodeTemplate { - /* first word */ - // version to support optimistic read. - pub(super) version: AtomicU64, - - /* second word */ - // lock for write access. - pub(super) lock: RawRwLock, - - /* third word */ - // 65535 bytes is big enough for all types of nodes. - pub(super) data_len: u16, - // height of current node. - pub(super) height: u8, - // number of values in current node. - pub(super) n_values: u8, - // padding to align to single word - pub(super) padding: UnsafeCell<[u8; 4]>, - - /* fourth word */ - // The actual data is allocated just after the 0-sized data array. - pub(super) data: UnsafeCell<[u8; 0]>, -} - -impl NodeTemplate { - /// Create a new node with given height, number of values and node kind. - #[inline] - pub unsafe fn new(height: u8, n_values: u8, kind: NodeKind) -> Owned { - debug_assert!( - kind != NodeKind::Empty && kind != NodeKind::Leaf, - "Invalid node kind" - ); - debug_assert!(n_values <= 32, "Number of values exceeds limitation"); - let size_of_partial_keys = match kind { - NodeKind::SP8 => n_values as usize, - NodeKind::SP16 => 2 * n_values as usize, - NodeKind::SP32 => 4 * n_values as usize, - _ => todo!(), - }; - let data_len = kind.mask_size() // offset and mask - + size_of_partial_keys // partial keys - ; - let data_len = ((data_len + 7) & !7) // must align to 8 bytes - + mem::size_of::() * n_values as usize // values - ; - debug_assert!(data_len <= 65535); // within bound of u16 - let data_len = data_len as u16; - let mut node = Owned::::new_dyn(data_len); - node.version.store(0, Ordering::Relaxed); - node.lock = RawRwLock::INIT; - node.data_len = data_len; - node.height = height; - node.n_values = n_values; - node.with_tag(kind as usize) - } - - /// Create a new tuple id. - #[inline] - pub fn tid(tid: u64) -> Inline { - debug_assert!(tid.leading_zeros() >= ::ALIGN.trailing_zeros()); - Inline::new(tid as usize).with_tag(NodeKind::Leaf as usize) - } - - #[inline] - pub fn empty() -> Inline { - Inline::new(0).with_tag(NodeKind::Empty as usize) - } -} - -pub trait NodeOps { - /// Returns reference of common template. - fn tmpl(&self) -> &NodeTemplate; - - /* Common operations */ - - #[inline] - fn height(&self) -> u8 { - self.tmpl().height - } - - #[inline] - fn n_values(&self) -> usize { - self.tmpl().n_values as usize - } -} - -pub trait NodeSyncOps: NodeOps { - type TargetMut; - - unsafe fn as_mut(&self) -> Self::TargetMut; - - /// Returns current version. - #[inline] - fn version(&self, order: Ordering) -> u64 { - self.tmpl().version.load(order) - } - - /// Increase the version. - #[inline] - fn promote_version(&self, order: Ordering) -> u64 { - self.tmpl().version.fetch_add(1, order) - } - - /// Check whether write lock is acquired. - #[inline] - fn is_locked_exclusive(&self) -> bool { - self.tmpl().lock.is_locked_exclusive() - } - - /// Acquire a write lock. - #[inline] - fn lock_exclusive(&self) { - self.tmpl().lock.lock_exclusive() - } - - /// Acquire a read lock. - #[inline] - fn lock_shared(&self) { - self.tmpl().lock.lock_shared() - } - - /// Release a write lock. - #[inline] - unsafe fn unlock_exclusive(&self) { - self.tmpl().lock.unlock_exclusive() - } - - /// Release a read lock. 
- #[inline] - unsafe fn unlock_shared(&self) { - self.tmpl().lock.unlock_shared() - } - - /// Try optimistic read and return result if succeeds. - fn try_opt_read<'s, U, F>(&'s self, f: F) -> Option - where - U: 's, - F: Fn(u64, &Self) -> U, - { - if self.is_locked_exclusive() { - return None; - } - let pre_ver = self.version(Ordering::Acquire); - let res = f(pre_ver, self); - if self.is_locked_exclusive() || pre_ver != self.version(Ordering::Acquire) { - return None; - } - Some(res) - } - - fn unchecked_read(&self, f: F) -> U - where - F: Fn(&Self) -> U, - Self: Sized, - { - f(self) - } - - fn with_read_lock(&self, f: F) -> (u64, U) - where - F: FnOnce(&Self) -> U, - Self: Sized, - { - self.lock_shared(); - defer!(unsafe { self.unlock_shared() }); - - let prev_ver = self.version(Ordering::Relaxed); - let res = f(self); - (prev_ver, res) - } - - fn with_write_lock(&self, f: F) -> u64 - where - F: FnOnce(Self::TargetMut), - Self: Sized, - { - self.lock_exclusive(); - defer!(unsafe { self.unlock_exclusive() }); - // SAFETY: - // - // This is safe because we already acquired write lock on this node. - let tgt = unsafe { self.as_mut() }; - f(tgt); - self.promote_version(Ordering::SeqCst) - } -} - -pub trait NodeReadDataOps { - type PartialKey: Copy; - - fn partial_key(&self, index: usize) -> Self::PartialKey; - - fn value(&self, index: usize) -> &Atomic; -} - -pub trait NodeWriteDataOps: NodeReadDataOps { - fn set_partial_key(&mut self, index: usize, key: Self::PartialKey); - - fn value_mut(&mut self, index: usize) -> &mut Atomic; - - unsafe fn value_mut_unchecked( - &mut self, - index: usize, - ) -> &mut MaybeUninit>; -} - -pub trait NodeSearchOps: NodeReadDataOps { - fn extract_partial_key(&self, input: &[u8]) -> Self::PartialKey; - - /// Search given key in partial keys, and return its index if found. - fn search_partial_key(&self, input: &[u8]) -> Option; -} - -macro_rules! unchecked_read_node { - ($id:ident, $e:expr) => { - match $id.kind() { - NodeKind::Empty | NodeKind::Leaf => panic!("invalid node type {:?}", $id.kind()), - NodeKind::SP8 => { - let sp8node = $crate::hot::node_impl::SP8NodeRef::from($id); - sp8node.unchecked_read($e) - } - _ => todo!(), - } - }; -} - -macro_rules! try_opt_read_node { - ($id:ident, $e:expr) => { - match $id.kind() { - NodeKind::Empty | NodeKind::Leaf => panic!("invalid node type {:?}", $id.kind()), - NodeKind::SP8 => { - let sp8node = unsafe { $crate::hot::node_impl::SP8NodeRef::from($id) }; - sp8node.try_opt_read($e) - } - _ => todo!(), - } - }; -} - -macro_rules! with_read_lock_node { - ($id:ident, $e:expr) => { - match $id.kind() { - NodeKind::Empty | NodeKind::Leaf => panic!("invalid node type {:?}", $id.kind()), - NodeKind::SP8 => { - let sp8node = unsafe { $crate::hot::node_impl::SP8NodeRef::from($id) }; - sp8node.with_read_lock($e) - } - _ => todo!(), - } - }; -} - -macro_rules! with_write_lock_node { - ($id:ident, $e:expr) => { - match $id.kind() { - NodeKind::Empty | NodeKind::Leaf => panic!("invalid node type {:?}", $id.kind()), - NodeKind::SP8 => { - let sp8node = unsafe { $crate::hot::node_impl::SP8NodeRef::from($id) }; - sp8node.with_write_lock($e) - } - _ => todo!(), - } - }; -} - -impl Shared<'_, NodeTemplate> { - #[inline] - pub fn height(&self) -> u8 { - match self.kind() { - NodeKind::Empty | NodeKind::Leaf => 0, - _ => unsafe { self.deref().height }, - } - } - - /// Returns whether the node is leaf. 
- #[inline] - pub fn kind(&self) -> NodeKind { - NodeKind::from(self.tag() as u8) - } - - #[inline] - pub fn tid(&self) -> u64 { - let (p, _) = self.decompose(); - p as u64 >> 3 - } -} - -impl Pointable for NodeTemplate { - const ALIGN: usize = mem::align_of::(); - type Init = u16; - - unsafe fn init(data_len: u16) -> *mut () { - let size = mem::size_of::() + data_len as usize; - let align = mem::align_of::(); - let layout = alloc::Layout::from_size_align(size, align).unwrap(); - let ptr = alloc::alloc(layout).cast::(); - if ptr.is_null() { - alloc::handle_alloc_error(layout); - } - ptr.cast() - } - - unsafe fn deref<'a>(ptr: *mut ()) -> &'a Self { - &*(ptr as *const Self) - } - - unsafe fn deref_mut<'a>(ptr: *mut ()) -> &'a mut Self { - &mut *(ptr as *mut Self) - } - - unsafe fn drop(ptr: *mut (), tag: usize) { - // check the tag to identify the node kind. - // If node is a branch node, drop its children first. - match NodeKind::from(tag as u8) { - NodeKind::Empty | NodeKind::Leaf => return, - // There is no responsibility of parent to drop children. - // The trie implementation should take care of recursive logic. - _ => (), - } - let node = &*ptr.cast::(); - let size = mem::size_of::() + node.data_len as usize; - let align = mem::align_of::(); - let layout = alloc::Layout::from_size_align(size, align).unwrap(); - alloc::dealloc(ptr.cast::(), layout); - } -} - -impl fmt::Debug for NodeTemplate { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Node") - .field("data_len", &self.data_len) - .field("height", &self.height) - .field("n_values", &self.n_values) - .finish() - } -} - -impl Owned { - /// Returns whether the node is leaf. - #[inline] - pub fn kind(&self) -> NodeKind { - NodeKind::from(self.tag() as u8) - } - - #[inline] - pub fn tid(&self) -> u64 { - let (p, _) = self.decompose(); - p as u64 >> 3 - } -} diff --git a/doradb-index/src/hot/node_impl.rs b/doradb-index/src/hot/node_impl.rs deleted file mode 100644 index 0fbe2f9..0000000 --- a/doradb-index/src/hot/node_impl.rs +++ /dev/null @@ -1,353 +0,0 @@ -use super::node::{ - NodeKind, NodeOps, NodeReadDataOps, NodeSearchOps, NodeSyncOps, NodeTemplate, NodeWriteDataOps, -}; -use crate::epoch::{Atomic, Owned, Shared}; -use crate::hot::partial_key::PartialKey; -use std::arch::x86_64::_pext_u64; -use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; -use std::ptr; - -#[repr(C, align(8))] -pub struct SingleMaskNodeRef<'a, P: PartialKey> { - tmpl: &'a NodeTemplate, - _marker: PhantomData
<P>
, -} - -#[repr(C, align(8))] -pub struct SingleMaskNodeMut<'a, P: PartialKey> { - tmpl: &'a mut NodeTemplate, - _marker: PhantomData
<P>
, -} - -impl SingleMaskNodeRef<'_, P> { - #[inline] - pub fn data_ptr(&self) -> *mut u8 { - self.tmpl.data.get() as *mut u8 - } - - #[inline] - pub fn mask_ptr(&self) -> *mut u64 { - self.data_ptr() as *mut u64 - } - - #[inline] - fn partial_keys_ptr(&self) -> *mut u8 { - unsafe { self.data_ptr().add(8) } - } - - #[inline] - fn values_ptr(&self) -> *mut u8 { - let n_partial_keys_bytes = mem::size_of::
<P>
() * self.n_values(); - let values_offset = 8 + n_partial_keys_bytes; - let aligned_offset = (values_offset + 7) & !7; - unsafe { self.data_ptr().add(aligned_offset) } - } - - #[inline] - fn msb_offset_ptr(&self) -> *mut u16 { - self.tmpl.padding.get() as *mut u16 - } - - /// Offset is 2-byte integer which is embedded in header. - #[inline] - pub fn msb_offset(&self) -> u16 { - let p = self.msb_offset_ptr() as *const u16; - unsafe { p.read() } - } - - #[inline] - fn mask(&self) -> u64 { - unsafe { - let p = self.mask_ptr() as *const u8 as *const u64; - p.read() - } - } -} - -#[allow(dead_code)] -impl SingleMaskNodeMut<'_, P> { - #[inline] - pub fn to_ref(&self) -> SingleMaskNodeRef<'_, P> { - SingleMaskNodeRef { - tmpl: self.tmpl, - _marker: PhantomData, - } - } - - #[inline] - pub fn data_ptr(&self) -> *mut u8 { - self.to_ref().data_ptr() - } - - #[inline] - pub fn mask_ptr(&self) -> *mut u64 { - self.to_ref().mask_ptr() - } - - #[inline] - fn partial_keys_ptr(&self) -> *mut u8 { - self.to_ref().partial_keys_ptr() - } - - #[inline] - fn values_ptr(&self) -> *mut u8 { - self.to_ref().values_ptr() - } - - #[inline] - pub fn msb_offset(&self) -> u16 { - self.to_ref().msb_offset() - } - - #[inline] - pub fn set_msb_offset(&mut self, msb_offset: u16) { - let p = self.to_ref().msb_offset_ptr(); - unsafe { p.write(msb_offset) } - } - - #[inline] - fn mask(&self) -> u64 { - self.to_ref().mask() - } - - #[inline] - pub fn set_mask(&mut self, mask: u64) { - let p = self.to_ref().mask_ptr(); - unsafe { p.write(mask) } - } -} - -impl NodeOps for SingleMaskNodeRef<'_, P> { - #[inline] - fn tmpl(&self) -> &NodeTemplate { - self.tmpl - } -} - -impl<'g, P: PartialKey> NodeSyncOps for SingleMaskNodeRef<'g, P> { - type TargetMut = SingleMaskNodeMut<'g, P>; - - #[allow(clippy::cast_ref_to_mut)] - #[inline] - unsafe fn as_mut(&self) -> Self::TargetMut { - let tmpl = &mut *(self.tmpl as *const _ as *mut _); - SingleMaskNodeMut { - tmpl, - _marker: PhantomData, - } - } -} - -impl NodeOps for SingleMaskNodeMut<'_, P> { - #[inline] - fn tmpl(&self) -> &NodeTemplate { - self.tmpl - } -} - -impl NodeReadDataOps for SingleMaskNodeRef<'_, P> { - type PartialKey = P; - - fn partial_key(&self, index: usize) -> Self::PartialKey { - debug_assert!(index < self.n_values()); - let p = self.partial_keys_ptr() as *const Self::PartialKey; - unsafe { *p.add(index) } - } - - fn value(&self, index: usize) -> &Atomic { - debug_assert!(index < self.n_values()); - let p = self.values_ptr() as *const Atomic; - unsafe { &*p.add(index) } - } -} - -impl NodeReadDataOps for SingleMaskNodeMut<'_, P> { - type PartialKey = P; - - fn partial_key(&self, index: usize) -> Self::PartialKey { - self.to_ref().partial_key(index) - } - - fn value(&self, index: usize) -> &Atomic { - let p = self.values_ptr() as *const Atomic; - unsafe { &*p.add(index) } - } -} - -impl NodeWriteDataOps for SingleMaskNodeMut<'_, P> { - fn set_partial_key(&mut self, index: usize, key: Self::PartialKey) { - debug_assert!(index < self.n_values()); - let p = self.partial_keys_ptr() as *mut Self::PartialKey; - unsafe { p.add(index).write(key) } - } - - fn value_mut(&mut self, index: usize) -> &mut Atomic { - debug_assert!(index < self.n_values()); - unsafe { self.value_mut_unchecked(index).assume_init_mut() } - } - - unsafe fn value_mut_unchecked( - &mut self, - index: usize, - ) -> &mut MaybeUninit> { - let p = self.values_ptr() as *mut MaybeUninit>; - &mut *p.add(index) - } -} - -impl NodeSearchOps for SingleMaskNodeRef<'_, P> { - fn extract_partial_key(&self, input: 
&[u8]) -> Self::PartialKey { - let start = self.msb_offset() as usize / 8; - let len = (input.len() - start).min(8); - // partial key must reside within 8-bytes, - // so we fix the key size with 8. - let mut key_u64 = [0u8; 8]; - key_u64[..len].copy_from_slice(&input[start..start + len]); - let mask = self.mask(); - unsafe { - let key = _pext_u64(u64::from_be_bytes(key_u64), mask); - P::cast_from(key) - } - } - - /// Search given key in partial keys, and return its index if found. - fn search_partial_key(&self, input: &[u8]) -> Option { - let partial_key = self.extract_partial_key(input); - let step = 32 / mem::size_of::
<P>
(); - let mut i = 0; - let mut keys_ptr = self.partial_keys_ptr() as *const P; - unsafe { - let end_ptr = keys_ptr.add(self.n_values()); - while keys_ptr.add(step) <= end_ptr { - let match_mask = partial_key.mm256_search(keys_ptr); - // if we always make keys continugous, we do not need the entries mask. - // let res_mask = (match_mask << i) & self.header.used_entries_mask; - if match_mask != 0 { - return Some(match_mask.trailing_zeros() as usize + i); - } - keys_ptr = keys_ptr.add(step); - i += step; - } - // use common comparison for remained keys - while keys_ptr < end_ptr { - let pk = *keys_ptr; - if pk == partial_key { - return Some(i); - } - keys_ptr = keys_ptr.add(1); - i += 1; - } - None - } - } -} - -impl Drop for SingleMaskNodeRef<'_, P> { - fn drop(&mut self) { - for i in 0..self.n_values() { - unsafe { - ptr::drop_in_place(self.value(i as usize) as *const _ as *mut Atomic) - } - } - } -} - -/// SingleMask + u8 partial key array. -/// header + u16 offset + u64 mask + n * u8 partial keys + n * u64 values -pub type SP8NodeRef<'a> = SingleMaskNodeRef<'a, u8>; - -impl<'g> SP8NodeRef<'g> { - // This method is unsafe because the returned value can read underlying - // fields without synchronization. - #[inline] - pub unsafe fn from(shared: Shared<'g, NodeTemplate>) -> Self { - debug_assert_eq!(shared.kind(), NodeKind::SP8); - let tmpl = shared.deref(); - Self { - tmpl, - _marker: PhantomData, - } - } -} - -pub type SP8NodeMut<'a> = SingleMaskNodeMut<'a, u8>; - -#[allow(dead_code)] -impl<'a> SP8NodeMut<'a> { - #[inline] - pub fn from(owned: &'a mut Owned) -> Self { - debug_assert_eq!(owned.kind(), NodeKind::SP8); - let tmpl = &mut **owned; - Self { - tmpl, - _marker: PhantomData, - } - } -} - -pub type SP16Node<'a> = SingleMaskNodeRef<'a, u16>; - -pub type SP32Node<'a> = SingleMaskNodeRef<'a, u32>; - -#[cfg(test)] -mod tests { - use super::*; - use crate::epoch; - use crate::hot::node::NodeKind; - use crate::hot::node::NodeTemplate; - use memoffset::offset_of; - use std::sync::atomic::Ordering; - - #[test] - fn test_node_impl_size_and_align() { - println!("size of SingleMaskU8Node={}", mem::size_of::()); - println!("size of SingleMaskU16Node={}", mem::size_of::()); - println!("size of SingleMaskU32Node={}", mem::size_of::()); - - let offset_tmpl = offset_of!(NodeTemplate, data); - assert!(offset_tmpl % 8 == 0); - } - - #[test] - fn test_node_impl_new() { - let guard = epoch::pin(); - - let mut node = unsafe { NodeTemplate::new(1, 2, NodeKind::SP8) }; - assert_eq!(NodeKind::SP8, node.kind()); - { - let mut node = unsafe { SP8NodeMut::from(&mut node) }; - assert_eq!(1, node.height()); - assert_eq!(2, node.n_values()); - - node.set_partial_key(1, 10); - node.set_partial_key(0, 20); - - assert_eq!(20, node.partial_key(0)); - assert_eq!(10, node.partial_key(1)); - - unsafe { - node.value_mut_unchecked(0) - .write(Atomic::from(NodeTemplate::tid(100))); - node.value_mut_unchecked(1) - .write(Atomic::from(NodeTemplate::tid(200))); - } - - let v0 = node.value(0).load(Ordering::Relaxed, &guard).tid(); - assert_eq!(100, v0); - let v1 = node.value(1).load(Ordering::Relaxed, &guard).tid(); - assert_eq!(200, v1); - } - assert_eq!(2, node.tag()); - let node = node.with_tag(3); - assert_eq!(3, node.tag()); - let node = node.with_tag(2); - assert_eq!(2, node.tag()); - } - - #[test] - fn test_node_impl_tid() { - let node = NodeTemplate::tid(123); - assert_eq!(123, node.value()) - } -} diff --git a/doradb-index/src/hot/partial_key.rs b/doradb-index/src/hot/partial_key.rs deleted file mode 100644 index 
5e17a8c..0000000 --- a/doradb-index/src/hot/partial_key.rs +++ /dev/null @@ -1,149 +0,0 @@ -use super::node::NodeKind; -use std::arch::x86_64::{ - _mm256_cmpeq_epi16, _mm256_cmpeq_epi32, _mm256_cmpeq_epi8, _mm256_loadu_si256, - _mm256_movemask_epi8, _mm256_set1_epi16, _mm256_set1_epi32, _mm256_set1_epi8, _pext_u32, -}; - -const PEXT_MASK_U16_FROM_U8: u32 = 0b1010_1010_1010_1010_1010_1010_1010_1010; -const PEXT_MASK_U32_FROM_U8: u32 = 0b1000_1000_1000_1000_1000_1000_1000_1000; - -pub trait PartialKey: Sized + Copy + PartialEq { - fn cast_from(src: u64) -> Self; - - /// Search partial key in given key array, and return matched mask. - /// Note: the size of given key array must be greater or equal to 256 bits. - /// and only first 256 bits will be searched. - unsafe fn mm256_search(self, keys_ptr: *const Self) -> u32; - - fn kind(single: bool) -> NodeKind; -} -impl PartialKey for u8 { - #[inline] - fn cast_from(src: u64) -> Self { - src as Self - } - - #[inline] - unsafe fn mm256_search(self, keys_ptr: *const u8) -> u32 { - let sparse_keys = _mm256_loadu_si256(keys_ptr as *const _); - let key = _mm256_set1_epi8(self as i8); - // let sel_bits = _mm256_and_si256(sparse_keys, key); - let match_keys = _mm256_cmpeq_epi8(key, sparse_keys); - let match_mask = _mm256_movemask_epi8(match_keys); - match_mask as u32 - } - - #[inline] - fn kind(single: bool) -> NodeKind { - if single { - NodeKind::SP8 - } else { - NodeKind::MP8 - } - } -} -impl PartialKey for u16 { - #[inline] - fn cast_from(src: u64) -> Self { - src as Self - } - - #[inline] - unsafe fn mm256_search(self, keys_ptr: *const u16) -> u32 { - let sparse_keys = _mm256_loadu_si256(keys_ptr as *const _); - let key = _mm256_set1_epi16(self as i16); - let match_keys = _mm256_cmpeq_epi16(key, sparse_keys); - let match_mask = _mm256_movemask_epi8(match_keys); - // extract u16 bits with mask - _pext_u32(match_mask as u32, PEXT_MASK_U16_FROM_U8) - } - - #[inline] - fn kind(single: bool) -> NodeKind { - if single { - NodeKind::SP16 - } else { - NodeKind::MP16 - } - } -} -impl PartialKey for u32 { - #[inline] - fn cast_from(src: u64) -> Self { - src as Self - } - - #[inline] - unsafe fn mm256_search(self, keys_ptr: *const u32) -> u32 { - let sparse_keys = _mm256_loadu_si256(keys_ptr as *const _); - let key = _mm256_set1_epi32(self as i32); - let match_keys = _mm256_cmpeq_epi32(key, sparse_keys); - let match_mask = _mm256_movemask_epi8(match_keys); - // extract u32 bits with mask - _pext_u32(match_mask as u32, PEXT_MASK_U32_FROM_U8) - } - - #[inline] - fn kind(single: bool) -> NodeKind { - if single { - NodeKind::SP32 - } else { - NodeKind::MP32 - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::arch::x86_64::_pdep_u32; - use std::arch::x86_64::_pext_u32; - - #[test] - fn test_pext_pdep() { - unsafe { - for (src, mask, res) in [ - (0xffff, 0x00ff, 0xff), - (0x010f, 0x0f0f, 0x1f), - (0x020f, 0x010f, 0x0f), - ] { - assert_eq!(_pext_u32(src, mask), res); - } - - for (src, mask, res) in [ - (0xffff, 0xff, 0xff), - (0x010f, 0xff, 0x000f), - (0x010f, 0xfffe, 0x021e), - ] { - assert_eq!(_pdep_u32(src, mask), res) - } - } - } - - #[test] - fn test_partial_key_u8_search() { - let keys: Vec<_> = (0u8..32).collect(); - let keys_ptr = keys.as_ptr(); - for key in 0u8..32 { - let mask = unsafe { key.mm256_search(keys_ptr) }; - let n = mask.count_ones(); - println!("key={}, mask={}, n={}", key, mask, n); - assert_eq!(1, n); - } - } - - #[test] - fn test_movemask_u16() { - unsafe { - let rs = vec![ - 0xffffu16, 0, 0xffff, 0xffff, 0, 0, 0xffff, 0, 0, 0, 
0, 0xffff, 0, 0, 0xffff, - 0xffff, - ]; - let keys = _mm256_loadu_si256(rs.as_ptr() as *const _); - let res_u8 = _mm256_movemask_epi8(keys) as u32; - println!("res_u8={:032b}", res_u8); - let res_u16 = _pext_u32(res_u8, PEXT_MASK_U16_FROM_U8); - println!("res_u16={:032b}", res_u16); - } - } -} diff --git a/doradb-index/src/hot/value.rs b/doradb-index/src/hot/value.rs deleted file mode 100644 index 38e4f4b..0000000 --- a/doradb-index/src/hot/value.rs +++ /dev/null @@ -1,20 +0,0 @@ -use super::key::ExtractKey; -use super::node::{NodeOps, NodeTemplate}; -use crate::epoch::{Atomic, Guard, Pointable, Shared}; - -pub trait ValueLoader { - type Value: ExtractKey; - - fn load_leaf(&self, node: Shared<'_, NodeTemplate>) -> Self::Value; -} - -pub struct EmbeddedU32; - -impl ValueLoader for EmbeddedU32 { - type Value = u32; - - #[inline] - fn load_leaf(&self, node: Shared<'_, NodeTemplate>) -> Self::Value { - node.tid() as u32 - } -} diff --git a/doradb-index/src/lib.rs b/doradb-index/src/lib.rs deleted file mode 100644 index 353aaeb..0000000 --- a/doradb-index/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod epoch; -mod hot; diff --git a/doradb-storage/Cargo.toml b/doradb-storage/Cargo.toml index daca2c2..d5c0cf8 100644 --- a/doradb-storage/Cargo.toml +++ b/doradb-storage/Cargo.toml @@ -15,6 +15,8 @@ smallvec = {version = "1.8", features = ["union"]} thiserror = "1.0" bitflags = "1.3" bytemuck = "1.7" +parking_lot = "0.12.3" +libc = "0.2.164" [dev-dependencies] rand = "0.8" diff --git a/doradb-storage/src/buffer/mod.rs b/doradb-storage/src/buffer/mod.rs new file mode 100644 index 0000000..3dd34e9 --- /dev/null +++ b/doradb-storage/src/buffer/mod.rs @@ -0,0 +1,254 @@ +pub mod page; +pub mod ptr; + +use crate::latch::{HybridLatch, GuardState}; +use crate::error::{Result, Error}; +use crate::buffer::page::{Page, PageOps, PageID, INVALID_PAGE_ID}; +use std::mem; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::cell::{Cell, UnsafeCell}; +use libc::{mmap, munmap, madvise, c_void, PROT_READ, PROT_WRITE, MAP_PRIVATE, MAP_FAILED, MAP_ANONYMOUS, MADV_HUGEPAGE, MADV_DONTFORK}; +use super::latch::{HybridGuard, LatchFallbackMode}; + +pub struct BufferFrame { + pub page_id: Cell, + pub latch: HybridLatch, // lock proctects free list and page. + pub next_free: UnsafeCell, + pub page: UnsafeCell, +} + +pub struct PageGuard<'a> { + bf: &'a BufferFrame, + guard: HybridGuard<'a>, +} + +impl<'a> PageGuard<'a> { + #[inline] + fn new(bf: &'a BufferFrame, guard: HybridGuard<'a>) -> Self { + Self{bf, guard} + } + + #[inline] + pub fn page_id(&self) -> PageID { + self.bf.page_id.get() + } + + #[inline] + pub fn shared(mut self) -> Result> { + if !self.guard.shared() { + return Err(Error::RetryLatch) + } + Ok(PageSharedGuard{bf: self.bf, guard: self.guard}) + } + + #[inline] + pub fn exclusive(mut self) -> Result> { + if !self.guard.exclusive() { + return Err(Error::RetryLatch) + } + Ok(PageExclusiveGuard{bf: self.bf, guard: self.guard}) + } + + #[inline] + pub fn page(&self) -> &P { + let page_data = unsafe { & *self.bf.page.get() }; + P::cast(page_data) + } + + #[inline] + pub fn validate(&self) -> bool { + self.guard.validate() + } +} + +pub struct PageSharedGuard<'a> { + bf: &'a BufferFrame, + guard: HybridGuard<'a>, +} + +impl<'a> PageSharedGuard<'a> { + /// Convert a page shared guard to optimistic guard + /// with long lifetime. 
+ #[inline] + pub fn keepalive(mut self) -> PageGuard<'a> { + self.guard.keepalive(); + PageGuard{bf: self.bf, guard: self.guard} + } + + #[inline] + pub fn page_id(&self) -> PageID { + self.bf.page_id.get() + } + + /// Returns shared page. + #[inline] + pub fn page(&self) -> &Page { + unsafe { &*self.bf.page.get() } + } +} + +pub struct PageExclusiveGuard<'a> { + bf: &'a BufferFrame, + guard: HybridGuard<'a>, +} + +impl<'a> PageExclusiveGuard<'a> { + /// Convert a page exclusive guard to optimistic guard + /// with long lifetime. + #[inline] + pub fn keepalive(mut self) -> PageGuard<'a> { + self.guard.keepalive(); + PageGuard{bf: self.bf, guard: self.guard} + } + + #[inline] + pub fn page_id(&self) -> PageID { + self.bf.page_id.get() + } + + #[inline] + pub fn page(&self) -> &Page { + unsafe { &*self.bf.page.get() } + } + + #[inline] + pub fn page_mut(&mut self) -> &mut Page { + unsafe { &mut *self.bf.page.get() } + } + + #[inline] + pub fn set_next_free(&mut self, next_free: PageID) { + unsafe { *self.bf.next_free.get() = next_free; } + } +} + +pub const SAFETY_PAGES: usize = 10; + +pub struct FixedBufferPool { + bfs: *mut BufferFrame, + size: usize, + allocated: AtomicU64, + free_list: AtomicU64, +} + +impl FixedBufferPool { + /// Create a buffer pool with given capacity. + #[inline] + pub fn with_capacity(pool_size: usize) -> Result { + let size = pool_size / mem::size_of::(); + let dram_total_size = mem::size_of::() * (size + SAFETY_PAGES); + let bfs = unsafe { + let big_memory_chunk = mmap(std::ptr::null_mut(), dram_total_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if big_memory_chunk == MAP_FAILED { + return Err(Error::InsufficientMemory(dram_total_size)); + } + madvise(big_memory_chunk, dram_total_size, MADV_HUGEPAGE); + madvise(big_memory_chunk, dram_total_size, MADV_DONTFORK); + big_memory_chunk + } as *mut BufferFrame; + Ok(FixedBufferPool{ + bfs, + size, + allocated: AtomicU64::new(0), + free_list: AtomicU64::new(INVALID_PAGE_ID), + }) + } + + /// Create a buffer pool with given capacity, leak it to heap + /// and return the static reference. + #[inline] + pub fn with_capacity_static(pool_size: usize) -> Result<&'static Self> { + let pool = Self::with_capacity(pool_size)?; + let boxed = Box::new(pool); + let leak = Box::leak(boxed); + Ok(leak) + } + + // allocate a new page with exclusive lock. + #[inline] + pub fn allocate_page(&self) -> Result { + // try get from free list. + loop { + let page_id = self.free_list.load(Ordering::Acquire); + if page_id == INVALID_PAGE_ID { + break; + } + let bf = unsafe { &mut *self.bfs.offset(page_id as isize) }; + let new_free = unsafe { *bf.next_free.get() }; + if self.free_list.compare_exchange(page_id, new_free, Ordering::SeqCst, Ordering::Relaxed).is_ok() { + *bf.next_free.get_mut() = INVALID_PAGE_ID; + let guard = bf.latch.exclusive(); + return Ok(PageGuard::new(bf, guard)); + } + } + + // try get from page pool. + let page_id = self.allocated.fetch_add(1, Ordering::AcqRel); + if page_id as usize >= self.size { + return Err(Error::InsufficientBufferPool(page_id)); + } + let bf = unsafe { &mut *self.bfs.offset(page_id as isize) }; + bf.page_id.set(page_id); + *bf.next_free.get_mut() = INVALID_PAGE_ID; // only current thread hold the mutable ref. + let guard = bf.latch.exclusive(); + Ok(PageGuard::new(bf, guard)) + } + + // should return guard with optimisitc guard and let caller lock the page with read lock or write lock. 
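The comment above states the intended calling convention: get_page hands back an optimistic guard and the caller upgrades it to a shared or exclusive latch, retrying when the latch layer reports a conflict. A minimal sketch of that pattern, assuming only the types introduced in this patch; the helper name read_page is hypothetical:

use crate::buffer::{FixedBufferPool, PageSharedGuard};
use crate::buffer::page::PageID;
use crate::error::{Error, Result};
use crate::latch::LatchFallbackMode;

/// Hypothetical helper: fetch a page optimistically, then upgrade to a
/// shared guard, retrying whenever the optimistic guard cannot be upgraded.
fn read_page<'a>(pool: &'a FixedBufferPool, page_id: PageID) -> Result<PageSharedGuard<'a>> {
    loop {
        let guard = pool.get_page(page_id, LatchFallbackMode::Spin)?;
        match guard.shared() {
            Ok(shared) => return Ok(shared),
            Err(Error::RetryLatch) => continue, // conflict detected, start over
            Err(e) => return Err(e),
        }
    }
}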
+ #[inline] + pub fn get_page(&self, page_id: PageID, mode: LatchFallbackMode) -> Result { + if page_id >= self.allocated.load(Ordering::Relaxed) { + return Err(Error::PageIdOutOfBound(page_id)) + } + let bf = unsafe { &*self.bfs.offset(page_id as isize) }; + let guard = bf.latch.optimistic_fallback(mode)?; + Ok(PageGuard::new(bf, guard)) + } + + #[inline] + pub fn deallocate_page(&self, g: PageGuard) { + let mut g = g.exclusive().expect("no one should hold lock on deallocating page"); + loop { + let page_id = self.free_list.load(Ordering::Acquire); + g.set_next_free(page_id); + if self.free_list.compare_exchange(page_id, g.page_id(), Ordering::SeqCst, Ordering::Relaxed).is_ok() { + return; + } + } + } +} + +impl Drop for FixedBufferPool { + fn drop(&mut self) { + let dram_total_size = mem::size_of::() * (self.size + SAFETY_PAGES); + unsafe { + munmap(self.bfs as *mut c_void, dram_total_size); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fixed_buffer_pool() { + let pool = FixedBufferPool::with_capacity_static(64*1024*1024).unwrap(); + { + let g = pool.allocate_page().unwrap(); + assert_eq!(g.page_id(), 0); + } + { + let g = pool.allocate_page().unwrap(); + assert_eq!(g.page_id(), 1); + pool.deallocate_page(g); + let g = pool.allocate_page().unwrap(); + assert_eq!(g.page_id(), 1); + } + { + let g = pool.get_page(0, LatchFallbackMode::Jump).unwrap(); + assert_eq!(g.page_id(), 0); + } + assert!(pool.get_page(5, LatchFallbackMode::Jump).is_err()); + } +} diff --git a/doradb-storage/src/buffer/page.rs b/doradb-storage/src/buffer/page.rs new file mode 100644 index 0000000..17bf370 --- /dev/null +++ b/doradb-storage/src/buffer/page.rs @@ -0,0 +1,20 @@ +pub const PAGE_SIZE: usize = 64 * 1024; +pub type Page = [u8; PAGE_SIZE]; +pub type PageID = u64; +pub const INVALID_PAGE_ID: PageID = !0; +pub type LSN = u64; + +pub trait PageOps: Sized { + /// Initialize page and returns mutable reference of + /// in-memory representation. + fn init(page: &mut Page, height: usize) -> &mut Self; + + fn cast(page: &Page) -> &Self { + unsafe { &*(page as *const _ as *const Self) } + } + + /// convert page to Self. + fn cast_mut(page: &mut Page) -> &mut Self { + unsafe { &mut *(page as *mut _ as *mut Self) } + } +} \ No newline at end of file diff --git a/doradb-storage/src/buffer/ptr.rs b/doradb-storage/src/buffer/ptr.rs new file mode 100644 index 0000000..ea91b56 --- /dev/null +++ b/doradb-storage/src/buffer/ptr.rs @@ -0,0 +1,81 @@ +use std::marker::PhantomData; + +pub const COLD_BIT: u64 = 1u64 << 63; +pub const COLD_MASK: u64 = !COLD_BIT; +pub const COOL_BIT: u64 = 1u64 << 62; +pub const COOL_MASK: u64 = !COOL_BIT; +pub const HOT_MASK: u64 = !(3u64 << 62); + +#[derive(Clone)] +pub struct SwizPtr { + val: u64, + _marker: PhantomData<*mut BufferFrame>, +} + +impl SwizPtr { + /// Create a new swizzled pointer with given raw pointer. + #[inline] + pub fn new_bf(ptr: *mut BufferFrame) -> Self { + SwizPtr{val: ptr as u64, _marker: PhantomData} + } + + /// Returns the in-memory pointer. + #[inline] + pub fn as_bf(&self) -> *mut BufferFrame { + self.val as *mut BufferFrame + } + + /// Returns the in-memory pointer. + /// Usually used for data in cool stage. + #[inline] + pub fn as_bf_masked(&self) -> *mut BufferFrame { + (self.val & HOT_MASK) as *mut BufferFrame + } + + /// Returns the on-disk location identifier. + #[inline] + pub fn as_pid(&self) -> u64 { + self.val & COLD_MASK + } + + /// Returns whether the pointed data is hot(in memory). 
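The swizzled pointer above keeps the residency state in the two highest bits of one 64-bit word: both bits clear means hot (a plain in-memory pointer), COOL_BIT marks a page that is still in memory but staged for eviction, and COLD_BIT means the low bits now carry an on-disk page id. A small self-contained sketch of that encoding; the constants are restated from ptr.rs and the input values are made up for illustration:

// Constants restated from buffer/ptr.rs so the demo compiles on its own.
const COLD_BIT: u64 = 1u64 << 63;
const COLD_MASK: u64 = !COLD_BIT;
const COOL_BIT: u64 = 1u64 << 62;
const HOT_MASK: u64 = !(3u64 << 62);

fn main() {
    // Hot: the value is just an (illustrative) heap address, top two bits clear.
    let hot: u64 = 0x0000_7f3a_1000_0000;
    assert_eq!(hot & (COLD_BIT | COOL_BIT), 0);

    // Cooling sets COOL_BIT but the pointer stays recoverable through HOT_MASK.
    let cool = hot | COOL_BIT;
    assert_eq!(cool & HOT_MASK, hot);

    // Swizzling out to disk stores a page id tagged with COLD_BIT.
    let page_id: u64 = 42;
    let cold = page_id | COLD_BIT;
    assert_ne!(cold & COLD_BIT, 0);
    assert_eq!(cold & COLD_MASK, page_id);
}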
+ #[inline] + pub fn is_hot(&self) -> bool { + self.val & (COLD_BIT | COOL_BIT) == 0 + } + + /// Returns whether the pointed data is cold(on disk). + #[inline] + pub fn is_cold(&self) -> bool { + self.val & COLD_BIT != 0 + } + + /// Returns whether the pointed data is cool. + #[inline] + pub fn is_cool(&self) -> bool { + self.val & COOL_BIT != 0 + } + + /// Returns the raw value. + #[inline] + pub fn raw(&self) -> u64 { + self.val + } + + /// mark the pointer as cold. + #[inline] + pub fn mark_as_cold(&mut self, pid: u64) { + self.val = pid | COLD_BIT; + } + + /// mark the pointer from cool to hot. + #[inline] + pub fn warm(&mut self) { + debug_assert!(self.is_cool()); + self.val = self.val & COOL_MASK; + } +} + +pub struct BufferFrame { + _marker: PhantomData<*mut T>, +} diff --git a/doradb-storage/src/error.rs b/doradb-storage/src/error.rs index 45fa036..3752b7d 100644 --- a/doradb-storage/src/error.rs +++ b/doradb-storage/src/error.rs @@ -26,6 +26,16 @@ pub enum Error { ValueCountMismatch, #[error("Invalid datatype")] InvalidDatatype, + // buffer pool errors + #[error("insufficient memory({0})")] + InsufficientMemory(usize), + #[error("insufficient buffer pool({0})")] + InsufficientBufferPool(u64), + #[error("page id out of bound({0})")] + PageIdOutOfBound(u64), + // latch errors + #[error("retry latch")] + RetryLatch, } impl From for Error { diff --git a/doradb-storage/src/index/block_index.rs b/doradb-storage/src/index/block_index.rs new file mode 100644 index 0000000..b71780b --- /dev/null +++ b/doradb-storage/src/index/block_index.rs @@ -0,0 +1,179 @@ +use crate::buffer::page::{LSN, Page, PageID, PageOps, PAGE_SIZE}; +use crate::buffer::{FixedBufferPool, PageGuard}; +use crate::error::{Result, Error}; +use crate::latch::LatchFallbackMode; +use std::sync::atomic::AtomicU64; +use std::mem; + +pub const BLOCK_SIZE: usize = 1272; +pub const NBR_BLOCKS_IN_LEAF: usize = 51; +pub const NBR_ENTRIES_IN_BRANCH: usize = 4093; +pub const NBR_PAGES_IN_ROW_BLOCK: usize = 78; +pub type RowID = u64; +pub type Block = [u8; BLOCK_SIZE]; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u64)] +pub enum BlockKind { + Row = 1, + Col = 2, +} + +/// The inner node of block index, including root. +#[repr(C)] +pub struct BlockBranch { + // node type + pub height: u64, + // log sequence number + pub lsn: LSN, + // start row id of this inner node + pub row_id: RowID, + // count of entry. + pub count: u64, + // padding to make the total bytes to be 64K. + padding: [u8; 16], + // entries + pub entries: [PageEntry; NBR_ENTRIES_IN_BRANCH], +} + +impl PageOps for BlockBranch { + #[inline] + fn init(page: &mut [u8; PAGE_SIZE], height: usize) -> &mut BlockBranch { + let branch = unsafe { &mut *(page as *mut _ as *mut BlockBranch) }; + branch.height = height as u64; + branch.lsn = 0; + branch.row_id = 0; + branch.count = 0; + branch + } +} + +/// The leaf node of block index. +#[repr(C)] +pub struct BlockLeaf { + // height of the node + pub height: u64, + // log sequence number + pub lsn: LSN, + // start row id of this leaf + pub row_id: RowID, + // count of entry. + pub count: u64, + // padding to make the total bytes to be 64K. + padding: [u8; 640], + // list of block header. 
+ pub blocks: [Block; NBR_BLOCKS_IN_LEAF], +} + +impl PageOps for BlockLeaf { + #[inline] + fn init(page: &mut Page, height: usize) -> &mut Self { + let leaf = unsafe { &mut *(page as *mut _ as *mut BlockLeaf) }; + leaf.height = height as u64; + leaf.lsn = 0; + leaf.row_id = 0; + leaf.count = 0; + leaf + } +} + +impl BlockLeaf { + #[inline] + pub fn is_full(&self) -> bool { + self.count as usize == NBR_BLOCKS_IN_LEAF + } +} + +pub struct PageEntry { + pub row_id: RowID, + pub page_id: PageID, +} + +pub trait BlockOps: Sized { + #[inline] + fn cast(block: &Block) -> &Self { + unsafe { &*(block as *const _ as *const Self) } + } + + #[inline] + fn cast_mut(block: &mut Block) -> &mut Self { + unsafe { &mut *(block as *mut _ as *mut Self) } + } +} + +#[repr(C)] +pub struct RowBlock { + pub kind: BlockKind, + pub count: u32, + pub start_row_id: RowID, + pub end_row_id: RowID, + pub entries: [PageEntry; NBR_PAGES_IN_ROW_BLOCK], +} + +impl RowBlock { + #[inline] + pub fn is_full(&self) -> bool { + self.count as usize == NBR_PAGES_IN_ROW_BLOCK + } +} + +impl BlockOps for RowBlock {} + +#[repr(C)] +pub struct ColBlock { + pub kind: BlockKind, + padding1: [u8; 4], + pub row_id: RowID, + pub count: u64, + pub entries: [ColSegmentMeta; 16], + // maybe include some column statistics. + padding2: [u8; 992], +} + +impl BlockOps for ColBlock {} + +#[repr(C)] +pub struct ColSegmentMeta { + pub row_id: RowID, + pub count: u64, +} + +pub struct BlockIndex<'a> { + buf_pool: &'a FixedBufferPool, + root: PageID, + max_row_id: AtomicU64, // maximum row id, exclusive. +} + +impl<'a> BlockIndex<'a> { + #[inline] + pub fn new(buf_pool: &'a FixedBufferPool) -> Result { + let root_page = buf_pool.allocate_page()?; + let page_id = root_page.page_id(); + let mut g = root_page.exclusive()?; + let page = g.page_mut(); + let _ = BlockLeaf::init(page, 0); + Ok(BlockIndex{ + buf_pool, + root: page_id, + max_row_id: AtomicU64::new(0), + }) + } + + #[inline] + pub fn insert_row_page(&self, count: u64) -> Result { + todo!() + } + + #[inline] + pub fn find_row_id(&self, row_id: RowID) -> Result { + let mut g = self.buf_pool.get_page(self.root, LatchFallbackMode::Spin)?; + todo!() + } + + +} + +pub enum RowLocation { + ColSegment(u64, u64), + RowPage(PageID), +} \ No newline at end of file diff --git a/doradb-storage/src/index/mod.rs b/doradb-storage/src/index/mod.rs new file mode 100644 index 0000000..a253818 --- /dev/null +++ b/doradb-storage/src/index/mod.rs @@ -0,0 +1 @@ +pub mod block_index; diff --git a/doradb-storage/src/latch.rs b/doradb-storage/src/latch.rs new file mode 100644 index 0000000..0732ebc --- /dev/null +++ b/doradb-storage/src/latch.rs @@ -0,0 +1,278 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use parking_lot::RawRwLock; +use parking_lot::lock_api::RawRwLock as RawRwLockApi; +use crate::error::{Result, Error}; + +pub const LATCH_EXCLUSIVE_BIT: u64 = 1; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LatchFallbackMode { + Shared, + Exclusive, + Spin, + Jump, // retry is a special mode that will directly return and caller need to retry. +} + +/// A HybridLatch combines optimisitic lock(version validation) and +/// pessimistic lock(tranditional mutex) to support high-performance +/// on current operations. +/// +/// It has three lock modes. +/// +/// 1. optimisitic. Optimistic mode does not block read or write. +/// but once the inner data is read, version must be validated +/// to ensure no writer updated it. +/// +/// 2. shared. 
Same as read lock, it can exist with +/// multiple reader but mutually exclusive with writer. +/// +/// 3. exclusive. Same as write lock. Once the writer acquired the lock, +/// it first increment version and before unlocking, it also +/// increment version. +/// +#[repr(C, align(64))] +pub struct HybridLatch { + version: AtomicU64, + lock: RawRwLock, +} + +impl HybridLatch { + #[inline] + pub const fn new() -> Self { + HybridLatch{version: AtomicU64::new(0), lock: RawRwLock::INIT} + } + + /// Returns current version with atomic load. + #[inline] + pub fn version_seqcst(&self) -> u64 { + self.version.load(Ordering::SeqCst) + } + + /// Returns whether the latch is already exclusive locked. + #[inline] + pub fn is_exclusive_latched(&self) -> bool { + let ver = self.version_seqcst(); + (ver & LATCH_EXCLUSIVE_BIT) == LATCH_EXCLUSIVE_BIT + } + + /// Returns whether the current version matches given one. + #[inline] + pub fn version_match(&self, version: u64) -> bool { + let ver = self.version_seqcst(); + ver == version + } + + #[inline] + pub fn optimistic_fallback(&self, mode: LatchFallbackMode) -> Result> { + match mode { + LatchFallbackMode::Spin => { + Ok(self.optimistic_spin()) + } + LatchFallbackMode::Shared => { + Ok(self.optimistic_or_shared()) + } + LatchFallbackMode::Exclusive => { + Ok(self.optimistic_or_exclusive()) + } + LatchFallbackMode::Jump => { + self.try_optimistic().ok_or(Error::RetryLatch) + } + } + } + + /// Returns an optimistic lock guard via spin wait + /// until exclusive lock is released. + #[inline] + pub fn optimistic_spin(&self) -> HybridGuard<'_> { + let mut ver: u64; + loop { + ver = self.version_seqcst(); + if (ver & LATCH_EXCLUSIVE_BIT) != LATCH_EXCLUSIVE_BIT { + break; + } + } + HybridGuard::new(self, GuardState::Optimistic, ver) + } + + /// Try to acquire an optimistic lock. + /// Fail if the lock is exclusive locked. + #[inline] + pub fn try_optimistic(&self) -> Option> { + let ver = self.version_seqcst(); + if (ver & LATCH_EXCLUSIVE_BIT) == LATCH_EXCLUSIVE_BIT { + None + } else { + Some(HybridGuard::new(self, GuardState::Optimistic, ver)) + } + } + + /// Get a read lock if lock is exclusive locked(blocking wait). + /// Otherwise get an optimistic lock. + #[inline] + pub fn optimistic_or_shared(&self) -> HybridGuard<'_> { + let ver = self.version_seqcst(); + if (ver & LATCH_EXCLUSIVE_BIT) == LATCH_EXCLUSIVE_BIT { + self.lock.lock_shared(); + let ver = self.version_seqcst(); + HybridGuard::new(self, GuardState::Shared, ver) + } else { + HybridGuard::new(self, GuardState::Optimistic, ver) + } + } + + /// Get a write lock if lock is exclusive locked(blocking wait). + /// Otherwise get an optimistic lock. + /// This use case is rare. + #[inline] + pub fn optimistic_or_exclusive(&self) -> HybridGuard<'_> { + let ver = self.version_seqcst(); + if (ver & LATCH_EXCLUSIVE_BIT) == LATCH_EXCLUSIVE_BIT { + self.lock.lock_exclusive(); + let ver = self.version_seqcst() + LATCH_EXCLUSIVE_BIT; + self.version.store(ver, Ordering::Release); + HybridGuard::new(self, GuardState::Exclusive, ver) + } else { + HybridGuard::new(self, GuardState::Optimistic, ver) + } + } + + /// Get a write lock. + #[inline] + pub fn exclusive(&self) -> HybridGuard<'_> { + self.lock.lock_exclusive(); // may block + let ver = self.version_seqcst() + LATCH_EXCLUSIVE_BIT; + self.version.store(ver, Ordering::Release); + HybridGuard::new(self, GuardState::Exclusive, ver) + } + + /// Get a shared lock. 
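The exclusive path above encodes lock state in the version word itself: acquiring the latch adds LATCH_EXCLUSIVE_BIT, so the version is odd while a writer is active, and releasing adds it again, leaving an even and strictly larger value behind. A minimal sketch of that invariant in the style of the unit tests in this patch, assuming only the HybridLatch API added here:

use crate::latch::HybridLatch;

#[test]
fn test_exclusive_version_parity() {
    let latch = HybridLatch::new();
    assert_eq!(latch.version_seqcst(), 0);
    assert!(!latch.is_exclusive_latched());
    {
        let _g = latch.exclusive();
        // Version is odd while the writer holds the latch.
        assert!(latch.is_exclusive_latched());
        assert_eq!(latch.version_seqcst(), 1);
    }
    // Dropping the guard bumps the version again: even, and larger than before.
    assert!(!latch.is_exclusive_latched());
    assert_eq!(latch.version_seqcst(), 2);
}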
+ #[inline] + pub fn shared(&self) -> HybridGuard<'_> { + self.lock.lock_shared(); // may block + let ver = self.version_seqcst(); + HybridGuard::new(self, GuardState::Shared, ver) + } + + /// Try to get a write lock. + #[inline] + pub fn try_exclusive(&self) -> Option> { + if self.lock.try_lock_exclusive() { + let ver = self.version_seqcst(); + return Some(HybridGuard::new(self, GuardState::Exclusive, ver)) + } + None + } + + /// Try to get a read lock. + #[inline] + pub fn try_shared(&self) -> Option> { + if self.lock.try_lock_shared() { + let ver = self.version_seqcst(); + return Some(HybridGuard::new(self, GuardState::Shared, ver)) + } + None + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum GuardState { + Optimistic, + Shared, + Exclusive, +} + +pub struct HybridGuard<'a> { + lock: &'a HybridLatch, + pub state: GuardState, + // initial version when guard is created. + pub version: u64, +} + +impl<'a> HybridGuard<'a> { + #[inline] + fn new(lock: &'a HybridLatch, state: GuardState, version: u64) -> Self { + HybridGuard{lock, state, version} + } + + /// Validate the optimistic lock is effective. + #[inline] + pub fn validate(&self) -> bool { + debug_assert!(self.state == GuardState::Optimistic); + self.lock.version_match(self.version) + } + + /// Convert lock mode to optimistic. + /// Then the guard can be saved and used in future. + #[inline] + pub fn keepalive(&mut self) { + match self.state { + GuardState::Exclusive => { + let ver = self.version + LATCH_EXCLUSIVE_BIT; + self.lock.version.store(ver, Ordering::Release); + unsafe { self.lock.lock.unlock_exclusive(); } + self.version = ver; + } + GuardState::Shared => { + unsafe { self.lock.lock.unlock_shared(); } + } + GuardState::Optimistic => (), + } + } + + /// Convert a guard to shared mode. + /// return false if fail.(exclusive to shared will fail) + #[inline] + pub fn shared(&mut self) -> bool { + match self.state { + GuardState::Optimistic => { + *self = self.lock.shared(); + true + } + GuardState::Shared => true, + GuardState::Exclusive => false, + } + } + + /// Convert a guard to exclusive mode. + /// return false if fail.(shared to exclusive will fail) + #[inline] + pub fn exclusive(&mut self) -> bool { + match self.state { + GuardState::Optimistic => { + *self = self.lock.exclusive(); + true + } + GuardState::Shared => false, + GuardState::Exclusive => true, + } + } +} + +impl<'a> Drop for HybridGuard<'a> { + #[inline] + fn drop(&mut self) { + match self.state { + GuardState::Exclusive => { + let ver = self.version + LATCH_EXCLUSIVE_BIT; + self.lock.version.store(ver, Ordering::Release); + unsafe { self.lock.lock.unlock_exclusive(); } + } + GuardState::Shared => { + unsafe { self.lock.lock.unlock_shared(); } + } + GuardState::Optimistic => (), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_hybrid_lock() { + let boxed = Box::new(HybridLatch::new()); + let fixed: &'static mut HybridLatch = Box::leak(boxed); + + } +} \ No newline at end of file diff --git a/doradb-storage/src/lib.rs b/doradb-storage/src/lib.rs index a523f17..eb67122 100644 --- a/doradb-storage/src/lib.rs +++ b/doradb-storage/src/lib.rs @@ -1,3 +1,6 @@ +pub mod latch; +pub mod buffer; +pub mod index; pub mod alloc; pub mod array; pub mod attr;
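The test_hybrid_lock stub above leaves the intended usage implicit: a reader takes an optimistic guard, copies the data it needs, and only trusts the copy if validate() still sees the same version, falling back to a shared latch otherwise. A sketch of that pattern, assuming it sits inside the crate next to latch.rs; the Counter type is hypothetical and stands in for a BufferFrame-like pairing of a HybridLatch with the data it protects:

use crate::latch::HybridLatch;
use std::cell::UnsafeCell;

/// Hypothetical wrapper: a latch guarding a single value, mirroring how
/// BufferFrame pairs a HybridLatch with a Page.
struct Counter {
    latch: HybridLatch,
    value: UnsafeCell<u64>,
}

impl Counter {
    fn new() -> Self {
        Counter { latch: HybridLatch::new(), value: UnsafeCell::new(0) }
    }

    /// Writer path: exclusive latch; the version is bumped when the guard drops.
    fn increment(&self) {
        let _g = self.latch.exclusive();
        unsafe { *self.value.get() += 1; }
    }

    /// Reader path: optimistic first, shared latch only if validation fails.
    fn get(&self) -> u64 {
        let g = self.latch.optimistic_spin();
        let v = unsafe { *self.value.get() };
        if g.validate() {
            return v;
        }
        // A writer intervened between the read and the validation: take a real read lock.
        let _g = self.latch.shared();
        unsafe { *self.value.get() }
    }
}

In the buffer pool the same roles are played by PageGuard::page for the optimistic read, PageGuard::validate for the version check, and PageGuard::shared or exclusive for the fallback.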