archive crate #8

Merged — merged 13 commits from dev into master on Feb 5, 2025
Conversation

@tmcgroul (Collaborator) commented Jan 17, 2025

This crate is supposed to be a replacement for the already existing parquet writer - https://github.com/subsquid/archive.py/tree/master/sqa/writer. Some things like logging and metrics are expected to be identical because that would simplify the transition from the old version.

@tmcgroul tmcgroul requested a review from eldargab January 17, 2025 01:46
Comment on lines 69 to 78
let mut set = JoinSet::new();
set.spawn(async move {
    sink.r#loop().await
});
set.spawn(async move {
    writer.start().await
});
while let Some(res) = set.join_next().await {
    res??;
}
Collaborator

Looks like a simple futures::join(sink.r#loop(), writer.start()).await

Collaborator Author

futures::join waits for both futures to complete. JoinSet is supposed to catch an error from the writer task as early as possible.
We can easily imagine that something goes wrong while writing parquets; with join the error would be propagated only when the sink tries to send a new value to the already closed channel.

Collaborator

If they both return Results, this one is even more convenient: https://docs.rs/tokio/latest/tokio/macro.try_join.html
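
For illustration, a minimal sketch of that shape (assuming sink.r#loop() and writer.start() both return a Result with compatible error types):

// Sketch only, not the PR code: try_join! polls both futures concurrently and
// returns early with the first Err, so a writer failure surfaces immediately
// instead of waiting for the sink to hit the closed channel.
tokio::try_join!(sink.r#loop(), writer.start())?;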

while let Some(entry) = read_dir.next_entry().await? {
    if let Some(name) = entry.file_name().to_str() {
        result.push(name.to_string());
match tokio::fs::read_dir(&self.root).await {
Collaborator

Can probably be simplified to let read_dir = read_dir(...).await.unwrap_or_default()

If clippy doesn't show it with the default config, you can try running cargo clippy -- -W clippy::pedantic; sometimes it gives useful advice

Collaborator Author

Here I can't really use unwrap_or_default() because the Default trait isn't implemented for tokio::fs::ReadDir. But yeah, I will use the recommendations from clippy.
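
A hedged sketch of one way to keep it compact without Default (the error handling here is an assumption, not the PR's actual code):

// Sketch: tokio::fs::ReadDir has no Default impl, so the "directory is missing"
// case is handled explicitly instead of unwrap_or_default().
let mut result = Vec::new();
let mut read_dir = match tokio::fs::read_dir(&self.root).await {
    Ok(read_dir) => read_dir,
    Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(result),
    Err(e) => return Err(e.into()),
};
while let Some(entry) = read_dir.next_entry().await? {
    if let Some(name) = entry.file_name().to_str() {
        result.push(name.to_string());
    }
}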

let dest_path = format!("{}/{}", dest, file_name);

if path.is_dir() {
    Box::pin(self.upload_directory(&path, &dest_path)).await?;
Collaborator

Pinning on the stack is probably more efficient here: https://docs.rs/tokio/latest/tokio/macro.pin.html

Collaborator Author

The compiler doesn't allow it here:

    recursion in an async fn requires boxing
    a recursive `async fn` call must introduce indirection such as `Box::pin` to avoid an infinitely sized future

Seems like only Box::pin is available in this situation.
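
A minimal self-contained illustration of the constraint (not the PR's function), assuming a tokio runtime:

use std::io;
use std::path::Path;

// A recursive async fn produces a future type that would contain itself; boxing
// the recursive call adds the indirection rustc asks for (error E0733).
async fn visit(path: &Path) -> io::Result<()> {
    let mut read_dir = tokio::fs::read_dir(path).await?;
    while let Some(entry) = read_dir.next_entry().await? {
        if entry.file_type().await?.is_dir() {
            Box::pin(visit(&entry.path())).await?;
        }
    }
    Ok(())
}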

Collaborator

🤔
Didn't know that, cool

Comment on lines 26 to 36
let mut stream = res.bytes_stream();
let mut buf = bytes::BytesMut::new();
while let Some(bytes) = stream.try_next().await? {
    buf.extend_from_slice(&bytes);
    let pos = buf.iter().position(|b| *b == b'\n');
    if let Some(pos) = pos {
        let line = buf.split_to(pos).freeze();
        buf.advance(1);
        yield line;
    }
}
Collaborator

It can probably be simplified with the StreamReader (less error-prone):

let res = client.post(url.as_str()).json(&range).send().await.map_err(|e| Error::new(ErrorKind::Other, e))?;
let mut stream = res.bytes_stream().map(|result| result.map_err(|e| Error::new(ErrorKind::Other, e)));
let mut reader = tokio_util::io::StreamReader::new(stream);
loop {
    let mut buf = Vec::new();
    reader.read_until(b'\n', &mut buf).await?;
    yield Bytes::from(buf);
}

Collaborator Author

I remember I was considering using it but rejected the idea because it would require mapping errors... let's try anyway.

Collaborator

There is always the option to use anyhow::Result. That way, in most situations you'll be able to just use ?, and in the worst case you'll have to do .map_err(Into::into)
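
Roughly what that looks like (a sketch with made-up names, not the PR code):

// With anyhow::Result, reqwest errors convert through `?` automatically;
// only stream adapters that need a std::io::Error still want an explicit map_err.
async fn fetch_line_count(url: &str) -> anyhow::Result<usize> {
    let body = reqwest::get(url).await?.text().await?;
    Ok(body.lines().count())
}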


pub fn register_metrics(registry: &mut Registry) {
    registry.register(
        "sqd_progress_blocks_per_second",
Collaborator

Usually, it's both simpler and more flexible to expose raw counters (blocks processed since the start of the process) and then calculate useful functions like rate in the prometheus query. It will also remove ambiguity about what range the speed is averaged for.
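
For example, something along these lines (a sketch assuming the prometheus_client crate; the dashboard would then compute the rate itself, e.g. rate(sqd_progress_blocks_total[5m])):

use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

// Sketch: expose a raw monotonic counter of processed blocks and leave the
// per-second speed to the Prometheus query.
pub fn register_metrics(registry: &mut Registry, blocks_processed: Counter) {
    // prometheus_client appends "_total" to Counter names on export.
    registry.register(
        "sqd_progress_blocks",
        "Blocks processed since the process started",
        blocks_processed,
    );
}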

Collaborator Author

The metrics are identical to those we have in the current parquet writer, and I would keep it like this so we can use the existing charts and alerts.

Collaborator

Then don't forget that this lib will add _total to the metric names of type Counter

        *spare = chunk.into_processor().ok()
    }
});
metrics::LAST_SAVED_BLOCK.inc_by(last_block);
Collaborator

Should it be set?

Collaborator Author

Counter has no set method. I can switch to Gauge, but I am not sure whether it would affect the charts 🤔

This way looks ugly, but at least it should be correct:

let val = last_block - metrics::LAST_SAVED_BLOCK.get();
metrics::LAST_SAVED_BLOCK.inc_by(val);

Collaborator

Yeah, it should probably be a gauge if that's what you want. The way you proposed is not an atomic operation, so in case of concurrent access you can get a wrong value.
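
A sketch of the gauge variant (names are placeholders, assuming prometheus_client's Gauge):

use prometheus_client::metrics::gauge::Gauge;

// Sketch: Gauge::set is a single atomic store, so there is no get()/inc_by()
// window where a concurrent update could slip in between.
fn on_chunk_saved(last_saved_block: &Gauge, last_block: u64) {
    last_saved_block.set(last_block as i64);
}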

Collaborator Author

I found a hack for that - prometheus/client_rust#252 (comment)

Collaborator

But why use a counter if it's always used as a gauge?

Collaborator Author

Because these metrics represent values that shouldn't go down.

Comment on lines +120 to +122
if self.progress.has_news() {
    self.report();
}
Collaborator

Can probably be moved inside Progress:

self.progress.maybe_report();

Collaborator Author

Here Progress only tracks numbers; it doesn't print any information, so I can't add maybe_report to it.

Collaborator

Yeah, I meant moving the logging and setting metrics to a new method in the Progress struct. But it doesn't really matter
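
The shape of the suggestion, very roughly (a hypothetical sketch; only has_news() and speed() come from the PR):

// Sketch: Progress keeps the counters, and one method owns the
// "if there is news, log and update metrics" side effects.
impl Progress {
    pub fn maybe_report(&mut self) {
        if self.has_news() {
            let speed = self.speed();
            tracing::info!("writing speed: {:.0} blocks/sec", speed);
            // update the relevant gauges/counters here as well
        }
    }
}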

    state.builder.chunk_builder().byte_size() + state.processor.byte_size()
}
let last_block = item.chunk.last_block;
let target_dir = tempfile::tempdir()?.into_path();
Collaborator

It's a bit strange that a generic FSRef is passed to the Writer, but it still uses local fs in this method

Collaborator Author

I believe it's a parquet format limitation. We have to write the data to a local file first, and then we can upload it to S3.
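
Roughly the flow being described (a sketch; the helper and destination names are assumptions, upload_directory appears earlier in this PR):

// Sketch of the flow: the chunk's parquet files are written into a local temp
// directory first, then the finished files are shipped via the FS abstraction.
let target_dir = tempfile::tempdir()?.into_path();
write_chunk_parquets(&item, &target_dir)?;                  // hypothetical helper
self.fs.upload_directory(&target_dir, &chunk_dest).await?;  // destination path is an assumption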

    self.has_news
}

pub fn speed(&mut self) -> f64 {
Collaborator

Shouldn't it be enough to just report the total average? It would simplify things a lot

Collaborator Author

Well... I just ported an algorithm that we already had in the python version. I think this one is more accurate in comparison with the total average: it shows when the current writing speed (due to data density or something else) becomes faster or slower. The total average would also work.
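
For reference, the windowed idea in miniature (an illustrative sketch, not the ported algorithm):

use std::collections::VecDeque;
use std::time::Instant;

// Sketch: average over the last few samples instead of the whole run, so recent
// slowdowns or speedups show up in the reported blocks-per-second value.
struct Speed {
    samples: VecDeque<(u64, Instant)>, // (blocks written so far, when observed)
    window: usize,
}

impl Speed {
    fn push(&mut self, blocks: u64) {
        self.samples.push_back((blocks, Instant::now()));
        if self.samples.len() > self.window {
            self.samples.pop_front();
        }
    }

    fn speed(&self) -> f64 {
        match (self.samples.front(), self.samples.back()) {
            (Some((b0, t0)), Some((b1, t1))) if t1 > t0 => {
                (b1 - b0) as f64 / (*t1 - *t0).as_secs_f64()
            }
            _ => 0.0,
        }
    }
}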

Collaborator

Is the python version located here? Let's add a link in the PR description

}
if !data_ingested {
    tracing::info!("no blocks were found. waiting 5 min for a new try");
    tokio::time::sleep(Duration::from_secs(300)).await;
Collaborator

Shouldn't this delay be configurable? Sounds like 5 minutes may be a lot for some networks
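
One hedged way to make it configurable (the env var name is made up, not part of the PR):

// Sketch: read the retry delay from the environment, defaulting to the current 5 minutes.
let retry_delay = std::env::var("SQD_RETRY_DELAY_SEC")
    .ok()
    .and_then(|v| v.parse::<u64>().ok())
    .map(Duration::from_secs)
    .unwrap_or(Duration::from_secs(300));
if !data_ingested {
    tracing::info!("no blocks were found, waiting {}s for a new try", retry_delay.as_secs());
    tokio::time::sleep(retry_delay).await;
}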

@eldargab eldargab merged commit 7c0ac7a into master Feb 5, 2025
@eldargab eldargab deleted the dev branch February 5, 2025 19:12
@eldargab (Collaborator) commented Feb 5, 2025

Good work, overall!
