Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-extra-features): Eigen client memstore #321

Merged
merged 16 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/node/da_clients/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ pbjson-types.workspace = true
tokio-stream.workspace = true
rlp.workspace = true
kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" }
rand.workspace = true
sha3.workspace = true
82 changes: 55 additions & 27 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,27 @@ use zksync_da_client::{
DataAvailabilityClient,
};

use super::{blob_info::BlobInfo, sdk::RawEigenClient};
use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser};
use crate::utils::to_non_retriable_da_error;

#[derive(Debug, Clone)]
pub struct EigenClient {
client: Arc<RawEigenClient>,
client: Disperser,
juan518munoz marked this conversation as resolved.
Show resolved Hide resolved
}

impl EigenClient {
pub async fn new(config: EigenConfig, secrets: EigenSecrets) -> anyhow::Result<Self> {
let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str())
.map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?;

match config {
let disperser: Disperser = match config.clone() {
EigenConfig::Disperser(config) => {
// TODO: add complete config
let client = RawEigenClient::new(
config.disperser_rpc,
config.status_query_interval,
private_key,
config.authenticated,
)
.await?;
Ok(EigenClient {
client: Arc::new(client),
})
let client = RawEigenClient::new(private_key, config).await?;
Disperser::Remote(Arc::new(client))
}
EigenConfig::MemStore(_) => {
todo!()
}
}
EigenConfig::MemStore(config) => Disperser::Memory(MemStore::new(config)),
};
Ok(Self { client: disperser })
}
}

Expand All @@ -51,11 +41,17 @@ impl DataAvailabilityClient for EigenClient {
_: u32, // batch number
data: Vec<u8>,
) -> Result<DispatchResponse, DAError> {
let blob_id = self
.client
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?;
let blob_id = match &self.client {
Disperser::Remote(remote_disperser) => remote_disperser
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
Disperser::Memory(memstore) => memstore
.clone()
.put_blob(data)
.await
.map_err(to_non_retriable_da_error)?,
};

Ok(DispatchResponse::from(blob_id))
}
Expand Down Expand Up @@ -87,16 +83,15 @@ impl DataAvailabilityClient for EigenClient {
#[cfg(test)]
impl EigenClient {
pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
self.client.get_blob_data(blob_id).await
/*match &self.disperser {
match &self.client {
Disperser::Remote(remote_client) => remote_client.get_blob_data(blob_id).await,
Disperser::Memory(memstore) => memstore.clone().get_blob_data(blob_id).await,
}*/
}
}
}
#[cfg(test)]
mod tests {
use zksync_config::configs::da_client::eigen::DisperserConfig;
use zksync_config::configs::da_client::eigen::{DisperserConfig, MemStoreConfig};
use zksync_types::secrets::PrivateKey;

use super::*;
Expand Down Expand Up @@ -175,4 +170,37 @@ mod tests {
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

// End-to-end round trip against the in-memory disperser backend:
// dispatch a blob, then verify both the inclusion data and the stored payload.
#[tokio::test]
async fn test_eigenda_memory_disperser() {
    let config = EigenConfig::MemStore(MemStoreConfig {
        max_blob_size_bytes: 2 * 1024 * 1024, // 2MB,
        blob_expiration: 60 * 2,
        // Zero latencies keep the test fast; MemStore sleeps for these values.
        get_latency: 0,
        put_latency: 0,
    });
    let secrets = EigenSecrets {
        // Throwaway test key — never used for signing on the memory path.
        private_key: PrivateKey::from_str(
            "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6",
        )
        .unwrap(),
    };
    let client = EigenClient::new(config, secrets).await.unwrap();
    let data = vec![1u8; 100];
    let result = client.dispatch_blob(0, data.clone()).await.unwrap();

    // The blob id is the hex-encoded RLP `BlobInfo` certificate; decode it to
    // recover the mock inclusion proof MemStore embedded at put time.
    let blob_info: BlobInfo =
        rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
    let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
    let actual_inclusion_data = client
        .get_inclusion_data(&result.blob_id)
        .await
        .unwrap()
        .unwrap()
        .data;
    assert_eq!(expected_inclusion_data, actual_inclusion_data);

    // The original payload must come back unchanged.
    let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
    assert_eq!(retrieved_data.unwrap(), data);
}
}
221 changes: 221 additions & 0 deletions core/node/da_clients/src/eigen/memstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, Instant},
};

use anyhow::Error;
use rand::{rngs::OsRng, Rng, RngCore};
use sha3::{Digest, Keccak256};
use tokio::time::interval;
use zksync_config::configs::da_client::eigen::MemStoreConfig;
use zksync_da_client::types::{DAError, DispatchResponse, InclusionData};

use super::blob_info::{self, BlobInfo};

/// Errors produced by the in-memory blob store.
#[derive(Debug, PartialEq)]
pub enum MemStoreError {
    // Blob exceeds `max_blob_size_bytes`.
    // NOTE(review): variant name has a typo (should be `BlobTooLarge`); kept
    // as-is because renaming would break call sites outside this file.
    BlobToLarge,
    // A blob with the same derived storage key is already present.
    BlobAlreadyExists,
    // The blob id could not be hex- or RLP-decoded into a `BlobInfo`.
    IncorrectCommitment,
    // Requested blob is not in the store (test-only retrieval path).
    #[cfg(test)]
    BlobNotFound,
}

