Skip to content

Commit

Permalink
simplify undo log ownership and remove locks
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangzhe committed Jan 17, 2025
1 parent 130b408 commit e2a3135
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 360 deletions.
88 changes: 51 additions & 37 deletions doradb-storage/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod ops;
use crate::buffer::frame::{BufferFrameAware, FrameHeader};
use crate::buffer::page::PAGE_SIZE;
use crate::buffer::BufferPool;
use crate::row::ops::{DeleteResult, InsertResult, SelectResult, UpdateCol, UpdateResult};
use crate::row::ops::{Delete, InsertRow, Select, Update, UpdateCol};
use crate::table::Schema;
use crate::trx::undo::UndoMap;
use crate::value::*;
Expand Down Expand Up @@ -126,6 +126,27 @@ impl RowPage {
}
}

/// Returns index of the row within page.
#[inline]
pub fn row_idx(&self, row_id: RowID) -> usize {
debug_assert!(self.row_id_in_valid_range(row_id));
(row_id - self.header.start_row_id) as usize
}

/// Returns row id for given index.
#[inline]
pub fn row_id(&self, row_idx: usize) -> RowID {
debug_assert!(row_idx < self.header.row_count());
self.header.start_row_id + row_idx as u64
}

/// Returns whether row id is in valid range.
#[inline]
pub fn row_id_in_valid_range(&self, row_id: RowID) -> bool {
row_id >= self.header.start_row_id
&& row_id < self.header.start_row_id + self.header.row_count() as u64
}

