From fe052bb1f877e7034527b7d325a1e74a27f29f92 Mon Sep 17 00:00:00 2001 From: petkodes Date: Tue, 4 Feb 2025 12:50:16 +0200 Subject: [PATCH] feat: unixfs dag --- Cargo.lock | 9 +- mater/cli/src/convert.rs | 56 ++-- mater/cli/src/main.rs | 43 ++- mater/lib/Cargo.toml | 1 + mater/lib/src/lib.rs | 9 +- mater/lib/src/stores/blockstore.rs | 8 +- mater/lib/src/stores/filestore.rs | 311 +++++++++++++++++----- mater/lib/src/stores/mod.rs | 110 ++++++-- mater/lib/src/unixfs/mod.rs | 348 ++++++++++++++----------- mater/lib/src/v2/reader.rs | 63 +++-- storage-provider/server/src/storage.rs | 58 +++-- 11 files changed, 693 insertions(+), 323 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84cff591a..650aac41d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4842,7 +4842,7 @@ checksum = "3a82608ee96ce76aeab659e9b8d3c2b787bffd223199af88c674923d861ada10" dependencies = [ "execute-command-macro", "execute-command-tokens", - "generic-array 1.1.1", + "generic-array 1.1.0", ] [[package]] @@ -6084,9 +6084,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "1.1.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb8bc4c28d15ade99c7e90b219f30da4be5c88e586277e8cbe886beeb868ab2" +checksum = "96512db27971c2c3eece70a1e106fbe6c87760234e31e8f7e5634912fe52794a" dependencies = [ "typenum", ] @@ -9100,6 +9100,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tracing", ] [[package]] @@ -12084,7 +12085,7 @@ dependencies = [ "frame-benchmarking 28.0.0", "frame-support 28.0.0", "frame-system 28.0.0", - "generic-array 1.1.1", + "generic-array 1.1.0", "hex", "log", "num-bigint", diff --git a/mater/cli/src/convert.rs b/mater/cli/src/convert.rs index 67c3a64fd..1eee859bb 100644 --- a/mater/cli/src/convert.rs +++ b/mater/cli/src/convert.rs @@ -9,16 +9,11 @@ use crate::error::Error; pub(crate) async fn convert_file_to_car( input_path: &PathBuf, output_path: &PathBuf, - overwrite: bool, + config: Config, ) -> Result { let source_file = File::open(input_path).await?; - let output_file = if overwrite { - File::create(output_path).await - } else { - File::create_new(output_path).await - }?; - let cid = create_filestore(source_file, output_file, Config::default()).await?; - + let output_file = File::create(output_path).await?; + let cid = create_filestore(source_file, output_file, config).await?; Ok(cid) } @@ -26,17 +21,16 @@ pub(crate) async fn convert_file_to_car( /// MaterError cases are not handled because these are tested in the mater library. 
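For illustration, a minimal sketch of how the reworked helper can be driven now that it takes a Config instead of the old overwrite flag; the wrapper function and file paths below are hypothetical, only Config and convert_file_to_car come from this patch:

use std::path::PathBuf;

use mater::Config;

use crate::{convert::convert_file_to_car, error::Error};

// Hypothetical caller inside the CLI crate: convert a file to CARv2 with the
// default (UnixFS-wrapped) settings and print the resulting root CID.
async fn convert_with_defaults() -> Result<(), Error> {
    let input = PathBuf::from("input.bin");
    let output = PathBuf::from("output.car");
    // Config::default() wraps content in UnixFS; Config::balanced_raw(..) skips the wrapping.
    let cid = convert_file_to_car(&input, &output, Config::default()).await?;
    println!("{cid}");
    Ok(())
}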
#[cfg(test)] mod tests { - use std::str::FromStr; - use anyhow::Result; use mater::Cid; + use mater::Config; + use mater::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; + use std::str::FromStr; use tempfile::tempdir; use tokio::{fs::File, io::AsyncWriteExt}; - use crate::{convert::convert_file_to_car, error::Error}; - #[tokio::test] - async fn convert_file_to_car_success() -> Result<()> { + async fn convert_file_to_car_raw_success() -> Result<()> { // Setup: Create a dummy input file let temp_dir = tempdir()?; let input_path = temp_dir.path().join("test_input.txt"); @@ -49,18 +43,14 @@ mod tests { // Define output path let output_path = temp_dir.path().join("test_output.car"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + // Configure in raw mode + let config = Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH); - // Assert the result is Ok + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config).await; assert!(result.is_ok()); - - // Verify that the CID is as expected assert_eq!(result?, expected_cid); - // Close temporary directory - temp_dir.close()?; - Ok(()) } @@ -69,19 +59,15 @@ mod tests { // Define non-existent input path let temp_dir = tempdir()?; let input_path = temp_dir.path().join("non_existent_input.txt"); - // Define output path let output_path = temp_dir.path().join("test_output.car"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + let config = Config::default(); - // Assert the result is an error + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config).await; assert!(result.is_err()); - assert!(matches!(result, Err(Error::IoError(..)))); - - // Close temporary directory - temp_dir.close()?; + assert!(matches!(result, Err(super::Error::IoError(..)))); Ok(()) } @@ -97,17 +83,13 @@ mod tests { // Create output file let output_path = temp_dir.path().join("output_file"); File::create_new(&output_path).await?; - println!("gets here"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + let config = Config::default(); - // Assert the result is an error + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config).await; assert!(result.is_err()); - assert!(matches!(result, Err(Error::IoError(..)))); - - // Close temporary directory - temp_dir.close()?; + assert!(matches!(result, Err(super::Error::IoError(..)))); Ok(()) } diff --git a/mater/cli/src/main.rs b/mater/cli/src/main.rs index ec00f6068..f507be6b1 100644 --- a/mater/cli/src/main.rs +++ b/mater/cli/src/main.rs @@ -1,9 +1,7 @@ -use std::path::PathBuf; - -use clap::Parser; - use crate::{convert::convert_file_to_car, error::Error, extract::extract_file_from_car}; - +use clap::Parser; +use mater::Config; +use std::path::PathBuf; mod convert; mod error; mod extract; @@ -19,21 +17,32 @@ enum MaterCli { input_path: PathBuf, /// Optional path to output CARv2 file. - /// If no output path is given it will store the `.car` file in the same location. + /// If no output path is given it will store the .car file in the same location. output_path: Option, /// If enabled, only the resulting CID will be printed. #[arg(short, long, action)] quiet: bool, - /// If enabled, the output will overwrite any existing files. + /// If enabled, content will be stored directly without UnixFS wrapping. 
+ /// By default, content is wrapped in UnixFS format for IPFS compatibility. #[arg(long, action)] - overwrite: bool, + raw: bool, + + /// Size of each chunk in bytes. Defaults to 256 KiB. + #[arg(long)] + chunk_size: Option, + + /// Maximum number of children per parent node. Defaults to 174. + #[arg(long)] + tree_width: Option, }, + /// Convert a CARv2 file to its original format Extract { /// Path to CARv2 file input_path: PathBuf, + /// Path to output file output_path: Option, }, @@ -46,14 +55,24 @@ async fn main() -> Result<(), Error> { input_path, output_path, quiet, - overwrite, + raw, + chunk_size, + tree_width, } => { let output_path = output_path.unwrap_or_else(|| { let mut new_path = input_path.clone(); new_path.set_extension("car"); new_path }); - let cid = convert_file_to_car(&input_path, &output_path, overwrite).await?; + + // Build config with UnixFS wrapping by default + let config = Config::balanced( + chunk_size.unwrap_or(256 * 1024), + tree_width.unwrap_or(174), + raw, + ); + + let cid = convert_file_to_car(&input_path, &output_path, config).await?; if quiet { println!("{}", cid); @@ -75,14 +94,12 @@ async fn main() -> Result<(), Error> { new_path }); extract_file_from_car(&input_path, &output_path).await?; - println!( - "Successfully converted CARv2 file {} and saved it to to {}", + "Successfully converted CARv2 file {} and saved it to {}", input_path.display(), output_path.display() ); } } - Ok(()) } diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml index 0191772cf..edf04e20a 100644 --- a/mater/lib/Cargo.toml +++ b/mater/lib/Cargo.toml @@ -29,6 +29,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] } tokio-stream.workspace = true tokio-util = { workspace = true, features = ["io"] } +tracing = { workspace = true } # Optional dependencies blockstore = { workspace = true, optional = true } diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs index 739d237c1..a807cdabc 100644 --- a/mater/lib/src/lib.rs +++ b/mater/lib/src/lib.rs @@ -21,7 +21,9 @@ mod v2; // We need to re-expose this because `read_block` returns `(Cid, Vec)`. pub use ipld_core::cid::Cid; pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE}; -pub use stores::{create_filestore, Blockstore, Config, FileBlockstore}; +pub use stores::{ + create_filestore, Blockstore, Config, FileBlockstore, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH, +}; pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; pub use v2::{ verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, @@ -111,6 +113,11 @@ pub enum Error { /// See [`DagPbError`](ipld_dagpb::Error) for more information. #[error(transparent)] DagPbError(#[from] ipld_dagpb::Error), + + /// Error returned when attempting to encode an incorrect node type. + /// For example, when attempting to encode a Leaf node as a Stem node. 
+ #[error("Invalid node type: {0}")] + InvalidNodeType(String), } #[cfg(test)] diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 3b6bb66f3..6f4c4e9da 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -12,7 +12,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; -use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH}; +use super::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; use crate::{ multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, @@ -76,7 +76,7 @@ impl Blockstore { root: None, blocks: IndexMap::new(), indexed: HashSet::new(), - chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE), + chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE), tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH), } } @@ -85,7 +85,7 @@ impl Blockstore { /// converting the contents into a CARv2 file. pub async fn read(&mut self, reader: R) -> Result<(), Error> where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + Send + 'static, { let chunks = ReaderStream::with_capacity(reader, self.chunk_size); @@ -206,7 +206,7 @@ impl Default for Blockstore { root: None, blocks: IndexMap::new(), indexed: HashSet::new(), - chunk_size: DEFAULT_BLOCK_SIZE, + chunk_size: DEFAULT_CHUNK_SIZE, tree_width: DEFAULT_TREE_WIDTH, } } diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs index b8e494a21..923761b31 100644 --- a/mater/lib/src/stores/filestore.rs +++ b/mater/lib/src/stores/filestore.rs @@ -1,15 +1,25 @@ -use bytes::BytesMut; -use futures::stream::StreamExt; -use ipld_core::cid::Cid; -use sha2::{Digest, Sha256}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite}; - use super::Config; use crate::{ - multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, - Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, + multicodec::SHA_256_CODE, + unixfs::{stream_balanced_tree, stream_balanced_tree_unixfs}, + CarV1Header, CarV2Header, CarV2Writer, Error, Index, IndexEntry, MultihashIndexSorted, + SingleWidthIndex, }; +use bytes::BytesMut; +use digest::Digest; +use futures::StreamExt; +use ipld_core::cid::Cid; +use sha2::Sha256; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tracing::trace; +/// Converts a source stream into a CARv2 file and writes it to an output stream. +/// +/// The expanded trait bounds are required because: +/// - `Send + 'static`: The async stream operations require the ability to move the source/output +/// between threads and ensure they live long enough for the entire async operation +/// - `AsyncSeek`: Required for the output to write the final header after processing all blocks +/// - `Unpin`: Required because we need to move the source/output around during async operations async fn balanced_import( mut source: Src, mut output: Out, @@ -27,39 +37,26 @@ where // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8 let chunker = async_stream::try_stream! { let mut buf = BytesMut::with_capacity(chunk_size); - loop { if buf.capacity() < chunk_size { - // BytesMut::reserve *may* allocate more memory than requested to avoid further - // allocations, while that's very helpful, it's also unpredictable. 
- // If and when necessary, we can replace this with the following line: - // std::mem::replace(buf, BytesMut::with_capacity(chunk_size)): - - // Reserve only the difference as the split may leave nothing, or something buf.reserve(chunk_size - buf.capacity()); } - // If the read length is 0, we *assume* we reached EOF - // tokio's docs state that this does not mean we exhausted the reader, - // as it may be able to return more bytes later, *however*, - // this means there is no right way of knowing when the reader is fully exhausted! - // If we need to support a case like that, we just need to track how many times - // the reader returned 0 and break at a certain point - if source.read_buf(&mut buf).await? == 0 { - // EOF but there's still content to yield -> yield it - if buf.len() > 0 { - let chunk = buf.split(); - yield chunk.freeze(); - } - break - } else if buf.len() >= chunk_size { - // The buffer may have a larger capacity than chunk_size due to reserve - // this also means that our read may have read more bytes than we expected, - // thats why we check if the length if bigger than the chunk_size and if so - // we split the buffer to the chunk_size, then freeze and return + let read_bytes = source.read_buf(&mut buf).await?; + trace!(bytes_read = read_bytes, buffer_size = buf.len(), "Buffer read status"); + + while buf.len() >= chunk_size { let chunk = buf.split_to(chunk_size); yield chunk.freeze(); - } // otherwise, the buffer is not full, so we don't do a thing + } + + if read_bytes == 0 && !buf.is_empty() { + let chunk = buf.split(); + yield chunk.freeze(); + break; + } else if read_bytes == 0 { + break; + } } }; @@ -69,19 +66,19 @@ where let mut writer = CarV2Writer::new(&mut output); let mut position = 0; - let placeholder_header = CarV2Header::default(); - position += writer.write_header(&placeholder_header).await?; + position += writer.write_header(&CarV2Header::default()).await?; let car_v1_start = position; - - let placeholder_header_v1 = CarV1Header::default(); - position += writer.write_v1_header(&placeholder_header_v1).await?; + position += writer.write_v1_header(&CarV1Header::default()).await?; let mut root = None; let mut entries = vec![]; + while let Some(node) = nodes.next().await { let (node_cid, node_bytes) = node?; - let digest = node_cid.hash().digest().to_owned(); - let entry = IndexEntry::new(digest, (position - car_v1_start) as u64); + let entry = IndexEntry::new( + node_cid.hash().digest().to_vec(), + (position - car_v1_start) as u64, + ); entries.push(entry); position += writer.write_block(&node_cid, &node_bytes).await?; @@ -90,6 +87,88 @@ where } } + let index_offset = position; + let single_width_index = + SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries); + let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( + SHA_256_CODE, + single_width_index.into(), + )); + writer.write_index(&index).await?; + + writer.get_inner_mut().rewind().await?; + let header = CarV2Header::new( + false, + car_v1_start.try_into().unwrap(), + (index_offset - car_v1_start).try_into().unwrap(), + index_offset.try_into().unwrap(), + ); + writer.write_header(&header).await?; + + let header_v1 = CarV1Header::new(vec![root.unwrap()]); + writer.write_v1_header(&header_v1).await?; + + writer.finish().await?; + + Ok(root.unwrap()) +} + +async fn balanced_import_unixfs( + mut source: Src, + mut output: Out, + chunk_size: usize, + tree_width: usize, +) -> Result +where + Src: AsyncRead + Unpin + Send + 'static, + Out: AsyncWrite + 
AsyncSeek + Unpin + Send + 'static, +{ + let chunker = async_stream::try_stream! { + let mut buf = BytesMut::with_capacity(chunk_size); + loop { + let read_bytes = source.read_buf(&mut buf).await?; + while buf.len() >= chunk_size { + let chunk = buf.split_to(chunk_size); + yield chunk.freeze(); + } + + if read_bytes == 0 && !buf.is_empty() { + let chunk = buf.split(); + yield chunk.freeze(); + break; + } else if read_bytes == 0 { + break; + } + } + }; + + let nodes = stream_balanced_tree_unixfs(chunker, tree_width).peekable(); + tokio::pin!(nodes); + + let mut writer = CarV2Writer::new(&mut output); + let mut position = 0; + + position += writer.write_header(&CarV2Header::default()).await?; + let car_v1_start = position; + position += writer.write_v1_header(&CarV1Header::default()).await?; + + let mut root = None; + let mut entries = vec![]; + + while let Some(node) = nodes.next().await { + let (node_cid, node_bytes) = node?; + let entry = IndexEntry::new( + node_cid.hash().digest().to_vec(), + (position - car_v1_start) as u64, + ); + entries.push(entry); + position += writer.write_block(&node_cid, &node_bytes).await?; + + if root.is_none() { + root = Some(node_cid); + } + } + let Some(root) = root else { return Err(Error::EmptyRootsError); }; @@ -103,23 +182,19 @@ where )); writer.write_index(&index).await?; - // Go back to the beginning of the file writer.get_inner_mut().rewind().await?; let header = CarV2Header::new( false, - (car_v1_start) as u64, - (index_offset - car_v1_start) as u64, - (index_offset) as u64, + car_v1_start.try_into().unwrap(), + (index_offset - car_v1_start).try_into().unwrap(), + index_offset.try_into().unwrap(), ); writer.write_header(&header).await?; - // If the length of the roots doesn't match the previous one, you WILL OVERWRITE parts of the file let header_v1 = CarV1Header::new(vec![root]); writer.write_v1_header(&header_v1).await?; - // Flush even if the caller doesn't - we did our best writer.finish().await?; - Ok(root) } @@ -130,29 +205,36 @@ pub async fn create_filestore( config: Config, ) -> Result where - Src: AsyncRead + Unpin, - Out: AsyncWrite + AsyncSeek + Unpin, + Src: AsyncRead + Unpin + Send + 'static, + Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static, { match config { Config::Balanced { chunk_size, tree_width, - } => balanced_import(source, output, chunk_size, tree_width).await, + raw_mode, + } => { + if raw_mode { + balanced_import(source, output, chunk_size, tree_width).await + } else { + balanced_import_unixfs(source, output, chunk_size, tree_width).await + } + } } } #[cfg(test)] mod test { - use std::path::Path; - + use super::*; + use crate::unixfs::Data; + use crate::{test_utils::assert_buffer_eq, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; + use ipld_core::codec::Codec; + use ipld_dagpb::{DagPbCodec, PbNode}; + use quick_protobuf::MessageRead; + use std::{collections::HashSet, path::Path}; use tempfile::tempdir; use tokio::fs::File; - use crate::{ - stores::{filestore::create_filestore, Config}, - test_utils::assert_buffer_eq, - }; - async fn test_filestore_roundtrip(original: P1, expected: P2) where P1: AsRef, @@ -163,7 +245,8 @@ mod test { let source_file = File::open(original).await.unwrap(); let output_file = File::create(&temp_path).await.unwrap(); - create_filestore(source_file, output_file, Config::default()) + let config = Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH); + create_filestore(source_file, output_file, config) .await .unwrap(); @@ -174,7 +257,7 @@ mod test { } #[tokio::test] - async fn 
test_filestore_lorem() { + async fn test_lorem_roundtrip() { test_filestore_roundtrip( "tests/fixtures/original/lorem.txt", "tests/fixtures/car_v2/lorem.car", @@ -183,11 +266,117 @@ mod test { } #[tokio::test] - async fn test_filestore_spaceglenda() { + async fn test_spaceglenda_roundtrip() { test_filestore_roundtrip( "tests/fixtures/original/spaceglenda.jpg", "tests/fixtures/car_v2/spaceglenda.car", ) .await } + + #[tokio::test] + async fn test_filestore_unixfs_dag_structure() { + use rand::{thread_rng, Rng}; + + let temp_dir = tempdir().unwrap(); + let input_path = temp_dir.path().join("input.bin"); + let temp_path = temp_dir.path().join("temp.car"); + + // Create test file with random data to ensure unique chunks + let mut rng = thread_rng(); + let test_data = (0..512 * 1024).map(|_| rng.gen::()).collect::>(); + + trace!("Creating test file of size: {} bytes", test_data.len()); + tokio::fs::write(&input_path, &test_data).await.unwrap(); + + let source_file = File::open(&input_path).await.unwrap(); + let output_file = File::create(&temp_path).await.unwrap(); + + let config = Config::balanced_unixfs(64 * 1024, 2); + + let root_cid = create_filestore(source_file, output_file, config) + .await + .unwrap(); + trace!("Root CID: {}", root_cid); + + // Read back and verify structure + let file = File::open(&temp_path).await.unwrap(); + let mut reader = crate::CarV2Reader::new(file); + + reader.read_pragma().await.unwrap(); + reader.read_header().await.unwrap(); + reader.read_v1_header().await.unwrap(); + + // Track all unique blocks and statistics + let mut unique_blocks = HashSet::new(); + let mut leaf_blocks = HashSet::new(); + let mut parent_blocks = HashSet::new(); + let mut level_sizes = Vec::new(); + let mut current_level_nodes = HashSet::new(); + let mut current_level = 0; + + while let Ok((cid, data)) = reader.read_block().await { + unique_blocks.insert(cid); + + let pb_node: PbNode = DagPbCodec::decode(&data[..]).unwrap(); + let reader = + &mut quick_protobuf::BytesReader::from_bytes(&pb_node.data.clone().unwrap()); + let bytes = &pb_node.data.unwrap(); + let unixfs_data = Data::from_reader(reader, bytes).unwrap(); + + if pb_node.links.is_empty() { + leaf_blocks.insert(cid); + trace!("Found leaf node: {} (size: {})", cid, data.len()); + trace!( + " Data size: {}", + unixfs_data.Data.as_ref().map_or(0, |d| d.len()) + ); + trace!(" Blocksizes: {:?}", unixfs_data.blocksizes); + + // New level if this is first leaf + if current_level_nodes.is_empty() { + level_sizes.push(0); + current_level = level_sizes.len() - 1; + } + } else { + parent_blocks.insert(cid); + + trace!( + "Found parent node: {} with {} links (size: {})", + cid, + pb_node.links.len(), + data.len() + ); + trace!(" Total filesize: {:?}", unixfs_data.filesize); + trace!(" Blocksizes: {:?}", unixfs_data.blocksizes); + + for link in &pb_node.links { + trace!(" -> Link to: {} (size: {:?})", link.cid, link.size); + } + + // Track level changes + if !current_level_nodes.is_empty() + && current_level_nodes + .iter() + .any(|n| pb_node.links.iter().any(|l| l.cid == *n)) + { + level_sizes.push(0); + current_level = level_sizes.len() - 1; + current_level_nodes.clear(); + } + } + + level_sizes[current_level] += 1; + current_level_nodes.insert(cid); + } + + // Verify structure + assert!(!leaf_blocks.is_empty(), "No leaf nodes found"); + assert!(!parent_blocks.is_empty(), "No parent nodes found"); + assert_eq!( + unique_blocks.len(), + leaf_blocks.len() + parent_blocks.len(), + "Block count mismatch" + ); + } } diff --git 
a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs index 66920d97d..a86cdd452 100644 --- a/mater/lib/src/stores/mod.rs +++ b/mater/lib/src/stores/mod.rs @@ -6,42 +6,122 @@ pub use blockstore::Blockstore; pub use file::FileBlockstore; pub use filestore::create_filestore; -/// The default block size, as defined in -/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13). -pub(crate) const DEFAULT_BLOCK_SIZE: usize = 1024 * 256; +/// The default chunk size for balanced trees (256 KiB) +/// Reference: https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13 +pub const DEFAULT_CHUNK_SIZE: usize = 256 * 1024; -/// The default tree width, also called links per block, as defined in -/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30). -pub(crate) const DEFAULT_TREE_WIDTH: usize = 174; +/// The default number of children per parent node in balanced trees. +/// This value comes from the go-ipfs implementation and provides a good balance +/// between tree depth and width for most use cases. +pub const DEFAULT_TREE_WIDTH: usize = 174; -/// Store configuration options. +/// Store configuration options for controlling how data is stored and structured. +#[derive(Debug, Clone)] pub enum Config { - /// The store should use the balanced tree layout, - /// generating byte chunks of `chunk_size` and - /// generating parent nodes every `tree_width` nodes. + /// Creates a balanced tree structure by generating fixed-size chunks + /// and arranging them into a tree with a specified width. + /// By default, content is wrapped in UnixFS format for IPFS compatibility. Balanced { - /// The size of the byte chunks. + /// Size of each chunk in bytes. Defaults to 256 KiB. + /// Larger chunks reduce tree depth but increase minimum storage unit size. chunk_size: usize, - /// The number of children per parent node. + + /// Maximum number of children per parent node. Defaults to 174. + /// This affects tree shape and traversal performance. tree_width: usize, + + /// If true, store content directly without UnixFS metadata. + /// More space efficient but loses IPFS compatibility features. + /// Default is false (uses UnixFS wrapping). + raw_mode: bool, }, } impl Config { - /// Create a new [`Config::Balanced`]. - pub fn balanced(chunk_size: usize, tree_width: usize) -> Self { + /// Creates a new balanced tree configuration with the specified parameters. + /// + /// # Arguments + /// * `chunk_size` - Size of each data chunk in bytes + /// * `tree_width` - Maximum number of children per parent node + /// * `raw_mode` - Whether to store content directly without UnixFS wrapping + pub fn balanced(chunk_size: usize, tree_width: usize, raw_mode: bool) -> Self { Self::Balanced { chunk_size, tree_width, + raw_mode, } } + + /// Creates a new balanced tree configuration with UnixFS wrapping (recommended). + pub fn balanced_unixfs(chunk_size: usize, tree_width: usize) -> Self { + Self::balanced(chunk_size, tree_width, false) + } + + /// Creates a new balanced tree configuration with raw storage. 
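To make the configuration surface concrete, a short sketch of the three Config constructors and how a caller feeds one into create_filestore; the file names are placeholders and the helper function is hypothetical:

use mater::{create_filestore, Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};
use tokio::fs::File;

// Hypothetical helper: pick a layout and run the import.
async fn import_with_config() -> Result<(), Box<dyn std::error::Error>> {
    // UnixFS-wrapped balanced tree (the default behaviour).
    let unixfs = Config::balanced_unixfs(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH);
    // Raw leaves without UnixFS metadata, routed through the raw balanced import.
    let raw = Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH);
    // Fully explicit form; the final bool is raw_mode.
    let explicit = Config::balanced(128 * 1024, 174, false);

    let source = File::open("input.bin").await?;
    let output = File::create("output.car").await?;
    let _root = create_filestore(source, output, unixfs).await?;
    let _ = (raw, explicit); // constructed only to show the builders
    Ok(())
}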
+ pub fn balanced_raw(chunk_size: usize, tree_width: usize) -> Self { + Self::balanced(chunk_size, tree_width, true) + } } impl Default for Config { fn default() -> Self { Self::Balanced { - chunk_size: DEFAULT_BLOCK_SIZE, + chunk_size: DEFAULT_CHUNK_SIZE, tree_width: DEFAULT_TREE_WIDTH, + raw_mode: false, // Default to UnixFS wrapping for IPFS compatibility + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = Config::default(); + match config { + Config::Balanced { + chunk_size, + tree_width, + raw_mode, + } => { + assert_eq!(chunk_size, DEFAULT_CHUNK_SIZE); + assert_eq!(tree_width, DEFAULT_TREE_WIDTH); + assert!(!raw_mode); + } + } + } + + #[test] + fn test_config_builders() { + let chunk_size = 1024; + let tree_width = 10; + + let unixfs = Config::balanced_unixfs(chunk_size, tree_width); + match unixfs { + Config::Balanced { + chunk_size: cs, + tree_width: tw, + raw_mode, + } => { + assert_eq!(cs, chunk_size); + assert_eq!(tw, tree_width); + assert!(!raw_mode); + } + } + + let raw = Config::balanced_raw(chunk_size, tree_width); + match raw { + Config::Balanced { + chunk_size: cs, + tree_width: tw, + raw_mode, + } => { + assert_eq!(cs, chunk_size); + assert_eq!(tw, tree_width); + assert!(raw_mode); + } } } } diff --git a/mater/lib/src/unixfs/mod.rs b/mater/lib/src/unixfs/mod.rs index 1a84cfa66..737847266 100644 --- a/mater/lib/src/unixfs/mod.rs +++ b/mater/lib/src/unixfs/mod.rs @@ -1,26 +1,26 @@ -//! The original implementation of this module is located at +//! UnixFS implementation based on //! . mod unixfs_pb; +pub use unixfs_pb::{mod_Data, Data}; use std::collections::VecDeque; +use crate::{ + multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, + Error, +}; use async_stream::try_stream; use bytes::Bytes; use futures::TryStreamExt; +use futures::{Stream, StreamExt}; use ipld_core::{cid::Cid, codec::Codec}; use ipld_dagpb::{DagPbCodec, PbLink, PbNode}; use quick_protobuf::MessageWrite; use sha2::Sha256; -use tokio_stream::{Stream, StreamExt}; - -use crate::{ - multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, - Error, -}; #[derive(Debug, Clone, Copy)] -pub(crate) struct LinkInfo { +struct LinkInfo { raw_data_length: u64, encoded_data_length: u64, } @@ -41,7 +41,105 @@ enum TreeNode { } impl TreeNode { - fn encode(self) -> Result<((Cid, Bytes), LinkInfo), Error> { + fn encode_unixfs_leaf_node(chunk: &Bytes) -> Result<((Cid, Bytes), LinkInfo), Error> { + let chunk_len = chunk.len() as u64; + + // Build UnixFS metadata + let unixfs_data = Data { + Type: mod_Data::DataType::File, + filesize: Some(chunk_len), + blocksizes: vec![chunk_len], + Data: Some(chunk.to_vec().into()), + hashType: None, + fanout: None, + }; + + // Encode UnixFS data and create DAG-PB node + let mut data_buf = Vec::new(); + { + let mut w = quick_protobuf::Writer::new(&mut data_buf); + unixfs_data.write_message(&mut w)?; + } + + let pb_node = PbNode { + links: vec![], + data: Some(data_buf.clone().into()), + }; + + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + + let info = LinkInfo { + raw_data_length: chunk_len, + encoded_data_length: encoded.len() as u64, + }; + + Ok(((cid, encoded.into()), info)) + } + + fn encode_unixfs_stem_node( + children: Vec<(Cid, LinkInfo)>, + ) -> Result<((Cid, Bytes), LinkInfo), Error> { + // Process all children in a single pass, gathering totals and building links and blocksizes + let (total_raw_size, total_encoded_size, 
pb_links, blocksizes) = children.iter().fold( + ( + 0u64, + 0u64, + Vec::with_capacity(children.len()), + Vec::with_capacity(children.len()), + ), + |(raw_sum, encoded_sum, mut links, mut sizes), (child_cid, link_info)| { + sizes.push(link_info.raw_data_length); + links.push(PbLink { + cid: *child_cid, + name: Some("".to_string()), + size: Some(link_info.encoded_data_length), + }); + ( + raw_sum + link_info.raw_data_length, + encoded_sum + link_info.encoded_data_length, + links, + sizes, + ) + }, + ); + + // Create UnixFS metadata + let unixfs_data = Data { + Type: mod_Data::DataType::File, + filesize: Some(total_raw_size), + blocksizes, + Data: None, + hashType: None, + fanout: None, + }; + + // Encode UnixFS data + let mut data_buf = Vec::new(); + { + let mut w = quick_protobuf::Writer::new(&mut data_buf); + unixfs_data.write_message(&mut w)?; + } + + // Create DAG-PB node + let pb_node = PbNode { + links: pb_links, + data: Some(data_buf.clone().into()), + }; + + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + + let info = LinkInfo { + raw_data_length: data_buf.len() as u64, + encoded_data_length: encoded.len() as u64 + total_encoded_size, + }; + + Ok(((cid, encoded.into()), info)) + } + fn encode_raw(self) -> Result<((Cid, Bytes), LinkInfo), Error> { match self { TreeNode::Leaf(bytes) => { let data_length = bytes.len() as u64; @@ -104,150 +202,35 @@ impl TreeNode { } } -/// This function takes a stream of chunks of bytes and returns a stream of [`Block`]s. -/// -/// It works by accumulating `width` blocks and lazily creating stems. -/// The tree grows upwards and does not keep previously completed `width` blocks. -/// -/// As a demonstration, consider a `width` of 2 and an `input` stream that will yield 7 blocks. -/// ```text -/// Input stream <- Block 1, Block 2, Block 3, Block 4, Block 5, Block 6, Block 7 -/// ``` +/// Takes a stream of byte chunks and produces a stream of blocks organized in a balanced tree structure. +/// The bytes are processed into either raw blocks or UnixFS nodes depending on configuration. /// -/// Each time a block is taken out of the stream, it is stored in the lower level of the tree, -/// but it is also yielded as output: -/// ```text -/// Input stream <- Block 2, Block 3, Block 4, Block 5, Block 6, Block 7 -/// Tree: [ -/// [Block 1] -/// ] -/// Output stream -> Block 1 -/// ``` +/// The algorithm builds the tree by: +/// 1. Accumulating input chunks at the leaf level +/// 2. Creating parent "stem" nodes when enough children accumulate +/// 3. Growing upward as lower levels fill up +/// 4. 
Yielding nodes as they're created /// -/// Once the first `width` blocks (in this case, 2) are taken from the stream: -/// * A new stem is added, linking back to the two blocks -/// ```text -/// Input stream <- | Block 3 | Block 4 | Block 5 | Block 6 | Block 7 | -/// Tree: [ -/// [Block 1, Block 2], -/// [Stem (B1, B2)] -/// ] -/// ``` -/// * The previous level to the stem is evicted -/// ```text -/// Input stream <- | Block 3 | Block 4 | Block 5 | Block 6 | Block 7 | -/// Tree: [ -/// [], -/// [Stem 1 (B1, B2)] -/// ] -/// ``` -/// * The new stem is yielded +/// Example with width=2 showing tree growth: /// ```text -/// Input stream <- Block 3, Block 4, Block 5, Block 6, Block 7 -/// Tree: [ -/// [], -/// [Stem 1 (B1, B2)] -/// ] -/// Output stream -> Stem (B1, B2) -/// ``` +/// Input: [B1][B2][B3][B4][B5] /// -/// This process happens recursively, so when the stem level is full, like so: -/// ```text -/// Input stream <- Block 5, Block 6, Block 7 -/// Tree: [ -/// [], -/// [Stem 1 (B1, B2), Stem 2 (B3, B4)] -/// ] -/// ``` +/// Step 1: B1,B2 -> S1(B1,B2) +/// Step 2: B3,B4 -> S2(B3,B4) +/// Step 3: S1,S2 -> R1(S1,S2) +/// Step 4: B5 /// -/// A new stem is built upwards: -/// ```text -/// Input stream <- Block 5, Block 6, Block 7 -/// Tree: [ -/// [], -/// [], -/// [Stem 3 (S1, S2)] -/// ] -/// Output stream -> Stem 3 (S1, S2) +/// Final tree: +/// R1 +/// / \ +/// S1 S2 B5 +/// /\ /\ | +/// B1 B2 B3 B4 B5 /// ``` /// -/// Once the stream is exhausted, we need to clean up any remaining state: -/// ```text -/// Input stream <- -/// Tree: [ -/// [Block 7], -/// [Stem 4 (B5, B6)], -/// [Stem 3 (S1, S2)], -/// ] -/// ``` -/// -/// In this case, the yielded tree looks like: -/// ```text -/// S3 -/// / \ -/// S1 S2 S4 -/// / \ / \ / \ -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// We work bottom-up, removing the levels one by one, creating new stems from them and returning the stems: -/// ```text -/// Tree: [ -/// [], # popped -/// [Stem 4 (B5, B6), Stem 5 (B7)], -/// [Stem 3 (S1, S2)] -/// ] -/// Output stream -> Stem 5 (B7) -/// ``` -/// -/// The previous tree now looks like: -/// ```text -/// S3 -/// / \ -/// S1 S2 S4 S5 -/// / \ / \ / \ | -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// If we repeat the process again: -/// ```text -/// Tree: [ -/// [Stem 4 (B5, B6), Stem 5 (B7)], # popped -/// [Stem 3 (S1, S2), Stem 6 (S4, S5)] -/// ] -/// Output stream -> Stem 6 (S4, S5) -/// ``` +/// The stream yields blocks in creation order: B1,B2,S1,B3,B4,S2,R1,B5 /// -/// The tree becomes: -/// ```text -/// S3 S6 -/// / \ / \ -/// S1 S2 S4 S5 -/// / \ / \ / \ | -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// And finally, we build the last stem, yielding it: -/// ```text -/// Tree: [ -/// [Stem 3 (S1, S2), Stem 6 (S4, S5)] # popped -/// ] -/// Output stream -> Stem 7 (S3, S6) -/// ``` -/// -/// Making the final tree: -/// ```text -/// S7 -/// / \ -/// S3 S6 -/// / \ / \ -/// S1 S2 S4 S5 -/// / \ / \ / \ | -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// The original implementation is in -/// . 
+/// Based on: https://github.com/n0-computer/beetle/tree/main/iroh-unixfs pub(crate) fn stream_balanced_tree( input: I, width: usize, @@ -261,9 +244,9 @@ where let input = input .err_into::() - // The TreeNode::Leaf(data).encode() just wraps it with a Cid marking the payload as Raw + // The TreeNode::Leaf(data).encode_raw() just wraps it with a Cid marking the payload as Raw // we may be able move this responsibility to the caller for more efficient memory usage - .map(|data| data.and_then(|data| TreeNode::Leaf(data).encode())) + .map(|data| data.and_then(|data| TreeNode::Leaf(data).encode_raw())) .err_into::(); tokio::pin!(input); @@ -293,7 +276,7 @@ where // it's most likely less performant (I didn't measure) // due to the different nature of the approaches (batch vs iterator) let links = std::mem::replace(&mut tree[level], Vec::with_capacity(width)); - let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; + let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode_raw()?; yield block; tree[level + 1].push((cid, link_info)); @@ -317,7 +300,7 @@ where // Once `input` is exhausted, we need to perform cleanup of any leftovers, // to do so, we start by popping levels from the front and building stems over them. while let Some(links) = tree.pop_front() { - let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; + let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode_raw()?; yield block; // If there's still a level in the front, it means the stem we just built will have a parent @@ -331,6 +314,59 @@ where } } +pub fn stream_balanced_tree_unixfs( + input: I, + width: usize, +) -> impl Stream> +where + I: Stream>, +{ + try_stream! { + let mut tree: VecDeque> = VecDeque::new(); + tree.push_back(vec![]); + + tokio::pin!(input); + + while let Some(data) = input.next().await { + let data = data?; + let (block @ (cid, _), link_info) = TreeNode::encode_unixfs_leaf_node(&data)?; + yield block; + + tree[0].push((cid, link_info)); + + // Build parent nodes when necessary + for level in 0..tree.len() { + if tree[level].len() < width { + break; + } + + let links = std::mem::replace(&mut tree[level], Vec::new()); + let (block @ (cid, _), link_info) = TreeNode::encode_unixfs_stem_node(links)?; + yield block; + + if level + 1 == tree.len() { + tree.push_back(vec![]); + } + tree[level + 1].push((cid, link_info)); + } + } + + // Finalize tree: Flush remaining levels + while let Some(links) = tree.pop_front() { + if links.is_empty() { + continue; + } + + let (block @ (cid, _), link_info) = TreeNode::encode_unixfs_stem_node(links)?; + yield block; + + if let Some(next_level) = tree.front_mut() { + next_level.push((cid, link_info)); + } + } + } +} + #[cfg(test)] mod tests { //! Tests were taken from [beetle][beetle] too, I did modify them to suit our needs. 
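As a usage sketch of the new UnixFS tree builder (crate-internal, since the unixfs module is not re-exported, and assuming the input stream carries std::io::Result items of Bytes, as the raw variant does), the snippet below feeds two in-memory chunks through stream_balanced_tree_unixfs with a width of 2 and collects the emitted blocks; the helper function is hypothetical:

use bytes::Bytes;
use futures::StreamExt;

// Hypothetical crate-internal helper: build a tiny UnixFS DAG from two chunks.
async fn unixfs_tree_sketch() -> Result<(), crate::Error> {
    let chunks = futures::stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"first chunk")),
        Ok(Bytes::from_static(b"second chunk")),
    ]);

    let tree = crate::unixfs::stream_balanced_tree_unixfs(chunks, 2);
    tokio::pin!(tree);

    let mut blocks = Vec::new();
    while let Some(block) = tree.next().await {
        let (cid, bytes) = block?;
        blocks.push((cid, bytes.len()));
    }
    // Two DAG-PB leaves plus at least one parent node linking them.
    assert!(blocks.len() >= 3);
    Ok(())
}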
@@ -360,7 +396,7 @@ mod tests { if num_chunks / degree == 0 { let chunk = chunks.next().await.unwrap().unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, _) = leaf.encode().unwrap(); + let (block, _) = leaf.encode_raw().unwrap(); tree[0].push(block); return tree; } @@ -368,7 +404,7 @@ mod tests { while let Some(chunk) = chunks.next().await { let chunk = chunk.unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block @ (cid, _), link_info) = leaf.encode().unwrap(); + let (block @ (cid, _), link_info) = leaf.encode_raw().unwrap(); links[0].push((cid, link_info)); tree[0].push(block); } @@ -380,7 +416,7 @@ mod tests { let mut links_layer = Vec::with_capacity(count); for links in prev_layer.chunks(degree) { let stem = TreeNode::Stem(links.to_vec()); - let (block @ (cid, _), link_info) = stem.encode().unwrap(); + let (block @ (cid, _), link_info) = stem.encode_raw().unwrap(); links_layer.push((cid, link_info)); tree_layer.push(block); } @@ -420,7 +456,7 @@ mod tests { if *count % degree != 0 { push = false; } - } + } } for (num_layer, count) in counts.into_iter().enumerate() { @@ -444,12 +480,12 @@ mod tests { fn make_leaf(data: usize) -> ((Cid, Bytes), LinkInfo) { TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) - .encode() + .encode_raw() .unwrap() } fn make_stem(links: Vec<(Cid, LinkInfo)>) -> ((Cid, Bytes), LinkInfo) { - TreeNode::Stem(links).encode().unwrap() + TreeNode::Stem(links).encode_raw().unwrap() } #[tokio::test] @@ -596,4 +632,4 @@ mod tests { tokio::pin!(got); ensure_equal(expect, got).await; } -} + } diff --git a/mater/lib/src/v2/reader.rs b/mater/lib/src/v2/reader.rs index 4507c15c9..df9c1a9f4 100644 --- a/mater/lib/src/v2/reader.rs +++ b/mater/lib/src/v2/reader.rs @@ -1,13 +1,17 @@ use ipld_core::cid::Cid; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader}; +use std::collections::HashSet; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; use super::index::read_index; +use crate::multicodec::DAG_PB_CODE; use crate::{ v1::BlockMetadata, v2::{index::Index, Characteristics, Header, PRAGMA}, Error, }; - +use ipld_core::codec::Codec; +use ipld_dagpb::DagPbCodec; +use ipld_dagpb::PbNode; /// Low-level CARv2 reader. pub struct Reader { reader: R, @@ -22,7 +26,7 @@ impl Reader { impl Reader where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + AsyncSeek, { /// Takes in a CID and checks that the contents in the reader matches this CID pub async fn verify_cid(&mut self, contents_cid: Cid) -> Result<(), Error> { @@ -63,21 +67,48 @@ where { self.read_pragma().await?; let header = self.read_header().await?; - let _v1_header = self.read_v1_header().await?; + let v1_header = self.read_v1_header().await?; let mut written = 0; - while let Ok((_cid, contents)) = self.read_block().await { - // CAR file contents is empty - if contents.len() == 0 { - break; - } + // Keep track of root CID and position + let root_cid = v1_header.roots.first().ok_or(Error::EmptyRootsError)?; + let data_end = header.data_offset + header.data_size; + + // Track what we've processed and need to process + let mut processed: HashSet = HashSet::new(); + let mut to_process = vec![*root_cid]; + + while !to_process.is_empty() { let position = self.get_inner_mut().stream_position().await?; - let data_end = header.data_offset + header.data_size; - // Add the `written != 0` clause for files that are less than a single block. 
if position >= data_end && written != 0 { break; } - written += output_file.write(&contents).await?; + + if let Ok((cid, contents)) = self.read_block().await { + if contents.len() == 0 { + break; + } + + // Write the block data + written += output_file.write(&contents).await?; + + // If it's a DAG-PB node, queue up its children + if cid.codec() == DAG_PB_CODE && !processed.contains(&cid) { + let reader = std::io::BufReader::new(&contents[..]); + if let Ok(node) = DagPbCodec::decode(reader) { + let pb_node: PbNode = node; + to_process.extend( + pb_node + .links + .iter() + .map(|link| link.cid) + .filter(|cid| !processed.contains(cid)), + ); + } + } + + processed.insert(cid); + } } Ok(()) @@ -164,9 +195,11 @@ where } /// Function verifies that a given CID matches the CID for the CAR file in the given reader -pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> { - let mut reader = Reader::new(BufReader::new(reader)); - +pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> +where + R: AsyncRead + AsyncSeek + Unpin, +{ + let mut reader = Reader::new(reader); reader.verify_cid(contents_cid).await } diff --git a/storage-provider/server/src/storage.rs b/storage-provider/server/src/storage.rs index 40663edfb..0393585c1 100644 --- a/storage-provider/server/src/storage.rs +++ b/storage-provider/server/src/storage.rs @@ -9,8 +9,10 @@ use axum::{ Router, }; use futures::{TryFutureExt, TryStreamExt}; +use mater::create_filestore; use mater::Cid; -use polka_storage_provider_common::commp::{commp, CommPError}; +use polka_storage_provider_common::commp::commp; +use polka_storage_provider_common::commp::CommPError; use primitives::{commitment::piece::PaddedPieceSize, proofs::RegisteredPoStProof}; use tokio::{ fs::{self, File}, @@ -22,6 +24,7 @@ use tokio_util::{ }; use tower_http::trace::TraceLayer; use uuid::Uuid; +use mater::Config; #[cfg(feature = "delia")] mod delia_imports { @@ -115,8 +118,13 @@ fn configure_router(state: Arc) -> Router { #[cfg(not(feature = "delia"))] fn config_non_delia(state: Arc) -> Router { + // Type annotation required to satisfy Send bounds needed for UnixFS processing + // across async operations and thread boundaries Router::new() - .route("/upload/:cid", put(upload)) + .route( + "/upload/:cid", + put(upload as fn(State>, Path, Request) -> _), + ) .route("/download/:cid", get(download)) .with_state(state) .layer( @@ -160,9 +168,9 @@ fn configure_router(state: Arc) -> Router { /// ``` #[tracing::instrument(skip_all, fields(cid))] async fn upload( - ref s @ State(ref state): State>, + State(state): State>, Path(cid): Path, - request: Request, + request: Request, ) -> Result { let deal_cid = cid::Cid::from_str(&cid).map_err(|err| { tracing::error!(cid, "failed to parse cid"); @@ -193,20 +201,25 @@ async fn upload( // Branching needed here since the resulting `StreamReader`s don't have the same type let file_cid = if request.headers().contains_key("Content-Type") { - // Handle multipart forms - let mut multipart = Multipart::from_request(request, &s) + // Handle the multipart data + let mut multipart = Multipart::from_request(request, &state) .await .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; - let Some(field) = multipart + + // Get the field data + let field_bytes = multipart .next_field() - .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string())) - .await? - else { - return Err((StatusCode::BAD_REQUEST, "empty request".to_string())); - }; + .await + .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))? 
+ .ok_or_else(|| (StatusCode::BAD_REQUEST, "empty request".to_string()))? + .bytes() + .await + .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; + + // Create reader from the field data + let reader = std::io::Cursor::new(field_bytes); - let field_reader = StreamReader::new(field.map_err(std::io::Error::other)); - stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), field_reader) + stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), reader) .await .map_err(|err| { tracing::error!(%err, "failed to store file into CAR archive"); @@ -366,13 +379,21 @@ fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) { (name, path) } -/// Reads bytes from the source and writes them to a CAR file. +/// Converts a source stream into a CARv2 file and writes it to an output stream. +/// +/// Send + 'static bounds are required because the UnixFS processing involves: +/// - Async stream processing that may cross thread boundaries +/// - State management for DAG construction and deduplication +/// - Block tracking that must be thread-safe +/// +/// The expanded trait bounds ensure that all data can be safely moved between +/// threads during async operations. async fn stream_contents_to_car( folder: &std::path::Path, source: R, ) -> Result> where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + Send + 'static, { // Temp file which will be used to store the CAR file content. The temp // director has a randomized name and is created in the same folder as the @@ -384,7 +405,10 @@ where // Stream the body from source to the temp file. let file = File::create(&temp_file_path).await?; let writer = BufWriter::new(file); - let cid = mater::create_filestore(source, writer, mater::Config::default()).await?; + + let config = Config::default(); + + let cid = create_filestore(source, writer, config).await?; tracing::trace!("finished writing the CAR archive"); // If the file is successfully written, we can now move it to the final
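Finally, a condensed sketch of the server-side conversion path that stream_contents_to_car now follows: an in-memory payload (standing in for the uploaded multipart field) is wrapped in a std::io::Cursor, which satisfies the tightened AsyncRead + Send + 'static bounds, and written out through create_filestore with the default UnixFS configuration. The output path is a placeholder:

use mater::{create_filestore, Cid, Config};
use tokio::{fs::File, io::BufWriter};

// Hypothetical stand-in for the upload handler's conversion step.
async fn car_from_bytes(payload: Vec<u8>) -> Result<Cid, Box<dyn std::error::Error>> {
    // tokio implements AsyncRead for std::io::Cursor, and the cursor owns its
    // buffer, so it is Send + 'static as create_filestore now requires.
    let source = std::io::Cursor::new(payload);
    let file = File::create("upload.car.tmp").await?;
    let writer = BufWriter::new(file);
    // Config::default() produces the UnixFS-wrapped balanced tree layout.
    let cid = create_filestore(source, writer, Config::default()).await?;
    Ok(cid)
}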