
archive crate #8

Merged · 13 commits · Feb 5, 2025
255 changes: 248 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -33,7 +33,9 @@ tracing = "0.1.41"
[profile.release]
debug = 1
codegen-units = 128
incremental = true

[profile.bench]
debug = 1
codegen-units = 128
incremental = true
10 changes: 9 additions & 1 deletion crates/archive/Cargo.toml
@@ -9,12 +9,17 @@ async-stream = "0.3.6"
async-trait = "0.1.83"
aws-config = { version = "1.5.9", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.59.0", features = ["rt-tokio"] }
axum = "0.8.1"
bytes = { workspace = true }
clap = { version = "4.5.9", features = ["derive"] }
futures-core = "0.3.31"
futures-util = "0.3.31"
parking_lot = { workspace = true }
lazy_static = "1.5.0"
parquet = { workspace = true }
prometheus-client = "0.23.0"
rayon = { workspace = true }
regex = "1.11.1"
reqwest = { version = "0.12.9", features = ["json", "gzip", "stream"] }
scopeguard = "1.2.0"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -25,4 +30,7 @@ sqd-dataset = { path = "../dataset" }
sqd-primitives = { path = "../primitives" }
tempfile = { workspace = true }
tokio = { version = "1.38.0", features = ["full"] }
tokio-util = "0.7.13"
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json", "time"] }
url = "2.5.3"
76 changes: 62 additions & 14 deletions crates/archive/src/archive.rs
@@ -1,12 +1,18 @@
use crate::chain_builder::{ChainBuilder, ChainBuilderBox};
use crate::cli::{Cli, NetworkKind};
use crate::fs::create_fs;
use crate::ingest::ingest_from_service;
use crate::layout::Layout;
-use crate::writer::ParquetWriter;
+use crate::metrics;
+use crate::processor::LineProcessor;
+use crate::server::run_server;
+use crate::sink::Sink;
+use crate::writer::{Writer, WriterItem};
use anyhow::ensure;
use futures_util::FutureExt;
use sqd_data::solana::tables::SolanaChunkBuilder;
use sqd_data_types::BlockNumber;
+use std::time::Duration;
+use prometheus_client::registry::Registry;


pub async fn run(args: &Cli) -> anyhow::Result<()> {
@@ -15,6 +21,8 @@ pub async fn run(args: &Cli) -> anyhow::Result<()> {
"--first-block is greater than --last-block"
);

init_logging(args.json_log);

let fs = create_fs(&args.dest).await?;
let layout = Layout::new(fs.clone());

@@ -27,28 +35,51 @@

if let Some(last_block) = args.last_block {
if chunk_writer.next_block() > last_block {
-            println!("nothing to do");
+            tracing::info!("nothing to do");
return Ok(());
}
}

if let Some(prom_port) = args.prom_port {
let mut metrics_registry = Registry::default();
metrics::register_metrics(&mut metrics_registry);
let server = run_server(metrics_registry, prom_port);
tokio::spawn(server);
}

let chunk_builder: ChainBuilderBox = match args.network_kind {
NetworkKind::Solana => Box::new(
-            ChainBuilder::<SolanaChunkBuilder>::default()
+            ChainBuilder::<SolanaChunkBuilder>::default(),
),
};

-    let writer = ParquetWriter::new(chunk_builder);
+    let processor = LineProcessor::new(chunk_builder);

-    let block_stream = ingest_from_service(
-        args.src.clone(),
-        chunk_writer.next_block(),
-        args.last_block
-    );
+    let (chunk_sender, chunk_receiver) = tokio::sync::mpsc::unbounded_channel::<WriterItem>();
+
+    let block_stream_interval = Duration::from_secs(args.block_stream_interval.into());
+    let mut sink = Sink::new(
+        processor,
+        chunk_writer,
+        args.chunk_size,
+        args.src.clone(),
+        block_stream_interval,
+        args.last_block,
+        chunk_sender,
+    );

    let prev_chunk_hash = chunk_writer.prev_chunk_hash();
+    let mut writer = Writer::new(fs, chunk_receiver);

-    todo!()
+    tokio::try_join!(
+        async {
+            let res = sink.r#loop().await;
+            // manual drop should close writer's channel
+            drop(sink);
+            res
+        },
+        writer.start()
+    )?;
+
+    Ok(())
}


@@ -62,6 +93,23 @@ fn chunk_check(filelist: &[String]) -> bool {
}


fn short_hash(value: &str) -> &str {
&value[value.len().saturating_sub(5)..]
}


fn init_logging(json: bool) {
let env_filter = tracing_subscriber::EnvFilter::builder().parse_lossy(
std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV)
.unwrap_or(format!("{}=info", std::env!("CARGO_CRATE_NAME"))),
);

if json {
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.json()
.flatten_event(true)
.init();
} else {
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.init();
}
}
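
The shutdown sequence in `run` leans on a standard Tokio idiom: `Writer::start` keeps consuming until every `WriterItem` sender is gone, which is why `sink` is dropped manually once its loop returns. A minimal, self-contained sketch of that idiom (the `produce`/`consume` names are illustrative, not part of this crate):

```rust
use tokio::sync::mpsc;

async fn produce(tx: mpsc::UnboundedSender<u64>) {
    for block in 0..3 {
        // `send` only fails if the receiver has already been dropped.
        let _ = tx.send(block);
    }
    // `tx` is dropped here, which closes the channel.
}

async fn consume(mut rx: mpsc::UnboundedReceiver<u64>) {
    // `recv` yields `None` once all senders are dropped, ending the loop.
    while let Some(block) = rx.recv().await {
        println!("writing block {block}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    // Run producer and consumer concurrently, mirroring the `tokio::try_join!` above.
    tokio::join!(produce(tx), consume(rx));
}
```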
28 changes: 24 additions & 4 deletions crates/archive/src/chain_builder.rs
@@ -16,30 +16,38 @@ pub trait AnyChainBuilder: Send + Sync {
fn last_block_number(&self) -> BlockNumber;

fn last_block_hash(&self) -> &str;

fn last_parent_block_hash(&self) -> &str;
}


pub struct ChainBuilder<B> {
chunk_builder: B,
last_block_number: BlockNumber,
-    last_block_hash: String
+    last_block_hash: String,
+    last_parent_block_hash: String
}


impl<B: Default> ChainBuilder<B> {
-    pub fn new(base_block_number: BlockNumber, base_block_hash: String) -> Self {
+    pub fn new(
+        base_block_number: BlockNumber,
+        base_block_hash: String,
+        base_parent_block_hash: String
+    ) -> Self {
Self {
chunk_builder: B::default(),
last_block_number: base_block_number,
-            last_block_hash: base_block_hash
+            last_block_hash: base_block_hash,
+            last_parent_block_hash: base_parent_block_hash
}
}
}


impl<B: Default> Default for ChainBuilder<B> {
fn default() -> Self {
-        Self::new(0, String::new())
+        Self::new(0, String::new(), String::new())
}
}

@@ -61,6 +69,8 @@ where
self.last_block_number = block.number();
self.last_block_hash.clear();
self.last_block_hash.insert_str(0, block.hash());
self.last_parent_block_hash.clear();
self.last_parent_block_hash.insert_str(0, block.parent_hash());
Ok(())
}

@@ -83,6 +93,11 @@
fn last_block_hash(&self) -> &str {
&self.last_block_hash
}

#[inline]
fn last_parent_block_hash(&self) -> &str {
&self.last_parent_block_hash
}
}


@@ -111,4 +126,9 @@ impl AnyChainBuilder for ChainBuilderBox {
fn last_block_hash(&self) -> &str {
self.as_ref().last_block_hash()
}

#[inline]
fn last_parent_block_hash(&self) -> &str {
self.as_ref().last_parent_block_hash()
}
}
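
Tracking `last_parent_block_hash` alongside `last_block_hash` gives callers what they need to validate chain continuity as blocks arrive. A hedged sketch of such a check; the `is_child_of` helper is hypothetical and not part of this PR:

```rust
/// Hypothetical continuity check: a block extends the chain only if its
/// parent hash equals the hash of the last block already recorded.
fn is_child_of(last_block_hash: &str, new_parent_hash: &str) -> bool {
    last_block_hash == new_parent_hash
}

fn main() {
    let last_block_hash = "0xabc"; // what `last_block_hash()` would return
    assert!(is_child_of(last_block_hash, "0xabc"));  // direct child: accept
    assert!(!is_child_of(last_block_hash, "0xdef")); // gap or fork: reject
}
```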
14 changes: 13 additions & 1 deletion crates/archive/src/cli.rs
@@ -1,4 +1,4 @@
-use clap::{Parser, ValueEnum};
+use clap::{Parser, ValueEnum, value_parser};
use sqd_data_types::BlockNumber;
use url::Url;

@@ -39,4 +39,16 @@
/// Network kind
#[arg(long, value_enum)]
pub network_kind: NetworkKind,

/// Whether the logs should be structured in JSON format
#[arg(long)]
pub json_log: bool,

/// Port to use for built-in prometheus metrics server
#[arg(long)]
pub prom_port: Option<u16>,

/// Interval between attempts to stream new blocks, in seconds
#[arg(long, value_parser = value_parser!(u16).range(1..), default_value_t = 300)]
pub block_stream_interval: u16,
}
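
The `value_parser!(u16).range(1..)` on the new flag rejects a zero interval at argument-parsing time instead of at runtime. A standalone sketch of that behavior, assuming only the `clap` crate with the `derive` feature; the `Demo` struct is illustrative:

```rust
use clap::{Parser, value_parser};

#[derive(Parser)]
struct Demo {
    /// Interval between attempts to stream new blocks, in seconds (must be >= 1).
    #[arg(long, value_parser = value_parser!(u16).range(1..), default_value_t = 300)]
    block_stream_interval: u16,
}

fn main() {
    // The default applies when the flag is omitted.
    let d = Demo::parse_from(["demo"]);
    assert_eq!(d.block_stream_interval, 300);

    // Zero is rejected by `.range(1..)` before the program ever sees it.
    assert!(Demo::try_parse_from(["demo", "--block-stream-interval", "0"]).is_err());
}
```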
18 changes: 11 additions & 7 deletions crates/archive/src/fs/local.rs
@@ -25,22 +25,26 @@
}

async fn ls(&self) -> anyhow::Result<Vec<String>> {
-        let mut read_dir = tokio::fs::read_dir(self.root.as_path()).await?;
-        let mut result = Vec::new();
-        while let Some(entry) = read_dir.next_entry().await? {
-            if let Some(name) = entry.file_name().to_str() {
-                result.push(name.to_string());
-            }
-        }
-        Ok(result)
+        match tokio::fs::read_dir(&self.root).await {
Collaborator: Can probably be simplified to `let read_dir = read_dir(...).await.unwrap_or_default()`. If clippy doesn't show it with the default config, you can try running `cargo clippy -- -W clippy::pedantic`; sometimes it gives useful advice.

Collaborator (Author): Here I can't really use `unwrap_or_default()` because the `Default` trait isn't implemented for `tokio::fs::ReadDir`, but yeah, I will use recommendations from clippy.
+            Ok(mut read_dir) => {
+                let mut result = Vec::new();
+                while let Some(entry) = read_dir.next_entry().await? {
+                    if let Some(name) = entry.file_name().to_str() {
+                        result.push(name.to_string());
+                    }
+                }
+                Ok(result)
+            }
+            Err(_) => Ok(vec![])
+        }
    }

async fn move_local(&self, local_src: &Path, dest: &str) -> anyhow::Result<()> {
let dest = self.root.join(dest);
if let Some(dir) = dest.parent() {
tokio::fs::create_dir_all(dir).await?;
}
-        tokio::fs::rename(local_src, self.root.join(dest)).await?;
+        tokio::fs::rename(local_src, dest).await?;
Ok(())
}

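As the review thread above notes, `unwrap_or_default()` is not an option here because `tokio::fs::ReadDir` does not implement `Default`. One alternative shape that keeps the merged behavior (a missing or unreadable directory lists as empty) without the nested `match` is a `let ... else` early return; this is a standalone sketch under those assumptions, not the code merged in this PR:

```rust
use std::path::Path;

/// List file names under `root`, treating an unreadable directory as empty.
/// Standalone sketch; assumes the `anyhow` and `tokio` crates.
async fn ls(root: &Path) -> anyhow::Result<Vec<String>> {
    let Ok(mut read_dir) = tokio::fs::read_dir(root).await else {
        return Ok(Vec::new()); // directory missing or unreadable: empty listing
    };
    let mut result = Vec::new();
    while let Some(entry) = read_dir.next_entry().await? {
        // Skip names that are not valid UTF-8, as the original code does.
        if let Some(name) = entry.file_name().to_str() {
            result.push(name.to_string());
        }
    }
    Ok(result)
}
```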
74 changes: 41 additions & 33 deletions crates/archive/src/fs/mod.rs
@@ -1,17 +1,17 @@
use crate::fs::local::LocalFs;
use crate::fs::s3::S3Fs;
-use anyhow::{anyhow, ensure, Context};
+use anyhow::{anyhow, ensure};
use async_trait::async_trait;
use std::path::Path;
use std::sync::Arc;
use url::Url;


-mod local;
-mod s3;
+pub mod local;
+pub mod s3;


-pub type FSRef = Arc<dyn Fs>;
+pub type FSRef = Arc<dyn Fs + Sync + Send>;


#[async_trait]
@@ -27,36 +27,44 @@


pub async fn create_fs(url: &str) -> anyhow::Result<FSRef> {
-    let u = Url::parse(url)?;
-
-    if u.scheme() == "s3" {
-        ensure!(!u.cannot_be_a_base(), "invalid s3 url - {}", url);
-
-        let bucket = u.host_str().ok_or_else(|| {
-            anyhow!("bucket is missing in {}", url)
-        })?;
-
-        let mut path = &u.path()[1..];
-        if path.ends_with('/') {
-            path = &path[0..path.len() - 1];
-        }
-
-        let mut config_loader = aws_config::from_env();
-        if let Some(s3_endpoint) = std::env::var("AWS_S3_ENDPOINT").ok() {
-            config_loader = config_loader.endpoint_url(s3_endpoint);
-        }
-        let config = config_loader.load().await;
-
-        let s3_client = aws_sdk_s3::Client::new(&config);
-        let fs = S3Fs::new(s3_client, bucket.to_string(), path.to_string());
-        return Ok(Arc::new(fs))
-    }
-
-    if u.scheme() == "file" || u.scheme() == "" {
-        let path = Path::new(u.path());
-        let fs = LocalFs::new(path);
-        return Ok(Arc::new(fs))
-    }
-
-    anyhow::bail!("unsupported protocol - {}", u.scheme())
+    match Url::parse(url) {
+        Ok(u) => {
+            if u.scheme() == "s3" {
+                ensure!(!u.cannot_be_a_base(), "invalid s3 url - {}", url);
+
+                let bucket = u.host_str().ok_or_else(|| {
+                    anyhow!("bucket is missing in {}", url)
+                })?;
+
+                let mut path = &u.path()[..];
+                if path.starts_with('/') {
+                    path = &path[1..path.len()];
+                }
+                if path.ends_with('/') {
+                    path = &path[0..path.len() - 1];
+                }
+
+                let mut config_loader = aws_config::from_env();
+                if let Ok(s3_endpoint) = std::env::var("AWS_S3_ENDPOINT") {
+                    config_loader = config_loader.endpoint_url(s3_endpoint);
+                }
+                let config = config_loader.load().await;
+
+                let s3_client = aws_sdk_s3::Client::new(&config);
+                let fs = S3Fs::new(s3_client, bucket.to_string(), path.to_string());
+                return Ok(Arc::new(fs))
+            } else {
+                anyhow::bail!("unsupported protocol - {}", u.scheme())
+            }
+        }
+        Err(_) => {
+            let path = Path::new(url);
+            if path.is_absolute() || path.is_relative() {
+                let fs = LocalFs::new(path);
+                return Ok(Arc::new(fs))
+            } else {
+                anyhow::bail!(format!("unsupported filesystem - {url}"))
+            }
+        }
+    }
}
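
After this change, any argument that fails `Url::parse` falls through to the local-filesystem branch, so plain paths no longer need a `file://` scheme. A hedged sketch of how a caller might exercise both branches; the bucket name and paths are made up, and the S3 branch additionally expects AWS credentials (and optionally `AWS_S3_ENDPOINT`) in the environment:

```rust
// Hypothetical caller of `create_fs`; the bucket and paths are examples only.
async fn demo() -> anyhow::Result<()> {
    // "s3://..." parses as a URL with scheme "s3": the bucket comes from the
    // host, the key prefix from the path with surrounding slashes trimmed.
    let s3 = create_fs("s3://my-bucket/archive/solana").await?;

    // "./data" is not an absolute URL, so `Url::parse` fails and the new
    // `Err(_)` branch builds a `LocalFs` over the relative path instead.
    let local = create_fs("./data").await?;

    println!("s3: {:?}", s3.ls().await?);
    println!("local: {:?}", local.ls().await?);
    Ok(())
}
```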