Skip to content

Commit

Permalink
feat: bump api from v2.1.23 to v2.1.25 and optimize gc for persistent…
Browse files Browse the repository at this point in the history
… cache task (#978)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 12, 2025
1 parent ac0dbbd commit 3339d5c
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 40 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.2.10"
version = "0.2.11"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.10" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.10" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.10" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.10" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.10" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.10" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.10" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.11" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.11" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.11" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.11" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.11" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.11" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.11" }
thiserror = "1.0"
dragonfly-api = "=2.1.23"
dragonfly-api = "=2.1.25"
reqwest = { version = "0.12.4", features = [
"stream",
"native-tls",
Expand Down
138 changes: 123 additions & 15 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,10 @@ impl Content {
info!("remove {:?} failed: {}", to, err);
});

if let Err(err) = self.hard_link_task(task.id.as_str(), to).await {
if let Err(err) = self
.hard_link_persistent_cache_task(task.id.as_str(), to)
.await
{
warn!("hard link {:?} to {:?} failed: {}", task_path, to, err);

// If the persistent cache task is empty, no need to copy. Need to open the file to
Expand All @@ -427,7 +430,7 @@ impl Content {
return Ok(());
}

self.copy_task(task.id.as_str(), to)
self.copy_persistent_cache_task(task.id.as_str(), to)
.await
.inspect_err(|err| {
error!("copy {:?} to {:?} failed: {}", task_path, to, err);
Expand All @@ -441,38 +444,119 @@ impl Content {
Ok(())
}

/// copy_persistent_cache_task copies the persistent cache task content to the destination.
/// read_persistent_cache_piece reads the persistent cache piece from the content.
#[instrument(skip_all)]
pub async fn write_persistent_cache_task(
pub async fn read_persistent_cache_piece(
&self,
task_id: &str,
from: &Path,
) -> Result<WritePersistentCacheTaskResponse> {
// Open the file to copy the content.
let from_f = File::open(from).await?;
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.get_persistent_cache_task_path(task_id);

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = calculate_piece_range(offset, length, range);

let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

f_reader
.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;

Ok(f_reader.take(target_length))
}

/// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the
/// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache.
#[instrument(skip_all)]
pub async fn read_persistent_cache_piece_with_dual_read(
&self,
task_id: &str,
offset: u64,
length: u64,
range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
let task_path = self.get_persistent_cache_task_path(task_id);

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = calculate_piece_range(offset, length, range);

let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

f_range_reader
.seek(SeekFrom::Start(target_offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let range_reader = f_range_reader.take(target_length);

// Create full reader of the piece.
let f = File::open(task_path.as_path()).await.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

f_reader
.seek(SeekFrom::Start(offset))
.await
.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;
let reader = f_reader.take(length);

Ok((range_reader, reader))
}

/// write_persistent_cache_piece_with_crc32_castagnoli writes the persistent cache piece to the content with crc32 castagnoli.
/// Calculate the hash of the piece by crc32 castagnoli with hardware acceleration.
#[instrument(skip_all)]
pub async fn write_persistent_cache_piece_with_crc32_castagnoli<
R: AsyncRead + Unpin + ?Sized,
>(
&self,
task_id: &str,
offset: u64,
reader: &mut R,
) -> Result<WritePieceResponse> {
// Open the file and seek to the offset.
let task_path = self
.create_or_get_persistent_cache_task_path(task_id)
.await?;
let to_f = OpenOptions::new()
.create_new(true)
let mut f = OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(task_path.as_path())
.await
.inspect_err(|err| {
error!("open {:?} failed: {}", task_path, err);
})?;

// Copy the content to the file while updating the CRC32 value.
let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f);
f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
})?;

// Copy the piece to the file while updating the CRC32 value.
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
let crc = Crc::<u32, Table<16>>::new(&CRC_32_ISCSI);
let mut digest = crc.digest();

let mut tee = InspectReader::new(&mut reader, |bytes| {
let mut tee = InspectReader::new(reader, |bytes| {
digest.update(bytes);
});

let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f);
let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f);
let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| {
error!("copy {:?} failed: {}", task_path, err);
})?;
Expand All @@ -481,12 +565,36 @@ impl Content {
error!("flush {:?} failed: {}", task_path, err);
})?;

Ok(WritePersistentCacheTaskResponse {
// Calculate the hash of the piece.
Ok(WritePieceResponse {
length,
hash: digest.finalize().to_string(),
})
}

/// hard_link_persistent_cache_task hard links the persistent cache task content.
#[instrument(skip_all)]
async fn hard_link_persistent_cache_task(&self, task_id: &str, link: &Path) -> Result<()> {
fs::hard_link(self.get_persistent_cache_task_path(task_id), link).await?;
Ok(())
}

/// copy_persistent_cache_task copies the persistent cache task content to the destination.
#[instrument(skip_all)]
async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> {
// Ensure the parent directory of the destination exists.
if let Some(parent) = to.parent() {
if !parent.exists() {
fs::create_dir_all(parent).await.inspect_err(|err| {
error!("failed to create directory {:?}: {}", parent, err);
})?;
}
}

fs::copy(self.get_persistent_cache_task_path(task_id), to).await?;
Ok(())
}

/// delete_task deletes the persistent cache task content.
#[instrument(skip_all)]
pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> {
Expand Down
9 changes: 6 additions & 3 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use chrono::NaiveDateTime;
use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
Expand Down Expand Up @@ -214,13 +215,15 @@ impl Storage {
persistent: bool,
piece_length: u64,
content_length: u64,
created_at: NaiveDateTime,
) -> Result<metadata::PersistentCacheTask> {
self.metadata.download_persistent_cache_task_started(
id,
ttl,
persistent,
piece_length,
content_length,
created_at,
)
}

Expand Down Expand Up @@ -312,7 +315,7 @@ impl Storage {
) -> Result<metadata::Piece> {
let response = self
.content
.write_piece_with_crc32_castagnoli(task_id, offset, reader)
.write_persistent_cache_piece_with_crc32_castagnoli(task_id, offset, reader)
.await?;
let digest = Digest::new(Algorithm::Crc32, response.hash);

Expand Down Expand Up @@ -562,7 +565,7 @@ impl Storage {
) -> Result<metadata::Piece> {
let response = self
.content
.write_piece_with_crc32_castagnoli(task_id, offset, reader)
.write_persistent_cache_piece_with_crc32_castagnoli(task_id, offset, reader)
.await?;

let length = response.length;
Expand Down Expand Up @@ -613,7 +616,7 @@ impl Storage {
Ok(Some(piece)) => {
match self
.content
.read_piece(task_id, piece.offset, piece.length, range)
.read_persistent_cache_piece(task_id, piece.offset, piece.length, range)
.await
{
Ok(reader) => {
Expand Down
3 changes: 2 additions & 1 deletion dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
persistent: bool,
piece_length: u64,
content_length: u64,
created_at: NaiveDateTime,
) -> Result<PersistentCacheTask> {
let task = match self.db.get::<PersistentCacheTask>(id.as_bytes())? {
Some(mut task) => {
Expand All @@ -601,7 +602,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
piece_length,
content_length,
updated_at: Utc::now().naive_utc(),
created_at: Utc::now().naive_utc(),
created_at,
..Default::default()
},
};
Expand Down
9 changes: 9 additions & 0 deletions dragonfly-client/src/bin/dfcache/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ impl ImportCommand {

/// validate_args validates the command line arguments.
fn validate_args(&self) -> Result<()> {
if self.ttl < Duration::from_secs(5 * 60)
|| self.ttl > Duration::from_secs(7 * 24 * 60 * 60)
{
return Err(Error::ValidationError(format!(
"ttl must be between 5 minutes and 7 days, but got {}",
self.ttl.as_secs()
)));
}

if let Some(id) = self.id.as_ref() {
if id.len() != 64 {
return Err(Error::ValidationError(format!(
Expand Down
Loading

0 comments on commit 3339d5c

Please sign in to comment.