diff --git a/Cargo.toml b/Cargo.toml index 1b745586..d0b16308 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ members = [ "pallas-crypto", "pallas-configs", "pallas-primitives", - "pallas-rolldb", "pallas-traverse", "pallas-txbuilder", "pallas-utxorpc", diff --git a/pallas-rolldb/Cargo.toml b/pallas-rolldb/Cargo.toml deleted file mode 100644 index bae1eb64..00000000 --- a/pallas-rolldb/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "pallas-rolldb" -description = "An opinionated Cardano storage engine built on top of RocksDB" -version = "0.30.2" -edition = "2021" -repository = "https://github.com/txpipe/pallas" -homepage = "https://github.com/txpipe/pallas" -documentation = "https://docs.rs/pallas-rolldb" -license = "Apache-2.0" -readme = "README.md" -authors = ["Santiago Carmuega "] - -[dependencies] -rocksdb = { version = "0.22.0", default-features = false, features = ["multi-threaded-cf"] } -bincode = "1.3.3" -serde = "1.0.188" -thiserror = "1.0.49" -pallas-crypto = { version = "=0.30.2", path = "../pallas-crypto" } -tracing = "0.1.37" -tokio = { version = "1.32.0", features = ["sync", "rt", "time", "macros"] } -async-stream = "0.3.5" -futures-core = "0.3.28" -futures-util = "0.3.28" - -[dev-dependencies] -tempfile = "3.3.0" diff --git a/pallas-rolldb/README.md b/pallas-rolldb/README.md deleted file mode 100644 index d79c5b2f..00000000 --- a/pallas-rolldb/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# Pallas RollDB - -An opinionated Cardano storage engine built on top of RocksDB. - diff --git a/pallas-rolldb/src/chain/mod.rs b/pallas-rolldb/src/chain/mod.rs deleted file mode 100644 index e9972030..00000000 --- a/pallas-rolldb/src/chain/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -use pallas_crypto::hash::Hash; - -mod store; - -#[cfg(test)] -mod tests; - -pub type BlockSlot = u64; -pub type BlockHash = Hash<32>; -pub type BlockBody = Vec; - -pub use store::*; diff --git a/pallas-rolldb/src/chain/store.rs b/pallas-rolldb/src/chain/store.rs deleted file mode 100644 index f8098507..00000000 --- a/pallas-rolldb/src/chain/store.rs +++ /dev/null @@ -1,275 +0,0 @@ -use pallas_crypto::hash::Hash; -use std::{path::Path, sync::Arc}; -use tracing::warn; - -use rocksdb::{Options, WriteBatch, DB}; - -use super::{BlockBody, BlockHash, BlockSlot}; - -use crate::kvtable::*; - -#[derive(Clone)] -pub struct Store { - db: Arc, - pub tip_change: Arc, -} - -pub struct BlockByHashKV; - -// hash -> block cbor -impl KVTable for BlockByHashKV { - const CF_NAME: &'static str = "BlockByHashKV"; -} - -// slot => block hash -pub struct HashBySlotKV; - -impl KVTable for HashBySlotKV { - const CF_NAME: &'static str = "HashBySlotKV"; -} - -pub struct ChainIterator<'a>(pub EntryIterator<'a, DBInt, DBHash>); - -impl Iterator for ChainIterator<'_> { - type Item = Result<(u64, Hash<32>), Error>; - - fn next(&mut self) -> Option { - self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0))) - } -} - -impl Store { - pub fn open(path: impl AsRef) -> Result { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - let db = DB::open_cf(&opts, path, [BlockByHashKV::CF_NAME, HashBySlotKV::CF_NAME]) - .map_err(|_| Error::IO)?; - - let out = Self { - db: Arc::new(db), - tip_change: Arc::new(tokio::sync::Notify::new()), - }; - - Ok(out) - } - - pub fn get_block(&self, hash: Hash<32>) -> Result, Error> { - let dbval = BlockByHashKV::get_by_key(&self.db, DBHash(hash))?; - Ok(dbval.map(|x| x.0)) - } - - pub fn roll_forward( - &mut self, - slot: BlockSlot, - hash: BlockHash, - body: BlockBody, - ) -> Result<(), Error> { - let mut batch = WriteBatch::default(); - - // keep track of the new block body - BlockByHashKV::stage_upsert(&self.db, DBHash(hash), DBBytes(body), &mut batch); - - // add new block to HashBySlotKV - HashBySlotKV::stage_upsert(&self.db, DBInt(slot), DBHash(hash), &mut batch); - - self.db.write(batch).map_err(|_| Error::IO)?; - self.tip_change.notify_waiters(); - - Ok(()) - } - - pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> { - let mut batch = WriteBatch::default(); - - // remove rollback-ed blocks from HashBySlotKV - let to_remove = HashBySlotKV::iter_keys_from(&self.db, DBInt(until)).skip(1); - - for key in to_remove { - HashBySlotKV::stage_delete(&self.db, key?, &mut batch); - } - - self.db.write(batch).map_err(|_| Error::IO)?; - self.tip_change.notify_waiters(); - - Ok(()) - } - - pub fn roll_back_origin(&mut self) -> Result<(), Error> { - HashBySlotKV::reset(&self.db)?; - BlockByHashKV::reset(&self.db)?; - - self.tip_change.notify_waiters(); - - Ok(()) - } - - pub fn find_tip(&self) -> Result, Error> { - let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End); - - if let Some(last) = iter.next() { - let (slot, hash) = last?; - Ok(Some((slot.0, hash.0))) - } else { - Ok(None) - } - } - - pub fn intersect_options( - &self, - max_items: usize, - ) -> Result, Error> { - let mut iter = HashBySlotKV::iter_entries(&self.db, rocksdb::IteratorMode::End) - .filter_map(|res| res.ok()) - .map(|(k, v)| (k.0, v.0)); - - let mut out = Vec::with_capacity(max_items); - - while let Some((slot, hash)) = iter.next() { - out.push((slot, hash)); - - if out.len() >= max_items { - break; - } - - // skip exponentially - let skip = 2usize.pow(out.len() as u32) - 1; - for _ in 0..skip { - iter.next(); - } - } - - Ok(out) - } - - pub fn crawl_after(&self, slot: Option) -> ChainIterator { - if let Some(slot) = slot { - let slot = Box::<[u8]>::from(DBInt(slot)); - let from = rocksdb::IteratorMode::From(&slot, rocksdb::Direction::Forward); - let mut iter = HashBySlotKV::iter_entries(&self.db, from); - - // skip current - iter.next(); - - ChainIterator(iter) - } else { - let from = rocksdb::IteratorMode::Start; - let iter = HashBySlotKV::iter_entries(&self.db, from); - ChainIterator(iter) - } - } - - pub fn crawl(&self) -> ChainIterator { - self.crawl_after(None) - } - - pub fn read_chain_page( - &self, - from: BlockSlot, - len: usize, - ) -> impl Iterator> + '_ { - HashBySlotKV::iter_entries_from(&self.db, DBInt(from)) - .map(|res| res.map(|(x, y)| (x.0, y.0))) - .take(len) - } - - /// Iterator over chain between two points (inclusive) - /// - /// To use Origin as start point set `from` to None. - /// - /// Returns None if either point in range don't exist or `to` point is - /// earlier in chain than `from`. - pub fn read_chain_range( - &self, - from: Option<(BlockSlot, BlockHash)>, - to: (BlockSlot, BlockHash), - ) -> Result> + '_>, Error> - { - // TODO: We want to use a snapshot here to avoid race condition where - // point is checked to be in the HashBySlotKV but it is rolled-back before we - // create the iterator. Problem is `HashBySlotKV` etc must take `DB`, not - // `Snapshot`, so maybe we need a new way of creating something like - // a "KVTableSnapshot" in addition to the current "KVTable" type, which - // has methods on snapshots, but here I was having issues as there is - // no `cf` method on Snapshot but it is used is KVTable. - - // let snapshot = self.db.snapshot(); - - // check p2 not before p1 - let p1_slot = if let Some((slot, _)) = from { - if to.0 < slot { - warn!("chain range end slot before start slot"); - return Ok(None); - } else { - slot - } - } else { - 0 // Use 0 as slot for Origin - }; - - // check p1 exists in HashBySlotKV if provided - if let Some((slot, hash)) = from { - match HashBySlotKV::get_by_key(&self.db, DBInt(slot))? { - Some(DBHash(found_hash)) => { - if hash != found_hash { - warn!("chain range start hash mismatch"); - return Ok(None); - } - } - None => { - warn!("chain range start slot not found"); - return Ok(None); - } - } - } - - // check p2 exists in HashBySlotKV - match HashBySlotKV::get_by_key(&self.db, DBInt(to.0))? { - Some(DBHash(found_hash)) => { - if to.1 != found_hash { - warn!("chain range end hash mismatch"); - return Ok(None); - } - } - None => { - warn!("chain range end slot not found"); - return Ok(None); - } - }; - - // return iterator between p1 and p2 inclusive - Ok(Some( - HashBySlotKV::iter_entries_from(&self.db, DBInt(p1_slot)) - .map(|res| res.map(|(x, y)| (x.0, y.0))) - .take_while(move |x| { - if let Ok((slot, _)) = x { - // iter returns None once point is after `to` slot - *slot <= to.0 - } else { - false - } - }), - )) - } - - /// Check if a point (pair of slot and block hash) exists in the - /// HashBySlotKV - pub fn chain_contains(&self, slot: BlockSlot, hash: &BlockHash) -> Result { - if let Some(DBHash(found)) = HashBySlotKV::get_by_key(&self.db, DBInt(slot))? { - if found == *hash { - return Ok(true); - } - } - - Ok(false) - } - - pub fn is_empty(&self) -> bool { - HashBySlotKV::is_empty(&self.db) && BlockByHashKV::is_empty(&self.db) - } - - pub fn destroy(path: impl AsRef) -> Result<(), Error> { - DB::destroy(&Options::default(), path).map_err(|_| Error::IO) - } -} diff --git a/pallas-rolldb/src/chain/tests.rs b/pallas-rolldb/src/chain/tests.rs deleted file mode 100644 index 28d4cb8e..00000000 --- a/pallas-rolldb/src/chain/tests.rs +++ /dev/null @@ -1,104 +0,0 @@ -use super::{BlockBody, BlockHash, BlockSlot, Store}; - -fn with_tmp_db(op: fn(db: Store) -> T) { - let path = tempfile::tempdir().unwrap().into_path(); - let db = Store::open(path.clone()).unwrap(); - - op(db); - - Store::destroy(path).unwrap(); -} - -fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) { - let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice()); - (slot, hash, slot.to_be_bytes().to_vec()) -} - -#[test] -fn test_roll_forward_blackbox() { - with_tmp_db(|mut db| { - let (slot, hash, body) = dummy_block(11); - db.roll_forward(slot, hash, body.clone()).unwrap(); - - // ensure block body is persisted - let persisted = db.get_block(hash).unwrap().unwrap(); - assert_eq!(persisted, body); - - // ensure tip matches - let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap(); - assert_eq!(tip_slot, slot); - assert_eq!(tip_hash, hash); - - // ensure chain has item - let (chain_slot, chain_hash) = db.crawl().next().unwrap().unwrap(); - assert_eq!(chain_slot, slot); - assert_eq!(chain_hash, hash); - }); -} - -#[test] -fn test_roll_back_blackbox() { - with_tmp_db(|mut db| { - for i in 0..=5 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - db.roll_back(20).unwrap(); - - // ensure tip show rollback point - let (tip_slot, _) = db.find_tip().unwrap().unwrap(); - assert_eq!(tip_slot, 20); - - // ensure chain has items not rolled back - let mut chain = db.crawl(); - - for i in 0..=2 { - let (slot, _) = chain.next().unwrap().unwrap(); - assert_eq!(slot, i * 10); - } - - // ensure chain stops here - assert!(chain.next().is_none()); - }); -} - -//TODO: test rollback beyond K -//TODO: test rollback with unknown slot - -#[test] -fn test_chain_page() { - with_tmp_db(|mut db| { - for i in 0..100 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - let mut chain = db.read_chain_page(200, 15); - - for i in 0..15 { - let (slot, _) = chain.next().unwrap().unwrap(); - assert_eq!(200 + (i * 10), slot) - } - - assert!(chain.next().is_none()); - }); -} - -#[test] -fn test_intersect_options() { - with_tmp_db(|mut db| { - for i in 0..200 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - let intersect = db.intersect_options(10).unwrap(); - - let expected = vec![1990, 1970, 1930, 1850, 1690, 1370, 730]; - - for (out, exp) in intersect.iter().zip(expected) { - assert_eq!(out.0, exp); - } - }); -} diff --git a/pallas-rolldb/src/kvtable.rs b/pallas-rolldb/src/kvtable.rs deleted file mode 100644 index 608418bb..00000000 --- a/pallas-rolldb/src/kvtable.rs +++ /dev/null @@ -1,422 +0,0 @@ -use pallas_crypto::hash::Hash; -use serde::{de::DeserializeOwned, Serialize}; -use std::marker::PhantomData; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum Error { - #[error("IO error")] - IO, - - #[error("serde error")] - Serde, - - #[error("not found")] - NotFound, -} - -pub struct DBHash(pub Hash<32>); - -impl From> for DBHash { - fn from(value: Box<[u8]>) -> Self { - let inner: [u8; 32] = value[0..32].try_into().unwrap(); - let inner = Hash::<32>::from(inner); - Self(inner) - } -} - -impl From for Box<[u8]> { - fn from(value: DBHash) -> Self { - let b = value.0.to_vec(); - b.into() - } -} - -impl From> for DBHash { - fn from(value: Hash<32>) -> Self { - DBHash(value) - } -} - -impl From for Hash<32> { - fn from(value: DBHash) -> Self { - value.0 - } -} - -pub struct DBInt(pub u64); - -impl From for Box<[u8]> { - fn from(value: DBInt) -> Self { - let b = value.0.to_be_bytes(); - Box::new(b) - } -} - -impl From> for DBInt { - fn from(value: Box<[u8]>) -> Self { - let inner: [u8; 8] = value[0..8].try_into().unwrap(); - let inner = u64::from_be_bytes(inner); - Self(inner) - } -} - -impl From for DBInt { - fn from(value: u64) -> Self { - DBInt(value) - } -} - -impl From for u64 { - fn from(value: DBInt) -> Self { - value.0 - } -} - -pub struct DBBytes(pub Vec); - -impl From for Box<[u8]> { - fn from(value: DBBytes) -> Self { - value.0.into() - } -} - -impl From> for DBBytes { - fn from(value: Box<[u8]>) -> Self { - Self(value.into()) - } -} - -impl From> for DBBytes -where - V: Serialize, -{ - fn from(value: DBSerde) -> Self { - let inner = bincode::serialize(&value.0).unwrap(); - DBBytes(inner) - } -} - -#[derive(Debug)] -pub struct DBSerde(pub V); - -impl std::ops::Deref for DBSerde { - type Target = V; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl From> for Box<[u8]> -where - V: Serialize, -{ - fn from(v: DBSerde) -> Self { - bincode::serialize(&v.0) - .map(|x| x.into_boxed_slice()) - .unwrap() - } -} - -impl From> for DBSerde -where - V: DeserializeOwned, -{ - fn from(value: Box<[u8]>) -> Self { - let inner = bincode::deserialize(&value).unwrap(); - DBSerde(inner) - } -} - -impl From for DBSerde -where - V: DeserializeOwned, -{ - fn from(value: DBBytes) -> Self { - let inner = bincode::deserialize(&value.0).unwrap(); - DBSerde(inner) - } -} - -impl Clone for DBSerde -where - V: Clone, -{ - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -pub struct WithDBIntPrefix(pub u64, pub T); - -impl From> for Box<[u8]> -where - Box<[u8]>: From, -{ - fn from(value: WithDBIntPrefix) -> Self { - let prefix: Box<[u8]> = DBInt(value.0).into(); - let after: Box<[u8]> = value.1.into(); - - [prefix, after].concat().into() - } -} - -impl From> for WithDBIntPrefix { - fn from(_value: Box<[u8]>) -> Self { - todo!() - } -} - -type RocksIterator<'a> = rocksdb::DBIteratorWithThreadMode<'a, rocksdb::DB>; - -pub struct ValueIterator<'a, V>(RocksIterator<'a>, PhantomData); - -impl<'a, V> ValueIterator<'a, V> { - pub fn new(inner: RocksIterator<'a>) -> Self { - Self(inner, Default::default()) - } -} - -impl<'a, V> Iterator for ValueIterator<'a, V> -where - V: From>, -{ - type Item = Result; - - fn next(&mut self) -> Option> { - match self.0.next() { - Some(Ok((_, value))) => Some(Ok(V::from(value))), - Some(Err(err)) => { - tracing::error!(?err); - Some(Err(Error::IO)) - } - None => None, - } - } -} - -pub struct KeyIterator<'a, K>(RocksIterator<'a>, PhantomData); - -impl<'a, K> KeyIterator<'a, K> { - pub fn new(inner: RocksIterator<'a>) -> Self { - Self(inner, Default::default()) - } -} - -impl<'a, K> Iterator for KeyIterator<'a, K> -where - K: From>, -{ - type Item = Result; - - fn next(&mut self) -> Option> { - match self.0.next() { - Some(Ok((key, _))) => Some(Ok(K::from(key))), - Some(Err(err)) => { - tracing::error!(?err); - Some(Err(Error::IO)) - } - None => None, - } - } -} - -pub struct EntryIterator<'a, K, V>(RocksIterator<'a>, PhantomData<(K, V)>); - -impl<'a, K, V> EntryIterator<'a, K, V> { - pub fn new(inner: RocksIterator<'a>) -> Self { - Self(inner, Default::default()) - } -} - -impl<'a, K, V> Iterator for EntryIterator<'a, K, V> -where - K: From>, - V: From>, -{ - type Item = Result<(K, V), Error>; - - fn next(&mut self) -> Option> { - match self.0.next() { - Some(Ok((key, value))) => { - let key_out = K::from(key); - let value_out = V::from(value); - - Some(Ok((key_out, value_out))) - } - Some(Err(err)) => { - tracing::error!(?err); - Some(Err(Error::IO)) - } - None => None, - } - } -} - -pub trait KVTable -where - Box<[u8]>: From, - Box<[u8]>: From, - K: From>, - V: From>, -{ - const CF_NAME: &'static str; - - fn cf(db: &rocksdb::DB) -> rocksdb::ColumnFamilyRef { - db.cf_handle(Self::CF_NAME).unwrap() - } - - fn reset(db: &rocksdb::DB) -> Result<(), Error> { - db.drop_cf(Self::CF_NAME).map_err(|_| Error::IO)?; - - db.create_cf(Self::CF_NAME, &rocksdb::Options::default()) - .map_err(|_| Error::IO)?; - - Ok(()) - } - - fn get_by_key(db: &rocksdb::DB, k: K) -> Result, Error> { - let cf = Self::cf(db); - let raw_key = Box::<[u8]>::from(k); - let raw_value = db - .get_cf(&cf, raw_key) - .map_err(|_| Error::IO)? - .map(|x| Box::from(x.as_slice())); - - match raw_value { - Some(x) => { - let out = ::from(x); - Ok(Some(out)) - } - None => Ok(None), - } - } - - fn stage_upsert(db: &rocksdb::DB, k: K, v: V, batch: &mut rocksdb::WriteBatch) { - let cf = Self::cf(db); - - let k_raw = Box::<[u8]>::from(k); - let v_raw = Box::<[u8]>::from(v); - - batch.put_cf(&cf, k_raw, v_raw); - } - - fn is_empty(db: &rocksdb::DB) -> bool { - // HACK: can't find an easy way to size the num of keys, so we'll start an - // iterator and see if we have at least one value. If someone know a better way - // to accomplish this, please refactor. - let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::Start); - iter.next().is_none() - } - - fn iter_keys<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> KeyIterator<'a, K> { - let cf = Self::cf(db); - let inner = db.iterator_cf(&cf, mode); - KeyIterator::new(inner) - } - - #[allow(dead_code)] - fn iter_keys_start(db: &rocksdb::DB) -> KeyIterator<'_, K> { - Self::iter_keys(db, rocksdb::IteratorMode::Start) - } - - fn iter_keys_from(db: &rocksdb::DB, from: K) -> KeyIterator<'_, K> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); - - Self::iter_keys(db, mode) - } - - fn iter_values<'a>(db: &'a rocksdb::DB, mode: rocksdb::IteratorMode) -> ValueIterator<'a, V> { - let cf = Self::cf(db); - let inner = db.iterator_cf(&cf, mode); - ValueIterator::new(inner) - } - - #[allow(dead_code)] - fn iter_values_start(db: &rocksdb::DB) -> ValueIterator<'_, V> { - Self::iter_values(db, rocksdb::IteratorMode::Start) - } - - #[allow(dead_code)] - fn iter_values_from(db: &rocksdb::DB, from: K) -> ValueIterator<'_, V> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); - - Self::iter_values(db, mode) - } - - fn iter_entries<'a>( - db: &'a rocksdb::DB, - mode: rocksdb::IteratorMode, - ) -> EntryIterator<'a, K, V> { - let cf = Self::cf(db); - let inner = db.iterator_cf(&cf, mode); - EntryIterator::new(inner) - } - - #[allow(dead_code)] - fn iter_entries_start(db: &rocksdb::DB) -> EntryIterator<'_, K, V> { - Self::iter_entries(db, rocksdb::IteratorMode::Start) - } - - fn iter_entries_from(db: &rocksdb::DB, from: K) -> EntryIterator<'_, K, V> { - let from_raw = Box::<[u8]>::from(from); - let mode = rocksdb::IteratorMode::From(&from_raw, rocksdb::Direction::Forward); - - Self::iter_entries(db, mode) - } - - fn last_key(db: &rocksdb::DB) -> Result, Error> { - let mut iter = Self::iter_keys(db, rocksdb::IteratorMode::End); - - match iter.next() { - None => Ok(None), - Some(x) => Ok(Some(x?)), - } - } - - #[allow(dead_code)] - fn last_value(db: &rocksdb::DB) -> Result, Error> { - let mut iter = Self::iter_values(db, rocksdb::IteratorMode::End); - - match iter.next() { - None => Ok(None), - Some(x) => Ok(Some(x?)), - } - } - - #[allow(dead_code)] - fn last_entry(db: &rocksdb::DB) -> Result, Error> { - let mut iter = Self::iter_entries(db, rocksdb::IteratorMode::End); - - match iter.next() { - None => Ok(None), - Some(x) => Ok(Some(x?)), - } - } - - fn scan_until( - db: &rocksdb::DB, - mode: rocksdb::IteratorMode, - predicate: F, - ) -> Result, Error> - where - F: Fn(&V) -> bool, - { - for (k, v) in Self::iter_entries(db, mode).flatten() { - if predicate(&v) { - return Ok(Some(k)); - } - } - - Ok(None) - } - - fn stage_delete(db: &rocksdb::DB, key: K, batch: &mut rocksdb::WriteBatch) { - let cf = Self::cf(db); - let k_raw = Box::<[u8]>::from(key); - batch.delete_cf(&cf, k_raw); - } -} diff --git a/pallas-rolldb/src/lib.rs b/pallas-rolldb/src/lib.rs deleted file mode 100644 index 46593781..00000000 --- a/pallas-rolldb/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod chain; -mod kvtable; -pub mod wal; diff --git a/pallas-rolldb/src/wal/mod.rs b/pallas-rolldb/src/wal/mod.rs deleted file mode 100644 index 124cd2a7..00000000 --- a/pallas-rolldb/src/wal/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -use pallas_crypto::hash::Hash; - -mod store; -mod stream; - -#[cfg(test)] -mod tests; - -pub type Seq = u64; -pub type BlockSlot = u64; -pub type BlockHash = Hash<32>; -pub type BlockBody = Vec; - -pub use store::*; -pub use stream::*; diff --git a/pallas-rolldb/src/wal/store.rs b/pallas-rolldb/src/wal/store.rs deleted file mode 100644 index 715e738b..00000000 --- a/pallas-rolldb/src/wal/store.rs +++ /dev/null @@ -1,406 +0,0 @@ -use rocksdb::Options; -use rocksdb::{IteratorMode, WriteBatch, DB}; -use serde::{Deserialize, Serialize}; -use std::{path::Path, sync::Arc}; - -use crate::kvtable::*; - -use super::{BlockBody, BlockHash, BlockSlot, Seq}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum Log { - Apply(BlockSlot, BlockHash, BlockBody), - Undo(BlockSlot, BlockHash, BlockBody), - Mark(BlockSlot, BlockHash, BlockBody), - Origin, -} - -impl Log { - pub fn into_apply( - slot: impl Into, - hash: impl Into, - block: impl Into, - ) -> Self { - Self::Apply(slot.into(), hash.into(), block.into()) - } - - pub fn slot(&self) -> Option { - match self { - Log::Apply(s, _, _) => Some(*s), - Log::Undo(s, _, _) => Some(*s), - Log::Mark(s, _, _) => Some(*s), - Log::Origin => None, - } - } - - pub fn hash(&self) -> Option<&BlockHash> { - match self { - Log::Apply(_, h, _) => Some(h), - Log::Undo(_, h, _) => Some(h), - Log::Mark(_, h, _) => Some(h), - Log::Origin => None, - } - } - - pub fn body(&self) -> Option<&BlockBody> { - match self { - Log::Apply(_, _, b) => Some(b), - Log::Undo(_, _, b) => Some(b), - Log::Mark(_, _, b) => Some(b), - Log::Origin => None, - } - } - - pub fn into_undo(self) -> Option { - match self { - Self::Apply(s, h, b) => Some(Self::Undo(s, h, b)), - _ => None, - } - } - - pub fn into_mark(self) -> Option { - match self { - Log::Apply(s, h, b) => Some(Log::Mark(s, h, b)), - Log::Mark(s, h, b) => Some(Log::Mark(s, h, b)), - Log::Origin => Some(Log::Origin), - Log::Undo(..) => None, - } - } - - pub fn is_apply(&self) -> bool { - matches!(self, Log::Apply(..)) - } - - pub fn is_mark(&self) -> bool { - matches!(self, Log::Mark(..)) - } - - pub fn is_undo(&self) -> bool { - matches!(self, Log::Undo(..)) - } - - pub fn is_origin(&self) -> bool { - matches!(self, Log::Origin) - } - - /// Checks if entry is a forward event (apply or mark) - pub fn is_forward(&self) -> bool { - self.is_mark() || self.is_apply() - } - - /// Checks if entry is a forward event that matches the specified point - pub fn equals_point(&self, point: &(BlockSlot, BlockHash)) -> bool { - if !self.is_forward() { - return false; - } - - self.slot().is_some_and(|x| x == point.0) && self.hash().is_some_and(|x| x.eq(&point.1)) - } - - /// Checks if entry is a forward event that matches any of the specified - /// points - pub fn equals_any_point(&self, points: &[(BlockSlot, BlockHash)]) -> bool { - points.iter().any(|x| self.equals_point(x)) - } -} - -// slot => block hash -pub struct WalKV; - -impl KVTable> for WalKV { - const CF_NAME: &'static str = "WalKV"; -} - -pub struct WalIterator<'a>(pub EntryIterator<'a, DBInt, DBSerde>); - -impl Iterator for WalIterator<'_> { - type Item = Result<(u64, Log), Error>; - - fn next(&mut self) -> Option { - self.0.next().map(|v| v.map(|(seq, val)| (seq.0, val.0))) - } -} - -impl WalKV { - pub fn initialize(db: &DB) -> Result { - if Self::is_empty(db) { - Self::write_seed(db)?; - Ok(0) - } else { - let last = Self::last_key(db)?.map(|x| x.0); - Ok(last.unwrap()) - } - } - - fn write_seed(db: &DB) -> Result<(), Error> { - let mut batch = WriteBatch::default(); - let k = DBInt(0); - let v = DBSerde(Log::Origin); - Self::stage_upsert(db, k, v, &mut batch); - - db.write(batch).map_err(|_| Error::IO) - } -} - -pub struct RollBatch<'a>(&'a DB, WriteBatch, Seq); - -impl<'a> RollBatch<'a> { - fn new(db: &'a DB, last_seq: Seq) -> Self { - Self(db, Default::default(), last_seq) - } - - fn stage_append(&mut self, log: Log) { - let new_seq = self.2 + 1; - WalKV::stage_upsert(self.0, DBInt(new_seq), DBSerde(log), &mut self.1); - self.2 = new_seq; - } - - fn apply(self) -> Result { - self.0.write(self.1).map_err(|_| Error::IO)?; - Ok(self.2) - } -} - -#[derive(Clone)] -pub struct Store { - db: Arc, - pub tip_change: Arc, - wal_seq: u64, - k_param: u64, - immutable_overlap: u64, -} - -impl Store { - pub fn open( - path: impl AsRef, - k_param: u64, - immutable_overlap: Option, - ) -> Result { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - let db = DB::open_cf(&opts, path, [WalKV::CF_NAME]).map_err(|_| Error::IO)?; - - let wal_seq = WalKV::initialize(&db)?; - - let out = Self { - db: Arc::new(db), - tip_change: Arc::new(tokio::sync::Notify::new()), - wal_seq, - k_param, - immutable_overlap: immutable_overlap.unwrap_or(0), - }; - - Ok(out) - } - - pub fn roll_forward( - &mut self, - slot: BlockSlot, - hash: BlockHash, - body: BlockBody, - ) -> Result<(), Error> { - let mut batch = RollBatch::new(&self.db, self.wal_seq); - - batch.stage_append(Log::Apply(slot, hash, body)); - - self.wal_seq = batch.apply()?; - self.tip_change.notify_waiters(); - - Ok(()) - } - - pub fn roll_back(&mut self, until: BlockSlot) -> Result<(), Error> { - let mut batch = RollBatch::new(&self.db, self.wal_seq); - - let iter = WalKV::iter_values(&self.db, IteratorMode::End); - - for step in iter { - let value = step.map_err(|_| Error::IO)?.0; - - if value.slot().unwrap_or(0) <= until { - batch.stage_append(value.into_mark().unwrap()); - break; - } - - match value.into_undo() { - Some(undo) => { - batch.stage_append(undo); - } - None => continue, - }; - } - - self.wal_seq = batch.apply()?; - self.tip_change.notify_waiters(); - - Ok(()) - } - - pub fn roll_back_origin(&mut self) -> Result<(), Error> { - let mut batch = RollBatch::new(&self.db, self.wal_seq); - - let iter = WalKV::iter_values(&self.db, IteratorMode::End); - - for step in iter { - let value = step.map_err(|_| Error::IO)?.0; - - if value.is_origin() { - break; - } - - match value.into_undo() { - Some(undo) => { - batch.stage_append(undo); - } - None => continue, - }; - } - - self.wal_seq = batch.apply()?; - self.tip_change.notify_waiters(); - - Ok(()) - } - - pub fn find_tip(&self) -> Result, Error> { - let iter = WalKV::iter_values(&self.db, IteratorMode::End); - - for value in iter { - let value = value?; - - if value.is_apply() || value.is_mark() { - let slot = value.slot().unwrap(); - let hash = *value.hash().unwrap(); - return Ok(Some((slot, hash))); - } - } - - Ok(None) - } - - pub fn intersect_options( - &self, - max_items: usize, - ) -> Result, Error> { - let mut iter = WalKV::iter_values(&self.db, rocksdb::IteratorMode::End) - .filter_map(|res| res.ok()) - .filter(|v| !v.is_undo()); - - let mut out = Vec::with_capacity(max_items); - - // crawl the wal exponentially - while let Some(val) = iter.next() { - if !val.is_apply() && !val.is_mark() { - continue; - } - - out.push((val.slot().unwrap(), *val.hash().unwrap())); - - if out.len() >= max_items { - break; - } - - // skip exponentially - let skip = 2usize.pow(out.len() as u32) - 1; - for _ in 0..skip { - iter.next(); - } - } - - Ok(out) - } - - pub fn crawl_after(&self, seq: Option) -> WalIterator { - if let Some(seq) = seq { - let seq = Box::<[u8]>::from(DBInt(seq)); - let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward); - let mut iter = WalKV::iter_entries(&self.db, from); - - // skip current - iter.next(); - - WalIterator(iter) - } else { - let from = rocksdb::IteratorMode::Start; - let iter = WalKV::iter_entries(&self.db, from); - WalIterator(iter) - } - } - - pub fn find_wal_seq(&self, intersect: &[(BlockSlot, BlockHash)]) -> Result, Error> { - if intersect.is_empty() { - return Ok(None); - } - - let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { - v.equals_any_point(intersect) - })?; - - match found { - Some(DBInt(seq)) => Ok(Some(seq)), - None => Err(Error::NotFound), - } - } - - pub fn crawl_from_intersect( - &self, - options: &[(BlockSlot, BlockHash)], - ) -> Result { - let seq = self.find_wal_seq(options)?; - - // TODO: we need to create a RocksDB snapshot (with `db.snapshot()`) to use as - // the source for sequence scan and the iterator to ensure that sequence - // hasn't been pruned between operations. For the time being we consider - // this is a very narrow edge-case. - - if let Some(seq) = seq { - let seq = Box::<[u8]>::from(DBInt(seq)); - let from = rocksdb::IteratorMode::From(&seq, rocksdb::Direction::Forward); - let mut iter = WalKV::iter_entries(&self.db, from); - - // skip current - iter.next(); - - Ok(WalIterator(iter)) - } else { - let from = rocksdb::IteratorMode::Start; - let iter = WalKV::iter_entries(&self.db, from); - Ok(WalIterator(iter)) - } - } - - /// Prune the WAL of entries with slot values over `k_param` from the tip - pub fn prune_wal(&self) -> Result<(), Error> { - let tip = self.find_tip()?.map(|(slot, _)| slot).unwrap_or_default(); - - // iterate through all values in Wal from start - let mut iter = WalKV::iter_entries(&self.db, rocksdb::IteratorMode::Start); - - let mut batch = WriteBatch::default(); - - while let Some(Ok((wal_key, value))) = iter.next() { - // get the number of slots that have passed since the wal point - let slot_delta = tip - value.slot().unwrap_or(0); - - if slot_delta <= self.k_param + self.immutable_overlap { - break; - } else { - WalKV::stage_delete(&self.db, wal_key, &mut batch); - } - } - - self.db.write(batch).map_err(|_| Error::IO)?; - - Ok(()) - } - - pub fn is_empty(&self) -> bool { - WalKV::is_empty(&self.db) - } - - pub fn destroy(path: impl AsRef) -> Result<(), Error> { - DB::destroy(&Options::default(), path).map_err(|_| Error::IO) - } -} diff --git a/pallas-rolldb/src/wal/stream.rs b/pallas-rolldb/src/wal/stream.rs deleted file mode 100644 index 9b95a026..00000000 --- a/pallas-rolldb/src/wal/stream.rs +++ /dev/null @@ -1,83 +0,0 @@ -use futures_core::Stream; - -use super::{BlockHash, BlockSlot, Log, Store}; - -pub struct RollStream; - -impl RollStream { - pub fn intersect( - store: Store, - intersect: Vec<(BlockSlot, BlockHash)>, - ) -> impl Stream { - async_stream::stream! { - let mut last_seq = None; - - let iter = store.crawl_from_intersect(&intersect).unwrap(); - - for (seq, val) in iter.flatten() { - yield val; - last_seq = Some(seq); - } - - loop { - store.tip_change.notified().await; - let iter = store.crawl_after(last_seq); - - for (seq, val) in iter.flatten() { - yield val; - last_seq = Some(seq); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use futures_util::{pin_mut, StreamExt}; - - use crate::wal::{BlockBody, BlockHash, BlockSlot, Store}; - - fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) { - let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice()); - (slot, hash, slot.to_be_bytes().to_vec()) - } - - #[tokio::test] - async fn test_stream_waiting() { - let path = tempfile::tempdir().unwrap().into_path(); - let mut db = Store::open(path.clone(), 30, None).unwrap(); - - for i in 0..=100 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - let mut db2 = db.clone(); - let background = tokio::spawn(async move { - for i in 101..=200 { - let (slot, hash, body) = dummy_block(i * 10); - db2.roll_forward(slot, hash, body).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(5)).await; - } - }); - - let s = super::RollStream::intersect(db.clone(), vec![]); - - pin_mut!(s); - - let evt = s.next().await; - let evt = evt.unwrap(); - assert!(evt.is_origin()); - - for i in 0..=200 { - let evt = s.next().await; - let evt = evt.unwrap(); - assert!(evt.is_apply()); - assert_eq!(evt.slot().unwrap(), i * 10); - } - - background.abort(); - let _ = Store::destroy(path); //.unwrap(); - } -} diff --git a/pallas-rolldb/src/wal/tests.rs b/pallas-rolldb/src/wal/tests.rs deleted file mode 100644 index f10637c4..00000000 --- a/pallas-rolldb/src/wal/tests.rs +++ /dev/null @@ -1,213 +0,0 @@ -use super::{BlockBody, BlockHash, BlockSlot, Store}; - -fn with_tmp_db(k_param: u64, op: fn(store: Store) -> T) { - let path = tempfile::tempdir().unwrap().into_path(); - let store = Store::open(path.clone(), k_param, None).unwrap(); - - op(store); - - Store::destroy(path).unwrap(); -} - -fn with_tmp_db_overlap(k_param: u64, overlap: u64, op: fn(store: Store) -> T) { - let path = tempfile::tempdir().unwrap().into_path(); - let store = Store::open(path.clone(), k_param, Some(overlap)).unwrap(); - - op(store); - - Store::destroy(path).unwrap(); -} - -fn dummy_block(slot: u64) -> (BlockSlot, BlockHash, BlockBody) { - let hash = pallas_crypto::hash::Hasher::<256>::hash(slot.to_be_bytes().as_slice()); - (slot, hash, slot.to_be_bytes().to_vec()) -} - -#[test] -fn test_origin_event() { - with_tmp_db(30, |db| { - let mut iter = db.crawl_after(None); - - let origin = iter.next(); - assert!(origin.is_some()); - - let origin = origin.unwrap(); - assert!(origin.is_ok()); - - let (seq, value) = origin.unwrap(); - assert_eq!(seq, 0); - assert!(value.is_origin()); - }); -} - -#[test] -fn test_basic_append() { - with_tmp_db(30, |mut db| { - let (slot, hash, body) = dummy_block(11); - db.roll_forward(slot, hash, body.clone()).unwrap(); - - // ensure tip matches - let (tip_slot, tip_hash) = db.find_tip().unwrap().unwrap(); - assert_eq!(tip_slot, slot); - assert_eq!(tip_hash, hash); - - // ensure chain has item - let mut iter = db.crawl_after(None); - - // skip origin - iter.next(); - - let (seq, log) = iter.next().unwrap().unwrap(); - assert_eq!(seq, 1); - assert_eq!(log.slot().unwrap(), slot); - assert_eq!(log.hash().unwrap(), &hash); - assert_eq!(log.body().unwrap(), &body); - }); -} - -#[test] -fn test_rollback_undos() { - with_tmp_db(30, |mut db| { - for i in 0..=5 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - db.roll_back(20).unwrap(); - - // ensure tip show rollback point - let (tip_slot, _) = db.find_tip().unwrap().unwrap(); - assert_eq!(tip_slot, 20); - - // ensure chain has items not rolled back - let mut wal = db.crawl_after(None); - - let (seq, log) = wal.next().unwrap().unwrap(); - assert_eq!(seq, 0); - assert!(log.is_origin()); - - for i in 0..=5 { - let (_, log) = wal.next().unwrap().unwrap(); - assert!(log.is_apply()); - assert_eq!(log.slot().unwrap(), i * 10); - } - - for i in (3..=5).rev() { - let (_, log) = wal.next().unwrap().unwrap(); - assert!(log.is_undo()); - assert_eq!(log.slot().unwrap(), i * 10); - } - - let (_, log) = wal.next().unwrap().unwrap(); - assert!(log.is_mark()); - assert_eq!(log.slot().unwrap(), 20); - - // ensure chain stops here - assert!(wal.next().is_none()); - }); -} - -//TODO: test rollback beyond K -//TODO: test rollback with unknown slot - -#[test] -fn test_prune_linear() { - with_tmp_db(30, |mut db| { - for i in 0..100 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - // db contains slots: 0, 10, ..., 980, 990 - // this should prune slots less than (990 - 30) = 960 - db.prune_wal().unwrap(); - - let mut wal = db.crawl_after(None); - - for i in 96..100 { - let (_, val) = wal.next().unwrap().unwrap(); - assert_eq!(val.slot().unwrap(), i * 10); - } - - assert!(wal.next().is_none()); - }); -} - -#[test] -fn test_prune_linear_with_overlap() { - with_tmp_db_overlap(30, 20, |mut db| { - for i in 0..100 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - // db contains slots: 0, 10, ..., 980, 990 - // this should prune slots less than (990 - 30 - 20) = 940 - db.prune_wal().unwrap(); - - let mut wal = db.crawl_after(None); - - for i in 94..100 { - let (_, val) = wal.next().unwrap().unwrap(); - assert_eq!(val.slot().unwrap(), i * 10); - } - - assert!(wal.next().is_none()); - }); -} - -#[test] -fn test_prune_with_rollback() { - with_tmp_db(30, |mut db| { - for i in 0..100 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - db.roll_back(800).unwrap(); - - // tip is 800 (Mark) - - db.prune_wal().unwrap(); - - let mut wal = db.crawl_after(None); - - for i in 77..100 { - let (_, val) = wal.next().unwrap().unwrap(); - assert!(val.is_apply()); - assert_eq!(val.slot().unwrap(), i * 10); - } - - for i in (81..100).rev() { - let (_, val) = wal.next().unwrap().unwrap(); - assert!(val.is_undo()); - assert_eq!(val.slot().unwrap(), i * 10); - } - - let (_, val) = wal.next().unwrap().unwrap(); - assert!(val.is_mark()); - assert_eq!(val.slot().unwrap(), 800); - - assert!(wal.next().is_none()); - }); -} - -#[test] -fn test_intersect_options() { - with_tmp_db(1000, |mut db| { - for i in 0..200 { - let (slot, hash, body) = dummy_block(i * 10); - db.roll_forward(slot, hash, body).unwrap(); - } - - db.prune_wal().unwrap(); - - let intersect = db.intersect_options(10).unwrap(); - - let expected = vec![1990, 1970, 1930, 1850, 1690, 1370, 980]; - - for (out, exp) in intersect.iter().zip(expected) { - assert_eq!(out.0, exp); - } - }); -} diff --git a/pallas/Cargo.toml b/pallas/Cargo.toml index 20c27a15..15a00218 100644 --- a/pallas/Cargo.toml +++ b/pallas/Cargo.toml @@ -22,14 +22,12 @@ pallas-configs = { version = "=0.30.2", path = "../pallas-configs/" } pallas-txbuilder = { version = "=0.30.2", path = "../pallas-txbuilder/" } pallas-math = { version = "=0.30.2", path = "../pallas-math/", optional = true } pallas-applying = { version = "=0.30.2", path = "../pallas-applying/", optional = true } -pallas-rolldb = { version = "=0.30.2", path = "../pallas-rolldb/", optional = true } pallas-wallet = { version = "=0.30.2", path = "../pallas-wallet/", optional = true } pallas-hardano = { version = "=0.30.2", path = "../pallas-hardano/", optional = true } [features] -rolldb = ["pallas-rolldb"] hardano = ["pallas-hardano"] wallet = ["pallas-wallet"] applying = ["pallas-applying"] math = ["pallas-math"] -unstable = ["rolldb", "hardano", "wallet", "applying"] +unstable = ["hardano", "wallet", "applying"]