diff --git a/Cargo.lock b/Cargo.lock index b87ba986c..7e615708e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1379,6 +1379,34 @@ dependencies = [ "serde", ] +[[package]] +name = "beetswap" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55f2cf2244bd65e9f00adb06c6b5e951bab980fdacdd572ff035bf610628be99" +dependencies = [ + "asynchronous-codec 0.7.0", + "blockstore", + "bytes", + "cid 0.11.1", + "fnv", + "futures-core", + "futures-timer", + "futures-util", + "libp2p-core 0.42.0", + "libp2p-identity", + "libp2p-swarm 0.45.1", + "multihash-codetable", + "quick-protobuf", + "smallvec", + "thiserror 1.0.69", + "time", + "tracing", + "unsigned-varint 0.8.0", + "void", + "web-time", +] + [[package]] name = "bellpepper" version = "0.2.1" @@ -9435,8 +9463,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67996849749d25f1da9f238e8ace2ece8f9d6bdf3f9750aaf2ae7de3a5cad8ea" dependencies = [ "blake2b_simd", + "blake2s_simd 1.0.2", + "blake3", "core2", + "digest 0.10.7", "multihash-derive 0.9.1", + "ripemd", + "sha1", + "sha2 0.10.8", + "sha3", + "strobe-rs", ] [[package]] @@ -13704,6 +13740,21 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polka-fetch" +version = "0.1.0" +dependencies = [ + "anyhow", + "cid 0.11.1", + "clap", + "libp2p 0.54.1", + "multihash-codetable", + "polka-storage-retrieval", + "tokio", + "tracing", + "tracing-subscriber 0.3.19", +] + [[package]] name = "polka-index" version = "0.1.0" @@ -13898,6 +13949,30 @@ dependencies = [ "uuid", ] +[[package]] +name = "polka-storage-retrieval" +version = "0.1.0" +dependencies = [ + "anyhow", + "beetswap", + "blockstore", + "cid 0.11.1", + "futures", + "ipld-core", + "ipld-dagpb", + "libp2p 0.54.1", + "libp2p-core 0.42.0", + "libp2p-swarm 0.45.1", + "mater", + "multihash-codetable", + "polka-index", + "thiserror 2.0.8", + "tokio", + "tracing", + "tracing-appender", + "tracing-subscriber 0.3.19", +] + [[package]] name = "polka-storage-runtime" version = "0.0.0" @@ -16399,7 +16474,7 @@ dependencies = [ "once_cell", "socket2 0.5.8", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -22471,6 +22546,19 @@ dependencies = [ "serde", ] +[[package]] +name = "strobe-rs" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98fe17535ea31344936cc58d29fec9b500b0452ddc4cc24c429c8a921a0e84e5" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "keccak", + "subtle 2.6.1", + "zeroize", +] + [[package]] name = "strsim" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index 025d80b8c..96c4a40de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,8 @@ members = [ "storage-provider/client", "storage-provider/common", "storage-provider/server", + "storage-retrieval/cli", + "storage-retrieval/lib", "storage/polka-index", "storagext/cli", "storagext/lib", @@ -51,6 +53,7 @@ async-stream = "0.3.6" async-trait = "0.1.80" axum = "0.7.5" base64 = "0.22.1" +beetswap = "0.4.0" bitflags = "2.5.0" blake2b_simd = { version = "1.0.2", default-features = false } blockstore = "0.7.1" @@ -81,6 +84,8 @@ ipld-dagpb = "0.2.1" itertools = "0.13.0" jsonrpsee = { version = "0.24.7" } libp2p = { version = "0.54", default-features = false } +libp2p-core = "0.42.0" +libp2p-swarm = "0.45.1" log = { version = "0.4.21", default-features = false } multihash-codetable = { version = "0.1.1", default-features = false } num-bigint = { version = "0.4.5", default-features = false } @@ -140,8 +145,10 @@ pallet-market = { path 
= "pallets/market", default-features = false }
 pallet-proofs = { path = "pallets/proofs", default-features = false }
 pallet-randomness = { path = "pallets/randomness", default-features = false }
 pallet-storage-provider = { path = "pallets/storage-provider", default-features = false }
+polka-index = { path = "storage/polka-index" }
 polka-storage-proofs = { path = "lib/polka-storage-proofs", default-features = false }
 polka-storage-provider-common = { path = "storage-provider/common" }
+polka-storage-retrieval = { path = "storage-retrieval/lib" }
 polka-storage-runtime = { path = "runtime" }
 primitives = { path = "primitives", default-features = false }
 storagext = { path = "storagext/lib" }
diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs
index 1bce95ecb..cb60d2383 100644
--- a/mater/lib/src/lib.rs
+++ b/mater/lib/src/lib.rs
@@ -19,6 +19,7 @@ mod v2;
 
 // We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
 pub use ipld_core::cid::Cid;
+pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
 pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
 pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
 pub use v2::{
diff --git a/mater/lib/src/multicodec.rs b/mater/lib/src/multicodec.rs
index 8d06402c4..9a7393833 100644
--- a/mater/lib/src/multicodec.rs
+++ b/mater/lib/src/multicodec.rs
@@ -6,7 +6,11 @@ use ipld_core::cid::{multihash::Multihash, CidGeneric};
 
 pub const SHA_256_CODE: u64 = 0x12;
 pub const SHA_512_CODE: u64 = 0x13;
+
+/// The RAW multicodec code
 pub const RAW_CODE: u64 = 0x55;
+
+/// The DAG_PB multicodec code
 pub const DAG_PB_CODE: u64 = 0x70;
 
 /// The IDENTITY multicodec code
diff --git a/mater/lib/tests/fixtures/car_v1/spaceglenda_wrapped.car b/mater/lib/tests/fixtures/car_v1/spaceglenda_wrapped.car
new file mode 100644
index 000000000..f1005c79e
Binary files /dev/null and b/mater/lib/tests/fixtures/car_v1/spaceglenda_wrapped.car differ
diff --git a/mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car b/mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car
new file mode 100644
index 000000000..e33af4b0f
Binary files /dev/null and b/mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car differ
diff --git a/storage-retrieval/cli/Cargo.toml b/storage-retrieval/cli/Cargo.toml
new file mode 100644
index 000000000..a2c5609d4
--- /dev/null
+++ b/storage-retrieval/cli/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license-file.workspace = true
+name = "polka-fetch"
+repository.workspace = true
+version = "0.1.0"
+
+
+[dependencies]
+anyhow.workspace = true
+cid = { workspace = true }
+clap = { workspace = true, features = ["derive"] }
+libp2p = { workspace = true }
+multihash-codetable = { workspace = true, features = ["sha2"] }
+polka-storage-retrieval = { workspace = true }
+tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true, features = ["env-filter"] }
+
+[lints]
+workspace = true
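// Illustrative sketch (not included in the changeset): the polka-fetch CLI
// below takes a `--payload-cid` argument. For reference, this is roughly how a
// CID for a RAW block can be derived using the codec constants re-exported by
// `mater` and the sha2-enabled `multihash-codetable` listed above. A payload
// root will typically be a DAG-PB node rather than a raw leaf; the helper name
// is an assumption made for illustration.
use cid::Cid;
use mater::RAW_CODE;
use multihash_codetable::{Code, MultihashDigest};

fn raw_block_cid(data: &[u8]) -> Cid {
    // Hash the block bytes with SHA2-256 and wrap the multihash in a CIDv1
    // tagged with the RAW multicodec (0x55).
    let multihash = Code::Sha2_256.digest(data);
    Cid::new_v1(RAW_CODE, multihash)
}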
diff --git a/storage-retrieval/cli/src/main.rs b/storage-retrieval/cli/src/main.rs
new file mode 100644
index 000000000..185fbb7b1
--- /dev/null
+++ b/storage-retrieval/cli/src/main.rs
@@ -0,0 +1,81 @@
+use std::{path::PathBuf, time::Duration};
+
+use cid::Cid;
+use clap::{command, Parser};
+use libp2p::Multiaddr;
+use polka_storage_retrieval::client::Client;
+use tokio::time::timeout;
+use tracing::{error, info, level_filters::LevelFilter};
+use tracing_subscriber::{
+    filter::FromEnvError, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer,
+};
+
+#[derive(Parser, Debug)]
+#[command()]
+struct Cli {
+    /// Provider to download the data from.
+    #[arg(long)]
+    provider: Multiaddr,
+    /// The CAR file to write to.
+    #[arg(long)]
+    output: PathBuf,
+    /// Cancel the download if it has not completed after the specified number
+    /// of seconds. If not set, the download never times out.
+    #[arg(long, value_parser = parse_duration)]
+    timeout: Option<Duration>,
+    /// The payload CID of the content to download.
+    #[arg(long)]
+    payload_cid: Cid,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    setup_tracing()?;
+
+    let arguments = Cli::parse();
+
+    let client = Client::new(
+        arguments.output,
+        vec![arguments.provider],
+        vec![arguments.payload_cid],
+    )
+    .await?;
+
+    let download_result = match arguments.timeout {
+        Some(duration) => timeout(duration, client.download()).await,
+        None => Ok(client.download().await),
+    };
+
+    match download_result {
+        Ok(Ok(_)) => info!("download successfully finished"),
+        Ok(Err(err)) => error!(?err, "error occurred while downloading"),
+        Err(_) => error!("download timed out"),
+    }
+
+    Ok(())
+}
+
+fn parse_duration(arg: &str) -> Result<Duration, String> {
+    let seconds = arg
+        .parse()
+        .map_err(|err| format!("failed to parse duration from string: {}", err))?;
+    Ok(Duration::from_secs(seconds))
+}
+
+/// Configure and initialize tracing.
+fn setup_tracing() -> Result<(), FromEnvError> {
+    tracing_subscriber::registry()
+        .with(
+            fmt::layer().with_filter(
+                EnvFilter::builder()
+                    .with_default_directive(if cfg!(debug_assertions) {
+                        LevelFilter::DEBUG.into()
+                    } else {
+                        LevelFilter::WARN.into()
+                    })
+                    .from_env()?,
+            ),
+        )
+        .init();
+    Ok(())
+}
diff --git a/storage-retrieval/lib/Cargo.toml b/storage-retrieval/lib/Cargo.toml
new file mode 100644
index 000000000..8352ea728
--- /dev/null
+++ b/storage-retrieval/lib/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license-file.workspace = true
+name = "polka-storage-retrieval"
+repository.workspace = true
+version = "0.1.0"
+
+[lints]
+workspace = true
+
+[dependencies]
+anyhow = { workspace = true }
+beetswap = { workspace = true }
+blockstore = { workspace = true }
+cid = { workspace = true }
+futures = { workspace = true }
+ipld-core = { workspace = true, features = ["serde"] }
+ipld-dagpb.workspace = true
+libp2p = { workspace = true, features = ["macros", "noise", "tcp", "tokio", "yamux"] }
+libp2p-core = { workspace = true }
+libp2p-swarm = { workspace = true }
+mater = { workspace = true, features = ["blockstore"] }
+polka-index = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
+tracing = { workspace = true }
+
+[dev-dependencies]
+multihash-codetable = { workspace = true, features = ["sha2"] }
+tracing-appender = { workspace = true }
+tracing-subscriber = { workspace = true, features = ["env-filter"] }
diff --git a/storage-retrieval/lib/README.md b/storage-retrieval/lib/README.md
new file mode 100644
index 000000000..e69de29bb
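// Illustrative sketch (not included in the changeset): the binary above bounds
// the download with `tokio::time::timeout` and inspects the nested result. The
// same pattern can be folded into a small helper that maps an elapsed timer
// onto the library's `ClientError::DownloadTimeout` variant (declared later in
// `client.rs`); the helper name is an assumption.
use std::time::Duration;

use polka_storage_retrieval::client::{Client, ClientError};

async fn download_with_timeout(client: Client, limit: Duration) -> Result<(), ClientError> {
    match tokio::time::timeout(limit, client.download()).await {
        // The download finished (successfully or not) before the deadline.
        Ok(result) => result,
        // The timer elapsed first; surface it as the dedicated error variant.
        Err(_elapsed) => Err(ClientError::DownloadTimeout),
    }
}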
diff --git a/storage-retrieval/lib/examples/simple_server.rs b/storage-retrieval/lib/examples/simple_server.rs
new file mode 100644
index 000000000..0ad42df61
--- /dev/null
+++ b/storage-retrieval/lib/examples/simple_server.rs
@@ -0,0 +1,49 @@
+//! This example shows how to set up a retrieval server backed by a simple
+//! blockstore. Because the server is intentionally minimal, it is used for
+//! manual testing of the retrieval client.
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use libp2p::Multiaddr;
+use mater::FileBlockstore;
+use polka_storage_retrieval::server::Server;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Initialize tracing
+    let _guard = init_tracing();
+
+    // Example blockstore providing only a single file.
+    let blockstore = Arc::new(
+        FileBlockstore::from_existing("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car")
+            .await?,
+    );
+
+    // Set up & run the server
+    let server = Server::new(blockstore)?;
+    let listener: Multiaddr = "/ip4/127.0.0.1/tcp/8989".parse()?;
+    server.run(vec![listener]).await?;
+
+    Ok(())
+}
+
+fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
+    let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());
+
+    let filter = tracing_subscriber::EnvFilter::builder()
+        .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
+        .from_env_lossy();
+
+    tracing_subscriber::fmt()
+        .event_format(
+            tracing_subscriber::fmt::format()
+                .with_file(true)
+                .with_line_number(true),
+        )
+        .with_env_filter(filter)
+        .with_writer(non_blocking)
+        .init();
+
+    guard
+}
diff --git a/storage-retrieval/lib/src/client.rs b/storage-retrieval/lib/src/client.rs
new file mode 100644
index 000000000..7a044ca2b
--- /dev/null
+++ b/storage-retrieval/lib/src/client.rs
@@ -0,0 +1,229 @@
+use std::{collections::HashMap, path::Path, sync::Arc};
+
+use beetswap::{Event, QueryId};
+use cid::Cid;
+use futures::StreamExt;
+use ipld_core::codec::Codec;
+use ipld_dagpb::{DagPbCodec, PbNode};
+use libp2p::{Multiaddr, PeerId, Swarm};
+use libp2p_core::ConnectedPoint;
+use libp2p_swarm::{ConnectionId, DialError, SwarmEvent};
+use mater::{FileBlockstore, DAG_PB_CODE, RAW_CODE};
+use thiserror::Error;
+use tracing::{debug, error, info, instrument, trace};
+
+use crate::{new_swarm, Behaviour, BehaviourEvent, InitSwarmError};
+
+/// Errors that can occur while retrieving some content.
+#[derive(Debug, Error)]
+pub enum ClientError {
+    /// Error occurred while initializing the swarm.
+    #[error("Swarm initialization error: {0}")]
+    InitSwarm(#[from] InitSwarmError),
+    /// The download timed out.
+    #[error("Download timeout")]
+    DownloadTimeout,
+    /// Error occurred when trying to establish or upgrade an outbound connection.
+    #[error("Dial error: {0}")]
+    Dial(#[from] DialError),
+    /// Error produced by mater.
+    #[error("Mater error: {0}")]
+    Mater(#[from] mater::Error),
+}
+
+/// A client used to download blocks from a storage provider. A single client
+/// supports downloading a single payload.
+pub struct Client {
+    /// Providers of the data.
+    providers: Vec<Multiaddr>,
+    /// Swarm instance.
+    swarm: Swarm<Behaviour<PassthroughBlockstore>>,
+    /// The in-flight block queries. When empty, the client has received all
+    /// the requested data.
+    queries: HashMap<QueryId, Cid>,
+    /// Blockstore the client writes received blocks into.
+    blockstore: FileBlockstore,
+    /// Content roots being downloaded.
+    roots: Vec<Cid>,
+}
+
+impl Client {
+    pub async fn new<P>(
+        path: P,
+        providers: Vec<Multiaddr>,
+        roots: Vec<Cid>,
+    ) -> Result<Self, ClientError>
+    where
+        P: AsRef<Path>,
+    {
+        // The p2p node created by the client doesn't need a real blockstore,
+        // because the swarm only uses its blockstore when sharing blocks with
+        // other peers.
+        let swarm = new_swarm(Arc::new(PassthroughBlockstore))?;
+
+        // Blockstore used to store the received blocks. It is kept separate
+        // from the blockstore passed to the swarm because the bitswap
+        // behaviour adds blocks to its blockstore asynchronously, which
+        // means we couldn't otherwise tell when the download has actually
+        // finished.
+        let blockstore = FileBlockstore::new(path, roots.clone()).await?;
+
+        Ok(Self {
+            providers,
+            swarm,
+            queries: HashMap::new(),
+            blockstore,
+            roots,
+        })
+    }
+
+    /// Start the download of the content referenced by the payload CIDs.
+    pub async fn download(mut self) -> Result<(), ClientError> {
+        // Dial all providers
+        for provider in self.providers.clone() {
+            self.swarm.dial(provider)?;
+        }
+
+        // Start the download by requesting the roots of the trees.
+        self.roots
+            .clone()
+            .into_iter()
+            .for_each(|root| self.request_block(root));
+
+        while let Some(event) = self.swarm.next().await {
+            // Handle events received from the providers
+            self.on_swarm_event(event).await?;
+
+            // If there are no in-flight queries, everything requested has
+            // been received. Finalize the blockstore.
+            if self.queries.is_empty() {
+                self.blockstore.finalize(None).await?;
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn request_block(&mut self, cid: Cid) {
+        debug!("requesting block {cid}");
+        let query_id = self.swarm.behaviour_mut().bitswap.get(&cid);
+        self.queries.insert(query_id, cid);
+    }
+
+    async fn on_swarm_event(
+        &mut self,
+        event: SwarmEvent<BehaviourEvent<PassthroughBlockstore>>,
+    ) -> Result<(), ClientError> {
+        trace!(?event, "Received swarm event");
+
+        match event {
+            SwarmEvent::ConnectionEstablished {
+                peer_id,
+                connection_id,
+                endpoint,
+                ..
+            } => {
+                self.on_peer_connected(peer_id, connection_id, endpoint);
+            }
+            SwarmEvent::ConnectionClosed {
+                peer_id,
+                connection_id,
+                ..
+            } => {
+                self.on_peer_disconnected(peer_id, connection_id);
+            }
+            SwarmEvent::Behaviour(BehaviourEvent::Bitswap(event)) => {
+                self.on_bitswap_event(event).await?;
+            }
+            _ => {
+                // Nothing to do here
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip_all, fields(peer_id = %peer_id))]
+    fn on_peer_connected(
+        &mut self,
+        peer_id: PeerId,
+        _connection_id: ConnectionId,
+        _endpoint: ConnectedPoint,
+    ) {
+        debug!("Peer connected");
+    }
+
+    #[instrument(skip_all, fields(peer_id = %peer_id, connection_id = %connection_id))]
+    fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
+        debug!("Peer disconnected");
+    }
+
+    #[instrument(level = "trace", skip(self))]
+    async fn on_bitswap_event(&mut self, event: Event) -> Result<(), ClientError> {
+        match event {
+            Event::GetQueryResponse { query_id, data } => {
+                let Some(cid) = self.queries.remove(&query_id) else {
+                    return Ok(());
+                };
+
+                // Store the received block in the blockstore
+                self.blockstore.put_keyed(&cid, &data).await?;
+                info!("received new block {cid:?}");
+
+                match cid.codec() {
+                    DAG_PB_CODE => {
+                        let node = <DagPbCodec as Codec<PbNode>>::decode_from_slice(&data).unwrap();
+
+                        // Request a block for each newly discovered link
+                        node.links
+                            .iter()
+                            .for_each(|link| self.request_block(link.cid));
+                    }
+                    RAW_CODE => {
+                        debug!("{cid} raw block, nothing to do");
+                    }
+                    _other => {
+                        error!("{cid} codec {_other} not supported");
+                    }
+                }
+            }
+            Event::GetQueryError { query_id, error } => {
+                if let Some(cid) = self.queries.remove(&query_id) {
+                    info!("received error for {cid:?}: {error}");
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// The blockstore used by the client's swarm. It simulates a blockstore that
+/// never holds any blocks.
+struct PassthroughBlockstore;
+
+impl blockstore::Blockstore for PassthroughBlockstore {
+    async fn get<const S: usize>(
+        &self,
+        _cid: &cid::CidGeneric<S>,
+    ) -> blockstore::Result<Option<Vec<u8>>> {
+        Ok(None)
+    }
+
+    async fn put_keyed<const S: usize>(
+        &self,
+        _cid: &cid::CidGeneric<S>,
+        _data: &[u8],
+    ) -> blockstore::Result<()> {
+        Ok(())
+    }
+
+    async fn remove<const S: usize>(&self, _cid: &cid::CidGeneric<S>) -> blockstore::Result<()> {
+        Ok(())
+    }
+
+    async fn close(self) -> blockstore::Result<()> {
+        Ok(())
+    }
+}
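// Illustrative sketch (not included in the changeset): the client's traversal
// step from `on_bitswap_event`, extracted as a standalone helper. Given the
// bytes of a received block it returns the child CIDs that still need to be
// requested; raw leaves have no children. Error handling via `anyhow` is an
// assumption made for brevity.
use anyhow::Result;
use cid::Cid;
use ipld_core::codec::Codec;
use ipld_dagpb::{DagPbCodec, PbNode};
use mater::{DAG_PB_CODE, RAW_CODE};

fn child_links(cid: &Cid, data: &[u8]) -> Result<Vec<Cid>> {
    match cid.codec() {
        // Intermediate DAG-PB node: decode it and follow every link.
        DAG_PB_CODE => {
            let node = <DagPbCodec as Codec<PbNode>>::decode_from_slice(data)?;
            Ok(node.links.iter().map(|link| link.cid).collect())
        }
        // Raw leaf: nothing further to request.
        RAW_CODE => Ok(Vec::new()),
        other => anyhow::bail!("codec {other} not supported"),
    }
}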
diff --git a/storage-retrieval/lib/src/lib.rs b/storage-retrieval/lib/src/lib.rs
new file mode 100644
index 000000000..99f6cd946
--- /dev/null
+++ b/storage-retrieval/lib/src/lib.rs
@@ -0,0 +1,51 @@
+pub mod client;
+pub mod server;
+
+use std::{sync::Arc, time::Duration};
+
+use ::blockstore::Blockstore;
+pub use client::Client;
+use libp2p::{noise, swarm::NetworkBehaviour, tcp, yamux, Swarm, SwarmBuilder};
+pub use server::Server;
+use thiserror::Error;
+
+const MAX_MULTIHASH_LENGTH: usize = 64;
+
+/// Custom Behaviour used by the server and client.
+#[derive(NetworkBehaviour)]
+struct Behaviour<B>
+where
+    B: Blockstore + 'static,
+{
+    bitswap: beetswap::Behaviour<MAX_MULTIHASH_LENGTH, B>,
+}
+
+/// Error that can occur while initializing a swarm.
+#[derive(Debug, Error)]
+pub enum InitSwarmError {
+    /// Failed to initialize noise protocol.
+    #[error("Failed to initialize noise: {0}")]
+    Noise(#[from] noise::Error),
+}
+
+/// Initialize a new swarm with our custom Behaviour.
+fn new_swarm<B>(blockstore: Arc<B>) -> Result<Swarm<Behaviour<B>>, InitSwarmError>
+where
+    B: Blockstore + 'static,
+{
+    let swarm = SwarmBuilder::with_new_identity()
+        .with_tokio()
+        .with_tcp(
+            tcp::Config::default(),
+            noise::Config::new,
+            yamux::Config::default,
+        )?
+        .with_behaviour(|_| Behaviour {
+            bitswap: beetswap::Behaviour::new(blockstore),
+        })
+        .expect("Moving behaviour doesn't fail")
+        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
+        .build();
+
+    Ok(swarm)
+}
diff --git a/storage-retrieval/lib/src/server.rs b/storage-retrieval/lib/src/server.rs
new file mode 100644
index 000000000..7295432bb
--- /dev/null
+++ b/storage-retrieval/lib/src/server.rs
@@ -0,0 +1,100 @@
+use std::{io, sync::Arc};
+
+use blockstore::Blockstore;
+use futures::StreamExt;
+use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
+use libp2p_core::ConnectedPoint;
+use libp2p_swarm::{ConnectionId, SwarmEvent};
+use thiserror::Error;
+use tracing::{debug, instrument, trace};
+
+use crate::{new_swarm, Behaviour, BehaviourEvent, InitSwarmError};
+
+/// Error that can occur while running the storage retrieval server.
+#[derive(Debug, Error)]
+pub enum ServerError {
+    /// Error occurred while initializing the swarm.
+    #[error("Swarm initialization error: {0}")]
+    InitSwarm(#[from] InitSwarmError),
+    /// An error propagated from the libp2p transport.
+    #[error("Transport error: {0}")]
+    Transport(#[from] TransportError<io::Error>),
+}
+
+/// Storage retrieval server. The server listens for block requests and serves
+/// them to the requesting clients.
+pub struct Server<B>
+where
+    B: Blockstore + 'static,
+{
+    // Swarm instance
+    swarm: Swarm<Behaviour<B>>,
+}
+
+impl<B> Server<B>
+where
+    B: Blockstore + 'static,
+{
+    pub fn new(blockstore: Arc<B>) -> Result<Self, ServerError> {
+        let swarm = new_swarm(blockstore)?;
+
+        Ok(Self { swarm })
+    }
+
+    // Start the server. The server stops if it receives a cancellation event
+    // or an error occurs.
+    pub async fn run(mut self, listeners: Vec<Multiaddr>) -> Result<(), ServerError> {
+        // Listen on the provided addresses
+        for listener in listeners {
+            self.swarm.listen_on(listener)?;
+        }
+
+        // Keep the server running
+        loop {
+            let event = self.swarm.select_next_some().await;
+            self.on_swarm_event(event)?;
+        }
+    }
+
+    fn on_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent<B>>) -> Result<(), ServerError> {
+        trace!(?event, "Received swarm event");
+
+        match event {
+            SwarmEvent::ConnectionEstablished {
+                peer_id,
+                connection_id,
+                endpoint,
+                ..
+            } => {
+                self.on_peer_connected(peer_id, connection_id, endpoint);
+            }
+            SwarmEvent::ConnectionClosed {
+                peer_id,
+                connection_id,
+                ..
+            } => {
+                self.on_peer_disconnected(peer_id, connection_id);
+            }
+            _ => {
+                // Nothing to do here
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip_all, fields(peer_id = %peer_id))]
+    fn on_peer_connected(
+        &mut self,
+        peer_id: PeerId,
+        _connection_id: ConnectionId,
+        _endpoint: ConnectedPoint,
+    ) {
+        debug!("Peer connected");
+    }
+
+    #[instrument(skip_all, fields(peer_id = %peer_id, connection_id = %connection_id))]
+    fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
+        debug!("Peer disconnected");
+    }
+}
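// Illustrative end-to-end sketch (not included in the changeset): wires the
// server and client added above together, roughly what a future integration
// test could look like. The listen address, CAR fixture path, payload root,
// and the sleep-based startup wait are assumptions; the `Server`, `Client`,
// and `FileBlockstore::from_existing` APIs are used as introduced in this diff.
use std::sync::Arc;

use cid::Cid;
use libp2p::Multiaddr;
use mater::FileBlockstore;
use polka_storage_retrieval::{client::Client, server::Server};

async fn roundtrip(car_path: &str, root: Cid, output: &str) -> anyhow::Result<()> {
    let listen: Multiaddr = "/ip4/127.0.0.1/tcp/8989".parse()?;

    // Serve blocks from an existing CAR file in a background task.
    let blockstore = Arc::new(FileBlockstore::from_existing(car_path).await?);
    let server = Server::new(blockstore)?;
    tokio::spawn(server.run(vec![listen.clone()]));

    // Give the server a moment to start listening (a real test would wait for
    // the listen event instead of sleeping).
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    // Download the payload root from the local server into a new CAR file.
    let client = Client::new(output, vec![listen], vec![root]).await?;
    client.download().await?;
    Ok(())
}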