/// Returns row id list in this page.
#[inline]
pub fn row_ids(&self) -> &[RowID] {
Expand All @@ -134,12 +155,10 @@ impl RowPage {

#[inline]
pub fn row_by_id(&self, row_id: RowID) -> Option<Row> {
if row_id < self.header.start_row_id as RowID
|| row_id >= self.header.start_row_id + self.header.row_count() as u64
{
if !self.row_id_in_valid_range(row_id) {
return None;
}
Some(self.row((row_id - self.header.start_row_id) as usize))
Some(self.row(self.row_idx(row_id)))
}

/// Returns free space of current page.
Expand Down Expand Up @@ -189,7 +208,7 @@ impl RowPage {

/// Insert a new row in page.
#[inline]
pub fn insert(&self, schema: &Schema, user_cols: &[Val]) -> InsertResult {
pub fn insert(&self, schema: &Schema, user_cols: &[Val]) -> InsertRow {
debug_assert!(schema.col_count() == self.header.col_count as usize);
// insert row does not include RowID, as RowID is auto-generated.
debug_assert!(user_cols.len() + 1 == self.header.col_count as usize);
Expand All @@ -199,7 +218,7 @@ impl RowPage {
if let Some((row_idx, var_offset)) = self.request_row_idx_and_free_space(var_len) {
(row_idx, var_offset)
} else {
return InsertResult::NoFreeSpaceOrRowID;
return InsertRow::NoFreeSpaceOrRowID;
};
let mut new_row = self.new_row(row_idx as usize, var_offset);
for v in user_cols {
Expand All @@ -212,32 +231,27 @@ impl RowPage {
Val::VarByte(var) => new_row.add_var(var.as_bytes()),
}
}
InsertResult::Ok(new_row.finish())
InsertRow::Ok(new_row.finish())
}

/// delete row in page.
/// This method will only mark the row as deleted.
#[inline]
pub fn delete(&self, row_id: RowID) -> DeleteResult {
if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 {
return DeleteResult::RowNotFound;
pub fn delete(&self, row_id: RowID) -> Delete {
if !self.row_id_in_valid_range(row_id) {
return Delete::RowNotFound;
}
let row_idx = (row_id - self.header.start_row_id) as usize;
let row_idx = self.row_idx(row_id);
if self.is_deleted(row_idx) {
return DeleteResult::RowAlreadyDeleted;
return Delete::RowAlreadyDeleted;
}
self.set_deleted(row_idx, true);
DeleteResult::Ok
Delete::Ok
}

/// Update in-place in current page.
#[inline]
pub fn update(
&mut self,
schema: &Schema,
row_id: RowID,
user_cols: &[UpdateCol],
) -> UpdateResult {
pub fn update(&mut self, schema: &Schema, row_id: RowID, user_cols: &[UpdateCol]) -> Update {
// column indexes must be in range
debug_assert!(
{
Expand All @@ -258,41 +272,41 @@ impl RowPage {
},
"update columns should be in order"
);
if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 {
return UpdateResult::RowNotFound;
if !self.row_id_in_valid_range(row_id) {
return Update::RowNotFound;
}
let row_idx = (row_id - self.header.start_row_id) as usize;
let row_idx = self.row_idx(row_id);
if self.row(row_idx).is_deleted() {
return UpdateResult::RowDeleted;
return Update::RowDeleted;
}
let var_len = self.var_len_for_update(row_idx, user_cols);
let var_offset = if let Some(var_offset) = self.request_free_space(var_len) {
var_offset
} else {
let row = self.row(row_idx);
let vals = row.clone_vals(schema, false);
return UpdateResult::NoFreeSpace(vals);
return Update::NoFreeSpace(vals);
};
let mut row = self.row_mut(row_idx, var_offset, var_offset + var_len);
for uc in user_cols {
row.update_user_col(uc.idx, &uc.val);
}
row.finish();
UpdateResult::Ok(row_id)
Update::Ok(row_id)
}

/// Select single row by row id.
#[inline]
pub fn select(&self, row_id: RowID) -> SelectResult {
if row_id < self.header.start_row_id || row_id >= self.header.max_row_count as u64 {
return SelectResult::RowNotFound;
pub fn select(&self, row_id: RowID) -> Select {
if !self.row_id_in_valid_range(row_id) {
return Select::RowNotFound;
}
let row_idx = (row_id - self.header.start_row_id) as usize;
let row_idx = self.row_idx(row_id);
let row = self.row(row_idx);
if row.is_deleted() {
return SelectResult::RowDeleted(row);
return Select::RowDeleted(row);
}
SelectResult::Ok(row)
Select::Ok(row)
}

#[inline]
Expand Down Expand Up @@ -326,7 +340,7 @@ impl RowPage {
var_offset,
};
// always add RowID as first column
row.add_val(self.header.start_row_id + row_idx as u64);
row.add_val(self.row_id(row_idx));
row
}

Expand Down Expand Up @@ -738,7 +752,7 @@ impl<'a> NewRow<'a> {
pub fn finish(self) -> RowID {
debug_assert!(self.col_idx == self.page.header.col_count as usize);
self.page.set_deleted(self.row_idx, false);
self.page.header.start_row_id + self.row_idx as u64
self.page.row_id(self.row_idx)
}
}

Expand Down Expand Up @@ -1231,7 +1245,7 @@ mod tests {
Val::from(&short[..]),
];
let res = page.insert(&schema, &insert);
assert!(matches!(res, InsertResult::Ok(100)));
assert!(matches!(res, InsertRow::Ok(100)));
assert!(!page.row(0).is_deleted());

let row_id = 100;
Expand Down Expand Up @@ -1261,10 +1275,10 @@ mod tests {
assert!(res.is_ok());

let res = page.delete(row_id);
assert!(matches!(res, DeleteResult::Ok));
assert!(matches!(res, Delete::Ok));

let select = page.select(row_id);
assert!(matches!(select, SelectResult::RowDeleted(_)));
assert!(matches!(select, Select::RowDeleted(_)));
}

fn create_row_page() -> RowPage {
Expand Down
51 changes: 27 additions & 24 deletions doradb-storage/src/row/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,71 @@ use crate::row::{Row, RowID, RowMut};
use crate::value::Val;
use serde::{Deserialize, Serialize};

pub enum SelectResult<'a> {
pub enum Select<'a> {
Ok(Row<'a>),
RowDeleted(Row<'a>),
RowNotFound,
}

impl SelectResult<'_> {
impl Select<'_> {
/// Returns if select succeeds.
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, SelectResult::Ok(_))
matches!(self, Select::Ok(_))
}
}

pub enum SelectMvccResult {
pub enum SelectMvcc {
Ok(Vec<Val>),
RowNotFound,
InvalidIndex,
}

impl SelectMvccResult {
impl SelectMvcc {
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, SelectMvccResult::Ok(_))
matches!(self, SelectMvcc::Ok(_))
}
}

pub enum InsertResult {
pub enum InsertRow {
Ok(RowID),
NoFreeSpaceOrRowID,
}

impl InsertResult {
impl InsertRow {
/// Returns if insert succeeds.
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, InsertResult::Ok(_))
matches!(self, InsertRow::Ok(_))
}
}

pub enum InsertMvccResult {
pub enum InsertMvcc {
// PageGuard is required if table has unique index and
// we may need to linke a deleted version to the new version.
// In such scenario, we should keep the page for shared mode
// and acquire row lock when we do the linking.
Ok(RowID),
WriteConflict,
DuplicateKey,
}

impl InsertMvccResult {
impl InsertMvcc {
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, InsertMvccResult::Ok(_))
matches!(self, InsertMvcc::Ok(_))
}
}

pub enum MoveInsertResult {
pub enum MoveInsert {
Ok,
None,
WriteConflict,
DuplicateKey,
Retry,
}

pub enum UpdateResult {
pub enum Update {
// RowID may change if the update is out-of-place.
Ok(RowID),
RowNotFound,
Expand All @@ -74,15 +77,15 @@ pub enum UpdateResult {
NoFreeSpace(Vec<Val>),
}

impl UpdateResult {
impl Update {
/// Returns if update succeeds.
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, UpdateResult::Ok(..))
matches!(self, Update::Ok(..))
}
}

pub enum UpdateMvccResult {
pub enum UpdateMvcc {
Ok(RowID),
RowNotFound,
RowDeleted,
Expand All @@ -91,11 +94,11 @@ pub enum UpdateMvccResult {
Retry(Vec<UpdateCol>),
}

impl UpdateMvccResult {
impl UpdateMvcc {
/// Returns if update with undo succeeds.
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, UpdateMvccResult::Ok(_))
matches!(self, UpdateMvcc::Ok(_))
}
}

Expand All @@ -110,22 +113,22 @@ pub enum UpdateRow<'a> {
NoFreeSpace(Vec<Val>),
}

pub enum DeleteResult {
pub enum Delete {
Ok,
RowNotFound,
RowAlreadyDeleted,
}

pub enum DeleteMvccResult {
pub enum DeleteMvcc {
Ok,
RowNotFound,
RowAlreadyDeleted,
WriteConflict,
}

impl DeleteMvccResult {
impl DeleteMvcc {
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, DeleteMvccResult::Ok)
matches!(self, DeleteMvcc::Ok)
}
}
9 changes: 2 additions & 7 deletions doradb-storage/src/stmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use crate::buffer::BufferPool;
use crate::row::RowID;
use crate::table::TableID;
use crate::trx::redo::RedoEntry;
use crate::trx::undo::SharedUndoEntry;
use crate::trx::undo::OwnedUndoEntry;
use crate::trx::ActiveTrx;

pub struct Statement {
pub trx: ActiveTrx,
// statement-level undo logs.
pub undo: Vec<SharedUndoEntry>,
pub undo: Vec<OwnedUndoEntry>,
// statement-level redo logs.
pub redo: Vec<RedoEntry>,
}
Expand All @@ -36,11 +36,6 @@ impl Statement {
todo!();
}

#[inline]
pub fn last_undo_entry(&self) -> Option<&SharedUndoEntry> {
self.undo.last()
}

#[inline]
pub fn load_active_insert_page(&mut self, table_id: TableID) -> Option<(PageID, RowID)> {
self.trx
Expand Down
1 change: 0 additions & 1 deletion doradb-storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::buffer::FixedBufferPool;
use crate::index::{BlockIndex, SingleKeyIndex};
use crate::table::mvcc::MvccTable;
use crate::value::{Layout, Val};
use std::ops::Deref;
use std::sync::Arc;

// todo: integrate with doradb_catalog::TableID.
Expand Down
Loading

0 comments on commit e2a3135

Please sign in to comment.