Skip to content

Commit

Permalink
feat(etcd): implement blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
williamdes committed Oct 27, 2024
1 parent 9117070 commit 52158c5
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 11 deletions.
6 changes: 3 additions & 3 deletions crates/store/src/backend/composite/distributed_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl DistributedBlob {
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.get_blob(key, read_range).await,
#[cfg(feature = "etcd")]
Store::Etcd(_) => unimplemented!(),
Store::Etcd(_) => store.get_blob(key, read_range).await,
#[cfg(all(
feature = "enterprise",
any(feature = "postgres", feature = "mysql")
Expand Down Expand Up @@ -104,7 +104,7 @@ impl DistributedBlob {
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.put_blob(key, data).await,
#[cfg(feature = "etcd")]
Store::Etcd(_) => unimplemented!(),
Store::Etcd(_) => store.put_blob(key, data).await,
#[cfg(all(
feature = "enterprise",
any(feature = "postgres", feature = "mysql")
Expand Down Expand Up @@ -136,7 +136,7 @@ impl DistributedBlob {
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.delete_blob(key).await,
#[cfg(feature = "etcd")]
Store::Etcd(_) => unimplemented!(),
Store::Etcd(_) => store.delete_blob(key).await,
#[cfg(all(
feature = "enterprise",
any(feature = "postgres", feature = "mysql")
Expand Down
177 changes: 177 additions & 0 deletions crates/store/src/backend/etcd/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* SPDX-FileCopyrightText: 2024 Stalwart Labs Ltd <[email protected]>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use std::ops::Range;

use utils::BLOB_HASH_LEN;

use etcd_client::{DeleteOptions, GetOptions, Txn, TxnOp};

use super::{into_error, EtcdStore, MAX_VALUE_SIZE};

use crate::{
write::{
key::KeySerializer,
}, SUBSPACE_BLOBS
};

impl EtcdStore {
pub(crate) async fn get_blob(
&self,
key: &[u8],
range: Range<usize>,
) -> trc::Result<Option<Vec<u8>>> {
let block_start = range.start / MAX_VALUE_SIZE;
let bytes_start = range.start % MAX_VALUE_SIZE;
let block_end = (range.end / MAX_VALUE_SIZE) + 1;

let begin = KeySerializer::new(key.len() + 3)
.write(SUBSPACE_BLOBS)
.write(key)
.write(block_start as u16)
.finalize();
let end = KeySerializer::new(key.len() + 3)
.write(SUBSPACE_BLOBS)
.write(key)
.write(block_end as u16)
.finalize();
let key_len = begin.len();
let mut client = self.client.clone();
let mut values = match client.get(begin, Some(GetOptions::new().with_range(end))).await {
Ok(mut res) => res.take_kvs().into_iter(),
Err(err) => return Err(trc::StoreEvent::EtcdError
.ctx(
trc::Key::Reason,
err.to_string(),
))
};
let mut blob_data: Option<Vec<u8>> = None;
let blob_range = range.end - range.start;

'outer: while let Some(value) = values.next() {
let key = value.key();
if key.len() == key_len {
let value = value.value();
if let Some(blob_data) = &mut blob_data {
blob_data.extend_from_slice(
value
.get(
..std::cmp::min(
blob_range.saturating_sub(blob_data.len()),
value.len(),
),
)
.unwrap_or(&[]),
);
if blob_data.len() == blob_range {
break 'outer;
}
} else {
let blob_size = if blob_range <= (5 * (1 << 20)) {
blob_range
} else if value.len() == MAX_VALUE_SIZE {
MAX_VALUE_SIZE * 2
} else {
value.len()
};
let mut blob_data_ = Vec::with_capacity(blob_size);
blob_data_.extend_from_slice(
value
.get(bytes_start..std::cmp::min(bytes_start + blob_range, value.len()))
.unwrap_or(&[]),
);
if blob_data_.len() == blob_range {
return Ok(Some(blob_data_));
}
blob_data = blob_data_.into();
}
}
}

Ok(blob_data)
}

pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> {
const N_CHUNKS: usize = (1 << 5) - 1;
let last_chunk = std::cmp::max(
(data.len() / MAX_VALUE_SIZE)
+ if data.len() % MAX_VALUE_SIZE > 0 {
1
} else {
0
},
1,
) - 1;
let mut client = self.client.clone();
let mut trx = Txn::new();

for (chunk_pos, chunk_bytes) in data.chunks(MAX_VALUE_SIZE).enumerate() {

let key = KeySerializer::new(key.len() + 3)
.write(SUBSPACE_BLOBS)
.write(key)
.write(chunk_pos as u16)
.finalize();

let trx_operations: Vec<TxnOp> = vec![
TxnOp::put(key, chunk_bytes, None)
];

if chunk_pos == last_chunk || (chunk_pos > 0 && chunk_pos % N_CHUNKS == 0) {
let _ = match client
.txn(trx.and_then(trx_operations))
.await {
Ok(_) => {},
Err(err) => return Err(into_error(err))
};

if chunk_pos < last_chunk {
// Create a new transaction for the next chunk
trx = Txn::new();
} else {
break;
}
}
}






Ok(())
}

pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
if key.len() < BLOB_HASH_LEN {
return Ok(false);
}

let mut client = self.client.clone();
let trx = Txn::new();
let start_key = KeySerializer::new(key.len() + 3)
.write(SUBSPACE_BLOBS)
.write(key)
.write(0u16)
.finalize();
let end_key = KeySerializer::new(key.len() + 3)
.write(SUBSPACE_BLOBS)
.write(key)
.write(u16::MAX)
.finalize();

let trx_operations: Vec<TxnOp> = vec![
TxnOp::delete(start_key, Some(DeleteOptions::new().with_range(end_key)))
];

match client
.txn(trx.and_then(trx_operations))
.await {
Ok(data) => Ok(data.succeeded()),
Err(err) => Err(into_error(err))
}
}
}
1 change: 1 addition & 0 deletions crates/store/src/backend/etcd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use etcd_client::{KvClient, KvClientPrefix, Error as EtcdError};
pub mod main;
pub mod read;
pub mod write;
pub mod blob;

// See: https://etcd.io/docs/v3.4/dev-guide/limit/
// maximum size of any request is 1.5 MiB
Expand Down
2 changes: 0 additions & 2 deletions crates/store/src/backend/etcd/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,6 @@ impl EtcdStore {
(&result).into(),
);

;

let matches = match client.get(key, None).await {
Ok(res) => match res.kvs().first() {
Some(value) => assert_value.matches(value.value()),
Expand Down
6 changes: 3 additions & 3 deletions crates/store/src/dispatch/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl BlobStore {
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.get_blob(key, read_range).await,
#[cfg(feature = "etcd")]
Store::Etcd(_) => unimplemented!(),
Store::Etcd(_) => store.get_blob(key, read_range).await,
#[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
Store::SQLReadReplica(store) => store.get_blob(key, read_range).await,
Store::None => Err(trc::StoreEvent::NotConfigured.into()),
Expand Down Expand Up @@ -113,7 +113,7 @@ impl BlobStore {
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.put_blob(key, data.as_ref()).await,
#[cfg(feature = "etcd")]
Store::Etcd(_) => unimplemented!(),
Store::Etcd(_) => store.put_blob(key, data.as_ref()).await,
#[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
Store::SQLReadReplica(store) => store.put_blob(key, data.as_ref()).await,
Store::None => Err(trc::StoreEvent::NotConfigured.into()),
Expand Down Expand Up @@ -151,7 +151,7 @@ impl BlobStore {
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.delete_blob(key).await,
#[cfg(feature = "etcd")]
Store::Etcd(_) => unimplemented!(),
Store::Etcd(_) => store.delete_blob(key).await,
#[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
Store::SQLReadReplica(store) => store.delete_blob(key).await,
Store::None => Err(trc::StoreEvent::NotConfigured.into()),
Expand Down
6 changes: 3 additions & 3 deletions crates/store/src/dispatch/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ impl Store {
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.get_blob(key, range).await,
#[cfg(feature = "etcd")]
Self::Etcd(_) => unimplemented!(),
Self::Etcd(store) => store.get_blob(key, range).await,
#[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
Self::SQLReadReplica(store) => store.get_blob(key, range).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
Expand All @@ -545,7 +545,7 @@ impl Store {
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.put_blob(key, data).await,
#[cfg(feature = "etcd")]
Self::Etcd(_) => unimplemented!(),
Self::Etcd(store) => store.put_blob(key, data).await,
#[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
Self::SQLReadReplica(store) => store.put_blob(key, data).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
Expand All @@ -566,7 +566,7 @@ impl Store {
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.delete_blob(key).await,
#[cfg(feature = "etcd")]
Self::Etcd(_) => unimplemented!(),
Self::Etcd(store) => store.delete_blob(key).await,
#[cfg(all(feature = "enterprise", any(feature = "postgres", feature = "mysql")))]
Self::SQLReadReplica(store) => store.delete_blob(key).await,
Self::None => Err(trc::StoreEvent::NotConfigured.into()),
Expand Down

0 comments on commit 52158c5

Please sign in to comment.