Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use InspectReader to calculate crc32 #935

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.2.3"
version = "0.2.4"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.3" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.3" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.3" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.3" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.3" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.3" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.3" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.4" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.4" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.4" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.4" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.4" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.4" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.4" }
thiserror = "1.0"
dragonfly-api = "=2.1.3"
reqwest = { version = "0.12.4", features = [
Expand Down
53 changes: 25 additions & 28 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::{
self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom,
};
use tokio_util::io::InspectReader;
use tracing::{error, info, instrument, warn};

/// DEFAULT_CONTENT_DIR is the default directory for store content.
Expand Down Expand Up @@ -337,24 +338,22 @@ impl Content {
})?;

// Copy the piece to the file while updating the CRC32 value.
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
let crc = Crc::<u32, Table<16>>::new(&CRC_32_ISCSI);
let mut digest = crc.digest();
let mut length = 0;
let mut buffer = vec![0; self.config.storage.write_buffer_size];
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);

loop {
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
let mut tee = InspectReader::new(reader, |bytes| {
digest.update(bytes);
});

digest.update(&buffer[..n]);
writer.write_all(&buffer[..n]).await?;
length += n as u64;
}
writer.flush().await?;
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;

writer.flush().await.inspect_err(|err| {
error!("flush {:?} failed: {}", task_path, err);
})?;

// Calculate the hash of the piece.
Ok(WritePieceResponse {
Expand Down Expand Up @@ -458,24 +457,22 @@ impl Content {
})?;

// Copy the content to the file while updating the CRC32 value.
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f);
let crc = Crc::<u32, Table<16>>::new(&CRC_32_ISCSI);
let mut digest = crc.digest();
let mut length = 0;
let mut buffer = vec![0; self.config.storage.write_buffer_size];
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f);
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f);

loop {
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
let mut tee = InspectReader::new(&mut reader, |bytes| {
digest.update(bytes);
});

digest.update(&buffer[..n]);
writer.write_all(&buffer[..n]).await?;
length += n as u64;
}
writer.flush().await?;
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f);
let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;

writer.flush().await.inspect_err(|err| {
error!("flush {:?} failed: {}", task_path, err);
})?;

Ok(WritePersistentCacheTaskResponse {
length,
Expand Down
Loading