From f8db485a651863e892f73fb3f65356aa07edb474 Mon Sep 17 00:00:00 2001 From: jiangzhe Date: Tue, 14 Jan 2025 06:33:43 +0000 Subject: [PATCH] session and statement --- doradb-storage/examples/multi_threaded_trx.rs | 5 +- doradb-storage/src/lib.rs | 1 + doradb-storage/src/session.rs | 38 ++++- doradb-storage/src/stmt.rs | 58 +++++++ doradb-storage/src/table/mvcc.rs | 157 +++++++++--------- doradb-storage/src/trx/mod.rs | 101 ++++++----- doradb-storage/src/trx/row.rs | 2 +- doradb-storage/src/trx/sys.rs | 51 +++--- 8 files changed, 258 insertions(+), 155 deletions(-) create mode 100644 doradb-storage/src/stmt.rs diff --git a/doradb-storage/examples/multi_threaded_trx.rs b/doradb-storage/examples/multi_threaded_trx.rs index 3b2a7d4..5c97a31 100644 --- a/doradb-storage/examples/multi_threaded_trx.rs +++ b/doradb-storage/examples/multi_threaded_trx.rs @@ -91,7 +91,10 @@ async fn worker(trx_sys: &TransactionSystem, stop: Arc, wg: WaitGrou while !stop.load(Ordering::Relaxed) { let mut trx = session.begin_trx(trx_sys); trx.add_pseudo_redo_log_entry(); - let _ = trx_sys.commit(trx).await; + match trx_sys.commit(trx).await { + Ok(s) => session = s, + Err(_) => return, + } } drop(wg); } diff --git a/doradb-storage/src/lib.rs b/doradb-storage/src/lib.rs index 8114ba0..4840f4a 100644 --- a/doradb-storage/src/lib.rs +++ b/doradb-storage/src/lib.rs @@ -7,6 +7,7 @@ pub mod index; pub mod latch; pub mod row; pub mod session; +pub mod stmt; pub mod table; pub mod trx; pub mod value; diff --git a/doradb-storage/src/session.rs b/doradb-storage/src/session.rs index 2132da7..de63ce4 100644 --- a/doradb-storage/src/session.rs +++ b/doradb-storage/src/session.rs @@ -9,27 +9,51 @@ use std::collections::HashMap; use std::sync::Arc; pub struct Session { + inner: Option>, +} + +impl Session { + #[inline] + pub fn new() -> Session { + Session { + inner: Some(Box::new(InternalSession::new())), + } + } + + #[inline] + pub fn with_internal_session(inner: Box) -> Self { + Session { inner: Some(inner) } + } + + #[inline] + pub fn begin_trx(mut self, trx_sys: &TransactionSystem) -> ActiveTrx { + trx_sys.new_trx(self.inner.take()) + } +} + +pub trait IntoSession: Sized { + fn into_session(self) -> Session; + + fn split_session(&mut self) -> Session; +} + +pub struct InternalSession { active_insert_pages: HashMap, abort_signal: Arc>>>, abort_notifier: Receiver<()>, } -impl Session { +impl InternalSession { #[inline] pub fn new() -> Self { let (tx, rx) = flume::unbounded(); - Session { + InternalSession { active_insert_pages: HashMap::new(), abort_signal: Arc::new(Mutex::new(Some(tx))), abort_notifier: rx, } } - #[inline] - pub fn begin_trx(&mut self, trx_sys: &TransactionSystem) -> ActiveTrx { - trx_sys.new_trx(self) - } - #[inline] pub fn load_active_insert_page(&mut self, table_id: TableID) -> Option<(PageID, RowID)> { self.active_insert_pages.remove(&table_id) diff --git a/doradb-storage/src/stmt.rs b/doradb-storage/src/stmt.rs new file mode 100644 index 0000000..a4badf4 --- /dev/null +++ b/doradb-storage/src/stmt.rs @@ -0,0 +1,58 @@ +use crate::buffer::page::PageID; +use crate::buffer::BufferPool; +use crate::row::RowID; +use crate::table::TableID; +use crate::trx::redo::RedoEntry; +use crate::trx::undo::SharedUndoEntry; +use crate::trx::ActiveTrx; + +pub struct Statement { + pub trx: ActiveTrx, + // statement-level undo logs. + pub undo: Vec, + // statement-level redo logs. + pub redo: Vec, +} + +impl Statement { + #[inline] + pub fn new(trx: ActiveTrx) -> Self { + Statement { + trx, + undo: vec![], + redo: vec![], + } + } + + #[inline] + pub fn commit(mut self) -> ActiveTrx { + self.trx.trx_undo.extend(self.undo.drain(..)); + self.trx.trx_redo.extend(self.redo.drain(..)); + self.trx + } + + #[inline] + pub fn rollback(self, buf_pool: &P) -> ActiveTrx { + todo!(); + } + + #[inline] + pub fn last_undo_entry(&self) -> Option<&SharedUndoEntry> { + self.undo.last() + } + + #[inline] + pub fn load_active_insert_page(&mut self, table_id: TableID) -> Option<(PageID, RowID)> { + self.trx + .session + .as_mut() + .and_then(|session| session.load_active_insert_page(table_id)) + } + + #[inline] + pub fn save_active_insert_page(&mut self, table_id: TableID, page_id: PageID, row_id: RowID) { + if let Some(session) = self.trx.session.as_mut() { + session.save_active_insert_page(table_id, page_id, row_id); + } + } +} diff --git a/doradb-storage/src/table/mvcc.rs b/doradb-storage/src/table/mvcc.rs index bacc739..ea64733 100644 --- a/doradb-storage/src/table/mvcc.rs +++ b/doradb-storage/src/table/mvcc.rs @@ -8,6 +8,7 @@ use crate::row::ops::{ UpdateCol, UpdateMvccResult, UpdateRow, }; use crate::row::{estimate_max_row_count, RowID, RowPage, RowRead}; +use crate::stmt::Statement; use crate::table::{Schema, Table}; use crate::trx::redo::{RedoEntry, RedoKind}; use crate::trx::row::{RowReadAccess, RowWriteAccess}; @@ -50,7 +51,7 @@ impl<'a> MvccTable<'a> { #[inline] pub async fn select_row( &self, - trx: &mut ActiveTrx<'_>, + stmt: &mut Statement, key: Val, user_read_set: &[usize], ) -> SelectMvccResult { @@ -76,7 +77,7 @@ impl<'a> MvccTable<'a> { } let key = key.clone(); return self - .select_row_in_page(trx, page_guard, key, row_id, user_read_set) + .select_row_in_page(stmt, page_guard, key, row_id, user_read_set) .await; } }, @@ -87,7 +88,7 @@ impl<'a> MvccTable<'a> { #[inline] async fn select_row_in_page( &self, - trx: &mut ActiveTrx<'_>, + stmt: &mut Statement, page_guard: PageSharedGuard<'_, RowPage>, key: Val, row_id: RowID, @@ -100,13 +101,15 @@ impl<'a> MvccTable<'a> { return SelectMvccResult::RowNotFound; } let row_idx = (row_id - page.header.start_row_id) as usize; - let access = self.lock_row_for_read(trx, &page_guard, row_idx).await; - access.read_row_mvcc(trx, &self.schema, user_read_set, &key) + let access = self + .lock_row_for_read(&stmt.trx, &page_guard, row_idx) + .await; + access.read_row_mvcc(&stmt.trx, &self.schema, user_read_set, &key) } /// Insert row with MVCC. #[inline] - pub async fn insert_row(&self, trx: &mut ActiveTrx<'_>, cols: Vec) -> InsertMvccResult { + pub async fn insert_row(&self, stmt: &mut Statement, cols: Vec) -> InsertMvccResult { debug_assert!(cols.len() + 1 == self.schema.col_count()); debug_assert!({ cols.iter() @@ -114,18 +117,18 @@ impl<'a> MvccTable<'a> { .all(|(idx, val)| self.schema.user_col_type_match(idx, val)) }); let key = cols[self.schema.user_key_idx()].clone(); - match self.insert_row_internal(trx, cols, UndoKind::Insert, None) { + match self.insert_row_internal(stmt, cols, UndoKind::Insert, None) { InsertMvccResult::Ok(row_id) => loop { if let Some(old_row_id) = self.sec_idx.insert_if_not_exists(key.clone(), row_id) { // we found there is already one existing row with same key. // ref-count the INSERT undo entry because we treat it as newer version. // and store in the MOVE entry's prev pointer. - let new_entry = trx - .last_undo_entry_of_curr_stmt() + let new_entry = stmt + .last_undo_entry() .map(SharedUndoEntry::clone) .expect("undo entry of insert statement"); match self - .move_insert(trx, old_row_id, key.clone(), new_entry) + .move_insert(stmt, old_row_id, key.clone(), new_entry) .await { MoveInsertResult::DuplicateKey => return InsertMvccResult::DuplicateKey, @@ -149,7 +152,7 @@ impl<'a> MvccTable<'a> { #[inline] pub async fn update_row( &self, - trx: &mut ActiveTrx<'_>, + stmt: &mut Statement, key: Val, mut update: Vec, ) -> UpdateMvccResult { @@ -168,7 +171,7 @@ impl<'a> MvccTable<'a> { } let key = key.clone(); let res = self - .update_row_inplace(trx, page_guard, key, row_id, update) + .update_row_inplace(stmt, page_guard, key, row_id, update) .await; return match res { @@ -179,11 +182,11 @@ impl<'a> MvccTable<'a> { UpdateMvccResult::NoFreeSpace(old_row, update) => { // in-place update failed, we transfer update into // move+update. - let move_entry = trx - .last_undo_entry_of_curr_stmt() + let move_entry = stmt + .last_undo_entry() .map(|entry| entry.leak()) .expect("move entry"); - self.move_update(trx, old_row, update, move_entry) + self.move_update(stmt, old_row, update, move_entry) } UpdateMvccResult::Retry(upd) => { update = upd; @@ -200,7 +203,7 @@ impl<'a> MvccTable<'a> { /// Delete row with MVCC. /// This method is for delete based on index lookup. #[inline] - pub async fn delete_row(&self, trx: &mut ActiveTrx<'_>, key: Val) -> DeleteMvccResult { + pub async fn delete_row(&self, stmt: &mut Statement, key: Val) -> DeleteMvccResult { loop { match self.sec_idx.lookup(&key) { None => return DeleteMvccResult::RowNotFound, @@ -214,7 +217,7 @@ impl<'a> MvccTable<'a> { continue; } return self - .delete_row_internal(trx, page_guard, row_id, &key) + .delete_row_internal(stmt, page_guard, row_id, &key) .await; } }, @@ -226,7 +229,7 @@ impl<'a> MvccTable<'a> { #[inline] fn move_update( &self, - trx: &mut ActiveTrx, + stmt: &mut Statement, mut old_row: Vec, update: Vec, move_entry: UndoEntryPtr, @@ -245,7 +248,7 @@ impl<'a> MvccTable<'a> { (old_row, UndoKind::Update(undo_cols)) }; - match self.insert_row_internal(trx, new_row, undo_kind, Some(move_entry)) { + match self.insert_row_internal(stmt, new_row, undo_kind, Some(move_entry)) { InsertMvccResult::Ok(row_id) => UpdateMvccResult::Ok(row_id), InsertMvccResult::WriteConflict | InsertMvccResult::DuplicateKey => unreachable!(), } @@ -258,7 +261,7 @@ impl<'a> MvccTable<'a> { #[inline] async fn move_insert( &self, - trx: &mut ActiveTrx<'_>, + stmt: &mut Statement, row_id: RowID, key: Val, new_entry: SharedUndoEntry, @@ -284,7 +287,7 @@ impl<'a> MvccTable<'a> { } let row_idx = (row_id - page.header.start_row_id) as usize; let mut lock_row = self - .lock_row_for_write(trx, &page_guard, row_idx, &key) + .lock_row_for_write(&stmt.trx, &page_guard, row_idx, &key) .await; match &mut lock_row { LockRowForWrite::InvalidIndex => return MoveInsertResult::None, // key changed so we are fine. @@ -301,12 +304,12 @@ impl<'a> MvccTable<'a> { row_id, UndoKind::Move(true), ); - access.build_undo_chain(trx, &move_entry, old_cts); + access.build_undo_chain(&stmt.trx, &move_entry, old_cts); drop(access); // unlock the row. drop(lock_row); drop(page_guard); // unlock the page. link_move_entry(new_entry, move_entry.leak()); - trx.stmt_undo.push(move_entry); + stmt.undo.push(move_entry); // no redo required, because no change on row data. return MoveInsertResult::Ok; } @@ -319,7 +322,7 @@ impl<'a> MvccTable<'a> { #[inline] fn insert_row_internal( &self, - trx: &mut ActiveTrx, + stmt: &mut Statement, mut insert: Vec, mut undo_kind: UndoKind, mut move_entry: Option, @@ -327,12 +330,11 @@ impl<'a> MvccTable<'a> { let row_len = row_len(&self.schema, &insert); let row_count = estimate_max_row_count(row_len, self.schema.col_count()); loop { - let page_guard = self.get_insert_page(trx, row_count); + let page_guard = self.get_insert_page(stmt, row_count); let page_id = page_guard.page_id(); - match self.insert_row_to_page(trx, page_guard, insert, undo_kind, move_entry) { + match self.insert_row_to_page(stmt, page_guard, insert, undo_kind, move_entry) { InsertInternalResult::Ok(row_id) => { - trx.session - .save_active_insert_page(self.table_id, page_id, row_id); + stmt.save_active_insert_page(self.table_id, page_id, row_id); return InsertMvccResult::Ok(row_id); } // this page cannot be inserted any more, just leave it and retry another page. @@ -351,7 +353,7 @@ impl<'a> MvccTable<'a> { #[inline] fn insert_row_to_page( &self, - trx: &mut ActiveTrx, + stmt: &mut Statement, page_guard: PageSharedGuard<'_, RowPage>, insert: Vec, undo_kind: UndoKind, @@ -366,10 +368,10 @@ impl<'a> MvccTable<'a> { let new_entry = SharedUndoEntry::new(self.table_id, page_id, row_id, undo_kind); debug_assert!(access.undo_mut().is_none()); *access.undo_mut() = Some(UndoHead { - status: trx.status(), + status: stmt.trx.status(), entry: None, }); - access.build_undo_chain(trx, &new_entry, NextTrxCTS::None); + access.build_undo_chain(&stmt.trx, &new_entry, NextTrxCTS::None); drop(access); drop(page_guard); // in case of move+insert and move+update, we need to link current undo entry to MOVE entry @@ -378,7 +380,7 @@ impl<'a> MvccTable<'a> { // ref-count this pointer. link_move_entry(SharedUndoEntry::clone(&new_entry), move_entry); } - trx.stmt_undo.push(new_entry); + stmt.undo.push(new_entry); // create redo log. // even if the operation is move+update, we still treat it as insert redo log. // because redo is only useful when recovering and no version chain is required @@ -389,7 +391,7 @@ impl<'a> MvccTable<'a> { kind: RedoKind::Insert(insert), }; // store redo log into transaction redo buffer. - trx.stmt_redo.push(redo_entry); + stmt.redo.push(redo_entry); InsertInternalResult::Ok(row_id) } InsertResult::NoFreeSpaceOrRowID => { @@ -401,7 +403,7 @@ impl<'a> MvccTable<'a> { #[inline] async fn update_row_inplace( &self, - trx: &mut ActiveTrx<'_>, + stmt: &mut Statement, page_guard: PageSharedGuard<'_, RowPage>, key: Val, row_id: RowID, @@ -436,7 +438,7 @@ impl<'a> MvccTable<'a> { } let row_idx = (row_id - page.header.start_row_id) as usize; let mut lock_row = self - .lock_row_for_write(trx, &page_guard, row_idx, &key) + .lock_row_for_write(&stmt.trx, &page_guard, row_idx, &key) .await; match &mut lock_row { LockRowForWrite::InvalidIndex => return UpdateMvccResult::RowNotFound, @@ -459,18 +461,18 @@ impl<'a> MvccTable<'a> { row_id, UndoKind::Move(false), ); - access.build_undo_chain(trx, &new_entry, old_cts); + access.build_undo_chain(&stmt.trx, &new_entry, old_cts); drop(access); // unlock row drop(lock_row); drop(page_guard); // unlock page - trx.stmt_undo.push(new_entry); + stmt.undo.push(new_entry); let redo_entry = RedoEntry { page_id, row_id, // use DELETE for redo is ok, no version chain should be maintained if recovering from redo. kind: RedoKind::Delete, }; - trx.stmt_redo.push(redo_entry); + stmt.redo.push(redo_entry); UpdateMvccResult::NoFreeSpace(old_row, update) } UpdateRow::Ok(mut row) => { @@ -496,11 +498,11 @@ impl<'a> MvccTable<'a> { row_id, UndoKind::Update(undo_cols), ); - access.build_undo_chain(trx, &new_entry, old_cts); + access.build_undo_chain(&stmt.trx, &new_entry, old_cts); drop(access); // unlock the row. drop(lock_row); drop(page_guard); // unlock the page, because we finish page update. - trx.stmt_undo.push(new_entry); + stmt.undo.push(new_entry); if !redo_cols.is_empty() { // there might be nothing to update, so we do not need to add redo log. // but undo is required because we need to properly lock the row. @@ -509,7 +511,7 @@ impl<'a> MvccTable<'a> { row_id, kind: RedoKind::Update(redo_cols), }; - trx.stmt_redo.push(redo_entry); + stmt.redo.push(redo_entry); } UpdateMvccResult::Ok(row_id) } @@ -521,7 +523,7 @@ impl<'a> MvccTable<'a> { #[inline] async fn delete_row_internal( &self, - trx: &mut ActiveTrx<'_>, + stmt: &mut Statement, page_guard: PageSharedGuard<'_, RowPage>, row_id: RowID, key: &Val, @@ -535,7 +537,7 @@ impl<'a> MvccTable<'a> { } let row_idx = (row_id - page.header.start_row_id) as usize; let mut lock_row = self - .lock_row_for_write(trx, &page_guard, row_idx, key) + .lock_row_for_write(&stmt.trx, &page_guard, row_idx, key) .await; match &mut lock_row { LockRowForWrite::InvalidIndex => return DeleteMvccResult::RowNotFound, @@ -548,18 +550,18 @@ impl<'a> MvccTable<'a> { access.delete_row(); let new_entry = SharedUndoEntry::new(self.table_id, page_id, row_id, UndoKind::Delete); - access.build_undo_chain(trx, &new_entry, mem::take(old_cts)); + access.build_undo_chain(&stmt.trx, &new_entry, mem::take(old_cts)); drop(access); // unlock row drop(lock_row); drop(page_guard); // unlock page - trx.stmt_undo.push(new_entry); + stmt.undo.push(new_entry); // create redo log let redo_entry = RedoEntry { page_id, row_id, kind: RedoKind::Delete, }; - trx.stmt_redo.push(redo_entry); + stmt.redo.push(redo_entry); DeleteMvccResult::Ok } } @@ -568,10 +570,10 @@ impl<'a> MvccTable<'a> { #[inline] fn get_insert_page( &self, - trx: &mut ActiveTrx, + stmt: &mut Statement, row_count: usize, ) -> PageSharedGuard<'a, RowPage> { - if let Some((page_id, row_id)) = trx.session.load_active_insert_page(self.table_id) { + if let Some((page_id, row_id)) = stmt.load_active_insert_page(self.table_id) { let g = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); // because we save last insert page in session and meanwhile other thread may access this page // and do some modification, even worse, buffer pool may evict it and reload other data into @@ -590,7 +592,7 @@ impl<'a> MvccTable<'a> { #[inline] async fn lock_row_for_write( &self, - trx: &mut ActiveTrx<'_>, + trx: &ActiveTrx, page_guard: &'a PageSharedGuard<'a, RowPage>, row_idx: usize, key: &Val, @@ -671,7 +673,7 @@ impl<'a> MvccTable<'a> { #[inline] async fn lock_row_for_read( &self, - trx: &ActiveTrx<'_>, + trx: &ActiveTrx, page_guard: &'a PageSharedGuard<'a, RowPage>, row_idx: usize, ) -> RowReadAccess<'a> { @@ -817,21 +819,21 @@ mod tests { let mut trx = session.begin_trx(trx_sys); for i in 0..SIZE { let s = format!("{}", i); - trx.start_stmt(); + let mut stmt = trx.start_stmt(); let res = table - .insert_row(&mut trx, vec![Val::from(i), Val::from(&s[..])]) + .insert_row(&mut stmt, vec![Val::from(i), Val::from(&s[..])]) .await; - trx.end_stmt(); + trx = stmt.commit(); assert!(res.is_ok()); } - trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx).await.unwrap(); } { let mut trx = session.begin_trx(trx_sys); for i in 16..SIZE { - trx.start_stmt(); + let mut stmt = trx.start_stmt(); let key = Val::from(i); - let res = table.select_row(&mut trx, key, &[0, 1]).await; + let res = table.select_row(&mut stmt, key, &[0, 1]).await; match res { SelectMvccResult::Ok(vals) => { assert!(vals.len() == 2); @@ -841,8 +843,9 @@ mod tests { } _ => panic!("select fail"), } + trx = stmt.commit(); } - trx_sys.commit(trx).await.unwrap(); + let _ = trx_sys.commit(trx).await.unwrap(); } unsafe { @@ -868,14 +871,14 @@ mod tests { let mut trx = session.begin_trx(trx_sys); for i in 0..SIZE { let s = format!("{}", i); - trx.start_stmt(); + let mut stmt = trx.start_stmt(); let res = table - .insert_row(&mut trx, vec![Val::from(i), Val::from(&s[..])]) + .insert_row(&mut stmt, vec![Val::from(i), Val::from(&s[..])]) .await; - trx.end_stmt(); + trx = stmt.commit(); assert!(res.is_ok()); } - trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx).await.unwrap(); // update 1 row with short value let mut trx = session.begin_trx(trx_sys); @@ -885,11 +888,11 @@ mod tests { idx: 1, val: Val::from(s1), }]; - trx.start_stmt(); - let res = table.update_row(&mut trx, k1, update1).await; + let mut stmt = trx.start_stmt(); + let res = table.update_row(&mut stmt, k1, update1).await; assert!(res.is_ok()); - trx.end_stmt(); - trx_sys.commit(trx).await.unwrap(); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); // update 1 row with long value let mut trx = session.begin_trx(trx_sys); @@ -899,11 +902,11 @@ mod tests { idx: 1, val: Val::from(&s2[..]), }]; - trx.start_stmt(); - let res = table.update_row(&mut trx, k2, update2).await; + let mut stmt = trx.start_stmt(); + let res = table.update_row(&mut stmt, k2, update2).await; assert!(res.is_ok()); - trx.end_stmt(); - trx_sys.commit(trx).await.unwrap(); + trx = stmt.commit(); + let _ = trx_sys.commit(trx).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); @@ -928,23 +931,23 @@ mod tests { let mut trx = session.begin_trx(trx_sys); for i in 0..SIZE { let s = format!("{}", i); - trx.start_stmt(); + let mut stmt = trx.start_stmt(); let res = table - .insert_row(&mut trx, vec![Val::from(i), Val::from(&s[..])]) + .insert_row(&mut stmt, vec![Val::from(i), Val::from(&s[..])]) .await; - trx.end_stmt(); + trx = stmt.commit(); assert!(res.is_ok()); } - trx_sys.commit(trx).await.unwrap(); + session = trx_sys.commit(trx).await.unwrap(); // delete 1 row let mut trx = session.begin_trx(trx_sys); let k1 = Val::from(1i32); - trx.start_stmt(); - let res = table.delete_row(&mut trx, k1).await; + let mut stmt = trx.start_stmt(); + let res = table.delete_row(&mut stmt, k1).await; assert!(res.is_ok()); - trx.end_stmt(); - trx_sys.commit(trx).await.unwrap(); + trx = stmt.commit(); + let _ = trx_sys.commit(trx).await.unwrap(); } unsafe { TransactionSystem::drop_static(trx_sys); diff --git a/doradb-storage/src/trx/mod.rs b/doradb-storage/src/trx/mod.rs index 9f77c51..53517bb 100644 --- a/doradb-storage/src/trx/mod.rs +++ b/doradb-storage/src/trx/mod.rs @@ -20,8 +20,8 @@ pub mod row; pub mod sys; pub mod undo; -use crate::buffer::BufferPool; -use crate::session::Session; +use crate::session::{InternalSession, IntoSession, Session}; +use crate::stmt::Statement; use crate::trx::redo::{RedoBin, RedoEntry, RedoKind, RedoLog}; use crate::trx::undo::SharedUndoEntry; use crate::value::Val; @@ -107,8 +107,7 @@ pub fn trx_must_not_see_even_if_prepare(sts: TrxID, ts: TrxID) -> bool { sts < (ts & SNAPSHOT_TS_MASK) } -pub struct ActiveTrx<'a> { - pub session: &'a mut Session, +pub struct ActiveTrx { // Status of the transaction. // Every undo log will refer to this object on heap. // There are nested pointers to allow atomic update on status when @@ -119,28 +118,24 @@ pub struct ActiveTrx<'a> { // which log partition it belongs to. pub log_partition_idx: usize, // transaction-level undo logs. - trx_undo: Vec, - // statement-level undo logs. - pub stmt_undo: Vec, + pub(crate) trx_undo: Vec, // transaction-level redo logs. - trx_redo: Vec, - // statement-level redo logs. - pub stmt_redo: Vec, + pub(crate) trx_redo: Vec, + + pub session: Option>, } -impl<'a> ActiveTrx<'a> { +impl ActiveTrx { /// Create a new transaction. #[inline] - pub fn new(session: &'a mut Session, trx_id: TrxID, sts: TrxID) -> Self { + pub fn new(session: Option>, trx_id: TrxID, sts: TrxID) -> Self { ActiveTrx { - session, status: Arc::new(SharedTrxStatus::new(trx_id)), sts, log_partition_idx: 0, trx_undo: vec![], - stmt_undo: vec![], trx_redo: vec![], - stmt_redo: vec![], + session, } } @@ -166,16 +161,8 @@ impl<'a> ActiveTrx<'a> { /// Starts a statement. #[inline] - pub fn start_stmt(&mut self) { - debug_assert!(self.stmt_undo.is_empty()); - debug_assert!(self.stmt_redo.is_empty()); - } - - /// Ends a statement. - #[inline] - pub fn end_stmt(&mut self) { - self.trx_undo.extend(self.stmt_undo.drain(..)); - self.trx_redo.extend(self.stmt_redo.drain(..)); + pub fn start_stmt(self) -> Statement { + Statement::new(self) } /// Returns whether the transaction is readonly. @@ -184,19 +171,9 @@ impl<'a> ActiveTrx<'a> { self.trx_redo.is_empty() && self.trx_undo.is_empty() } - /// Rollback a statement. - #[inline] - pub fn rollback_stmt(&mut self, buf_pool: &P) { - while let Some(undo) = self.stmt_undo.pop() { - todo!() - } - } - /// Prepare current transaction for committing. #[inline] pub fn prepare(mut self) -> PreparedTrx { - debug_assert!(self.stmt_undo.is_empty()); - debug_assert!(self.stmt_redo.is_empty()); // fast path for readonly transactions if self.readonly() { // there should be no ref count of transaction status. @@ -206,6 +183,7 @@ impl<'a> ActiveTrx<'a> { sts: self.sts, redo_bin: None, undo: vec![], + session: self.session.take(), }; } @@ -232,9 +210,9 @@ impl<'a> ActiveTrx<'a> { PreparedTrx { status: self.status.clone(), sts: self.sts, - // cts, redo_bin, undo, + session: self.session.take(), } } @@ -263,21 +241,13 @@ impl<'a> ActiveTrx<'a> { ]), }) } - - /// Returns last undo entry of current statement. - #[inline] - pub fn last_undo_entry_of_curr_stmt(&self) -> Option<&SharedUndoEntry> { - self.stmt_undo.last() - } } -impl Drop for ActiveTrx<'_> { +impl Drop for ActiveTrx { #[inline] fn drop(&mut self) { assert!(self.trx_undo.is_empty(), "trx undo should be cleared"); - assert!(self.stmt_undo.is_empty(), "stmt undo should be cleared"); assert!(self.trx_redo.is_empty(), "trx redo should be cleared"); - assert!(self.stmt_redo.is_empty(), "stmt redo should be cleared"); } } @@ -290,6 +260,7 @@ pub struct PreparedTrx { sts: TrxID, redo_bin: Option, undo: Vec, + session: Option>, } impl PreparedTrx { @@ -308,6 +279,7 @@ impl PreparedTrx { cts, redo_bin, undo, + session: self.session.take(), } } @@ -318,6 +290,18 @@ impl PreparedTrx { } } +impl IntoSession for PreparedTrx { + #[inline] + fn into_session(mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } + + #[inline] + fn split_session(&mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } +} + impl Drop for PreparedTrx { #[inline] fn drop(&mut self) { @@ -333,6 +317,7 @@ pub struct PrecommitTrx { pub cts: TrxID, pub redo_bin: Option, pub undo: Vec, + session: Option>, } impl PrecommitTrx { @@ -359,6 +344,7 @@ impl PrecommitTrx { sts: self.sts, cts: self.cts, undo, + session: self.session.take(), } } } @@ -371,9 +357,34 @@ impl Drop for PrecommitTrx { } } +impl IntoSession for PrecommitTrx { + #[inline] + fn into_session(mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } + + #[inline] + fn split_session(&mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } +} + pub struct CommittedTrx { status: Arc, pub sts: TrxID, pub cts: TrxID, pub undo: Vec, + session: Option>, +} + +impl IntoSession for CommittedTrx { + #[inline] + fn into_session(mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } + + #[inline] + fn split_session(&mut self) -> Session { + Session::with_internal_session(self.session.take().unwrap()) + } } diff --git a/doradb-storage/src/trx/row.rs b/doradb-storage/src/trx/row.rs index 65d184b..bc4e127 100644 --- a/doradb-storage/src/trx/row.rs +++ b/doradb-storage/src/trx/row.rs @@ -276,7 +276,7 @@ impl<'a> RowWriteAccess<'a> { #[inline] pub fn build_undo_chain( &mut self, - trx: &ActiveTrx<'_>, + trx: &ActiveTrx, new_entry: &SharedUndoEntry, old_cts: NextTrxCTS, ) { diff --git a/doradb-storage/src/trx/sys.rs b/doradb-storage/src/trx/sys.rs index 9fe2301..ee4ebd8 100644 --- a/doradb-storage/src/trx/sys.rs +++ b/doradb-storage/src/trx/sys.rs @@ -3,7 +3,7 @@ use crate::io::{ align_to_sector_size, pwrite, AIOError, AIOManager, AIOManagerConfig, Buf, DirectBuf, FreeListWithFactory, IocbRawPtr, PageBuf, SparseFile, AIO, }; -use crate::session::Session; +use crate::session::{InternalSession, IntoSession, Session}; use crate::trx::{ ActiveTrx, CommittedTrx, PrecommitTrx, PreparedTrx, TrxID, MAX_COMMIT_TS, MAX_SNAPSHOT_TS, MIN_ACTIVE_TRX_ID, MIN_SNAPSHOT_TS, @@ -98,7 +98,7 @@ impl TransactionSystem { /// Create a new transaction. #[inline] - pub fn new_trx<'a>(&self, session: &'a mut Session) -> ActiveTrx<'a> { + pub fn new_trx<'a>(&self, session: Option>) -> ActiveTrx { // active transaction list is calculated by group committer thread // so here we just generate STS and TrxID. let sts = self.ts.fetch_add(1, Ordering::SeqCst); @@ -124,7 +124,7 @@ impl TransactionSystem { /// leader to persist log and backfill CTS. /// This strategy can largely reduce logging IO, therefore improve throughput. #[inline] - pub async fn commit(&self, trx: ActiveTrx<'_>) -> Result<()> { + pub async fn commit(&self, trx: ActiveTrx) -> Result { // Prepare redo log first, this may take some time, // so keep it out of lock scope, and we can fill cts after the lock is held. let partition = &*self.log_partitions[trx.log_partition_idx]; @@ -133,8 +133,7 @@ impl TransactionSystem { if prepared_trx.undo.is_empty() { // This is a read-only transaction, drop it is safe. debug_assert!(prepared_trx.readonly()); - drop(prepared_trx); - return Ok(()); + return Ok(prepared_trx.into_session()); } // There might be scenario that the transaction does not change anything // logically, but have undo logs. @@ -469,7 +468,7 @@ impl LogPartition { &self, mut trx: PrecommitTrx, mut group_commit_g: MutexGuard<'_, GroupCommit>, - ) -> Receiver<()> { + ) -> (Session, Receiver<()>) { let cts = trx.cts; let redo_bin = trx.redo_bin.take().unwrap(); debug_assert!(!redo_bin.is_empty()); @@ -487,6 +486,7 @@ impl LogPartition { let fd = log_file.as_raw_fd(); let log_buf = self.buf(&redo_bin); let (sync_signal, sync_notifier) = flume::unbounded(); + let session = trx.split_session(); let new_group = CommitGroup { trx_list: vec![trx], max_cts: cts, @@ -501,55 +501,54 @@ impl LogPartition { .push_back(CommitMessage::Group(new_group)); drop(group_commit_g); - sync_notifier + (session, sync_notifier) } /// Transaction has no redo log, so we can just acquire CTS and finish it immediately. #[inline] - fn commit_without_redo(&self, trx: PreparedTrx, ts: &AtomicU64) { - let sts = trx.sts; + fn commit_without_redo(&self, trx: PreparedTrx, ts: &AtomicU64) -> Session { let cts = ts.fetch_add(1, Ordering::SeqCst); let committed_trx = trx.fill_cts(cts).commit(); // todo: GC - // self.end_active_sts(sts); + committed_trx.into_session() } #[inline] - async fn commit(&self, trx: PreparedTrx, ts: &AtomicU64) -> Result<()> { + async fn commit(&self, trx: PreparedTrx, ts: &AtomicU64) -> Result { if trx.redo_bin.is_none() { - self.commit_without_redo(trx, ts); - return Ok(()); + let session = self.commit_without_redo(trx, ts); + return Ok(session); } let mut group_commit_g = self.group_commit.0.lock(); let cts = ts.fetch_add(1, Ordering::SeqCst); debug_assert!(cts < MAX_COMMIT_TS); let precommit_trx = trx.fill_cts(cts); if group_commit_g.queue.is_empty() { - let sync_notifier = self.create_new_group(precommit_trx, group_commit_g); + let (session, sync_notifier) = self.create_new_group(precommit_trx, group_commit_g); self.group_commit.1.notify_one(); // notify sync thread to work. let _ = sync_notifier.recv_async().await; // wait for fsync assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); - return Ok(()); + return Ok(session); } let last_group = match group_commit_g.queue.back_mut().unwrap() { CommitMessage::Shutdown => return Err(Error::TransactionSystemShutdown), CommitMessage::Group(group) => group, }; if last_group.can_join(&precommit_trx) { - let sync_notifier = last_group.join(precommit_trx); + let (session, sync_notifier) = last_group.join(precommit_trx); drop(group_commit_g); // unlock to let other transactions to enter commit phase. let _ = sync_notifier.recv_async().await; // wait for fsync assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); - return Ok(()); + return Ok(session); } - let sync_notifier = self.create_new_group(precommit_trx, group_commit_g); + let (session, sync_notifier) = self.create_new_group(precommit_trx, group_commit_g); let _ = sync_notifier.recv_async().await; // wait for fsync assert!(self.persisted_cts.load(Ordering::Relaxed) >= cts); - Ok(()) + Ok(session) } #[inline] @@ -879,14 +878,15 @@ impl CommitGroup { } #[inline] - fn join(&mut self, mut trx: PrecommitTrx) -> Receiver<()> { + fn join(&mut self, mut trx: PrecommitTrx) -> (Session, Receiver<()>) { debug_assert!(self.max_cts < trx.cts); if let Some(redo_bin) = trx.redo_bin.take() { self.log_buf.clone_from_slice(&redo_bin); } self.max_cts = trx.cts; + let session = trx.split_session(); self.trx_list.push(trx); - self.sync_notifier.clone() + (session, self.sync_notifier.clone()) } #[inline] @@ -979,6 +979,7 @@ fn shrink_inflight( #[cfg(test)] mod tests { use super::*; + use crate::session::Session; use crossbeam_utils::CachePadded; use parking_lot::Mutex; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; @@ -988,13 +989,13 @@ mod tests { #[test] fn test_transaction_system() { let trx_sys = TrxSysConfig::default().build_static(); - let mut session = Session::new(); + let session = Session::new(); { let trx = session.begin_trx(trx_sys); let _ = smol::block_on(trx_sys.commit(trx)); } std::thread::spawn(|| { - let mut session = Session::new(); + let session = Session::new(); let trx = session.begin_trx(trx_sys); let _ = smol::block_on(trx_sys.commit(trx)); }) @@ -1147,7 +1148,9 @@ mod tests { let start = Instant::now(); for _ in 0..COUNT { let trx = session.begin_trx(trx_sys); - assert!(smol::block_on(trx_sys.commit(trx)).is_ok()); + let res = smol::block_on(trx_sys.commit(trx)); + assert!(res.is_ok()); + session = res.unwrap(); } let dur = start.elapsed(); println!(