Skip to content

Commit

Permalink
basic transaction management
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangzhe committed Dec 14, 2024
1 parent 3d02dc4 commit 0c77a0a
Show file tree
Hide file tree
Showing 17 changed files with 2,919 additions and 771 deletions.
7 changes: 6 additions & 1 deletion doradb-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ smallvec = {version = "1.8", features = ["union"]}
thiserror = "1.0"
bitflags = "1.3"
bytemuck = "1.7"
parking_lot = "0.12.3"
parking_lot = "0.12"
libc = "0.2.164"
crossbeam-utils = "0.8"
serde = { version = "1.0.216", features = ["derive"] }
bincode = { version = "2.0.0-rc", features = ["serde"] }
flume = "0.11"

[dev-dependencies]
rand = "0.8"
bitcode = { version = "0.6.3", features = ["serde"] }
36 changes: 31 additions & 5 deletions doradb-storage/src/buffer/frame.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,36 @@
use std::cell::{Cell, UnsafeCell};
use crate::buffer::page::{Page, PageID};
use crate::buffer::FixedBufferPool;
use crate::latch::HybridLatch;
use crate::trx::undo::UndoMap;

const _: () = assert!(
{ std::mem::size_of::<BufferFrame>() % 64 == 0 },
"Size of BufferFrame must be multiply of 64"
);

const _: () = assert!(
{ std::mem::align_of::<BufferFrame>() % 64 == 0 },
"Align of BufferFrame must be multiply of 64"
);

#[repr(C)]
pub struct BufferFrame {
pub page_id: Cell<PageID>,
pub page_id: PageID,
pub next_free: PageID,
/// Undo Map is only maintained by RowPage.
/// Once a RowPage is eliminated, the UndoMap is retained by BufferPool
/// and when the page is reloaded, UndoMap is reattached to page.
pub undo_map: Option<Box<UndoMap>>,
pub latch: HybridLatch, // lock proctects free list and page.
pub next_free: UnsafeCell<PageID>,
pub page: UnsafeCell<Page>,
}
pub page: Page,
}

/// BufferFrameAware defines callbacks on lifecycle of buffer frame
/// for initialization and de-initialization.
pub trait BufferFrameAware {
/// This callback is called when a page is just loaded into BufferFrame.
fn init_bf(_pool: &FixedBufferPool, _bf: &mut BufferFrame) {}

/// This callback is called when a page is cleaned and return to BufferPool.
fn deinit_bf(_pool: &FixedBufferPool, _bf: &mut BufferFrame) {}
}
113 changes: 92 additions & 21 deletions doradb-storage/src/buffer/guard.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use crate::latch::HybridGuard;
use crate::buffer::page::PageID;
use crate::buffer::frame::BufferFrame;
use crate::error::{Result, Validation, Validation::{Valid, Invalid}};
use crate::buffer::page::PageID;
use crate::error::{
Result, Validation,
Validation::{Invalid, Valid},
};
use crate::latch::GuardState;
use crate::latch::HybridGuard;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem;

pub struct PageGuard<'a, T> {
bf: &'a BufferFrame,
bf: &'a UnsafeCell<BufferFrame>,
guard: HybridGuard<'a>,
_marker: PhantomData<&'a T>,
}

