Skip to content

Commit

Permalink
feat: use InspectReader to calculate crc32 (#935)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 8, 2025
1 parent f4c991c commit 9823eac
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 44 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.2.3"
version = "0.2.4"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.3" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.3" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.3" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.3" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.3" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.3" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.3" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.4" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.4" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.4" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.4" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.4" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.4" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.4" }
thiserror = "1.0"
dragonfly-api = "=2.1.3"
reqwest = { version = "0.12.4", features = [
Expand Down
53 changes: 25 additions & 28 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::{
self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom,
};
use tokio_util::io::InspectReader;
use tracing::{error, info, instrument, warn};

/// DEFAULT_CONTENT_DIR is the default directory for store content.
Expand Down Expand Up @@ -337,24 +338,22 @@ impl Content {
})?;

// Copy the piece to the file while updating the CRC32 value.
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
let crc = Crc::<u32, Table<16>>::new(&CRC_32_ISCSI);
let mut digest = crc.digest();
let mut length = 0;
let mut buffer = vec![0; self.config.storage.write_buffer_size];
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);

loop {
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
let mut tee = InspectReader::new(reader, |bytes| {
digest.update(bytes);
});

digest.update(&buffer[..n]);
writer.write_all(&buffer[..n]).await?;
length += n as u64;
}
writer.flush().await?;
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;

writer.flush().await.inspect_err(|err| {
error!("flush {:?} failed: {}", task_path, err);
})?;

// Calculate the hash of the piece.
Ok(WritePieceResponse {
Expand Down Expand Up @@ -458,24 +457,22 @@ impl Content {
})?;

// Copy the content to the file while updating the CRC32 value.
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f);
let crc = Crc::<u32, Table<16>>::new(&CRC_32_ISCSI);
let mut digest = crc.digest();
let mut length = 0;
let mut buffer = vec![0; self.config.storage.write_buffer_size];
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f);
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f);

loop {
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
let mut tee = InspectReader::new(&mut reader, |bytes| {
digest.update(bytes);
});

digest.update(&buffer[..n]);
writer.write_all(&buffer[..n]).await?;
length += n as u64;
}
writer.flush().await?;
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f);
let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;

writer.flush().await.inspect_err(|err| {
error!("flush {:?} failed: {}", task_path, err);
})?;

Ok(WritePersistentCacheTaskResponse {
length,
Expand Down

0 comments on commit 9823eac

Please sign in to comment.