Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: setup retrieval server and client #681

Merged
merged 5 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 89 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ members = [
"storage-provider/client",
"storage-provider/common",
"storage-provider/server",
"storage-retrieval/cli",
"storage-retrieval/lib",
"storage/polka-index",
"storagext/cli",
"storagext/lib",
Expand Down Expand Up @@ -51,6 +53,7 @@ async-stream = "0.3.6"
async-trait = "0.1.80"
axum = "0.7.5"
base64 = "0.22.1"
beetswap = "0.4.0"
bitflags = "2.5.0"
blake2b_simd = { version = "1.0.2", default-features = false }
blockstore = "0.7.1"
Expand Down Expand Up @@ -81,6 +84,8 @@ ipld-dagpb = "0.2.1"
itertools = "0.13.0"
jsonrpsee = { version = "0.24.7" }
libp2p = { version = "0.54", default-features = false }
libp2p-core = "0.42.0"
libp2p-swarm = "0.45.1"
log = { version = "0.4.21", default-features = false }
multihash-codetable = { version = "0.1.1", default-features = false }
num-bigint = { version = "0.4.5", default-features = false }
Expand Down Expand Up @@ -140,8 +145,10 @@ pallet-market = { path = "pallets/market", default-features = false }
pallet-proofs = { path = "pallets/proofs", default-features = false }
pallet-randomness = { path = "pallets/randomness", default-features = false }
pallet-storage-provider = { path = "pallets/storage-provider", default-features = false }
polka-index = { path = "storage/polka-index" }
polka-storage-proofs = { path = "lib/polka-storage-proofs", default-features = false }
polka-storage-provider-common = { path = "storage-provider/common" }
polka-storage-retrieval = { path = "storage-retrieval/lib" }
polka-storage-runtime = { path = "runtime" }
primitives = { path = "primitives", default-features = false }
storagext = { path = "storagext/lib" }
Expand Down
1 change: 1 addition & 0 deletions mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod v2;

// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use ipld_core::cid::Cid;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
Expand Down
4 changes: 4 additions & 0 deletions mater/lib/src/multicodec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use ipld_core::cid::{multihash::Multihash, CidGeneric};

pub const SHA_256_CODE: u64 = 0x12;
pub const SHA_512_CODE: u64 = 0x13;

/// The RAW multicodec code
pub const RAW_CODE: u64 = 0x55;

/// THE DAG_PB multicodec code
pub const DAG_PB_CODE: u64 = 0x70;

/// The IDENTITY multicodec code
Expand Down
Binary file not shown.
Binary file not shown.
23 changes: 23 additions & 0 deletions storage-retrieval/cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
name = "polka-fetch"
repository.workspace = true
version = "0.1.0"


[dependencies]
anyhow.workspace = true
cid = { workspace = true }
clap = { workspace = true, features = ["derive"] }
libp2p = { workspace = true }
multihash-codetable = { workspace = true, features = ["sha2"] }
polka-storage-retrieval = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[lints]
workspace = true
81 changes: 81 additions & 0 deletions storage-retrieval/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{path::PathBuf, time::Duration};

use cid::Cid;
use clap::{command, Parser};
use libp2p::Multiaddr;
use polka_storage_retrieval::client::Client;
use tokio::time::timeout;
use tracing::{error, info, level_filters::LevelFilter};
use tracing_subscriber::{
filter::FromEnvError, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer,
};

#[derive(Parser, Debug)]
#[command()]
struct Cli {
/// Provider used for data download
#[arg(long)]
provider: Multiaddr,
/// The CAR file to write to.
#[arg(long)]
output: PathBuf,
/// Cancel the download if not completed after the specified duration in
/// seconds. If not set the download will never timeout.
#[arg(long, value_parser = parse_duration)]
timeout: Option<Duration>,
/// payload CID
#[arg(long)]
payload_cid: Cid,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
setup_tracing()?;

let arguments = Cli::parse();

let client = Client::new(
arguments.output,
vec![arguments.provider],
vec![arguments.payload_cid],
)
.await?;

let download_result = match arguments.timeout {
Some(duration) => timeout(duration, client.download()).await,
None => Ok(client.download().await),
};

match download_result {
Ok(Ok(_)) => info!("download successfully finished"),
Ok(Err(err)) => error!(?err, "error occurred while downloading"),
Err(_) => error!("download timeout"),
}

Ok(())
}

fn parse_duration(arg: &str) -> Result<Duration, String> {
let seconds = arg
.parse()
.map_err(|err| format!("failed to parse duration from string: {}", err))?;
Ok(Duration::from_secs(seconds))
}

/// Configure and initialize tracing.
fn setup_tracing() -> Result<(), FromEnvError> {
tracing_subscriber::registry()
.with(
fmt::layer().with_filter(
EnvFilter::builder()
.with_default_directive(if cfg!(debug_assertions) {
LevelFilter::DEBUG.into()
} else {
LevelFilter::WARN.into()
})
.from_env()?,
),
)
.init();
Ok(())
}
33 changes: 33 additions & 0 deletions storage-retrieval/lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
name = "polka-storage-retrieval"
repository.workspace = true
version = "0.1.0"

[lints]
workspace = true

[dependencies]
anyhow = { workspace = true }
beetswap = { workspace = true }
blockstore = { workspace = true }
cid = { workspace = true }
futures = { workspace = true }
ipld-core = { workspace = true, features = ["serde"] }
ipld-dagpb.workspace = true
libp2p = { workspace = true, features = ["macros", "noise", "tcp", "tokio", "yamux"] }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
mater = { workspace = true, features = ["blockstore"] }
polka-index = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }

[dev-dependencies]
multihash-codetable = { workspace = true, features = ["sha2"] }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
Empty file added storage-retrieval/lib/README.md
Empty file.
45 changes: 45 additions & 0 deletions storage-retrieval/lib/examples/simple_server.rs
cernicc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::sync::Arc;

use anyhow::Result;
use libp2p::Multiaddr;
use mater::FileBlockstore;
use polka_storage_retrieval::server::Server;

#[tokio::main]
async fn main() -> Result<()> {
// Init tracing
let _guard = init_tracing();

// Example blockstore providing only a single file.
let blockstore = Arc::new(
FileBlockstore::from_existing("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car")
.await?,
);

// Setup & run the server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
server.run(vec![listener]).await?;

Ok(())
}

fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
.from_env_lossy();

tracing_subscriber::fmt()
.event_format(
tracing_subscriber::fmt::format()
.with_file(true)
.with_line_number(true),
)
.with_env_filter(filter)
.with_writer(non_blocking)
.init();

guard
}
Loading