impl<'a, T> PageGuard<'a, T> {
#[inline]
pub(super) fn new(bf: &'a BufferFrame, guard: HybridGuard<'a>) -> Self {
pub(super) fn new(bf: &'a UnsafeCell<BufferFrame>, guard: HybridGuard<'a>) -> Self {
Self {
bf,
guard,
Expand All @@ -22,33 +27,78 @@ impl<'a, T> PageGuard<'a, T> {
}

#[inline]
pub fn page_id(&self) -> PageID {
self.bf.page_id.get()
pub unsafe fn page_id(&self) -> PageID {
(*self.bf.get()).page_id
}

#[inline]
pub fn try_shared(mut self) -> Validation<PageSharedGuard<'a, T>> {
self.guard.try_shared().map(|_| PageSharedGuard{
bf: self.bf,
self.guard.try_shared().map(|_| PageSharedGuard {
bf: unsafe { &*self.bf.get() },
guard: self.guard,
_marker: PhantomData,
})
}

#[inline]
pub fn block_until_shared(self) -> PageSharedGuard<'a, T> {
match self.guard.state {
GuardState::Exclusive => PageSharedGuard {
bf: unsafe { &*self.bf.get() },
guard: self.guard,
_marker: PhantomData,
},
GuardState::Shared => {
unimplemented!("lock downgrade from exclusive to shared is not supported")
}
GuardState::Optimistic => {
let guard = self.guard.block_until_shared();
PageSharedGuard {
bf: unsafe { &*self.bf.get() },
guard,
_marker: PhantomData,
}
}
}
}

#[inline]
pub fn try_exclusive(mut self) -> Validation<PageExclusiveGuard<'a, T>> {
self.guard.try_exclusive().map(|_| PageExclusiveGuard {
bf: self.bf,
bf: unsafe { &mut *self.bf.get() },
guard: self.guard,
_marker: PhantomData,
})
}

#[inline]
pub fn block_until_exclusive(self) -> PageExclusiveGuard<'a, T> {
match self.guard.state {
GuardState::Exclusive => PageExclusiveGuard {
bf: unsafe { &mut *self.bf.get() },
guard: self.guard,
_marker: PhantomData,
},
GuardState::Shared => {
unimplemented!("lock upgradate from shared to exclusive is not supported")
}
GuardState::Optimistic => {
let guard = self.guard.block_until_exclusive();
PageExclusiveGuard {
bf: unsafe { &mut *self.bf.get() },
guard,
_marker: PhantomData,
}
}
}
}

/// Returns page with optimistic read.
/// All values must be validated before use.
#[inline]
pub fn page_unchecked(&self) -> &T {
unsafe { &*(self.bf.page.get() as *const _ as *const T) }
pub unsafe fn page_unchecked(&self) -> &T {
let bf = self.bf.get();
mem::transmute(&(*bf).page)
}

/// Validates version not change.
Expand Down Expand Up @@ -105,20 +155,27 @@ impl<'a, T> PageSharedGuard<'a, T> {
self.guard.downgrade();
}

/// Returns the buffer frame current page associated.
#[inline]
pub fn bf(&self) -> &BufferFrame {
self.bf
}

/// Returns current page id.
#[inline]
pub fn page_id(&self) -> PageID {
self.bf.page_id.get()
self.bf.page_id
}

/// Returns shared page.
#[inline]
pub fn page(&self) -> &T {
unsafe { &*(self.bf.page.get() as *const _ as *const T) }
unsafe { mem::transmute(&self.bf.page) }
}
}

pub struct PageExclusiveGuard<'a, T> {
bf: &'a BufferFrame,
bf: &'a mut BufferFrame,
guard: HybridGuard<'a>,
_marker: PhantomData<&'a mut T>,
}
Expand All @@ -131,25 +188,39 @@ impl<'a, T> PageExclusiveGuard<'a, T> {
self.guard.downgrade();
}

/// Returns current page id.
#[inline]
pub fn page_id(&self) -> PageID {
self.bf.page_id.get()
self.bf.page_id
}

/// Returns current page.
#[inline]
pub fn page(&self) -> &T {
unsafe { &*(self.bf.page.get() as *const T) }
unsafe { mem::transmute(&self.bf.page) }
}

/// Returns mutable page.
#[inline]
pub fn page_mut(&mut self) -> &mut T {
unsafe { &mut *(self.bf.page.get() as *mut T) }
unsafe { mem::transmute(&mut self.bf.page) }
}

/// Returns current buffer frame.
#[inline]
pub fn bf(&self) -> &BufferFrame {
&self.bf
}

/// Returns mutable buffer frame.
#[inline]
pub fn bf_mut(&mut self) -> &mut BufferFrame {
&mut self.bf
}

/// Set next free page.
#[inline]
pub fn set_next_free(&mut self, next_free: PageID) {
unsafe {
*self.bf.next_free.get() = next_free;
}
self.bf.next_free = next_free;
}
}
Loading

0 comments on commit 0c77a0a

Please sign in to comment.