Skip to content

Commit

Permalink
feat: setup retrieval server and client (#681)
Browse files Browse the repository at this point in the history
  • Loading branch information
cernicc authored Jan 17, 2025
1 parent 44705b4 commit 60b42db
Show file tree
Hide file tree
Showing 14 changed files with 667 additions and 1 deletion.
90 changes: 89 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ members = [
"storage-provider/client",
"storage-provider/common",
"storage-provider/server",
"storage-retrieval/cli",
"storage-retrieval/lib",
"storage/polka-index",
"storagext/cli",
"storagext/lib",
Expand Down Expand Up @@ -51,6 +53,7 @@ async-stream = "0.3.6"
async-trait = "0.1.80"
axum = "0.7.5"
base64 = "0.22.1"
beetswap = "0.4.0"
bitflags = "2.5.0"
blake2b_simd = { version = "1.0.2", default-features = false }
blockstore = "0.7.1"
Expand Down Expand Up @@ -81,6 +84,8 @@ ipld-dagpb = "0.2.1"
itertools = "0.13.0"
jsonrpsee = { version = "0.24.7" }
libp2p = { version = "0.54", default-features = false }
libp2p-core = "0.42.0"
libp2p-swarm = "0.45.1"
log = { version = "0.4.21", default-features = false }
multihash-codetable = { version = "0.1.1", default-features = false }
num-bigint = { version = "0.4.5", default-features = false }
Expand Down Expand Up @@ -140,8 +145,10 @@ pallet-market = { path = "pallets/market", default-features = false }
pallet-proofs = { path = "pallets/proofs", default-features = false }
pallet-randomness = { path = "pallets/randomness", default-features = false }
pallet-storage-provider = { path = "pallets/storage-provider", default-features = false }
polka-index = { path = "storage/polka-index" }
polka-storage-proofs = { path = "lib/polka-storage-proofs", default-features = false }
polka-storage-provider-common = { path = "storage-provider/common" }
polka-storage-retrieval = { path = "storage-retrieval/lib" }
polka-storage-runtime = { path = "runtime" }
primitives = { path = "primitives", default-features = false }
storagext = { path = "storagext/lib" }
Expand Down
1 change: 1 addition & 0 deletions mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod v2;

// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use ipld_core::cid::Cid;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
Expand Down
4 changes: 4 additions & 0 deletions mater/lib/src/multicodec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use ipld_core::cid::{multihash::Multihash, CidGeneric};

pub const SHA_256_CODE: u64 = 0x12;
pub const SHA_512_CODE: u64 = 0x13;

/// The RAW multicodec code
pub const RAW_CODE: u64 = 0x55;

/// THE DAG_PB multicodec code
pub const DAG_PB_CODE: u64 = 0x70;

/// The IDENTITY multicodec code
Expand Down
Binary file not shown.
Binary file not shown.
23 changes: 23 additions & 0 deletions storage-retrieval/cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
name = "polka-fetch"
repository.workspace = true
version = "0.1.0"


[dependencies]
anyhow.workspace = true
cid = { workspace = true }
clap = { workspace = true, features = ["derive"] }
libp2p = { workspace = true }
multihash-codetable = { workspace = true, features = ["sha2"] }
polka-storage-retrieval = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[lints]
workspace = true
81 changes: 81 additions & 0 deletions storage-retrieval/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{path::PathBuf, time::Duration};

use cid::Cid;
use clap::{command, Parser};
use libp2p::Multiaddr;
use polka_storage_retrieval::client::Client;
use tokio::time::timeout;
use tracing::{error, info, level_filters::LevelFilter};
use tracing_subscriber::{
filter::FromEnvError, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer,
};

#[derive(Parser, Debug)]
#[command()]
struct Cli {
/// Provider used for data download
#[arg(long)]
provider: Multiaddr,
/// The CAR file to write to.
#[arg(long)]
output: PathBuf,
/// Cancel the download if not completed after the specified duration in
/// seconds. If not set the download will never timeout.
#[arg(long, value_parser = parse_duration)]
timeout: Option<Duration>,
/// payload CID
#[arg(long)]
payload_cid: Cid,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
setup_tracing()?;

let arguments = Cli::parse();

let client = Client::new(
arguments.output,
vec![arguments.provider],
vec![arguments.payload_cid],
)
.await?;

let download_result = match arguments.timeout {
Some(duration) => timeout(duration, client.download()).await,
None => Ok(client.download().await),
};

match download_result {
Ok(Ok(_)) => info!("download successfully finished"),
Ok(Err(err)) => error!(?err, "error occurred while downloading"),
Err(_) => error!("download timeout"),
}

Ok(())
}

fn parse_duration(arg: &str) -> Result<Duration, String> {
let seconds = arg
.parse()
.map_err(|err| format!("failed to parse duration from string: {}", err))?;
Ok(Duration::from_secs(seconds))
}

/// Configure and initialize tracing.
fn setup_tracing() -> Result<(), FromEnvError> {
tracing_subscriber::registry()
.with(
fmt::layer().with_filter(
EnvFilter::builder()
.with_default_directive(if cfg!(debug_assertions) {
LevelFilter::DEBUG.into()
} else {
LevelFilter::WARN.into()
})
.from_env()?,
),
)
.init();
Ok(())
}
33 changes: 33 additions & 0 deletions storage-retrieval/lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
name = "polka-storage-retrieval"
repository.workspace = true
version = "0.1.0"

[lints]
workspace = true

[dependencies]
anyhow = { workspace = true }
beetswap = { workspace = true }
blockstore = { workspace = true }
cid = { workspace = true }
futures = { workspace = true }
ipld-core = { workspace = true, features = ["serde"] }
ipld-dagpb.workspace = true
libp2p = { workspace = true, features = ["macros", "noise", "tcp", "tokio", "yamux"] }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
mater = { workspace = true, features = ["blockstore"] }
polka-index = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }

[dev-dependencies]
multihash-codetable = { workspace = true, features = ["sha2"] }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
Empty file added storage-retrieval/lib/README.md
Empty file.
49 changes: 49 additions & 0 deletions storage-retrieval/lib/examples/simple_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! The example showcases how to setup a retrieval server with the simple
//! blockstore. Because the server is simple it is used for manual testing of
//! the retrieval client.
use std::sync::Arc;

use anyhow::Result;
use libp2p::Multiaddr;
use mater::FileBlockstore;
use polka_storage_retrieval::server::Server;

#[tokio::main]
async fn main() -> Result<()> {
// Init tracing
let _guard = init_tracing();

// Example blockstore providing only a single file.
let blockstore = Arc::new(
FileBlockstore::from_existing("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car")
.await?,
);

// Setup & run the server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
server.run(vec![listener]).await?;

Ok(())
}

fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
.from_env_lossy();

tracing_subscriber::fmt()
.event_format(
tracing_subscriber::fmt::format()
.with_file(true)
.with_line_number(true),
)
.with_env_filter(filter)
.with_writer(non_blocking)
.init();

guard
}
Loading

0 comments on commit 60b42db

Please sign in to comment.