From 0c77a0ac7649bc6c9978e9a2f353bf3915d2879d Mon Sep 17 00:00:00 2001 From: jiangzhe Date: Sat, 14 Dec 2024 06:17:32 +0000 Subject: [PATCH] basic transaction management --- doradb-storage/Cargo.toml | 7 +- doradb-storage/src/buffer/frame.rs | 36 +- doradb-storage/src/buffer/guard.rs | 113 ++- doradb-storage/src/buffer/mod.rs | 135 ++-- doradb-storage/src/error.rs | 8 +- doradb-storage/src/index/block_index.rs | 150 ++-- doradb-storage/src/latch.rs | 26 +- doradb-storage/src/lib.rs | 2 + doradb-storage/src/row/layout.rs | 8 +- doradb-storage/src/row/mod.rs | 480 ++++++++---- doradb-storage/src/row/ops.rs | 112 ++- doradb-storage/src/row/value.rs | 451 ----------- doradb-storage/src/trx/mod.rs | 220 ++++++ doradb-storage/src/trx/redo.rs | 107 +++ doradb-storage/src/trx/sys.rs | 968 ++++++++++++++++++++++++ doradb-storage/src/trx/undo.rs | 43 ++ doradb-storage/src/value.rs | 824 ++++++++++++++++++++ 17 files changed, 2919 insertions(+), 771 deletions(-) delete mode 100644 doradb-storage/src/row/value.rs create mode 100644 doradb-storage/src/trx/mod.rs create mode 100644 doradb-storage/src/trx/redo.rs create mode 100644 doradb-storage/src/trx/sys.rs create mode 100644 doradb-storage/src/trx/undo.rs create mode 100644 doradb-storage/src/value.rs diff --git a/doradb-storage/Cargo.toml b/doradb-storage/Cargo.toml index d5c0cf8..820c3d6 100644 --- a/doradb-storage/Cargo.toml +++ b/doradb-storage/Cargo.toml @@ -15,8 +15,13 @@ smallvec = {version = "1.8", features = ["union"]} thiserror = "1.0" bitflags = "1.3" bytemuck = "1.7" -parking_lot = "0.12.3" +parking_lot = "0.12" libc = "0.2.164" +crossbeam-utils = "0.8" +serde = { version = "1.0.216", features = ["derive"] } +bincode = { version = "2.0.0-rc", features = ["serde"] } +flume = "0.11" [dev-dependencies] rand = "0.8" +bitcode = { version = "0.6.3", features = ["serde"] } diff --git a/doradb-storage/src/buffer/frame.rs b/doradb-storage/src/buffer/frame.rs index c5b6d65..fdc3588 100644 --- a/doradb-storage/src/buffer/frame.rs +++ b/doradb-storage/src/buffer/frame.rs @@ -1,10 +1,36 @@ -use std::cell::{Cell, UnsafeCell}; use crate::buffer::page::{Page, PageID}; +use crate::buffer::FixedBufferPool; use crate::latch::HybridLatch; +use crate::trx::undo::UndoMap; +const _: () = assert!( + { std::mem::size_of::() % 64 == 0 }, + "Size of BufferFrame must be multiply of 64" +); + +const _: () = assert!( + { std::mem::align_of::() % 64 == 0 }, + "Align of BufferFrame must be multiply of 64" +); + +#[repr(C)] pub struct BufferFrame { - pub page_id: Cell, + pub page_id: PageID, + pub next_free: PageID, + /// Undo Map is only maintained by RowPage. + /// Once a RowPage is eliminated, the UndoMap is retained by BufferPool + /// and when the page is reloaded, UndoMap is reattached to page. + pub undo_map: Option>, pub latch: HybridLatch, // lock proctects free list and page. - pub next_free: UnsafeCell, - pub page: UnsafeCell, -} \ No newline at end of file + pub page: Page, +} + +/// BufferFrameAware defines callbacks on lifecycle of buffer frame +/// for initialization and de-initialization. +pub trait BufferFrameAware { + /// This callback is called when a page is just loaded into BufferFrame. + fn init_bf(_pool: &FixedBufferPool, _bf: &mut BufferFrame) {} + + /// This callback is called when a page is cleaned and return to BufferPool. 
+ fn deinit_bf(_pool: &FixedBufferPool, _bf: &mut BufferFrame) {} +} diff --git a/doradb-storage/src/buffer/guard.rs b/doradb-storage/src/buffer/guard.rs index 6c7e62c..c39ed2a 100644 --- a/doradb-storage/src/buffer/guard.rs +++ b/doradb-storage/src/buffer/guard.rs @@ -1,19 +1,24 @@ -use crate::latch::HybridGuard; -use crate::buffer::page::PageID; use crate::buffer::frame::BufferFrame; -use crate::error::{Result, Validation, Validation::{Valid, Invalid}}; +use crate::buffer::page::PageID; +use crate::error::{ + Result, Validation, + Validation::{Invalid, Valid}, +}; use crate::latch::GuardState; +use crate::latch::HybridGuard; +use std::cell::UnsafeCell; use std::marker::PhantomData; +use std::mem; pub struct PageGuard<'a, T> { - bf: &'a BufferFrame, + bf: &'a UnsafeCell, guard: HybridGuard<'a>, _marker: PhantomData<&'a T>, } impl<'a, T> PageGuard<'a, T> { #[inline] - pub(super) fn new(bf: &'a BufferFrame, guard: HybridGuard<'a>) -> Self { + pub(super) fn new(bf: &'a UnsafeCell, guard: HybridGuard<'a>) -> Self { Self { bf, guard, @@ -22,33 +27,78 @@ impl<'a, T> PageGuard<'a, T> { } #[inline] - pub fn page_id(&self) -> PageID { - self.bf.page_id.get() + pub unsafe fn page_id(&self) -> PageID { + (*self.bf.get()).page_id } #[inline] pub fn try_shared(mut self) -> Validation> { - self.guard.try_shared().map(|_| PageSharedGuard{ - bf: self.bf, + self.guard.try_shared().map(|_| PageSharedGuard { + bf: unsafe { &*self.bf.get() }, guard: self.guard, _marker: PhantomData, }) } + #[inline] + pub fn block_until_shared(self) -> PageSharedGuard<'a, T> { + match self.guard.state { + GuardState::Exclusive => PageSharedGuard { + bf: unsafe { &*self.bf.get() }, + guard: self.guard, + _marker: PhantomData, + }, + GuardState::Shared => { + unimplemented!("lock downgrade from exclusive to shared is not supported") + } + GuardState::Optimistic => { + let guard = self.guard.block_until_shared(); + PageSharedGuard { + bf: unsafe { &*self.bf.get() }, + guard, + _marker: PhantomData, + } + } + } + } + #[inline] pub fn try_exclusive(mut self) -> Validation> { self.guard.try_exclusive().map(|_| PageExclusiveGuard { - bf: self.bf, + bf: unsafe { &mut *self.bf.get() }, guard: self.guard, _marker: PhantomData, }) } + #[inline] + pub fn block_until_exclusive(self) -> PageExclusiveGuard<'a, T> { + match self.guard.state { + GuardState::Exclusive => PageExclusiveGuard { + bf: unsafe { &mut *self.bf.get() }, + guard: self.guard, + _marker: PhantomData, + }, + GuardState::Shared => { + unimplemented!("lock upgradate from shared to exclusive is not supported") + } + GuardState::Optimistic => { + let guard = self.guard.block_until_exclusive(); + PageExclusiveGuard { + bf: unsafe { &mut *self.bf.get() }, + guard, + _marker: PhantomData, + } + } + } + } + /// Returns page with optimistic read. /// All values must be validated before use. #[inline] - pub fn page_unchecked(&self) -> &T { - unsafe { &*(self.bf.page.get() as *const _ as *const T) } + pub unsafe fn page_unchecked(&self) -> &T { + let bf = self.bf.get(); + mem::transmute(&(*bf).page) } /// Validates version not change. @@ -105,20 +155,27 @@ impl<'a, T> PageSharedGuard<'a, T> { self.guard.downgrade(); } + /// Returns the buffer frame current page associated. + #[inline] + pub fn bf(&self) -> &BufferFrame { + self.bf + } + + /// Returns current page id. #[inline] pub fn page_id(&self) -> PageID { - self.bf.page_id.get() + self.bf.page_id } /// Returns shared page. 
#[inline] pub fn page(&self) -> &T { - unsafe { &*(self.bf.page.get() as *const _ as *const T) } + unsafe { mem::transmute(&self.bf.page) } } } pub struct PageExclusiveGuard<'a, T> { - bf: &'a BufferFrame, + bf: &'a mut BufferFrame, guard: HybridGuard<'a>, _marker: PhantomData<&'a mut T>, } @@ -131,25 +188,39 @@ impl<'a, T> PageExclusiveGuard<'a, T> { self.guard.downgrade(); } + /// Returns current page id. #[inline] pub fn page_id(&self) -> PageID { - self.bf.page_id.get() + self.bf.page_id } + /// Returns current page. #[inline] pub fn page(&self) -> &T { - unsafe { &*(self.bf.page.get() as *const T) } + unsafe { mem::transmute(&self.bf.page) } } + /// Returns mutable page. #[inline] pub fn page_mut(&mut self) -> &mut T { - unsafe { &mut *(self.bf.page.get() as *mut T) } + unsafe { mem::transmute(&mut self.bf.page) } } + /// Returns current buffer frame. + #[inline] + pub fn bf(&self) -> &BufferFrame { + &self.bf + } + + /// Returns mutable buffer frame. + #[inline] + pub fn bf_mut(&mut self) -> &mut BufferFrame { + &mut self.bf + } + + /// Set next free page. #[inline] pub fn set_next_free(&mut self, next_free: PageID) { - unsafe { - *self.bf.next_free.get() = next_free; - } + self.bf.next_free = next_free; } } diff --git a/doradb-storage/src/buffer/mod.rs b/doradb-storage/src/buffer/mod.rs index ceca06a..0061d7f 100644 --- a/doradb-storage/src/buffer/mod.rs +++ b/doradb-storage/src/buffer/mod.rs @@ -1,17 +1,19 @@ -pub mod page; -pub mod ptr; pub mod frame; pub mod guard; +pub mod page; +pub mod ptr; +use crate::buffer::frame::{BufferFrame, BufferFrameAware}; +use crate::buffer::guard::{PageExclusiveGuard, PageGuard}; +use crate::buffer::page::{PageID, INVALID_PAGE_ID}; +use crate::error::{Error, Result, Validation, Validation::Valid}; +use crate::latch::LatchFallbackMode; use libc::{ c_void, madvise, mmap, munmap, MADV_DONTFORK, MADV_HUGEPAGE, MAP_ANONYMOUS, MAP_FAILED, MAP_PRIVATE, PROT_READ, PROT_WRITE, }; -use crate::buffer::frame::BufferFrame; -use crate::buffer::page::{PageID, INVALID_PAGE_ID}; -use crate::buffer::guard::{PageGuard, PageExclusiveGuard}; -use crate::latch::LatchFallbackMode; -use crate::error::{Result, Error, Validation, Validation::{Valid, Invalid}}; +use parking_lot::Mutex; +use std::cell::UnsafeCell; use std::mem; use std::sync::atomic::{AtomicU64, Ordering}; @@ -21,7 +23,7 @@ pub struct FixedBufferPool { bfs: *mut BufferFrame, size: usize, allocated: AtomicU64, - free_list: AtomicU64, + free_list: Mutex, } impl FixedBufferPool { @@ -50,7 +52,7 @@ impl FixedBufferPool { bfs, size, allocated: AtomicU64::new(0), - free_list: AtomicU64::new(INVALID_PAGE_ID), + free_list: Mutex::new(INVALID_PAGE_ID), }) } @@ -73,44 +75,47 @@ impl FixedBufferPool { drop(Box::from_raw(this as *const Self as *mut Self)); } + #[inline] + fn try_get_page_from_free_list( + &self, + ) -> Option> { + unsafe { + let bf = { + let mut page_id = self.free_list.lock(); + if *page_id == INVALID_PAGE_ID { + return None; + } + let bf = self.get_bf(*page_id); + *page_id = (*bf.get()).next_free; + bf + }; + (*bf.get()).next_free = INVALID_PAGE_ID; + T::init_bf(self, &mut (*bf.get())); + let g = init_bf_exclusive_guard(bf); + Some(g) + } + } + // allocate a new page with exclusive lock. #[inline] - pub fn allocate_page(&self) -> Result> { + pub fn allocate_page(&self) -> Result> { // try get from free list. 
- loop { - let page_id = self.free_list.load(Ordering::Acquire); - if page_id == INVALID_PAGE_ID { - break; - } - let bf = unsafe { &mut *self.bfs.offset(page_id as isize) }; - let new_free = unsafe { *bf.next_free.get() }; - if self - .free_list - .compare_exchange(page_id, new_free, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - *bf.next_free.get_mut() = INVALID_PAGE_ID; - let g = bf.latch.exclusive(); - let g = PageGuard::new(bf, g) - .try_exclusive() - .expect("free page owns exclusive lock"); - return Ok(g); - } + if let Some(g) = self.try_get_page_from_free_list() { + return Ok(g); } - // try get from page pool. let page_id = self.allocated.fetch_add(1, Ordering::AcqRel); if page_id as usize >= self.size { return Err(Error::InsufficientBufferPool(page_id)); } - let bf = unsafe { &mut *self.bfs.offset(page_id as isize) }; - bf.page_id.set(page_id); - *bf.next_free.get_mut() = INVALID_PAGE_ID; // only current thread hold the mutable ref. - let g = bf.latch.exclusive(); - let g = PageGuard::new(bf, g) - .try_exclusive() - .expect("new page owns exclusive lock"); - Ok(g) + unsafe { + let bf = self.get_bf(page_id); + (*bf.get()).page_id = page_id; + (*bf.get()).next_free = INVALID_PAGE_ID; + T::init_bf(self, &mut *bf.get()); + let g = init_bf_exclusive_guard(bf); + Ok(g) + } } /// Returns the page guard with given page id. @@ -128,29 +133,27 @@ impl FixedBufferPool { } #[inline] - fn get_page_internal( - &self, - page_id: PageID, - mode: LatchFallbackMode, - ) -> PageGuard<'_, T> { - let bf = unsafe { &*self.bfs.offset(page_id as usize as isize) }; - let g = bf.latch.optimistic_fallback(mode); - PageGuard::new(bf, g) + fn get_page_internal(&self, page_id: PageID, mode: LatchFallbackMode) -> PageGuard<'_, T> { + unsafe { + let bf = self.get_bf(page_id); + let g = (*bf.get()).latch.optimistic_fallback(mode); + PageGuard::new(bf, g) + } } #[inline] - pub fn deallocate_page(&self, mut g: PageExclusiveGuard<'_, T>) { - loop { - let page_id = self.free_list.load(Ordering::Acquire); - g.set_next_free(page_id); - if self - .free_list - .compare_exchange(page_id, g.page_id(), Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - return; - } - } + unsafe fn get_bf(&self, page_id: PageID) -> &UnsafeCell { + let bf_ptr = self.bfs.offset(page_id as isize); + &*(bf_ptr as *mut UnsafeCell) + } + + /// Deallocate page. + #[inline] + pub fn deallocate_page(&self, mut g: PageExclusiveGuard<'_, T>) { + T::deinit_bf(self, g.bf_mut()); + let mut page_id = self.free_list.lock(); + g.bf_mut().next_free = *page_id; + *page_id = g.page_id(); } /// Get child page by page id provided by parent page. 
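The hunks above replace the CAS-loop free list with a mutex-protected one and route frame setup through the BufferFrameAware callbacks (init_bf on allocation, deinit_bf on deallocation). A minimal round-trip sketch, assuming it compiles inside this crate; pool construction is omitted and the frame-reuse assertion only holds without concurrent allocators:

fn allocate_then_recycle(pool: &crate::buffer::FixedBufferPool) -> crate::error::Result<()> {
    use crate::row::RowPage;
    // Sketch only: the pool is assumed to be provided by the caller.
    // Fresh frame under an exclusive guard; RowPage::init_bf has already been called.
    let g = pool.allocate_page::<RowPage>()?;
    let first_id = g.page_id();
    // Returning the guard runs RowPage::deinit_bf and pushes the frame onto the free list.
    pool.deallocate_page(g);
    // With no concurrent allocators, the next allocation pops the same frame back off.
    let g2 = pool.allocate_page::<RowPage>()?;
    debug_assert_eq!(g2.page_id(), first_id);
    pool.deallocate_page(g2);
    Ok(())
}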
@@ -184,6 +187,20 @@ impl Drop for FixedBufferPool { } } +unsafe impl Sync for FixedBufferPool {} + +#[inline] +fn init_bf_exclusive_guard( + bf: &UnsafeCell, +) -> PageExclusiveGuard<'_, T> { + unsafe { + let g = (*bf.get()).latch.exclusive(); + PageGuard::new(bf, g) + .try_exclusive() + .expect("free page owns exclusive lock") + } +} + #[cfg(test)] mod tests { use super::*; @@ -205,7 +222,7 @@ mod tests { } { let g: PageGuard<'_, BlockNode> = pool.get_page(0, LatchFallbackMode::Spin).unwrap(); - assert_eq!(g.page_id(), 0); + assert_eq!(unsafe { g.page_id() }, 0); } assert!(pool .get_page::(5, LatchFallbackMode::Spin) diff --git a/doradb-storage/src/error.rs b/doradb-storage/src/error.rs index 0ace98f..4e29979 100644 --- a/doradb-storage/src/error.rs +++ b/doradb-storage/src/error.rs @@ -98,7 +98,7 @@ impl Validation { #[inline] pub fn and_then(self, f: F) -> Validation where - F: FnOnce(T) -> Validation + F: FnOnce(T) -> Validation, { match self { Validation::Valid(v) => f(v), @@ -151,7 +151,7 @@ macro_rules! verify { Validation::Valid(v) => v, Validation::Invalid => return Validation::Invalid, } - } + }; } macro_rules! verify_continue { @@ -160,7 +160,7 @@ macro_rules! verify_continue { Validation::Invalid => continue, Validation::Valid(v) => v, } - } + }; } macro_rules! bypass_res { @@ -169,5 +169,5 @@ macro_rules! bypass_res { Ok(res) => res, Err(e) => return Validation::Valid(Err(e)), } - } + }; } diff --git a/doradb-storage/src/index/block_index.rs b/doradb-storage/src/index/block_index.rs index ee8f793..08b62aa 100644 --- a/doradb-storage/src/index/block_index.rs +++ b/doradb-storage/src/index/block_index.rs @@ -1,10 +1,14 @@ +use crate::buffer::frame::{BufferFrame, BufferFrameAware}; +use crate::buffer::guard::{PageExclusiveGuard, PageGuard, PageSharedGuard}; use crate::buffer::page::{PageID, LSN, PAGE_SIZE}; use crate::buffer::FixedBufferPool; -use crate::buffer::guard::{PageExclusiveGuard, PageGuard, PageSharedGuard}; -use crate::error::{Error, Result, Validation, Validation::{Valid, Invalid}}; +use crate::error::{ + Error, Result, Validation, + Validation::{Invalid, Valid}, +}; use crate::latch::LatchFallbackMode; -use crate::row::{RowID, RowPage}; use crate::row::layout::Layout; +use crate::row::{RowID, RowPage}; use parking_lot::Mutex; use std::mem; @@ -22,17 +26,20 @@ pub const BLOCK_BRANCH_ENTRY_START: usize = 48; // header 32 bytes, padding 640 bytes. pub const BLOCK_LEAF_ENTRY_START: usize = 672; -const _: () = assert!({ - mem::size_of::() == BLOCK_PAGE_SIZE -}, "Size of node of BlockIndex should equal to 64KB"); +const _: () = assert!( + { mem::size_of::() == BLOCK_PAGE_SIZE }, + "Size of node of BlockIndex should equal to 64KB" +); -const _: () = assert!({ - BLOCK_HEADER_SIZE + NBR_ENTRIES_IN_BRANCH * ENTRY_SIZE <= BLOCK_PAGE_SIZE -}, "Size of branch node of BlockIndex can be at most 64KB"); +const _: () = assert!( + { BLOCK_HEADER_SIZE + NBR_ENTRIES_IN_BRANCH * ENTRY_SIZE <= BLOCK_PAGE_SIZE }, + "Size of branch node of BlockIndex can be at most 64KB" +); -const _: () = assert!({ - BLOCK_HEADER_SIZE + NBR_BLOCKS_IN_LEAF * BLOCK_SIZE <= BLOCK_PAGE_SIZE -}, "Size of leaf node of BlockIndex can be at most 64KB"); +const _: () = assert!( + { BLOCK_HEADER_SIZE + NBR_BLOCKS_IN_LEAF * BLOCK_SIZE <= BLOCK_PAGE_SIZE }, + "Size of leaf node of BlockIndex can be at most 64KB" +); /// BlockKind can be Row or Col. /// Row Block contains 78 row page ids. 
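For orientation before the BlockNode/BlockIndex hunks that follow: BlockIndex::find_row_id maps a RowID to a RowLocation, and the caller then latches the returned row page. A sketch under the assumption that it lives inside this crate; the module path and the choice of a shared latch are illustrative only:

fn locate_row<'a>(
    blk_idx: &crate::index::block_index::BlockIndex<'a>,
    buf_pool: &'a crate::buffer::FixedBufferPool,
    row_id: crate::row::RowID,
) -> crate::error::Result<()> {
    // Sketch only: module paths are assumed from the file layout in this patch.
    use crate::index::block_index::RowLocation;
    use crate::latch::LatchFallbackMode;
    use crate::row::RowPage;
    // RowLocation tells us which row page (if any) owns this RowID.
    if let RowLocation::RowPage(page_id) = blk_idx.find_row_id(row_id)? {
        // Optimistic guard first, then block until a shared latch is granted.
        let g = buf_pool.get_page::<RowPage>(page_id, LatchFallbackMode::Shared)?;
        let shared = g.block_until_shared();
        debug_assert!(shared.page().header.start_row_id <= row_id);
    }
    Ok(())
}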
@@ -106,9 +113,7 @@ impl BlockNode { #[inline] pub fn branch_entries_mut(&mut self) -> &mut [PageEntry] { debug_assert!(self.is_branch()); - unsafe { - std::slice::from_raw_parts_mut(self.data_ptr_mut(), self.header.count as usize) - } + unsafe { std::slice::from_raw_parts_mut(self.data_ptr_mut(), self.header.count as usize) } } /// Returns entry in branch node by given index. @@ -193,11 +198,14 @@ impl BlockNode { debug_assert!(!self.leaf_is_full()); let end_row_id = self.header.end_row_id; self.header.count += 1; - self.leaf_last_block_mut().init_row(end_row_id, count, page_id); + self.leaf_last_block_mut() + .init_row(end_row_id, count, page_id); self.header.end_row_id += count; } } +impl BufferFrameAware for BlockNode {} + #[repr(C)] #[derive(Clone)] pub struct BlockNodeHeader { @@ -388,15 +396,18 @@ impl<'a> BlockIndex<'a> { /// Get row page for insertion. /// Caller should cache insert page id to avoid invoking this method frequently. #[inline] - pub fn get_insert_page(&self, count: usize, cols: &[Layout]) -> Result> { + pub fn get_insert_page( + &self, + count: usize, + cols: &[Layout], + ) -> Result> { match self.get_insert_page_from_free_list() { Valid(Ok(free_page)) => return Ok(free_page), Valid(_) | Invalid => { // we just ignore the free list error and latch error, and continue to get new page. } } - let mut new_page: PageExclusiveGuard<'a, RowPage> = - self.buf_pool.allocate_page()?; + let mut new_page: PageExclusiveGuard<'a, RowPage> = self.buf_pool.allocate_page()?; let new_page_id = new_page.page_id(); loop { match self.insert_row_page(count as u64, new_page_id) { @@ -442,7 +453,7 @@ impl<'a> BlockIndex<'a> { let mut stack = vec![]; loop { let res = self.try_find_leaf_by_row_id(row_id, &mut stack); - let res =verify_continue!(res); + let res = verify_continue!(res); return res.map(|_| CursorShared { blk_idx: self, stack, @@ -451,7 +462,9 @@ impl<'a> BlockIndex<'a> { } #[inline] - fn get_insert_page_from_free_list(&self) -> Validation>> { + fn get_insert_page_from_free_list( + &self, + ) -> Validation>> { let page_id = { let mut g = self.insert_free_list.lock(); if g.is_empty() { @@ -460,7 +473,9 @@ impl<'a> BlockIndex<'a> { g.pop().unwrap() }; let page: PageGuard<'a, RowPage> = { - let res = self.buf_pool.get_page(page_id, LatchFallbackMode::Exclusive); + let res = self + .buf_pool + .get_page(page_id, LatchFallbackMode::Exclusive); bypass_res!(res) }; page.try_exclusive().map(Ok) @@ -490,14 +505,12 @@ impl<'a> BlockIndex<'a> { let max_row_id = r_row_id + count; // create left child and copy all contents to it. - let mut l_guard: PageExclusiveGuard<'_, BlockNode> = - self.buf_pool.allocate_page()?; + let mut l_guard: PageExclusiveGuard<'_, BlockNode> = self.buf_pool.allocate_page()?; let l_page_id = l_guard.page_id(); l_guard.page_mut().clone_from(p_guard.page()); // todo: LSN // create right child, add one row block with one page entry. - let mut r_guard: PageExclusiveGuard<'_, BlockNode> = - self.buf_pool.allocate_page()?; + let mut r_guard: PageExclusiveGuard<'_, BlockNode> = self.buf_pool.allocate_page()?; let r_page_id = r_guard.page_id(); { let r = r_guard.page_mut(); @@ -544,14 +557,18 @@ impl<'a> BlockIndex<'a> { loop { // try to lock parent. let g = stack.pop().unwrap(); - debug_assert!(g.page_unchecked().is_branch()); // if lock failed, just retry the whole process. 
p_guard = verify!(g.try_exclusive()); if !p_guard.page().branch_is_full() { break; } else if stack.is_empty() { // root is full, should split - return Valid(self.insert_row_page_split_root(p_guard, row_id, count, insert_page_id)); + return Valid(self.insert_row_page_split_root( + p_guard, + row_id, + count, + insert_page_id, + )); } } // create new leaf node with one insert page id @@ -580,7 +597,11 @@ impl<'a> BlockIndex<'a> { } #[inline] - fn insert_row_page(&self, count: u64, insert_page_id: PageID) -> Validation> { + fn insert_row_page( + &self, + count: u64, + insert_page_id: PageID, + ) -> Validation> { let mut stack = vec![]; let mut p_guard = { let res = verify!(self.find_right_most_leaf(&mut stack, LatchFallbackMode::Exclusive)); @@ -641,7 +662,7 @@ impl<'a> BlockIndex<'a> { bypass_res!(g) }; // optimistic mode, should always check version after use protected data. - let mut pu = p_guard.page_unchecked(); + let mut pu = unsafe { p_guard.page_unchecked() }; let height = pu.header.height; let mut level = 1; while !pu.is_leaf() { @@ -656,11 +677,13 @@ impl<'a> BlockIndex<'a> { let res = verify!(g); bypass_res!(res) } else { - let g = self.buf_pool.get_child_page(p_guard, page_id, LatchFallbackMode::Spin); + let g = self + .buf_pool + .get_child_page(p_guard, page_id, LatchFallbackMode::Spin); let res = verify!(g); bypass_res!(res) }; - pu = p_guard.page_unchecked(); + pu = unsafe { p_guard.page_unchecked() }; level += 1; } Valid(Ok(p_guard)) @@ -677,7 +700,7 @@ impl<'a> BlockIndex<'a> { bypass_res!(res) }; loop { - let pu = g.page_unchecked(); + let pu = unsafe { g.page_unchecked() }; if pu.is_leaf() { debug_assert!(g.is_optimistic()); let row_id = pu.header.start_row_id; @@ -721,7 +744,7 @@ impl<'a> BlockIndex<'a> { bypass_res!(res) }; loop { - let pu = g.page_unchecked(); + let pu = unsafe { g.page_unchecked() }; if pu.is_leaf() { // for leaf node, end_row_id is always correct, // so we can quickly determine if row id exists @@ -761,11 +784,11 @@ impl<'a> BlockIndex<'a> { return Valid(Ok(RowLocation::RowPage(entries[idx].page_id))); } // For branch node, end_row_id is not always correct. - // + // // With current page insert logic, at most time end_row_id // equals to its right-most child's start_row_id plus // row count of one row page. - // + // // All leaf nodes maintain correct row id range. // so if input row id exceeds end_row_id, we just redirect // it to right-most leaf. @@ -785,7 +808,9 @@ impl<'a> BlockIndex<'a> { }; verify!(g.validate()); g = { - let v = self.buf_pool.get_child_page(g, page_id, LatchFallbackMode::Spin); + let v = self + .buf_pool + .get_child_page(g, page_id, LatchFallbackMode::Spin); let res = verify!(v); bypass_res!(res) }; @@ -810,7 +835,8 @@ impl<'a> CursorShared<'a> { fn fill_stack_by_row_id_search(&mut self, row_id: RowID) -> Result<()> { loop { self.stack.clear(); - let v = self.blk_idx + let v = self + .blk_idx .try_find_leaf_by_row_id(row_id, &mut self.stack); let res = verify_continue!(v); match res { @@ -844,7 +870,7 @@ impl<'a> Iterator for CursorShared<'a> { } } NextKind::Branch(idx, row_id) => { - let pu = pos.g.page_unchecked(); + let pu = unsafe { pos.g.page_unchecked() }; // all entries in branch have been traversed. 
if idx == pu.header.count as usize { if pos.g.validate().is_valid() { @@ -879,12 +905,10 @@ impl<'a> Iterator for CursorShared<'a> { }); match g.try_shared() { Valid(g) => return Some(Ok(g)), - Invalid => { - match self.fill_stack_by_row_id_search(row_id) { - Ok(_) => continue 'LOOP, - Err(e) => return Some(Err(e)), - } - } + Invalid => match self.fill_stack_by_row_id_search(row_id) { + Ok(_) => continue 'LOOP, + Err(e) => return Some(Err(e)), + }, } } Valid(Err(e)) => { @@ -985,13 +1009,21 @@ mod tests { let g = res.unwrap(); let node = g.page(); assert!(node.is_leaf()); - let row_pages: usize = node.leaf_blocks().iter() - .map(|block| if block.is_row() { - block.row_page_entries().iter().count() - } else { 0usize }) + let row_pages: usize = node + .leaf_blocks() + .iter() + .map(|block| { + if block.is_row() { + block.row_page_entries().iter().count() + } else { + 0usize + } + }) .sum(); - println!("start_row_id={:?}, end_row_id={:?}, blocks={:?}, row_pages={:?}", - node.header.start_row_id, node.header.end_row_id, node.header.count, row_pages); + println!( + "start_row_id={:?}, end_row_id={:?}, blocks={:?}, row_pages={:?}", + node.header.start_row_id, node.header.end_row_id, node.header.count, row_pages + ); } let row_pages_per_leaf = NBR_BLOCKS_IN_LEAF * NBR_PAGES_IN_ROW_BLOCK; assert!(count == (row_pages + row_pages_per_leaf - 1) / row_pages_per_leaf); @@ -1013,11 +1045,19 @@ mod tests { let _ = blk_idx.get_insert_page(rows_per_page, &cols).unwrap(); } { - let res = blk_idx.buf_pool.get_page::(blk_idx.root, LatchFallbackMode::Spin).unwrap(); + let res = blk_idx + .buf_pool + .get_page::(blk_idx.root, LatchFallbackMode::Spin) + .unwrap(); let p = res.try_exclusive().unwrap(); let bn = p.page(); println!("root is leaf ? {:?}", bn.is_leaf()); - println!("root page_id={:?}, start_row_id={:?}, end_row_id={:?}", p.page_id(), bn.header.start_row_id, bn.header.end_row_id); + println!( + "root page_id={:?}, start_row_id={:?}, end_row_id={:?}", + p.page_id(), + bn.header.start_row_id, + bn.header.end_row_id + ); println!("root entries {:?}", bn.branch_entries()); } for i in 0..row_pages { @@ -1025,7 +1065,9 @@ mod tests { let res = blk_idx.find_row_id(row_id).unwrap(); match res { RowLocation::RowPage(page_id) => { - let g: PageGuard<'_, RowPage> = buf_pool.get_page(page_id, LatchFallbackMode::Shared).unwrap(); + let g: PageGuard<'_, RowPage> = buf_pool + .get_page(page_id, LatchFallbackMode::Shared) + .unwrap(); let g = g.try_shared().unwrap(); let p = g.page(); assert!(p.header.start_row_id as usize == i * rows_per_page); diff --git a/doradb-storage/src/latch.rs b/doradb-storage/src/latch.rs index f5c0929..3efe23b 100644 --- a/doradb-storage/src/latch.rs +++ b/doradb-storage/src/latch.rs @@ -1,4 +1,4 @@ -use crate::error::{Error, Result, Validation, Validation::Valid, Validation::Invalid}; +use crate::error::{Error, Result, Validation, Validation::Invalid, Validation::Valid}; use parking_lot::lock_api::RawRwLock as RawRwLockApi; use parking_lot::RawRwLock; use std::sync::atomic::{AtomicU64, Ordering}; @@ -184,7 +184,7 @@ pub enum GuardState { /// HybridGuard is the union of three kinds of locks. /// The common usage is to acquire optimistic lock first /// and then upgrade to shared lock or exclusive lock. -/// +/// /// An additional validation must be executed to for lock /// upgrade, because the protected object may be entirely /// rewritten to another object, e.g. 
frames in buffer pool
@@ -257,11 +257,11 @@ impl<'a> HybridGuard<'a> {
     pub fn try_shared(&mut self) -> Validation<()> {
         match self.state {
             GuardState::Optimistic => {
-                // use try shared is ok.
-                // because only when there is a exclusive lock, this try will fail.
+                // using try_shared is ok,
+                // because this try fails only when an exclusive lock is held,
                 // and the optimistic lock must be retried as the version won't match.
-                // an additional validation is required, because other thread may
-                // gain the exclusive lock inbetween.
+                // an additional validation is required, because another thread may
+                // gain the exclusive lock in between.
                 if let Some(g) = self.lock.try_shared() {
                     if self.validate_shared_internal() {
                         *self = g;
@@ -283,6 +283,12 @@ impl<'a> HybridGuard<'a> {
         }
     }
 
+    #[inline]
+    pub fn block_until_shared(self) -> Self {
+        debug_assert!(self.state == GuardState::Optimistic);
+        self.lock.shared()
+    }
+
     /// Convert a guard to exclusive mode.
     /// Fails if the upgrade is not possible (shared to exclusive will fail).
     #[inline]
@@ -292,7 +298,7 @@ impl<'a> HybridGuard<'a> {
             if let Some(g) = self.lock.try_exclusive() {
                 if self.validate_exclusive_internal() {
                     *self = g;
-                    return Valid(())
+                    return Valid(());
                 }
                 debug_assert!(self.version + LATCH_EXCLUSIVE_BIT != g.version);
             }
@@ -306,6 +312,12 @@ impl<'a> HybridGuard<'a> {
         }
     }
 
+    #[inline]
+    pub fn block_until_exclusive(self) -> Self {
+        debug_assert!(self.state == GuardState::Optimistic);
+        self.lock.exclusive()
+    }
+
     #[inline]
     pub fn optimistic_clone(&self) -> Result<Self> {
         if self.state == GuardState::Optimistic {
diff --git a/doradb-storage/src/lib.rs b/doradb-storage/src/lib.rs
index 05a3691..e25f7ec 100644
--- a/doradb-storage/src/lib.rs
+++ b/doradb-storage/src/lib.rs
@@ -5,3 +5,5 @@ pub mod error;
 pub mod index;
 pub mod latch;
 pub mod row;
+pub mod trx;
+pub mod value;
diff --git a/doradb-storage/src/row/layout.rs b/doradb-storage/src/row/layout.rs
index 3b0eeea..5ce6aba 100644
--- a/doradb-storage/src/row/layout.rs
+++ b/doradb-storage/src/row/layout.rs
@@ -1,5 +1,5 @@
-/// Layout defines the memory layout of columns
-/// stored in row page.
+/// Layout defines the memory layout of columns
+/// stored in row page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum Layout { @@ -22,7 +22,7 @@ impl Layout { Layout::Byte16 => 16, // 2-byte len, 2-byte offset, 4-byte prefix // or inline version, 2-byte len, at most 6 inline bytes - Layout::VarByte => 8, + Layout::VarByte => 8, } } -} \ No newline at end of file +} diff --git a/doradb-storage/src/row/mod.rs b/doradb-storage/src/row/mod.rs index ad90512..1b1ae2b 100644 --- a/doradb-storage/src/row/mod.rs +++ b/doradb-storage/src/row/mod.rs @@ -1,27 +1,32 @@ -pub mod ops; pub mod layout; -pub mod value; +pub mod ops; +use crate::buffer::frame::{BufferFrame, BufferFrameAware}; use crate::buffer::page::PAGE_SIZE; -use crate::row::ops::{SelectResult, InsertRow, InsertResult, DeleteRow, DeleteResult, UpdateRow, UpdateCol, UpdateResult}; use crate::row::layout::Layout; -use crate::row::value::*; +use crate::row::ops::{ + DeleteResult, DeleteRow, InsertResult, InsertRow, SelectResult, UpdateCol, UpdateResult, + UpdateRow, UpdateWithUndoResult, +}; +use crate::value::*; +use std::fmt; use std::mem; use std::slice; pub type RowID = u64; pub const INVALID_ROW_ID: RowID = !0; -const _: () = assert!({ - std::mem::size_of::() % 8 == 0 -}, "RowPageHeader should have size align to 8 bytes"); +const _: () = assert!( + { std::mem::size_of::() % 8 == 0 }, + "RowPageHeader should have size align to 8 bytes" +); /// RowPage is the core data structure of row-store. /// It is designed to be fast in both TP and AP scenarios. /// It follows design of PAX format. -/// +/// /// Header: -/// +/// /// | field | length(B) | /// |-------------------------|-----------| /// | start_row_id | 8 | @@ -35,9 +40,9 @@ const _: () = assert!({ /// | fix_field_end | 2 | /// | var_field_offset | 2 | /// | padding | 6 | -/// +/// /// Data: -/// +/// /// | field | length(B) | /// |------------------|-----------------------------------------------| /// | del_bitset | (count + 63) / 64 * 8 | @@ -49,14 +54,14 @@ const _: () = assert!({ /// | c_n | same as above | /// | free_space | free space | /// | var_len_data | data of var-len column | -/// -#[derive(PartialEq, Eq)] +/// pub struct RowPage { pub header: RowPageHeader, pub data: [u8; PAGE_SIZE - mem::size_of::()], } impl RowPage { + /// Initialize row page. #[inline] pub fn init(&mut self, start_row_id: u64, max_row_count: usize, cols: &[Layout]) { debug_assert!(max_row_count <= 0xffff); @@ -66,17 +71,20 @@ impl RowPage { self.header.col_count = cols.len() as u16; // initialize offset fields. self.header.del_bitset_offset = 0; // always starts at data_ptr(). 
- self.header.null_bitset_list_offset = self.header.del_bitset_offset + del_bitset_len(max_row_count) as u16; - self.header.col_offset_list_offset = self.header.null_bitset_list_offset + null_bitset_list_len(max_row_count, cols.len()) as u16; - self.header.fix_field_offset = self.header.col_offset_list_offset + col_offset_list_len(cols.len()) as u16; + self.header.null_bitset_list_offset = + self.header.del_bitset_offset + del_bitset_len(max_row_count) as u16; + self.header.col_offset_list_offset = self.header.null_bitset_list_offset + + null_bitset_list_len(max_row_count, cols.len()) as u16; + self.header.fix_field_offset = + self.header.col_offset_list_offset + col_offset_list_len(cols.len()) as u16; self.init_col_offset_list_and_fix_field_end(cols, max_row_count as u16); self.header.var_field_offset = (PAGE_SIZE - mem::size_of::()) as u16; self.init_bitsets_and_row_ids(); debug_assert!({ (self.header.row_count..self.header.max_row_count).all(|i| { let row = self.row(i as usize); - !row.is_deleted() && row.is_invalid() - }) + row.is_deleted() + }) }); } @@ -97,27 +105,41 @@ impl RowPage { #[inline] fn init_bitsets_and_row_ids(&mut self) { unsafe { - // zero del_bitset and null_bitset_list - let count = (self.header.col_offset_list_offset - self.header.del_bitset_offset) as usize; - let ptr = self.data_ptr_mut().add(self.header.del_bitset_offset as usize); - std::ptr::write_bytes(ptr, 0, count); - - // initialize all RowIDs to INVALID_ROW_ID - let row_ids = self.vals_mut_unchecked::(0, self.header.max_row_count as usize); - for row_id in row_ids { - *row_id = INVALID_ROW_ID; + // initialize del_bitset to all ones. + { + let count = + (self.header.null_bitset_list_offset - self.header.del_bitset_offset) as usize; + let ptr = self + .data_ptr_mut() + .add(self.header.del_bitset_offset as usize); + std::ptr::write_bytes(ptr, 0xff, count); + } + // initialize null_bitset_list to all zeros. + { + let count = (self.header.col_offset_list_offset + - self.header.null_bitset_list_offset) as usize; + let ptr = self + .data_ptr_mut() + .add(self.header.null_bitset_list_offset as usize); + std::ptr::write_bytes(ptr, 0xff, count); } } } + /// Returns row id list in this page. #[inline] pub fn row_ids(&self) -> &[RowID] { self.vals::(0) } #[inline] - pub fn row_ids_mut(&mut self) -> &mut [RowID] { - self.vals_mut::(0) + pub fn row_by_id(&self, row_id: RowID) -> Option { + if row_id < self.header.start_row_id as RowID + || row_id >= self.header.start_row_id + self.header.row_count as u64 + { + return None; + } + Some(self.row((row_id - self.header.start_row_id) as usize)) } /// Returns free space of current page. @@ -129,7 +151,7 @@ impl RowPage { /// Insert a new row in page. #[inline] - pub fn insert(&mut self, insert: InsertRow) -> InsertResult { + pub fn insert(&mut self, insert: &InsertRow) -> InsertResult { // insert row does not include RowID, as RowID is auto-generated. debug_assert!(insert.0.len() + 1 == self.header.col_count as usize); if self.header.row_count == self.header.max_row_count { @@ -145,7 +167,7 @@ impl RowPage { Val::Byte2(v2) => new_row.add_val(*v2), Val::Byte4(v4) => new_row.add_val(*v4), Val::Byte8(v8) => new_row.add_val(*v8), - Val::VarByte(var) => new_row.add_var(var), + Val::VarByte(var) => new_row.add_var(var.as_bytes()), } } InsertResult::Ok(new_row.finish()) @@ -154,34 +176,52 @@ impl RowPage { /// delete row in page. /// This method will only mark the row as deleted. 
#[inline] - pub fn delete(&mut self, delete: DeleteRow) -> DeleteResult { + pub fn delete(&mut self, delete: &DeleteRow) -> DeleteResult { let row_id = delete.0; if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 { return DeleteResult::RowNotFound; } let row_idx = (row_id - self.header.start_row_id) as usize; - if self.row(row_idx).is_invalid() { - return DeleteResult::RowInvalid; - } if self.is_deleted(row_idx) { return DeleteResult::RowAlreadyDeleted; } - self.set_deleted(row_idx); + self.set_deleted(row_idx, true); DeleteResult::Ok } /// Update in-place in current page. #[inline] - pub fn update_in_place(&mut self, update: UpdateRow) -> UpdateResult { - if update.row_id < self.header.start_row_id || update.row_id >= self.header.max_row_count as u64 { + pub fn update(&mut self, update: &UpdateRow) -> UpdateResult { + // column indexes must be in range + debug_assert!( + { + update + .cols + .iter() + .all(|uc| uc.idx < self.header.col_count as usize) + }, + "update column indexes must be in range" + ); + // column indexes should be in order. + debug_assert!( + { + update.cols.is_empty() + || update + .cols + .iter() + .zip(update.cols.iter().skip(1)) + .all(|(l, r)| l.idx < r.idx) + }, + "update columns should be in order" + ); + if update.row_id < self.header.start_row_id + || update.row_id >= self.header.max_row_count as u64 + { return UpdateResult::RowNotFound; } let row_idx = (update.row_id - self.header.start_row_id) as usize; if self.row(row_idx).is_deleted() { - return UpdateResult::RowAlreadyDeleted; - } - if self.row(row_idx).is_invalid() { - return UpdateResult::RowInvalid; + return UpdateResult::RowDeleted; } if !self.free_space_enough_for_update(row_idx, &update.cols) { return UpdateResult::NoFreeSpace; @@ -194,6 +234,57 @@ impl RowPage { UpdateResult::Ok } + #[inline] + pub fn update_with_undo(&mut self, update: &UpdateRow) -> UpdateWithUndoResult { + // column indexes must be in range + debug_assert!( + { + update + .cols + .iter() + .all(|uc| uc.idx < self.header.col_count as usize) + }, + "update column indexes must be in range" + ); + // column indexes should be in order. + debug_assert!( + { + update.cols.is_empty() + || update + .cols + .iter() + .zip(update.cols.iter().skip(1)) + .all(|(l, r)| l.idx < r.idx) + }, + "update columns should be in order" + ); + if update.row_id < self.header.start_row_id + || update.row_id >= self.header.max_row_count as u64 + { + return UpdateWithUndoResult::RowNotFound; + } + let row_idx = (update.row_id - self.header.start_row_id) as usize; + if self.row(row_idx).is_deleted() { + return UpdateWithUndoResult::RowDeleted; + } + if !self.free_space_enough_for_update(row_idx, &update.cols) { + return UpdateWithUndoResult::NoFreeSpace; + } + let mut row = self.row_mut(row_idx); + // todo: identify difference and skip if the same. + let mut undo = vec![]; + for uc in &update.cols { + if let Some(old) = row.is_different(uc.idx, &uc.val) { + undo.push(UpdateCol { + idx: uc.idx, + val: Val::from(old), + }); + row.update_col(uc.idx, &uc.val); + } + } + UpdateWithUndoResult::Ok(undo) + } + /// Select single row by row id. 
#[inline] pub fn select(&self, row_id: RowID) -> SelectResult { @@ -202,46 +293,49 @@ impl RowPage { } let row_idx = (row_id - self.header.start_row_id) as usize; let row = self.row(row_idx); - if row.is_invalid() { - return SelectResult::RowInvalid; - } if row.is_deleted() { - return SelectResult::Deleted(row); + return SelectResult::RowDeleted(row); } SelectResult::Ok(row) } #[inline] - fn free_space_enough_for_insert(&self, insert: &[Val<'_>]) -> bool { - let var_len: usize = insert.iter().map(|v| match v { - Val::VarByte(var) => if var.len() > VAR_LEN_INLINE { - var.len() - } else { - 0 - } - _ => 0, - }).sum(); + fn free_space_enough_for_insert(&self, insert: &[Val]) -> bool { + let var_len: usize = insert + .iter() + .map(|v| match v { + Val::VarByte(var) => { + if var.len() > PAGE_VAR_LEN_INLINE { + var.len() + } else { + 0 + } + } + _ => 0, + }) + .sum(); var_len <= self.free_space() as usize } #[inline] - fn free_space_enough_for_update(&self, row_idx: usize, update: &[UpdateCol<'_>]) -> bool { + fn free_space_enough_for_update(&self, row_idx: usize, update: &[UpdateCol]) -> bool { let row = self.row(row_idx); - let var_len: usize = update.iter().map(|uc| { - match uc.val { + let var_len: usize = update + .iter() + .map(|uc| match &uc.val { Val::VarByte(var) => { let col = row.var(uc.idx); - let orig_var_len = VarByteVal::inpage_len(col); - let upd_var_len = VarByteVal::inpage_len(var); + let orig_var_len = PageVar::outline_len(col); + let upd_var_len = PageVar::outline_len(var.as_bytes()); if upd_var_len > orig_var_len { upd_var_len } else { 0 } } - _ => 0 - } - }).sum(); + _ => 0, + }) + .sum(); var_len <= self.free_space() as usize } @@ -251,7 +345,11 @@ impl RowPage { debug_assert!(self.header.row_count < self.header.max_row_count); let start_row_id = self.header.start_row_id; let row_idx = self.header.row_count as usize; - let mut row = NewRow{page: self, row_idx, col_idx: 0}; + let mut row = NewRow { + page: self, + row_idx, + col_idx: 0, + }; // always add RowID as first column row.add_val(start_row_id + row_idx as u64); row @@ -261,32 +359,34 @@ impl RowPage { #[inline] fn row(&self, row_idx: usize) -> Row { debug_assert!(row_idx < self.header.max_row_count as usize); - Row{page: self, row_idx} + Row { + page: self, + row_idx, + } } /// Returns mutable row by given index in page. #[inline] fn row_mut(&mut self, row_idx: usize) -> RowMut { debug_assert!(row_idx < self.header.row_count as usize); - RowMut{page: self, row_idx} + RowMut { + page: self, + row_idx, + } } /// Returns all values of given column. #[inline] fn vals(&self, col_idx: usize) -> &[V] { let len = self.header.row_count as usize; - unsafe { - self.vals_unchecked(col_idx, len) - } + unsafe { self.vals_unchecked(col_idx, len) } } /// Returns all mutable values of given column. 
#[inline] fn vals_mut(&mut self, col_idx: usize) -> &mut [V] { let len = self.header.row_count as usize; - unsafe { - self.vals_mut_unchecked(col_idx, len) - } + unsafe { self.vals_mut_unchecked(col_idx, len) } } #[inline] @@ -322,10 +422,10 @@ impl RowPage { } #[inline] - unsafe fn var_unchecked(&self, row_idx: usize, col_idx: usize) -> &VarByteVal { + unsafe fn var_unchecked(&self, row_idx: usize, col_idx: usize) -> &PageVar { let offset = self.col_offset(col_idx) as usize; let ptr = self.data_ptr().add(offset); - let data: *const VarByteVal = mem::transmute(ptr); + let data: *const PageVar = mem::transmute(ptr); &*data.add(row_idx) } @@ -336,10 +436,10 @@ impl RowPage { } #[inline] - unsafe fn var_mut_unchecked(&mut self, row_idx: usize, col_idx: usize) -> &mut VarByteVal { + unsafe fn var_mut_unchecked(&mut self, row_idx: usize, col_idx: usize) -> &mut PageVar { let offset = self.col_offset(col_idx) as usize; let ptr = self.data_ptr().add(offset); - let data: *mut VarByteVal = mem::transmute(ptr); + let data: *mut PageVar = mem::transmute(ptr); &mut *data.add(row_idx) } @@ -357,7 +457,10 @@ impl RowPage { fn del_bitset(&self) -> &[u8] { let len = del_bitset_len(self.header.max_row_count as usize); unsafe { - slice::from_raw_parts(self.data_ptr().add(self.header.del_bitset_offset as usize), len) + slice::from_raw_parts( + self.data_ptr().add(self.header.del_bitset_offset as usize), + len, + ) } } @@ -373,15 +476,25 @@ impl RowPage { fn del_bitset_mut(&mut self) -> &mut [u8] { let len = del_bitset_len(self.header.max_row_count as usize); unsafe { - slice::from_raw_parts_mut(self.data_ptr_mut().add(self.header.del_bitset_offset as usize), len) + slice::from_raw_parts_mut( + self.data_ptr_mut() + .add(self.header.del_bitset_offset as usize), + len, + ) } } #[inline] - fn set_deleted(&mut self, row_idx: usize) { + fn set_deleted(&mut self, row_idx: usize, deleted: bool) { unsafe { - let ptr = self.data_ptr_mut().add(self.header.del_bitset_offset as usize); - *ptr.add(row_idx / 8) |= 1 << (row_idx % 8); + let ptr = self + .data_ptr_mut() + .add(self.header.del_bitset_offset as usize); + if deleted { + *ptr.add(row_idx / 8) |= 1 << (row_idx % 8); + } else { + *ptr.add(row_idx / 8) &= !(1 << (row_idx % 8)); + } } } @@ -390,9 +503,7 @@ impl RowPage { fn null_bitset(&self, col_idx: usize) -> &[u8] { let len = align8(self.header.max_row_count as usize) / 8; let offset = self.header.null_bitset_list_offset as usize; - unsafe { - slice::from_raw_parts(self.data_ptr().add(offset + len * col_idx), len) - } + unsafe { slice::from_raw_parts(self.data_ptr().add(offset + len * col_idx), len) } } #[inline] @@ -405,9 +516,7 @@ impl RowPage { fn null_bitset_mut(&mut self, col_idx: usize) -> &mut [u8] { let len = align8(self.header.max_row_count as usize) / 8; let offset = self.header.null_bitset_list_offset as usize; - unsafe { - slice::from_raw_parts_mut(self.data_ptr_mut().add(offset + len * col_idx), len) - } + unsafe { slice::from_raw_parts_mut(self.data_ptr_mut().add(offset + len * col_idx), len) } } #[inline] @@ -435,22 +544,39 @@ impl RowPage { } #[inline] - fn add_var(&mut self, input: &[u8]) -> VarByteVal { + fn add_var(&mut self, input: &[u8]) -> PageVar { let len = input.len(); - if len <= VAR_LEN_INLINE { - return VarByteVal::inline(input); + if len <= PAGE_VAR_LEN_INLINE { + return PageVar::inline(input); } self.header.var_field_offset -= len as u16; unsafe { - let ptr = self.data_ptr_mut().add(self.header.var_field_offset as usize); + let ptr = self + .data_ptr_mut() + 
.add(self.header.var_field_offset as usize); let target = slice::from_raw_parts_mut(ptr, len); target.copy_from_slice(input); } - VarByteVal::inpage(len as u16, self.header.var_field_offset, &input[..VAR_LEN_PREFIX]) + PageVar::outline( + len as u16, + self.header.var_field_offset, + &input[..PAGE_VAR_LEN_PREFIX], + ) + } +} + +impl BufferFrameAware for RowPage { + #[inline] + fn init_bf(pool: &crate::buffer::FixedBufferPool, bf: &mut BufferFrame) { + // todo: associate UndoMap + } + + #[inline] + fn deinit_bf(_pool: &crate::buffer::FixedBufferPool, _bf: &mut BufferFrame) { + // todo: de-associate UndoMap } } -#[derive(Debug, Clone, PartialEq, Eq)] #[repr(C)] pub struct RowPageHeader { pub start_row_id: u64, @@ -466,6 +592,24 @@ pub struct RowPageHeader { padding: [u8; 6], } +impl fmt::Debug for RowPageHeader { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RowPageHeader") + .field("start_row_id", &self.start_row_id) + .field("max_row_count", &self.max_row_count) + .field("row_count", &self.row_count) + .field("col_count", &self.col_count) + .field("del_bitset_offset", &self.del_bitset_offset) + .field("null_bitset_list_offset", &self.null_bitset_list_offset) + .field("col_offset_list_offset", &self.col_offset_list_offset) + .field("fix_field_offset", &self.fix_field_offset) + .field("fix_field_end", &self.fix_field_end) + .field("var_field_offset", &self.var_field_offset) + .finish() + } +} + /// NewRow wraps the page to provide convenient method /// to add values to new row. pub struct NewRow<'a> { @@ -481,7 +625,9 @@ impl<'a> NewRow<'a> { debug_assert!(self.col_idx < self.page.header.col_count as usize); let val = input.to_val(); unsafe { - let target = self.page.val_mut_unchecked::(self.row_idx, self.col_idx); + let target = self + .page + .val_mut_unchecked::(self.row_idx, self.col_idx); *target = val; } self.col_idx += 1; @@ -518,13 +664,14 @@ impl<'a> NewRow<'a> { #[inline] pub fn finish(self) -> RowID { debug_assert!(self.col_idx == self.page.header.col_count as usize); + self.page.set_deleted(self.row_idx, false); self.page.header.row_count += 1; self.page.header.start_row_id + self.row_idx as u64 } } /// Row abstract a logical row in the page. -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone)] pub struct Row<'a> { page: &'a RowPage, row_idx: usize, @@ -538,31 +685,17 @@ impl<'a> Row<'a> { } /// Returns whether current row is deleted. + /// The page is initialized as all rows are deleted. + /// And insert should set the delete flag to false. #[inline] pub fn is_deleted(&self) -> bool { self.page.is_deleted(self.row_idx) } - /// Returns whether current row is invalid. - /// The may happen in recovery if a transaction - /// insert a row and then delete it. - /// The row id is used but redo log does not - /// record it, as we use value logging, only latest - /// version is kept. - /// Therefore, some rows may be untouched during - /// recovery and lead to "holes" in the page. - /// The table scan should discard such rows. - #[inline] - pub fn is_invalid(&self) -> bool { - self.row_id() == INVALID_ROW_ID - } - /// Returns value by given column index. #[inline] pub fn val(&self, col_idx: usize) -> &T { - unsafe { - self.page.val_unchecked::(self.row_idx, col_idx) - } + unsafe { self.page.val_unchecked::(self.row_idx, col_idx) } } /// Returns variable-length value by given column index. @@ -600,12 +733,49 @@ impl<'a> RowMut<'a> { self.page.is_deleted(self.row_idx) } - /// Returns whether current row is invalid. 
+ /// Returns the old value if different from given index and new value. #[inline] - pub fn is_invalid(&self) -> bool { - self.row_id() == INVALID_ROW_ID + pub fn is_different(&self, col_idx: usize, value: &Val) -> Option { + match value { + Val::Byte1(new) => { + let old = self.val::(col_idx); + if old == new { + return None; + } + Some(Val::Byte1(*old)) + } + Val::Byte2(new) => { + let old = self.val::(col_idx); + if old == new { + return None; + } + Some(Val::Byte2(*old)) + } + Val::Byte4(new) => { + let old = self.val::(col_idx); + if old == new { + return None; + } + Some(Val::Byte4(*old)) + } + Val::Byte8(new) => { + let old = self.val::(col_idx); + if old == new { + return None; + } + Some(Val::Byte8(*old)) + } + Val::VarByte(new) => { + let old = self.var(col_idx); + if old == new.as_bytes() { + return None; + } + Some(Val::VarByte(MemVar::new(old))) + } + } } + /// Update column by given index and value. #[inline] pub fn update_col(&mut self, col_idx: usize, value: &Val) { match value { @@ -622,7 +792,7 @@ impl<'a> RowMut<'a> { self.update_val(col_idx, v8); } Val::VarByte(var) => { - self.update_var(col_idx, var); + self.update_var(col_idx, var.as_bytes()); } } } @@ -630,17 +800,13 @@ impl<'a> RowMut<'a> { /// Returns value by given column index. #[inline] pub fn val(&self, col_idx: usize) -> &T { - unsafe { - self.page.val_unchecked::(self.row_idx, col_idx) - } + unsafe { self.page.val_unchecked::(self.row_idx, col_idx) } } /// Returns mutable value by given column index. #[inline] pub fn val_mut(&mut self, col_idx: usize) -> &mut T { - unsafe { - self.page.val_mut_unchecked(self.row_idx, col_idx) - } + unsafe { self.page.val_mut_unchecked(self.row_idx, col_idx) } } /// Update fix-length value by givne column index. @@ -661,16 +827,16 @@ impl<'a> RowMut<'a> { /// Update variable-length value. #[inline] pub fn update_var(&mut self, col_idx: usize, input: &[u8]) { - // todo: reuse released space by update. + // todo: reuse released space by update. // if update value is longer than original value, // the original space is wasted. // there can be optimization that additionally record // the head free offset of released var-len space at the page header. - // and any released space is at lest 7 bytes(larger than VAR_LEN_INLINE) + // and any released space is at lest 7 bytes(larger than VAR_LEN_INLINE) // long and is enough to connect the free list. 
unsafe { let origin_len = self.page.var_len_unchecked(self.row_idx, col_idx); - if input.len() <= VAR_LEN_INLINE || input.len() <= origin_len { + if input.len() <= PAGE_VAR_LEN_INLINE || input.len() <= origin_len { let ptr = self.page.data_ptr_mut(); let target = self.page.var_mut_unchecked(self.row_idx, col_idx); target.update_in_place(ptr, input); @@ -776,7 +942,7 @@ mod tests { let cols = vec![Layout::Byte8, Layout::Byte4, Layout::VarByte]; let mut page = create_row_page(); page.init(100, 200, &cols); - + let mut new_row = page.new_row(); new_row.add_val(1_000_000i32); new_row.add_str("hello"); // inline string @@ -814,38 +980,64 @@ mod tests { #[test] fn test_row_page_crud() { - let cols = vec![Layout::Byte8, Layout::Byte1, Layout::Byte2, Layout::Byte4, Layout::Byte8, Layout::VarByte]; + let cols = vec![ + Layout::Byte8, + Layout::Byte1, + Layout::Byte2, + Layout::Byte4, + Layout::Byte8, + Layout::VarByte, + ]; let mut page = create_row_page(); page.init(100, 200, &cols); let short = b"short"; let long = b"very loooooooooooooooooong"; - let insert: InsertRow<'static> = InsertRow(vec![ - Val::Byte1(1), Val::Byte2(1000), Val::Byte4(1_000_000), - Val::Byte8(1 << 35), Val::VarByte(short)]); - let res = page.insert(insert); - assert!(res == InsertResult::Ok(100)); + let insert: InsertRow = InsertRow(vec![ + Val::Byte1(1), + Val::Byte2(1000), + Val::Byte4(1_000_000), + Val::Byte8(1 << 35), + Val::from(&short[..]), + ]); + let res = page.insert(&insert); + assert!(matches!(res, InsertResult::Ok(100))); assert!(!page.row(0).is_deleted()); - let update: UpdateRow<'static> = UpdateRow{ + let update: UpdateRow = UpdateRow { row_id: 100, cols: vec![ - UpdateCol{idx: 1, val: Val::Byte1(2)}, - UpdateCol{idx: 2, val: Val::Byte2(2000)}, - UpdateCol{idx: 3, val: Val::Byte4(2_000_000)}, - UpdateCol{idx: 4, val: Val::Byte8(2 << 35)}, - UpdateCol{idx: 5, val: Val::VarByte(long)}, - ], + UpdateCol { + idx: 1, + val: Val::Byte1(2), + }, + UpdateCol { + idx: 2, + val: Val::Byte2(2000), + }, + UpdateCol { + idx: 3, + val: Val::Byte4(2_000_000), + }, + UpdateCol { + idx: 4, + val: Val::Byte8(2 << 35), + }, + UpdateCol { + idx: 5, + val: Val::VarByte(MemVar::new(long)), + }, + ], }; - let res = page.update_in_place(update); - assert!(res == UpdateResult::Ok); + let res = page.update(&update); + assert!(matches!(res, UpdateResult::Ok)); let delete = DeleteRow(100); - let res = page.delete(delete); - assert!(res == DeleteResult::Ok); + let res = page.delete(&delete); + assert!(res.is_ok()); let select = page.select(100); - assert!(matches!(select, SelectResult::Deleted(_))); + assert!(matches!(select, SelectResult::RowDeleted(_))); } fn create_row_page() -> RowPage { diff --git a/doradb-storage/src/row/ops.rs b/doradb-storage/src/row/ops.rs index f3cc14b..adbf086 100644 --- a/doradb-storage/src/row/ops.rs +++ b/doradb-storage/src/row/ops.rs @@ -1,52 +1,122 @@ -use crate::row::value::Val; -use crate::row::{RowID, Row}; +use crate::row::{Row, RowID}; +use crate::trx::redo::RedoKind; +use crate::value::Val; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; -#[derive(PartialEq, Eq)] pub enum SelectResult<'a> { Ok(Row<'a>), - Deleted(Row<'a>), + RowDeleted(Row<'a>), RowNotFound, - RowInvalid, } -#[derive(Debug, Clone)] -pub struct InsertRow<'a>(pub Vec>); +impl SelectResult<'_> { + /// Returns if select succeeds. 
+ #[inline] + pub fn is_ok(&self) -> bool { + matches!(self, SelectResult::Ok(_)) + } +} + +#[derive(Debug)] +pub struct InsertRow(pub Vec); + +impl InsertRow { + /// Create redo log + #[inline] + pub fn create_redo(&self) -> RedoKind { + RedoKind::Insert(self.0.clone()) + } +} -#[derive(Debug, Clone, PartialEq, Eq)] pub enum InsertResult { Ok(RowID), RowIDExhausted, NoFreeSpace, } -#[derive(Debug, Clone)] +impl InsertResult { + /// Returns if insert succeeds. + #[inline] + pub fn is_ok(&self) -> bool { + matches!(self, InsertResult::Ok(_)) + } +} + +#[derive(Debug)] pub struct DeleteRow(pub RowID); -#[derive(Debug, Clone, PartialEq, Eq)] pub enum DeleteResult { Ok, RowNotFound, RowAlreadyDeleted, - RowInvalid, } -#[derive(Debug, Clone)] -pub struct UpdateRow<'a> { +impl DeleteResult { + /// Returns if delete succeeds. + #[inline] + pub fn is_ok(&self) -> bool { + matches!(self, DeleteResult::Ok) + } +} + +#[derive(PartialEq, Eq)] +pub struct UpdateRow { pub row_id: RowID, - pub cols: Vec>, + pub cols: Vec, +} + +impl UpdateRow { + /// Create redo log based on changed values. + #[inline] + pub fn create_redo(&self, undo: &[UpdateCol]) -> Option { + let hashset: HashSet<_> = undo.iter().map(|uc| uc.idx).collect(); + let vals: Vec = self + .cols + .iter() + .filter(|uc| hashset.contains(&uc.idx)) + .cloned() + .collect(); + if vals.is_empty() { + return None; + } + Some(RedoKind::Update(vals)) + } } -#[derive(Debug, Clone, PartialEq, Eq)] pub enum UpdateResult { Ok, RowNotFound, - RowAlreadyDeleted, - RowInvalid, + RowDeleted, NoFreeSpace, } -#[derive(Debug, Clone)] -pub struct UpdateCol<'a> { +impl UpdateResult { + /// Returns if update succeeds. + #[inline] + pub fn is_ok(&self) -> bool { + matches!(self, UpdateResult::Ok) + } +} + +#[derive(PartialEq, Eq)] +pub enum UpdateWithUndoResult { + Ok(Vec), + RowNotFound, + RowDeleted, + NoFreeSpace, +} + +impl UpdateWithUndoResult { + /// Returns if update with undo succeeds. 
+ #[inline] + pub fn is_ok(&self) -> bool { + matches!(self, UpdateWithUndoResult::Ok(_)) + } +} + +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct UpdateCol { pub idx: usize, - pub val: Val<'a>, -} \ No newline at end of file + pub val: Val, +} diff --git a/doradb-storage/src/row/value.rs b/doradb-storage/src/row/value.rs deleted file mode 100644 index 5b74a59..0000000 --- a/doradb-storage/src/row/value.rs +++ /dev/null @@ -1,451 +0,0 @@ -use std::mem::{self, MaybeUninit}; - -pub const VAR_LEN_INLINE: usize = 6; -pub const VAR_LEN_PREFIX: usize = 4; -const _: () = assert!(mem::size_of::() == 8); - -#[derive(Debug, Clone, Copy)] -pub enum Val<'a> { - Byte1(Byte1Val), - Byte2(Byte2Val), - Byte4(Byte4Val), - Byte8(Byte8Val), - VarByte(&'a [u8]), -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: u8) -> Self { - Val::Byte1(value) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: i8) -> Self { - Val::Byte1(value as u8) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: u16) -> Self { - Val::Byte2(value) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: i16) -> Self { - Val::Byte2(value as u16) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: u32) -> Self { - Val::Byte4(value) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: i32) -> Self { - Val::Byte4(value as u32) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: f32) -> Self { - Val::Byte4(u32::from_ne_bytes(value.to_ne_bytes())) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: u64) -> Self { - Val::Byte8(value) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: i64) -> Self { - Val::Byte8(value as u64) - } -} - -impl<'a> From for Val<'a> { - #[inline] - fn from(value: f64) -> Self { - Val::Byte8(u64::from_ne_bytes(value.to_ne_bytes())) - } -} - -impl<'a> From<&'a [u8]> for Val<'a> { - #[inline] - fn from(value: &'a [u8]) -> Self { - Val::VarByte(value) - } -} - -impl<'a> From<&'a str> for Val<'a> { - #[inline] - fn from(value: &'a str) -> Self { - Val::VarByte(value.as_bytes()) - } -} - -/// Value is a marker trait to represent -/// fixed-length column value in row page. 
-pub trait Value {} - -pub trait ToValue { - type Target: Value; - - fn to_val(&self) -> Self::Target; -} - -pub type Byte1Val = u8; -pub trait Byte1ValSlice { - fn as_i8s(&self) -> &[i8]; - - fn as_i8s_mut(&mut self) -> &mut [i8]; -} - -impl Value for Byte1Val {} - -impl Byte1ValSlice for [Byte1Val] { - #[inline] - fn as_i8s(&self) -> &[i8] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_i8s_mut(&mut self) -> &mut [i8] { - unsafe { mem::transmute(self) } - } -} - -impl ToValue for u8 { - type Target = Byte1Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self - } -} - -impl ToValue for i8 { - type Target = Byte1Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self as u8 - } -} - -pub type Byte2Val = u16; -pub trait Byte2ValSlice { - fn as_i16s(&self) -> &[i16]; - - fn as_i16s_mut(&mut self) -> &mut [i16]; -} -impl Value for Byte2Val {} - -impl ToValue for u16 { - type Target = Byte2Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self - } -} - -impl ToValue for i16 { - type Target = Byte2Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self as u16 - } -} - -impl Byte2ValSlice for [Byte2Val] { - #[inline] - fn as_i16s(&self) -> &[i16] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_i16s_mut(&mut self) -> &mut [i16] { - unsafe { mem::transmute(self) } - } -} - -pub type Byte4Val = u32; -pub trait Byte4ValSlice { - fn as_i32s(&self) -> &[i32]; - - fn as_i32s_mut(&mut self) -> &mut [i32]; - - fn as_f32s(&self) -> &[f32]; - - fn as_f32s_mut(&mut self) -> &mut [f32]; -} - -impl Value for Byte4Val {} - -impl Byte4ValSlice for [Byte4Val] { - #[inline] - fn as_i32s(&self) -> &[i32] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_i32s_mut(&mut self) -> &mut [i32] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_f32s(&self) -> &[f32] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_f32s_mut(&mut self) -> &mut [f32] { - unsafe { mem::transmute(self) } - } -} - -impl ToValue for u32 { - type Target = Byte4Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self - } -} - -impl ToValue for i32 { - type Target = Byte4Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self as u32 - } -} - -pub type Byte8Val = u64; -pub trait Byte8ValSlice { - fn as_i64s(&self) -> &[i64]; - - fn as_i64s_mut(&mut self) -> &mut [i64]; - - fn as_f64s(&self) -> &[f64]; - - fn as_f64s_mut(&mut self) -> &mut [f64]; -} - -impl Value for Byte8Val {} - -impl ToValue for u64 { - type Target = Byte8Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self - } -} - -impl ToValue for i64 { - type Target = Byte8Val; - #[inline] - fn to_val(&self) -> Self::Target { - *self as u64 - } -} - -impl Byte8ValSlice for [Byte8Val] { - #[inline] - fn as_i64s(&self) -> &[i64] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_i64s_mut(&mut self) -> &mut [i64] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_f64s(&self) -> &[f64] { - unsafe { mem::transmute(self) } - } - - #[inline] - fn as_f64s_mut(&mut self) -> &mut [f64] { - unsafe { mem::transmute(self) } - } -} - -/// VarByteVal represents var-len value in page. -/// It has two kinds: inline and inpage. -/// Inline means the bytes are inlined in the fixed field. -/// Inpage means the fixed field only store length, -/// offset and prfix. Entire value is located at -/// tail of page. -#[derive(Clone, Copy)] -pub struct VarByteVal { - inner: VarByteInner, -} - -impl VarByteVal { - /// Create a new VarByteVal with inline data. 
- /// The data length must be no more than 14 bytes. - #[inline] - pub fn inline(val: &[u8]) -> Self { - debug_assert!(val.len() <= VAR_LEN_INLINE); - let mut inline = MaybeUninit::::uninit(); - unsafe { - let i = inline.assume_init_mut(); - i.len = val.len() as u16; - i.data[..val.len()].copy_from_slice(val); - VarByteVal{ - inner: VarByteInner{v: inline.assume_init()}, - } - } - } - - /// Create a new VarByteVal with pointer info. - /// The prefix length must be 12 bytes. - #[inline] - pub fn inpage(len: u16, offset: u16, prefix: &[u8]) -> Self { - debug_assert!(prefix.len() == VAR_LEN_PREFIX); - let mut inpage = MaybeUninit::::uninit(); - unsafe { - let p = inpage.assume_init_mut(); - p.len = len; - p.offset = offset; - p.prefix.copy_from_slice(prefix); - VarByteVal{ - inner: VarByteInner{p: inpage.assume_init() } - } - } - } - - /// Returns length of the value. - #[inline] - pub fn len(&self) -> usize { - unsafe { self.inner.v.len as usize } - } - - /// Returns whether the value is inlined. - #[inline] - pub fn is_inlined(&self) -> bool { - self.len() <= VAR_LEN_INLINE - } - - /// Returns inpage length of given value. - /// If the value can be inlined, returns 0. - #[inline] - pub fn inpage_len(data: &[u8]) -> usize { - if data.len() > VAR_LEN_INLINE { - data.len() - } else { - 0 - } - } - - /// Returns bytes. - #[inline] - pub fn as_bytes(&self, ptr: *const u8) -> &[u8] { - let len = self.len(); - if len <= 14 { - unsafe { &self.inner.v.data[..len] } - } else { - unsafe { - let data = ptr.add(self.inner.p.offset as usize); - std::slice::from_raw_parts(data, len) - } - } - } - - /// Returns mutable bytes. - #[inline] - pub fn as_bytes_mut(&mut self, ptr: *mut u8) -> &mut [u8] { - let len = self.len(); - if len <= 14 { - unsafe { &mut self.inner.v.data[..len] } - } else { - unsafe { - let data = ptr.add(self.inner.p.offset as usize); - std::slice::from_raw_parts_mut(data, len) - } - } - } - - /// Returns string. - #[inline] - pub fn as_str(&self, ptr: *const u8) -> &str { - let len = self.len(); - if len <= 14 { - unsafe { std::str::from_utf8_unchecked(&self.inner.v.data[..len]) } - } else { - unsafe { - let data = ptr.add(self.inner.p.offset as usize); - let bytes = std::slice::from_raw_parts(data, len); - std::str::from_utf8_unchecked(bytes) - } - } - } - - /// Returns mutable string. - #[inline] - pub fn as_str_mut(&mut self, ptr: *mut u8) -> &mut str { - let len = self.len(); - if len <= 14 { - unsafe { std::str::from_utf8_unchecked_mut(&mut self.inner.v.data[..len]) } - } else { - unsafe { - let data = ptr.add(self.inner.p.offset as usize); - let bytes = std::slice::from_raw_parts_mut(data, len); - std::str::from_utf8_unchecked_mut(bytes) - } - } - } - - /// In-place update with given value. - /// Caller must ensure no extra space is required. - #[inline] - pub fn update_in_place(&mut self, ptr: *mut u8, val: &[u8]) { - debug_assert!(val.len() <= VAR_LEN_INLINE || val.len() <= self.len()); - unsafe { - if val.len() > VAR_LEN_INLINE { - // all not inline, but original is longer or equal to input value. - debug_assert!(self.len() > VAR_LEN_INLINE); - self.inner.p.len = val.len() as u16; - let target = std::slice::from_raw_parts_mut(ptr.add(self.inner.p.offset as usize), val.len()); - target.copy_from_slice(val); - } else { // input is inlined. - // better to reuse release page data. 
- self.inner.v.len = val.len() as u16; - self.inner.v.data[..val.len()].copy_from_slice(val); - } - } - } -} - -#[derive(Clone, Copy)] -union VarByteInner { - v: Inline, - p: Inpage, -} - -#[derive(Debug, Clone, Copy)] -#[repr(C)] -struct Inline { - len: u16, - data: [u8; VAR_LEN_INLINE], -} - -#[derive(Debug, Clone, Copy)] -#[repr(C)] -struct Inpage { - len: u16, - offset: u16, - prefix: [u8; VAR_LEN_PREFIX], -} \ No newline at end of file diff --git a/doradb-storage/src/trx/mod.rs b/doradb-storage/src/trx/mod.rs new file mode 100644 index 0000000..26edc44 --- /dev/null +++ b/doradb-storage/src/trx/mod.rs @@ -0,0 +1,220 @@ +//! DoraDB's concurrency control protocol is an implmementation of MVCC + MV2PL(todo). +//! +//! The basic MVCC logic is described as below. +//! 1. When starting a transaction, a snapshot timestamp(STS) is generated, and transaction id +//! is also derived from STS by setting highest bit to 1. +//! 2. When the transaction do any insert, update or delete, an undo log is generated with +//! RowID and stored in a page-level transaction version map(TRX-MAP). The undo log records +//! current transaction id at head. +//! 3. When the transaction commits, a commit timestamp(CTS) is generated, and all undo logs of +//! this transaction will update CTS in its head. +//! 4. When a transaction query a row in one page, +//! a) it first look at page-level TRX-MAP, if the map is empty, then all data on the page +//! are latest. So directly read data and return. +//! b) otherwise, check if queried RowID exists in the map. if not, same as a). +//! c) If exists, check the timestamp in entry head. If it's larger than current STS, means +//! it's invisible, undo change and go to next version in the chain... +//! d) If less than current STS, return current version. +pub mod redo; +pub mod sys; +pub mod undo; + +use crate::buffer::guard::{PageExclusiveGuard, PageGuard}; +use crate::buffer::FixedBufferPool; +use crate::latch::LatchFallbackMode; +use crate::row::ops::{InsertResult, InsertRow}; +use crate::row::RowPage; +use crate::trx::redo::{RedoBin, RedoEntry, RedoLog}; +use crate::trx::undo::{SharedUndoEntry, UndoEntry, UndoKind}; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +pub type TrxID = u64; +pub const INVALID_TRX_ID: TrxID = !0; +pub const MIN_SNAPSHOT_TS: TrxID = 1; +pub const MAX_SNAPSHOT_TS: TrxID = 1 << 63; +pub const MAX_COMMIT_TS: TrxID = 1 << 63; +// As active transaction id is always greater than STS, that means +// visibility check can be simplified to "STS is larger". +pub const MIN_ACTIVE_TRX_ID: TrxID = (1 << 63) + 1; + +pub struct ActiveTrx { + trx_id: Arc, + pub sts: TrxID, + // transaction-level undo logs. + trx_undo: Vec, + // statement-level undo logs. + stmt_undo: Vec, + // transaction-level redo logs. + trx_redo: Vec, + // statement-level redo logs. + stmt_redo: Vec, +} + +impl ActiveTrx { + /// Create a new transaction. + #[inline] + pub fn new(trx_id: TrxID, sts: TrxID) -> Self { + ActiveTrx { + trx_id: Arc::new(AtomicU64::new(trx_id)), + sts, + trx_undo: vec![], + stmt_undo: vec![], + trx_redo: vec![], + stmt_redo: vec![], + } + } + + /// Returns transaction id of current transaction. + #[inline] + pub fn trx_id(&self) -> TrxID { + self.trx_id.load(Ordering::Acquire) + } + + /// Starts a statement. + #[inline] + pub fn start_stmt(&mut self) { + debug_assert!(self.stmt_undo.is_empty()); + debug_assert!(self.stmt_redo.is_empty()); + } + + /// Ends a statement. 
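+    /// Statement-level undo/redo buffers are drained into the transaction-level
+    /// buffers, so a later `rollback_stmt` only affects the current statement.
+    /// A sketch of the intended call pattern (`run_statement` is a hypothetical
+    /// statement executor, not part of this module):
+    ///
+    /// ```ignore
+    /// trx.start_stmt();
+    /// match run_statement(&mut trx) {
+    ///     Ok(_) => trx.end_stmt(),
+    ///     Err(_) => trx.rollback_stmt(buf_pool),
+    /// }
+    /// ```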
+ #[inline] + pub fn end_stmt(&mut self) { + self.trx_undo.extend(self.stmt_undo.drain(..)); + self.trx_redo.extend(self.stmt_redo.drain(..)); + } + + /// Rollback a statement. + #[inline] + pub fn rollback_stmt(&mut self, buf_pool: &FixedBufferPool) { + while let Some(undo) = self.stmt_undo.pop() { + let page_guard: PageGuard<'_, RowPage> = buf_pool + .get_page(undo.page_id, LatchFallbackMode::Exclusive) + .expect("get page for undo should not fail"); + let page_guard = page_guard.block_until_exclusive(); + todo!() + } + } + + /// Prepare current transaction for committing. + #[inline] + pub fn prepare(self) -> PreparedTrx { + debug_assert!(self.stmt_undo.is_empty()); + debug_assert!(self.stmt_redo.is_empty()); + // use bincode to serialize redo log + let redo_bin = if self.trx_redo.is_empty() { + None + } else { + // todo: use customized serialization method, and keep CTS placeholder. + let redo_log = RedoLog { + cts: INVALID_TRX_ID, + data: self.trx_redo, + }; + let redo_bin = bincode::serde::encode_to_vec(&redo_log, bincode::config::standard()) + .expect("redo serialization should not fail"); + Some(redo_bin) + }; + PreparedTrx { + trx_id: self.trx_id, + sts: self.sts, + // cts, + redo_bin, + undo: self.trx_undo, + } + } + + /// Rollback current transaction. + #[inline] + pub fn rollback(self) { + todo!() + } + + #[inline] + fn insert_row_into_page( + &mut self, + page_guard: &mut PageExclusiveGuard<'_, RowPage>, + insert: &InsertRow, + ) -> InsertResult { + match page_guard.page_mut().insert(&insert) { + InsertResult::Ok(row_id) => { + let page_id = page_guard.page_id(); + // create undo log. + let undo_entry = Arc::new(UndoEntry { + ts: Arc::clone(&self.trx_id), + page_id, + row_id, + kind: UndoKind::Insert, + next: Mutex::new(None), + }); + // store undo log in undo map. + let undo_map = page_guard + .bf_mut() + .undo_map + .get_or_insert_with(|| Box::new(HashMap::new())) + .as_mut(); + let res = undo_map.insert(row_id, Arc::clone(&undo_entry)); + debug_assert!(res.is_none()); // insert must not have old version. + // store undo log into transaction undo buffer. + self.stmt_undo.push(undo_entry); + // create redo log. + let redo_entry = RedoEntry { + page_id, + row_id, + kind: insert.create_redo(), + }; + // store redo log into transaction redo buffer. + self.stmt_redo.push(redo_entry); + InsertResult::Ok(row_id) + } + err => err, + } + } +} + +/// PrecommitTrx has been assigned commit timestamp and already prepared redo log binary. +pub struct PreparedTrx { + pub trx_id: Arc, + pub sts: TrxID, + pub redo_bin: Option, + pub undo: Vec, +} + +impl PreparedTrx { + #[inline] + pub fn fill_cts(self, cts: TrxID) -> PrecommitTrx { + let mut redo_bin = self.redo_bin; + if let Some(redo_bin) = redo_bin.as_mut() { + backfill_cts(redo_bin, cts); + } + PrecommitTrx { + trx_id: self.trx_id, + sts: self.sts, + cts, + redo_bin, + undo: self.undo, + } + } +} + +#[inline] +fn backfill_cts(redo_bin: &mut [u8], cts: TrxID) { + // todo +} + +/// PrecommitTrx has been assigned commit timestamp and already prepared redo log binary. 
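+///
+/// The commit path moves a transaction through these states; a sketch of the flow
+/// inside `TransactionSystem::commit`, ignoring group formation and log persistence:
+///
+/// ```ignore
+/// let prepared = trx.prepare();           // ActiveTrx -> PreparedTrx, redo serialized
+/// let precommit = prepared.fill_cts(cts); // PreparedTrx -> PrecommitTrx, CTS backfilled
+/// // the group-commit leader persists the redo log, stores CTS into trx_id,
+/// // and hands the transaction to the GC thread as CommittedTrx
+/// ```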
+pub struct PrecommitTrx { + pub trx_id: Arc, + pub sts: TrxID, + pub cts: TrxID, + pub redo_bin: Option, + pub undo: Vec, +} + +pub struct CommittedTrx { + pub sts: TrxID, + pub cts: TrxID, + pub undo: Vec, +} diff --git a/doradb-storage/src/trx/redo.rs b/doradb-storage/src/trx/redo.rs new file mode 100644 index 0000000..ad553d1 --- /dev/null +++ b/doradb-storage/src/trx/redo.rs @@ -0,0 +1,107 @@ +use crate::buffer::page::PageID; +use crate::row::ops::UpdateCol; +use crate::row::RowID; +use crate::trx::TrxID; +use crate::value::Val; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub enum RedoKind { + Insert(Vec), + Delete, + Update(Vec), +} + +#[derive(Serialize, Deserialize)] +pub struct RedoEntry { + pub page_id: PageID, + pub row_id: RowID, + pub kind: RedoKind, +} + +#[derive(Serialize, Deserialize)] +pub struct RedoLog { + pub cts: TrxID, + pub data: Vec, +} + +/// RedoBin is serialized redo log in binary format +pub type RedoBin = Vec; + +pub struct RedoLogger; + +impl RedoLogger { + #[inline] + pub fn log(&mut self) { + // todo + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Instant; + + #[test] + fn test_redo_log_serde_bincode() { + let mut entries = vec![]; + for _ in 0..102400 { + let entry = RedoEntry { + page_id: rand::random::() as u64, + row_id: rand::random::() as u64, + kind: RedoKind::Insert(vec![Val::Byte8(rand::random::() as u64)]), + }; + entries.push(entry); + } + const CONFIG: bincode::config::Configuration = bincode::config::standard(); + let start = Instant::now(); + let res = bincode::serde::encode_to_vec(&entries, CONFIG).unwrap(); + let dur = start.elapsed(); + println!( + "bincode.serialize res.len()={:?}, dur={:?} microseconds, avg {:?} GB/s", + res.len(), + dur.as_micros(), + res.len() as f64 / dur.as_nanos() as f64 + ); + let start = Instant::now(); + let entries: Vec = bincode::serde::decode_from_slice(&res, CONFIG).unwrap().0; + let dur = start.elapsed(); + println!( + "bincode.deserialize entries.len()={:?}, dur={:?} microseconds, avg {:?} op/s", + res.len(), + dur.as_micros(), + entries.len() as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } + + #[test] + fn test_redo_log_serde_bitcode() { + let mut entries = vec![]; + for _ in 0..102400 { + let entry = RedoEntry { + page_id: rand::random::() as u64, + row_id: rand::random::() as u64, + kind: RedoKind::Insert(vec![Val::Byte8(rand::random::() as u64)]), + }; + entries.push(entry); + } + let start = Instant::now(); + let res = bitcode::serialize(&entries).unwrap(); + let dur = start.elapsed(); + println!( + "bitcode.serialize res.len()={:?}, dur={:?} microseconds, avg {:?} GB/s", + res.len(), + dur.as_micros(), + res.len() as f64 / dur.as_nanos() as f64 + ); + let start = Instant::now(); + let entries: Vec = bitcode::deserialize(&res).unwrap(); + let dur = start.elapsed(); + println!( + "bitcode.deserialize entries.len()={:?}, dur={:?} microseconds, avg {:?} op/s", + res.len(), + dur.as_micros(), + entries.len() as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } +} diff --git a/doradb-storage/src/trx/sys.rs b/doradb-storage/src/trx/sys.rs new file mode 100644 index 0000000..1612a46 --- /dev/null +++ b/doradb-storage/src/trx/sys.rs @@ -0,0 +1,968 @@ +use crate::trx::redo::RedoLogger; +use crate::trx::{ + ActiveTrx, CommittedTrx, PrecommitTrx, TrxID, MAX_COMMIT_TS, MAX_SNAPSHOT_TS, + MIN_ACTIVE_TRX_ID, MIN_SNAPSHOT_TS, +}; +use crossbeam_utils::CachePadded; +use flume::{Receiver, Sender}; +use parking_lot::{Condvar, Mutex, MutexGuard}; +use 
std::collections::{BTreeSet, VecDeque}; +use std::mem; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; + +/// TransactionSystem controls lifecycle of all transactions. +/// +/// 1. Transaction begin: +/// a) Generate STS and TrxID. +/// b) Put it into active transaction list. +/// +/// 2. Transaction pre-commmit: +/// a) Generate CTS. +/// b) Put it into precommit transaction list. +/// +/// Note 1: Before pre-commit, the transaction should serialize its redo log to binary +/// because group commit is single-threaded and the serialization may require +/// much CPU and slow down the log writer. So each transaction +/// +/// Note 2: In this phase, transaction is still in active transaction list. +/// Only when redo log is persisted, we can move it from active list to committed list. +/// One optimization is Early Lock Release, which unlock all row-locks(backfill CTS to undo) +/// and move it to committed list. This can improve performance because it does not wait +/// log writer to fsync. But final-commit step must wait for additional transaction dependencies, +/// to ensure any previous dependent transaction's log are already persisted. +/// Currently, we do NOT apply this optimization. +/// +/// 3. Transaction group-commit: +/// +/// A single-threaded log writer is responsible for persisting redo logs. +/// It also notify all transactions in group commit to check if log has been persisted. +/// +/// 4. Transaction final-commit: +/// +/// TrxID in all undo log entries of current transaction should be updated to CTS after log +/// is persisted. +/// As undo logs are maintained purely in memory, we can use shared pointer with atomic variable +/// to perform very fast CTS backfill. +pub struct TransactionSystem { + /// A sequence to generate snapshot timestamp(abbr. sts) and commit timestamp(abbr. cts). + /// They share the same sequence and start from 1. + /// The two timestamps are used to identify which version of data a transaction should see. + /// Transaction id is derived from snapshot timestamp with highest bit set to 1. + /// + /// trx_id range: (1<<63)+1 to uint::MAX-1 + /// sts range: 1 to 1<<63 + /// cts range: 1 to 1<<63 + ts: CachePadded, + /// Minimum active snapshot timestamp. + /// It's updated by group committer thread, and is used by query/GC thread to clean + /// out-of-date version chains. + /// + /// Note: this field may not reflect the latest value, but is enough for GC purpose. + min_active_sts: CachePadded, + /// Group commit implementation. + group_commit: CachePadded>, + /// list of precommitted transactions. + /// Once user sends COMMIT command or statement is auto-committed. The transaction + /// will be assign CTS and put into this list, waiting for log writer thread to + /// persist its + // precommit_trx_list: CachePadded<(Mutex>, Condvar)>, + /// Rollbacked transaction snapshot timestamp list. + /// This list is used to calculate active sts list. + rollback_sts_list: CachePadded>>, + /// Persisted commit timestamp is the maximum commit timestamp of all persisted redo + /// logs. Precommit transactions are already notified by group committer. This global + /// atomic variable is used by query or GC thread to perform GC. + persisted_cts: CachePadded, + /// Committed transaction list. + /// When a transaction is committed, it will be put into this queue in sequence. + /// Head is always oldest and tail is newest. 
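+    /// (The list itself lives in [`GCInfo`] below, together with the old transaction
+    /// list and the active STS list; this field wraps them in a mutex used by the
+    /// GC thread.)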
+ gc_info: CachePadded>, + /// Log writer controls how to persist redo log buffer to disk. + redo_logger: Mutex>, + /// Flag to indicate whether redo log is enabled. + redo_log_enabled: AtomicBool, + /// Background GC thread identify which transactions can be garbage collected and + /// calculate watermark for other thread to cooperate. + gc_thread: Mutex>, +} + +impl TransactionSystem { + /// Create a static transaction system. + /// Which can be used in multi-threaded environment. + #[inline] + pub fn new_static() -> &'static Self { + let sys = Self::new(); + let boxed = Box::new(sys); + Box::leak(boxed) + } + + /// Drop static transaction system. + /// + /// # Safety + /// + /// Caller must ensure no further use on it. + pub unsafe fn drop_static(this: &'static Self) { + // notify and wait for group commit to quit. + // this.stop_group_committer_and_wait(); + drop(Box::from_raw(this as *const Self as *mut Self)); + } + + /// Create a new transaction. + #[inline] + pub fn new_trx(&self) -> ActiveTrx { + // active transaction list is calculated by group committer thread + // so here we just generate STS and TrxID. + let sts = self.ts.fetch_add(1, Ordering::SeqCst); + let trx_id = sts | (1 << 63); + debug_assert!(sts < MAX_SNAPSHOT_TS); + debug_assert!(trx_id >= MIN_ACTIVE_TRX_ID); + ActiveTrx::new(trx_id, sts) + } + + /// Commit an active transaction. + /// The commit process is implemented as group commit. + /// If multiple transactions are being committed at the same time, one of them + /// will become leader of the commit group. Others become followers waiting for + /// leader to persist log and backfill CTS. + /// This strategy can largely reduce logging IO, therefore improve throughput. + #[inline] + pub fn commit(&self, trx: ActiveTrx) { + // Prepare redo log first, this may take some time, + // so keep it out of lock scope, and only fill cts then. + let prepared_trx = trx.prepare(); + // start group commit + let mut group_commit_g = self.group_commit.lock(); + let cts = self.ts.fetch_add(1, Ordering::SeqCst); + debug_assert!(cts < MAX_COMMIT_TS); + let precommit_trx = prepared_trx.fill_cts(cts); + match group_commit_g.groups.len() { + 0 => self.act_as_single_trx_leader(precommit_trx, group_commit_g), + 1 => { + let curr_group = group_commit_g.groups.front_mut().unwrap(); + match &mut curr_group.kind { + // There is only one commit group and it's processing, so current thread + // becomes leader of next group, and waits for previous one to finish, + // and then start processing. + CommitGroupKind::Processing => { + self.act_as_next_group_leader(precommit_trx, group_commit_g) + } + kind => { + // this is the second transaction entering a non-started commit group, + // it should add itself to this group and setup notifier. + debug_assert!({ + (matches!(kind, CommitGroupKind::Single(_)) + && curr_group.notifier.is_none()) + || (matches!(kind, CommitGroupKind::Multi(_)) + && curr_group.notifier.is_some()) + }); + kind.add_trx(precommit_trx); + let notify = curr_group + .notifier + .get_or_insert_with(|| Arc::new(CommitGroupNotify::default())); + let notify = Arc::clone(notify); + drop(group_commit_g); // release lock to let other transactions enter or leader start. + + // wait for leader to finish group commit + loop { + let mut status_g = notify.status.lock(); + if status_g.finished { + // in current implementation, persisted cts must be greater than or equal to + // cts of any transaction in the same group. 
+ debug_assert!(cts <= status_g.persisted_cts); + break; + } + notify.follower_cv.wait(&mut status_g); + } + } + } + } + 2 => { + // This is common scenario: previous group commit is running, and next group is already established. + // Just join it. + debug_assert!(matches!( + group_commit_g.groups.front().unwrap().kind, + CommitGroupKind::Processing + )); + debug_assert!(!matches!( + group_commit_g.groups.back().unwrap().kind, + CommitGroupKind::Processing + )); + let curr_group = group_commit_g.groups.back_mut().unwrap(); + debug_assert!({ + (matches!(&curr_group.kind, CommitGroupKind::Single(_)) + && curr_group.notifier.is_none()) + || (matches!(&curr_group.kind, CommitGroupKind::Multi(_)) + && curr_group.notifier.is_some()) + }); + curr_group.kind.add_trx(precommit_trx); + let notify = curr_group + .notifier + .get_or_insert_with(|| Arc::new(CommitGroupNotify::default())); + let notify = Arc::clone(notify); + drop(group_commit_g); // release lock to let other transactions enter or leader start. + + // wait for leader to finish group commit + loop { + let mut status_g = notify.status.lock(); + if status_g.finished { + // in current implementation, persisted cts must be greater than or equal to + // cts of any transaction in the same group. + debug_assert!(cts <= status_g.persisted_cts); + break; + } + notify.follower_cv.wait(&mut status_g); + } + } + _ => unreachable!("group commit can only have two groups at the same time"), + } + } + + /// Rollback active transaction. + #[inline] + pub fn rollback(&self, trx: ActiveTrx) { + let sts = trx.sts; + trx.rollback(); + let mut g = self.rollback_sts_list.lock(); + let rollback_inserted = g.insert(sts); + debug_assert!(rollback_inserted); + } + + /// Returns whether redo log is enabled. + #[inline] + pub fn redo_log_enabled(&self) -> bool { + self.redo_log_enabled.load(Ordering::Relaxed) + } + + /// Enable or disable redo log persistence. + /// It will not impact redo log generation even if it's disabled. + /// But the log persistence will be disabled. + #[inline] + pub fn set_redo_log_enabled(&self, redo_log_enabled: bool) { + self.redo_log_enabled + .store(redo_log_enabled, Ordering::SeqCst); + } + + /// Start background GC thread. + /// This method should be called once transaction system is initialized. + #[inline] + pub fn start_gc_thread(&'static self) { + let mut gc_thread_g = self.gc_thread.lock(); + if gc_thread_g.is_some() { + panic!("GC thread should be created only once"); + } + let (tx, rx) = flume::unbounded(); + let handle = thread::Builder::new() + .name("GC-Thread".to_string()) + .spawn(move || { + while let Ok(committed_trx_list) = rx.recv() { + self.gc(committed_trx_list); + } + }) + .unwrap(); + + *gc_thread_g = Some(GCThread(handle)); + drop(gc_thread_g); // release lock here + + // put sender into group commit, so group commit leader can send + // persisted transaction list to GC thread. + let mut group_commit_g = self.group_commit.lock(); + debug_assert!(group_commit_g.gc_chan.is_none()); + group_commit_g.gc_chan = Some(tx); + } + + /// Stop background GC thread. + /// The method should be called before shutdown transaction system if GC thread + /// is enabled. 
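+    ///
+    /// A typical lifecycle, as a sketch (mirrors `test_trx_sys_group_commit` below):
+    ///
+    /// ```ignore
+    /// let trx_sys = TransactionSystem::new_static();
+    /// trx_sys.start_gc_thread();
+    /// let trx = trx_sys.new_trx();
+    /// trx_sys.commit(trx);
+    /// trx_sys.stop_gc_thread();
+    /// unsafe { TransactionSystem::drop_static(trx_sys); }
+    /// ```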
+ #[inline] + pub fn stop_gc_thread(&'static self) { + let mut group_commit_g = self.group_commit.lock(); + if group_commit_g.gc_chan.is_none() { + panic!("GC thread should be stopped only once"); + } + group_commit_g.gc_chan = None; + drop(group_commit_g); + + let mut gc_thread_g = self.gc_thread.lock(); + if let Some(gc_thread) = gc_thread_g.take() { + gc_thread.0.join().unwrap(); + } else { + panic!("GC thread should be stopped only once"); + } + } + + #[inline] + fn commit_single_trx( + &self, + trx: PrecommitTrx, + gc_chan: Option>>, + ) -> TrxID { + let cts = trx.cts; + // todo: persist log + if self.redo_log_enabled() { + let mut g = self.redo_logger.lock(); + if let Some(logger) = g.as_mut() { + logger.log(); + } + } + // backfill cts + trx.trx_id.store(cts, Ordering::SeqCst); + + // update global persisted cts. + // this is safe because group commit allows only one commit group + // to execute, so the update values of persisted cts is always + // monotonously increasing. + self.persisted_cts.store(cts, Ordering::SeqCst); + + if let Some(gc_chan) = gc_chan { + let _ = gc_chan.send(vec![CommittedTrx { + sts: trx.sts, + cts: trx.cts, + undo: trx.undo, + }]); + } + cts + } + + #[inline] + fn commit_multi_trx( + &self, + trx_list: Vec, + gc_chan: Option>>, + ) -> TrxID { + debug_assert!(trx_list.len() > 1); + debug_assert!({ + trx_list + .iter() + .zip(trx_list.iter().skip(1)) + .all(|(l, r)| l.cts < r.cts) + }); + let max_cts = trx_list.last().unwrap().cts; + // todo: persist log + if self.redo_log_enabled() { + let mut g = self.redo_logger.lock(); + if let Some(logger) = g.as_mut() { + logger.log(); + } + } + // Instead of letting each thread backfill its CTS in undo logs, + // we delegate this action to group commit leader because it's + // a very cheap operation via Arc::store(). + for trx in &trx_list { + trx.trx_id.store(trx.cts, Ordering::SeqCst); + } + + // update global persisted cts. + // this is safe because group commit allows only one commit group + // to execute, so the update values of persisted cts is always + // monotonously increasing. + self.persisted_cts.store(max_cts, Ordering::SeqCst); + + if let Some(gc_chan) = gc_chan { + let _ = gc_chan.send( + trx_list + .into_iter() + .map(|trx| CommittedTrx { + sts: trx.sts, + cts: trx.cts, + undo: trx.undo, + }) + .collect(), + ); + } + max_cts + } + + #[inline] + fn act_as_single_trx_leader( + &self, + precommit_trx: PrecommitTrx, + mut group_commit_g: MutexGuard<'_, GroupCommit>, + ) { + // no group commit running, current thread is just leader and do single transaction commit. + let new_group = CommitGroup { + kind: CommitGroupKind::Processing, + notifier: None, + }; + group_commit_g.groups.push_back(new_group); + let gc_chan = group_commit_g.gc_chan.clone(); + drop(group_commit_g); + + let cts = self.commit_single_trx(precommit_trx, gc_chan); + + // Here we remove the first finished group and let other transactions to enter commit phase. + // new transaction may join the group which is not started, or form a new group and wait. + let curr_group = { + let mut group_commit_g = self.group_commit.lock(); + debug_assert!(group_commit_g.groups.len() >= 1); + debug_assert!(matches!( + group_commit_g.groups.front().unwrap().kind, + CommitGroupKind::Processing + )); + group_commit_g.groups.pop_front().unwrap() + }; + + // Now notify follower to make progress if any. 
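+        // For a single-transaction group, the only possible follower is the leader
+        // of the next group waiting on this notifier, so notify_one is sufficient
+        // (act_as_next_group_leader uses notify_all because its group may contain
+        // multiple followers).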
+ if let Some(notifier) = curr_group.notifier { + let mut g = notifier.status.lock(); + g.finished = true; + g.persisted_cts = cts; + notifier.follower_cv.notify_one(); + } + } + + #[inline] + fn act_as_next_group_leader( + &self, + precommit_trx: PrecommitTrx, + mut group_commit_g: MutexGuard<'_, GroupCommit>, + ) { + // First group is processing, so become leader and create a new group. + let new_group = CommitGroup { + kind: CommitGroupKind::Single(precommit_trx), + notifier: None, + }; + group_commit_g.groups.push_back(new_group); + + // Become follower of first group. If there is no notifier, add one. + let notify = group_commit_g + .groups + .front_mut() + .unwrap() + .notifier + .get_or_insert_with(|| Arc::new(CommitGroupNotify::default())); + let notify = Arc::clone(¬ify); + drop(group_commit_g); // release lock to let other transactions join the new group. + + // Wait until first group to finish. + loop { + let mut status_g = notify.status.lock(); + if status_g.finished { + // does not care about persisted cts + break; + } + notify.follower_cv.wait(&mut status_g); + } + + // now previous group finishes, start current group. + let (kind, gc_chan) = { + let mut group_commit_g = self.group_commit.lock(); + // previous leader removed its group before notifying next leader. + // and new transactions must join this group because it's not started. + // so group count must be 1. + debug_assert!(group_commit_g.groups.len() == 1); + let CommitGroup { + kind, + notifier: notify, + } = &mut group_commit_g.groups.front_mut().unwrap(); + let kind = mem::replace(kind, CommitGroupKind::Processing); + let gc_chan = group_commit_g.gc_chan.clone(); + (kind, gc_chan) + }; // Here release the lock so other transactions can form new group. + + // persist redo log, backfill cts for each transaction, and get back maximum persisted cts. + let persisted_cts = match kind { + CommitGroupKind::Single(trx) => self.commit_single_trx(trx, gc_chan), + CommitGroupKind::Multi(trx_list) => self.commit_multi_trx(trx_list, gc_chan), + _ => unreachable!("invalid group commit kind"), + }; + + let curr_group = { + let mut group_commit_g = self.group_commit.lock(); + debug_assert!(group_commit_g.groups.len() >= 1); + debug_assert!(matches!( + group_commit_g.groups.front().unwrap().kind, + CommitGroupKind::Processing + )); + group_commit_g.groups.pop_front().unwrap() + }; + + if let Some(notifier) = curr_group.notifier { + let mut g = notifier.status.lock(); + g.finished = true; + g.persisted_cts = persisted_cts; + notifier.follower_cv.notify_all(); + } + } + + #[inline] + fn gc(&self, trx_list: Vec) { + if trx_list.is_empty() { + return; + } + let persisted_cts = trx_list + .last() + .expect("committed transaction list is not empty") + .cts; + + // Re-calculate GC info, including active_sts_list, committed_trx_list, old_trx_list + // min_active_sts. + { + let mut gc_info_g = self.gc_info.lock(); + + let GCInfo { + committed_trx_list, + old_trx_list, + active_sts_list, + } = &mut *gc_info_g; + + // swap out active sts list for update. 
+ let mut next_active_sts_list = mem::take(active_sts_list); + + // add all potential active sts + let next_min_active_sts = if let Some(max) = next_active_sts_list.last() { + *max + 1 + } else { + MIN_SNAPSHOT_TS + }; + let next_max_active_sts = self.ts.load(Ordering::Relaxed); + for ts in next_min_active_sts..next_max_active_sts { + let sts_inserted = next_active_sts_list.insert(ts); + debug_assert!(sts_inserted); + } + + // remove sts and cts of committed transactions + for trx in &trx_list { + let sts_removed = next_active_sts_list.remove(&trx.sts); + debug_assert!(sts_removed); + let cts_removed = next_active_sts_list.remove(&trx.cts); + debug_assert!(cts_removed); + } + + // remove rollback sts + { + let mut removed_rb_sts = vec![]; + let mut rb_g = self.rollback_sts_list.lock(); + for rb_sts in (&rb_g).iter() { + if next_active_sts_list.remove(rb_sts) { + removed_rb_sts.push(*rb_sts); + } // otherwise, rollback trx is added after latest sts is acquired, which should be very rare. + } + for rb_sts in removed_rb_sts { + let rb_sts_removed = rb_g.remove(&rb_sts); + debug_assert!(rb_sts_removed); + } + } + // calculate smallest active sts + // 1. if active_sts_list is empty, means there is no new transaction after group commit. + // so maximum persisted cts + 1 can be used. + // 2. otherwise, use minimum value in active_sts_list. + let min_active_sts = if let Some(min) = next_active_sts_list.first() { + *min + } else { + persisted_cts + 1 + }; + + // update smallest active sts + self.min_active_sts.store(min_active_sts, Ordering::SeqCst); + + // update active_sts_list + *active_sts_list = next_active_sts_list; + + // populate committed transaction list + committed_trx_list.extend(trx_list); + + // move committed transactions to old transaction list, waiting for GC. + while let Some(trx) = committed_trx_list.front() { + if trx.cts < min_active_sts { + old_trx_list.push(committed_trx_list.pop_front().unwrap()); + } else { + break; + } + } + + // todo: implement undo cleaner based on old transaction list. + // currently just ignore. + old_trx_list.clear(); + } + } + + #[inline] + fn new() -> Self { + TransactionSystem { + ts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), + min_active_sts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), + group_commit: CachePadded::new(Mutex::new(GroupCommit::new())), + rollback_sts_list: CachePadded::new(Mutex::new(BTreeSet::new())), + // initialize to MIN_SNAPSHOT_TS is fine, the actual value relies on recovery process. + persisted_cts: CachePadded::new(AtomicU64::new(MIN_SNAPSHOT_TS)), + gc_info: CachePadded::new(Mutex::new(GCInfo::new())), + redo_logger: Mutex::new(None), + redo_log_enabled: AtomicBool::new(false), + gc_thread: Mutex::new(None), + } + } +} + +unsafe impl Sync for TransactionSystem {} + +/// GCInfo is only used for GroupCommitter to store and analyze GC related information, +/// including committed transaction list, old transaction list, active snapshot timestamp +/// list, etc. +pub struct GCInfo { + /// Committed transaction list. + /// When a transaction is committed, it will be put into this queue in sequence. + /// Head is always oldest and tail is newest. + committed_trx_list: VecDeque, + /// Old transaction list. + /// If a transaction's committed timestamp is less than the smallest + /// snapshot timestamp of all active transactions, it means this transction's + /// data vesion is latest and all its undo log can be purged. + /// So we move such transactions from commited list to old list. 
+ old_trx_list: Vec, + /// Active snapshot timestamp list. + /// The smallest value equals to min_active_sts. + active_sts_list: BTreeSet, +} + +impl GCInfo { + #[inline] + pub fn new() -> Self { + GCInfo { + committed_trx_list: VecDeque::new(), + old_trx_list: Vec::new(), + active_sts_list: BTreeSet::new(), + } + } +} + +struct GroupCommit { + /// Groups of committing transactions. + /// At most 2. + groups: VecDeque, + /// Channel to send old transaction list to GC thread. + /// If GC thread is not initialized, these out-of-date transactions are just + /// ignored. + gc_chan: Option>>, +} + +impl GroupCommit { + #[inline] + fn new() -> Self { + GroupCommit { + groups: VecDeque::new(), + gc_chan: None, + } + } +} + +/// GroupCommit is an abstraction to group multiple transactions +/// and write and sync logs in batch mode. +/// The first transaction thread arrived in the group becomes the +/// group leader, and take care of all transactions log persistence. +/// It's used to improve logging performance. +struct CommitGroup { + kind: CommitGroupKind, + notifier: Option>, +} + +enum CommitGroupKind { + Processing, + Single(PrecommitTrx), + Multi(Vec), +} + +impl CommitGroupKind { + #[inline] + fn add_trx(&mut self, trx: PrecommitTrx) { + match self { + CommitGroupKind::Processing => { + unreachable!("Transaction cannot be added to processing commit group") + } + CommitGroupKind::Multi(trx_list) => trx_list.push(trx), + _ => match mem::replace(self, CommitGroupKind::Processing) { + CommitGroupKind::Single(leader_trx) => { + let mut trx_list = vec![]; + trx_list.push(leader_trx); + trx_list.push(trx); + *self = CommitGroupKind::Multi(trx_list); + } + _ => unreachable!(), + }, + } + } +} + +#[derive(Default)] +struct CommitGroupNotify { + status: Mutex, + follower_cv: Condvar, +} + +struct CommitGroupStatus { + finished: bool, + persisted_cts: TrxID, +} + +impl Default for CommitGroupStatus { + #[inline] + fn default() -> Self { + CommitGroupStatus { + finished: false, + persisted_cts: MIN_SNAPSHOT_TS, + } + } +} + +/// GarbageCollector is a single thread to identify which transaction should +/// be GC. The real GC work can be done. 
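+///
+/// The flow, as a sketch: the group-commit leader sends each batch of committed
+/// transactions over `gc_chan`, and this thread recomputes `min_active_sts` and
+/// retires transactions whose CTS falls below it (undo purge itself is still a todo):
+///
+/// ```ignore
+/// while let Ok(committed_trx_list) = rx.recv() {
+///     trx_sys.gc(committed_trx_list);
+/// }
+/// ```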
+pub struct GCThread(JoinHandle<()>); + +#[cfg(test)] +mod tests { + use super::*; + use crossbeam_utils::CachePadded; + use parking_lot::Mutex; + use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; + use std::sync::Arc; + use std::time::Instant; + + #[test] + fn test_transaction_system() { + let trx_sys = TransactionSystem::new_static(); + { + let trx = trx_sys.new_trx(); + trx_sys.commit(trx); + } + std::thread::spawn(|| { + let trx = trx_sys.new_trx(); + trx_sys.commit(trx); + }) + .join() + .unwrap(); + unsafe { + TransactionSystem::drop_static(trx_sys); + } + } + + #[test] + fn test_single_thread_mutex_trx_id_generate() { + const COUNT: usize = 1000000; + let mu = Mutex::new(1u64); + let start = Instant::now(); + for _ in 0..COUNT { + let mut g = mu.lock(); + let _ = *g; + *g += 1; + } + let dur = start.elapsed(); + println!( + "{:?} transaction id generation cost {:?} microseconds, avg {:?} op/s", + COUNT, + dur.as_micros(), + COUNT as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } + + #[test] + fn test_multi_threads_mutex_trx_id_generate() { + const COUNT: usize = 1000000; + const THREADS: usize = 4; + let mu = Arc::new(Mutex::new(1u64)); + let stop = Arc::new(AtomicBool::new(false)); + let start = Instant::now(); + let mut handles = vec![]; + for _ in 1..THREADS { + let mu = Arc::clone(&mu); + let stop = Arc::clone(&stop); + let handle = + std::thread::spawn(move || worker_thread_mutex_trx_id_generate(&mu, &stop)); + handles.push(handle); + } + let mut count = 0usize; + for _ in 0..COUNT { + let mut g = mu.lock(); + let _ = *g; + *g += 1; + count += 1; + } + stop.store(true, Ordering::SeqCst); + for handle in handles { + count += handle.join().unwrap(); + } + let dur = start.elapsed(); + println!( + "{:?} threads generate {:?} transaction ids in {:?} microseconds, avg {:?} op/s", + THREADS, + count, + dur.as_micros(), + count as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } + + #[inline] + fn worker_thread_mutex_trx_id_generate(mu: &Mutex, stop: &AtomicBool) -> usize { + let mut count = 0usize; + while !stop.load(Ordering::Relaxed) { + let mut g = mu.lock(); + let _ = *g; + *g += 1; + count += 1; + } + count + } + + #[test] + fn test_single_thread_atomic_trx_id_generate() { + const COUNT: usize = 1000000; + let atom = AtomicU64::new(1u64); + let start = Instant::now(); + for _ in 0..COUNT { + let _ = atom.fetch_add(1, Ordering::SeqCst); + } + let dur = start.elapsed(); + println!( + "{:?} transaction id generation cost {:?} microseconds, avg {:?} op/s", + COUNT, + dur.as_micros(), + COUNT as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } + + #[test] + fn test_multi_threads_atomic_trx_id_generate() { + const COUNT: usize = 1000000; + const THREADS: usize = 4; + let atom = Arc::new(CachePadded::new(AtomicU64::new(1u64))); + let stop = Arc::new(AtomicBool::new(false)); + let start = Instant::now(); + let mut handles = vec![]; + for _ in 1..THREADS { + let atom = Arc::clone(&atom); + let stop = Arc::clone(&stop); + let handle = + std::thread::spawn(move || worker_thread_atomic_trx_id_generate(&atom, &stop)); + handles.push(handle); + } + let mut count = 0usize; + for _ in 0..COUNT { + let _ = atom.fetch_add(1, Ordering::SeqCst); + count += 1; + } + stop.store(true, Ordering::SeqCst); + for handle in handles { + count += handle.join().unwrap(); + } + let dur = start.elapsed(); + println!( + "{:?} threads generate {:?} transaction ids in {:?} microseconds, avg {:?} op/s", + THREADS, + count, + dur.as_micros(), + count as f64 * 1_000_000_000f64 / 
dur.as_nanos() as f64 + ); + } + + #[inline] + fn worker_thread_atomic_trx_id_generate(atom: &AtomicU64, stop: &AtomicBool) -> usize { + let mut count = 0usize; + while !stop.load(Ordering::Relaxed) { + let _ = atom.fetch_add(1, Ordering::SeqCst); + count += 1; + } + count + } + + #[test] + fn test_single_thread_trx_begin_and_commit() { + const COUNT: usize = 1000000; + let trx_sys = TransactionSystem::new_static(); + { + // hook persisted_ts to u64::MAX to allaw all transactions immediately finish. + trx_sys.persisted_cts.store(u64::MAX, Ordering::SeqCst); + } + + { + let start = Instant::now(); + for _ in 0..COUNT { + let trx = trx_sys.new_trx(); + trx_sys.commit(trx); + } + let dur = start.elapsed(); + println!( + "{:?} transaction begin and commit cost {:?} microseconds, avg {:?} trx/s", + COUNT, + dur.as_micros(), + COUNT as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + } + } + + #[test] + fn test_multi_threads_trx_begin_and_commit() { + const COUNT: usize = 1000000; + const THREADS: usize = 4; + let stop = Arc::new(AtomicBool::new(false)); + let trx_sys = TransactionSystem::new_static(); + { + // hook persisted_ts to u64::MAX to allaw all transactions immediately finish. + trx_sys.persisted_cts.store(u64::MAX, Ordering::SeqCst); + } + { + let mut handles = vec![]; + for _ in 1..THREADS { + let stop = Arc::clone(&stop); + let handle = + std::thread::spawn(move || worker_thread_trx_begin_and_commit(trx_sys, &stop)); + handles.push(handle); + } + let mut count = 0; + let start = Instant::now(); + for _ in 0..COUNT { + let trx = trx_sys.new_trx(); + trx_sys.commit(trx); + count += 1; + } + stop.store(true, Ordering::SeqCst); + for handle in handles { + count += handle.join().unwrap(); + } + let dur = start.elapsed(); + println!( + "{:?} transaction begin and commit cost {:?} microseconds, avg {:?} trx/s", + count, + dur.as_micros(), + count as f64 * 1_000_000_000f64 / dur.as_nanos() as f64 + ); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + } + } + + #[inline] + fn worker_thread_trx_begin_and_commit(trx_sys: &TransactionSystem, stop: &AtomicBool) -> usize { + let mut count = 0usize; + while !stop.load(Ordering::Relaxed) { + let trx = trx_sys.new_trx(); + trx_sys.commit(trx); + count += 1; + } + count + } + + #[test] + fn test_trx_sys_group_commit() { + let trx_sys = TransactionSystem::new_static(); + trx_sys.start_gc_thread(); + { + let new_trx = trx_sys.new_trx(); + trx_sys.commit(new_trx); + } + // sts=1, cts=2, next_ts=3 + assert!(trx_sys.ts.load(Ordering::Relaxed) == 3); + // sleep 100 millisecond should be enough for GC thread to + // analyze and update minimum active sts. + thread::sleep(std::time::Duration::from_millis(100)); + // all active transactions ended, so min_active_sts should equal to next_ts. + assert!(trx_sys.min_active_sts.load(Ordering::Relaxed) == 3); + trx_sys.stop_gc_thread(); + unsafe { + TransactionSystem::drop_static(trx_sys); + } + } +} diff --git a/doradb-storage/src/trx/undo.rs b/doradb-storage/src/trx/undo.rs new file mode 100644 index 0000000..fe57aef --- /dev/null +++ b/doradb-storage/src/trx/undo.rs @@ -0,0 +1,43 @@ +use crate::buffer::page::PageID; +use crate::row::ops::UpdateCol; +use crate::row::RowID; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +/// UndoMap is page level hash map to store undo chain of rows. +/// If UndoMap is empty, this page has all data visible to all transactions. 
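+/// A reader resolves visibility roughly as described in `trx/mod.rs`; a sketch only,
+/// ignoring the reader's own uncommitted changes (`sts` is the reader's snapshot
+/// timestamp):
+///
+/// ```ignore
+/// match undo_map.get(&row_id) {
+///     None => { /* row on the page is the latest version; read it directly */ }
+///     Some(entry) => {
+///         if entry.ts.load(Ordering::Acquire) < sts {
+///             // committed before the snapshot: current version is visible
+///         } else {
+///             // newer or still active: undo the change and follow entry.next
+///         }
+///     }
+/// }
+/// ```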
+pub type UndoMap = HashMap; + +pub enum UndoKind { + /// Before-image is empty for insert, so we do not need to copy values. + Insert, + /// Delete must be the head of version chain and data is stored in row page. + /// So no need to copy values. + Delete, + /// Copy old versions of updated columns. + Update(Vec), + /// This is special case when in-place update fails. + /// The old row is marked as deleted, and new row is inserted into a new page. + /// Because we always update secondary index to point to new version, + /// there might be two index entries pointing to the same row id. + /// In such case, index key validation is required to choose correct version. + /// Meanwhile, table scan should take care of two versions existing in different pages. + /// In current design, the previous version of DeleteUpdate is discarded. + DeleteUpdate(Vec), +} + +/// SharedUndoEntry is a reference-counted pointer to UndoEntry. +/// The transaction is the primary owner of undo log. +/// and page-level transaction map can also own undo log +/// to track all visible versions of modified rows. +pub type SharedUndoEntry = Arc; + +pub struct UndoEntry { + pub ts: Arc, + pub page_id: PageID, + pub row_id: RowID, + pub kind: UndoKind, + pub next: Mutex>, +} diff --git a/doradb-storage/src/value.rs b/doradb-storage/src/value.rs new file mode 100644 index 0000000..37294d3 --- /dev/null +++ b/doradb-storage/src/value.rs @@ -0,0 +1,824 @@ +use serde::de::Visitor; +use serde::{Deserialize, Serialize}; +use std::alloc::{alloc, dealloc, Layout}; +use std::fmt; +use std::mem::{self, ManuallyDrop, MaybeUninit}; +// use bitcode::{Encode, Decode}; + +pub const PAGE_VAR_LEN_INLINE: usize = 6; +pub const PAGE_VAR_LEN_PREFIX: usize = 4; +const _: () = assert!(mem::size_of::() == 8); + +pub const MEM_VAR_LEN_INLINE: usize = 14; +pub const MEM_VAR_LEN_PREFIX: usize = 6; +const _: () = assert!(mem::size_of::() == 16); + +/// Val is value representation of row-store. +/// The variable-length data may require new allocation +/// because we cannot rely on page data. 
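+///
+/// Fixed-length values are stored by byte width and var-length values own their
+/// bytes via [`MemVar`]. For example:
+///
+/// ```ignore
+/// let a = Val::from(42u32);           // Val::Byte4
+/// let b = Val::from(-1i64);           // Val::Byte8
+/// let c = Val::from(&b"hello"[..]);   // Val::VarByte backed by MemVar
+/// assert_eq!(c, Val::from(&b"hello"[..]));
+/// ```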
+#[derive(Clone, Serialize, Deserialize)] +pub enum Val { + Byte1(Byte1Val), + Byte2(Byte2Val), + Byte4(Byte4Val), + Byte8(Byte8Val), + VarByte(MemVar), +} + +unsafe impl Send for Val {} +unsafe impl Sync for Val {} + +impl PartialEq for Val { + #[inline] + fn eq(&self, rhs: &Self) -> bool { + match (self, rhs) { + (Val::Byte1(l), Val::Byte1(r)) => l == r, + (Val::Byte2(l), Val::Byte2(r)) => l == r, + (Val::Byte4(l), Val::Byte4(r)) => l == r, + (Val::Byte8(l), Val::Byte8(r)) => l == r, + (Val::VarByte(l), Val::VarByte(r)) => l.as_bytes() == r.as_bytes(), + _ => false, + } + } +} + +impl Eq for Val {} + +impl fmt::Debug for Val { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Val").finish() + } +} + +impl From for Val { + #[inline] + fn from(value: u8) -> Self { + Val::Byte1(value) + } +} + +impl From for Val { + #[inline] + fn from(value: i8) -> Self { + Val::Byte1(value as u8) + } +} + +impl From for Val { + #[inline] + fn from(value: u16) -> Self { + Val::Byte2(value) + } +} + +impl From for Val { + #[inline] + fn from(value: i16) -> Self { + Val::Byte2(value as u16) + } +} + +impl From for Val { + #[inline] + fn from(value: u32) -> Self { + Val::Byte4(value) + } +} + +impl From for Val { + #[inline] + fn from(value: i32) -> Self { + Val::Byte4(value as u32) + } +} + +impl From for Val { + #[inline] + fn from(value: u64) -> Self { + Val::Byte8(value) + } +} + +impl From for Val { + #[inline] + fn from(value: i64) -> Self { + Val::Byte8(value as u64) + } +} + +impl From<&[u8]> for Val { + #[inline] + fn from(value: &[u8]) -> Self { + Val::VarByte(MemVar::new(value)) + } +} + +#[derive(Debug, Clone, Copy)] +pub enum ValRef<'a> { + Byte1(Byte1Val), + Byte2(Byte2Val), + Byte4(Byte4Val), + Byte8(Byte8Val), + VarByte(&'a [u8]), +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: u8) -> Self { + ValRef::Byte1(value) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: i8) -> Self { + ValRef::Byte1(value as u8) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: u16) -> Self { + ValRef::Byte2(value) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: i16) -> Self { + ValRef::Byte2(value as u16) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: u32) -> Self { + ValRef::Byte4(value) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: i32) -> Self { + ValRef::Byte4(value as u32) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: f32) -> Self { + ValRef::Byte4(u32::from_ne_bytes(value.to_ne_bytes())) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: u64) -> Self { + ValRef::Byte8(value) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: i64) -> Self { + ValRef::Byte8(value as u64) + } +} + +impl<'a> From for ValRef<'a> { + #[inline] + fn from(value: f64) -> Self { + ValRef::Byte8(u64::from_ne_bytes(value.to_ne_bytes())) + } +} + +impl<'a> From<&'a [u8]> for ValRef<'a> { + #[inline] + fn from(value: &'a [u8]) -> Self { + ValRef::VarByte(value) + } +} + +impl<'a> From<&'a str> for ValRef<'a> { + #[inline] + fn from(value: &'a str) -> Self { + ValRef::VarByte(value.as_bytes()) + } +} + +/// Value is a marker trait to represent +/// fixed-length column value in row page. 
+pub trait Value {} + +pub trait ToValue { + type Target: Value; + + fn to_val(&self) -> Self::Target; +} + +pub type Byte1Val = u8; +pub trait Byte1ValSlice { + fn as_i8s(&self) -> &[i8]; + + fn as_i8s_mut(&mut self) -> &mut [i8]; +} + +impl Value for Byte1Val {} + +impl Byte1ValSlice for [Byte1Val] { + #[inline] + fn as_i8s(&self) -> &[i8] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_i8s_mut(&mut self) -> &mut [i8] { + unsafe { mem::transmute(self) } + } +} + +impl ToValue for u8 { + type Target = Byte1Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self + } +} + +impl ToValue for i8 { + type Target = Byte1Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self as u8 + } +} + +pub type Byte2Val = u16; +pub trait Byte2ValSlice { + fn as_i16s(&self) -> &[i16]; + + fn as_i16s_mut(&mut self) -> &mut [i16]; +} +impl Value for Byte2Val {} + +impl ToValue for u16 { + type Target = Byte2Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self + } +} + +impl ToValue for i16 { + type Target = Byte2Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self as u16 + } +} + +impl Byte2ValSlice for [Byte2Val] { + #[inline] + fn as_i16s(&self) -> &[i16] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_i16s_mut(&mut self) -> &mut [i16] { + unsafe { mem::transmute(self) } + } +} + +pub type Byte4Val = u32; +pub trait Byte4ValSlice { + fn as_i32s(&self) -> &[i32]; + + fn as_i32s_mut(&mut self) -> &mut [i32]; + + fn as_f32s(&self) -> &[f32]; + + fn as_f32s_mut(&mut self) -> &mut [f32]; +} + +impl Value for Byte4Val {} + +impl Byte4ValSlice for [Byte4Val] { + #[inline] + fn as_i32s(&self) -> &[i32] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_i32s_mut(&mut self) -> &mut [i32] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_f32s(&self) -> &[f32] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_f32s_mut(&mut self) -> &mut [f32] { + unsafe { mem::transmute(self) } + } +} + +impl ToValue for u32 { + type Target = Byte4Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self + } +} + +impl ToValue for i32 { + type Target = Byte4Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self as u32 + } +} + +pub type Byte8Val = u64; +pub trait Byte8ValSlice { + fn as_i64s(&self) -> &[i64]; + + fn as_i64s_mut(&mut self) -> &mut [i64]; + + fn as_f64s(&self) -> &[f64]; + + fn as_f64s_mut(&mut self) -> &mut [f64]; +} + +impl Value for Byte8Val {} + +impl ToValue for u64 { + type Target = Byte8Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self + } +} + +impl ToValue for i64 { + type Target = Byte8Val; + #[inline] + fn to_val(&self) -> Self::Target { + *self as u64 + } +} + +impl Byte8ValSlice for [Byte8Val] { + #[inline] + fn as_i64s(&self) -> &[i64] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_i64s_mut(&mut self) -> &mut [i64] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_f64s(&self) -> &[f64] { + unsafe { mem::transmute(self) } + } + + #[inline] + fn as_f64s_mut(&mut self) -> &mut [f64] { + unsafe { mem::transmute(self) } + } +} + +/// PageVar represents var-len value in page. +/// It has two kinds: inline and outline. +/// Inline means the bytes are inlined in the fixed field. +/// Outline means the fixed field only store length, +/// offset and prfix. Entire value is located at +/// tail of page. +#[derive(Clone, Copy)] +pub union PageVar { + i: PageVarInline, + o: PageVarOutline, +} + +impl PageVar { + /// Create a new PageVar with inline data. 
+ /// The data length must be no more than 6 bytes. + #[inline] + pub fn inline(data: &[u8]) -> Self { + debug_assert!(data.len() <= PAGE_VAR_LEN_INLINE); + let mut inline = MaybeUninit::::uninit(); + unsafe { + let i = inline.assume_init_mut(); + i.len = data.len() as u16; + i.data[..data.len()].copy_from_slice(data); + PageVar { + i: inline.assume_init(), + } + } + } + + /// Create a new PageVar with pointer info. + /// The prefix length must be 4 bytes. + #[inline] + pub fn outline(len: u16, offset: u16, prefix: &[u8]) -> Self { + debug_assert!(prefix.len() == PAGE_VAR_LEN_PREFIX); + let mut outline = MaybeUninit::::uninit(); + unsafe { + let p = outline.assume_init_mut(); + p.len = len; + p.offset = offset; + p.prefix.copy_from_slice(prefix); + PageVar { + o: outline.assume_init(), + } + } + } + + /// Returns length of the value. + #[inline] + pub fn len(&self) -> usize { + unsafe { self.i.len as usize } + } + + /// Returns whether the value is inlined. + #[inline] + pub fn is_inlined(&self) -> bool { + self.len() <= PAGE_VAR_LEN_INLINE + } + + /// Returns inpage length of given value. + /// If the value can be inlined, returns 0. + #[inline] + pub fn outline_len(data: &[u8]) -> usize { + if data.len() > PAGE_VAR_LEN_INLINE { + data.len() + } else { + 0 + } + } + + /// Returns bytes. + #[inline] + pub fn as_bytes(&self, ptr: *const u8) -> &[u8] { + let len = self.len(); + if len <= PAGE_VAR_LEN_INLINE { + unsafe { &self.i.data[..len] } + } else { + unsafe { + let data = ptr.add(self.o.offset as usize); + std::slice::from_raw_parts(data, len) + } + } + } + + /// Returns mutable bytes. + #[inline] + pub fn as_bytes_mut(&mut self, ptr: *mut u8) -> &mut [u8] { + let len = self.len(); + if len <= PAGE_VAR_LEN_INLINE { + unsafe { &mut self.i.data[..len] } + } else { + unsafe { + let data = ptr.add(self.o.offset as usize); + std::slice::from_raw_parts_mut(data, len) + } + } + } + + /// Returns string. + #[inline] + pub fn as_str(&self, ptr: *const u8) -> &str { + let len = self.len(); + if len <= PAGE_VAR_LEN_INLINE { + unsafe { std::str::from_utf8_unchecked(&self.i.data[..len]) } + } else { + unsafe { + let data = ptr.add(self.o.offset as usize); + let bytes = std::slice::from_raw_parts(data, len); + std::str::from_utf8_unchecked(bytes) + } + } + } + + /// Returns mutable string. + #[inline] + pub fn as_str_mut(&mut self, ptr: *mut u8) -> &mut str { + let len = self.len(); + if len <= PAGE_VAR_LEN_INLINE { + unsafe { std::str::from_utf8_unchecked_mut(&mut self.i.data[..len]) } + } else { + unsafe { + let data = ptr.add(self.o.offset as usize); + let bytes = std::slice::from_raw_parts_mut(data, len); + std::str::from_utf8_unchecked_mut(bytes) + } + } + } + + /// In-place update with given value. + /// Caller must ensure no extra space is required. + #[inline] + pub fn update_in_place(&mut self, ptr: *mut u8, val: &[u8]) { + debug_assert!(val.len() <= PAGE_VAR_LEN_INLINE || val.len() <= self.len()); + unsafe { + if val.len() > PAGE_VAR_LEN_INLINE { + // all not inline, but original is longer or equal to input value. + debug_assert!(self.len() > PAGE_VAR_LEN_INLINE); + self.o.len = val.len() as u16; + let target = + std::slice::from_raw_parts_mut(ptr.add(self.o.offset as usize), val.len()); + target.copy_from_slice(val); + } else { + // input is inlined. + // better to reuse release page data. 
+                self.i.len = val.len() as u16;
+                self.i.data[..val.len()].copy_from_slice(val);
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+#[repr(C)]
+struct PageVarInline {
+    len: u16,
+    data: [u8; PAGE_VAR_LEN_INLINE],
+}
+
+#[derive(Debug, Clone, Copy)]
+#[repr(C)]
+struct PageVarOutline {
+    len: u16,
+    offset: u16,
+    prefix: [u8; PAGE_VAR_LEN_PREFIX],
+}
+
+/// MemVar is similar to PageVar, but more general to use.
+/// It does not depend on page data.
+pub union MemVar {
+    i: MemVarInline,
+    o: ManuallyDrop<MemVarOutline>,
+}
+
+impl MemVar {
+    /// Create a new MemVar.
+    #[inline]
+    pub fn new(data: &[u8]) -> Self {
+        debug_assert!(data.len() <= 0xffff);
+        if data.len() <= MEM_VAR_LEN_INLINE {
+            Self::inline(data)
+        } else {
+            Self::outline(data)
+        }
+    }
+
+    /// Create a new MemVar with inline data.
+    /// The data length must be no more than 14 bytes.
+    #[inline]
+    pub fn inline(data: &[u8]) -> Self {
+        debug_assert!(data.len() <= MEM_VAR_LEN_INLINE);
+        let mut inline = MaybeUninit::<MemVarInline>::uninit();
+        unsafe {
+            let i = inline.assume_init_mut();
+            i.len = data.len() as u16;
+            i.data[..data.len()].copy_from_slice(data);
+            MemVar {
+                i: inline.assume_init(),
+            }
+        }
+    }
+
+    /// Create a new MemVar with outlined (heap-allocated) data.
+    #[inline]
+    pub fn outline(data: &[u8]) -> Self {
+        debug_assert!(data.len() <= 0xffff); // must be in range of u16
+        let mut outline = MaybeUninit::<MemVarOutline>::uninit();
+        unsafe {
+            let o = outline.assume_init_mut();
+            o.len = data.len() as u16;
+            o.prefix.copy_from_slice(&data[..MEM_VAR_LEN_PREFIX]);
+            let layout = Layout::from_size_align_unchecked(data.len(), 1);
+            o.ptr = alloc(layout);
+            let bs = std::slice::from_raw_parts_mut(o.ptr, data.len());
+            bs.copy_from_slice(data);
+            MemVar {
+                o: ManuallyDrop::new(outline.assume_init()),
+            }
+        }
+    }
+
+    /// Returns length of the value.
+    #[inline]
+    pub fn len(&self) -> usize {
+        unsafe { self.i.len as usize }
+    }
+
+    /// Returns whether the value is inlined.
+    #[inline]
+    pub fn is_inlined(&self) -> bool {
+        self.len() <= MEM_VAR_LEN_INLINE
+    }
+
+    /// Returns the heap-allocated length of the given value.
+    /// If the value can be inlined, returns 0.
+    #[inline]
+    pub fn outline_len(data: &[u8]) -> usize {
+        if data.len() > MEM_VAR_LEN_INLINE {
+            data.len()
+        } else {
+            0
+        }
+    }
+
+    /// Returns bytes.
+    #[inline]
+    pub fn as_bytes(&self) -> &[u8] {
+        let len = self.len();
+        if len <= MEM_VAR_LEN_INLINE {
+            unsafe { &self.i.data[..len] }
+        } else {
+            unsafe { std::slice::from_raw_parts(self.o.ptr, len) }
+        }
+    }
+
+    /// Returns string.
+    #[inline]
+    pub fn as_str(&self) -> &str {
+        let len = self.len();
+        if len <= MEM_VAR_LEN_INLINE {
+            unsafe { std::str::from_utf8_unchecked(&self.i.data[..len]) }
+        } else {
+            unsafe {
+                let bytes = std::slice::from_raw_parts(self.o.ptr, len);
+                std::str::from_utf8_unchecked(bytes)
+            }
+        }
+    }
+}
+
+impl Clone for MemVar {
+    #[inline]
+    fn clone(&self) -> Self {
+        unsafe {
+            if self.len() > MEM_VAR_LEN_INLINE {
+                MemVar { o: self.o.clone() }
+            } else {
+                MemVar { i: self.i }
+            }
+        }
+    }
+}
+
+impl Drop for MemVar {
+    #[inline]
+    fn drop(&mut self) {
+        let len = self.len();
+        if len > MEM_VAR_LEN_INLINE {
+            unsafe {
+                let layout = Layout::from_size_align_unchecked(len, 1);
+                dealloc(self.o.ptr, layout);
+            }
+        }
+    }
+}
+
+impl Serialize for MemVar {
+    #[inline]
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_bytes(self.as_bytes())
+    }
+}
+
+impl<'de> Deserialize<'de> for MemVar {
+    #[inline]
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        deserializer.deserialize_bytes(MemVarVisitor)
+    }
+}
+
+struct MemVarVisitor;
+
+impl<'de> Visitor<'de> for MemVarVisitor {
+    type Value = MemVar;
+
+    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+        formatter.write_str("byte array")
+    }
+
+    fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if v.len() >= 0xffff {
+            return fail_long_bytes();
+        }
+        Ok(MemVar::new(v))
+    }
+
+    fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if v.len() >= 0xffff {
+            return fail_long_bytes();
+        }
+        Ok(MemVar::new(&v))
+    }
+
+    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if v.len() >= 0xffff {
+            return fail_long_bytes();
+        }
+        Ok(MemVar::new(v.as_bytes()))
+    }
+
+    fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if v.len() >= 0xffff {
+            return fail_long_bytes();
+        }
+        Ok(MemVar::new(v.as_bytes()))
+    }
+}
+
+#[inline]
+fn fail_long_bytes<T, E: serde::de::Error>() -> Result<T, E> {
+    Err(serde::de::Error::custom(
+        "MemVar does not support bytes longer than u16::MAX",
+    ))
+}
+
+#[derive(Clone, Copy, PartialEq, Eq)]
+#[repr(C)]
+struct MemVarInline {
+    len: u16,
+    data: [u8; MEM_VAR_LEN_INLINE],
+}
+
+#[derive(PartialEq, Eq)]
+#[repr(C)]
+struct MemVarOutline {
+    len: u16,
+    prefix: [u8; MEM_VAR_LEN_PREFIX],
+    ptr: *mut u8,
+}
+
+impl Clone for MemVarOutline {
+    #[inline]
+    fn clone(&self) -> Self {
+        unsafe {
+            let layout = Layout::from_size_align_unchecked(self.len as usize, 1);
+            let ptr = alloc(layout);
+            // Copy the outlined bytes into the new allocation so the clone owns
+            // an identical value instead of uninitialized memory.
+            std::ptr::copy_nonoverlapping(self.ptr, ptr, self.len as usize);
+            MemVarOutline {
+                len: self.len,
+                prefix: self.prefix,
+                ptr,
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_page_var() {
+        let var1 = PageVar::inline(b"hello");
+        assert!(var1.is_inlined());
+        assert!(var1.len() == 5);
+        assert!(var1.as_bytes(std::ptr::null()) == b"hello");
+    }
+
+    #[test]
+    fn test_mem_var() {
+        let var1 = MemVar::new(b"hello");
+        assert!(var1.is_inlined());
+        assert!(var1.len() == 5);
+        assert!(var1.as_bytes() == b"hello");
+        assert!(var1.as_str() == "hello");
+        assert!(MemVar::outline_len(b"hello") == 0);
+
+        let var2 = MemVar::new(b"a long value stored outline");
+        assert!(!var2.is_inlined());
+        assert!(var2.len() == 27);
+        assert!(var2.as_bytes() == b"a long value stored outline");
+        assert!(var2.as_str() == "a long value stored outline");
+        assert!(MemVar::outline_len(b"a long value stored outline") == 27);
+    }
+}
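+
+// Usage sketch: an extra, illustrative test module (the module and test names
+// are arbitrary) showing how the ByteXValSlice traits reinterpret a buffer of
+// fixed-width codes as its signed or floating-point view without copying.
+#[cfg(test)]
+mod byte_val_slice_sketch {
+    use super::*;
+
+    #[test]
+    fn test_byte4_val_slice_views() {
+        // Store a u32, an i32 and an f32 as raw 4-byte codes.
+        let mut vals: Vec<Byte4Val> = vec![7u32.to_val(), (-1i32).to_val(), 1.5f32.to_bits()];
+        // Reinterpret the same buffer as signed integers and floats.
+        assert!(vals.as_i32s()[1] == -1);
+        assert!(vals.as_f32s()[2] == 1.5);
+        // Mutable views write through to the underlying codes.
+        vals.as_i32s_mut()[0] = -7;
+        assert!(vals[0] == (-7i32) as u32);
+    }
+}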