Skip to content

Commit

Permalink
session and statement
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangzhe committed Jan 14, 2025
1 parent 845bf60 commit f8db485
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 155 deletions.
5 changes: 4 additions & 1 deletion doradb-storage/examples/multi_threaded_trx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ async fn worker(trx_sys: &TransactionSystem, stop: Arc<AtomicBool>, wg: WaitGrou
while !stop.load(Ordering::Relaxed) {
let mut trx = session.begin_trx(trx_sys);
trx.add_pseudo_redo_log_entry();
let _ = trx_sys.commit(trx).await;
match trx_sys.commit(trx).await {
Ok(s) => session = s,
Err(_) => return,
}
}
drop(wg);
}
Expand Down
1 change: 1 addition & 0 deletions doradb-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod index;
pub mod latch;
pub mod row;
pub mod session;
pub mod stmt;
pub mod table;
pub mod trx;
pub mod value;
Expand Down
38 changes: 31 additions & 7 deletions doradb-storage/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,51 @@ use std::collections::HashMap;
use std::sync::Arc;

pub struct Session {
inner: Option<Box<InternalSession>>,
}

impl Session {
#[inline]
pub fn new() -> Session {
Session {
inner: Some(Box::new(InternalSession::new())),
}
}

#[inline]
pub fn with_internal_session(inner: Box<InternalSession>) -> Self {
Session { inner: Some(inner) }
}

#[inline]
pub fn begin_trx(mut self, trx_sys: &TransactionSystem) -> ActiveTrx {
trx_sys.new_trx(self.inner.take())
}
}

pub trait IntoSession: Sized {
fn into_session(self) -> Session;

fn split_session(&mut self) -> Session;
}

pub struct InternalSession {
active_insert_pages: HashMap<TableID, (PageID, RowID)>,
abort_signal: Arc<Mutex<Option<Sender<()>>>>,
abort_notifier: Receiver<()>,
}

impl Session {
impl InternalSession {
#[inline]
pub fn new() -> Self {
let (tx, rx) = flume::unbounded();
Session {
InternalSession {
active_insert_pages: HashMap::new(),
abort_signal: Arc::new(Mutex::new(Some(tx))),
abort_notifier: rx,
}
}

#[inline]
pub fn begin_trx(&mut self, trx_sys: &TransactionSystem) -> ActiveTrx {
trx_sys.new_trx(self)
}

#[inline]
pub fn load_active_insert_page(&mut self, table_id: TableID) -> Option<(PageID, RowID)> {
self.active_insert_pages.remove(&table_id)
Expand Down
58 changes: 58 additions & 0 deletions doradb-storage/src/stmt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::buffer::page::PageID;
use crate::buffer::BufferPool;
use crate::row::RowID;
use crate::table::TableID;
use crate::trx::redo::RedoEntry;
use crate::trx::undo::SharedUndoEntry;
use crate::trx::ActiveTrx;

pub struct Statement {
pub trx: ActiveTrx,
// statement-level undo logs.
pub undo: Vec<SharedUndoEntry>,
// statement-level redo logs.
pub redo: Vec<RedoEntry>,
}

impl Statement {
#[inline]
pub fn new(trx: ActiveTrx) -> Self {
Statement {
trx,
undo: vec![],
redo: vec![],
}
}

#[inline]
pub fn commit(mut self) -> ActiveTrx {
self.trx.trx_undo.extend(self.undo.drain(..));
self.trx.trx_redo.extend(self.redo.drain(..));
self.trx
}

#[inline]
pub fn rollback<P: BufferPool>(self, buf_pool: &P) -> ActiveTrx {
todo!();
}

#[inline]
pub fn last_undo_entry(&self) -> Option<&SharedUndoEntry> {
self.undo.last()
}

#[inline]
pub fn load_active_insert_page(&mut self, table_id: TableID) -> Option<(PageID, RowID)> {
self.trx
.session
.as_mut()
.and_then(|session| session.load_active_insert_page(table_id))
}

#[inline]
pub fn save_active_insert_page(&mut self, table_id: TableID, page_id: PageID, row_id: RowID) {
if let Some(session) = self.trx.session.as_mut() {
session.save_active_insert_page(table_id, page_id, row_id);
}
}
}
Loading

0 comments on commit f8db485

Please sign in to comment.