diff --git a/doradb-storage/src/row/mod.rs b/doradb-storage/src/row/mod.rs index 52a1c0d..dc0c03e 100644 --- a/doradb-storage/src/row/mod.rs +++ b/doradb-storage/src/row/mod.rs @@ -3,7 +3,7 @@ pub mod ops; use crate::buffer::frame::{BufferFrameAware, FrameHeader}; use crate::buffer::page::PAGE_SIZE; use crate::buffer::BufferPool; -use crate::row::ops::{DeleteResult, InsertResult, SelectResult, UpdateCol, UpdateResult}; +use crate::row::ops::{Delete, InsertRow, Select, Update, UpdateCol}; use crate::table::Schema; use crate::trx::undo::UndoMap; use crate::value::*; @@ -126,6 +126,27 @@ impl RowPage { } } + /// Returns index of the row within page. + #[inline] + pub fn row_idx(&self, row_id: RowID) -> usize { + debug_assert!(self.row_id_in_valid_range(row_id)); + (row_id - self.header.start_row_id) as usize + } + + /// Returns row id for given index. + #[inline] + pub fn row_id(&self, row_idx: usize) -> RowID { + debug_assert!(row_idx < self.header.row_count()); + self.header.start_row_id + row_idx as u64 + } + + /// Returns whether row id is in valid range. + #[inline] + pub fn row_id_in_valid_range(&self, row_id: RowID) -> bool { + row_id >= self.header.start_row_id + && row_id < self.header.start_row_id + self.header.row_count() as u64 + } + /// Returns row id list in this page. #[inline] pub fn row_ids(&self) -> &[RowID] { @@ -134,12 +155,10 @@ impl RowPage { #[inline] pub fn row_by_id(&self, row_id: RowID) -> Option { - if row_id < self.header.start_row_id as RowID - || row_id >= self.header.start_row_id + self.header.row_count() as u64 - { + if !self.row_id_in_valid_range(row_id) { return None; } - Some(self.row((row_id - self.header.start_row_id) as usize)) + Some(self.row(self.row_idx(row_id))) } /// Returns free space of current page. @@ -189,7 +208,7 @@ impl RowPage { /// Insert a new row in page. #[inline] - pub fn insert(&self, schema: &Schema, user_cols: &[Val]) -> InsertResult { + pub fn insert(&self, schema: &Schema, user_cols: &[Val]) -> InsertRow { debug_assert!(schema.col_count() == self.header.col_count as usize); // insert row does not include RowID, as RowID is auto-generated. debug_assert!(user_cols.len() + 1 == self.header.col_count as usize); @@ -199,7 +218,7 @@ impl RowPage { if let Some((row_idx, var_offset)) = self.request_row_idx_and_free_space(var_len) { (row_idx, var_offset) } else { - return InsertResult::NoFreeSpaceOrRowID; + return InsertRow::NoFreeSpaceOrRowID; }; let mut new_row = self.new_row(row_idx as usize, var_offset); for v in user_cols { @@ -212,32 +231,27 @@ impl RowPage { Val::VarByte(var) => new_row.add_var(var.as_bytes()), } } - InsertResult::Ok(new_row.finish()) + InsertRow::Ok(new_row.finish()) } /// delete row in page. /// This method will only mark the row as deleted. #[inline] - pub fn delete(&self, row_id: RowID) -> DeleteResult { - if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 { - return DeleteResult::RowNotFound; + pub fn delete(&self, row_id: RowID) -> Delete { + if !self.row_id_in_valid_range(row_id) { + return Delete::RowNotFound; } - let row_idx = (row_id - self.header.start_row_id) as usize; + let row_idx = self.row_idx(row_id); if self.is_deleted(row_idx) { - return DeleteResult::RowAlreadyDeleted; + return Delete::RowAlreadyDeleted; } self.set_deleted(row_idx, true); - DeleteResult::Ok + Delete::Ok } /// Update in-place in current page. 
#[inline] - pub fn update( - &mut self, - schema: &Schema, - row_id: RowID, - user_cols: &[UpdateCol], - ) -> UpdateResult { + pub fn update(&mut self, schema: &Schema, row_id: RowID, user_cols: &[UpdateCol]) -> Update { // column indexes must be in range debug_assert!( { @@ -258,12 +272,12 @@ impl RowPage { }, "update columns should be in order" ); - if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 { - return UpdateResult::RowNotFound; + if !self.row_id_in_valid_range(row_id) { + return Update::RowNotFound; } - let row_idx = (row_id - self.header.start_row_id) as usize; + let row_idx = self.row_idx(row_id); if self.row(row_idx).is_deleted() { - return UpdateResult::RowDeleted; + return Update::RowDeleted; } let var_len = self.var_len_for_update(row_idx, user_cols); let var_offset = if let Some(var_offset) = self.request_free_space(var_len) { @@ -271,28 +285,28 @@ impl RowPage { } else { let row = self.row(row_idx); let vals = row.clone_vals(schema, false); - return UpdateResult::NoFreeSpace(vals); + return Update::NoFreeSpace(vals); }; let mut row = self.row_mut(row_idx, var_offset, var_offset + var_len); for uc in user_cols { row.update_user_col(uc.idx, &uc.val); } row.finish(); - UpdateResult::Ok(row_id) + Update::Ok(row_id) } /// Select single row by row id. #[inline] - pub fn select(&self, row_id: RowID) -> SelectResult { - if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 { - return SelectResult::RowNotFound; + pub fn select(&self, row_id: RowID) -> Select { + if !self.row_id_in_valid_range(row_id) { + return Select::RowNotFound; } - let row_idx = (row_id - self.header.start_row_id) as usize; + let row_idx = self.row_idx(row_id); let row = self.row(row_idx); if row.is_deleted() { - return SelectResult::RowDeleted(row); + return Select::RowDeleted(row); } - SelectResult::Ok(row) + Select::Ok(row) } #[inline] @@ -326,7 +340,7 @@ impl RowPage { var_offset, }; // always add RowID as first column - row.add_val(self.header.start_row_id + row_idx as u64); + row.add_val(self.row_id(row_idx)); row } @@ -738,7 +752,7 @@ impl<'a> NewRow<'a> { pub fn finish(self) -> RowID { debug_assert!(self.col_idx == self.page.header.col_count as usize); self.page.set_deleted(self.row_idx, false); - self.page.header.start_row_id + self.row_idx as u64 + self.page.row_id(self.row_idx) } } @@ -1231,7 +1245,7 @@ mod tests { Val::from(&short[..]), ]; let res = page.insert(&schema, &insert); - assert!(matches!(res, InsertResult::Ok(100))); + assert!(matches!(res, InsertRow::Ok(100))); assert!(!page.row(0).is_deleted()); let row_id = 100; @@ -1261,10 +1275,10 @@ mod tests { assert!(res.is_ok()); let res = page.delete(row_id); - assert!(matches!(res, DeleteResult::Ok)); + assert!(matches!(res, Delete::Ok)); let select = page.select(row_id); - assert!(matches!(select, SelectResult::RowDeleted(_))); + assert!(matches!(select, Select::RowDeleted(_))); } fn create_row_page() -> RowPage { diff --git a/doradb-storage/src/row/ops.rs b/doradb-storage/src/row/ops.rs index 09ad8db..806703f 100644 --- a/doradb-storage/src/row/ops.rs +++ b/doradb-storage/src/row/ops.rs @@ -2,68 +2,71 @@ use crate::row::{Row, RowID, RowMut}; use crate::value::Val; use serde::{Deserialize, Serialize}; -pub enum SelectResult<'a> { +pub enum Select<'a> { Ok(Row<'a>), RowDeleted(Row<'a>), RowNotFound, } -impl SelectResult<'_> { +impl Select<'_> { /// Returns if select succeeds. 
    #[inline]
    pub fn is_ok(&self) -> bool {
-        matches!(self, SelectResult::Ok(_))
+        matches!(self, Select::Ok(_))
    }
 }
 
-pub enum SelectMvccResult {
+pub enum SelectMvcc {
     Ok(Vec<Val>),
     RowNotFound,
     InvalidIndex,
 }
 
-impl SelectMvccResult {
+impl SelectMvcc {
     #[inline]
     pub fn is_ok(&self) -> bool {
-        matches!(self, SelectMvccResult::Ok(_))
+        matches!(self, SelectMvcc::Ok(_))
     }
 }
 
-pub enum InsertResult {
+pub enum InsertRow {
     Ok(RowID),
     NoFreeSpaceOrRowID,
 }
 
-impl InsertResult {
+impl InsertRow {
     /// Returns if insert succeeds.
     #[inline]
     pub fn is_ok(&self) -> bool {
-        matches!(self, InsertResult::Ok(_))
+        matches!(self, InsertRow::Ok(_))
     }
 }
 
-pub enum InsertMvccResult {
+pub enum InsertMvcc {
+    // PageGuard is required if the table has a unique index and
+    // we may need to link a deleted version to the new version.
+    // In such a scenario, we should keep the page guard in shared mode
+    // and acquire the row lock when we do the linking.
     Ok(RowID),
     WriteConflict,
     DuplicateKey,
 }
 
-impl InsertMvccResult {
+impl InsertMvcc {
     #[inline]
     pub fn is_ok(&self) -> bool {
-        matches!(self, InsertMvccResult::Ok(_))
+        matches!(self, InsertMvcc::Ok(_))
     }
 }
 
-pub enum MoveInsertResult {
+pub enum MoveInsert {
     Ok,
     None,
     WriteConflict,
     DuplicateKey,
-    Retry,
 }
 
-pub enum UpdateResult {
+pub enum Update {
     // RowID may change if the update is out-of-place.
     Ok(RowID),
     RowNotFound,
@@ -74,15 +77,15 @@ pub enum UpdateResult {
     NoFreeSpace(Vec<Val>),
 }
 
-impl UpdateResult {
+impl Update {
     /// Returns if update succeeds.
     #[inline]
     pub fn is_ok(&self) -> bool {
-        matches!(self, UpdateResult::Ok(..))
+        matches!(self, Update::Ok(..))
     }
 }
 
-pub enum UpdateMvccResult {
+pub enum UpdateMvcc {
     Ok(RowID),
     RowNotFound,
     RowDeleted,
@@ -91,11 +94,11 @@ pub enum UpdateMvccResult {
     Retry(Vec<UpdateCol>),
 }
 
-impl UpdateMvccResult {
+impl UpdateMvcc {
     /// Returns if update with undo succeeds.
     #[inline]
     pub fn is_ok(&self) -> bool {
-        matches!(self, UpdateMvccResult::Ok(_))
+        matches!(self, UpdateMvcc::Ok(_))
     }
 }
 
@@ -110,22 +113,22 @@ pub enum UpdateRow<'a> {
     NoFreeSpace(Vec<Val>),
 }
 
-pub enum DeleteResult {
+pub enum Delete {
     Ok,
     RowNotFound,
     RowAlreadyDeleted,
 }
 
-pub enum DeleteMvccResult {
+pub enum DeleteMvcc {
     Ok,
     RowNotFound,
     RowAlreadyDeleted,
     WriteConflict,
 }
 
-impl DeleteMvccResult {
+impl DeleteMvcc {
     #[inline]
     pub fn is_ok(&self) -> bool {
-        matches!(self, DeleteMvccResult::Ok)
+        matches!(self, DeleteMvcc::Ok)
     }
 }
diff --git a/doradb-storage/src/stmt.rs b/doradb-storage/src/stmt.rs
index d74f497..98852df 100644
--- a/doradb-storage/src/stmt.rs
+++ b/doradb-storage/src/stmt.rs
@@ -3,13 +3,13 @@ use crate::buffer::BufferPool;
 use crate::row::RowID;
 use crate::table::TableID;
 use crate::trx::redo::RedoEntry;
-use crate::trx::undo::SharedUndoEntry;
+use crate::trx::undo::OwnedUndoEntry;
 use crate::trx::ActiveTrx;
 
 pub struct Statement {
     pub trx: ActiveTrx,
     // statement-level undo logs.
-    pub undo: Vec<SharedUndoEntry>,
+    pub undo: Vec<OwnedUndoEntry>,
     // statement-level redo logs.
pub redo: Vec, } @@ -36,11 +36,6 @@ impl Statement { todo!(); } - #[inline] - pub fn last_undo_entry(&self) -> Option<&SharedUndoEntry> { - self.undo.last() - } - #[inline] pub fn load_active_insert_page(&mut self, table_id: TableID) -> Option<(PageID, RowID)> { self.trx diff --git a/doradb-storage/src/table/mod.rs b/doradb-storage/src/table/mod.rs index e5d5d64..6957551 100644 --- a/doradb-storage/src/table/mod.rs +++ b/doradb-storage/src/table/mod.rs @@ -4,7 +4,6 @@ use crate::buffer::FixedBufferPool; use crate::index::{BlockIndex, SingleKeyIndex}; use crate::table::mvcc::MvccTable; use crate::value::{Layout, Val}; -use std::ops::Deref; use std::sync::Arc; // todo: integrate with doradb_catalog::TableID. diff --git a/doradb-storage/src/table/mvcc.rs b/doradb-storage/src/table/mvcc.rs index ea64733..1fd6a7b 100644 --- a/doradb-storage/src/table/mvcc.rs +++ b/doradb-storage/src/table/mvcc.rs @@ -4,8 +4,7 @@ use crate::buffer::BufferPool; use crate::index::RowLocation; use crate::latch::LatchFallbackMode; use crate::row::ops::{ - DeleteMvccResult, InsertMvccResult, InsertResult, MoveInsertResult, SelectMvccResult, - UpdateCol, UpdateMvccResult, UpdateRow, + DeleteMvcc, InsertMvcc, InsertRow, MoveInsert, SelectMvcc, UpdateCol, UpdateMvcc, UpdateRow, }; use crate::row::{estimate_max_row_count, RowID, RowPage, RowRead}; use crate::stmt::Statement; @@ -13,8 +12,7 @@ use crate::table::{Schema, Table}; use crate::trx::redo::{RedoEntry, RedoKind}; use crate::trx::row::{RowReadAccess, RowWriteAccess}; use crate::trx::undo::{ - NextTrxCTS, NextUndoEntry, NextUndoStatus, PrevUndoEntry, SharedUndoEntry, UndoEntryPtr, - UndoHead, UndoKind, + NextTrxCTS, NextUndoEntry, NextUndoStatus, OwnedUndoEntry, UndoEntryPtr, UndoHead, UndoKind, }; use crate::trx::{trx_is_committed, ActiveTrx}; use crate::value::{Val, PAGE_VAR_LEN_INLINE}; @@ -54,7 +52,7 @@ impl<'a> MvccTable<'a> { stmt: &mut Statement, key: Val, user_read_set: &[usize], - ) -> SelectMvccResult { + ) -> SelectMvcc { debug_assert!(self.schema.idx_type_match(&key)); debug_assert!({ !user_read_set.is_empty() @@ -65,9 +63,9 @@ impl<'a> MvccTable<'a> { }); loop { match self.sec_idx.lookup(&key) { - None => return SelectMvccResult::RowNotFound, + None => return SelectMvcc::RowNotFound, Some(row_id) => match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return SelectMvccResult::RowNotFound, + RowLocation::NotFound => return SelectMvcc::RowNotFound, RowLocation::ColSegment(..) => todo!(), RowLocation::RowPage(page_id) => { let page = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); @@ -93,14 +91,12 @@ impl<'a> MvccTable<'a> { key: Val, row_id: RowID, user_read_set: &[usize], - ) -> SelectMvccResult { + ) -> SelectMvcc { let page = page_guard.page(); - if row_id < page.header.start_row_id - || row_id >= page.header.start_row_id + page.header.max_row_count as u64 - { - return SelectMvccResult::RowNotFound; + if !page.row_id_in_valid_range(row_id) { + return SelectMvcc::RowNotFound; } - let row_idx = (row_id - page.header.start_row_id) as usize; + let row_idx = page.row_idx(row_id); let access = self .lock_row_for_read(&stmt.trx, &page_guard, row_idx) .await; @@ -109,7 +105,7 @@ impl<'a> MvccTable<'a> { /// Insert row with MVCC. 
#[inline] - pub async fn insert_row(&self, stmt: &mut Statement, cols: Vec) -> InsertMvccResult { + pub async fn insert_row(&self, stmt: &mut Statement, cols: Vec) -> InsertMvcc { debug_assert!(cols.len() + 1 == self.schema.col_count()); debug_assert!({ cols.iter() @@ -117,33 +113,22 @@ impl<'a> MvccTable<'a> { .all(|(idx, val)| self.schema.user_col_type_match(idx, val)) }); let key = cols[self.schema.user_key_idx()].clone(); - match self.insert_row_internal(stmt, cols, UndoKind::Insert, None) { - InsertMvccResult::Ok(row_id) => loop { - if let Some(old_row_id) = self.sec_idx.insert_if_not_exists(key.clone(), row_id) { - // we found there is already one existing row with same key. - // ref-count the INSERT undo entry because we treat it as newer version. - // and store in the MOVE entry's prev pointer. - let new_entry = stmt - .last_undo_entry() - .map(SharedUndoEntry::clone) - .expect("undo entry of insert statement"); - match self - .move_insert(stmt, old_row_id, key.clone(), new_entry) - .await - { - MoveInsertResult::DuplicateKey => return InsertMvccResult::DuplicateKey, - MoveInsertResult::WriteConflict => return InsertMvccResult::WriteConflict, - MoveInsertResult::Ok | MoveInsertResult::None => { - return InsertMvccResult::Ok(row_id) - } - MoveInsertResult::Retry => continue, - } - } else { - return InsertMvccResult::Ok(row_id); - } - }, - // can only happen in code branch of move_insert() - InsertMvccResult::WriteConflict | InsertMvccResult::DuplicateKey => unreachable!(), + // insert row into page with undo log linked. + let (row_id, page_guard) = self.insert_row_internal(stmt, cols, UndoKind::Insert, None); + // update index + if let Some(old_row_id) = self.sec_idx.insert_if_not_exists(key.clone(), row_id) { + // we found there is already one existing row with same key. + // so perform move+insert. + match self + .move_insert(stmt, old_row_id, key.clone(), row_id, page_guard) + .await + { + MoveInsert::DuplicateKey => InsertMvcc::DuplicateKey, + MoveInsert::WriteConflict => InsertMvcc::WriteConflict, + MoveInsert::Ok | MoveInsert::None => InsertMvcc::Ok(row_id), + } + } else { + InsertMvcc::Ok(row_id) } } @@ -154,14 +139,14 @@ impl<'a> MvccTable<'a> { &self, stmt: &mut Statement, key: Val, - mut update: Vec, - ) -> UpdateMvccResult { + update: Vec, + ) -> UpdateMvcc { loop { match self.sec_idx.lookup(&key) { - None => return UpdateMvccResult::RowNotFound, + None => return UpdateMvcc::RowNotFound, Some(row_id) => { match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return UpdateMvccResult::RowNotFound, + RowLocation::NotFound => return UpdateMvcc::RowNotFound, RowLocation::ColSegment(..) 
=> todo!(), RowLocation::RowPage(page_id) => { let page = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); @@ -175,22 +160,19 @@ impl<'a> MvccTable<'a> { .await; return match res { - UpdateMvccResult::Ok(row_id) => UpdateMvccResult::Ok(row_id), - UpdateMvccResult::RowDeleted => UpdateMvccResult::RowDeleted, - UpdateMvccResult::RowNotFound => UpdateMvccResult::RowNotFound, - UpdateMvccResult::WriteConflict => UpdateMvccResult::WriteConflict, - UpdateMvccResult::NoFreeSpace(old_row, update) => { + UpdateRowInplace::Ok(row_id) => UpdateMvcc::Ok(row_id), + UpdateRowInplace::RowDeleted => UpdateMvcc::RowDeleted, + UpdateRowInplace::RowNotFound => UpdateMvcc::RowNotFound, + UpdateRowInplace::WriteConflict => UpdateMvcc::WriteConflict, + UpdateRowInplace::NoFreeSpace( + old_row_id, + old_row, + update, + old_guard, + ) => { // in-place update failed, we transfer update into // move+update. - let move_entry = stmt - .last_undo_entry() - .map(|entry| entry.leak()) - .expect("move entry"); - self.move_update(stmt, old_row, update, move_entry) - } - UpdateMvccResult::Retry(upd) => { - update = upd; - continue; + self.move_update(stmt, old_row, update, old_row_id, old_guard) } }; } @@ -203,12 +185,12 @@ impl<'a> MvccTable<'a> { /// Delete row with MVCC. /// This method is for delete based on index lookup. #[inline] - pub async fn delete_row(&self, stmt: &mut Statement, key: Val) -> DeleteMvccResult { + pub async fn delete_row(&self, stmt: &mut Statement, key: Val) -> DeleteMvcc { loop { match self.sec_idx.lookup(&key) { - None => return DeleteMvccResult::RowNotFound, + None => return DeleteMvcc::RowNotFound, Some(row_id) => match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return DeleteMvccResult::RowNotFound, + RowLocation::NotFound => return DeleteMvcc::RowNotFound, RowLocation::ColSegment(..) => todo!(), RowLocation::RowPage(page_id) => { let page = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); @@ -232,8 +214,9 @@ impl<'a> MvccTable<'a> { stmt: &mut Statement, mut old_row: Vec, update: Vec, - move_entry: UndoEntryPtr, - ) -> UpdateMvccResult { + old_id: RowID, + old_guard: PageSharedGuard<'a, RowPage>, + ) -> UpdateMvcc { // calculate new row and undo entry. let (new_row, undo_kind) = { let mut undo_cols = vec![]; @@ -247,11 +230,10 @@ impl<'a> MvccTable<'a> { } (old_row, UndoKind::Update(undo_cols)) }; - - match self.insert_row_internal(stmt, new_row, undo_kind, Some(move_entry)) { - InsertMvccResult::Ok(row_id) => UpdateMvccResult::Ok(row_id), - InsertMvccResult::WriteConflict | InsertMvccResult::DuplicateKey => unreachable!(), - } + let (row_id, page_guard) = + self.insert_row_internal(stmt, new_row, undo_kind, Some((old_id, old_guard))); + drop(page_guard); // unlock the page + UpdateMvcc::Ok(row_id) } /// Move insert is similar to a delete+insert. @@ -264,11 +246,12 @@ impl<'a> MvccTable<'a> { stmt: &mut Statement, row_id: RowID, key: Val, - new_entry: SharedUndoEntry, - ) -> MoveInsertResult { + new_id: RowID, + new_guard: PageSharedGuard<'_, RowPage>, + ) -> MoveInsert { loop { match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return MoveInsertResult::None, + RowLocation::NotFound => return MoveInsert::None, RowLocation::ColSegment(..) 
=> todo!(), RowLocation::RowPage(page_id) => { let page_guard = self @@ -279,39 +262,53 @@ impl<'a> MvccTable<'a> { continue; } let page = page_guard.page(); - if row_id < page.header.start_row_id - || row_id >= page.header.start_row_id + page.header.max_row_count as u64 - { + if !page.row_id_in_valid_range(row_id) { // no old row found - return MoveInsertResult::None; + return MoveInsert::None; } - let row_idx = (row_id - page.header.start_row_id) as usize; + let row_idx = page.row_idx(row_id); let mut lock_row = self .lock_row_for_write(&stmt.trx, &page_guard, row_idx, &key) .await; match &mut lock_row { - LockRowForWrite::InvalidIndex => return MoveInsertResult::None, // key changed so we are fine. - LockRowForWrite::WriteConflict => return MoveInsertResult::WriteConflict, + LockRowForWrite::InvalidIndex => return MoveInsert::None, // key changed so we are fine. + LockRowForWrite::WriteConflict => return MoveInsert::WriteConflict, LockRowForWrite::Ok(access, old_cts) => { let mut access = access.take().unwrap(); if !access.row().is_deleted() { - return MoveInsertResult::DuplicateKey; + return MoveInsert::DuplicateKey; } let old_cts = mem::take(old_cts); - let move_entry = SharedUndoEntry::new( + let mut move_entry = OwnedUndoEntry::new( self.table_id, page_id, row_id, UndoKind::Move(true), ); - access.build_undo_chain(&stmt.trx, &move_entry, old_cts); + access.build_undo_chain(&stmt.trx, &mut move_entry, old_cts); drop(access); // unlock the row. drop(lock_row); drop(page_guard); // unlock the page. - link_move_entry(new_entry, move_entry.leak()); + + // Here we re-lock new row and link new entry to move entry. + // In this way, we can make sure no other thread can access new entry pointer + // so the update of next pointer is safe. + let new_idx = new_guard.page().row_idx(new_id); + let lock_new = self + .lock_row_for_write(&stmt.trx, &new_guard, new_idx, &key) + .await; + let (new_access, _) = lock_new.ok().expect("lock new row for insert"); + debug_assert!(new_access.is_some()); + let mut new_entry = stmt.trx.undo.pop().expect("new entry for insert"); + link_move_entry(&mut new_entry, move_entry.leak()); + + drop(new_access); // unlock new row + drop(new_guard); // unlock new page + stmt.undo.push(move_entry); + stmt.undo.push(new_entry); // no redo required, because no change on row data. - return MoveInsertResult::Ok; + return MoveInsert::Ok; } } } @@ -325,20 +322,20 @@ impl<'a> MvccTable<'a> { stmt: &mut Statement, mut insert: Vec, mut undo_kind: UndoKind, - mut move_entry: Option, - ) -> InsertMvccResult { + mut move_entry: Option<(RowID, PageSharedGuard<'a, RowPage>)>, + ) -> (RowID, PageSharedGuard) { let row_len = row_len(&self.schema, &insert); let row_count = estimate_max_row_count(row_len, self.schema.col_count()); loop { let page_guard = self.get_insert_page(stmt, row_count); let page_id = page_guard.page_id(); match self.insert_row_to_page(stmt, page_guard, insert, undo_kind, move_entry) { - InsertInternalResult::Ok(row_id) => { + InsertRowIntoPage::Ok(row_id, page_guard) => { stmt.save_active_insert_page(self.table_id, page_id, row_id); - return InsertMvccResult::Ok(row_id); + return (row_id, page_guard); } // this page cannot be inserted any more, just leave it and retry another page. 
-                InsertInternalResult::NoSpaceOrRowID(ins, uk, me) => {
+                InsertRowIntoPage::NoSpaceOrRowID(ins, uk, me) => {
                     insert = ins;
                     undo_kind = uk;
                     move_entry = me;
@@ -354,32 +351,53 @@
     fn insert_row_to_page(
         &self,
         stmt: &mut Statement,
-        page_guard: PageSharedGuard<'_, RowPage>,
+        page_guard: PageSharedGuard<'a, RowPage>,
         insert: Vec<Val>,
         undo_kind: UndoKind,
-        move_entry: Option<UndoEntryPtr>,
-    ) -> InsertInternalResult {
+        move_entry: Option<(RowID, PageSharedGuard<'a, RowPage>)>,
+    ) -> InsertRowIntoPage<'a> {
+        debug_assert!({
+            (matches!(undo_kind, UndoKind::Insert) && move_entry.is_none())
+                || (matches!(undo_kind, UndoKind::Update(_)) && move_entry.is_some())
+        });
+
         let page_id = page_guard.page_id();
         match page_guard.page().insert(&self.schema, &insert) {
-            InsertResult::Ok(row_id) => {
-                let row_idx = (row_id - page_guard.page().header.start_row_id) as usize;
+            InsertRow::Ok(row_id) => {
+                let row_idx = page_guard.page().row_idx(row_id);
                 let mut access = page_guard.write_row(row_idx);
                 // create undo log.
-                let new_entry = SharedUndoEntry::new(self.table_id, page_id, row_id, undo_kind);
-                debug_assert!(access.undo_mut().is_none());
-                *access.undo_mut() = Some(UndoHead {
-                    status: stmt.trx.status(),
-                    entry: None,
-                });
-                access.build_undo_chain(&stmt.trx, &new_entry, NextTrxCTS::None);
-                drop(access);
-                drop(page_guard);
-                // in case of move+insert and move+update, we need to link current undo entry to MOVE entry
-                // in another page.
-                if let Some(move_entry) = move_entry {
-                    // ref-count this pointer.
-                    link_move_entry(SharedUndoEntry::clone(&new_entry), move_entry);
+                let mut new_entry = OwnedUndoEntry::new(self.table_id, page_id, row_id, undo_kind);
+                // The MOVE undo entry is for MOVE+UPDATE.
+                // Once an in-place update fails, we convert the update operation to an insert
+                // and link the two versions together.
+                if let Some((old_id, old_guard)) = move_entry {
+                    let old_row_idx = old_guard.page().row_idx(old_id);
+                    // Here we actually lock both the new row and the old row.
+                    // TODO: verify that this cannot deadlock with other writers
+                    // acquiring the same two row locks in the opposite order.
+                    let access = old_guard.write_row(old_row_idx);
+                    debug_assert!({
+                        access.undo_head().is_some()
+                            && stmt
+                                .trx
+                                .is_same_trx(&access.undo_head().as_ref().unwrap().status)
+                    });
+
+                    // Re-lock the moved row and link the new entry to it.
+                    let move_entry = access.first_undo_entry().unwrap();
+                    link_move_entry(&mut new_entry, move_entry);
                 }
+
+                debug_assert!(access.undo_head().is_none());
+                access.build_undo_chain(&stmt.trx, &mut new_entry, NextTrxCTS::None);
+                drop(access);
+                // Here we do not unlock the page, because we need to verify the validity of the
+                // unique index update caused by this insert.
+                // There might be a scenario in which a deleted row shares the same key as this insert.
+                // Then we have to mark it as MOVE and point the insert undo's next version to it.
+                // So we hold the page guard in order to re-lock the insert undo entry quickly.
                 stmt.undo.push(new_entry);
                 // create redo log.
                 // even if the operation is move+update, we still treat it as insert redo log.
@@ -392,10 +410,10 @@
                 };
                 // store redo log into transaction redo buffer.
stmt.redo.push(redo_entry); - InsertInternalResult::Ok(row_id) + InsertRowIntoPage::Ok(row_id, page_guard) } - InsertResult::NoFreeSpaceOrRowID => { - InsertInternalResult::NoSpaceOrRowID(insert, undo_kind, move_entry) + InsertRow::NoFreeSpaceOrRowID => { + InsertRowIntoPage::NoSpaceOrRowID(insert, undo_kind, move_entry) } } } @@ -404,11 +422,11 @@ impl<'a> MvccTable<'a> { async fn update_row_inplace( &self, stmt: &mut Statement, - page_guard: PageSharedGuard<'_, RowPage>, + page_guard: PageSharedGuard<'a, RowPage>, key: Val, row_id: RowID, mut update: Vec, - ) -> UpdateMvccResult { + ) -> UpdateRowInplace { let page_id = page_guard.page_id(); let page = page_guard.page(); // column indexes must be in range @@ -434,19 +452,19 @@ impl<'a> MvccTable<'a> { if row_id < page.header.start_row_id || row_id >= page.header.start_row_id + page.header.max_row_count as u64 { - return UpdateMvccResult::RowNotFound; + return UpdateRowInplace::RowNotFound; } let row_idx = (row_id - page.header.start_row_id) as usize; let mut lock_row = self .lock_row_for_write(&stmt.trx, &page_guard, row_idx, &key) .await; match &mut lock_row { - LockRowForWrite::InvalidIndex => return UpdateMvccResult::RowNotFound, - LockRowForWrite::WriteConflict => return UpdateMvccResult::WriteConflict, + LockRowForWrite::InvalidIndex => return UpdateRowInplace::RowNotFound, + LockRowForWrite::WriteConflict => return UpdateRowInplace::WriteConflict, LockRowForWrite::Ok(access, old_cts) => { let mut access = access.take().unwrap(); if access.row().is_deleted() { - return UpdateMvccResult::RowDeleted; + return UpdateRowInplace::RowDeleted; } let old_cts = mem::take(old_cts); match access.update_row(&self.schema, &update) { @@ -455,16 +473,18 @@ impl<'a> MvccTable<'a> { // to out-of-place update mode, which will add a MOVE undo entry // to end original row and perform a INSERT into new page, and // link the two versions. - let new_entry = SharedUndoEntry::new( + let mut new_entry = OwnedUndoEntry::new( self.table_id, page_id, row_id, UndoKind::Move(false), ); - access.build_undo_chain(&stmt.trx, &new_entry, old_cts); + access.build_undo_chain(&stmt.trx, &mut new_entry, old_cts); drop(access); // unlock row drop(lock_row); - drop(page_guard); // unlock page + // Here we do not unlock page because we need to perform MOVE+UPDATE + // and link undo entries of two rows. + // The re-lock of current undo is required. stmt.undo.push(new_entry); let redo_entry = RedoEntry { page_id, @@ -473,7 +493,7 @@ impl<'a> MvccTable<'a> { kind: RedoKind::Delete, }; stmt.redo.push(redo_entry); - UpdateMvccResult::NoFreeSpace(old_row, update) + UpdateRowInplace::NoFreeSpace(row_id, old_row, update, page_guard) } UpdateRow::Ok(mut row) => { // perform in-place update. @@ -492,13 +512,13 @@ impl<'a> MvccTable<'a> { row.update_user_col(uc.idx, &uc.val); } } - let new_entry = SharedUndoEntry::new( + let mut new_entry = OwnedUndoEntry::new( self.table_id, page_id, row_id, UndoKind::Update(undo_cols), ); - access.build_undo_chain(&stmt.trx, &new_entry, old_cts); + access.build_undo_chain(&stmt.trx, &mut new_entry, old_cts); drop(access); // unlock the row. drop(lock_row); drop(page_guard); // unlock the page, because we finish page update. 
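Both paths above (the in-place update and the MOVE fallback) end by prepending a new entry to the row's version chain via `build_undo_chain`. Since this PR drops the doubly-linked, ref-counted chain, the linking reduces to a head swap plus one forward pointer. Below is a minimal self-contained sketch of that prepend; `Entry`, `Head`, and `Owned` are simplified stand-ins for illustration, not the real `UndoEntry`/`UndoHead`/`OwnedUndoEntry` definitions:

```rust
use std::ptr::NonNull;

// Simplified stand-ins for UndoEntry / UndoHead / OwnedUndoEntry.
struct Entry {
    payload: &'static str,
    next: Option<NonNull<Entry>>, // newer -> older, like NextUndoEntry
}

struct Owned(Box<Entry>); // owner side, like OwnedUndoEntry

impl Owned {
    fn leak(&self) -> NonNull<Entry> {
        // Hand out a raw pointer; the Box keeps ownership.
        NonNull::from(self.0.as_ref())
    }
}

struct Head {
    entry: Option<NonNull<Entry>>, // like UndoHead::entry
}

// Mirrors the shape of build_undo_chain: swap the head to the new entry,
// then point the new entry at the previous head entry (the older version).
fn prepend(head: &mut Head, new_entry: &mut Owned) {
    let old = head.entry.replace(new_entry.leak());
    if let Some(old) = old {
        debug_assert!(new_entry.0.next.is_none());
        new_entry.0.next = Some(old);
    }
}

fn main() {
    let mut head = Head { entry: None };
    let mut v1 = Owned(Box::new(Entry { payload: "v1", next: None }));
    prepend(&mut head, &mut v1);
    let mut v2 = Owned(Box::new(Entry { payload: "v2", next: None }));
    prepend(&mut head, &mut v2);
    // head -> v2 -> v1, while v1/v2 stay owned by the caller
    // (in the real code: by the statement/transaction undo buffer).
    unsafe {
        let newest = head.entry.unwrap().as_ref();
        assert_eq!(newest.payload, "v2");
        assert_eq!(newest.next.unwrap().as_ref().payload, "v1");
    }
}
```

With only newer-to-older `next` pointers left, linking no longer needs to lock the older entry, which is what lets the `UndoChain`/`PrevUndoEntry` machinery be deleted later in this diff.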
@@ -513,7 +533,7 @@ impl<'a> MvccTable<'a> { }; stmt.redo.push(redo_entry); } - UpdateMvccResult::Ok(row_id) + UpdateRowInplace::Ok(row_id) } } } @@ -527,30 +547,30 @@ impl<'a> MvccTable<'a> { page_guard: PageSharedGuard<'_, RowPage>, row_id: RowID, key: &Val, - ) -> DeleteMvccResult { + ) -> DeleteMvcc { let page_id = page_guard.page_id(); let page = page_guard.page(); if row_id < page.header.start_row_id || row_id >= page.header.start_row_id + page.header.max_row_count as u64 { - return DeleteMvccResult::RowNotFound; + return DeleteMvcc::RowNotFound; } let row_idx = (row_id - page.header.start_row_id) as usize; let mut lock_row = self .lock_row_for_write(&stmt.trx, &page_guard, row_idx, key) .await; match &mut lock_row { - LockRowForWrite::InvalidIndex => return DeleteMvccResult::RowNotFound, - LockRowForWrite::WriteConflict => return DeleteMvccResult::WriteConflict, + LockRowForWrite::InvalidIndex => return DeleteMvcc::RowNotFound, + LockRowForWrite::WriteConflict => return DeleteMvcc::WriteConflict, LockRowForWrite::Ok(access, old_cts) => { let mut access = access.take().unwrap(); if access.row().is_deleted() { - return DeleteMvccResult::RowAlreadyDeleted; + return DeleteMvcc::RowAlreadyDeleted; } access.delete_row(); - let new_entry = - SharedUndoEntry::new(self.table_id, page_id, row_id, UndoKind::Delete); - access.build_undo_chain(&stmt.trx, &new_entry, mem::take(old_cts)); + let mut new_entry = + OwnedUndoEntry::new(self.table_id, page_id, row_id, UndoKind::Delete); + access.build_undo_chain(&stmt.trx, &mut new_entry, mem::take(old_cts)); drop(access); // unlock row drop(lock_row); drop(page_guard); // unlock page @@ -562,7 +582,7 @@ impl<'a> MvccTable<'a> { kind: RedoKind::Delete, }; stmt.redo.push(redo_entry); - DeleteMvccResult::Ok + DeleteMvcc::Ok } } } @@ -762,17 +782,13 @@ fn row_len(schema: &Schema, user_cols: &[Val]) -> usize { } #[inline] -fn link_move_entry(new_entry: SharedUndoEntry, move_entry: UndoEntryPtr) { +fn link_move_entry(new_entry: &mut OwnedUndoEntry, move_entry: UndoEntryPtr) { // ref-count this pointer. 
- let mut new_chain_g = new_entry.as_ref().chain.write(); - debug_assert!(new_chain_g.next.is_none()); - new_chain_g.next = Some(NextUndoEntry { + debug_assert!(new_entry.next.is_none()); + new_entry.next = Some(NextUndoEntry { status: NextUndoStatus::SameAsPrev, - entry: move_entry.clone(), + entry: move_entry, }); - let mut old_chain_g = move_entry.as_ref().chain.write(); - drop(new_chain_g); - old_chain_g.prev = Some(PrevUndoEntry::Entry(new_entry)); } enum LockRowForWrite<'a> { @@ -786,9 +802,36 @@ enum LockRowForWrite<'a> { InvalidIndex, } -enum InsertInternalResult { +impl<'a> LockRowForWrite<'a> { + #[inline] + pub fn ok(self) -> Option<(Option>, NextTrxCTS)> { + match self { + LockRowForWrite::Ok(access, next_cts) => Some((access, next_cts)), + _ => None, + } + } +} + +enum InsertRowIntoPage<'a> { + Ok(RowID, PageSharedGuard<'a, RowPage>), + NoSpaceOrRowID( + Vec, + UndoKind, + Option<(RowID, PageSharedGuard<'a, RowPage>)>, + ), +} + +enum UpdateRowInplace<'a> { Ok(RowID), - NoSpaceOrRowID(Vec, UndoKind, Option), + RowNotFound, + RowDeleted, + WriteConflict, + NoFreeSpace( + RowID, + Vec, + Vec, + PageSharedGuard<'a, RowPage>, + ), } #[cfg(test)] @@ -796,7 +839,7 @@ mod tests { use crate::buffer::FixedBufferPool; use crate::index::BlockIndex; use crate::index::PartitionIntIndex; - use crate::row::ops::{SelectMvccResult, UpdateCol}; + use crate::row::ops::{SelectMvcc, UpdateCol}; use crate::session::Session; use crate::table::{Schema, Table}; use crate::trx::sys::{TransactionSystem, TrxSysConfig}; @@ -835,7 +878,7 @@ mod tests { let key = Val::from(i); let res = table.select_row(&mut stmt, key, &[0, 1]).await; match res { - SelectMvccResult::Ok(vals) => { + SelectMvcc::Ok(vals) => { assert!(vals.len() == 2); assert!(&vals[0] == &Val::from(i)); let s = format!("{}", i); diff --git a/doradb-storage/src/trx/mod.rs b/doradb-storage/src/trx/mod.rs index f27964a..6546147 100644 --- a/doradb-storage/src/trx/mod.rs +++ b/doradb-storage/src/trx/mod.rs @@ -23,7 +23,7 @@ pub mod undo; use crate::session::{InternalSession, IntoSession, Session}; use crate::stmt::Statement; use crate::trx::redo::{RedoBin, RedoEntry, RedoKind, RedoLog}; -use crate::trx::undo::SharedUndoEntry; +use crate::trx::undo::OwnedUndoEntry; use crate::value::Val; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -118,7 +118,7 @@ pub struct ActiveTrx { // which log partition it belongs to. pub log_partition_idx: usize, // transaction-level undo logs. - pub(crate) undo: Vec, + pub(crate) undo: Vec, // transaction-level redo logs. pub(crate) redo: Vec, // session of current transaction. 
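The `Vec<SharedUndoEntry>` to `Vec<OwnedUndoEntry>` change above relies on `Box`'s address stability: the undo buffer is moved wholesale between the transaction lifecycle structs (Statement, ActiveTrx, PreparedTrx, PrecommitTrx, CommittedTrx), yet raw `UndoEntryPtr`s handed out earlier must stay valid. Moving a `Box` between containers moves only the pointer-sized handle, never the heap allocation. A small sketch of the property this depends on, with `Box<u64>` standing in for `Box<UndoEntry>`:

```rust
use std::ptr::NonNull;

fn main() {
    // The owner side: a boxed entry, like OwnedUndoEntry(Box<UndoEntry>).
    let owned: Box<u64> = Box::new(42);
    // A raw pointer into the heap allocation, like UndoEntryPtr.
    let ptr: NonNull<u64> = NonNull::from(owned.as_ref());

    // Moving the Box between containers moves only the handle;
    // the heap allocation (and therefore `ptr`) is unaffected.
    let mut active: Vec<Box<u64>> = vec![owned]; // like ActiveTrx::undo
    let prepared: Vec<Box<u64>> = active.drain(..).collect(); // like PreparedTrx::undo

    assert_eq!(unsafe { *ptr.as_ref() }, 42);

    // Only dropping the owner invalidates the pointer, which is why
    // deallocation must be deferred to the GC in the real system.
    drop(prepared);
}
```

The remaining obligation, not dropping the owner while pointers are live, is exactly what the PR delegates to the GC logic, per the new comments in trx/undo.rs.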
@@ -259,7 +259,7 @@ pub struct PreparedTrx { status: Arc, sts: TrxID, redo_bin: Option, - undo: Vec, + undo: Vec, session: Option>, } @@ -316,7 +316,7 @@ pub struct PrecommitTrx { pub sts: TrxID, pub cts: TrxID, pub redo_bin: Option, - pub undo: Vec, + pub undo: Vec, session: Option>, } @@ -371,7 +371,7 @@ impl IntoSession for PrecommitTrx { pub struct CommittedTrx { pub sts: TrxID, pub cts: TrxID, - pub undo: Vec, + pub undo: Vec, session: Option>, } diff --git a/doradb-storage/src/trx/row.rs b/doradb-storage/src/trx/row.rs index bc4e127..9cdd959 100644 --- a/doradb-storage/src/trx/row.rs +++ b/doradb-storage/src/trx/row.rs @@ -1,11 +1,10 @@ use crate::buffer::guard::PageSharedGuard; -use crate::row::ops::{SelectMvccResult, UpdateCol, UpdateRow}; +use crate::row::ops::{SelectMvcc, UpdateCol, UpdateRow}; use crate::row::{Row, RowMut, RowPage, RowRead}; use crate::table::Schema; use crate::trx::undo::UndoKind; use crate::trx::undo::{ - NextTrxCTS, NextUndoEntry, NextUndoStatus, PrevUndoEntry, SharedUndoEntry, UndoHead, - UndoHeadPtr, + NextTrxCTS, NextUndoEntry, NextUndoStatus, OwnedUndoEntry, UndoEntryPtr, UndoHead, }; use crate::trx::{trx_is_committed, ActiveTrx}; use crate::value::Val; @@ -36,20 +35,20 @@ impl RowReadAccess<'_> { schema: &Schema, user_read_set: &[usize], key: &Val, - ) -> SelectMvccResult { + ) -> SelectMvcc { // let mut vals = BTreeMap::new(); match &*self.undo { None => { let row = self.row(); // latest version in row page. if row.is_deleted() { - return SelectMvccResult::RowNotFound; + return SelectMvcc::RowNotFound; } if row.is_key_different(schema, key) { - return SelectMvccResult::InvalidIndex; + return SelectMvcc::InvalidIndex; } let vals = row.clone_vals_for_read_set(schema, user_read_set); - SelectMvccResult::Ok(vals) + SelectMvcc::Ok(vals) } Some(undo_head) => { // At this point, we already wait for preparation of commit is done. @@ -60,13 +59,13 @@ impl RowReadAccess<'_> { let row = self.row(); // we can see this version if row.is_deleted() { - return SelectMvccResult::RowNotFound; + return SelectMvcc::RowNotFound; } if row.is_key_different(schema, key) { - return SelectMvccResult::InvalidIndex; + return SelectMvcc::InvalidIndex; } let vals = row.clone_vals_for_read_set(schema, user_read_set); - return SelectMvccResult::Ok(vals); + return SelectMvcc::Ok(vals); } // otherwise, go to next version } else { let trx_id = trx.trx_id(); @@ -74,20 +73,20 @@ impl RowReadAccess<'_> { let row = self.row(); // self update, see the latest version if row.is_deleted() { - return SelectMvccResult::RowNotFound; + return SelectMvcc::RowNotFound; } if row.is_key_different(schema, key) { - return SelectMvccResult::InvalidIndex; + return SelectMvcc::InvalidIndex; } let vals = row.clone_vals_for_read_set(schema, user_read_set); - return SelectMvccResult::Ok(vals); + return SelectMvcc::Ok(vals); } // otherwise, go to next version } // page data is invisible, we have to backtrace version chain match undo_head.entry.as_ref() { None => { // no next version, so nothing to be return. - return SelectMvccResult::RowNotFound; + return SelectMvcc::RowNotFound; } Some(entry) => { let mut entry = entry.clone(); @@ -123,8 +122,7 @@ impl RowReadAccess<'_> { ver.deleted = *del; // recover moved status } } - let chain_g = entry.as_ref().chain.read(); - match chain_g.next.as_ref() { + match entry.as_ref().next.as_ref() { None => { // No next version, we need to determine whether we should return row // by checking deleted flag. 
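The branches above encode the snapshot visibility rule: a committed version is visible when its commit timestamp (CTS) is strictly older than the reader's snapshot timestamp (STS), and an uncommitted version is visible only to the transaction that wrote it; anything else falls through to older versions via the chain walk in the next hunk. A compact restatement of the rule follows; the split of the timestamp space into commit timestamps below a ceiling and transaction IDs above it is an assumption for illustration, not necessarily how `trx_is_committed` is actually defined:

```rust
type TrxID = u64;

// Assumption for this sketch: commit timestamps live below this ceiling and
// in-flight transaction IDs above it; the real trx_is_committed may differ.
const MAX_COMMIT_TS: TrxID = 1 << 62;

fn trx_is_committed(ts: TrxID) -> bool {
    ts < MAX_COMMIT_TS
}

/// Visibility of one version timestamp to a reader snapshot.
fn visible(reader_sts: TrxID, reader_trx_id: TrxID, ts: TrxID) -> bool {
    if trx_is_committed(ts) {
        reader_sts > ts // committed strictly before our snapshot
    } else {
        ts == reader_trx_id // our own uncommitted write
    }
}

fn main() {
    let sts = 100;
    let my_id = MAX_COMMIT_TS + 7;
    assert!(visible(sts, my_id, 99)); // committed earlier: visible
    assert!(!visible(sts, my_id, 100)); // committed at/after snapshot: invisible
    assert!(visible(sts, my_id, my_id)); // self-written: visible
    assert!(!visible(sts, my_id, MAX_COMMIT_TS + 8)); // other active trx: invisible
}
```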
@@ -133,9 +131,8 @@ impl RowReadAccess<'_> { // That means we should should return the row before deletion. // If undo kind is INSERT, and next version does not exist. // That means we should return no row. - drop(chain_g); if ver.deleted { - return SelectMvccResult::RowNotFound; + return SelectMvcc::RowNotFound; } // check if key match return ver.get_visible_vals(schema, self.row()); @@ -144,17 +141,17 @@ impl RowReadAccess<'_> { match next.status { NextUndoStatus::SameAsPrev => { let next_entry = next.entry.clone(); - drop(chain_g); entry = next_entry; // still invisible. } NextUndoStatus::CTS(cts) => { if trx.sts > cts { // current version is visible if ver.deleted { - return SelectMvccResult::RowNotFound; + return SelectMvcc::RowNotFound; } return ver.get_visible_vals(schema, self.row()); } + entry = next.entry.clone(); // still invisible } } } @@ -195,7 +192,7 @@ impl RowVersion { } #[inline] - fn get_visible_vals(mut self, schema: &Schema, row: Row<'_>) -> SelectMvccResult { + fn get_visible_vals(mut self, schema: &Schema, row: Row<'_>) -> SelectMvcc { if self.read_set_contains_key { let key_different = self .undo_vals @@ -203,11 +200,11 @@ impl RowVersion { .map(|v| v == &self.undo_key) .unwrap_or_else(|| row.is_key_different(schema, &self.undo_key)); if key_different { - return SelectMvccResult::InvalidIndex; + return SelectMvcc::InvalidIndex; } } else { if row.is_key_different(schema, &self.undo_key) { - return SelectMvccResult::InvalidIndex; + return SelectMvcc::InvalidIndex; } } let mut vals = Vec::with_capacity(self.read_set.len()); @@ -218,7 +215,7 @@ impl RowVersion { vals.push(row.clone_user_val(schema, *user_col_idx)) } } - SelectMvccResult::Ok(vals) + SelectMvcc::Ok(vals) } } @@ -226,7 +223,6 @@ pub struct RowWriteAccess<'a> { page: &'a RowPage, row_idx: usize, undo: RwLockWriteGuard<'a, Option>, - head_ptr: UndoHeadPtr, } impl<'a> RowWriteAccess<'a> { @@ -267,45 +263,42 @@ impl<'a> RowWriteAccess<'a> { } #[inline] - pub fn undo_mut(&mut self) -> &mut Option { - &mut *self.undo + pub fn undo_head(&self) -> &Option { + &*self.undo + } + + #[inline] + pub fn first_undo_entry(&self) -> Option { + self.undo.as_ref().and_then(|head| head.entry.clone()) } /// Build undo chain. - /// This method locks undo head and new entry + /// This method locks undo head and add new entry #[inline] pub fn build_undo_chain( &mut self, trx: &ActiveTrx, - new_entry: &SharedUndoEntry, + new_entry: &mut OwnedUndoEntry, old_cts: NextTrxCTS, ) { - let head = self.undo.as_mut().expect("lock in undo head"); + let head = self.undo.get_or_insert_with(|| UndoHead { + status: trx.status(), + entry: None, + }); debug_assert!(head.status.ts() == trx.trx_id()); // 1. Update head trx id. head.status = trx.status(); - // 2. Link head and new entry bidirectionally. - let mut new_chain_g = new_entry.as_ref().chain.write(); - debug_assert!(new_chain_g.prev.is_none()); - new_chain_g.prev = Some(PrevUndoEntry::Head(self.head_ptr.clone())); - let new_entry_ptr = new_entry.leak(); - let old_entry_ptr = head.entry.replace(new_entry_ptr); - // 3. Link new and old bidirectionally. - // To do so, we need to lock both new and old. - // The GC thread has to traverse from old to new, and lock both - // to cleanup. - // To avoid dead-lock, we let GC thread follow the lock order, - // first acquire new then old. - if let Some(old_entry_ptr) = old_entry_ptr { - debug_assert!(new_chain_g.next.is_none()); - // link new to old. - new_chain_g.next = Some(NextUndoEntry { + // 2. 
Link the new entry to the head. Note there might be a case where
+        // the new entry already has a non-null next pointer, while the head
+        // must not have a non-null next pointer.
+        let old_entry = head.entry.replace(new_entry.leak());
+        // 3. Link new and old.
+        if let Some(entry) = old_entry {
+            debug_assert!(new_entry.next.is_none());
+            new_entry.next = Some(NextUndoEntry {
                 status: old_cts.undo_status(),
-                entry: old_entry_ptr.clone(),
+                entry,
             });
-            // link old to new.
-            let mut old_chain_g = old_entry_ptr.as_ref().chain.write();
-            old_chain_g.prev = Some(PrevUndoEntry::Entry(SharedUndoEntry::clone(new_entry)));
         }
     }
 }
@@ -327,13 +320,11 @@
     pub fn write_row(&self, row_idx: usize) -> RowWriteAccess<'_> {
         let (fh, page) = self.header_and_page();
         let undo_map = fh.undo_map.as_ref().unwrap();
-        let head_ptr = undo_map.ptr(row_idx);
         let undo = undo_map.write(row_idx);
         RowWriteAccess {
             page,
             row_idx,
             undo,
-            head_ptr,
         }
     }
 }
diff --git a/doradb-storage/src/trx/undo.rs b/doradb-storage/src/trx/undo.rs
index 145c6e3..caccaef 100644
--- a/doradb-storage/src/trx/undo.rs
+++ b/doradb-storage/src/trx/undo.rs
@@ -6,7 +6,6 @@
 use crate::trx::{SharedTrxStatus, TrxID, GLOBAL_VISIBLE_COMMIT_TS};
 use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use std::ops::{Deref, DerefMut};
 use std::ptr::NonNull;
-use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
 pub struct UndoMap {
@@ -41,14 +40,6 @@ impl UndoMap {
     pub fn write(&self, row_idx: usize) -> RwLockWriteGuard<'_, Option<UndoHead>> {
         self.entries[row_idx].write()
     }
-
-    #[inline]
-    pub fn ptr(&self, row_idx: usize) -> UndoHeadPtr {
-        unsafe {
-            let ptr = &self.entries[row_idx] as *const _ as *mut _;
-            UndoHeadPtr(NonNull::new_unchecked(ptr))
-        }
-    }
 }
 
 /// UndoKind represents the kind of original operation.
@@ -173,29 +164,31 @@ pub enum UndoKind {
     Update(Vec<UpdateCol>),
 }
 
-/// The transaction generates undo log, and page-level undo
-/// map will also hold its shared copy to track all visible
-/// versions of modified rows.
-pub struct SharedUndoEntry(UndoEntryPtr);
-
-unsafe impl Send for SharedUndoEntry {}
+/// Owned undo entry is stored in the transaction undo buffer.
+/// The page-level undo map also holds pointers to the entries,
+/// but we do not share ownership between them.
+/// Instead, we require that the undo buffer own all entries.
+/// The garbage collector makes sure the deletion of entries is
+/// safe, because no transaction will access entries that are
+/// supposed to be deleted.
+pub struct OwnedUndoEntry(Box<UndoEntry>);
 
-impl Deref for SharedUndoEntry {
-    type Target = UndoEntryPtr;
+impl Deref for OwnedUndoEntry {
+    type Target = UndoEntry;
 
     #[inline]
     fn deref(&self) -> &Self::Target {
-        &self.0
+        &*self.0
     }
 }
 
-impl DerefMut for SharedUndoEntry {
+impl DerefMut for OwnedUndoEntry {
     #[inline]
     fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
+        &mut *self.0
     }
 }
 
-impl SharedUndoEntry {
+impl OwnedUndoEntry {
     #[inline]
     pub fn new(table_id: TableID, page_id: PageID, row_id: RowID, kind: UndoKind) -> Self {
         let entry = UndoEntry {
@@ -203,32 +196,17 @@
             page_id,
             row_id,
             kind,
-            chain: RwLock::new(UndoChain {
-                prev: None,
-                next: None,
-            }),
-            ref_count: AtomicU32::new(1),
+            next: None,
         };
-        let ptr = Box::leak(Box::new(entry));
-        SharedUndoEntry(UndoEntryPtr(unsafe { NonNull::new_unchecked(ptr) }))
+        OwnedUndoEntry(Box::new(entry))
     }
 
     #[inline]
-    pub fn clone(this: &SharedUndoEntry) -> SharedUndoEntry {
-        this.as_ref().ref_count.fetch_add(1, Ordering::Relaxed);
-        SharedUndoEntry(this.leak())
-    }
-}
-
-impl Drop for SharedUndoEntry {
-    #[inline]
-    fn drop(&mut self) {
+    pub fn leak(&self) -> UndoEntryPtr {
         unsafe {
-            let ref_count = self.as_ref().ref_count.fetch_sub(1, Ordering::Relaxed);
-            if ref_count == 1 {
-                // last referenece, call destructor now
-                drop(Box::from_raw(self.as_mut()));
-            }
+            UndoEntryPtr(NonNull::new_unchecked(
+                self.0.as_ref() as *const _ as *mut UndoEntry
+            ))
         }
     }
 }
@@ -238,28 +216,17 @@
 #[derive(Clone)]
 pub struct UndoEntryPtr(NonNull<UndoEntry>);
 
+/// Safety is guaranteed by the MVCC design and the GC logic:
+/// modification of an undo log entry is always guarded by the row lock,
+/// and the non-locking consistent read will never access
+/// log entries that have already been deleted (GCed).
+unsafe impl Send for UndoEntryPtr {}
+
 impl UndoEntryPtr {
     #[inline]
     pub(crate) fn as_ref(&self) -> &UndoEntry {
         unsafe { self.0.as_ref() }
     }
-
-    #[inline]
-    pub(crate) unsafe fn as_mut(&mut self) -> &mut UndoEntry {
-        self.0.as_mut()
-    }
-
-    /// Leak a not reference counted pointer of the undo entry.
-    /// Caller must make sure it will not outlive the entry.
-    ///
-    /// This is used in MVCC forward processing.
-    /// We only count the pointers from old version to new version
-    /// to enable fast GC processing.
-    /// The validity of pointer lifetime is guaranteed by GC logic.
-    #[inline]
-    pub fn leak(&self) -> Self {
-        UndoEntryPtr(self.0)
-    }
 }
 
 pub struct UndoEntry {
@@ -272,26 +239,9 @@
     pub page_id: PageID,
     pub row_id: RowID,
     pub kind: UndoKind,
-    pub chain: RwLock<UndoChain>,
-    ref_count: AtomicU32,
-}
-
-/// UndoChain stores prev pointer, next pointer, next status, and current ref count.
-/// Prev chain is used for garbage collection.
-/// Next chain is used for visibility check.
-pub struct UndoChain {
-    /// Pointer to the newer version.
-    /// This pointer is reference counted.
-    pub prev: Option<PrevUndoEntry>,
-    /// Status of older version and pointer to the older version.
     pub next: Option<NextUndoEntry>,
 }
 
-pub enum PrevUndoEntry {
-    Head(UndoHeadPtr),
-    Entry(SharedUndoEntry),
-}
-
 pub struct NextUndoEntry {
     pub status: NextUndoStatus,
     pub entry: UndoEntryPtr,
}
@@ -314,20 +264,6 @@ pub struct UndoHead {
     pub entry: Option<UndoEntryPtr>,
 }
 
-/// A not reference counted pointer to undo head.
-/// Because all undo heads are stored in continuous memory
-/// area allocated in heap and no re-allocation is allowed.
-/// So we can make sure pointer is always valid.
-#[derive(Clone)]
-pub struct UndoHeadPtr(NonNull<RwLock<Option<UndoHead>>>);
-
-impl UndoHeadPtr {
-    #[inline]
-    pub fn as_ref(&self) -> &RwLock<Option<UndoHead>> {
-        unsafe { self.0.as_ref() }
-    }
-}
-
 #[derive(Default, Clone, Copy)]
 pub enum NextTrxCTS {
     #[default]