From 1220556edd695a67011e39221f24e9ef2afb98bc Mon Sep 17 00:00:00 2001 From: jiangzhe Date: Tue, 21 Jan 2025 06:00:13 +0000 Subject: [PATCH] Add simple catalog and refine statment-level rollback --- doradb-storage/src/catalog.rs | 59 + doradb-storage/src/index/block_index.rs | 8 +- doradb-storage/src/index/secondary_index.rs | 22 +- doradb-storage/src/lib.rs | 1 + doradb-storage/src/row/mod.rs | 90 +- doradb-storage/src/row/ops.rs | 77 +- doradb-storage/src/stmt.rs | 51 +- doradb-storage/src/table/mod.rs | 1125 +++++++++++++++-- doradb-storage/src/table/mvcc.rs | 1011 --------------- doradb-storage/src/table/schema.rs | 102 ++ doradb-storage/src/table/tests.rs | 340 +++++ doradb-storage/src/trx/mod.rs | 66 +- doradb-storage/src/trx/row.rs | 177 ++- doradb-storage/src/trx/sys.rs | 2 +- doradb-storage/src/trx/undo/index.rs | 19 + doradb-storage/src/trx/undo/mod.rs | 5 + .../src/trx/{undo.rs => undo/row.rs} | 85 +- 17 files changed, 2012 insertions(+), 1228 deletions(-) create mode 100644 doradb-storage/src/catalog.rs delete mode 100644 doradb-storage/src/table/mvcc.rs create mode 100644 doradb-storage/src/table/schema.rs create mode 100644 doradb-storage/src/table/tests.rs create mode 100644 doradb-storage/src/trx/undo/index.rs create mode 100644 doradb-storage/src/trx/undo/mod.rs rename doradb-storage/src/trx/{undo.rs => undo/row.rs} (83%) diff --git a/doradb-storage/src/catalog.rs b/doradb-storage/src/catalog.rs new file mode 100644 index 0000000..2e9c4ed --- /dev/null +++ b/doradb-storage/src/catalog.rs @@ -0,0 +1,59 @@ +use crate::buffer::BufferPool; +use crate::index::{BlockIndex, PartitionIntIndex, SingleKeyIndex}; +use crate::table::{Schema, Table, TableID}; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +/// Catalog contains metadata of user tables. +/// Initial implementation would be a in-mem hash-table. +pub struct Catalog
<P: BufferPool>
{ + table_id: AtomicU64, + tables: Mutex>>, +} + +impl Catalog
<P: BufferPool>
{ + #[inline] + pub fn empty() -> Self { + Catalog { + table_id: AtomicU64::new(1), + tables: Mutex::new(HashMap::new()), + } + } + + #[inline] + pub fn create_table(&self, buf_pool: &P, schema: Schema) -> TableID { + let table_id = self.table_id.fetch_add(1, Ordering::SeqCst); + let blk_idx = BlockIndex::new(buf_pool).unwrap(); + let sec_idx = PartitionIntIndex::empty(); + let mut g = self.tables.lock(); + let res = g.insert( + table_id, + TableMeta { + schema: Arc::new(schema), + blk_idx: Arc::new(blk_idx), + sec_idx: Arc::new(sec_idx), + }, + ); + debug_assert!(res.is_none()); + table_id + } + + #[inline] + pub fn get_table(&self, table_id: TableID) -> Option> { + let g = self.tables.lock(); + g.get(&table_id).map(|meta| Table { + table_id, + schema: Arc::clone(&meta.schema), + blk_idx: Arc::clone(&meta.blk_idx), + sec_idx: Arc::clone(&meta.sec_idx), + }) + } +} + +pub struct TableMeta
<P: BufferPool>
{ + pub schema: Arc, + pub blk_idx: Arc>, + pub sec_idx: Arc, +} diff --git a/doradb-storage/src/index/block_index.rs b/doradb-storage/src/index/block_index.rs index 116cb68..0e4ffe1 100644 --- a/doradb-storage/src/index/block_index.rs +++ b/doradb-storage/src/index/block_index.rs @@ -912,7 +912,7 @@ mod tests { fn test_block_index_free_list() { let buf_pool = FixedBufferPool::with_capacity_static(64 * 1024 * 1024).unwrap(); { - let schema = Schema::new(vec![Layout::Byte8], 0); + let schema = Schema::new(vec![Layout::Byte4], 0); let blk_idx = BlockIndex::new(buf_pool).unwrap(); let p1 = blk_idx.get_insert_page(buf_pool, 100, &schema); let pid1 = p1.page_id(); @@ -932,7 +932,7 @@ mod tests { fn test_block_index_insert_row_page() { let buf_pool = FixedBufferPool::with_capacity_static(64 * 1024 * 1024).unwrap(); { - let schema = Schema::new(vec![Layout::Byte8], 0); + let schema = Schema::new(vec![Layout::Byte4], 0); let blk_idx = BlockIndex::new(buf_pool).unwrap(); let p1 = blk_idx.get_insert_page(buf_pool, 100, &schema); let pid1 = p1.page_id(); @@ -954,7 +954,7 @@ mod tests { // allocate 1GB buffer pool is enough: 10240 pages ~= 640MB let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024 * 1024).unwrap(); { - let schema = Schema::new(vec![Layout::Byte8], 0); + let schema = Schema::new(vec![Layout::Byte4], 0); let blk_idx = BlockIndex::new(buf_pool).unwrap(); for _ in 0..row_pages { let _ = blk_idx.get_insert_page(buf_pool, 100, &schema); @@ -999,7 +999,7 @@ mod tests { let rows_per_page = 100usize; let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024 * 1024).unwrap(); { - let schema = Schema::new(vec![Layout::Byte8], 0); + let schema = Schema::new(vec![Layout::Byte4], 0); let blk_idx = BlockIndex::new(buf_pool).unwrap(); for _ in 0..row_pages { let _ = blk_idx.get_insert_page(buf_pool, rows_per_page, &schema); diff --git a/doradb-storage/src/index/secondary_index.rs b/doradb-storage/src/index/secondary_index.rs index 74e6448..c541a10 100644 --- a/doradb-storage/src/index/secondary_index.rs +++ b/doradb-storage/src/index/secondary_index.rs @@ -20,6 +20,8 @@ pub trait SingleKeyIndex { fn delete(&self, key: &Val) -> Option; + fn compare_exchange(&self, key: &Val, old_row_id: RowID, new_row_id: RowID) -> bool; + // todo: scan } @@ -92,9 +94,27 @@ impl SingleKeyIndex for PartitionIntIndex { #[inline] fn delete(&self, key: &Val) -> Option { - let key = self.key_to_int(&key); + let key = self.key_to_int(key); let tree = self.select(key); let mut g = tree.write(); g.remove(&key) } + + #[inline] + fn compare_exchange(&self, key: &Val, old_row_id: RowID, new_row_id: RowID) -> bool { + let key = self.key_to_int(key); + let tree = self.select(key); + let mut g = tree.write(); + match g.get_mut(&key) { + Some(row_id) => { + if *row_id == old_row_id { + *row_id = new_row_id; + true + } else { + false + } + } + None => false, + } + } } diff --git a/doradb-storage/src/lib.rs b/doradb-storage/src/lib.rs index 4840f4a..fd823b9 100644 --- a/doradb-storage/src/lib.rs +++ b/doradb-storage/src/lib.rs @@ -3,6 +3,7 @@ pub mod col; pub mod io; #[macro_use] pub mod error; +pub mod catalog; pub mod index; pub mod latch; pub mod row; diff --git a/doradb-storage/src/row/mod.rs b/doradb-storage/src/row/mod.rs index dc0c03e..97c7240 100644 --- a/doradb-storage/src/row/mod.rs +++ b/doradb-storage/src/row/mod.rs @@ -239,11 +239,11 @@ impl RowPage { #[inline] pub fn delete(&self, row_id: RowID) -> Delete { if !self.row_id_in_valid_range(row_id) { - return Delete::RowNotFound; + return 
Delete::NotFound; } let row_idx = self.row_idx(row_id); if self.is_deleted(row_idx) { - return Delete::RowAlreadyDeleted; + return Delete::AlreadyDeleted; } self.set_deleted(row_idx, true); Delete::Ok @@ -273,11 +273,11 @@ impl RowPage { "update columns should be in order" ); if !self.row_id_in_valid_range(row_id) { - return Update::RowNotFound; + return Update::NotFound; } let row_idx = self.row_idx(row_id); if self.row(row_idx).is_deleted() { - return Update::RowDeleted; + return Update::Deleted; } let var_len = self.var_len_for_update(row_idx, user_cols); let var_offset = if let Some(var_offset) = self.request_free_space(var_len) { @@ -299,7 +299,7 @@ impl RowPage { #[inline] pub fn select(&self, row_id: RowID) -> Select { if !self.row_id_in_valid_range(row_id) { - return Select::RowNotFound; + return Select::NotFound; } let row_idx = self.row_idx(row_id); let row = self.row(row_idx); @@ -423,7 +423,7 @@ impl RowPage { } #[inline] - fn update_val(&self, row_idx: usize, col_idx: usize, val: &V) { + pub(crate) fn update_val(&self, row_idx: usize, col_idx: usize, val: &V) { unsafe { let val = val.to_val(); let offset = self.val_offset(row_idx, col_idx, mem::size_of::()); @@ -433,13 +433,13 @@ impl RowPage { } #[inline] - fn update_var(&self, row_idx: usize, col_idx: usize, var: PageVar) { + pub(crate) fn update_var(&self, row_idx: usize, col_idx: usize, var: PageVar) { debug_assert!(mem::size_of::() == mem::size_of::()); self.update_val::(row_idx, col_idx, unsafe { mem::transmute(&var) }); } #[inline] - fn add_var(&self, input: &[u8], var_offset: usize) -> (PageVar, usize) { + pub(crate) fn add_var(&self, input: &[u8], var_offset: usize) -> (PageVar, usize) { let len = input.len(); if len <= PAGE_VAR_LEN_INLINE { return (PageVar::inline(input), var_offset); @@ -498,7 +498,7 @@ impl RowPage { /// Mark given row as deleted. #[inline] - pub fn set_deleted(&self, row_idx: usize, deleted: bool) { + pub(crate) fn set_deleted(&self, row_idx: usize, deleted: bool) { unsafe { let offset = self.header.del_bit_offset(row_idx); let ptr = self.data_ptr().add(offset); @@ -544,7 +544,7 @@ impl RowPage { } #[inline] - fn set_null(&self, row_idx: usize, col_idx: usize, null: bool) { + pub(crate) fn set_null(&self, row_idx: usize, col_idx: usize, null: bool) { unsafe { let offset = self.header.null_bit_offset(row_idx, col_idx); let ptr = self.data_ptr().add(offset); @@ -828,6 +828,16 @@ pub trait RowRead { self.clone_val(schema, user_col_idx + 1) } + /// Clone single value and its var-len offset with given column index. + #[inline] + fn clone_user_val_with_var_offset( + &self, + schema: &Schema, + user_col_idx: usize, + ) -> (Val, Option) { + self.clone_val_with_var_offset(schema, user_col_idx + 1) + } + /// Clone single value with given column index. /// NOTE: input column index includes RowID. 
#[inline] @@ -859,6 +869,38 @@ pub trait RowRead { } } + #[inline] + fn clone_val_with_var_offset(&self, schema: &Schema, col_idx: usize) -> (Val, Option) { + if self.is_null(col_idx) { + return (Val::Null, None); + } + match schema.layout(col_idx) { + Layout::Byte1 => { + let v = self.val::(col_idx); + (Val::from(*v), None) + } + Layout::Byte2 => { + let v = self.val::(col_idx); + (Val::from(*v), None) + } + Layout::Byte4 => { + let v = self.val::(col_idx); + (Val::from(*v), None) + } + Layout::Byte8 => { + let v = self.val::(col_idx); + (Val::from(*v), None) + } + Layout::VarByte => { + // let v = self.var(col_idx); + let pv = unsafe { self.page().var_unchecked(self.row_idx(), col_idx) }; + let v = pv.as_bytes(self.page().data_ptr()); + let offset = pv.offset().map(|os| os as u16); + (Val::VarByte(MemVar::new(v)), offset) + } + } + } + /// Clone all values. #[inline] fn clone_vals(&self, schema: &Schema, include_row_id: bool) -> Vec { @@ -870,6 +912,21 @@ pub trait RowRead { vals } + /// Clone all values with var-len offset. + #[inline] + fn clone_vals_with_var_offsets( + &self, + schema: &Schema, + include_row_id: bool, + ) -> Vec<(Val, Option)> { + let skip = if include_row_id { 0 } else { 1 }; + let mut vals = Vec::with_capacity(schema.col_count() - skip); + for (col_idx, _) in schema.cols().iter().enumerate().skip(skip) { + vals.push(self.clone_val_with_var_offset(schema, col_idx)); + } + vals + } + /// Clone values for given read set. (row id is excluded) #[inline] fn clone_vals_for_read_set(&self, schema: &Schema, user_read_set: &[usize]) -> Vec { @@ -931,11 +988,16 @@ pub trait RowRead { /// Returns the old value if different from given index and new value. #[inline] - fn user_different(&self, schema: &Schema, user_col_idx: usize, value: &Val) -> Option { + fn user_different( + &self, + schema: &Schema, + user_col_idx: usize, + value: &Val, + ) -> Option<(Val, Option)> { if !self.is_user_different(schema, user_col_idx, value) { return None; } - Some(self.clone_user_val(schema, user_col_idx)) + Some(self.clone_user_val_with_var_offset(schema, user_col_idx)) } } @@ -1149,7 +1211,7 @@ mod tests { #[test] fn test_row_page_init() { - let schema = Schema::new(vec![Layout::Byte8], 0); + let schema = Schema::new(vec![Layout::Byte4], 0); let mut page = create_row_page(); page.init(100, 105, &schema); println!("page header={:?}", page.header); @@ -1166,7 +1228,7 @@ mod tests { #[test] fn test_row_page_new_row() { - let schema = Schema::new(vec![Layout::Byte8], 0); + let schema = Schema::new(vec![Layout::Byte4], 0); let mut page = create_row_page(); page.init(100, 200, &schema); assert!(page.header.row_count() == 0); diff --git a/doradb-storage/src/row/ops.rs b/doradb-storage/src/row/ops.rs index 806703f..bdaaf58 100644 --- a/doradb-storage/src/row/ops.rs +++ b/doradb-storage/src/row/ops.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; pub enum Select<'a> { Ok(Row<'a>), RowDeleted(Row<'a>), - RowNotFound, + NotFound, } impl Select<'_> { @@ -18,8 +18,7 @@ impl Select<'_> { pub enum SelectMvcc { Ok(Vec), - RowNotFound, - InvalidIndex, + NotFound, } impl SelectMvcc { @@ -27,6 +26,25 @@ impl SelectMvcc { pub fn is_ok(&self) -> bool { matches!(self, SelectMvcc::Ok(_)) } + + #[inline] + pub fn not_found(&self) -> bool { + matches!(self, SelectMvcc::NotFound) + } + + #[inline] + pub fn unwrap(self) -> Vec { + match self { + SelectMvcc::Ok(vals) => vals, + SelectMvcc::NotFound => panic!("empty select result"), + } + } +} + +pub enum ReadRow { + Ok(Vec), + NotFound, + InvalidIndex, } pub enum 
InsertRow { @@ -57,6 +75,14 @@ impl InsertMvcc { pub fn is_ok(&self) -> bool { matches!(self, InsertMvcc::Ok(_)) } + + #[inline] + pub fn unwrap(self) -> RowID { + match self { + InsertMvcc::Ok(row_id) => row_id, + _ => panic!("insert not ok"), + } + } } pub enum MoveInsert { @@ -69,8 +95,8 @@ pub enum MoveInsert { pub enum Update { // RowID may change if the update is out-of-place. Ok(RowID), - RowNotFound, - RowDeleted, + NotFound, + Deleted, // if space is not enough, we perform a logical deletion+insert to // achieve the update sematics. The returned values are user columns // of original row. @@ -87,11 +113,9 @@ impl Update { pub enum UpdateMvcc { Ok(RowID), - RowNotFound, - RowDeleted, - NoFreeSpace(Vec, Vec), // with user columns of original row returned for out-of-place update + NotFound, WriteConflict, - Retry(Vec), + DuplicateKey, } impl UpdateMvcc { @@ -102,27 +126,47 @@ impl UpdateMvcc { } } +pub enum UpdateIndex { + Ok, + WriteConflict, + DuplicateKey, +} + +pub enum InsertIndex { + Ok, + WriteConflict, + DuplicateKey, +} + #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct UpdateCol { pub idx: usize, pub val: Val, } +pub struct UndoCol { + pub idx: usize, + pub val: Val, + // If value is var-len field and not inlined, + // we need to record its original offset in page + // to support rollback without new allocation. + pub var_offset: Option, +} + pub enum UpdateRow<'a> { Ok(RowMut<'a>), - NoFreeSpace(Vec), + NoFreeSpace(Vec<(Val, Option)>), } pub enum Delete { Ok, - RowNotFound, - RowAlreadyDeleted, + NotFound, + AlreadyDeleted, } pub enum DeleteMvcc { Ok, - RowNotFound, - RowAlreadyDeleted, + NotFound, WriteConflict, } @@ -131,4 +175,9 @@ impl DeleteMvcc { pub fn is_ok(&self) -> bool { matches!(self, DeleteMvcc::Ok) } + + #[inline] + pub fn not_found(&self) -> bool { + matches!(self, DeleteMvcc::NotFound) + } } diff --git a/doradb-storage/src/stmt.rs b/doradb-storage/src/stmt.rs index 98852df..d0d70ff 100644 --- a/doradb-storage/src/stmt.rs +++ b/doradb-storage/src/stmt.rs @@ -1,15 +1,20 @@ +use crate::buffer::guard::PageGuard; use crate::buffer::page::PageID; use crate::buffer::BufferPool; -use crate::row::RowID; +use crate::catalog::Catalog; +use crate::latch::LatchFallbackMode; +use crate::row::{RowID, RowPage}; use crate::table::TableID; use crate::trx::redo::RedoEntry; -use crate::trx::undo::OwnedUndoEntry; +use crate::trx::undo::{IndexUndo, IndexUndoKind, OwnedRowUndo}; use crate::trx::ActiveTrx; pub struct Statement { pub trx: ActiveTrx, - // statement-level undo logs. - pub undo: Vec, + // statement-level undo logs of row data. + pub row_undo: Vec, + // statement-level index undo operations. + pub index_undo: Vec, // statement-level redo logs. pub redo: Vec, } @@ -19,21 +24,51 @@ impl Statement { pub fn new(trx: ActiveTrx) -> Self { Statement { trx, - undo: vec![], + row_undo: vec![], + index_undo: vec![], redo: vec![], } } +} +impl Statement { #[inline] pub fn commit(mut self) -> ActiveTrx { - self.trx.undo.extend(self.undo.drain(..)); + self.trx.row_undo.extend(self.row_undo.drain(..)); + self.trx.index_undo.extend(self.index_undo.drain(..)); self.trx.redo.extend(self.redo.drain(..)); self.trx } + /// Statement level rollback. #[inline] - pub fn rollback(self, buf_pool: &P) -> ActiveTrx { - todo!(); + pub fn rollback(mut self, buf_pool: &P, catalog: &Catalog
<P>
) -> ActiveTrx { + // rollback row data. + // todo: group by page level may be better. + while let Some(entry) = self.row_undo.pop() { + let page_guard: PageGuard<'_, RowPage> = + buf_pool.get_page(entry.page_id, LatchFallbackMode::Shared); + let page_guard = page_guard.block_until_shared(); + let row_idx = page_guard.page().row_idx(entry.row_id); + let mut access = page_guard.write_row(row_idx); + access.rollback_first_undo(entry); + } + // rollback index data. + while let Some(entry) = self.index_undo.pop() { + let table = catalog.get_table(entry.table_id).unwrap(); + match entry.kind { + IndexUndoKind::Insert(key, row_id) => { + let res = table.sec_idx.delete(&key); + assert!(res.unwrap() == row_id); + } + IndexUndoKind::Update(key, old_id, new_id) => { + assert!(table.sec_idx.compare_exchange(&key, new_id, old_id)); + } + } + } + // clear redo logs. + self.redo.clear(); + self.trx } #[inline] diff --git a/doradb-storage/src/table/mod.rs b/doradb-storage/src/table/mod.rs index 6957551..a93a9b0 100644 --- a/doradb-storage/src/table/mod.rs +++ b/doradb-storage/src/table/mod.rs @@ -1,126 +1,1103 @@ -pub mod mvcc; +pub mod schema; +#[cfg(test)] +mod tests; -use crate::buffer::FixedBufferPool; -use crate::index::{BlockIndex, SingleKeyIndex}; -use crate::table::mvcc::MvccTable; -use crate::value::{Layout, Val}; +use crate::buffer::guard::PageSharedGuard; +use crate::buffer::page::PageID; +use crate::buffer::BufferPool; +use crate::index::{BlockIndex, RowLocation, SingleKeyIndex}; +use crate::latch::LatchFallbackMode; +use crate::row::ops::{ + DeleteMvcc, InsertIndex, InsertMvcc, InsertRow, MoveInsert, ReadRow, SelectMvcc, UndoCol, + UpdateCol, UpdateIndex, UpdateMvcc, UpdateRow, +}; +use crate::row::{estimate_max_row_count, RowID, RowPage, RowRead}; +use crate::stmt::Statement; +use crate::trx::redo::{RedoEntry, RedoKind}; +use crate::trx::row::{RowLatestStatus, RowReadAccess, RowWriteAccess}; +use crate::trx::undo::{ + IndexUndo, IndexUndoKind, NextRowUndo, NextRowUndoStatus, NextTrxCTS, OwnedRowUndo, + RowUndoHead, RowUndoKind, RowUndoRef, +}; +use crate::trx::{trx_is_committed, ActiveTrx}; +use crate::value::{Val, PAGE_VAR_LEN_INLINE}; +use std::mem; use std::sync::Arc; +pub use schema::Schema; + // todo: integrate with doradb_catalog::TableID. pub type TableID = u64; -pub struct Table<'a> { +/// Table is a logical data set of rows. +/// It combines components such as row page, undo map, block index, secondary +/// index to provide full-featured CRUD and MVCC functionalities. +/// +/// The basic flow is: +/// +/// secondary index -> block index -> buffer pool -> row page -> undo map. +/// +/// 1. secondary index stores mapping from key to row id. +/// +/// 2. block index stores mapping from row id to page. +/// +/// 3. Buffer pool take care of creating and fetching pages. +/// +/// 4. Row page stores latest version fo row data. +/// +/// 5. Undo map stores old versions of row data. +/// +/// We have a separate undo array associated to each row in row page. +/// The undo head also acts as the (logical) row lock, so that transactions +/// can abort/wait if write conflict is found on acquire lock of undo head. +/// +/// Insert/update/delete operation will add one or more undo entry to the +/// chain linked to undo head. +/// +/// Select operation will traverse undo chain to find visible version. 
+/// +/// Additional key validation is performed if index lookup is used, because +/// index does not contain version information, and out-of-date index entry +/// should be ignored if visible data version does not match index key. +pub struct Table
<P: BufferPool>
{ pub table_id: TableID, - pub buf_pool: &'a FixedBufferPool, - pub schema: Schema, - pub blk_idx: BlockIndex, + pub schema: Arc, + pub blk_idx: Arc>, // todo: secondary indexes. pub sec_idx: Arc, } -impl<'a> Table<'a> { +impl Table
<P: BufferPool>
{ + /// Select row with MVCC. #[inline] - pub fn mvcc(&'a self) -> MvccTable<'a> { - MvccTable(self) + pub async fn select_row( + &self, + buf_pool: &P, + stmt: &Statement, + key: Val, + user_read_set: &[usize], + ) -> SelectMvcc { + debug_assert!(self.schema.idx_type_match(&key)); + debug_assert!({ + !user_read_set.is_empty() + && user_read_set + .iter() + .zip(user_read_set.iter().skip(1)) + .all(|(l, r)| l < r) + }); + loop { + match self.sec_idx.lookup(&key) { + None => return SelectMvcc::NotFound, + Some(row_id) => match self.blk_idx.find_row_id(buf_pool, row_id) { + RowLocation::NotFound => return SelectMvcc::NotFound, + RowLocation::ColSegment(..) => todo!(), + RowLocation::RowPage(page_id) => { + let page = buf_pool.get_page(page_id, LatchFallbackMode::Shared); + let page_guard = page.block_until_shared(); + if !validate_page_id(&page_guard, page_id) { + continue; + } + return self + .select_row_in_page(stmt, page_guard, &key, row_id, user_read_set) + .await; + } + }, + } + } } -} -pub struct Schema { - cols: Vec, - // fix length is the total inline length of all columns. - pub fix_len: usize, - // index of var-length columns. - pub var_cols: Vec, - // index column id. - key_idx: usize, -} + #[inline] + async fn select_row_in_page( + &self, + stmt: &Statement, + page_guard: PageSharedGuard<'_, RowPage>, + key: &Val, + row_id: RowID, + user_read_set: &[usize], + ) -> SelectMvcc { + let page = page_guard.page(); + if !page.row_id_in_valid_range(row_id) { + return SelectMvcc::NotFound; + } + let row_idx = page.row_idx(row_id); + let access = self + .lock_row_for_read(&stmt.trx, &page_guard, row_idx) + .await; + match access.read_row_mvcc(&stmt.trx, &self.schema, user_read_set, &key) { + ReadRow::Ok(vals) => SelectMvcc::Ok(vals), + ReadRow::InvalidIndex | ReadRow::NotFound => SelectMvcc::NotFound, + } + } + + /// Insert row with MVCC. + /// This method will also take care of index update. + #[inline] + pub async fn insert_row( + &self, + buf_pool: &P, + stmt: &mut Statement, + cols: Vec, + ) -> InsertMvcc { + debug_assert!(cols.len() + 1 == self.schema.col_count()); + debug_assert!({ + cols.iter() + .enumerate() + .all(|(idx, val)| self.schema.user_col_type_match(idx, val)) + }); + let key = cols[self.schema.user_key_idx()].clone(); + // insert row into page with undo log linked. + let (row_id, page_guard) = + self.insert_row_internal(buf_pool, stmt, cols, RowUndoKind::Insert, None); + // insert index + match self + .insert_index(buf_pool, stmt, key, row_id, page_guard) + .await + { + InsertIndex::Ok => InsertMvcc::Ok(row_id), + InsertIndex::DuplicateKey => InsertMvcc::DuplicateKey, + InsertIndex::WriteConflict => InsertMvcc::WriteConflict, + } + } -impl Schema { - /// Create a new schema. - /// RowID is not included in input, but will be created - /// automatically. + /// Update row with MVCC. + /// This method is for update based on index lookup. + /// It also takes care of index update. 
#[inline] - pub fn new(user_cols: Vec, user_key_idx: usize) -> Self { - debug_assert!(!user_cols.is_empty()); - debug_assert!(user_key_idx < user_cols.len()); - let mut cols = Vec::with_capacity(user_cols.len() + 1); - cols.push(Layout::Byte8); - cols.extend(user_cols); - let mut fix_len = 0; - let mut var_cols = vec![]; - for (idx, layout) in cols.iter().enumerate() { - fix_len += layout.inline_len(); - if !layout.is_fixed() { - var_cols.push(idx); + pub async fn update_row( + &self, + buf_pool: &P, + stmt: &mut Statement, + key: Val, + update: Vec, + ) -> UpdateMvcc { + let key_change = self.key_change(&key, &update); + loop { + match self.sec_idx.lookup(&key) { + None => return UpdateMvcc::NotFound, + Some(row_id) => { + match self.blk_idx.find_row_id(buf_pool, row_id) { + RowLocation::NotFound => return UpdateMvcc::NotFound, + RowLocation::ColSegment(..) => todo!(), + RowLocation::RowPage(page_id) => { + let page = buf_pool.get_page(page_id, LatchFallbackMode::Shared); + let page_guard = page.block_until_shared(); + if !validate_page_id(&page_guard, page_id) { + continue; + } + let res = self + .update_row_inplace(stmt, page_guard, &key, row_id, update) + .await; + match res { + UpdateRowInplace::Ok(new_row_id) => { + debug_assert!(row_id == new_row_id); + return match self.update_index( + stmt, buf_pool, key, key_change, row_id, new_row_id, + ) { + UpdateIndex::Ok => UpdateMvcc::Ok(new_row_id), + UpdateIndex::DuplicateKey => UpdateMvcc::DuplicateKey, + UpdateIndex::WriteConflict => UpdateMvcc::WriteConflict, + }; + } + UpdateRowInplace::RowDeleted | UpdateRowInplace::RowNotFound => { + return UpdateMvcc::NotFound + } + UpdateRowInplace::WriteConflict => { + return UpdateMvcc::WriteConflict + } + UpdateRowInplace::NoFreeSpace( + old_row_id, + old_row, + update, + old_guard, + ) => { + // in-place update failed, we transfer update into + // move+update. + let new_row_id = self.move_update( + buf_pool, stmt, old_row, update, old_row_id, old_guard, + ); + return match self.update_index( + stmt, buf_pool, key, key_change, old_row_id, new_row_id, + ) { + UpdateIndex::Ok => UpdateMvcc::Ok(new_row_id), + UpdateIndex::DuplicateKey => UpdateMvcc::DuplicateKey, + UpdateIndex::WriteConflict => UpdateMvcc::WriteConflict, + }; + } + }; + } + } + } } } - Schema { - cols, - fix_len, - var_cols, - key_idx: user_key_idx + 1, + } + + /// Delete row with MVCC. + /// This method is for delete based on index lookup. + #[inline] + pub async fn delete_row(&self, buf_pool: &P, stmt: &mut Statement, key: Val) -> DeleteMvcc { + loop { + match self.sec_idx.lookup(&key) { + None => return DeleteMvcc::NotFound, + Some(row_id) => match self.blk_idx.find_row_id(buf_pool, row_id) { + RowLocation::NotFound => return DeleteMvcc::NotFound, + RowLocation::ColSegment(..) => todo!(), + RowLocation::RowPage(page_id) => { + let page = buf_pool.get_page(page_id, LatchFallbackMode::Shared); + let page_guard = page.block_until_shared(); + if !validate_page_id(&page_guard, page_id) { + continue; + } + return self + .delete_row_internal(stmt, page_guard, row_id, &key) + .await; + } + }, + } } } - /// Returns column count of this schema, including row id. + // Move update is similar to a delete+insert. #[inline] - pub fn col_count(&self) -> usize { - self.cols.len() + fn move_update<'a>( + &self, + buf_pool: &'a P, + stmt: &mut Statement, + old_row: Vec<(Val, Option)>, + update: Vec, + old_id: RowID, + old_guard: PageSharedGuard<'a, RowPage>, + ) -> RowID { + // calculate new row and undo entry. 
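// Illustrative sketch (not part of this patch): the swap-based undo construction used
// in the step below, reduced to std-only types. A Vec<String> stands in for the row and
// (usize, String) for an update column; the function name is hypothetical.
fn apply_update_collect_undo(
    row: &mut Vec<String>,
    mut update: Vec<(usize, String)>,
) -> Vec<(usize, String)> {
    let mut undo = Vec::new();
    for (idx, new_val) in update.iter_mut() {
        let old_val = &mut row[*idx];
        if old_val != new_val {
            // Swap so the row now holds the new value while `new_val` holds the old one,
            // which is exactly what the undo column needs for rollback.
            std::mem::swap(new_val, old_val);
            undo.push((*idx, std::mem::take(new_val)));
        }
    }
    undo
}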
+ let (new_row, undo_kind) = { + let mut row = Vec::with_capacity(old_row.len()); + let mut var_offsets = Vec::with_capacity(old_row.len()); + for (v, var_offset) in old_row { + row.push(v); + var_offsets.push(var_offset); + } + let mut undo_cols = vec![]; + for mut uc in update { + let old_val = &mut row[uc.idx]; + if old_val != &uc.val { + // swap old value and new value, then put into undo columns + mem::swap(&mut uc.val, old_val); + undo_cols.push(UndoCol { + idx: uc.idx, + val: uc.val, + var_offset: var_offsets[uc.idx], + }); + } + } + (row, RowUndoKind::Update(undo_cols)) + }; + let (row_id, page_guard) = self.insert_row_internal( + buf_pool, + stmt, + new_row, + undo_kind, + Some((old_id, old_guard)), + ); + drop(page_guard); // unlock the page + row_id } - /// Returns layouts of all columns, including row id. + /// Move insert is similar to a delete+insert. + /// But it triggered by duplicate key finding when updating index. + /// The insert is already done and we additionally add a move entry to the + /// already deleted version. #[inline] - pub fn cols(&self) -> &[Layout] { - &self.cols + async fn move_insert( + &self, + buf_pool: &P, + stmt: &mut Statement, + row_id: RowID, + key: &Val, + new_id: RowID, + new_guard: PageSharedGuard<'_, RowPage>, + ) -> MoveInsert { + loop { + match self.blk_idx.find_row_id(buf_pool, row_id) { + RowLocation::NotFound => return MoveInsert::None, + RowLocation::ColSegment(..) => todo!(), + RowLocation::RowPage(page_id) => { + let page_guard = buf_pool + .get_page(page_id, LatchFallbackMode::Shared) + .block_until_shared(); + if !validate_page_row_range(&page_guard, page_id, row_id) { + continue; + } + let row_idx = page_guard.page().row_idx(row_id); + let mut lock_row = self + .lock_row_for_write(&stmt.trx, &page_guard, row_idx, key) + .await; + match &mut lock_row { + LockRowForWrite::InvalidIndex => return MoveInsert::None, // key changed so we are fine. + LockRowForWrite::WriteConflict => return MoveInsert::WriteConflict, + LockRowForWrite::Ok(access, old_cts) => { + let mut access = access.take().unwrap(); + if !access.row().is_deleted() { + return MoveInsert::DuplicateKey; + } + let old_cts = mem::take(old_cts); + let mut move_entry = OwnedRowUndo::new( + self.table_id, + page_id, + row_id, + RowUndoKind::Move(true), + ); + access.build_undo_chain(&stmt.trx, &mut move_entry, old_cts); + drop(access); // unlock the row. + drop(lock_row); + drop(page_guard); // unlock the page. + + // Here we re-lock new row and link new entry to move entry. + // In this way, we can make sure no other thread can access new entry pointer + // so the update of next pointer is safe. + let new_idx = new_guard.page().row_idx(new_id); + let lock_new = self + .lock_row_for_write(&stmt.trx, &new_guard, new_idx, key) + .await; + let (new_access, _) = lock_new.ok().expect("lock new row for insert"); + debug_assert!(new_access.is_some()); + let mut new_entry = stmt.row_undo.pop().expect("new entry for insert"); + link_move_entry(&mut new_entry, move_entry.leak()); + + drop(new_access); // unlock new row + drop(new_guard); // unlock new page + + stmt.row_undo.push(move_entry); + stmt.row_undo.push(new_entry); + // no redo required, because no change on row data. + return MoveInsert::Ok; + } + } + } + } + } } - /// Returns whether the type is matched at given column index, row id is excluded. 
#[inline] - pub fn user_col_type_match(&self, user_col_idx: usize, val: &Val) -> bool { - self.col_type_match(user_col_idx + 1, val) + fn insert_row_internal<'a>( + &self, + buf_pool: &'a P, + stmt: &mut Statement, + mut insert: Vec, + mut undo_kind: RowUndoKind, + mut move_entry: Option<(RowID, PageSharedGuard<'a, RowPage>)>, + ) -> (RowID, PageSharedGuard<'a, RowPage>) { + let row_len = row_len(&self.schema, &insert); + let row_count = estimate_max_row_count(row_len, self.schema.col_count()); + loop { + let page_guard = self.get_insert_page(buf_pool, stmt, row_count); + let page_id = page_guard.page_id(); + match self.insert_row_to_page(stmt, page_guard, insert, undo_kind, move_entry) { + InsertRowIntoPage::Ok(row_id, page_guard) => { + stmt.save_active_insert_page(self.table_id, page_id, row_id); + return (row_id, page_guard); + } + // this page cannot be inserted any more, just leave it and retry another page. + InsertRowIntoPage::NoSpaceOrRowID(ins, uk, me) => { + insert = ins; + undo_kind = uk; + move_entry = me; + } + } + } } - /// Returns whether the type is matched at given column index. + /// Insert row into given page. + /// There might be move+update call this method, in such case, op_kind will be + /// set to UndoKind::Update. #[inline] - pub fn col_type_match(&self, col_idx: usize, val: &Val) -> bool { - match (val, self.layout(col_idx)) { - (Val::Null, _) => true, - (Val::Byte1(_), Layout::Byte1) - | (Val::Byte2(_), Layout::Byte2) - | (Val::Byte4(_), Layout::Byte4) - | (Val::Byte8(_), Layout::Byte8) - | (Val::VarByte(_), Layout::VarByte) => true, - _ => false, + fn insert_row_to_page<'a>( + &self, + stmt: &mut Statement, + page_guard: PageSharedGuard<'a, RowPage>, + insert: Vec, + undo_kind: RowUndoKind, + move_entry: Option<(RowID, PageSharedGuard<'a, RowPage>)>, + ) -> InsertRowIntoPage<'a> { + debug_assert!({ + (matches!(undo_kind, RowUndoKind::Insert) && move_entry.is_none()) + || (matches!(undo_kind, RowUndoKind::Update(_)) && move_entry.is_some()) + }); + + let page_id = page_guard.page_id(); + match page_guard.page().insert(&self.schema, &insert) { + InsertRow::Ok(row_id) => { + let row_idx = page_guard.page().row_idx(row_id); + let mut access = page_guard.write_row(row_idx); + // create undo log. + let mut new_entry = OwnedRowUndo::new(self.table_id, page_id, row_id, undo_kind); + // The MOVE undo entry is for MOVE+UPDATE. + // Once update in-place fails, we convert the update operation to insert. + // and link them together. + if let Some((old_id, old_guard)) = move_entry { + let old_row_idx = old_guard.page().row_idx(old_id); + // Here we actually lock both new row and old row, + // not very sure if this will cause dead-lock. + // + let access = old_guard.write_row(old_row_idx); + debug_assert!({ + access.undo_head().is_some() + && stmt + .trx + .is_same_trx(&access.undo_head().as_ref().unwrap().status) + }); + + // re-lock moved row and link new entry to it. + let move_entry = access.first_undo_entry().unwrap(); + link_move_entry(&mut new_entry, move_entry); + } + + debug_assert!(access.undo_head().is_none()); + access.build_undo_chain(&stmt.trx, &mut new_entry, NextTrxCTS::None); + drop(access); + // Here we do not unlock the page because we need to verify validity of unique index update + // according to this insert. + // There might be scenario that a deleted row shares the same key with this insert. + // Then we have to mark it as MOVE and point insert undo's next version to it. + // So hold the page guard in order to re-lock the insert undo fast. 
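// Illustrative sketch (not part of this patch): the insert-retry control flow used by
// insert_row_internal above, reduced to std-only stand-ins. Page, cap and
// insert_with_retry are hypothetical names; the point is only "try the current page,
// and on NoSpaceOrRowID allocate another page and retry".
struct Page {
    cap: usize,
    rows: Vec<Vec<u8>>,
}

impl Page {
    // Returns the row index on success, or None when the page is full.
    fn try_insert(&mut self, row: &[u8]) -> Option<usize> {
        if self.rows.len() >= self.cap {
            return None;
        }
        self.rows.push(row.to_vec());
        Some(self.rows.len() - 1)
    }
}

fn insert_with_retry(pages: &mut Vec<Page>, row: &[u8]) -> (usize, usize) {
    loop {
        let last = pages.len();
        if last > 0 {
            if let Some(row_idx) = pages[last - 1].try_insert(row) {
                return (last - 1, row_idx);
            }
        }
        // Current page is full (or none exists yet): allocate a fresh page and retry.
        pages.push(Page { cap: 4, rows: Vec::new() });
    }
}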
+ stmt.row_undo.push(new_entry); + // create redo log. + // even if the operation is move+update, we still treat it as insert redo log. + // because redo is only useful when recovering and no version chain is required + // during recovery. + let redo_entry = RedoEntry { + page_id, + row_id, + kind: RedoKind::Insert(insert), + }; + // store redo log into transaction redo buffer. + stmt.redo.push(redo_entry); + InsertRowIntoPage::Ok(row_id, page_guard) + } + InsertRow::NoFreeSpaceOrRowID => { + InsertRowIntoPage::NoSpaceOrRowID(insert, undo_kind, move_entry) + } } } #[inline] - pub fn idx_type_match(&self, val: &Val) -> bool { - self.col_type_match(self.key_idx, val) + async fn update_row_inplace<'a>( + &self, + stmt: &mut Statement, + page_guard: PageSharedGuard<'a, RowPage>, + key: &Val, + row_id: RowID, + mut update: Vec, + ) -> UpdateRowInplace<'a> { + let page_id = page_guard.page_id(); + let page = page_guard.page(); + // column indexes must be in range + debug_assert!( + { + update + .iter() + .all(|uc| uc.idx < page_guard.page().header.col_count as usize) + }, + "update column indexes must be in range" + ); + // column indexes should be in order. + debug_assert!( + { + update.is_empty() + || update + .iter() + .zip(update.iter().skip(1)) + .all(|(l, r)| l.idx < r.idx) + }, + "update columns should be in order" + ); + if row_id < page.header.start_row_id + || row_id >= page.header.start_row_id + page.header.max_row_count as u64 + { + return UpdateRowInplace::RowNotFound; + } + let row_idx = (row_id - page.header.start_row_id) as usize; + let mut lock_row = self + .lock_row_for_write(&stmt.trx, &page_guard, row_idx, key) + .await; + match &mut lock_row { + LockRowForWrite::InvalidIndex => return UpdateRowInplace::RowNotFound, + LockRowForWrite::WriteConflict => return UpdateRowInplace::WriteConflict, + LockRowForWrite::Ok(access, old_cts) => { + let mut access = access.take().unwrap(); + if access.row().is_deleted() { + return UpdateRowInplace::RowDeleted; + } + let old_cts = mem::take(old_cts); + match access.update_row(&self.schema, &update) { + UpdateRow::NoFreeSpace(old_row) => { + // page does not have enough space for update, we need to switch + // to out-of-place update mode, which will add a MOVE undo entry + // to end original row and perform a INSERT into new page, and + // link the two versions. + let mut new_entry = OwnedRowUndo::new( + self.table_id, + page_id, + row_id, + RowUndoKind::Move(false), + ); + access.build_undo_chain(&stmt.trx, &mut new_entry, old_cts); + drop(access); // unlock row + drop(lock_row); + // Here we do not unlock page because we need to perform MOVE+UPDATE + // and link undo entries of two rows. + // The re-lock of current undo is required. + stmt.row_undo.push(new_entry); + let redo_entry = RedoEntry { + page_id, + row_id, + // use DELETE for redo is ok, no version chain should be maintained if recovering from redo. + kind: RedoKind::Delete, + }; + stmt.redo.push(redo_entry); + UpdateRowInplace::NoFreeSpace(row_id, old_row, update, page_guard) + } + UpdateRow::Ok(mut row) => { + // perform in-place update. + let (mut undo_cols, mut redo_cols) = (vec![], vec![]); + for uc in &mut update { + if let Some((old, var_offset)) = + row.user_different(&self.schema, uc.idx, &uc.val) + { + undo_cols.push(UndoCol { + idx: uc.idx, + val: Val::from(old), + var_offset, + }); + redo_cols.push(UpdateCol { + idx: uc.idx, + // new value no longer needed, so safe to take it here. 
+ val: mem::take(&mut uc.val), + }); + row.update_user_col(uc.idx, &uc.val); + } + } + let mut new_entry = OwnedRowUndo::new( + self.table_id, + page_id, + row_id, + RowUndoKind::Update(undo_cols), + ); + access.build_undo_chain(&stmt.trx, &mut new_entry, old_cts); + drop(access); // unlock the row. + drop(lock_row); + drop(page_guard); // unlock the page, because we finish page update. + stmt.row_undo.push(new_entry); + if !redo_cols.is_empty() { + // there might be nothing to update, so we do not need to add redo log. + // but undo is required because we need to properly lock the row. + let redo_entry = RedoEntry { + page_id, + row_id, + kind: RedoKind::Update(redo_cols), + }; + stmt.redo.push(redo_entry); + } + UpdateRowInplace::Ok(row_id) + } + } + } + } } #[inline] - pub fn user_key_idx(&self) -> usize { - self.key_idx - 1 + async fn delete_row_internal( + &self, + stmt: &mut Statement, + page_guard: PageSharedGuard<'_, RowPage>, + row_id: RowID, + key: &Val, + ) -> DeleteMvcc { + let page_id = page_guard.page_id(); + let page = page_guard.page(); + if !page.row_id_in_valid_range(row_id) { + return DeleteMvcc::NotFound; + } + let row_idx = page.row_idx(row_id); + let mut lock_row = self + .lock_row_for_write(&stmt.trx, &page_guard, row_idx, key) + .await; + match &mut lock_row { + LockRowForWrite::InvalidIndex => return DeleteMvcc::NotFound, + LockRowForWrite::WriteConflict => return DeleteMvcc::WriteConflict, + LockRowForWrite::Ok(access, old_cts) => { + let mut access = access.take().unwrap(); + if access.row().is_deleted() { + return DeleteMvcc::NotFound; + } + access.delete_row(); + let mut new_entry = + OwnedRowUndo::new(self.table_id, page_id, row_id, RowUndoKind::Delete); + access.build_undo_chain(&stmt.trx, &mut new_entry, mem::take(old_cts)); + drop(access); // unlock row + drop(lock_row); + drop(page_guard); // unlock page + stmt.row_undo.push(new_entry); + // create redo log + let redo_entry = RedoEntry { + page_id, + row_id, + kind: RedoKind::Delete, + }; + stmt.redo.push(redo_entry); + DeleteMvcc::Ok + } + } } #[inline] - pub fn key_idx(&self) -> usize { - self.key_idx + fn get_insert_page<'a>( + &self, + buf_pool: &'a P, + stmt: &mut Statement, + row_count: usize, + ) -> PageSharedGuard<'a, RowPage> { + if let Some((page_id, row_id)) = stmt.load_active_insert_page(self.table_id) { + let g = buf_pool.get_page(page_id, LatchFallbackMode::Shared); + // because we save last insert page in session and meanwhile other thread may access this page + // and do some modification, even worse, buffer pool may evict it and reload other data into + // this page. so here, we do not require that no change should happen, but if something change, + // we validate that page id and row id range is still valid. + let g = g.block_until_shared(); + if validate_page_row_range(&g, page_id, row_id) { + return g; + } + } + self.blk_idx + .get_insert_page(buf_pool, row_count, &self.schema) } + // lock row will check write conflict on given row and lock it. #[inline] - pub fn user_layout(&self, user_col_idx: usize) -> Layout { - self.cols[user_col_idx + 1] + async fn lock_row_for_write<'a>( + &self, + trx: &ActiveTrx, + page_guard: &'a PageSharedGuard<'a, RowPage>, + row_idx: usize, + key: &Val, + ) -> LockRowForWrite<'a> { + loop { + let mut access = page_guard.write_row(row_idx); + let (row, undo_head) = access.row_and_undo_mut(); + match undo_head { + None => { + let head = RowUndoHead { + status: trx.status(), + entry: None, // currently we don't have undo entry to insert. 
+ }; + *undo_head = Some(head); // lock the row. + return LockRowForWrite::Ok(Some(access), NextTrxCTS::None); + } + Some(head) => { + if trx.is_same_trx(head.status.as_ref()) { + // Locked by itself + return LockRowForWrite::Ok(Some(access), NextTrxCTS::Myself); + } + let ts = head.status.ts(); + if trx_is_committed(ts) { + // This row is committed, no lock conflict. + // Check whether the row is valid through index lookup. + // There might be case an out-of-date index entry pointing to the + // latest version of the row which has different key other than index. + // For example, assume: + // 1. one row with row_id=100, k=200 is inserted. + // Then index has entry k(200) -> row_id(100). + // + // 2. update row set k=300. + // If in-place update is available, we will reuse row_id=100, and + // just update its key to 300. + // So in index, we have two entries: k(200) -> row_id(100), + // k(300) -> row_id(100). + // The first entry is supposed to be linked to the old version, and + // second entry to new version. + // But in our design, both of them point to latest version and + // we need to traverse the version chain to find correct(visible) + // version. + // + // 3. insert one row with row_id=101, k=200. + // Now we need to identify k=200 is actually out-of-date index entry, + // and just skip it. + if row.is_key_different(&self.schema, key) { + return LockRowForWrite::InvalidIndex; + } + head.status = trx.status(); // lock the row. + return LockRowForWrite::Ok(Some(access), NextTrxCTS::Value(ts)); + } + if !head.status.preparing() { + // uncommitted, write-write conflict. + return LockRowForWrite::WriteConflict; + } + if let Some(commit_notifier) = head.status.prepare_notifier() { + // unlock row(but logical row lock is still held) + drop(access); + + // Here we do not unlock the page, because the preparation time of commit is supposed + // to be short. + // And as active transaction is using this page, we don't want page evictor swap it onto + // disk. + // Other transactions can still access this page and modify other rows. + + let _ = commit_notifier.recv_async().await; // wait for that transaction to be committed. + + // now we get back on current page. + // maybe another thread modify our row before the lock acquisition, + // so we need to recheck. + } // there might be progress on preparation, so recheck. + } + } + } } + // perform non-locking read on row. #[inline] - pub fn layout(&self, col_idx: usize) -> Layout { - self.cols[col_idx] + async fn lock_row_for_read<'a>( + &self, + trx: &ActiveTrx, + page_guard: &'a PageSharedGuard<'a, RowPage>, + row_idx: usize, + ) -> RowReadAccess<'a> { + loop { + let access = page_guard.read_row(row_idx); + match access.undo() { + None => return access, + Some(head) => { + if trx.is_same_trx(head.status.as_ref()) { + // Locked by itself + return access; + } + let ts = head.status.ts(); + if trx_is_committed(ts) { + // Because MVCC will backtrace to visible version, we do not need to check if index lookup is out-of-date here. + return access; + } + if !head.status.preparing() { + // uncommitted, write-write conflict. + return access; + } + if let Some(commit_notifier) = head.status.prepare_notifier() { + // unlock row + drop(access); + // Even if it's non-locking read, we still need to wait for the preparation to avoid partial read. + // For example: + // Suppose transaction T1 is committing with CTS 100, + // Transaction T2 starts with STS 101 and reads rows that are modified by T1. 
+ // If we do not block on waiting for T1, we may read one row of old version, and another + // row with new version. This breaks ACID properties. + + let _ = commit_notifier.recv_async().await; // wait for that transaction to be committed. + + // now we get back on current page. + // maybe another thread modify our row before the lock acquisition, + // so we need to recheck. + } // there might be progress on preparation, so recheck. + } + } + } + } + + #[inline] + fn index_undo(&self, kind: IndexUndoKind) -> IndexUndo { + IndexUndo { + table_id: self.table_id, + kind, + } + } + + #[inline] + fn key_change(&self, key: &Val, update: &[UpdateCol]) -> bool { + let user_key_idx = self.schema.user_key_idx(); + if let Ok(pos) = update.binary_search_by_key(&user_key_idx, |uc| uc.idx) { + let new_key = &update[pos]; + return key != &new_key.val; + } + false + } + + #[inline] + fn row_latest_status(&self, buf_pool: &P, row_id: RowID) -> RowLatestStatus { + loop { + match self.blk_idx.find_row_id(buf_pool, row_id) { + RowLocation::NotFound => return RowLatestStatus::NotFound, + RowLocation::ColSegment(..) => todo!(), + RowLocation::RowPage(page_id) => { + let page_guard = buf_pool + .get_page(page_id, LatchFallbackMode::Shared) + .block_until_shared(); + if !validate_page_row_range(&page_guard, page_id, row_id) { + continue; + } + let row_idx = page_guard.page().row_idx(row_id); + let access = page_guard.read_row(row_idx); + return access.latest_status(); + } + } + } + } + + #[inline] + async fn insert_index<'a>( + &self, + buf_pool: &'a P, + stmt: &mut Statement, + key: Val, + row_id: RowID, + page_guard: PageSharedGuard<'a, RowPage>, + ) -> InsertIndex { + match self.sec_idx.insert_if_not_exists(key.clone(), row_id) { + None => { + let index_undo = self.index_undo(IndexUndoKind::Insert(key, row_id)); + stmt.index_undo.push(index_undo); + InsertIndex::Ok + } + Some(old_row_id) => { + // we found there is already one existing row with same key. + // so perform move+insert. + return match self + .move_insert(buf_pool, stmt, old_row_id, &key, row_id, page_guard) + .await + { + MoveInsert::DuplicateKey => InsertIndex::DuplicateKey, + MoveInsert::WriteConflict => InsertIndex::WriteConflict, + MoveInsert::None => { + // move+insert does not find old row. It's safe. + let index_undo = self.index_undo(IndexUndoKind::Insert(key, row_id)); + stmt.index_undo.push(index_undo); + InsertIndex::Ok + } + MoveInsert::Ok => { + // Once move+insert is done, + // we already locked both old and new row, and make undo chain linked. + // So any other transaction that want to modify the index with same key + // should fail because lock can not be acquired by them. + let res = self.sec_idx.compare_exchange(&key, old_row_id, row_id); + assert!(res); + let index_undo = + self.index_undo(IndexUndoKind::Update(key, old_row_id, row_id)); + stmt.index_undo.push(index_undo); + InsertIndex::Ok + } + }; + } + } + } + + #[inline] + fn update_index( + &self, + stmt: &mut Statement, + buf_pool: &P, + key: Val, + key_change: bool, + old_row_id: RowID, + new_row_id: RowID, + ) -> UpdateIndex { + let row_id_change = old_row_id != new_row_id; + match (key_change, row_id_change) { + (false, false) => UpdateIndex::Ok, // nothing changed + (true, false) => { + // Key changed, and row id not change. + // Then, we try to insert new key with row id into index. 
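// Illustrative sketch (not part of this patch): the (key_change, row_id_change)
// decision matrix that drives update_index below, condensed into a standalone function.
// UpdateIndexOutcome and classify_index_update are hypothetical names; duplicate-key
// and MVCC status checks are deliberately left out.
#[derive(Debug, PartialEq)]
enum UpdateIndexOutcome {
    // Neither the key nor the row id changed: the index entry is already correct.
    Nothing,
    // Same key, new row id (move+update): compare-and-swap the entry to the new id.
    RedirectRowId,
    // Key changed: insert an entry for the new key; the old entry keeps pointing at the
    // latest version and is resolved through the version chain on lookup.
    InsertNewKey,
}

fn classify_index_update(key_change: bool, row_id_change: bool) -> UpdateIndexOutcome {
    match (key_change, row_id_change) {
        (false, false) => UpdateIndexOutcome::Nothing,
        (false, true) => UpdateIndexOutcome::RedirectRowId,
        (true, _) => UpdateIndexOutcome::InsertNewKey,
    }
}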
+ match self.sec_idx.insert_if_not_exists(key.clone(), new_row_id) { + None => { + let index_undo = self.index_undo(IndexUndoKind::Insert(key, new_row_id)); + stmt.index_undo.push(index_undo); + UpdateIndex::Ok + } + Some(index_row_id) => { + // There is already a row with same new key. + // We have to check its status. + if index_row_id == new_row_id { + // This is possible. + // For example, transaction update row(RowID=100) key=1 to key=2. + // + // Then index has following entries: + // key=1 -> RowID=100 (old version) + // key=2 -> RowID=100 (latest version) + // + // Then we update key=2 to key=1 again. + // Now we should have: + // key=1 -> RowID=100 (latest version) + // key=2 -> RowID=100 (old version) + // + // nothing to do in this case. + return UpdateIndex::Ok; + } + let row_status = self.row_latest_status(buf_pool, index_row_id); + match row_status { + RowLatestStatus::Committed => UpdateIndex::DuplicateKey, + RowLatestStatus::Uncommitted => UpdateIndex::WriteConflict, + RowLatestStatus::Deleted => { + // todo: we need to link new row and old row, + // to make result of MVCC index lookup correct. + // + // Current design of undo chain does not support this scenario. + // We need to embed index information into undo chain to support + // one undo entry point to multiple old entries using + // different index key. + // + // there might also be optimization that we can identify + // the deleted row is globally visible, so we do not + // need to keep version chain. + todo!() + } + RowLatestStatus::NotFound => UpdateIndex::Ok, + } + } + } + } + (false, true) => { + // Key not changed, but row id changed. + let res = self.sec_idx.compare_exchange(&key, old_row_id, new_row_id); + assert!(res); + let index_undo = + self.index_undo(IndexUndoKind::Update(key, old_row_id, new_row_id)); + stmt.index_undo.push(index_undo); + UpdateIndex::Ok + } + (true, true) => { + // Key changed and row id changed. + match self.sec_idx.insert_if_not_exists(key.clone(), new_row_id) { + None => { + let index_undo = self.index_undo(IndexUndoKind::Insert(key, new_row_id)); + stmt.index_undo.push(index_undo); + UpdateIndex::Ok + } + Some(index_row_id) => { + // new row id is the insert id so index value must not be the same. + debug_assert!(index_row_id != new_row_id); + if index_row_id == old_row_id { + // This is possible. + // For example, transaction update row(RowID=100) key=1 to key=2. + // + // Then index has following entries: + // key=1 -> RowID=100 (old version) + // key=2 -> RowID=100 (latest version) + // + // Then we update key=2 to key=1 again. + // And page does not have enough space, so move+update with RowID=200. + // Now we should have: + // key=1 -> RowID=200 (latest version) + // key=2 -> RowID=100 (old version) + // + // In this case, we can just update index to point to new version. 
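// Illustrative sketch (not part of this patch): the compare_exchange primitive this
// patch adds to SingleKeyIndex, shown against a single Mutex<BTreeMap> instead of the
// partitioned trees used by PartitionIntIndex. The entry is redirected to the new RowID
// only if it still points at the expected old RowID.
use std::collections::BTreeMap;
use std::sync::Mutex;

fn index_compare_exchange(
    tree: &Mutex<BTreeMap<u64, u64>>,
    key: u64,
    old_row_id: u64,
    new_row_id: u64,
) -> bool {
    let mut g = tree.lock().unwrap();
    match g.get_mut(&key) {
        Some(row_id) if *row_id == old_row_id => {
            *row_id = new_row_id;
            true
        }
        _ => false,
    }
}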
+ let res = self.sec_idx.compare_exchange(&key, old_row_id, new_row_id); + assert!(res); + let index_undo = + self.index_undo(IndexUndoKind::Update(key, old_row_id, new_row_id)); + stmt.index_undo.push(index_undo); + return UpdateIndex::Ok; + } + let row_status = self.row_latest_status(buf_pool, index_row_id); + match row_status { + RowLatestStatus::Committed => UpdateIndex::DuplicateKey, + RowLatestStatus::Uncommitted => UpdateIndex::WriteConflict, + RowLatestStatus::Deleted => { + todo!() + } + RowLatestStatus::NotFound => UpdateIndex::Ok, + } + } + } + } + } + } +} + +#[inline] +fn validate_page_id(page_guard: &PageSharedGuard<'_, RowPage>, page_id: PageID) -> bool { + if page_guard.page_id() != page_id { + return false; + } + true +} + +#[inline] +fn validate_page_row_range( + page_guard: &PageSharedGuard<'_, RowPage>, + page_id: PageID, + row_id: RowID, +) -> bool { + if page_guard.page_id() != page_id { + return false; } + page_guard.page().row_id_in_valid_range(row_id) +} +#[inline] +fn row_len(schema: &Schema, user_cols: &[Val]) -> usize { + let var_len = schema + .var_cols + .iter() + .map(|idx| { + let val = &user_cols[*idx - 1]; + match val { + Val::Null => 0, + Val::VarByte(var) => { + if var.len() <= PAGE_VAR_LEN_INLINE { + 0 + } else { + var.len() + } + } + _ => unreachable!(), + } + }) + .sum::(); + schema.fix_len + var_len +} + +#[inline] +fn link_move_entry(new_entry: &mut OwnedRowUndo, move_entry: RowUndoRef) { + // ref-count this pointer. + debug_assert!(new_entry.next.is_none()); + new_entry.next = Some(NextRowUndo { + status: NextRowUndoStatus::SameAsPrev, + entry: move_entry, + }); +} + +enum LockRowForWrite<'a> { + // lock success, returns optional last commit timestamp. + Ok(Option>, NextTrxCTS), + // lock fail, there is another transaction modifying this row. + WriteConflict, + // row is invalid through index lookup. + // this can happen when index entry is not garbage collected, + // so some old key points to new version. 
+ InvalidIndex, +} + +impl<'a> LockRowForWrite<'a> { #[inline] - pub fn key_layout(&self) -> Layout { - self.layout(self.key_idx) + pub fn ok(self) -> Option<(Option>, NextTrxCTS)> { + match self { + LockRowForWrite::Ok(access, next_cts) => Some((access, next_cts)), + _ => None, + } } } + +enum InsertRowIntoPage<'a> { + Ok(RowID, PageSharedGuard<'a, RowPage>), + NoSpaceOrRowID( + Vec, + RowUndoKind, + Option<(RowID, PageSharedGuard<'a, RowPage>)>, + ), +} + +enum UpdateRowInplace<'a> { + Ok(RowID), + RowNotFound, + RowDeleted, + WriteConflict, + NoFreeSpace( + RowID, + Vec<(Val, Option)>, + Vec, + PageSharedGuard<'a, RowPage>, + ), +} diff --git a/doradb-storage/src/table/mvcc.rs b/doradb-storage/src/table/mvcc.rs deleted file mode 100644 index 1fd6a7b..0000000 --- a/doradb-storage/src/table/mvcc.rs +++ /dev/null @@ -1,1011 +0,0 @@ -use crate::buffer::guard::PageSharedGuard; -use crate::buffer::page::PageID; -use crate::buffer::BufferPool; -use crate::index::RowLocation; -use crate::latch::LatchFallbackMode; -use crate::row::ops::{ - DeleteMvcc, InsertMvcc, InsertRow, MoveInsert, SelectMvcc, UpdateCol, UpdateMvcc, UpdateRow, -}; -use crate::row::{estimate_max_row_count, RowID, RowPage, RowRead}; -use crate::stmt::Statement; -use crate::table::{Schema, Table}; -use crate::trx::redo::{RedoEntry, RedoKind}; -use crate::trx::row::{RowReadAccess, RowWriteAccess}; -use crate::trx::undo::{ - NextTrxCTS, NextUndoEntry, NextUndoStatus, OwnedUndoEntry, UndoEntryPtr, UndoHead, UndoKind, -}; -use crate::trx::{trx_is_committed, ActiveTrx}; -use crate::value::{Val, PAGE_VAR_LEN_INLINE}; -use std::mem; -use std::ops::Deref; - -/// MvccTable wraps a common table to provide MVCC functionalities. -/// -/// The basic idea is to separate undo logs from data page. -/// So we have a separate undo array associated to each row in row page. -/// The undo head also acts as the (logical) row lock, so that transactions -/// can abort/wait if write conflict is found on acquire lock of undo head. -/// -/// Insert/update/delete operation will add one or more undo entry to the -/// chain linked to undo head. -/// -/// Select operation will traverse undo chain to find visible version. -/// -/// Additional key validation is performed if index lookup is used, because -/// index does not contain version information, and out-of-date index entry -/// should ignored if visible data version does not match index key. -pub struct MvccTable<'a>(pub(super) &'a Table<'a>); - -impl<'a> Deref for MvccTable<'a> { - type Target = Table<'a>; - #[inline] - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'a> MvccTable<'a> { - /// Select row with MVCC. - #[inline] - pub async fn select_row( - &self, - stmt: &mut Statement, - key: Val, - user_read_set: &[usize], - ) -> SelectMvcc { - debug_assert!(self.schema.idx_type_match(&key)); - debug_assert!({ - !user_read_set.is_empty() - && user_read_set - .iter() - .zip(user_read_set.iter().skip(1)) - .all(|(l, r)| l < r) - }); - loop { - match self.sec_idx.lookup(&key) { - None => return SelectMvcc::RowNotFound, - Some(row_id) => match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return SelectMvcc::RowNotFound, - RowLocation::ColSegment(..) 
=> todo!(), - RowLocation::RowPage(page_id) => { - let page = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); - let page_guard = page.block_until_shared(); - if !validate_page_id(&page_guard, page_id) { - continue; - } - let key = key.clone(); - return self - .select_row_in_page(stmt, page_guard, key, row_id, user_read_set) - .await; - } - }, - } - } - } - - #[inline] - async fn select_row_in_page( - &self, - stmt: &mut Statement, - page_guard: PageSharedGuard<'_, RowPage>, - key: Val, - row_id: RowID, - user_read_set: &[usize], - ) -> SelectMvcc { - let page = page_guard.page(); - if !page.row_id_in_valid_range(row_id) { - return SelectMvcc::RowNotFound; - } - let row_idx = page.row_idx(row_id); - let access = self - .lock_row_for_read(&stmt.trx, &page_guard, row_idx) - .await; - access.read_row_mvcc(&stmt.trx, &self.schema, user_read_set, &key) - } - - /// Insert row with MVCC. - #[inline] - pub async fn insert_row(&self, stmt: &mut Statement, cols: Vec) -> InsertMvcc { - debug_assert!(cols.len() + 1 == self.schema.col_count()); - debug_assert!({ - cols.iter() - .enumerate() - .all(|(idx, val)| self.schema.user_col_type_match(idx, val)) - }); - let key = cols[self.schema.user_key_idx()].clone(); - // insert row into page with undo log linked. - let (row_id, page_guard) = self.insert_row_internal(stmt, cols, UndoKind::Insert, None); - // update index - if let Some(old_row_id) = self.sec_idx.insert_if_not_exists(key.clone(), row_id) { - // we found there is already one existing row with same key. - // so perform move+insert. - match self - .move_insert(stmt, old_row_id, key.clone(), row_id, page_guard) - .await - { - MoveInsert::DuplicateKey => InsertMvcc::DuplicateKey, - MoveInsert::WriteConflict => InsertMvcc::WriteConflict, - MoveInsert::Ok | MoveInsert::None => InsertMvcc::Ok(row_id), - } - } else { - InsertMvcc::Ok(row_id) - } - } - - /// Update row with MVCC. - /// This method is for update based on index lookup. - #[inline] - pub async fn update_row( - &self, - stmt: &mut Statement, - key: Val, - update: Vec, - ) -> UpdateMvcc { - loop { - match self.sec_idx.lookup(&key) { - None => return UpdateMvcc::RowNotFound, - Some(row_id) => { - match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return UpdateMvcc::RowNotFound, - RowLocation::ColSegment(..) => todo!(), - RowLocation::RowPage(page_id) => { - let page = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); - let page_guard = page.block_until_shared(); - if !validate_page_id(&page_guard, page_id) { - continue; - } - let key = key.clone(); - let res = self - .update_row_inplace(stmt, page_guard, key, row_id, update) - .await; - - return match res { - UpdateRowInplace::Ok(row_id) => UpdateMvcc::Ok(row_id), - UpdateRowInplace::RowDeleted => UpdateMvcc::RowDeleted, - UpdateRowInplace::RowNotFound => UpdateMvcc::RowNotFound, - UpdateRowInplace::WriteConflict => UpdateMvcc::WriteConflict, - UpdateRowInplace::NoFreeSpace( - old_row_id, - old_row, - update, - old_guard, - ) => { - // in-place update failed, we transfer update into - // move+update. - self.move_update(stmt, old_row, update, old_row_id, old_guard) - } - }; - } - } - } - } - } - } - - /// Delete row with MVCC. - /// This method is for delete based on index lookup. 
- #[inline] - pub async fn delete_row(&self, stmt: &mut Statement, key: Val) -> DeleteMvcc { - loop { - match self.sec_idx.lookup(&key) { - None => return DeleteMvcc::RowNotFound, - Some(row_id) => match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return DeleteMvcc::RowNotFound, - RowLocation::ColSegment(..) => todo!(), - RowLocation::RowPage(page_id) => { - let page = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); - let page_guard = page.block_until_shared(); - if !validate_page_id(&page_guard, page_id) { - continue; - } - return self - .delete_row_internal(stmt, page_guard, row_id, &key) - .await; - } - }, - } - } - } - - // Move update is similar to a delete+insert. - #[inline] - fn move_update( - &self, - stmt: &mut Statement, - mut old_row: Vec, - update: Vec, - old_id: RowID, - old_guard: PageSharedGuard<'a, RowPage>, - ) -> UpdateMvcc { - // calculate new row and undo entry. - let (new_row, undo_kind) = { - let mut undo_cols = vec![]; - for mut uc in update { - let old_val = &mut old_row[uc.idx]; - if old_val != &uc.val { - // swap old value and new value, then put into undo columns - mem::swap(&mut uc.val, old_val); - undo_cols.push(uc); - } - } - (old_row, UndoKind::Update(undo_cols)) - }; - let (row_id, page_guard) = - self.insert_row_internal(stmt, new_row, undo_kind, Some((old_id, old_guard))); - drop(page_guard); // unlock the page - UpdateMvcc::Ok(row_id) - } - - /// Move insert is similar to a delete+insert. - /// But it triggered by duplicate key finding when updating index. - /// The insert is already done and we additionally add a move entry to the - /// already deleted version. - #[inline] - async fn move_insert( - &self, - stmt: &mut Statement, - row_id: RowID, - key: Val, - new_id: RowID, - new_guard: PageSharedGuard<'_, RowPage>, - ) -> MoveInsert { - loop { - match self.blk_idx.find_row_id(self.buf_pool, row_id) { - RowLocation::NotFound => return MoveInsert::None, - RowLocation::ColSegment(..) => todo!(), - RowLocation::RowPage(page_id) => { - let page_guard = self - .buf_pool - .get_page(page_id, LatchFallbackMode::Shared) - .block_until_shared(); - if !validate_page_id(&page_guard, page_id) { - continue; - } - let page = page_guard.page(); - if !page.row_id_in_valid_range(row_id) { - // no old row found - return MoveInsert::None; - } - let row_idx = page.row_idx(row_id); - let mut lock_row = self - .lock_row_for_write(&stmt.trx, &page_guard, row_idx, &key) - .await; - match &mut lock_row { - LockRowForWrite::InvalidIndex => return MoveInsert::None, // key changed so we are fine. - LockRowForWrite::WriteConflict => return MoveInsert::WriteConflict, - LockRowForWrite::Ok(access, old_cts) => { - let mut access = access.take().unwrap(); - if !access.row().is_deleted() { - return MoveInsert::DuplicateKey; - } - let old_cts = mem::take(old_cts); - let mut move_entry = OwnedUndoEntry::new( - self.table_id, - page_id, - row_id, - UndoKind::Move(true), - ); - access.build_undo_chain(&stmt.trx, &mut move_entry, old_cts); - drop(access); // unlock the row. - drop(lock_row); - drop(page_guard); // unlock the page. - - // Here we re-lock new row and link new entry to move entry. - // In this way, we can make sure no other thread can access new entry pointer - // so the update of next pointer is safe. 
- let new_idx = new_guard.page().row_idx(new_id); - let lock_new = self - .lock_row_for_write(&stmt.trx, &new_guard, new_idx, &key) - .await; - let (new_access, _) = lock_new.ok().expect("lock new row for insert"); - debug_assert!(new_access.is_some()); - let mut new_entry = stmt.trx.undo.pop().expect("new entry for insert"); - link_move_entry(&mut new_entry, move_entry.leak()); - - drop(new_access); // unlock new row - drop(new_guard); // unlock new page - - stmt.undo.push(move_entry); - stmt.undo.push(new_entry); - // no redo required, because no change on row data. - return MoveInsert::Ok; - } - } - } - } - } - } - - #[inline] - fn insert_row_internal( - &self, - stmt: &mut Statement, - mut insert: Vec, - mut undo_kind: UndoKind, - mut move_entry: Option<(RowID, PageSharedGuard<'a, RowPage>)>, - ) -> (RowID, PageSharedGuard) { - let row_len = row_len(&self.schema, &insert); - let row_count = estimate_max_row_count(row_len, self.schema.col_count()); - loop { - let page_guard = self.get_insert_page(stmt, row_count); - let page_id = page_guard.page_id(); - match self.insert_row_to_page(stmt, page_guard, insert, undo_kind, move_entry) { - InsertRowIntoPage::Ok(row_id, page_guard) => { - stmt.save_active_insert_page(self.table_id, page_id, row_id); - return (row_id, page_guard); - } - // this page cannot be inserted any more, just leave it and retry another page. - InsertRowIntoPage::NoSpaceOrRowID(ins, uk, me) => { - insert = ins; - undo_kind = uk; - move_entry = me; - } - } - } - } - - /// Insert row into given page. - /// There might be move+update call this method, in such case, op_kind will be - /// set to UndoKind::Update. - #[inline] - fn insert_row_to_page( - &self, - stmt: &mut Statement, - page_guard: PageSharedGuard<'a, RowPage>, - insert: Vec, - undo_kind: UndoKind, - move_entry: Option<(RowID, PageSharedGuard<'a, RowPage>)>, - ) -> InsertRowIntoPage<'a> { - debug_assert!({ - (matches!(undo_kind, UndoKind::Insert) && move_entry.is_none()) - || (matches!(undo_kind, UndoKind::Update(_)) && move_entry.is_some()) - }); - - let page_id = page_guard.page_id(); - match page_guard.page().insert(&self.schema, &insert) { - InsertRow::Ok(row_id) => { - let row_idx = (row_id - page_guard.page().header.start_row_id) as usize; - page_guard.page().row_idx(row_id); - let mut access = page_guard.write_row(row_idx); - // create undo log. - let mut new_entry = OwnedUndoEntry::new(self.table_id, page_id, row_id, undo_kind); - // The MOVE undo entry is for MOVE+UPDATE. - // Once update in-place fails, we convert the update operation to insert. - // and link them together. - if let Some((old_id, old_guard)) = move_entry { - let old_row_idx = old_guard.page().row_idx(old_id); - // Here we actually lock both new row and old row, - // not very sure if this will cause dead-lock. - // - let access = old_guard.write_row(old_row_idx); - debug_assert!({ - access.undo_head().is_some() - && stmt - .trx - .is_same_trx(&access.undo_head().as_ref().unwrap().status) - }); - - // re-lock moved row and link new entry to it. - let move_entry = access.first_undo_entry().unwrap(); - link_move_entry(&mut new_entry, move_entry); - } - - debug_assert!(access.undo_head().is_none()); - access.build_undo_chain(&stmt.trx, &mut new_entry, NextTrxCTS::None); - drop(access); - // Here we do not unlock the page because we need to verify validity of unique index update - // according to this insert. - // There might be scenario that a deleted row shares the same key with this insert. 
- // Then we have to mark it as MOVE and point insert undo's next version to it. - // So hold the page guard in order to re-lock the insert undo fast. - stmt.undo.push(new_entry); - // create redo log. - // even if the operation is move+update, we still treat it as insert redo log. - // because redo is only useful when recovering and no version chain is required - // during recovery. - let redo_entry = RedoEntry { - page_id, - row_id, - kind: RedoKind::Insert(insert), - }; - // store redo log into transaction redo buffer. - stmt.redo.push(redo_entry); - InsertRowIntoPage::Ok(row_id, page_guard) - } - InsertRow::NoFreeSpaceOrRowID => { - InsertRowIntoPage::NoSpaceOrRowID(insert, undo_kind, move_entry) - } - } - } - - #[inline] - async fn update_row_inplace( - &self, - stmt: &mut Statement, - page_guard: PageSharedGuard<'a, RowPage>, - key: Val, - row_id: RowID, - mut update: Vec, - ) -> UpdateRowInplace { - let page_id = page_guard.page_id(); - let page = page_guard.page(); - // column indexes must be in range - debug_assert!( - { - update - .iter() - .all(|uc| uc.idx < page_guard.page().header.col_count as usize) - }, - "update column indexes must be in range" - ); - // column indexes should be in order. - debug_assert!( - { - update.is_empty() - || update - .iter() - .zip(update.iter().skip(1)) - .all(|(l, r)| l.idx < r.idx) - }, - "update columns should be in order" - ); - if row_id < page.header.start_row_id - || row_id >= page.header.start_row_id + page.header.max_row_count as u64 - { - return UpdateRowInplace::RowNotFound; - } - let row_idx = (row_id - page.header.start_row_id) as usize; - let mut lock_row = self - .lock_row_for_write(&stmt.trx, &page_guard, row_idx, &key) - .await; - match &mut lock_row { - LockRowForWrite::InvalidIndex => return UpdateRowInplace::RowNotFound, - LockRowForWrite::WriteConflict => return UpdateRowInplace::WriteConflict, - LockRowForWrite::Ok(access, old_cts) => { - let mut access = access.take().unwrap(); - if access.row().is_deleted() { - return UpdateRowInplace::RowDeleted; - } - let old_cts = mem::take(old_cts); - match access.update_row(&self.schema, &update) { - UpdateRow::NoFreeSpace(old_row) => { - // page does not have enough space for update, we need to switch - // to out-of-place update mode, which will add a MOVE undo entry - // to end original row and perform a INSERT into new page, and - // link the two versions. - let mut new_entry = OwnedUndoEntry::new( - self.table_id, - page_id, - row_id, - UndoKind::Move(false), - ); - access.build_undo_chain(&stmt.trx, &mut new_entry, old_cts); - drop(access); // unlock row - drop(lock_row); - // Here we do not unlock page because we need to perform MOVE+UPDATE - // and link undo entries of two rows. - // The re-lock of current undo is required. - stmt.undo.push(new_entry); - let redo_entry = RedoEntry { - page_id, - row_id, - // use DELETE for redo is ok, no version chain should be maintained if recovering from redo. - kind: RedoKind::Delete, - }; - stmt.redo.push(redo_entry); - UpdateRowInplace::NoFreeSpace(row_id, old_row, update, page_guard) - } - UpdateRow::Ok(mut row) => { - // perform in-place update. - let (mut undo_cols, mut redo_cols) = (vec![], vec![]); - for uc in &mut update { - if let Some(old) = row.user_different(&self.schema, uc.idx, &uc.val) { - undo_cols.push(UpdateCol { - idx: uc.idx, - val: Val::from(old), - }); - redo_cols.push(UpdateCol { - idx: uc.idx, - // new value no longer needed, so safe to take it here. 
- val: mem::take(&mut uc.val), - }); - row.update_user_col(uc.idx, &uc.val); - } - } - let mut new_entry = OwnedUndoEntry::new( - self.table_id, - page_id, - row_id, - UndoKind::Update(undo_cols), - ); - access.build_undo_chain(&stmt.trx, &mut new_entry, old_cts); - drop(access); // unlock the row. - drop(lock_row); - drop(page_guard); // unlock the page, because we finish page update. - stmt.undo.push(new_entry); - if !redo_cols.is_empty() { - // there might be nothing to update, so we do not need to add redo log. - // but undo is required because we need to properly lock the row. - let redo_entry = RedoEntry { - page_id, - row_id, - kind: RedoKind::Update(redo_cols), - }; - stmt.redo.push(redo_entry); - } - UpdateRowInplace::Ok(row_id) - } - } - } - } - } - - #[inline] - async fn delete_row_internal( - &self, - stmt: &mut Statement, - page_guard: PageSharedGuard<'_, RowPage>, - row_id: RowID, - key: &Val, - ) -> DeleteMvcc { - let page_id = page_guard.page_id(); - let page = page_guard.page(); - if row_id < page.header.start_row_id - || row_id >= page.header.start_row_id + page.header.max_row_count as u64 - { - return DeleteMvcc::RowNotFound; - } - let row_idx = (row_id - page.header.start_row_id) as usize; - let mut lock_row = self - .lock_row_for_write(&stmt.trx, &page_guard, row_idx, key) - .await; - match &mut lock_row { - LockRowForWrite::InvalidIndex => return DeleteMvcc::RowNotFound, - LockRowForWrite::WriteConflict => return DeleteMvcc::WriteConflict, - LockRowForWrite::Ok(access, old_cts) => { - let mut access = access.take().unwrap(); - if access.row().is_deleted() { - return DeleteMvcc::RowAlreadyDeleted; - } - access.delete_row(); - let mut new_entry = - OwnedUndoEntry::new(self.table_id, page_id, row_id, UndoKind::Delete); - access.build_undo_chain(&stmt.trx, &mut new_entry, mem::take(old_cts)); - drop(access); // unlock row - drop(lock_row); - drop(page_guard); // unlock page - stmt.undo.push(new_entry); - // create redo log - let redo_entry = RedoEntry { - page_id, - row_id, - kind: RedoKind::Delete, - }; - stmt.redo.push(redo_entry); - DeleteMvcc::Ok - } - } - } - - #[inline] - fn get_insert_page( - &self, - stmt: &mut Statement, - row_count: usize, - ) -> PageSharedGuard<'a, RowPage> { - if let Some((page_id, row_id)) = stmt.load_active_insert_page(self.table_id) { - let g = self.buf_pool.get_page(page_id, LatchFallbackMode::Shared); - // because we save last insert page in session and meanwhile other thread may access this page - // and do some modification, even worse, buffer pool may evict it and reload other data into - // this page. so here, we do not require that no change should happen, but if something change, - // we validate that page id and row id range is still valid. - let g = g.block_until_shared(); - if validate_page_row_range(&g, page_id, row_id) { - return g; - } - } - self.blk_idx - .get_insert_page(self.buf_pool, row_count, &self.schema) - } - - // lock row will check write conflict on given row and lock it. - #[inline] - async fn lock_row_for_write( - &self, - trx: &ActiveTrx, - page_guard: &'a PageSharedGuard<'a, RowPage>, - row_idx: usize, - key: &Val, - ) -> LockRowForWrite<'a> { - loop { - let mut access = page_guard.write_row(row_idx); - let (row, undo_head) = access.row_and_undo_mut(); - match undo_head { - None => { - let head = UndoHead { - status: trx.status(), - entry: None, // currently we don't have undo entry to insert. - }; - *undo_head = Some(head); // lock the row. 
- return LockRowForWrite::Ok(Some(access), NextTrxCTS::None); - } - Some(head) => { - if trx.is_same_trx(head.status.as_ref()) { - // Locked by itself - return LockRowForWrite::Ok(Some(access), NextTrxCTS::Myself); - } - let ts = head.status.ts(); - if trx_is_committed(ts) { - // This row is committed, no lock conflict. - // Check whether the row is valid through index lookup. - // There might be case an out-of-date index entry pointing to the - // latest version of the row which has different key other than index. - // For example, assume: - // 1. one row with row_id=100, k=200 is inserted. - // Then index has entry k(200) -> row_id(100). - // - // 2. update row set k=300. - // If in-place update is available, we will reuse row_id=100, and - // just update its key to 300. - // So in index, we have two entries: k(200) -> row_id(100), - // k(300) -> row_id(100). - // The first entry is supposed to be linked to the old version, and - // second entry to new version. - // But in our design, both of them point to latest version and - // we need to traverse the version chain to find correct(visible) - // version. - // - // 3. insert one row with row_id=101, k=200. - // Now we need to identify k=200 is actually out-of-date index entry, - // and just skip it. - if row.is_key_different(&self.schema, key) { - return LockRowForWrite::InvalidIndex; - } - head.status = trx.status(); // lock the row. - return LockRowForWrite::Ok(Some(access), NextTrxCTS::Value(ts)); - } - if !head.status.preparing() { - // uncommitted, write-write conflict. - return LockRowForWrite::WriteConflict; - } - if let Some(commit_notifier) = head.status.prepare_notifier() { - // unlock row(but logical row lock is still held) - drop(access); - - // Here we do not unlock the page, because the preparation time of commit is supposed - // to be short. - // And as active transaction is using this page, we don't want page evictor swap it onto - // disk. - // Other transactions can still access this page and modify other rows. - - let _ = commit_notifier.recv_async().await; // wait for that transaction to be committed. - - // now we get back on current page. - // maybe another thread modify our row before the lock acquisition, - // so we need to recheck. - } // there might be progress on preparation, so recheck. - } - } - } - } - - // perform non-locking read on row. - #[inline] - async fn lock_row_for_read( - &self, - trx: &ActiveTrx, - page_guard: &'a PageSharedGuard<'a, RowPage>, - row_idx: usize, - ) -> RowReadAccess<'a> { - loop { - let access = page_guard.read_row(row_idx); - match access.undo() { - None => return access, - Some(head) => { - if trx.is_same_trx(head.status.as_ref()) { - // Locked by itself - return access; - } - let ts = head.status.ts(); - if trx_is_committed(ts) { - // Because MVCC will backtrace to visible version, we do not need to check if index lookup is out-of-date here. - return access; - } - if !head.status.preparing() { - // uncommitted, write-write conflict. - return access; - } - if let Some(commit_notifier) = head.status.prepare_notifier() { - // unlock row - drop(access); - // Even if it's non-locking read, we still need to wait for the preparation to avoid partial read. - // For example: - // Suppose transaction T1 is committing with CTS 100, - // Transaction T2 starts with STS 101 and reads rows that are modified by T1. - // If we do not block on waiting for T1, we may read one row of old version, and another - // row with new version. This breaks ACID properties. 
- - let _ = commit_notifier.recv_async().await; // wait for that transaction to be committed. - - // now we get back on current page. - // maybe another thread modify our row before the lock acquisition, - // so we need to recheck. - } // there might be progress on preparation, so recheck. - } - } - } - } -} - -#[inline] -fn validate_page_id(page_guard: &PageSharedGuard<'_, RowPage>, page_id: PageID) -> bool { - if page_guard.page_id() != page_id { - return false; - } - true -} - -#[inline] -fn validate_page_row_range( - page_guard: &PageSharedGuard<'_, RowPage>, - page_id: PageID, - row_id: RowID, -) -> bool { - if page_guard.page_id() != page_id { - return false; - } - let header = &page_guard.page().header; - row_id >= header.start_row_id && row_id < header.start_row_id + header.max_row_count as u64 -} - -#[inline] -fn row_len(schema: &Schema, user_cols: &[Val]) -> usize { - let var_len = schema - .var_cols - .iter() - .map(|idx| { - let val = &user_cols[*idx - 1]; - match val { - Val::Null => 0, - Val::VarByte(var) => { - if var.len() <= PAGE_VAR_LEN_INLINE { - 0 - } else { - var.len() - } - } - _ => unreachable!(), - } - }) - .sum::(); - schema.fix_len + var_len -} - -#[inline] -fn link_move_entry(new_entry: &mut OwnedUndoEntry, move_entry: UndoEntryPtr) { - // ref-count this pointer. - debug_assert!(new_entry.next.is_none()); - new_entry.next = Some(NextUndoEntry { - status: NextUndoStatus::SameAsPrev, - entry: move_entry, - }); -} - -enum LockRowForWrite<'a> { - // lock success, returns optional last commit timestamp. - Ok(Option>, NextTrxCTS), - // lock fail, there is another transaction modifying this row. - WriteConflict, - // row is invalid through index lookup. - // this can happen when index entry is not garbage collected, - // so some old key points to new version. 
- InvalidIndex, -} - -impl<'a> LockRowForWrite<'a> { - #[inline] - pub fn ok(self) -> Option<(Option>, NextTrxCTS)> { - match self { - LockRowForWrite::Ok(access, next_cts) => Some((access, next_cts)), - _ => None, - } - } -} - -enum InsertRowIntoPage<'a> { - Ok(RowID, PageSharedGuard<'a, RowPage>), - NoSpaceOrRowID( - Vec, - UndoKind, - Option<(RowID, PageSharedGuard<'a, RowPage>)>, - ), -} - -enum UpdateRowInplace<'a> { - Ok(RowID), - RowNotFound, - RowDeleted, - WriteConflict, - NoFreeSpace( - RowID, - Vec, - Vec, - PageSharedGuard<'a, RowPage>, - ), -} - -#[cfg(test)] -mod tests { - use crate::buffer::FixedBufferPool; - use crate::index::BlockIndex; - use crate::index::PartitionIntIndex; - use crate::row::ops::{SelectMvcc, UpdateCol}; - use crate::session::Session; - use crate::table::{Schema, Table}; - use crate::trx::sys::{TransactionSystem, TrxSysConfig}; - use crate::value::Layout; - use crate::value::Val; - use std::sync::Arc; - - #[test] - fn test_mvcc_insert_normal() { - smol::block_on(async { - const SIZE: i32 = 10000; - - let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); - - let table = create_table(buf_pool); - let table = table.mvcc(); - let mut session = Session::new(); - { - let mut trx = session.begin_trx(trx_sys); - for i in 0..SIZE { - let s = format!("{}", i); - let mut stmt = trx.start_stmt(); - let res = table - .insert_row(&mut stmt, vec![Val::from(i), Val::from(&s[..])]) - .await; - trx = stmt.commit(); - assert!(res.is_ok()); - } - session = trx_sys.commit(trx).await.unwrap(); - } - { - let mut trx = session.begin_trx(trx_sys); - for i in 16..SIZE { - let mut stmt = trx.start_stmt(); - let key = Val::from(i); - let res = table.select_row(&mut stmt, key, &[0, 1]).await; - match res { - SelectMvcc::Ok(vals) => { - assert!(vals.len() == 2); - assert!(&vals[0] == &Val::from(i)); - let s = format!("{}", i); - assert!(&vals[1] == &Val::from(&s[..])); - } - _ => panic!("select fail"), - } - trx = stmt.commit(); - } - let _ = trx_sys.commit(trx).await.unwrap(); - } - - unsafe { - TransactionSystem::drop_static(trx_sys); - FixedBufferPool::drop_static(buf_pool); - } - }); - } - - #[test] - fn test_mvcc_update() { - smol::block_on(async { - const SIZE: i32 = 1000; - - let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); - { - let table = create_table(buf_pool); - let table = table.mvcc(); - - let mut session = Session::new(); - // insert 1000 rows - let mut trx = session.begin_trx(trx_sys); - for i in 0..SIZE { - let s = format!("{}", i); - let mut stmt = trx.start_stmt(); - let res = table - .insert_row(&mut stmt, vec![Val::from(i), Val::from(&s[..])]) - .await; - trx = stmt.commit(); - assert!(res.is_ok()); - } - session = trx_sys.commit(trx).await.unwrap(); - - // update 1 row with short value - let mut trx = session.begin_trx(trx_sys); - let k1 = Val::from(1i32); - let s1 = "hello"; - let update1 = vec![UpdateCol { - idx: 1, - val: Val::from(s1), - }]; - let mut stmt = trx.start_stmt(); - let res = table.update_row(&mut stmt, k1, update1).await; - assert!(res.is_ok()); - trx = stmt.commit(); - session = trx_sys.commit(trx).await.unwrap(); - - // update 1 row with long value - let mut trx = session.begin_trx(trx_sys); - let k2 = Val::from(100i32); - let s2: String = (0..50_000).map(|_| '1').collect(); - let update2 = vec![UpdateCol { - idx: 1, - val: Val::from(&s2[..]), - }]; - let mut stmt = trx.start_stmt(); - 
let res = table.update_row(&mut stmt, k2, update2).await; - assert!(res.is_ok()); - trx = stmt.commit(); - let _ = trx_sys.commit(trx).await.unwrap(); - } - unsafe { - TransactionSystem::drop_static(trx_sys); - FixedBufferPool::drop_static(buf_pool); - } - }); - } - - #[test] - fn test_mvcc_delete() { - smol::block_on(async { - const SIZE: i32 = 1000; - - let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); - let trx_sys = TrxSysConfig::default().build_static(); - { - let table = create_table(buf_pool); - let table = table.mvcc(); - - let mut session = Session::new(); - // insert 1000 rows - let mut trx = session.begin_trx(trx_sys); - for i in 0..SIZE { - let s = format!("{}", i); - let mut stmt = trx.start_stmt(); - let res = table - .insert_row(&mut stmt, vec![Val::from(i), Val::from(&s[..])]) - .await; - trx = stmt.commit(); - assert!(res.is_ok()); - } - session = trx_sys.commit(trx).await.unwrap(); - - // delete 1 row - let mut trx = session.begin_trx(trx_sys); - let k1 = Val::from(1i32); - let mut stmt = trx.start_stmt(); - let res = table.delete_row(&mut stmt, k1).await; - assert!(res.is_ok()); - trx = stmt.commit(); - let _ = trx_sys.commit(trx).await.unwrap(); - } - unsafe { - TransactionSystem::drop_static(trx_sys); - FixedBufferPool::drop_static(buf_pool); - } - }); - } - - fn create_table(buf_pool: &'static FixedBufferPool) -> Table<'static> { - Table { - table_id: 1, - buf_pool, - schema: Schema::new(vec![Layout::Byte4, Layout::VarByte], 0), - blk_idx: BlockIndex::new(buf_pool).unwrap(), - sec_idx: Arc::new(PartitionIntIndex::empty()), - } - } -} diff --git a/doradb-storage/src/table/schema.rs b/doradb-storage/src/table/schema.rs new file mode 100644 index 0000000..bb067f2 --- /dev/null +++ b/doradb-storage/src/table/schema.rs @@ -0,0 +1,102 @@ +use crate::value::{Layout, Val}; + +pub struct Schema { + cols: Vec, + // fix length is the total inline length of all columns. + pub fix_len: usize, + // index of var-length columns. + pub var_cols: Vec, + // index column id. + key_idx: usize, +} + +impl Schema { + /// Create a new schema. + /// RowID is not included in input, but will be created + /// automatically. + #[inline] + pub fn new(user_cols: Vec, user_key_idx: usize) -> Self { + debug_assert!(!user_cols.is_empty()); + debug_assert!(user_key_idx < user_cols.len()); + debug_assert!(user_cols[user_key_idx] == Layout::Byte4); + let mut cols = Vec::with_capacity(user_cols.len() + 1); + cols.push(Layout::Byte8); + cols.extend(user_cols); + let mut fix_len = 0; + let mut var_cols = vec![]; + for (idx, layout) in cols.iter().enumerate() { + fix_len += layout.inline_len(); + if !layout.is_fixed() { + var_cols.push(idx); + } + } + Schema { + cols, + fix_len, + var_cols, + key_idx: user_key_idx + 1, + } + } + + /// Returns column count of this schema, including row id. + #[inline] + pub fn col_count(&self) -> usize { + self.cols.len() + } + + /// Returns layouts of all columns, including row id. + #[inline] + pub fn cols(&self) -> &[Layout] { + &self.cols + } + + /// Returns whether the type is matched at given column index, row id is excluded. + #[inline] + pub fn user_col_type_match(&self, user_col_idx: usize, val: &Val) -> bool { + self.col_type_match(user_col_idx + 1, val) + } + + /// Returns whether the type is matched at given column index. 
+ #[inline] + pub fn col_type_match(&self, col_idx: usize, val: &Val) -> bool { + match (val, self.layout(col_idx)) { + (Val::Null, _) => true, + (Val::Byte1(_), Layout::Byte1) + | (Val::Byte2(_), Layout::Byte2) + | (Val::Byte4(_), Layout::Byte4) + | (Val::Byte8(_), Layout::Byte8) + | (Val::VarByte(_), Layout::VarByte) => true, + _ => false, + } + } + + #[inline] + pub fn idx_type_match(&self, val: &Val) -> bool { + self.col_type_match(self.key_idx, val) + } + + #[inline] + pub fn user_key_idx(&self) -> usize { + self.key_idx - 1 + } + + #[inline] + pub fn key_idx(&self) -> usize { + self.key_idx + } + + #[inline] + pub fn user_layout(&self, user_col_idx: usize) -> Layout { + self.cols[user_col_idx + 1] + } + + #[inline] + pub fn layout(&self, col_idx: usize) -> Layout { + self.cols[col_idx] + } + + #[inline] + pub fn key_layout(&self) -> Layout { + self.layout(self.key_idx) + } +} diff --git a/doradb-storage/src/table/tests.rs b/doradb-storage/src/table/tests.rs new file mode 100644 index 0000000..14723e7 --- /dev/null +++ b/doradb-storage/src/table/tests.rs @@ -0,0 +1,340 @@ +use crate::buffer::FixedBufferPool; +use crate::catalog::Catalog; +use crate::row::ops::{SelectMvcc, UpdateCol}; +use crate::session::Session; +use crate::table::Schema; +use crate::table::TableID; +use crate::trx::sys::{TransactionSystem, TrxSysConfig}; +use crate::value::Layout; +use crate::value::Val; + +#[test] +fn test_mvcc_insert_normal() { + smol::block_on(async { + const SIZE: i32 = 10000; + + let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); + let trx_sys = TrxSysConfig::default().build_static(); + + let (catalog, table_id) = create_table(buf_pool); + let table = catalog.get_table(table_id).unwrap(); + let mut session = Session::new(); + { + let mut trx = session.begin_trx(trx_sys); + for i in 0..SIZE { + let s = format!("{}", i); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row(buf_pool, &mut stmt, vec![Val::from(i), Val::from(&s[..])]) + .await; + trx = stmt.commit(); + assert!(res.is_ok()); + } + session = trx_sys.commit(trx).await.unwrap(); + } + { + let mut trx = session.begin_trx(trx_sys); + for i in 16..SIZE { + let mut stmt = trx.start_stmt(); + let key = Val::from(i); + let res = table.select_row(buf_pool, &mut stmt, key, &[0, 1]).await; + match res { + SelectMvcc::Ok(vals) => { + assert!(vals.len() == 2); + assert!(&vals[0] == &Val::from(i)); + let s = format!("{}", i); + assert!(&vals[1] == &Val::from(&s[..])); + } + _ => panic!("select fail"), + } + trx = stmt.commit(); + } + let _ = trx_sys.commit(trx).await.unwrap(); + } + + unsafe { + TransactionSystem::drop_static(trx_sys); + FixedBufferPool::drop_static(buf_pool); + } + }); +} + +#[test] +fn test_mvcc_update() { + smol::block_on(async { + const SIZE: i32 = 1000; + + let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); + let trx_sys = TrxSysConfig::default().build_static(); + { + let (catalog, table_id) = create_table(buf_pool); + let table = catalog.get_table(table_id).unwrap(); + + let mut session = Session::new(); + // insert 1000 rows + let mut trx = session.begin_trx(trx_sys); + for i in 0..SIZE { + let s = format!("{}", i); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row(buf_pool, &mut stmt, vec![Val::from(i), Val::from(&s[..])]) + .await; + trx = stmt.commit(); + assert!(res.is_ok()); + } + session = trx_sys.commit(trx).await.unwrap(); + + // update 1 row with short value + let mut trx = session.begin_trx(trx_sys); + let k1 = Val::from(1i32); + let 
s1 = "hello"; + let update1 = vec![UpdateCol { + idx: 1, + val: Val::from(s1), + }]; + let mut stmt = trx.start_stmt(); + let res = table.update_row(buf_pool, &mut stmt, k1, update1).await; + assert!(res.is_ok()); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); + + // update 1 row with long value + let mut trx = session.begin_trx(trx_sys); + let k2 = Val::from(100i32); + let s2: String = (0..50_000).map(|_| '1').collect(); + let update2 = vec![UpdateCol { + idx: 1, + val: Val::from(&s2[..]), + }]; + let mut stmt = trx.start_stmt(); + let res = table.update_row(buf_pool, &mut stmt, k2, update2).await; + assert!(res.is_ok()); + trx = stmt.commit(); + let _ = trx_sys.commit(trx).await.unwrap(); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + FixedBufferPool::drop_static(buf_pool); + } + }); +} + +#[test] +fn test_mvcc_delete_normal() { + smol::block_on(async { + const SIZE: i32 = 1000; + + let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); + let trx_sys = TrxSysConfig::default().build_static(); + { + let (catalog, table_id) = create_table(buf_pool); + let table = catalog.get_table(table_id).unwrap(); + + let mut session = Session::new(); + // insert 1000 rows + let mut trx = session.begin_trx(trx_sys); + for i in 0..SIZE { + let s = format!("{}", i); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row(buf_pool, &mut stmt, vec![Val::from(i), Val::from(&s[..])]) + .await; + trx = stmt.commit(); + assert!(res.is_ok()); + } + session = trx_sys.commit(trx).await.unwrap(); + + // delete 1 row + let mut trx = session.begin_trx(trx_sys); + let k1 = Val::from(1i32); + let mut stmt = trx.start_stmt(); + let res = table.delete_row(buf_pool, &mut stmt, k1).await; + assert!(res.is_ok()); + trx = stmt.commit(); + let _ = trx_sys.commit(trx).await.unwrap(); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + FixedBufferPool::drop_static(buf_pool); + } + }); +} + +#[test] +fn test_mvcc_rollback_insert_normal() { + smol::block_on(async { + let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); + let trx_sys = TrxSysConfig::default().build_static(); + { + let (catalog, table_id) = create_table(buf_pool); + let table = catalog.get_table(table_id).unwrap(); + + let mut session = Session::new(); + // insert 1 row + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row( + buf_pool, + &mut stmt, + vec![Val::from(1i32), Val::from("hello")], + ) + .await; + assert!(res.is_ok()); + trx = stmt.rollback(buf_pool, &catalog); + session = trx_sys.commit(trx).await.unwrap(); + + // select 1 row + let mut trx = session.begin_trx(trx_sys); + let stmt = trx.start_stmt(); + let key = Val::from(1i32); + let res = table.select_row(buf_pool, &stmt, key, &[0, 1]).await; + assert!(res.not_found()); + trx = stmt.commit(); + _ = trx_sys.commit(trx).await.unwrap(); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + FixedBufferPool::drop_static(buf_pool); + } + }); +} + +#[test] +fn test_mvcc_move_insert() { + smol::block_on(async { + let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); + let trx_sys = TrxSysConfig::default().build_static(); + { + let (catalog, table_id) = create_table(buf_pool); + let table = catalog.get_table(table_id).unwrap(); + + let mut session = Session::new(); + // insert 1 row + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row( + buf_pool, + &mut stmt, + 
vec![Val::from(1i32), Val::from("hello")], + ) + .await; + assert!(res.is_ok()); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); + + // delete it + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let key = Val::from(1i32); + let res = table.delete_row(buf_pool, &mut stmt, key).await; + assert!(res.is_ok()); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); + + // insert again, trigger move+insert + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row( + buf_pool, + &mut stmt, + vec![Val::from(1i32), Val::from("world")], + ) + .await; + assert!(res.is_ok()); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); + + // select 1 row + let mut trx = session.begin_trx(trx_sys); + let stmt = trx.start_stmt(); + let key = Val::from(1i32); + let res = table.select_row(buf_pool, &stmt, key, &[0, 1]).await; + assert!(res.is_ok()); + let vals = res.unwrap(); + assert!(vals[1] == Val::from("world")); + trx = stmt.commit(); + _ = trx_sys.commit(trx).await.unwrap(); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + FixedBufferPool::drop_static(buf_pool); + } + }); +} + +#[test] +fn test_mvcc_rollback_move_insert() { + smol::block_on(async { + let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap(); + let trx_sys = TrxSysConfig::default().build_static(); + { + let (catalog, table_id) = create_table(buf_pool); + let table = catalog.get_table(table_id).unwrap(); + + let mut session = Session::new(); + // insert 1 row + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row( + buf_pool, + &mut stmt, + vec![Val::from(1i32), Val::from("hello")], + ) + .await; + assert!(res.is_ok()); + println!("row_id={}", res.unwrap()); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); + + // delete it + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let key = Val::from(1i32); + let res = table.delete_row(buf_pool, &mut stmt, key).await; + assert!(res.is_ok()); + trx = stmt.commit(); + session = trx_sys.commit(trx).await.unwrap(); + + // insert again, trigger move+insert + let mut trx = session.begin_trx(trx_sys); + let mut stmt = trx.start_stmt(); + let res = table + .insert_row( + buf_pool, + &mut stmt, + vec![Val::from(1i32), Val::from("world")], + ) + .await; + assert!(res.is_ok()); + println!("row_id={}", res.unwrap()); + trx = stmt.rollback(buf_pool, &catalog); + session = trx_sys.commit(trx).await.unwrap(); + + // select 1 row + let mut trx = session.begin_trx(trx_sys); + let stmt = trx.start_stmt(); + let key = Val::from(1i32); + let res = table.select_row(buf_pool, &stmt, key, &[0, 1]).await; + assert!(res.not_found()); + trx = stmt.commit(); + _ = trx_sys.commit(trx).await.unwrap(); + } + unsafe { + TransactionSystem::drop_static(trx_sys); + FixedBufferPool::drop_static(buf_pool); + } + }); +} + +fn create_table(buf_pool: &'static FixedBufferPool) -> (Catalog, TableID) { + let catalog = Catalog::empty(); + let table_id = catalog.create_table( + buf_pool, + Schema::new(vec![Layout::Byte4, Layout::VarByte], 0), + ); + (catalog, table_id) +} diff --git a/doradb-storage/src/trx/mod.rs b/doradb-storage/src/trx/mod.rs index 6546147..221b76a 100644 --- a/doradb-storage/src/trx/mod.rs +++ b/doradb-storage/src/trx/mod.rs @@ -23,7 +23,7 @@ pub mod undo; use crate::session::{InternalSession, IntoSession, Session}; use crate::stmt::Statement; use 
crate::trx::redo::{RedoBin, RedoEntry, RedoKind, RedoLog}; -use crate::trx::undo::OwnedUndoEntry; +use crate::trx::undo::{IndexUndo, OwnedRowUndo}; use crate::value::Val; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -38,7 +38,7 @@ pub const MAX_SNAPSHOT_TS: TrxID = 1 << 63; pub const SNAPSHOT_TS_MASK: TrxID = MAX_SNAPSHOT_TS - 1; pub const MAX_COMMIT_TS: TrxID = 1 << 63; // data without version chain will treated as its commit timestamp equals to 0 -pub const GLOBAL_VISIBLE_COMMIT_TS: TrxID = 1; +pub const GLOBAL_VISIBLE_COMMIT_TS: TrxID = 0; // As active transaction id is always greater than STS, that means // visibility check can be simplified to "STS is larger". pub const MIN_ACTIVE_TRX_ID: TrxID = (1 << 63) + 1; @@ -57,7 +57,7 @@ pub struct SharedTrxStatus { } impl SharedTrxStatus { - /// Create a new shared transaction status for uncommitted transaction. + /// Create a new shared transaction status for given transaction id. #[inline] pub fn new(trx_id: TrxID) -> Self { SharedTrxStatus { @@ -67,6 +67,17 @@ impl SharedTrxStatus { } } + /// Create a new transaction status that is globally visible for all + /// transactions. This is used for transaction rollback. + #[inline] + pub fn global_visible() -> Self { + SharedTrxStatus { + ts: AtomicU64::new(GLOBAL_VISIBLE_COMMIT_TS), + preparing: AtomicBool::new(false), + prepare_notify: Mutex::new(None), + } + } + /// Returns the timestamp of current transaction. #[inline] pub fn ts(&self) -> TrxID { @@ -117,8 +128,10 @@ pub struct ActiveTrx { pub sts: TrxID, // which log partition it belongs to. pub log_partition_idx: usize, - // transaction-level undo logs. - pub(crate) undo: Vec, + // transaction-level undo logs of row data. + pub(crate) row_undo: Vec, + // transaction-level index undo operations. + pub(crate) index_undo: Vec, // transaction-level redo logs. pub(crate) redo: Vec, // session of current transaction. @@ -133,7 +146,8 @@ impl ActiveTrx { status: Arc::new(SharedTrxStatus::new(trx_id)), sts, log_partition_idx: 0, - undo: vec![], + row_undo: vec![], + index_undo: vec![], redo: vec![], session, } @@ -168,7 +182,7 @@ impl ActiveTrx { /// Returns whether the transaction is readonly. #[inline] pub fn readonly(&self) -> bool { - self.redo.is_empty() && self.undo.is_empty() + self.redo.is_empty() && self.row_undo.is_empty() } /// Prepare current transaction for committing. @@ -182,7 +196,7 @@ impl ActiveTrx { status: Arc::clone(&self.status), sts: self.sts, redo_bin: None, - undo: vec![], + row_undo: vec![], session: self.session.take(), }; } @@ -206,12 +220,15 @@ impl ActiveTrx { .expect("redo serialization should not fail"); Some(redo_bin) }; - let undo = mem::take(&mut self.undo); + let row_undo = mem::take(&mut self.row_undo); + // Because we do not have rollback logic when transaction enters commit phase, + // so remove index undo here is safe. 
+ self.index_undo.clear(); PreparedTrx { status: self.status.clone(), sts: self.sts, redo_bin, - undo, + row_undo, session: self.session.take(), } } @@ -246,7 +263,14 @@ impl ActiveTrx { impl Drop for ActiveTrx { #[inline] fn drop(&mut self) { - assert!(self.undo.is_empty(), "trx undo should be cleared"); + assert!( + self.row_undo.is_empty(), + "trx row undo logs should be cleared" + ); + assert!( + self.index_undo.is_empty(), + "trx index undo logs should be cleared" + ); assert!(self.redo.is_empty(), "trx redo should be cleared"); } } @@ -259,7 +283,7 @@ pub struct PreparedTrx { status: Arc, sts: TrxID, redo_bin: Option, - undo: Vec, + row_undo: Vec, session: Option>, } @@ -272,13 +296,13 @@ impl PreparedTrx { } else { None }; - let undo = mem::take(&mut self.undo); + let row_undo = mem::take(&mut self.row_undo); PrecommitTrx { status: Arc::clone(&self.status), sts: self.sts, cts, redo_bin, - undo, + row_undo, session: self.session.take(), } } @@ -286,7 +310,7 @@ impl PreparedTrx { /// Returns whether the prepared transaction is readonly. #[inline] pub fn readonly(&self) -> bool { - self.redo_bin.is_none() && self.undo.is_empty() + self.redo_bin.is_none() && self.row_undo.is_empty() } } @@ -306,7 +330,7 @@ impl Drop for PreparedTrx { #[inline] fn drop(&mut self) { assert!(self.redo_bin.is_none(), "redo should be cleared"); - assert!(self.undo.is_empty(), "undo should be cleared"); + assert!(self.row_undo.is_empty(), "undo should be cleared"); } } @@ -316,7 +340,7 @@ pub struct PrecommitTrx { pub sts: TrxID, pub cts: TrxID, pub redo_bin: Option, - pub undo: Vec, + pub row_undo: Vec, session: Option>, } @@ -338,11 +362,11 @@ impl PrecommitTrx { let mut g = self.status.prepare_notify.lock(); drop(g.take()); } - let undo = mem::take(&mut self.undo); + let row_undo = mem::take(&mut self.row_undo); CommittedTrx { sts: self.sts, cts: self.cts, - undo, + row_undo, session: self.session.take(), } } @@ -352,7 +376,7 @@ impl Drop for PrecommitTrx { #[inline] fn drop(&mut self) { assert!(self.redo_bin.is_none(), "redo should be cleared"); - assert!(self.undo.is_empty(), "undo should be cleared"); + assert!(self.row_undo.is_empty(), "undo should be cleared"); } } @@ -371,7 +395,7 @@ impl IntoSession for PrecommitTrx { pub struct CommittedTrx { pub sts: TrxID, pub cts: TrxID, - pub undo: Vec, + pub row_undo: Vec, session: Option>, } diff --git a/doradb-storage/src/trx/row.rs b/doradb-storage/src/trx/row.rs index 9cdd959..c298fa8 100644 --- a/doradb-storage/src/trx/row.rs +++ b/doradb-storage/src/trx/row.rs @@ -1,20 +1,20 @@ use crate::buffer::guard::PageSharedGuard; -use crate::row::ops::{SelectMvcc, UpdateCol, UpdateRow}; +use crate::row::ops::{ReadRow, UndoCol, UpdateCol, UpdateRow}; use crate::row::{Row, RowMut, RowPage, RowRead}; use crate::table::Schema; -use crate::trx::undo::UndoKind; use crate::trx::undo::{ - NextTrxCTS, NextUndoEntry, NextUndoStatus, OwnedUndoEntry, UndoEntryPtr, UndoHead, + NextRowUndo, NextRowUndoStatus, NextTrxCTS, OwnedRowUndo, RowUndoHead, RowUndoKind, RowUndoRef, }; -use crate::trx::{trx_is_committed, ActiveTrx}; +use crate::trx::{trx_is_committed, ActiveTrx, SharedTrxStatus}; use crate::value::Val; use parking_lot::{RwLockReadGuard, RwLockWriteGuard}; use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Arc; pub struct RowReadAccess<'a> { page: &'a RowPage, row_idx: usize, - undo: RwLockReadGuard<'a, Option>, + undo: RwLockReadGuard<'a, Option>, } impl RowReadAccess<'_> { @@ -24,10 +24,25 @@ impl RowReadAccess<'_> { } #[inline] - pub fn undo(&self) -> &Option { 
+ pub fn undo(&self) -> &Option { &self.undo } + #[inline] + pub fn latest_status(&self) -> RowLatestStatus { + if let Some(head) = &*self.undo { + let ts = head.status.ts(); + if !trx_is_committed(ts) { + return RowLatestStatus::Uncommitted; + } + } + // the row is committed, check if it's deleted. + if self.row().is_deleted() { + return RowLatestStatus::Deleted; + } + RowLatestStatus::Committed + } + #[inline] pub fn read_row_mvcc( &self, @@ -35,20 +50,20 @@ impl RowReadAccess<'_> { schema: &Schema, user_read_set: &[usize], key: &Val, - ) -> SelectMvcc { + ) -> ReadRow { // let mut vals = BTreeMap::new(); match &*self.undo { None => { let row = self.row(); // latest version in row page. if row.is_deleted() { - return SelectMvcc::RowNotFound; + return ReadRow::NotFound; } if row.is_key_different(schema, key) { - return SelectMvcc::InvalidIndex; + return ReadRow::InvalidIndex; } let vals = row.clone_vals_for_read_set(schema, user_read_set); - SelectMvcc::Ok(vals) + ReadRow::Ok(vals) } Some(undo_head) => { // At this point, we already wait for preparation of commit is done. @@ -59,13 +74,13 @@ impl RowReadAccess<'_> { let row = self.row(); // we can see this version if row.is_deleted() { - return SelectMvcc::RowNotFound; + return ReadRow::NotFound; } if row.is_key_different(schema, key) { - return SelectMvcc::InvalidIndex; + return ReadRow::InvalidIndex; } let vals = row.clone_vals_for_read_set(schema, user_read_set); - return SelectMvcc::Ok(vals); + return ReadRow::Ok(vals); } // otherwise, go to next version } else { let trx_id = trx.trx_id(); @@ -73,20 +88,20 @@ impl RowReadAccess<'_> { let row = self.row(); // self update, see the latest version if row.is_deleted() { - return SelectMvcc::RowNotFound; + return ReadRow::NotFound; } if row.is_key_different(schema, key) { - return SelectMvcc::InvalidIndex; + return ReadRow::InvalidIndex; } let vals = row.clone_vals_for_read_set(schema, user_read_set); - return SelectMvcc::Ok(vals); + return ReadRow::Ok(vals); } // otherwise, go to next version } // page data is invisible, we have to backtrace version chain match undo_head.entry.as_ref() { None => { // no next version, so nothing to be return. - return SelectMvcc::RowNotFound; + return ReadRow::NotFound; } Some(entry) => { let mut entry = entry.clone(); @@ -103,19 +118,19 @@ impl RowReadAccess<'_> { }; loop { match &entry.as_ref().kind { - UndoKind::Insert => { + RowUndoKind::Insert => { debug_assert!(!ver.deleted); ver.deleted = true; // insert is not seen, mark as deleted } - UndoKind::Update(upd_cols) => { + RowUndoKind::Update(undo_cols) => { debug_assert!(!ver.deleted); - ver.undo_update(upd_cols); + ver.undo_update(undo_cols); } - UndoKind::Delete => { + RowUndoKind::Delete => { debug_assert!(ver.deleted); ver.deleted = true; // delete is not seen, mark as not deleted. } - UndoKind::Move(del) => { + RowUndoKind::Move(del) => { // we cannot determine the delete flag here, // because if move+insert, flag is true. // if move+update, flag is false. @@ -132,22 +147,22 @@ impl RowReadAccess<'_> { // If undo kind is INSERT, and next version does not exist. // That means we should return no row. if ver.deleted { - return SelectMvcc::RowNotFound; + return ReadRow::NotFound; } // check if key match return ver.get_visible_vals(schema, self.row()); } Some(next) => { match next.status { - NextUndoStatus::SameAsPrev => { + NextRowUndoStatus::SameAsPrev => { let next_entry = next.entry.clone(); entry = next_entry; // still invisible. 
} - NextUndoStatus::CTS(cts) => { + NextRowUndoStatus::CTS(cts) => { if trx.sts > cts { // current version is visible if ver.deleted { - return SelectMvcc::RowNotFound; + return ReadRow::NotFound; } return ver.get_visible_vals(schema, self.row()); } @@ -176,7 +191,7 @@ struct RowVersion { impl RowVersion { #[inline] - fn undo_update(&mut self, upd_cols: &[UpdateCol]) { + fn undo_update(&mut self, upd_cols: &[UndoCol]) { // undo update for uc in upd_cols { if self.read_set.contains(&uc.idx) { @@ -192,7 +207,7 @@ impl RowVersion { } #[inline] - fn get_visible_vals(mut self, schema: &Schema, row: Row<'_>) -> SelectMvcc { + fn get_visible_vals(mut self, schema: &Schema, row: Row<'_>) -> ReadRow { if self.read_set_contains_key { let key_different = self .undo_vals @@ -200,11 +215,11 @@ impl RowVersion { .map(|v| v == &self.undo_key) .unwrap_or_else(|| row.is_key_different(schema, &self.undo_key)); if key_different { - return SelectMvcc::InvalidIndex; + return ReadRow::InvalidIndex; } } else { if row.is_key_different(schema, &self.undo_key) { - return SelectMvcc::InvalidIndex; + return ReadRow::InvalidIndex; } } let mut vals = Vec::with_capacity(self.read_set.len()); @@ -215,14 +230,14 @@ impl RowVersion { vals.push(row.clone_user_val(schema, *user_col_idx)) } } - SelectMvcc::Ok(vals) + ReadRow::Ok(vals) } } pub struct RowWriteAccess<'a> { page: &'a RowPage, row_idx: usize, - undo: RwLockWriteGuard<'a, Option>, + undo: RwLockWriteGuard<'a, Option>, } impl<'a> RowWriteAccess<'a> { @@ -242,7 +257,7 @@ impl<'a> RowWriteAccess<'a> { } #[inline] - pub fn row_and_undo_mut(&mut self) -> (Row, &mut Option) { + pub fn row_and_undo_mut(&mut self) -> (Row, &mut Option) { let row = self.page.row(self.row_idx); (row, &mut *self.undo) } @@ -252,7 +267,7 @@ impl<'a> RowWriteAccess<'a> { let var_len = self.row().var_len_for_update(user_cols); match self.page.request_free_space(var_len) { None => { - let old_row = self.row().clone_vals(schema, false); + let old_row = self.row().clone_vals_with_var_offsets(schema, false); UpdateRow::NoFreeSpace(old_row) } Some(offset) => { @@ -263,12 +278,12 @@ impl<'a> RowWriteAccess<'a> { } #[inline] - pub fn undo_head(&self) -> &Option { + pub fn undo_head(&self) -> &Option { &*self.undo } #[inline] - pub fn first_undo_entry(&self) -> Option { + pub fn first_undo_entry(&self) -> Option { self.undo.as_ref().and_then(|head| head.entry.clone()) } @@ -278,10 +293,10 @@ impl<'a> RowWriteAccess<'a> { pub fn build_undo_chain( &mut self, trx: &ActiveTrx, - new_entry: &mut OwnedUndoEntry, + new_entry: &mut OwnedRowUndo, old_cts: NextTrxCTS, ) { - let head = self.undo.get_or_insert_with(|| UndoHead { + let head = self.undo.get_or_insert_with(|| RowUndoHead { status: trx.status(), entry: None, }); @@ -295,12 +310,89 @@ impl<'a> RowWriteAccess<'a> { // 3. Link new and old. 
if let Some(entry) = old_entry { debug_assert!(new_entry.next.is_none()); - new_entry.next = Some(NextUndoEntry { + new_entry.next = Some(NextRowUndo { status: old_cts.undo_status(), entry, }); } } + + #[inline] + pub fn rollback_first_undo(&mut self, mut owned_entry: OwnedRowUndo) { + let head = self.undo.as_mut().expect("undo head"); + match head.entry.take() { + None => unreachable!(), + Some(entry) => { + debug_assert!(std::ptr::addr_eq(entry.as_ref(), &*owned_entry)); + // rollback row data + match &owned_entry.kind { + RowUndoKind::Insert => { + self.page.set_deleted(self.row_idx, true); + } + RowUndoKind::Delete => { + self.page.set_deleted(self.row_idx, false); + } + RowUndoKind::Update(undo_cols) => { + for uc in undo_cols { + match &uc.val { + Val::Null => { + self.page.set_null(self.row_idx, uc.idx, true); + } + Val::VarByte(var) => { + let (pv, _) = self.page.add_var( + var.as_bytes(), + uc.var_offset.unwrap_or(0) as usize, + ); + self.page.update_var(self.row_idx, uc.idx, pv); + } + Val::Byte1(v) => { + self.page.update_val(self.row_idx, uc.idx, v); + } + Val::Byte2(v) => { + self.page.update_val(self.row_idx, uc.idx, v); + } + Val::Byte4(v) => { + self.page.update_val(self.row_idx, uc.idx, v); + } + Val::Byte8(v) => { + self.page.update_val(self.row_idx, uc.idx, v); + } + } + } + } + RowUndoKind::Move(deleted) => { + self.page.set_deleted(self.row_idx, *deleted); + } + } + // rollback undo + match owned_entry.next.take() { + None => { + // The entry to rollback is the only undo entry of this row. + // So data in row page is globally visible now, we can + // update undo status to CTS=GLOBAL_VISIBLE_CTS. + head.status = Arc::new(SharedTrxStatus::global_visible()); + } + Some(next) => { + if let RowUndoKind::Move(_) = &next.entry.as_ref().kind { + // MOVE is undo entry of another row, so treat it as None. + head.status = Arc::new(SharedTrxStatus::global_visible()); + } else { + match next.status { + NextRowUndoStatus::SameAsPrev => { + // This entry belongs to same transaction, + // So keep transaction status as it is. + } + NextRowUndoStatus::CTS(cts) => { + head.status = Arc::new(SharedTrxStatus::new(cts)); + } + } + head.entry = Some(next.entry); + } + } + } + } + } + } } impl<'a> PageSharedGuard<'a, RowPage> { @@ -328,3 +420,10 @@ impl<'a> PageSharedGuard<'a, RowPage> { } } } + +pub enum RowLatestStatus { + NotFound, + Deleted, + Committed, + Uncommitted, +} diff --git a/doradb-storage/src/trx/sys.rs b/doradb-storage/src/trx/sys.rs index 8483926..7e4525b 100644 --- a/doradb-storage/src/trx/sys.rs +++ b/doradb-storage/src/trx/sys.rs @@ -130,7 +130,7 @@ impl TransactionSystem { let partition = &*self.log_partitions[trx.log_partition_idx]; let prepared_trx = trx.prepare(); if prepared_trx.redo_bin.is_none() { - if prepared_trx.undo.is_empty() { + if prepared_trx.row_undo.is_empty() { // This is a read-only transaction, drop it is safe. debug_assert!(prepared_trx.readonly()); return Ok(prepared_trx.into_session()); diff --git a/doradb-storage/src/trx/undo/index.rs b/doradb-storage/src/trx/undo/index.rs new file mode 100644 index 0000000..7007ed8 --- /dev/null +++ b/doradb-storage/src/trx/undo/index.rs @@ -0,0 +1,19 @@ +use crate::row::RowID; +use crate::table::TableID; +use crate::value::Val; + +/// IndexUndo represent the undo operation of a index. +pub struct IndexUndo { + pub table_id: TableID, + pub kind: IndexUndoKind, +} + +pub enum IndexUndoKind { + // Insert key -> row_id into index. + Insert(Val, RowID), + // Update key -> old_row_id to key -> new_row_id. 
+ Update(Val, RowID, RowID), + // Delete is not included in index undo. + // Because the deletion of index will only be executed by GC thread, + // not transaction thread. So DELETE undo is unneccessary. +} diff --git a/doradb-storage/src/trx/undo/mod.rs b/doradb-storage/src/trx/undo/mod.rs new file mode 100644 index 0000000..026d621 --- /dev/null +++ b/doradb-storage/src/trx/undo/mod.rs @@ -0,0 +1,5 @@ +mod index; +mod row; + +pub use index::*; +pub use row::*; diff --git a/doradb-storage/src/trx/undo.rs b/doradb-storage/src/trx/undo/row.rs similarity index 83% rename from doradb-storage/src/trx/undo.rs rename to doradb-storage/src/trx/undo/row.rs index caccaef..ca3b85a 100644 --- a/doradb-storage/src/trx/undo.rs +++ b/doradb-storage/src/trx/undo/row.rs @@ -1,5 +1,5 @@ use crate::buffer::page::PageID; -use crate::row::ops::UpdateCol; +use crate::row::ops::UndoCol; use crate::row::RowID; use crate::table::TableID; use crate::trx::{SharedTrxStatus, TrxID, GLOBAL_VISIBLE_COMMIT_TS}; @@ -9,7 +9,7 @@ use std::ptr::NonNull; use std::sync::Arc; pub struct UndoMap { - entries: Box<[RwLock>]>, + entries: Box<[RwLock>]>, // occupied: usize, } @@ -32,20 +32,20 @@ impl UndoMap { } #[inline] - pub fn read(&self, row_idx: usize) -> RwLockReadGuard<'_, Option> { + pub fn read(&self, row_idx: usize) -> RwLockReadGuard<'_, Option> { self.entries[row_idx].read() } #[inline] - pub fn write(&self, row_idx: usize) -> RwLockWriteGuard<'_, Option> { + pub fn write(&self, row_idx: usize) -> RwLockWriteGuard<'_, Option> { self.entries[row_idx].write() } } -/// UndoKind represents the kind of original operation. +/// RowUndoKind represents the kind of original operation. /// So the actual undo action should be opposite of the kind. /// There is one special UndoKind *Move*, due to the design of DoraDB. -pub enum UndoKind { +pub enum RowUndoKind { /// Insert a new row. /// Before-image is empty for insert, so we do not need to copy values. /// @@ -161,75 +161,78 @@ pub enum UndoKind { /// Note: Update -> Delete is impossible. Even if we re-insert /// a deleted row, we will first *move* the deleted row to /// other place and then perform update. - Update(Vec), + Update(Vec), } -/// Owned undo entry is stored in transaction undo buffer. +/// OwnedRowUndo is the old version of a row. +/// It is stored in transaction undo buffer. /// Page level undo map will also hold pointers to the entries. /// We do not share ownership between them. /// Instead, we require the undo buffer owns all entries. /// Garbage collector will make sure the deletion of entries is /// safe, because no transaction will access entries that is /// supposed to be deleted. 
-pub struct OwnedUndoEntry(Box); +pub struct OwnedRowUndo(Box); -impl Deref for OwnedUndoEntry { - type Target = UndoEntry; +impl Deref for OwnedRowUndo { + type Target = RowUndo; #[inline] fn deref(&self) -> &Self::Target { &*self.0 } } -impl DerefMut for OwnedUndoEntry { +impl DerefMut for OwnedRowUndo { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { &mut *self.0 } } -impl OwnedUndoEntry { +impl OwnedRowUndo { #[inline] - pub fn new(table_id: TableID, page_id: PageID, row_id: RowID, kind: UndoKind) -> Self { - let entry = UndoEntry { + pub fn new(table_id: TableID, page_id: PageID, row_id: RowID, kind: RowUndoKind) -> Self { + let entry = RowUndo { table_id, page_id, row_id, kind, next: None, }; - OwnedUndoEntry(Box::new(entry)) + OwnedRowUndo(Box::new(entry)) } #[inline] - pub fn leak(&self) -> UndoEntryPtr { + pub fn leak(&self) -> RowUndoRef { unsafe { - UndoEntryPtr(NonNull::new_unchecked( - self.0.as_ref() as *const _ as *mut UndoEntry + RowUndoRef(NonNull::new_unchecked( + self.0.as_ref() as *const _ as *mut RowUndo )) } } } -/// UndoEntryPtr is an atomic pointer to UndoEntry. -#[repr(transparent)] -#[derive(Clone)] -pub struct UndoEntryPtr(NonNull); - +/// RowUndoRef is a reference to RowUndoEntry. +/// It does not share ownership with RowUndoEntry. +/// /// The safety is guaranteed by MVCC design and GC logic. /// The modification of undo log is always guarded by row lock. /// And the non-locking consistent read will not access /// log entries that are deleted(GCed). -unsafe impl Send for UndoEntryPtr {} +#[repr(transparent)] +#[derive(Clone)] +pub struct RowUndoRef(NonNull); + +unsafe impl Send for RowUndoRef {} -impl UndoEntryPtr { +impl RowUndoRef { #[inline] - pub(crate) fn as_ref(&self) -> &UndoEntry { + pub(crate) fn as_ref(&self) -> &RowUndo { unsafe { self.0.as_ref() } } } -pub struct UndoEntry { +pub struct RowUndo { /// This field stores uncommitted TrxID, committed timestamp. /// Or preparing status, which may block read. /// It uses shared pointer and atomic variable to support @@ -238,16 +241,16 @@ pub struct UndoEntry { pub table_id: TableID, pub page_id: PageID, pub row_id: RowID, - pub kind: UndoKind, - pub next: Option, + pub kind: RowUndoKind, + pub next: Option, } -pub struct NextUndoEntry { - pub status: NextUndoStatus, - pub entry: UndoEntryPtr, +pub struct NextRowUndo { + pub status: NextRowUndoStatus, + pub entry: RowUndoRef, } -pub enum NextUndoStatus { +pub enum NextRowUndoStatus { // If transaction modify a row multiple times. // It will link multiple undo entries with the // same timestamp. @@ -259,9 +262,9 @@ pub enum NextUndoStatus { CTS(TrxID), } -pub struct UndoHead { +pub struct RowUndoHead { pub status: Arc, - pub entry: Option, + pub entry: Option, } #[derive(Default, Clone, Copy)] @@ -274,11 +277,11 @@ pub enum NextTrxCTS { impl NextTrxCTS { #[inline] - pub fn undo_status(self) -> NextUndoStatus { + pub fn undo_status(self) -> NextRowUndoStatus { match self { - NextTrxCTS::None => NextUndoStatus::CTS(GLOBAL_VISIBLE_COMMIT_TS), - NextTrxCTS::Value(cts) => NextUndoStatus::CTS(cts), - NextTrxCTS::Myself => NextUndoStatus::SameAsPrev, + NextTrxCTS::None => NextRowUndoStatus::CTS(GLOBAL_VISIBLE_COMMIT_TS), + NextTrxCTS::Value(cts) => NextRowUndoStatus::CTS(cts), + NextTrxCTS::Myself => NextRowUndoStatus::SameAsPrev, } } } @@ -291,7 +294,7 @@ mod tests { fn test_undo_head_size() { println!( "size of RwLock> is {}", - std::mem::size_of::>>() + std::mem::size_of::>>() ); } }
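
For reference, a minimal end-to-end sketch of the statement-level rollback flow this patch adds, assembled from the new tests in doradb-storage/src/table/tests.rs. It reuses the same test-only static helpers (FixedBufferPool::with_capacity_static, TrxSysConfig::default().build_static) and mirrors test_mvcc_rollback_insert_normal, so it is an illustrative sketch of the API exercised above rather than a canonical example:

    use doradb_storage::buffer::FixedBufferPool;
    use doradb_storage::catalog::Catalog;
    use doradb_storage::session::Session;
    use doradb_storage::table::Schema;
    use doradb_storage::trx::sys::{TransactionSystem, TrxSysConfig};
    use doradb_storage::value::{Layout, Val};

    fn statement_rollback_sketch() {
        smol::block_on(async {
            let buf_pool = FixedBufferPool::with_capacity_static(1024 * 1024).unwrap();
            let trx_sys = TrxSysConfig::default().build_static();
            {
                // The catalog now owns table metadata; tables are created and
                // looked up by table id instead of being constructed directly.
                let catalog = Catalog::empty();
                let table_id = catalog.create_table(
                    buf_pool,
                    Schema::new(vec![Layout::Byte4, Layout::VarByte], 0),
                );
                let table = catalog.get_table(table_id).unwrap();

                let mut session = Session::new();
                let mut trx = session.begin_trx(trx_sys);

                // Statement-level rollback: the insert is undone through the
                // row/index undo logs collected by the statement, while the
                // enclosing transaction still commits normally.
                let mut stmt = trx.start_stmt();
                let res = table
                    .insert_row(
                        buf_pool,
                        &mut stmt,
                        vec![Val::from(1i32), Val::from("hello")],
                    )
                    .await;
                assert!(res.is_ok());
                trx = stmt.rollback(buf_pool, &catalog);
                // The tests then verify via select_row that the key is no
                // longer visible after the statement rollback.
                let _ = trx_sys.commit(trx).await.unwrap();
            }
            unsafe {
                TransactionSystem::drop_static(trx_sys);
                FixedBufferPool::drop_static(buf_pool);
            }
        });
    }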