impl Into<Error> for MemStoreError {
fn into(self) -> Error {
match self {
MemStoreError::BlobToLarge => Error::msg("Blob too large"),
MemStoreError::BlobAlreadyExists => Error::msg("Blob already exists"),
MemStoreError::IncorrectCommitment => Error::msg("Incorrect commitment"),
#[cfg(test)]
MemStoreError::BlobNotFound => Error::msg("Blob not found"),
}
}
}

/// Mutable state guarded by the `MemStore` lock.
#[derive(Debug)]
struct MemStoreData {
    // Storage key (lossy UTF-8 of the mock inclusion proof) -> raw blob bytes.
    store: HashMap<String, Vec<u8>>,
    // Storage key -> insertion instant; consulted by the pruning loop to
    // expire blobs after `blob_expiration` seconds.
    key_starts: HashMap<String, Instant>,
}

/// This struct represents a memory store for blobs.
/// It should be used for testing purposes only.
#[derive(Clone, Debug)]
pub struct MemStore {
    // Size limit, expiration window and simulated latencies for the fake store.
    config: MemStoreConfig,
    // Shared mutable state; wrapped in `Arc` so clones of the store (it derives
    // `Clone`) and the background pruning task all see the same data.
    data: Arc<RwLock<MemStoreData>>,
}

impl MemStore {
/// Builds an empty store and spawns the background task that evicts
/// expired blobs. Returned as `Arc` because the pruning task keeps a
/// handle to the store for as long as it runs.
pub fn new(config: MemStoreConfig) -> Arc<Self> {
    let store = Arc::new(Self {
        config,
        data: Arc::new(RwLock::new(MemStoreData {
            store: HashMap::new(),
            key_starts: HashMap::new(),
        })),
    });
    tokio::spawn({
        let pruner = Arc::clone(&store);
        async move { pruner.pruning_loop().await }
    });
    store
}

/// Saves a blob to the memory store and returns its id: the hex-encoded,
/// RLP-serialized `BlobInfo` certificate. It hardcodes (or randomly mocks)
/// every certificate field except the data length, since we don't care about
/// real dispersal in a memory-based store.
///
/// Errors with `BlobToLarge` when the payload exceeds the configured maximum,
/// or `BlobAlreadyExists` when the derived storage key is already taken.
pub async fn put_blob(self: Arc<Self>, value: Vec<u8>) -> Result<String, MemStoreError> {
    // Simulate the configured dispersal latency.
    tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await;
    if value.len() as u64 > self.config.max_blob_size_bytes {
        // Fixed: the previous `.into()` here was a no-op conversion — the
        // error is already the function's error type.
        return Err(MemStoreError::BlobToLarge);
    }

    // Random entropy doubles as the mock inclusion proof and, hashed, as the
    // mock batch root.
    let mut entropy = [0u8; 10];
    OsRng.fill_bytes(&mut entropy);

    let mut hasher = Keccak256::new();
    hasher.update(&entropy);
    let mock_batch_root = hasher.finalize().to_vec();

    let block_num = OsRng.gen_range(0u32..1000);

    let blob_info = blob_info::BlobInfo {
        blob_header: blob_info::BlobHeader {
            commitment: blob_info::G1Commitment {
                // todo: generate real commitment
                x: vec![0u8; 32],
                y: vec![0u8; 32],
            },
            data_length: value.len() as u32,
            blob_quorum_params: vec![blob_info::BlobQuorumParam {
                quorum_number: 1,
                adversary_threshold_percentage: 29,
                confirmation_threshold_percentage: 30,
                chunk_length: 300,
            }],
        },
        blob_verification_proof: blob_info::BlobVerificationProof {
            batch_medatada: blob_info::BatchMetadata {
                batch_header: blob_info::BatchHeader {
                    batch_root: mock_batch_root.clone(),
                    quorum_numbers: vec![0x1, 0x0],
                    quorum_signed_percentages: vec![0x60, 0x90],
                    reference_block_number: block_num,
                },
                signatory_record_hash: mock_batch_root,
                fee: vec![],
                confirmation_block_number: block_num,
                batch_header_hash: vec![],
            },
            batch_id: 69,
            blob_index: 420,
            inclusion_proof: entropy.to_vec(),
            quorum_indexes: vec![0x1, 0x0],
        },
    };

    let cert_bytes = rlp::encode(&blob_info).to_vec();

    // Storage key derived from the inclusion proof.
    // NOTE(review): `from_utf8_lossy` maps every invalid UTF-8 byte to U+FFFD,
    // so distinct random proofs can collide on the same key and surface as
    // `BlobAlreadyExists`. Hex-encoding the proof would be collision-free, but
    // `get_blob_data` derives the key the same way — both must change together.
    let key = String::from_utf8_lossy(
        blob_info
            .blob_verification_proof
            .inclusion_proof
            .clone()
            .as_slice(),
    )
    .to_string();

    let mut data = self.data.write().unwrap();

    if data.store.contains_key(key.as_str()) {
        return Err(MemStoreError::BlobAlreadyExists);
    }

    data.key_starts.insert(key.clone(), Instant::now());
    data.store.insert(key, value);
    Ok(hex::encode(cert_bytes))
}

/// Returns the inclusion proof for a previously dispatched blob.
///
/// The store itself is never consulted: `blob_id` is the hex-encoded,
/// RLP-serialized `BlobInfo` certificate produced by `put_blob`, so the proof
/// is recovered by decoding the id. Fails with a non-retriable
/// `IncorrectCommitment` error when the id is not valid hex or valid RLP.
pub async fn get_inclusion_data(
    self: Arc<Self>,
    blob_id: &str,
    // Spelled as plain `Result` — the previous `anyhow::Result<_, DAError>`
    // resolved to the same type but misleadingly suggested an anyhow error.
) -> Result<Option<InclusionData>, DAError> {
    let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError {
        error: MemStoreError::IncorrectCommitment.into(),
        is_retriable: false,
    })?;
    let blob_info: BlobInfo = rlp::decode(&rlp_encoded_bytes).map_err(|_| DAError {
        error: MemStoreError::IncorrectCommitment.into(),
        is_retriable: false,
    })?;
    let inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
    Ok(Some(InclusionData {
        data: inclusion_data,
    }))
}

