
archive crate #8

Merged
merged 13 commits on Feb 5, 2025
254 changes: 247 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -33,7 +33,9 @@ tracing = "0.1.41"
[profile.release]
debug = 1
codegen-units = 128
incremental = true

[profile.bench]
debug = 1
codegen-units = 128
incremental = true
9 changes: 8 additions & 1 deletion crates/archive/Cargo.toml
@@ -9,12 +9,17 @@ async-stream = "0.3.6"
async-trait = "0.1.83"
aws-config = { version = "1.5.9", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.59.0", features = ["rt-tokio"] }
axum = "0.8.1"
bytes = { workspace = true }
clap = { version = "4.5.9", features = ["derive"] }
futures-core = "0.3.31"
futures-util = "0.3.31"
parking_lot = { workspace = true }
lazy_static = "1.5.0"
parquet = { workspace = true }
prometheus-client = "0.23.0"
rayon = { workspace = true }
regex = "1.11.1"
reqwest = { version = "0.12.9", features = ["json", "gzip", "stream"] }
scopeguard = "1.2.0"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -25,4 +30,6 @@ sqd-dataset = { path = "../dataset" }
sqd-primitives = { path = "../primitives" }
tempfile = { workspace = true }
tokio = { version = "1.38.0", features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json", "time"] }
url = "2.5.3"
74 changes: 60 additions & 14 deletions crates/archive/src/archive.rs
@@ -1,12 +1,17 @@
use crate::chain_builder::{ChainBuilder, ChainBuilderBox};
use crate::cli::{Cli, NetworkKind};
use crate::fs::create_fs;
use crate::ingest::ingest_from_service;
use crate::layout::Layout;
use crate::writer::ParquetWriter;
use crate::metrics;
use crate::processor::LineProcessor;
use crate::server::run_server;
use crate::sink::Sink;
use crate::writer::{Writer, WriterItem};
use anyhow::ensure;
use sqd_data::solana::tables::SolanaChunkBuilder;
use sqd_data_types::BlockNumber;
use prometheus_client::registry::Registry;
use tokio::task::JoinSet;


pub async fn run(args: &Cli) -> anyhow::Result<()> {
@@ -15,6 +20,8 @@ pub async fn run(args: &Cli) -> anyhow::Result<()> {
"--first-block is greater than --last-block"
);

init_logging(args.json_log);

let fs = create_fs(&args.dest).await?;
let layout = Layout::new(fs.clone());

@@ -27,28 +34,50 @@

if let Some(last_block) = args.last_block {
if chunk_writer.next_block() > last_block {
println!("nothing to do");
tracing::info!("nothing to do");
return Ok(());
}
}

if let Some(prom_port) = args.prom_port {
let mut metrics_registry = Registry::default();
metrics::register_metrics(&mut metrics_registry);
let server = run_server(metrics_registry, prom_port);
tokio::spawn(server);
}

let chunk_builder: ChainBuilderBox = match args.network_kind {
NetworkKind::Solana => Box::new(
ChainBuilder::<SolanaChunkBuilder>::default()
ChainBuilder::<SolanaChunkBuilder>::default(),
),
};

let writer = ParquetWriter::new(chunk_builder);
let processor = LineProcessor::new(chunk_builder);

let block_stream = ingest_from_service(
let (chunk_sender, chunk_receiver) = tokio::sync::mpsc::unbounded_channel::<WriterItem>();

let mut sink = Sink::new(
processor,
chunk_writer,
args.chunk_size,
args.src.clone(),
chunk_writer.next_block(),
args.last_block
args.last_block,
chunk_sender,
);

let prev_chunk_hash = chunk_writer.prev_chunk_hash();
let mut writer = Writer::new(fs, chunk_receiver);

todo!()
let mut set = JoinSet::new();
set.spawn(async move {
sink.r#loop().await
});
set.spawn(async move {
writer.start().await
});
while let Some(res) = set.join_next().await {
res??;
}
Collaborator:

Looks like a simple futures::join(sink.r#loop(), writer.start()).await

Collaborator Author:

futures::join waits for both futures to complete, while JoinSet is supposed to catch an error from the writer task as early as possible.
We can easily imagine something going wrong while writing parquets; if we used join, the error would only be propagated once the sink tried to send a new value into the already-closed channel.

Collaborator:

If they both return Results, this one is even more convenient: https://docs.rs/tokio/latest/tokio/macro.try_join.html
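
For illustration, here is a minimal, self-contained sketch of the try_join! behavior under discussion; fails_fast and runs_long are hypothetical stand-ins for the writer and sink tasks:

```rust
use std::time::Duration;

// Hypothetical stand-in for the writer task hitting an error mid-run.
async fn fails_fast() -> anyhow::Result<()> {
    Err(anyhow::anyhow!("parquet write failed"))
}

// Hypothetical stand-in for the sink task, which would otherwise run
// for a long time before noticing anything went wrong.
async fn runs_long() -> anyhow::Result<()> {
    tokio::time::sleep(Duration::from_secs(3600)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    // try_join! polls both branches concurrently and returns on the
    // first Err, so the writer failure surfaces immediately instead of
    // waiting for the sink to hit the closed channel.
    let result = tokio::try_join!(fails_fast(), runs_long());
    assert!(result.is_err());
}
```

One trade-off to keep in mind: unlike JoinSet, try_join! drives both futures on the current task rather than spawning them, so the branches run concurrently but not in parallel.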


Ok(())
}


@@ -62,6 +91,23 @@ fn chunk_check(filelist: &[String]) -> bool {
}


fn short_hash(value: &str) -> &str {
&value[value.len().saturating_sub(5)..]
}
fn init_logging(json: bool) {
let env_filter = tracing_subscriber::EnvFilter::builder().parse_lossy(
std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV)
.unwrap_or(format!("{}=info", std::env!("CARGO_CRATE_NAME"))),
);

if json {
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.json()
.flatten_event(true)
.init();
} else {
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.init();
}
}
28 changes: 24 additions & 4 deletions crates/archive/src/chain_builder.rs
@@ -16,30 +16,38 @@ pub trait AnyChainBuilder: Send + Sync {
fn last_block_number(&self) -> BlockNumber;

fn last_block_hash(&self) -> &str;

fn last_parent_block_hash(&self) -> &str;
}


pub struct ChainBuilder<B> {
chunk_builder: B,
last_block_number: BlockNumber,
last_block_hash: String
last_block_hash: String,
last_parent_block_hash: String
}


impl<B: Default> ChainBuilder<B> {
pub fn new(base_block_number: BlockNumber, base_block_hash: String) -> Self {
pub fn new(
base_block_number: BlockNumber,
base_block_hash: String,
base_parent_block_hash: String
) -> Self {
Self {
chunk_builder: B::default(),
last_block_number: base_block_number,
last_block_hash: base_block_hash
last_block_hash: base_block_hash,
last_parent_block_hash: base_parent_block_hash
}
}
}


impl<B: Default> Default for ChainBuilder<B> {
fn default() -> Self {
Self::new(0, String::new())
Self::new(0, String::new(), String::new())
}
}

@@ -61,6 +69,8 @@
self.last_block_number = block.number();
self.last_block_hash.clear();
self.last_block_hash.insert_str(0, block.hash());
self.last_parent_block_hash.clear();
self.last_parent_block_hash.insert_str(0, block.parent_hash());
Ok(())
}

@@ -83,6 +93,11 @@
fn last_block_hash(&self) -> &str {
&self.last_block_hash
}

#[inline]
fn last_parent_block_hash(&self) -> &str {
&self.last_parent_block_hash
}
}


@@ -111,4 +126,9 @@ impl AnyChainBuilder for ChainBuilderBox {
fn last_block_hash(&self) -> &str {
self.as_ref().last_block_hash()
}

#[inline]
fn last_parent_block_hash(&self) -> &str {
self.as_ref().last_parent_block_hash()
}
}
8 changes: 8 additions & 0 deletions crates/archive/src/cli.rs
@@ -39,4 +39,12 @@ pub struct Cli {
/// Network kind
#[arg(long, value_enum)]
pub network_kind: NetworkKind,

/// Whether the logs should be structured in JSON format
#[arg(long)]
pub json_log: bool,

/// Port to use for built-in prometheus metrics server
#[arg(long)]
pub prom_port: Option<u16>,
}
18 changes: 11 additions & 7 deletions crates/archive/src/fs/local.rs
@@ -25,22 +25,26 @@ impl Fs for LocalFs {
}

async fn ls(&self) -> anyhow::Result<Vec<String>> {
let mut read_dir = tokio::fs::read_dir(self.root.as_path()).await?;
let mut result = Vec::new();
while let Some(entry) = read_dir.next_entry().await? {
if let Some(name) = entry.file_name().to_str() {
result.push(name.to_string());
match tokio::fs::read_dir(&self.root).await {
Collaborator:

Can probably be simplified to let read_dir = read_dir(...).await.unwrap_or_default()

If clippy doesn't show it with the default config, you can try running cargo clippy -- -W clippy::pedantic; it sometimes gives useful advice.

Collaborator Author:

Here I can't really use unwrap_or_default(), because the Default trait isn't implemented for tokio::fs::ReadDir. But yes, I will follow clippy's recommendations.
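
For reference, a sketch of one shape a clippy-friendly fallback could take without a Default impl, using let-else and assuming the same "missing or unreadable directory means empty listing" behavior:

```rust
async fn ls(&self) -> anyhow::Result<Vec<String>> {
    // Treat a missing or unreadable root directory as an empty listing.
    let Ok(mut read_dir) = tokio::fs::read_dir(&self.root).await else {
        return Ok(Vec::new());
    };
    let mut result = Vec::new();
    while let Some(entry) = read_dir.next_entry().await? {
        // Skip names that are not valid UTF-8.
        if let Some(name) = entry.file_name().to_str() {
            result.push(name.to_string());
        }
    }
    Ok(result)
}
```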

Ok(mut read_dir) => {
let mut result = Vec::new();
while let Some(entry) = read_dir.next_entry().await? {
if let Some(name) = entry.file_name().to_str() {
result.push(name.to_string());
}
}
Ok(result)
}
Err(_) => Ok(vec![])
}
Ok(result)
}

async fn move_local(&self, local_src: &Path, dest: &str) -> anyhow::Result<()> {
let dest = self.root.join(dest);
if let Some(dir) = dest.parent() {
tokio::fs::create_dir_all(dir).await?;
}
tokio::fs::rename(local_src, self.root.join(dest)).await?;
tokio::fs::rename(local_src, dest).await?;
Ok(())
}

74 changes: 41 additions & 33 deletions crates/archive/src/fs/mod.rs
@@ -1,17 +1,17 @@
use crate::fs::local::LocalFs;
use crate::fs::s3::S3Fs;
use anyhow::{anyhow, ensure, Context};
use anyhow::{anyhow, ensure};
use async_trait::async_trait;
use std::path::Path;
use std::sync::Arc;
use url::Url;


mod local;
mod s3;
pub mod local;
pub mod s3;


pub type FSRef = Arc<dyn Fs>;
pub type FSRef = Arc<dyn Fs + Sync + Send>;


#[async_trait]
@@ -27,36 +27,44 @@ pub trait Fs {


pub async fn create_fs(url: &str) -> anyhow::Result<FSRef> {
let u = Url::parse(url)?;

if u.scheme() == "s3" {
ensure!(!u.cannot_be_a_base(), "invalid s3 url - {}", url);

let bucket = u.host_str().ok_or_else(|| {
anyhow!("bucket is missing in {}", url)
})?;

let mut path = &u.path()[1..];
if path.ends_with('/') {
path = &path[0..path.len() - 1];
match Url::parse(url) {
Ok(u) => {
if u.scheme() == "s3" {
ensure!(!u.cannot_be_a_base(), "invalid s3 url - {}", url);

let bucket = u.host_str().ok_or_else(|| {
anyhow!("bucket is missing in {}", url)
})?;

let mut path = &u.path()[..];
if path.starts_with('/') {
path = &path[1..path.len()];
}
if path.ends_with('/') {
path = &path[0..path.len() - 1];
}

let mut config_loader = aws_config::from_env();
if let Some(s3_endpoint) = std::env::var("AWS_S3_ENDPOINT").ok() {
config_loader = config_loader.endpoint_url(s3_endpoint);
}
let config = config_loader.load().await;

let s3_client = aws_sdk_s3::Client::new(&config);
let fs = S3Fs::new(s3_client, bucket.to_string(), path.to_string());
return Ok(Arc::new(fs))
} else {
anyhow::bail!("unsupported protocol - {}", u.scheme())
}
}

let mut config_loader = aws_config::from_env();
if let Some(s3_endpoint) = std::env::var("AWS_S3_ENDPOINT").ok() {
config_loader = config_loader.endpoint_url(s3_endpoint);
Err(_) => {
let path = Path::new(url);
if path.is_absolute() || path.is_relative() {
let fs = LocalFs::new(path);
return Ok(Arc::new(fs))
} else {
anyhow::bail!(format!("unsupported filesystem - {url}"))
}
}
let config = config_loader.load().await;

let s3_client = aws_sdk_s3::Client::new(&config);
let fs = S3Fs::new(s3_client, bucket.to_string(), path.to_string());
return Ok(Arc::new(fs))
}

if u.scheme() == "file" || u.scheme() == "" {
let path = Path::new(u.path());
let fs = LocalFs::new(path);
return Ok(Arc::new(fs))
}

anyhow::bail!("unsupported protocol - {}", u.scheme())
}
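
For context, an illustrative sketch of how the two supported destination styles resolve with this logic; the bucket and paths are made up, and create_fs is assumed to be in scope:

```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // "s3" scheme: the host is the bucket, the path becomes the key
    // prefix, and AWS_S3_ENDPOINT may override the endpoint URL.
    let _s3 = create_fs("s3://my-bucket/solana/archive").await?;

    // Anything that fails URL parsing is treated as a local filesystem path.
    let _local = create_fs("/var/data/archive").await?;

    Ok(())
}
```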