diff --git a/crates/store/src/backend/composite/distributed_blob.rs b/crates/store/src/backend/composite/distributed_blob.rs
index 33cf8aa0f..c92ef030a 100644
--- a/crates/store/src/backend/composite/distributed_blob.rs
+++ b/crates/store/src/backend/composite/distributed_blob.rs
@@ -72,7 +72,7 @@ impl DistributedBlob {
             #[cfg(feature = "rocks")]
             Store::RocksDb(store) => store.get_blob(key, read_range).await,
             #[cfg(feature = "etcd")]
-            Store::Etcd(_) => unimplemented!(),
+            Store::Etcd(store) => store.get_blob(key, read_range).await,
             #[cfg(all(
                 feature = "enterprise",
                 any(feature = "postgres", feature = "mysql")
@@ -104,7 +104,7 @@ impl DistributedBlob {
             #[cfg(feature = "rocks")]
             Store::RocksDb(store) => store.put_blob(key, data).await,
             #[cfg(feature = "etcd")]
-            Store::Etcd(_) => unimplemented!(),
+            Store::Etcd(store) => store.put_blob(key, data).await,
             #[cfg(all(
                 feature = "enterprise",
                 any(feature = "postgres", feature = "mysql")
@@ -136,7 +136,7 @@ impl DistributedBlob {
             #[cfg(feature = "rocks")]
             Store::RocksDb(store) => store.delete_blob(key).await,
             #[cfg(feature = "etcd")]
-            Store::Etcd(_) => unimplemented!(),
+            Store::Etcd(store) => store.delete_blob(key).await,
             #[cfg(all(
                 feature = "enterprise",
                 any(feature = "postgres", feature = "mysql")
diff --git a/crates/store/src/backend/etcd/blob.rs b/crates/store/src/backend/etcd/blob.rs
new file mode 100644
index 000000000..5fc8dfa40
--- /dev/null
+++ b/crates/store/src/backend/etcd/blob.rs
@@ -0,0 +1,177 @@
+/*
+ * SPDX-FileCopyrightText: 2024 Stalwart Labs Ltd
+ *
+ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
+ */
+
+use std::ops::Range;
+
+use utils::BLOB_HASH_LEN;
+
+use etcd_client::{DeleteOptions, GetOptions, Txn, TxnOp};
+
+use super::{into_error, EtcdStore, MAX_VALUE_SIZE};
+
+use crate::{write::key::KeySerializer, SUBSPACE_BLOBS};
+
+// Blobs are stored as a sequence of chunks of at most MAX_VALUE_SIZE bytes,
+// keyed by SUBSPACE_BLOBS + blob key + u16 chunk index, since etcd limits
+// the size of individual requests (see mod.rs).
+impl EtcdStore {
+    pub(crate) async fn get_blob(
+        &self,
+        key: &[u8],
+        range: Range<usize>,
+    ) -> trc::Result<Option<Vec<u8>>> {
+        let block_start = range.start / MAX_VALUE_SIZE;
+        let bytes_start = range.start % MAX_VALUE_SIZE;
+        let block_end = (range.end / MAX_VALUE_SIZE) + 1;
+
+        let begin = KeySerializer::new(key.len() + 3)
+            .write(SUBSPACE_BLOBS)
+            .write(key)
+            .write(block_start as u16)
+            .finalize();
+        let end = KeySerializer::new(key.len() + 3)
+            .write(SUBSPACE_BLOBS)
+            .write(key)
+            .write(block_end as u16)
+            .finalize();
+        let key_len = begin.len();
+        let mut client = self.client.clone();
+
+        let mut values = match client
+            .get(begin, Some(GetOptions::new().with_range(end)))
+            .await
+        {
+            Ok(mut res) => res.take_kvs().into_iter(),
+            Err(err) => return Err(into_error(err)),
+        };
+
+        let mut blob_data: Option<Vec<u8>> = None;
+        let blob_range = range.end - range.start;
+
+        'outer: while let Some(value) = values.next() {
+            let key = value.key();
+            if key.len() == key_len {
+                let value = value.value();
+                if let Some(blob_data) = &mut blob_data {
+                    blob_data.extend_from_slice(
+                        value
+                            .get(
+                                ..std::cmp::min(
+                                    blob_range.saturating_sub(blob_data.len()),
+                                    value.len(),
+                                ),
+                            )
+                            .unwrap_or(&[]),
+                    );
+                    if blob_data.len() == blob_range {
+                        break 'outer;
+                    }
+                } else {
+                    let blob_size = if blob_range <= (5 * (1 << 20)) {
+                        blob_range
+                    } else if value.len() == MAX_VALUE_SIZE {
+                        MAX_VALUE_SIZE * 2
+                    } else {
+                        value.len()
+                    };
+                    let mut blob_data_ = Vec::with_capacity(blob_size);
+                    blob_data_.extend_from_slice(
+                        value
+                            .get(bytes_start..std::cmp::min(bytes_start + blob_range, value.len()))
+                            .unwrap_or(&[]),
+                    );
+                    if blob_data_.len() == blob_range {
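+                        // The requested range was fully contained in the first chunk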
+                        return Ok(Some(blob_data_));
+                    }
+                    blob_data = blob_data_.into();
+                }
+            }
+        }
+
+        Ok(blob_data)
+    }
+
+    pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> {
+        // Number of chunk writes batched into a single etcd transaction, kept
+        // below etcd's default limit on operations per transaction.
+        const N_CHUNKS: usize = (1 << 5) - 1;
+        let last_chunk = std::cmp::max(
+            (data.len() / MAX_VALUE_SIZE)
+                + if data.len() % MAX_VALUE_SIZE > 0 { 1 } else { 0 },
+            1,
+        ) - 1;
+        let mut client = self.client.clone();
+        let mut trx = Txn::new();
+        let mut trx_operations: Vec<TxnOp> = Vec::with_capacity(N_CHUNKS);
+
+        for (chunk_pos, chunk_bytes) in data.chunks(MAX_VALUE_SIZE).enumerate() {
+            let key = KeySerializer::new(key.len() + 3)
+                .write(SUBSPACE_BLOBS)
+                .write(key)
+                .write(chunk_pos as u16)
+                .finalize();
+
+            // Queue this chunk and flush the batch once it is full or the
+            // last chunk has been reached.
+            trx_operations.push(TxnOp::put(key, chunk_bytes, None));
+
+            if chunk_pos == last_chunk || trx_operations.len() == N_CHUNKS {
+                if let Err(err) = client
+                    .txn(trx.and_then(std::mem::take(&mut trx_operations)))
+                    .await
+                {
+                    return Err(into_error(err));
+                }
+
+                if chunk_pos < last_chunk {
+                    // Create a new transaction for the next batch of chunks
+                    trx = Txn::new();
+                } else {
+                    break;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
+        if key.len() < BLOB_HASH_LEN {
+            return Ok(false);
+        }
+
+        let mut client = self.client.clone();
+        let trx = Txn::new();
+        let start_key = KeySerializer::new(key.len() + 3)
+            .write(SUBSPACE_BLOBS)
+            .write(key)
+            .write(0u16)
+            .finalize();
+        let end_key = KeySerializer::new(key.len() + 3)
+            .write(SUBSPACE_BLOBS)
+            .write(key)
+            .write(u16::MAX)
+            .finalize();
+
+        // Delete all chunks for this blob key in a single range delete.
+        let trx_operations: Vec<TxnOp> = vec![TxnOp::delete(
+            start_key,
+            Some(DeleteOptions::new().with_range(end_key)),
+        )];
+
+        match client.txn(trx.and_then(trx_operations)).await {
+            Ok(data) => Ok(data.succeeded()),
+            Err(err) => Err(into_error(err)),
+        }
+    }
+}
diff --git a/crates/store/src/backend/etcd/mod.rs b/crates/store/src/backend/etcd/mod.rs
index f21e3c24a..ee0206190 100644
--- a/crates/store/src/backend/etcd/mod.rs
+++ b/crates/store/src/backend/etcd/mod.rs
@@ -8,6 +8,7 @@ use etcd_client::{KvClient, KvClientPrefix, Error as EtcdError};
 pub mod main;
 pub mod read;
 pub mod write;
+pub mod blob;
 
 // See: https://etcd.io/docs/v3.4/dev-guide/limit/
 // maximum size of any request is 1.5 MiB
diff --git a/crates/store/src/backend/etcd/write.rs b/crates/store/src/backend/etcd/write.rs
index eb4877060..a419bf425 100644
--- a/crates/store/src/backend/etcd/write.rs
+++ b/crates/store/src/backend/etcd/write.rs
@@ -256,8 +256,6 @@ impl EtcdStore {
                     (&result).into(),
                 );
 
-                ;
-
                 let matches = match client.get(key, None).await {
                     Ok(res) => match res.kvs().first() {
                         Some(value) => assert_value.matches(value.value()),
diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs
index d1d237f1c..6dd82ebc9 100644
--- a/crates/store/src/dispatch/blob.rs
+++ b/crates/store/src/dispatch/blob.rs
@@ -31,7 +31,7 @@ impl BlobStore {
                 #[cfg(feature = "rocks")]
                 Store::RocksDb(store) => store.get_blob(key, read_range).await,
                 #[cfg(feature = "etcd")]
-                Store::Etcd(_) => unimplemented!(),
+                Store::Etcd(store) => store.get_blob(key, read_range).await,
                 #[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
                 Store::SQLReadReplica(store) => store.get_blob(key, read_range).await,
                 Store::None => Err(trc::StoreEvent::NotConfigured.into()),
@@ -113,7 +113,7 @@ impl BlobStore {
                 #[cfg(feature = "rocks")]
                 Store::RocksDb(store) => store.put_blob(key, data.as_ref()).await,
                 #[cfg(feature = "etcd")]
-                Store::Etcd(_) => unimplemented!(),
+                Store::Etcd(store) => store.put_blob(key, data.as_ref()).await,
                 #[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
                 Store::SQLReadReplica(store) => store.put_blob(key, data.as_ref()).await,
                 Store::None => Err(trc::StoreEvent::NotConfigured.into()),
@@ -151,7 +151,7 @@ impl BlobStore {
                 #[cfg(feature = "rocks")]
                 Store::RocksDb(store) => store.delete_blob(key).await,
                 #[cfg(feature = "etcd")]
-                Store::Etcd(_) => unimplemented!(),
+                Store::Etcd(store) => store.delete_blob(key).await,
                 #[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
                 Store::SQLReadReplica(store) => store.delete_blob(key).await,
                 Store::None => Err(trc::StoreEvent::NotConfigured.into()),
diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs
index b24b7bb90..9dadf5c1e 100644
--- a/crates/store/src/dispatch/store.rs
+++ b/crates/store/src/dispatch/store.rs
@@ -524,7 +524,7 @@ impl Store {
             #[cfg(feature = "rocks")]
             Self::RocksDb(store) => store.get_blob(key, range).await,
             #[cfg(feature = "etcd")]
-            Self::Etcd(_) => unimplemented!(),
+            Self::Etcd(store) => store.get_blob(key, range).await,
             #[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
             Self::SQLReadReplica(store) => store.get_blob(key, range).await,
             Self::None => Err(trc::StoreEvent::NotConfigured.into()),
@@ -545,7 +545,7 @@ impl Store {
             #[cfg(feature = "rocks")]
             Self::RocksDb(store) => store.put_blob(key, data).await,
             #[cfg(feature = "etcd")]
-            Self::Etcd(_) => unimplemented!(),
+            Self::Etcd(store) => store.put_blob(key, data).await,
             #[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
             Self::SQLReadReplica(store) => store.put_blob(key, data).await,
             Self::None => Err(trc::StoreEvent::NotConfigured.into()),
@@ -566,7 +566,7 @@ impl Store {
             #[cfg(feature = "rocks")]
             Self::RocksDb(store) => store.delete_blob(key).await,
             #[cfg(feature = "etcd")]
-            Self::Etcd(_) => unimplemented!(),
+            Self::Etcd(store) => store.delete_blob(key).await,
             #[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
             Self::SQLReadReplica(store) => store.delete_blob(key).await,
             Self::None => Err(trc::StoreEvent::NotConfigured.into()),