/// Test-only accessor: decodes `blob_id` back into a `BlobInfo`, re-derives
/// the storage key from its inclusion proof, and returns the stored payload.
///
/// Errors are non-retriable: `IncorrectCommitment` for an undecodable id,
/// `BlobNotFound` for a missing (possibly expired) blob or a poisoned lock.
// NOTE(review): a missing key yields `Err(BlobNotFound)` rather than
// `Ok(None)`, which makes the `Option` in the return type unreachable —
// confirm which contract the remote client follows before unifying.
#[cfg(test)]
pub async fn get_blob_data(
    self: Arc<Self>,
    blob_id: &str,
) -> anyhow::Result<Option<Vec<u8>>, DAError> {
    // Simulate the configured retrieval latency.
    tokio::time::sleep(Duration::from_millis(self.config.get_latency)).await;
    let request_id = hex::decode(blob_id).map_err(|_| DAError {
        error: MemStoreError::IncorrectCommitment.into(),
        is_retriable: false,
    })?;
    let blob_info: BlobInfo = rlp::decode(&request_id).map_err(|_| DAError {
        error: MemStoreError::IncorrectCommitment.into(),
        is_retriable: false,
    })?;
    // Must match the key derivation in `put_blob` exactly (lossy UTF-8 of the
    // inclusion proof).
    let key = String::from_utf8_lossy(
        blob_info
            .blob_verification_proof
            .inclusion_proof
            .clone()
            .as_slice(),
    )
    .to_string();

    let data = self.data.read().map_err(|_| DAError {
        error: MemStoreError::BlobNotFound.into(),
        is_retriable: false,
    })?;
    match data.store.get(&key) {
        Some(value) => Ok(Some(value.clone())),
        None => Err(DAError {
            error: MemStoreError::BlobNotFound.into(),
            is_retriable: false,
        }),
    }
}

/// Evicts every blob whose age exceeds the configured expiration window.
/// Takes the write lock once; no await points occur while it is held.
async fn prune_expired(self: Arc<Self>) {
    let ttl = Duration::from_secs(self.config.blob_expiration);
    let mut data = self.data.write().unwrap();
    // Collect keys first: we cannot mutate the maps while iterating them.
    let expired: Vec<String> = data
        .key_starts
        .iter()
        .filter(|(_, started)| started.elapsed() > ttl)
        .map(|(key, _)| key.clone())
        .collect();
    for key in expired {
        data.store.remove(&key);
        data.key_starts.remove(&key);
    }
}

/// Loop used to prune expired blobs
async fn pruning_loop(self: Arc<Self>) {
let mut interval = interval(Duration::from_secs(self.config.blob_expiration));

loop {
interval.tick().await;
let self_clone = Arc::clone(&self);
self_clone.prune_expired().await;
}
}
}
12 changes: 12 additions & 0 deletions core/node/da_clients/src/eigen/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
mod blob_info;
mod client;
mod memstore;
mod sdk;

use std::sync::Arc;

use memstore::MemStore;
use sdk::RawEigenClient;

pub use self::client::EigenClient;

#[allow(clippy::all)]
Expand All @@ -13,3 +19,9 @@ pub(crate) mod disperser {
pub(crate) mod common {
include!("generated/common.rs");
}

/// Backend used by `EigenClient`: either a real remote disperser or the
/// in-memory mock intended for tests.
#[derive(Clone, Debug)]
pub(crate) enum Disperser {
    // Talks to an actual EigenDA disperser service.
    Remote(Arc<RawEigenClient>),
    // Keeps blobs in process memory (`MemStore`); testing only.
    Memory(Arc<MemStore>),
}
Loading
Loading