From c480c41183430d337db230cff0302322ea9d1d08 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 16 Dec 2024 16:41:40 +0100 Subject: [PATCH 01/25] feat: add util crate with the task module --- Cargo.lock | 17 +++- Cargo.toml | 2 + crates/pathfinder/Cargo.toml | 1 + crates/pathfinder/src/bin/pathfinder/main.rs | 10 +- crates/util/Cargo.toml | 15 +++ crates/util/src/lib.rs | 4 + crates/util/src/task.rs | 101 +++++++++++++++++++ 7 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 crates/util/Cargo.toml create mode 100644 crates/util/src/lib.rs create mode 100644 crates/util/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 33cc87dca0..2a9a0c626d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7400,6 +7400,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "util", "warp", "zeroize", "zstd 0.13.2", @@ -10061,13 +10062,15 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] @@ -10477,6 +10480,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "util" +version = "0.15.2" +dependencies = [ + "anyhow", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "uuid" version = "1.10.0" diff --git a/Cargo.toml b/Cargo.toml index 5e4c82e1c0..5d9cbe4545 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "crates/storage", "crates/tagged", "crates/tagged-debug-derive", + "crates/util", ] exclude = [ "crates/load-test", @@ -137,6 +138,7 @@ tokio = "1.37.0" tokio-retry = "0.3.0" tokio-stream = "0.1.14" tokio-tungstenite = "0.21" +tokio-util = {version = "0.7.13", features = ["rt"]} tower = { version = "0.4.13", default-features = false } tower-http = { version = "0.5.2", default-features = false } tracing = "0.1.37" diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index 8f4468d4a1..fe8d109400 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -71,6 +71,7 @@ tracing-subscriber = { workspace = true, features = [ "ansi", ] } url = { workspace = true } +util = { path = "../util" } zeroize = { workspace = true } zstd = { workspace = true } diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 8252b7d8be..98c1c4d7ee 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -330,13 +330,17 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst } _ = term_signal.recv() => { tracing::info!("TERM signal received, exiting gracefully"); - Ok(()) } _ = int_signal.recv() => { tracing::info!("INT signal received, exiting gracefully"); - Ok(()) } } + + util::task::tracker::close(); + tracing::info!("Waiting for all tasks to finish..."); + util::task::tracker::wait().await; + tracing::info!("Waiting for all tasks to finish... done!"); + Ok(()) } #[cfg(feature = "tokio-console")] @@ -614,7 +618,7 @@ fn start_feeder_gateway_sync( fetch_casm_from_fgw: config.fetch_casm_from_fgw, }; - tokio::spawn(state::sync(sync_context, state::l1::sync, state::l2::sync)) + util::task::spawn(state::sync(sync_context, state::l1::sync, state::l2::sync)) } #[cfg(feature = "p2p")] diff --git a/crates/util/Cargo.toml b/crates/util/Cargo.toml new file mode 100644 index 0000000000..cd33573d56 --- /dev/null +++ b/crates/util/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "util" +description = "Common Rust utilities" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +rust-version = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +tracing = { workspace = true } diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs new file mode 100644 index 0000000000..170a4d32a6 --- /dev/null +++ b/crates/util/src/lib.rs @@ -0,0 +1,4 @@ +//! Common Rust utilities used in Pathfinder. This crate does not include any +//! Starknet specific code. + +pub mod task; diff --git a/crates/util/src/task.rs b/crates/util/src/task.rs new file mode 100644 index 0000000000..d2666fbd14 --- /dev/null +++ b/crates/util/src/task.rs @@ -0,0 +1,101 @@ +use std::future::Future; +use std::sync::LazyLock; + +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; + +pub trait FutureOutputExt { + fn cancelled() -> Self; +} + +impl FutureOutputExt for () { + fn cancelled() -> Self {} +} + +impl FutureOutputExt for anyhow::Result { + fn cancelled() -> Self { + Err(anyhow::anyhow!("Cancelled due to graceful shutdown")) + } +} + +/// Spawns a future on the tokio runtime through a +/// [`tokio_util::task::TaskTracker`]. This ensures that upon graceful shutdown +/// the future will have already completed or will be cancelled in an orderly +/// fashion. +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: FutureOutputExt + Send + 'static, +{ + let Handle { + task_tracker, + cancellation_token, + } = HANDLE.clone(); + + task_tracker.spawn(async move { + tokio::select! { + _ = cancellation_token.cancelled() => { + F::Output::cancelled() + } + res = future => { + res + } + } + }) +} + +/// Runs the provided closure on a thread where blocking is acceptable, +/// similarly to [`tokio::task::spawn_blocking`], however internally the closure +/// is spawned through a [`tokio_util::task::TaskTracker`] to ensure that it +/// will always be waited on and completed upon graceful shutdown. +/// +/// Additionally, a [`CancellationToken`] is provided to the closure to allow +/// for bailing out early in case of long running tasks when a graceful shutdown +/// is triggered. [`CancellationToken::is_cancelled`] should be used to perform +/// the check. +pub fn spawn_blocking(f: F) -> JoinHandle +where + F: FnOnce(CancellationToken) -> R + Send + 'static, + R: Send + 'static, +{ + let Handle { + task_tracker, + cancellation_token, + } = HANDLE.clone(); + + task_tracker.spawn_blocking(|| f(cancellation_token)) +} + +pub mod tracker { + use super::*; + + /// Close the task tracker and then cancel all tracked futures. See + /// [`TaskTracker::close`] and [`CancellationToken::cancel`]. + pub fn close() { + let Handle { + task_tracker, + cancellation_token, + } = HANDLE.clone(); + task_tracker.close(); + cancellation_token.cancel(); + } + + /// Wait until task tracker is both closed and empty. See + /// [`TaskTracker::wait`]. + pub async fn wait() { + let Handle { task_tracker, .. } = HANDLE.clone(); + task_tracker.wait().await; + } +} + +#[derive(Clone)] +struct Handle { + task_tracker: TaskTracker, + cancellation_token: CancellationToken, +} + +static HANDLE: LazyLock = LazyLock::new(|| Handle { + task_tracker: TaskTracker::new(), + cancellation_token: CancellationToken::new(), +}); From 9fbeab3b8df052442fee7ecb4cb1591ddab7c99f Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 10:09:23 +0100 Subject: [PATCH 02/25] refactor: use rayon where tokio::task::spawn_blocking was sub-optimal --- crates/pathfinder/src/state/sync/class.rs | 13 +++++------ crates/pathfinder/src/state/sync/l2.rs | 27 +++++++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/crates/pathfinder/src/state/sync/class.rs b/crates/pathfinder/src/state/sync/class.rs index 051746bb68..41860fbf1d 100644 --- a/crates/pathfinder/src/state/sync/class.rs +++ b/crates/pathfinder/src/state/sync/class.rs @@ -27,13 +27,12 @@ pub async fn download_class( .with_context(|| format!("Downloading class {}", class_hash.0))? .to_vec(); - let (hash, definition) = tokio::task::spawn_blocking(move || -> (anyhow::Result<_>, _) { - ( - compute_class_hash(&definition).context("Computing class hash"), - definition, - ) - }) - .await?; + let (tx, rx) = tokio::sync::oneshot::channel(); + rayon::spawn(move || { + let computed_hash = compute_class_hash(&definition).context("Computing class hash"); + let _ = tx.send((computed_hash, definition)); + }); + let (hash, definition) = rx.await.context("Panic on rayon thread")?; let hash = hash?; use starknet_gateway_types::class_hash::ComputedClassHash; diff --git a/crates/pathfinder/src/state/sync/l2.rs b/crates/pathfinder/src/state/sync/l2.rs index 8d1908e791..b6ffad81c4 100644 --- a/crates/pathfinder/src/state/sync/l2.rs +++ b/crates/pathfinder/src/state/sync/l2.rs @@ -299,14 +299,14 @@ where let (signature, state_update) = match block_validation_mode { BlockValidationMode::Strict => { let block_hash = block.block_hash; - let (verify_result, signature, state_update) = tokio::task::spawn_blocking(move || -> (Result<(), pathfinder_crypto::signature::SignatureError>, BlockCommitmentSignature, Box) { - let verify_result = signature - .verify( - sequencer_public_key, - block_hash, - ); - (verify_result, signature, state_update) - }).await?; + let (tx, rx) = tokio::sync::oneshot::channel(); + rayon::spawn(move || { + let verify_result = signature.verify(sequencer_public_key, block_hash); + let _ = tx.send((verify_result, signature, state_update)); + }); + let (verify_result, signature, state_update) = + rx.await.context("Panic on rayon thread")?; + if let Err(error) = verify_result { tracing::warn!(%error, block_number=%block.block_number, "Block commitment signature mismatch"); } @@ -510,7 +510,8 @@ async fn download_block( let block = recv.await.expect("Panic on rayon thread")?; // Check if commitments and block hash are correct - let verify_hash = tokio::task::spawn_blocking(move || -> anyhow::Result<_> { + let (tx, rx) = tokio::sync::oneshot::channel(); + rayon::spawn(move || { let state_diff_commitment = StateUpdateData::from(state_update.clone()).compute_state_diff_commitment(); let state_update = Box::new(state_update); @@ -524,11 +525,13 @@ async fn download_block( chain, chain_id, ) - .with_context(move || format!("Verify block {block_number}"))?; - Ok((block, state_update, state_diff_commitment, verify_result)) + .with_context(move || format!("Verify block {block_number}")); + + let _ = tx.send((block, state_update, state_diff_commitment, verify_result)); }); let (block, state_update, state_diff_commitment, verify_result) = - verify_hash.await.context("Verify block hash")??; + rx.await.context("Panic on rayon thread")?; + let verify_result = verify_result.context("Verify block hash")?; match (block.status, verify_result, mode) { ( From 256c63f9ec2af4bd4d703c6152c0cbfe759f0d59 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 11:05:29 +0100 Subject: [PATCH 03/25] feat(util/task): add cancellation aware std::thread::spawn wrapper --- crates/util/src/task.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/crates/util/src/task.rs b/crates/util/src/task.rs index d2666fbd14..1bef6d282b 100644 --- a/crates/util/src/task.rs +++ b/crates/util/src/task.rs @@ -1,7 +1,6 @@ use std::future::Future; use std::sync::LazyLock; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; @@ -23,7 +22,7 @@ impl FutureOutputExt for anyhow::Result { /// [`tokio_util::task::TaskTracker`]. This ensures that upon graceful shutdown /// the future will have already completed or will be cancelled in an orderly /// fashion. -pub fn spawn(future: F) -> JoinHandle +pub fn spawn(future: F) -> tokio::task::JoinHandle where F: Future + Send + 'static, F::Output: FutureOutputExt + Send + 'static, @@ -50,11 +49,11 @@ where /// is spawned through a [`tokio_util::task::TaskTracker`] to ensure that it /// will always be waited on and completed upon graceful shutdown. /// -/// Additionally, a [`CancellationToken`] is provided to the closure to allow +/// A [`CancellationToken`] is provided to the closure to allow /// for bailing out early in case of long running tasks when a graceful shutdown /// is triggered. [`CancellationToken::is_cancelled`] should be used to perform /// the check. -pub fn spawn_blocking(f: F) -> JoinHandle +pub fn spawn_blocking(f: F) -> tokio::task::JoinHandle where F: FnOnce(CancellationToken) -> R + Send + 'static, R: Send + 'static, @@ -67,6 +66,29 @@ where task_tracker.spawn_blocking(|| f(cancellation_token)) } +/// Runs the provided closure on an [`std::thread`] by calling +/// [`std::thread::spawn`]. +/// +/// A [`CancellationToken`] is provided to the closure to allow for bailing out +/// early in case of long running tasks when a graceful shutdown is triggered. +/// [`CancellationToken::is_cancelled`] should be used to perform the check. +/// +/// ### Important +/// +/// Caller must take care to ensure that the spawned thread is properly joined +/// or make sure that detachment is safe for the application. +pub fn spawn_std(f: F) -> std::thread::JoinHandle +where + F: FnOnce(CancellationToken) -> R + Send + 'static, + R: Send + 'static, +{ + let Handle { + cancellation_token, .. + } = HANDLE.clone(); + + std::thread::spawn(|| f(cancellation_token)) +} + pub mod tracker { use super::*; From be05b27f4db0180e21ff6bd17c2ddaa2cb1e0816 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 11:20:47 +0100 Subject: [PATCH 04/25] refactor: merge make_stream into the util crate --- Cargo.lock | 12 ++---------- Cargo.toml | 1 - crates/make-stream/Cargo.toml | 11 ----------- crates/p2p/Cargo.toml | 2 +- crates/p2p/src/client/peer_agnostic.rs | 10 +++++----- crates/pathfinder/Cargo.toml | 1 - crates/pathfinder/src/sync/class_definitions.rs | 4 ++-- crates/pathfinder/src/sync/storage_adapters.rs | 2 +- crates/util/Cargo.toml | 1 + crates/util/src/lib.rs | 1 + .../src/lib.rs => util/src/make_stream.rs} | 0 11 files changed, 13 insertions(+), 32 deletions(-) delete mode 100644 crates/make-stream/Cargo.toml rename crates/{make-stream/src/lib.rs => util/src/make_stream.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 2a9a0c626d..7c9957ab0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6536,14 +6536,6 @@ dependencies = [ "libc", ] -[[package]] -name = "make-stream" -version = "0.15.2" -dependencies = [ - "tokio", - "tokio-stream", -] - [[package]] name = "match_cfg" version = "0.1.0" @@ -7151,7 +7143,6 @@ dependencies = [ "hex", "ipnet", "libp2p", - "make-stream", "p2p_proto", "p2p_stream", "pathfinder-common", @@ -7174,6 +7165,7 @@ dependencies = [ "tracing", "tracing-subscriber", "unsigned-varint 0.8.0", + "util", "void", "zeroize", ] @@ -7357,7 +7349,6 @@ dependencies = [ "http 1.1.0", "ipnet", "jemallocator", - "make-stream", "metrics", "metrics-exporter-prometheus", "mockall", @@ -10486,6 +10477,7 @@ version = "0.15.2" dependencies = [ "anyhow", "tokio", + "tokio-stream", "tokio-util", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 5d9cbe4545..a26892a3ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ members = [ "crates/gateway-test-fixtures", "crates/gateway-test-utils", "crates/gateway-types", - "crates/make-stream", "crates/merkle-tree", "crates/p2p", "crates/p2p_proto", diff --git a/crates/make-stream/Cargo.toml b/crates/make-stream/Cargo.toml deleted file mode 100644 index 7a4a6c98e0..0000000000 --- a/crates/make-stream/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "make-stream" -version = { workspace = true } -authors = { workspace = true } -edition = { workspace = true } -license = { workspace = true } -rust-version = { workspace = true } - -[dependencies] -tokio = { workspace = true } -tokio-stream = { workspace = true } diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index b5cc5d643d..e2e530256a 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -34,7 +34,6 @@ libp2p = { workspace = true, features = [ "tokio", "yamux", ] } -make-stream = { path = "../make-stream" } p2p_proto = { path = "../p2p_proto" } p2p_stream = { path = "../p2p_stream" } pathfinder-common = { path = "../common" } @@ -55,6 +54,7 @@ tokio-stream = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } unsigned-varint = { workspace = true, features = ["futures"] } +util = { path = "../util" } void = { workspace = true } zeroize = { workspace = true } diff --git a/crates/p2p/src/client/peer_agnostic.rs b/crates/p2p/src/client/peer_agnostic.rs index 56cc0d3c39..ba539720da 100644 --- a/crates/p2p/src/client/peer_agnostic.rs +++ b/crates/p2p/src/client/peer_agnostic.rs @@ -644,7 +644,7 @@ mod header_stream { tracing::trace!(?start, ?stop, ?dir, "Streaming headers"); - make_stream::from_future(move |tx| async move { + util::make_stream::from_future(move |tx| async move { // Loop which refreshes peer set once we exhaust it. loop { 'next_peer: for peer in get_peers().await { @@ -778,7 +778,7 @@ mod transaction_stream { { tracing::trace!(?start, ?stop, "Streaming Transactions"); - make_stream::from_future(move |tx| async move { + util::make_stream::from_future(move |tx| async move { let mut expected_transaction_counts_stream = Box::pin(counts_stream); let cnt = match try_next(&mut expected_transaction_counts_stream).await { @@ -961,7 +961,7 @@ mod state_diff_stream { { tracing::trace!(?start, ?stop, "Streaming state diffs"); - make_stream::from_future(move |tx| async move { + util::make_stream::from_future(move |tx| async move { let mut length_stream = Box::pin(length_stream); let cnt = match try_next(&mut length_stream).await { @@ -1185,7 +1185,7 @@ mod class_definition_stream { { tracing::trace!(?start, ?stop, "Streaming classes"); - make_stream::from_future(move |tx| async move { + util::make_stream::from_future(move |tx| async move { let mut declared_class_counts_stream = Box::pin(counts_stream); let cnt = match try_next(&mut declared_class_counts_stream).await { @@ -1384,7 +1384,7 @@ mod event_stream { { tracing::trace!(?start, ?stop, "Streaming events"); - make_stream::from_future(move |tx| async move { + util::make_stream::from_future(move |tx| async move { let mut counts_stream = Box::pin(counts_stream); let Some(Ok(cnt)) = counts_stream.next().await else { diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index fe8d109400..5b370516ef 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -30,7 +30,6 @@ futures = { workspace = true, features = ["alloc"] } http = { workspace = true } ipnet = { workspace = true } jemallocator = { workspace = true } -make-stream = { path = "../make-stream" } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } p2p = { path = "../p2p" } diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index 4db1522c39..ec89f79373 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -286,7 +286,7 @@ pub(super) fn verify_declared_at( >, mut classes: BoxStream<'static, Result>, SyncError>>, ) -> impl futures::Stream, SyncError>> { - make_stream::from_future(move |tx| async move { + util::make_stream::from_future(move |tx| async move { let mut dechunker = ClassDechunker::new(); while let Some(expected) = expected_declarations.next().await { @@ -375,7 +375,7 @@ pub(super) fn expected_declarations_stream( mut start: BlockNumber, stop: BlockNumber, ) -> impl futures::Stream)>> { - make_stream::from_blocking(move |tx| { + util::make_stream::from_blocking(move |tx| { let mut db = match storage.connection().context("Creating database connection") { Ok(x) => x, Err(e) => { diff --git a/crates/pathfinder/src/sync/storage_adapters.rs b/crates/pathfinder/src/sync/storage_adapters.rs index 2c2e495f19..41f5798986 100644 --- a/crates/pathfinder/src/sync/storage_adapters.rs +++ b/crates/pathfinder/src/sync/storage_adapters.rs @@ -21,7 +21,7 @@ pub fn counts_stream( + Send + 'static, ) -> impl futures::Stream> { - make_stream::from_blocking(move |tx| { + util::make_stream::from_blocking(move |tx| { let mut batch = VecDeque::new(); while start <= stop { diff --git a/crates/util/Cargo.toml b/crates/util/Cargo.toml index cd33573d56..134b7e7661 100644 --- a/crates/util/Cargo.toml +++ b/crates/util/Cargo.toml @@ -11,5 +11,6 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs index 170a4d32a6..0ccda43012 100644 --- a/crates/util/src/lib.rs +++ b/crates/util/src/lib.rs @@ -1,4 +1,5 @@ //! Common Rust utilities used in Pathfinder. This crate does not include any //! Starknet specific code. +pub mod make_stream; pub mod task; diff --git a/crates/make-stream/src/lib.rs b/crates/util/src/make_stream.rs similarity index 100% rename from crates/make-stream/src/lib.rs rename to crates/util/src/make_stream.rs From 4ec79f2d69d87534c2ce99f6133ee97cc2a2747d Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 11:23:24 +0100 Subject: [PATCH 05/25] refactor: move AnyhowExt into the util crate --- crates/common/src/lib.rs | 1 - crates/pathfinder/src/sync.rs | 3 +-- crates/{common => util}/src/error.rs | 0 crates/util/src/lib.rs | 1 + 4 files changed, 2 insertions(+), 3 deletions(-) rename crates/{common => util}/src/error.rs (100%) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index ed226143c1..9a39d03860 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -17,7 +17,6 @@ use serde::{Deserialize, Serialize}; pub mod casm_class; pub mod class_definition; pub mod consts; -pub mod error; pub mod event; pub mod hash; mod header; diff --git a/crates/pathfinder/src/sync.rs b/crates/pathfinder/src/sync.rs index 2262274383..3231a8ce32 100644 --- a/crates/pathfinder/src/sync.rs +++ b/crates/pathfinder/src/sync.rs @@ -16,7 +16,6 @@ use p2p::client::peer_agnostic::traits::{ }; use p2p::PeerData; use pathfinder_block_hashes::BlockHashDb; -use pathfinder_common::error::AnyhowExt; use pathfinder_common::{ block_hash, BlockHash, @@ -33,6 +32,7 @@ use starknet_gateway_client::{Client as GatewayClient, GatewayApi}; use stream::ProcessStage; use tokio::sync::watch::{self, Receiver}; use tokio_stream::wrappers::WatchStream; +use util::error::AnyhowExt; use crate::state::RESET_DELAY_ON_FAILURE; @@ -205,7 +205,6 @@ where Ok(_) => tracing::debug!("Restarting track sync: unexpected end of Block stream"), Err(SyncError::Fatal(mut error)) => { tracing::error!(%error, "Stopping track sync"); - use pathfinder_common::error::AnyhowExt; return Err(error.take_or_deep_clone()); } Err(error) => { diff --git a/crates/common/src/error.rs b/crates/util/src/error.rs similarity index 100% rename from crates/common/src/error.rs rename to crates/util/src/error.rs diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs index 0ccda43012..d2cc9c411e 100644 --- a/crates/util/src/lib.rs +++ b/crates/util/src/lib.rs @@ -1,5 +1,6 @@ //! Common Rust utilities used in Pathfinder. This crate does not include any //! Starknet specific code. +pub mod error; pub mod make_stream; pub mod task; From 4c70576f1da2152ac3f34cea9632edab29730e08 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 11:46:34 +0100 Subject: [PATCH 06/25] feat(make_stream/from_blocking): bail out upon cancellation --- .../pathfinder/src/sync/class_definitions.rs | 6 ++++- .../pathfinder/src/sync/storage_adapters.rs | 10 +++++++- crates/pathfinder/src/sync/stream.rs | 24 +++++++++++++++---- crates/util/src/make_stream.rs | 9 +++++-- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index ec89f79373..7da2ea7630 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -375,7 +375,7 @@ pub(super) fn expected_declarations_stream( mut start: BlockNumber, stop: BlockNumber, ) -> impl futures::Stream)>> { - util::make_stream::from_blocking(move |tx| { + util::make_stream::from_blocking(move |cancellation_token, tx| { let mut db = match storage.connection().context("Creating database connection") { Ok(x) => x, Err(e) => { @@ -385,6 +385,10 @@ pub(super) fn expected_declarations_stream( }; while start <= stop { + if cancellation_token.is_cancelled() { + return; + } + let db = match db.transaction().context("Creating database transaction") { Ok(x) => x, Err(e) => { diff --git a/crates/pathfinder/src/sync/storage_adapters.rs b/crates/pathfinder/src/sync/storage_adapters.rs index 41f5798986..bc0624cb47 100644 --- a/crates/pathfinder/src/sync/storage_adapters.rs +++ b/crates/pathfinder/src/sync/storage_adapters.rs @@ -21,10 +21,14 @@ pub fn counts_stream( + Send + 'static, ) -> impl futures::Stream> { - util::make_stream::from_blocking(move |tx| { + util::make_stream::from_blocking(move |cancellation_token, tx| { let mut batch = VecDeque::new(); while start <= stop { + if cancellation_token.is_cancelled() { + return; + } + if let Some(counts) = batch.pop_front() { _ = tx.blocking_send(Ok(counts)); continue; @@ -68,6 +72,10 @@ pub fn counts_stream( } while let Some(counts) = batch.pop_front() { + if cancellation_token.is_cancelled() { + return; + } + _ = tx.blocking_send(Ok(counts)); } }) diff --git a/crates/pathfinder/src/sync/stream.rs b/crates/pathfinder/src/sync/stream.rs index 6d6404340c..0f702f9add 100644 --- a/crates/pathfinder/src/sync/stream.rs +++ b/crates/pathfinder/src/sync/stream.rs @@ -82,10 +82,18 @@ impl SyncReceiver { { let (tx, rx) = tokio::sync::mpsc::channel(buffer); - std::thread::spawn(move || { + util::task::spawn_std(move |cancellation_token| { let queue_capacity = self.inner.max_capacity(); - while let Some(input) = self.inner.blocking_recv() { + loop { + if cancellation_token.is_cancelled() { + return; + } + + let Some(input) = self.inner.blocking_recv() else { + return; + }; + let result = match input { Ok(PeerData { peer, data }) => { // Stats for tracing and metrics. @@ -131,12 +139,20 @@ impl SyncReceiver { pub fn try_chunks(mut self, capacity: usize, buffer: usize) -> ChunkSyncReceiver { let (tx, rx) = tokio::sync::mpsc::channel(buffer); - std::thread::spawn(move || { + util::task::spawn_std(move |cancellation_token| { let mut chunk = Vec::with_capacity(capacity); let mut peer = None; let mut err = None; - while let Some(input) = self.inner.blocking_recv() { + loop { + if cancellation_token.is_cancelled() { + return; + } + + let Some(input) = self.inner.blocking_recv() else { + return; + }; + let input = match input { Ok(x) => x, Err(e) => { diff --git a/crates/util/src/make_stream.rs b/crates/util/src/make_stream.rs index c504cd5672..616d36697a 100644 --- a/crates/util/src/make_stream.rs +++ b/crates/util/src/make_stream.rs @@ -3,6 +3,7 @@ use std::future::Future; use tokio::sync::mpsc::{self, Sender}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; /// Use the sender to yield items to the stream. /// @@ -25,6 +26,10 @@ where /// Use the sender to yield items to the stream. /// +/// A [`CancellationToken`] is provided to the closure to allow for bailing out +/// early in case of long running tasks when a graceful shutdown is triggered. +/// [`CancellationToken::is_cancelled`] should be used to perform the check. +/// /// ### Warning /// /// Implementor of the closure must ensure that the `src` closure __exits if the @@ -34,10 +39,10 @@ where pub fn from_blocking(src: U) -> impl Stream where T: Send + 'static, - U: FnOnce(Sender) + Send + 'static, + U: FnOnce(CancellationToken, Sender) + Send + 'static, { let (tx, rx) = mpsc::channel(1); - std::thread::spawn(move || src(tx)); + crate::task::spawn_std(move |cancellation_token| src(cancellation_token, tx)); ReceiverStream::new(rx) } From aa63cd9ac75b51d22cbe48692c8d59a75469d65b Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 11:59:36 +0100 Subject: [PATCH 07/25] doc(util/task): minor refinement of doc comments --- crates/util/src/task.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/util/src/task.rs b/crates/util/src/task.rs index 1bef6d282b..64aa636dc1 100644 --- a/crates/util/src/task.rs +++ b/crates/util/src/task.rs @@ -18,7 +18,7 @@ impl FutureOutputExt for anyhow::Result { } } -/// Spawns a future on the tokio runtime through a +/// Spawns a future on the `tokio` runtime through a /// [`tokio_util::task::TaskTracker`]. This ensures that upon graceful shutdown /// the future will have already completed or will be cancelled in an orderly /// fashion. @@ -44,7 +44,7 @@ where }) } -/// Runs the provided closure on a thread where blocking is acceptable, +/// Runs the provided closure on a `tokio` thread where blocking is acceptable, /// similarly to [`tokio::task::spawn_blocking`], however internally the closure /// is spawned through a [`tokio_util::task::TaskTracker`] to ensure that it /// will always be waited on and completed upon graceful shutdown. @@ -92,7 +92,7 @@ where pub mod tracker { use super::*; - /// Close the task tracker and then cancel all tracked futures. See + /// Close the task tracker and then **cancel all tracked futures**. See /// [`TaskTracker::close`] and [`CancellationToken::cancel`]. pub fn close() { let Handle { From d73b01543aed72bbe04c301bb23d152e6d61b665 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 12:10:55 +0100 Subject: [PATCH 08/25] fix(sync/checkpoint): rollback to anchor is not committed to the DB Also: - use util::task::spawn_blocking in place of tokio::task::spawn_blocking --- crates/pathfinder/src/sync/checkpoint.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index eb1a7684ae..e534efc5ef 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -607,7 +607,7 @@ struct LocalState { impl LocalState { async fn from_db(storage: Storage, checkpoint: EthereumStateUpdate) -> anyhow::Result { // TODO: this should include header gaps. - spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; @@ -641,7 +641,7 @@ async fn rollback_to_anchor( local: BlockNumber, anchor: Option, ) -> anyhow::Result<()> { - spawn_blocking(move || { + util::task::spawn_blocking(move |_| { tracing::info!(%local, ?anchor, "Rolling back storage to anchor point"); let last_block_to_remove = anchor.map(|n| n + 1).unwrap_or_default(); @@ -671,6 +671,8 @@ async fn rollback_to_anchor( .reset() .context("Resetting local DB state after reorg")?; + transaction.commit().context("Committing transaction")?; + Ok(()) }) .await From 03df8da917b1ebc72f9495afbf59c59d10aa2f00 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 12:55:13 +0100 Subject: [PATCH 09/25] feat: track tasks in pathfinder-ethereum and pathfinder crates --- Cargo.lock | 1 + crates/ethereum/Cargo.toml | 2 +- crates/ethereum/src/lib.rs | 11 +++++++++- crates/pathfinder/src/bin/pathfinder/main.rs | 7 +++--- crates/pathfinder/src/monitoring.rs | 2 +- crates/pathfinder/src/p2p_network.rs | 4 ++-- .../src/p2p_network/sync_handlers.rs | 2 +- crates/pathfinder/src/state/sync.rs | 20 +++++++++-------- crates/pathfinder/src/state/sync/l2.rs | 12 +++++----- crates/pathfinder/src/sync.rs | 12 +++++++--- crates/pathfinder/src/sync/checkpoint.rs | 5 +++-- .../pathfinder/src/sync/class_definitions.rs | 5 ++--- crates/pathfinder/src/sync/events.rs | 6 ++--- crates/pathfinder/src/sync/headers.rs | 7 +++--- crates/pathfinder/src/sync/state_updates.rs | 5 ++--- crates/pathfinder/src/sync/stream.rs | 20 +++++------------ crates/pathfinder/src/sync/track.rs | 22 +++++++++++-------- crates/pathfinder/src/sync/transactions.rs | 2 +- crates/util/src/make_stream.rs | 2 +- 19 files changed, 81 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c9957ab0b..69b5c6f4bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7486,6 +7486,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "util", ] [[package]] diff --git a/crates/ethereum/Cargo.toml b/crates/ethereum/Cargo.toml index 484dd140fd..783590d98a 100644 --- a/crates/ethereum/Cargo.toml +++ b/crates/ethereum/Cargo.toml @@ -26,4 +26,4 @@ reqwest = { workspace = true, features = ["json"] } serde_json = { workspace = true } tokio = { workspace = true, features = ["macros"] } tracing = { workspace = true } - +util = { path = "../util" } diff --git a/crates/ethereum/src/lib.rs b/crates/ethereum/src/lib.rs index f12e435c88..004b6a5bd8 100644 --- a/crates/ethereum/src/lib.rs +++ b/crates/ethereum/src/lib.rs @@ -153,7 +153,8 @@ impl EthereumApi for EthereumClient { let provider_clone = provider.clone(); let (finalized_block_tx, mut finalized_block_rx) = tokio::sync::mpsc::channel::(1); - tokio::spawn(async move { + + util::task::spawn(async move { let mut interval = tokio::time::interval(poll_interval); loop { interval.tick().await; @@ -165,6 +166,14 @@ impl EthereumApi for EthereumClient { let _ = finalized_block_tx.send(block_number).await.unwrap(); } } + // This it to mitigate the warning: "this function depends on never type + // fallback being `()`" as we are unable to implement + // [`util::task::FutureOutputExt`] for the never type `!`. Consequently, when + // this warning becomes a hard error we should keep this workaround or by then + // the `!` type will not be experimental anymore. + // Warning related issue: https://github.com/rust-lang/rust/issues/123748 + #[allow(unreachable_code)] + () }); // Process incoming events diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 98c1c4d7ee..75dcdcbd16 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -296,7 +296,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst }; if !config.disable_version_update_check { - tokio::spawn(update::poll_github_for_releases()); + util::task::spawn(update::poll_github_for_releases()); } let mut term_signal = signal(SignalKind::terminate())?; @@ -645,7 +645,7 @@ fn start_p2p_sync( verify_tree_hashes, block_hash_db: Some(BlockHashDb::new(pathfinder_context.network)), }; - tokio::spawn(sync.run()) + util::task::spawn(sync.run()) } /// Spawns the monitoring task at the given address. @@ -879,7 +879,8 @@ async fn verify_database( gateway_client: &starknet_gateway_client::Client, ) -> anyhow::Result<()> { let storage = storage.clone(); - let db_genesis = tokio::task::spawn_blocking(move || { + + let db_genesis = util::task::spawn_blocking(move |_| { let mut conn = storage.connection().context("Create database connection")?; let tx = conn.transaction().context("Create database transaction")?; diff --git a/crates/pathfinder/src/monitoring.rs b/crates/pathfinder/src/monitoring.rs index bbd76fedf7..aa865cd007 100644 --- a/crates/pathfinder/src/monitoring.rs +++ b/crates/pathfinder/src/monitoring.rs @@ -32,7 +32,7 @@ pub async fn spawn_server( }); let listener = tokio::net::TcpListener::bind(addr.into()).await?; let addr = listener.local_addr()?; - let spawn = tokio::spawn(async move { + let spawn = util::task::spawn(async move { axum::serve(listener, app.into_make_service()) .await .expect("server error") diff --git a/crates/pathfinder/src/p2p_network.rs b/crates/pathfinder/src/p2p_network.rs index 436ff0719e..fc47e67510 100644 --- a/crates/pathfinder/src/p2p_network.rs +++ b/crates/pathfinder/src/p2p_network.rs @@ -46,7 +46,7 @@ pub async fn start(context: P2PContext) -> anyhow::Result { let mut main_loop_handle = { let span = tracing::info_span!("behaviour"); - tokio::task::spawn(p2p_main_loop.run().instrument(span)) + util::task::spawn(p2p_main_loop.run().instrument(span)) }; for addr in listen_on { @@ -96,7 +96,7 @@ pub async fn start(context: P2PContext) -> anyhow::Result { let (mut tx, rx) = tokio::sync::watch::channel(None); let join_handle = { - tokio::task::spawn( + util::task::spawn( async move { loop { tokio::select! { diff --git a/crates/pathfinder/src/p2p_network/sync_handlers.rs b/crates/pathfinder/src/p2p_network/sync_handlers.rs index fe59b3a680..7cd3452ef8 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers.rs @@ -420,7 +420,7 @@ where let (sync_tx, mut rx) = mpsc::channel(1); // For backpressure let db_fut = async { - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut connection = storage .connection() diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index 2cc6d80b29..11cb14dd0b 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -232,7 +232,7 @@ where // Keep polling the sequencer for the latest block let (tx_latest, rx_latest) = tokio::sync::watch::channel(gateway_latest); - let mut latest_handle = tokio::spawn(l2::poll_latest( + let mut latest_handle = util::task::spawn(l2::poll_latest( sequencer.clone(), head_poll_interval, tx_latest, @@ -245,7 +245,8 @@ where BlockHash(Felt::ZERO), StateCommitment(Felt::ZERO), )); - let _status_sync = tokio::spawn(update_sync_status_latest( + + let _status_sync = util::task::spawn(update_sync_status_latest( Arc::clone(&state), starting_block_hash, starting_block_num, @@ -255,7 +256,7 @@ where // Start L1 producer task. Clone the event sender so that the channel remains // open even if the producer task fails. - let mut l1_handle = tokio::spawn(l1_sync(event_sender.clone(), l1_context.clone())); + let mut l1_handle = util::task::spawn(l1_sync(event_sender.clone(), l1_context.clone())); // Fetch latest blocks from storage let latest_blocks = latest_n_blocks(&mut db_conn, block_cache_size) @@ -265,7 +266,7 @@ where // Start L2 producer task. Clone the event sender so that the channel remains // open even if the producer task fails. - let mut l2_handle = tokio::spawn(l2_sync( + let mut l2_handle = util::task::spawn(l2_sync( event_sender.clone(), l2_context.clone(), l2_head, @@ -283,9 +284,10 @@ where websocket_txs, notifications, }; - let mut consumer_handle = tokio::spawn(consumer(event_receiver, consumer_context, tx_current)); + let mut consumer_handle = + util::task::spawn(consumer(event_receiver, consumer_context, tx_current)); - let mut pending_handle = tokio::spawn(pending::poll_pending( + let mut pending_handle = util::task::spawn(pending::poll_pending( event_sender.clone(), sequencer.clone(), Duration::from_secs(2), @@ -300,7 +302,7 @@ where _ = &mut pending_handle => { tracing::error!("Pending tracking task ended unexpectedly"); - pending_handle = tokio::spawn(pending::poll_pending( + pending_handle = util::task::spawn(pending::poll_pending( event_sender.clone(), sequencer.clone(), Duration::from_secs(2), @@ -337,7 +339,7 @@ where } let fut = l1_sync(event_sender.clone(), l1_context.clone()); - l1_handle = tokio::spawn(async move { + l1_handle = util::task::spawn(async move { tokio::time::sleep(RESET_DELAY_ON_FAILURE).await; fut.await }); @@ -364,7 +366,7 @@ where let block_chain = BlockChain::with_capacity(1_000, latest_blocks); let fut = l2_sync(event_sender.clone(), l2_context.clone(), l2_head, block_chain, rx_latest.clone()); - l2_handle = tokio::spawn(async move { + l2_handle = util::task::spawn(async move { tokio::time::sleep(restart_delay).await; fut.await }); diff --git a/crates/pathfinder/src/state/sync/l2.rs b/crates/pathfinder/src/state/sync/l2.rs index b6ffad81c4..eda2bdbbda 100644 --- a/crates/pathfinder/src/state/sync/l2.rs +++ b/crates/pathfinder/src/state/sync/l2.rs @@ -152,14 +152,14 @@ where }; // We start downloading the signature for the block - let signature_handle = tokio::spawn({ + let signature_handle = util::task::spawn({ let sequencer = sequencer.clone(); async move { let t_signature = std::time::Instant::now(); let result = sequencer.signature(next.into()).await; let t_signature = t_signature.elapsed(); - (result, t_signature) + Ok((result, t_signature)) } }); @@ -261,8 +261,10 @@ where let t_declare = t_declare.elapsed(); // Download signature - let (signature_result, t_signature) = - signature_handle.await.context("Joining signature task")?; + let (signature_result, t_signature) = signature_handle + .await + .context("Joining signature task")? + .context("Task cancelled")?; let (signature, t_signature) = match signature_result { Ok(signature) => (signature, t_signature), Err(SequencerError::StarknetError(err)) @@ -413,7 +415,7 @@ pub async fn download_new_classes( return Ok(vec![]); } - let require_downloading = tokio::task::spawn_blocking(move || { + let require_downloading = util::task::spawn_blocking(move |_| { let mut db_conn = storage .connection() .context("Creating database connection")?; diff --git a/crates/pathfinder/src/sync.rs b/crates/pathfinder/src/sync.rs index 3231a8ce32..f04d5687de 100644 --- a/crates/pathfinder/src/sync.rs +++ b/crates/pathfinder/src/sync.rs @@ -253,7 +253,7 @@ impl LatestStream { // No buffer, for backpressure let (tx, rx) = watch::channel((BlockNumber::GENESIS, BlockHash::ZERO)); - tokio::spawn(async move { + util::task::spawn(async move { let mut interval = tokio::time::interval(head_poll_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); @@ -346,6 +346,8 @@ mod tests { BlockHeaderData, }; + const TIMEOUT: Duration = Duration::from_secs(10); + /// Generate a fake chain of blocks as in /// [`pathfinder_storage::fake::generate`] but with additional /// guarantees: @@ -382,8 +384,12 @@ mod tests { let db = db.transaction().unwrap(); let header = db.block_header(expected_last.into()).unwrap(); if let Some(header) = header { + let after = start.elapsed(); + if after > TIMEOUT { + break; + } + if header.number == expected_last { - let after = start.elapsed(); tracing::info!(?after, "Sync done"); break; } @@ -478,7 +484,7 @@ mod tests { }; tokio::select! { - result = tokio::time::timeout(Duration::from_secs(10), sync.run()) => match result { + result = tokio::time::timeout(TIMEOUT, sync.run()) => match result { Ok(Ok(())) => unreachable!("Sync does not exit upon success, sync_done_watch should have been triggered"), Ok(Err(e)) => tracing::debug!(%e, "Sync failed with a fatal error"), Err(_) => tracing::debug!("Test timed out"), diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index e534efc5ef..4bb22200ec 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -38,7 +38,6 @@ use primitive_types::H160; use serde_json::de; use starknet_gateway_client::{Client, GatewayApi}; use tokio::sync::Mutex; -use tokio::task::spawn_blocking; use tracing::Instrument; use crate::state::block_hash::calculate_transaction_commitment; @@ -680,7 +679,7 @@ async fn rollback_to_anchor( } async fn persist_anchor(storage: Storage, anchor: EthereumStateUpdate) -> anyhow::Result<()> { - spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; @@ -699,6 +698,8 @@ async fn persist_anchor(storage: Storage, anchor: EthereumStateUpdate) -> anyhow #[cfg(test)] mod tests { + use tokio::task::spawn_blocking; + use super::*; mod handle_header_stream { diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index 7da2ea7630..d5a4d9148a 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -24,7 +24,6 @@ use starknet_gateway_types::error::SequencerError; use starknet_gateway_types::reply::call; use tokio::sync::mpsc::{self, Receiver}; use tokio::sync::{oneshot, Mutex}; -use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use super::storage_adapters; @@ -75,7 +74,7 @@ pub(super) async fn next_missing( storage: Storage, head: BlockNumber, ) -> anyhow::Result> { - spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; @@ -545,7 +544,7 @@ pub(super) async fn persist( storage: Storage, classes: Vec>, ) -> Result { - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; diff --git a/crates/pathfinder/src/sync/events.rs b/crates/pathfinder/src/sync/events.rs index f9b11c20d2..28867b93d0 100644 --- a/crates/pathfinder/src/sync/events.rs +++ b/crates/pathfinder/src/sync/events.rs @@ -17,7 +17,6 @@ use pathfinder_common::{ }; use pathfinder_storage::Storage; use tokio::sync::mpsc; -use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use super::error::SyncError; @@ -65,7 +64,8 @@ pub(super) async fn verify_commitment( peer, data: (block_number, events), } = events; - let events = tokio::task::spawn_blocking(move || { + + let events = util::task::spawn_blocking(move |_| { let mut connection = storage .connection() .context("Creating database connection")?; @@ -100,7 +100,7 @@ pub(super) async fn persist( storage: Storage, events: Vec>, ) -> Result { - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut connection = storage .connection() .context("Creating database connection")?; diff --git a/crates/pathfinder/src/sync/headers.rs b/crates/pathfinder/src/sync/headers.rs index a0589230a5..ee2d80ba80 100644 --- a/crates/pathfinder/src/sync/headers.rs +++ b/crates/pathfinder/src/sync/headers.rs @@ -17,7 +17,6 @@ use pathfinder_common::{ StorageCommitment, }; use pathfinder_storage::Storage; -use tokio::task::spawn_blocking; use crate::state::block_hash::{BlockHeaderData, VerifyResult}; use crate::sync::error::SyncError; @@ -55,7 +54,7 @@ pub(super) async fn next_gap( head: BlockNumber, head_hash: BlockHash, ) -> anyhow::Result> { - spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; @@ -124,8 +123,8 @@ pub(super) async fn query( storage: Storage, block_number: BlockNumber, ) -> anyhow::Result> { - spawn_blocking({ - move || { + util::task::spawn_blocking({ + move |_| { let mut db = storage .connection() .context("Creating database connection")?; diff --git a/crates/pathfinder/src/sync/state_updates.rs b/crates/pathfinder/src/sync/state_updates.rs index e27b93f8aa..6a6c48c59c 100644 --- a/crates/pathfinder/src/sync/state_updates.rs +++ b/crates/pathfinder/src/sync/state_updates.rs @@ -33,7 +33,6 @@ use pathfinder_merkle_tree::starknet_state::update_starknet_state; use pathfinder_merkle_tree::StorageCommitmentTree; use pathfinder_storage::{Storage, TrieUpdate}; use tokio::sync::mpsc; -use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use super::storage_adapters; @@ -46,7 +45,7 @@ pub(super) async fn next_missing( storage: Storage, head: BlockNumber, ) -> anyhow::Result> { - spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; @@ -260,7 +259,7 @@ pub async fn batch_update_starknet_state( verify_tree_hashes: bool, state_updates: Vec>, ) -> Result, SyncError> { - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; diff --git a/crates/pathfinder/src/sync/stream.rs b/crates/pathfinder/src/sync/stream.rs index 0f702f9add..e8a4f0ff5a 100644 --- a/crates/pathfinder/src/sync/stream.rs +++ b/crates/pathfinder/src/sync/stream.rs @@ -85,15 +85,11 @@ impl SyncReceiver { util::task::spawn_std(move |cancellation_token| { let queue_capacity = self.inner.max_capacity(); - loop { + while let Some(input) = self.inner.blocking_recv() { if cancellation_token.is_cancelled() { - return; + break; } - let Some(input) = self.inner.blocking_recv() else { - return; - }; - let result = match input { Ok(PeerData { peer, data }) => { // Stats for tracing and metrics. @@ -144,15 +140,11 @@ impl SyncReceiver { let mut peer = None; let mut err = None; - loop { + while let Some(input) = self.inner.blocking_recv() { if cancellation_token.is_cancelled() { - return; + break; } - let Some(input) = self.inner.blocking_recv() else { - return; - }; - let input = match input { Ok(x) => x, Err(e) => { @@ -247,7 +239,7 @@ where pub fn spawn(self) -> SyncReceiver { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + util::task::spawn(async move { let mut inner_stream = Box::pin(self.0); while let Some(item) = inner_stream.next().await { @@ -280,7 +272,7 @@ where pub fn spawn(self) -> SyncReceiver { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + util::task::spawn(async move { let mut inner_stream = Box::pin(self.0); while let Some(item) = inner_stream.next().await { diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 232321dbd0..e667eb83d7 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -206,7 +206,7 @@ impl HeaderSource { mut start, } = self; - tokio::spawn(async move { + util::task::spawn(async move { let mut latest_onchain = Box::pin(latest_onchain); while let Some(latest_onchain) = latest_onchain.next().await { let mut headers = @@ -241,7 +241,7 @@ impl StateDiffFanout { let (d1_tx, d1_rx) = tokio::sync::mpsc::channel(buffer); let (d2_tx, d2_rx) = tokio::sync::mpsc::channel(buffer); - tokio::spawn(async move { + util::task::spawn(async move { while let Some(state_update) = source.recv().await { let is_err = state_update.is_err(); @@ -288,7 +288,7 @@ impl TransactionsFanout { let (t_tx, t_rx) = tokio::sync::mpsc::channel(buffer); let (e_tx, e_rx) = tokio::sync::mpsc::channel(buffer); - tokio::spawn(async move { + util::task::spawn(async move { while let Some(transactions) = source.recv().await { let is_err = transactions.is_err(); @@ -329,7 +329,7 @@ impl HeaderFanout { let (s_tx, s_rx) = tokio::sync::mpsc::channel(buffer); let (t_tx, t_rx) = tokio::sync::mpsc::channel(buffer); - tokio::spawn(async move { + util::task::spawn(async move { while let Some(signed_header) = source.recv().await { let is_err = signed_header.is_err(); @@ -381,7 +381,8 @@ impl

TransactionSource

{ P: Clone + BlockClient + Send + 'static, { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + + util::task::spawn(async move { let Self { p2p, mut headers } = self; while let Some(header) = headers.next().await { @@ -456,7 +457,8 @@ impl

EventSource

{ P: Clone + BlockClient + Send + 'static, { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + + util::task::spawn(async move { let Self { p2p, mut transactions, @@ -539,7 +541,8 @@ impl

StateDiffSource

{ P: Clone + BlockClient + Send + 'static, { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + + util::task::spawn(async move { let Self { p2p, mut headers } = self; while let Some(header) = headers.next().await { @@ -595,7 +598,8 @@ impl

ClassSource

{ P: Clone + BlockClient + Send + 'static, { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + + util::task::spawn(async move { let Self { p2p, mut declarations, @@ -663,7 +667,7 @@ impl BlockStream { fn spawn(mut self) -> SyncReceiver { let (tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(async move { + util::task::spawn(async move { loop { let Some(result) = self.next().await else { return; diff --git a/crates/pathfinder/src/sync/transactions.rs b/crates/pathfinder/src/sync/transactions.rs index c947ec9487..1b98890515 100644 --- a/crates/pathfinder/src/sync/transactions.rs +++ b/crates/pathfinder/src/sync/transactions.rs @@ -46,7 +46,7 @@ pub(super) async fn next_missing( storage: Storage, head: BlockNumber, ) -> anyhow::Result> { - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let mut db = storage .connection() .context("Creating database connection")?; diff --git a/crates/util/src/make_stream.rs b/crates/util/src/make_stream.rs index 616d36697a..57444b308b 100644 --- a/crates/util/src/make_stream.rs +++ b/crates/util/src/make_stream.rs @@ -19,7 +19,7 @@ where V: Future + Send + 'static, { let (tx, rx) = mpsc::channel(1); - tokio::spawn(src(tx)); + crate::task::spawn(src(tx)); ReceiverStream::new(rx) } From eaf2689e73bd5e58c8f7b778e4978ee1db8c761e Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 20:19:07 +0100 Subject: [PATCH 10/25] chore: cargo sort --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a26892a3ac..fd19825fcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,7 +137,7 @@ tokio = "1.37.0" tokio-retry = "0.3.0" tokio-stream = "0.1.14" tokio-tungstenite = "0.21" -tokio-util = {version = "0.7.13", features = ["rt"]} +tokio-util = { version = "0.7.13", features = ["rt"] } tower = { version = "0.4.13", default-features = false } tower-http = { version = "0.5.2", default-features = false } tracing = "0.1.37" From 9b857e61490993f82dd22991b9b2a5b5e419d2cb Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 17 Dec 2024 21:28:12 +0100 Subject: [PATCH 11/25] chore: remove dead code --- crates/pathfinder/src/sync/headers.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/crates/pathfinder/src/sync/headers.rs b/crates/pathfinder/src/sync/headers.rs index ee2d80ba80..2459b3cff5 100644 --- a/crates/pathfinder/src/sync/headers.rs +++ b/crates/pathfinder/src/sync/headers.rs @@ -119,24 +119,6 @@ pub(super) async fn next_gap( .context("Joining blocking task")? } -pub(super) async fn query( - storage: Storage, - block_number: BlockNumber, -) -> anyhow::Result> { - util::task::spawn_blocking({ - move |_| { - let mut db = storage - .connection() - .context("Creating database connection")?; - let db = db.transaction().context("Creating database transaction")?; - db.block_header(block_number.into()) - .context("Querying first block without transactions") - } - }) - .await - .context("Joining blocking task")? -} - /// Ensures that the hash chain is continuous i.e. that block numbers increment /// and hashes become parent hashes. pub struct ForwardContinuity { From 1969346697b4ded82fca34371528271374116821 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 18 Dec 2024 11:00:42 +0100 Subject: [PATCH 12/25] feat(rpc): enable graceful shutdown for the rpc server --- Cargo.lock | 1 + crates/rpc/Cargo.toml | 1 + crates/rpc/src/lib.rs | 3 ++- crates/util/src/task.rs | 6 ++++++ 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 69b5c6f4bb..455210137e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7586,6 +7586,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "util", "zstd 0.13.2", ] diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 8d2fd4f1e9..b7dc771ea3 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -52,6 +52,7 @@ tower-http = { workspace = true, features = [ "util", ] } tracing = { workspace = true } +util = { path = "../util" } zstd = { workspace = true } [dev-dependencies] diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 92709018d1..f27df41655 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -201,8 +201,9 @@ impl RpcServer { let router = router.layer(middleware); - let server_handle = tokio::spawn(async move { + let server_handle = util::task::spawn(async move { axum::serve(listener, router.into_make_service()) + .with_graceful_shutdown(util::task::cancellation_token().cancelled_owned()) .await .map_err(Into::into) }); diff --git a/crates/util/src/task.rs b/crates/util/src/task.rs index 64aa636dc1..0fd185c3cf 100644 --- a/crates/util/src/task.rs +++ b/crates/util/src/task.rs @@ -89,6 +89,12 @@ where std::thread::spawn(|| f(cancellation_token)) } +/// Returns a [`CancellationToken`] that can be used to check for graceful +/// shutdown. +pub fn cancellation_token() -> CancellationToken { + HANDLE.clone().cancellation_token +} + pub mod tracker { use super::*; From f7442dc5390ab0b2ffaf4a8d4ac82a77cb80bb41 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 18 Dec 2024 12:13:03 +0100 Subject: [PATCH 13/25] feat: track tasks in pathfinder-rpc --- crates/rpc/src/jsonrpc/router/subscription.rs | 12 +++++----- crates/rpc/src/jsonrpc/websocket/logic.rs | 22 ++++++++++--------- .../rpc/src/method/block_hash_and_number.rs | 3 +-- crates/rpc/src/method/block_number.rs | 3 +-- crates/rpc/src/method/call.rs | 2 +- crates/rpc/src/method/estimate_fee.rs | 3 +-- crates/rpc/src/method/estimate_message_fee.rs | 3 +-- .../src/method/get_block_transaction_count.rs | 3 +-- .../rpc/src/method/get_block_with_receipts.rs | 2 +- .../src/method/get_block_with_tx_hashes.rs | 3 +-- crates/rpc/src/method/get_block_with_txs.rs | 3 +-- crates/rpc/src/method/get_class.rs | 2 +- crates/rpc/src/method/get_class_at.rs | 3 +-- crates/rpc/src/method/get_class_hash_at.rs | 2 +- crates/rpc/src/method/get_compiled_casm.rs | 2 +- crates/rpc/src/method/get_events.rs | 2 +- crates/rpc/src/method/get_nonce.rs | 3 +-- crates/rpc/src/method/get_state_update.rs | 3 +-- crates/rpc/src/method/get_storage_at.rs | 3 +-- crates/rpc/src/method/get_storage_proof.rs | 3 +-- .../get_transaction_by_block_id_and_index.rs | 3 +-- .../rpc/src/method/get_transaction_by_hash.rs | 3 +-- .../rpc/src/method/get_transaction_receipt.rs | 3 +-- .../rpc/src/method/get_transaction_status.rs | 3 +-- .../rpc/src/method/simulate_transactions.rs | 2 +- crates/rpc/src/method/subscribe_events.rs | 2 +- crates/rpc/src/method/subscribe_new_heads.rs | 2 +- .../method/subscribe_transaction_status.rs | 4 ++-- .../src/method/trace_block_transactions.rs | 2 +- crates/rpc/src/method/trace_transaction.rs | 2 +- .../rpc/src/pathfinder/methods/get_proof.rs | 6 ++--- .../methods/get_transaction_status.rs | 3 +-- crates/rpc/src/v06/method/call.rs | 2 +- crates/rpc/src/v06/method/estimate_fee.rs | 3 +-- .../src/v06/method/estimate_message_fee.rs | 3 +-- .../v06/method/get_block_with_tx_hashes.rs | 3 +-- .../rpc/src/v06/method/get_block_with_txs.rs | 3 +-- .../src/v06/method/get_transaction_receipt.rs | 3 +-- .../src/v06/method/get_transaction_status.rs | 3 +-- .../src/v06/method/simulate_transactions.rs | 2 +- .../v06/method/trace_block_transactions.rs | 2 +- .../rpc/src/v06/method/trace_transaction.rs | 2 +- 42 files changed, 60 insertions(+), 83 deletions(-) diff --git a/crates/rpc/src/jsonrpc/router/subscription.rs b/crates/rpc/src/jsonrpc/router/subscription.rs index 336e4a5f0f..5c82681a48 100644 --- a/crates/rpc/src/jsonrpc/router/subscription.rs +++ b/crates/rpc/src/jsonrpc/router/subscription.rs @@ -176,7 +176,7 @@ where let first_block = pathfinder_storage::BlockId::try_from(first_block) .map_err(|e| RpcError::InvalidParams(e.to_string()))?; let storage = router.context.storage.clone(); - let current_block = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { + let current_block = util::task::spawn_blocking(move |_| -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; let db = conn.transaction().map_err(RpcError::InternalError)?; db.block_number(first_block) @@ -189,7 +189,7 @@ where } }; - Ok(tokio::spawn(async move { + Ok(util::task::spawn(async move { let _subscription_guard = SubscriptionsGuard { subscription_id, subscriptions, @@ -244,7 +244,7 @@ where // Subscribe to new blocks. Receive the first subscription message. let (tx1, mut rx1) = mpsc::channel::>(1024); - tokio::spawn({ + util::task::spawn({ let params = params.clone(); let context = router.context.clone(); let tx = tx.clone(); @@ -352,7 +352,7 @@ pub fn split_ws(ws: WebSocket, version: RpcVersion) -> (WsSender, WsReceiver) { let (mut ws_sender, mut ws_receiver) = ws.split(); // Send messages to the websocket using an MPSC channel. let (sender_tx, mut sender_rx) = mpsc::channel::>(1024); - tokio::spawn(async move { + util::task::spawn(async move { while let Some(msg) = sender_rx.recv().await { match msg { Ok(msg) => { @@ -379,7 +379,7 @@ pub fn split_ws(ws: WebSocket, version: RpcVersion) -> (WsSender, WsReceiver) { }); // Receive messages from the websocket using an MPSC channel. let (receiver_tx, receiver_rx) = mpsc::channel::>(1024); - tokio::spawn(async move { + util::task::spawn(async move { while let Some(msg) = ws_receiver.next().await { if receiver_tx.send(msg).await.is_err() { break; @@ -397,7 +397,7 @@ pub fn handle_json_rpc_socket( let subscriptions: Arc>> = Default::default(); // Read and handle messages from the websocket. - tokio::spawn(async move { + util::task::spawn(async move { loop { let request = match ws_rx.recv().await { Some(Ok(Message::Text(msg))) => msg, diff --git a/crates/rpc/src/jsonrpc/websocket/logic.rs b/crates/rpc/src/jsonrpc/websocket/logic.rs index 471f3475c1..89f87bd8d2 100644 --- a/crates/rpc/src/jsonrpc/websocket/logic.rs +++ b/crates/rpc/src/jsonrpc/websocket/logic.rs @@ -86,13 +86,13 @@ async fn handle_socket(socket: WebSocket, router: RpcRouter) { let (response_sender, response_receiver) = mpsc::channel(10); - tokio::spawn(write( + util::task::spawn(write( ws_sender, response_receiver, websocket_context.socket_buffer_capacity, router.version, )); - tokio::spawn(read(ws_receiver, response_sender, router)); + util::task::spawn(read(ws_receiver, response_sender, router)); } async fn write( @@ -293,7 +293,7 @@ impl SubscriptionManager { let handle = match params { Params::NewHeads => { let receiver = websocket_source.new_head.subscribe(); - tokio::spawn(header_subscription( + util::task::spawn(header_subscription( response_sender, receiver, subscription_id, @@ -302,7 +302,7 @@ impl SubscriptionManager { Params::Events(filter) => { let l2_blocks = websocket_source.l2_blocks.subscribe(); let pending_data = websocket_source.pending_data.clone(); - tokio::spawn(event_subscription( + util::task::spawn(event_subscription( response_sender, l2_blocks, pending_data, @@ -310,12 +310,14 @@ impl SubscriptionManager { filter, )) } - Params::TransactionStatus(params) => tokio::spawn(transaction_status_subscription( - response_sender, - subscription_id, - params.transaction_hash, - gateway, - )), + Params::TransactionStatus(params) => { + util::task::spawn(transaction_status_subscription( + response_sender, + subscription_id, + params.transaction_hash, + gateway, + )) + } }; self.subscriptions.insert(subscription_id, handle); diff --git a/crates/rpc/src/method/block_hash_and_number.rs b/crates/rpc/src/method/block_hash_and_number.rs index 837f524c2f..ce5bace50a 100644 --- a/crates/rpc/src/method/block_hash_and_number.rs +++ b/crates/rpc/src/method/block_hash_and_number.rs @@ -13,8 +13,7 @@ crate::error::generate_rpc_error_subset!(Error: NoBlocks); /// Get the latest block hash and number. pub async fn block_hash_and_number(context: RpcContext) -> Result { let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/block_number.rs b/crates/rpc/src/method/block_number.rs index 4fa84dc992..06494c430d 100644 --- a/crates/rpc/src/method/block_number.rs +++ b/crates/rpc/src/method/block_number.rs @@ -10,8 +10,7 @@ crate::error::generate_rpc_error_subset!(Error: NoBlocks); /// Get the latest block number. pub async fn block_number(context: RpcContext) -> Result { let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/call.rs b/crates/rpc/src/method/call.rs index 6eb2570764..1fdd5b3559 100644 --- a/crates/rpc/src/method/call.rs +++ b/crates/rpc/src/method/call.rs @@ -109,7 +109,7 @@ pub struct Output(pub Vec); pub async fn call(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - let result = tokio::task::spawn_blocking(move || { + let result = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/method/estimate_fee.rs b/crates/rpc/src/method/estimate_fee.rs index d461d22f30..0aa97bbbbb 100644 --- a/crates/rpc/src/method/estimate_fee.rs +++ b/crates/rpc/src/method/estimate_fee.rs @@ -47,8 +47,7 @@ pub struct Output(Vec); pub async fn estimate_fee(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - let result = tokio::task::spawn_blocking(move || { + let result = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .execution_storage diff --git a/crates/rpc/src/method/estimate_message_fee.rs b/crates/rpc/src/method/estimate_message_fee.rs index 3bcb864a2c..c99f3b573b 100644 --- a/crates/rpc/src/method/estimate_message_fee.rs +++ b/crates/rpc/src/method/estimate_message_fee.rs @@ -64,8 +64,7 @@ pub async fn estimate_message_fee( input: EstimateMessageFeeInput, ) -> Result { let span = tracing::Span::current(); - - let mut result = tokio::task::spawn_blocking(move || { + let mut result = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_block_transaction_count.rs b/crates/rpc/src/method/get_block_transaction_count.rs index e1d5f7831b..5adcfcf777 100644 --- a/crates/rpc/src/method/get_block_transaction_count.rs +++ b/crates/rpc/src/method/get_block_transaction_count.rs @@ -29,8 +29,7 @@ pub async fn get_block_transaction_count( input: Input, ) -> Result { let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_block_with_receipts.rs b/crates/rpc/src/method/get_block_with_receipts.rs index 6e0cd87311..b1d9404c09 100644 --- a/crates/rpc/src/method/get_block_with_receipts.rs +++ b/crates/rpc/src/method/get_block_with_receipts.rs @@ -37,7 +37,7 @@ crate::error::generate_rpc_error_subset!(Error: BlockNotFound); pub async fn get_block_with_receipts(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_block_with_tx_hashes.rs b/crates/rpc/src/method/get_block_with_tx_hashes.rs index 8445ea6af5..1c4c7343bc 100644 --- a/crates/rpc/src/method/get_block_with_tx_hashes.rs +++ b/crates/rpc/src/method/get_block_with_tx_hashes.rs @@ -37,8 +37,7 @@ pub enum Output { /// Get block information with transaction hashes given the block id pub async fn get_block_with_tx_hashes(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut connection = context .storage diff --git a/crates/rpc/src/method/get_block_with_txs.rs b/crates/rpc/src/method/get_block_with_txs.rs index ee0818542f..27ae8fc945 100644 --- a/crates/rpc/src/method/get_block_with_txs.rs +++ b/crates/rpc/src/method/get_block_with_txs.rs @@ -38,8 +38,7 @@ pub enum Output { /// Get block information with full transactions given the block id pub async fn get_block_with_txs(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut connection = context .storage diff --git a/crates/rpc/src/method/get_class.rs b/crates/rpc/src/method/get_class.rs index c1e5214ce5..2653cc2265 100644 --- a/crates/rpc/src/method/get_class.rs +++ b/crates/rpc/src/method/get_class.rs @@ -43,7 +43,7 @@ impl From for Output { /// Get a contract class. pub async fn get_class(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - let jh = tokio::task::spawn_blocking(move || -> Result { + let jh = util::task::spawn_blocking(move |_| -> Result { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_class_at.rs b/crates/rpc/src/method/get_class_at.rs index a259833675..a83faafe92 100644 --- a/crates/rpc/src/method/get_class_at.rs +++ b/crates/rpc/src/method/get_class_at.rs @@ -57,8 +57,7 @@ impl SerializeForVersion for Output { /// Get a contract class. pub async fn get_class_at(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_class_hash_at.rs b/crates/rpc/src/method/get_class_hash_at.rs index 484267b489..af52660a68 100644 --- a/crates/rpc/src/method/get_class_hash_at.rs +++ b/crates/rpc/src/method/get_class_hash_at.rs @@ -27,7 +27,7 @@ pub struct Output(ClassHash); pub async fn get_class_hash_at(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_compiled_casm.rs b/crates/rpc/src/method/get_compiled_casm.rs index 393113f2e0..531f24b943 100644 --- a/crates/rpc/src/method/get_compiled_casm.rs +++ b/crates/rpc/src/method/get_compiled_casm.rs @@ -60,7 +60,7 @@ impl From for crate::jsonrpc::RpcError { /// Get the compiled casm for a given class hash. pub async fn get_compiled_casm(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - let jh = tokio::task::spawn_blocking(move || -> Result { + let jh = util::task::spawn_blocking(move |_| -> Result { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/method/get_events.rs b/crates/rpc/src/method/get_events.rs index dba6874418..2acc47e39a 100644 --- a/crates/rpc/src/method/get_events.rs +++ b/crates/rpc/src/method/get_events.rs @@ -156,7 +156,7 @@ pub async fn get_events( // blocking task to perform database event query let span = tracing::Span::current(); - let db_events: JoinHandle> = tokio::task::spawn_blocking(move || { + let db_events: JoinHandle> = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut connection = storage .connection() diff --git a/crates/rpc/src/method/get_nonce.rs b/crates/rpc/src/method/get_nonce.rs index 44cf168a36..5c1631879a 100644 --- a/crates/rpc/src/method/get_nonce.rs +++ b/crates/rpc/src/method/get_nonce.rs @@ -27,8 +27,7 @@ crate::error::generate_rpc_error_subset!(Error: BlockNotFound, ContractNotFound) pub async fn get_nonce(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || -> Result<_, Error> { + util::task::spawn_blocking(move |_| -> Result<_, Error> { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_state_update.rs b/crates/rpc/src/method/get_state_update.rs index b6ba9b7579..52eafdd5ce 100644 --- a/crates/rpc/src/method/get_state_update.rs +++ b/crates/rpc/src/method/get_state_update.rs @@ -43,8 +43,7 @@ impl dto::serialize::SerializeForVersion for Output { pub async fn get_state_update(context: RpcContext, input: Input) -> Result { let storage = context.storage.clone(); let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage .connection() diff --git a/crates/rpc/src/method/get_storage_at.rs b/crates/rpc/src/method/get_storage_at.rs index e8c214c43f..79acaa5f43 100644 --- a/crates/rpc/src/method/get_storage_at.rs +++ b/crates/rpc/src/method/get_storage_at.rs @@ -30,8 +30,7 @@ crate::error::generate_rpc_error_subset!(Error: ContractNotFound, BlockNotFound) /// Get the value of the storage at the given address and key. pub async fn get_storage_at(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_storage_proof.rs b/crates/rpc/src/method/get_storage_proof.rs index 1b96f4f9ad..91e81fe659 100644 --- a/crates/rpc/src/method/get_storage_proof.rs +++ b/crates/rpc/src/method/get_storage_proof.rs @@ -292,8 +292,7 @@ pub async fn get_storage_proof(context: RpcContext, input: Input) -> Result Result { let storage = context.storage.clone(); let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage .connection() diff --git a/crates/rpc/src/method/get_transaction_receipt.rs b/crates/rpc/src/method/get_transaction_receipt.rs index b345c8666b..97717298af 100644 --- a/crates/rpc/src/method/get_transaction_receipt.rs +++ b/crates/rpc/src/method/get_transaction_receipt.rs @@ -79,8 +79,7 @@ crate::error::generate_rpc_error_subset!(Error: TxnHashNotFound); pub async fn get_transaction_receipt(context: RpcContext, input: Input) -> Result { let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/method/get_transaction_status.rs b/crates/rpc/src/method/get_transaction_status.rs index f48762fa2c..0a8194c33f 100644 --- a/crates/rpc/src/method/get_transaction_status.rs +++ b/crates/rpc/src/method/get_transaction_status.rs @@ -42,8 +42,7 @@ crate::error::generate_rpc_error_subset!(Error: TxnHashNotFound); pub async fn get_transaction_status(context: RpcContext, input: Input) -> Result { // Check database. let span = tracing::Span::current(); - - let db_status = tokio::task::spawn_blocking(move || { + let db_status = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/method/simulate_transactions.rs b/crates/rpc/src/method/simulate_transactions.rs index 3486cf8cc3..24eb5b26c2 100644 --- a/crates/rpc/src/method/simulate_transactions.rs +++ b/crates/rpc/src/method/simulate_transactions.rs @@ -34,7 +34,7 @@ pub async fn simulate_transactions( input: SimulateTransactionInput, ) -> Result { let span = tracing::Span::current(); - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let skip_validate = input diff --git a/crates/rpc/src/method/subscribe_events.rs b/crates/rpc/src/method/subscribe_events.rs index ebefa7b3df..2df225c673 100644 --- a/crates/rpc/src/method/subscribe_events.rs +++ b/crates/rpc/src/method/subscribe_events.rs @@ -101,7 +101,7 @@ impl RpcSubscriptionFlow for SubscribeEvents { ) -> Result, RpcError> { let params = params.clone().unwrap_or_default(); let storage = state.storage.clone(); - let (events, last_block) = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { + let (events, last_block) = util::task::spawn_blocking(move |_| -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; let db = conn.transaction().map_err(RpcError::InternalError)?; let events = db diff --git a/crates/rpc/src/method/subscribe_new_heads.rs b/crates/rpc/src/method/subscribe_new_heads.rs index 8b4a3de0ee..83cf74ee67 100644 --- a/crates/rpc/src/method/subscribe_new_heads.rs +++ b/crates/rpc/src/method/subscribe_new_heads.rs @@ -79,7 +79,7 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { to: BlockNumber, ) -> Result, RpcError> { let storage = state.storage.clone(); - let headers = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { + let headers = util::task::spawn_blocking(move |_| -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; let db = conn.transaction().map_err(RpcError::InternalError)?; db.block_range(from, to).map_err(RpcError::InternalError) diff --git a/crates/rpc/src/method/subscribe_transaction_status.rs b/crates/rpc/src/method/subscribe_transaction_status.rs index 704a5684d4..5e3983951b 100644 --- a/crates/rpc/src/method/subscribe_transaction_status.rs +++ b/crates/rpc/src/method/subscribe_transaction_status.rs @@ -147,7 +147,7 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus { // Check if we have the transaction in our database, and if so, send the // relevant transaction status updates. let (first_block, l1_state, tx_with_receipt) = - tokio::task::spawn_blocking(move || -> Result<_, RpcError> { + util::task::spawn_blocking(move |_| -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; let db = conn.transaction().map_err(RpcError::InternalError)?; let first_block = db @@ -340,7 +340,7 @@ impl RpcSubscriptionFlow for SubscribeTransactionStatus { // here because it guarantees that the ACCEPTED_ON_L2 update will be // sent before the ACCEPTED_ON_L1 update. let storage = state.storage.clone(); - let l1_state = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { + let l1_state = util::task::spawn_blocking(move |_| -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; let db = conn.transaction().map_err(RpcError::InternalError)?; let l1_state = db.latest_l1_state().map_err(RpcError::InternalError)?; diff --git a/crates/rpc/src/method/trace_block_transactions.rs b/crates/rpc/src/method/trace_block_transactions.rs index 65c6876f52..76e9232c75 100644 --- a/crates/rpc/src/method/trace_block_transactions.rs +++ b/crates/rpc/src/method/trace_block_transactions.rs @@ -46,7 +46,7 @@ pub async fn trace_block_transactions( let span = tracing::Span::current(); let storage = context.execution_storage.clone(); - let traces = tokio::task::spawn_blocking(move || { + let traces = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage.connection()?; diff --git a/crates/rpc/src/method/trace_transaction.rs b/crates/rpc/src/method/trace_transaction.rs index 3a78f1810c..4386cd4c46 100644 --- a/crates/rpc/src/method/trace_transaction.rs +++ b/crates/rpc/src/method/trace_transaction.rs @@ -52,7 +52,7 @@ pub async fn trace_transaction<'a>( let span = tracing::Span::current(); let local = - tokio::task::spawn_blocking(move || -> Result { + util::task::spawn_blocking(move |_| -> Result { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/pathfinder/methods/get_proof.rs b/crates/rpc/src/pathfinder/methods/get_proof.rs index b336ca11b4..f3cbc5029b 100644 --- a/crates/rpc/src/pathfinder/methods/get_proof.rs +++ b/crates/rpc/src/pathfinder/methods/get_proof.rs @@ -248,8 +248,7 @@ pub async fn get_proof( let storage = context.storage.clone(); let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage .connection() @@ -391,8 +390,7 @@ pub async fn get_class_proof( let storage = context.storage.clone(); let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage .connection() diff --git a/crates/rpc/src/pathfinder/methods/get_transaction_status.rs b/crates/rpc/src/pathfinder/methods/get_transaction_status.rs index f093c946e4..8da9655b5d 100644 --- a/crates/rpc/src/pathfinder/methods/get_transaction_status.rs +++ b/crates/rpc/src/pathfinder/methods/get_transaction_status.rs @@ -26,8 +26,7 @@ pub async fn get_transaction_status( input: GetGatewayTransactionInput, ) -> Result { let span = tracing::Span::current(); - - let db_status = tokio::task::spawn_blocking(move || { + let db_status = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/v06/method/call.rs b/crates/rpc/src/v06/method/call.rs index d5fb20977d..bad1be8082 100644 --- a/crates/rpc/src/v06/method/call.rs +++ b/crates/rpc/src/v06/method/call.rs @@ -95,7 +95,7 @@ pub struct CallOutput(#[serde_as(as = "Vec")] pub Vec) pub async fn call(context: RpcContext, input: CallInput) -> Result { let span = tracing::Span::current(); - let result = tokio::task::spawn_blocking(move || { + let result = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/v06/method/estimate_fee.rs b/crates/rpc/src/v06/method/estimate_fee.rs index 49ef14bf95..1c53ba5a75 100644 --- a/crates/rpc/src/v06/method/estimate_fee.rs +++ b/crates/rpc/src/v06/method/estimate_fee.rs @@ -156,8 +156,7 @@ pub async fn estimate_fee_impl( l1_blob_data_availability: L1BlobDataAvailability, ) -> Result, EstimateFeeError> { let span = tracing::Span::current(); - - let result = tokio::task::spawn_blocking(move || { + let result = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .execution_storage diff --git a/crates/rpc/src/v06/method/estimate_message_fee.rs b/crates/rpc/src/v06/method/estimate_message_fee.rs index 48a0953a60..d646311ed7 100644 --- a/crates/rpc/src/v06/method/estimate_message_fee.rs +++ b/crates/rpc/src/v06/method/estimate_message_fee.rs @@ -126,8 +126,7 @@ pub(crate) async fn estimate_message_fee_impl( l1_blob_data_availability: L1BlobDataAvailability, ) -> Result { let span = tracing::Span::current(); - - let mut result = tokio::task::spawn_blocking(move || { + let mut result = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context .storage diff --git a/crates/rpc/src/v06/method/get_block_with_tx_hashes.rs b/crates/rpc/src/v06/method/get_block_with_tx_hashes.rs index 5449f2429f..46251d8a59 100644 --- a/crates/rpc/src/v06/method/get_block_with_tx_hashes.rs +++ b/crates/rpc/src/v06/method/get_block_with_tx_hashes.rs @@ -27,8 +27,7 @@ pub async fn get_block_with_tx_hashes( ) -> Result { let storage = context.storage.clone(); let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut connection = storage .connection() diff --git a/crates/rpc/src/v06/method/get_block_with_txs.rs b/crates/rpc/src/v06/method/get_block_with_txs.rs index 65f29dedfb..f479f45e26 100644 --- a/crates/rpc/src/v06/method/get_block_with_txs.rs +++ b/crates/rpc/src/v06/method/get_block_with_txs.rs @@ -28,8 +28,7 @@ pub async fn get_block_with_txs( ) -> Result { let storage = context.storage.clone(); let span = tracing::Span::current(); - - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut connection = storage .connection() diff --git a/crates/rpc/src/v06/method/get_transaction_receipt.rs b/crates/rpc/src/v06/method/get_transaction_receipt.rs index afc09951a3..a726a4d0e0 100644 --- a/crates/rpc/src/v06/method/get_transaction_receipt.rs +++ b/crates/rpc/src/v06/method/get_transaction_receipt.rs @@ -32,8 +32,7 @@ pub async fn get_transaction_receipt_impl( ) -> Result { let storage = context.storage.clone(); let span = tracing::Span::current(); - - let jh = tokio::task::spawn_blocking(move || { + let jh = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage .connection() diff --git a/crates/rpc/src/v06/method/get_transaction_status.rs b/crates/rpc/src/v06/method/get_transaction_status.rs index c22d94404d..a98de8c8d2 100644 --- a/crates/rpc/src/v06/method/get_transaction_status.rs +++ b/crates/rpc/src/v06/method/get_transaction_status.rs @@ -82,8 +82,7 @@ pub async fn get_transaction_status( ) -> Result { // Check database. let span = tracing::Span::current(); - - let db_status = tokio::task::spawn_blocking(move || { + let db_status = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = context diff --git a/crates/rpc/src/v06/method/simulate_transactions.rs b/crates/rpc/src/v06/method/simulate_transactions.rs index 571d40219d..8791597d3d 100644 --- a/crates/rpc/src/v06/method/simulate_transactions.rs +++ b/crates/rpc/src/v06/method/simulate_transactions.rs @@ -107,7 +107,7 @@ pub async fn simulate_transactions_impl( l1_blob_data_availability: L1BlobDataAvailability, ) -> Result { let span = tracing::Span::current(); - tokio::task::spawn_blocking(move || { + util::task::spawn_blocking(move |_| { let _g = span.enter(); let skip_validate = input diff --git a/crates/rpc/src/v06/method/trace_block_transactions.rs b/crates/rpc/src/v06/method/trace_block_transactions.rs index 97e8ab23bb..e0bbee8ce3 100644 --- a/crates/rpc/src/v06/method/trace_block_transactions.rs +++ b/crates/rpc/src/v06/method/trace_block_transactions.rs @@ -205,7 +205,7 @@ pub async fn trace_block_transactions_impl( let span = tracing::Span::current(); let storage = context.execution_storage.clone(); - let traces = tokio::task::spawn_blocking(move || { + let traces = util::task::spawn_blocking(move |_| { let _g = span.enter(); let mut db = storage.connection()?; diff --git a/crates/rpc/src/v06/method/trace_transaction.rs b/crates/rpc/src/v06/method/trace_transaction.rs index b658ee6470..e4eb0177cc 100644 --- a/crates/rpc/src/v06/method/trace_transaction.rs +++ b/crates/rpc/src/v06/method/trace_transaction.rs @@ -124,7 +124,7 @@ pub async fn trace_transaction_impl( let span = tracing::Span::current(); let local = - tokio::task::spawn_blocking(move || -> Result { + util::task::spawn_blocking(move |_| -> Result { let _g = span.enter(); let mut db = context From 27cf8f6f788efb8c3a1d5935cd98ff5587b266e6 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 18 Dec 2024 12:32:22 +0100 Subject: [PATCH 14/25] feat(pathfinder): force exit after a grace period --- crates/pathfinder/src/bin/pathfinder/main.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 75dcdcbd16..8899641ca1 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -5,7 +5,7 @@ use std::num::NonZeroU32; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::Context; use metrics_exporter_prometheus::PrometheusBuilder; @@ -338,9 +338,17 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst util::task::tracker::close(); tracing::info!("Waiting for all tasks to finish..."); - util::task::tracker::wait().await; - tracing::info!("Waiting for all tasks to finish... done!"); - Ok(()) + // Force exit after a grace period + match tokio::time::timeout(Duration::from_secs(10), util::task::tracker::wait()).await { + Ok(_) => { + tracing::info!("All tasks finished successfully"); + Ok(()) + } + Err(_) => { + tracing::warn!("Graceful shutdown timed out"); + Err(anyhow::anyhow!("Graceful shutdown timed out")) + } + } } #[cfg(feature = "tokio-console")] From 8bc8a2835936cae0f3941ebdd6478e16da78db9f Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 18 Dec 2024 12:51:13 +0100 Subject: [PATCH 15/25] feat(pathfinder/config): add grace period to config --- .../pathfinder/src/bin/pathfinder/config.rs | 21 ++++++++++++++----- crates/pathfinder/src/bin/pathfinder/main.rs | 4 ++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs index c318dda19d..9107949d0a 100644 --- a/crates/pathfinder/src/bin/pathfinder/config.rs +++ b/crates/pathfinder/src/bin/pathfinder/config.rs @@ -316,6 +316,15 @@ This should only be enabled for debugging purposes as it adds substantial proces action=ArgAction::Set )] fetch_casm_from_fgw: bool, + + #[arg( + long = "shutdown.grace-period", + value_name = "Seconds", + long_help = "Timeout duration for graceful shutdown after receiving a SIGINT or SIGTERM", + env = "PATHFINDER_SHUTDOWN_GRACE_PERIOD", + default_value = "10" + )] + shutdown_grace_period: std::num::NonZeroU64, } #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq)] @@ -704,8 +713,8 @@ pub struct Config { pub execution_concurrency: Option, pub sqlite_wal: JournalMode, pub max_rpc_connections: std::num::NonZeroUsize, - pub poll_interval: std::time::Duration, - pub l1_poll_interval: std::time::Duration, + pub poll_interval: Duration, + pub l1_poll_interval: Duration, pub color: Color, pub log_output_json: bool, pub disable_version_update_check: bool, @@ -724,6 +733,7 @@ pub struct Config { pub custom_versioned_constants: Option, pub feeder_gateway_fetch_concurrency: NonZeroUsize, pub fetch_casm_from_fgw: bool, + pub shutdown_grace_period: Duration, } pub struct Ethereum { @@ -769,7 +779,7 @@ pub struct P2PConfig; pub struct DebugConfig { pub pretty_log: bool, - pub restart_delay: std::time::Duration, + pub restart_delay: Duration, } impl NetworkConfig { @@ -955,7 +965,7 @@ impl DebugConfig { fn parse(_: ()) -> Self { Self { pretty_log: false, - restart_delay: std::time::Duration::from_secs(60), + restart_delay: Duration::from_secs(60), } } } @@ -965,7 +975,7 @@ impl DebugConfig { fn parse(args: DebugCli) -> Self { Self { pretty_log: args.pretty_log, - restart_delay: std::time::Duration::from_secs(args.restart_delay), + restart_delay: Duration::from_secs(args.restart_delay), } } } @@ -1018,6 +1028,7 @@ impl Config { .custom_versioned_constants_path .map(parse_versioned_constants_or_exit), fetch_casm_from_fgw: cli.fetch_casm_from_fgw, + shutdown_grace_period: Duration::from_secs(cli.shutdown_grace_period.get()), } } } diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 8899641ca1..c26f24e5b8 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -5,7 +5,7 @@ use std::num::NonZeroU32; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::Context; use metrics_exporter_prometheus::PrometheusBuilder; @@ -339,7 +339,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst util::task::tracker::close(); tracing::info!("Waiting for all tasks to finish..."); // Force exit after a grace period - match tokio::time::timeout(Duration::from_secs(10), util::task::tracker::wait()).await { + match tokio::time::timeout(config.shutdown_grace_period, util::task::tracker::wait()).await { Ok(_) => { tracing::info!("All tasks finished successfully"); Ok(()) From 7b4dadba937f05261bee3335c990e2b9d75ec599 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 18 Dec 2024 22:12:01 +0100 Subject: [PATCH 16/25] feat(monitoring): add graceful shutdown --- crates/pathfinder/src/monitoring.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/pathfinder/src/monitoring.rs b/crates/pathfinder/src/monitoring.rs index aa865cd007..a0feb17a31 100644 --- a/crates/pathfinder/src/monitoring.rs +++ b/crates/pathfinder/src/monitoring.rs @@ -34,6 +34,7 @@ pub async fn spawn_server( let addr = listener.local_addr()?; let spawn = util::task::spawn(async move { axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(util::task::cancellation_token().cancelled_owned()) .await .expect("server error") }); From e3abcea97fd9a4ae1d19cca7f6eb8569b8708712 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Thu, 19 Dec 2024 10:48:24 +0100 Subject: [PATCH 17/25] refactor(main): reorder initialization towards disallowing task detachment upon error --- crates/pathfinder/src/bin/pathfinder/main.rs | 69 +++++++++++--------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index c26f24e5b8..dfae6cd051 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -44,6 +44,14 @@ fn main() -> anyhow::Result<()> { } async fn async_main() -> anyhow::Result<()> { + // All of the following code is either sync and blocking or async but called + // sequentially down to the point where the monitoring server is spawned. No + // tokio tasks are spawned to that point. If an error occurs before that + // point, the process will just exit with that error, regardless of whether + // any of the signals were received and no tasks will be left detached. + let mut term_signal = signal(SignalKind::terminate())?; + let mut int_signal = signal(SignalKind::interrupt())?; + if std::env::var_os("RUST_LOG").is_none() { // Disable all dependency logs by default. std::env::set_var("RUST_LOG", "pathfinder=info"); @@ -95,24 +103,12 @@ async fn async_main() -> anyhow::Result<()> { .default_network() .context("Using default Starknet network based on Ethereum configuration")?, }; - - // Spawn monitoring if configured. - if let Some(address) = config.monitor_address { - let network_label = match &network { - NetworkConfig::Mainnet => "mainnet", - NetworkConfig::SepoliaTestnet => "testnet-sepolia", - NetworkConfig::SepoliaIntegration => "integration-sepolia", - NetworkConfig::Custom { .. } => "custom", - }; - spawn_monitoring( - network_label, - address, - readiness.clone(), - sync_state.clone(), - ) - .await - .context("Starting monitoring task")?; - } + let network_label = match &network { + NetworkConfig::Mainnet => "mainnet", + NetworkConfig::SepoliaTestnet => "testnet-sepolia", + NetworkConfig::SepoliaIntegration => "integration-sepolia", + NetworkConfig::Custom { .. } => "custom", + }; let pathfinder_context = PathfinderContext::configure_and_proxy_check( network, @@ -257,6 +253,25 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst None => rpc_server, }; + // Spawn monitoring if configured. + if let Some(address) = config.monitor_address { + spawn_monitoring( + network_label, + address, + readiness.clone(), + sync_state.clone(), + ) + .await + .context("Starting monitoring task")?; + } + + // From this point onwards, until the final select, we cannot exit the process + // even if some error is encountered or a signal is received as it would result + // in tasks being detached and cancelled abruptly without a chance to clean + // up. We need to wait for the final select where we can cancel all the tasks + // and wait for them to finish. Only then can we exit the process and return an + // error if some of the tasks failed or no error if we have received a signal. + let (p2p_handle, gossiper, p2p_client) = start_p2p( pathfinder_context.network_id, p2p_storage, @@ -299,9 +314,6 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst util::task::spawn(update::poll_github_for_releases()); } - let mut term_signal = signal(SignalKind::terminate())?; - let mut int_signal = signal(SignalKind::interrupt())?; - // We are now ready. readiness.store(true, std::sync::atomic::Ordering::Relaxed); @@ -888,16 +900,13 @@ async fn verify_database( ) -> anyhow::Result<()> { let storage = storage.clone(); - let db_genesis = util::task::spawn_blocking(move |_| { - let mut conn = storage.connection().context("Create database connection")?; - let tx = conn.transaction().context("Create database transaction")?; + let mut conn = storage.connection().context("Create database connection")?; + let tx = conn.transaction().context("Create database transaction")?; - tx.block_id(BlockNumber::GENESIS.into()) - }) - .await - .context("Joining database task")? - .context("Fetching genesis hash from database")? - .map(|x| x.1); + let db_genesis = tx + .block_id(BlockNumber::GENESIS.into()) + .context("Fetching genesis hash from database")? + .map(|x| x.1); if let Some(database_genesis) = db_genesis { use pathfinder_common::consts::{ From 1bc3889320f09a87b1a9afddcfe66bc42d2ccc8e Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 23 Dec 2024 12:57:18 +0100 Subject: [PATCH 18/25] fix(p2p): storage connection pool too small --- crates/pathfinder/src/bin/pathfinder/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index dfae6cd051..5d2c5f40a4 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -182,8 +182,10 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst Try increasing the file limit to using `ulimit` or similar tooling.", )?; + // 5 is enough for normal sync operations, and then `available_parallelism` for + // the rayon thread pool workers to use. let p2p_storage = storage_manager - .create_pool(NonZeroU32::new(1).unwrap()) + .create_pool(NonZeroU32::new(5 + available_parallelism.get() as u32).unwrap()) .context( r"Creating database connection pool for p2p From 52577a8553ab97dfe152739e2ff73650d72eb369 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 23 Dec 2024 13:18:37 +0100 Subject: [PATCH 19/25] fix: critical task errors not propagated to process exit --- crates/pathfinder/src/bin/pathfinder/main.rs | 76 +++++++++++--------- crates/pathfinder/src/p2p_network.rs | 8 ++- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 5d2c5f40a4..cc02a56afd 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -21,6 +21,7 @@ use pathfinder_storage::Storage; use primitive_types::H160; use starknet_gateway_client::GatewayApi; use tokio::signal::unix::{signal, SignalKind}; +use tokio::task::JoinError; use tracing::{info, warn}; use crate::config::{NetworkConfig, StateTries}; @@ -320,49 +321,35 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst readiness.store(true, std::sync::atomic::Ordering::Relaxed); // Monitor our critical spawned process tasks. - tokio::select! { - result = sync_handle => { - match result { - Ok(task_result) => tracing::error!("Sync process ended unexpected with: {:?}", task_result), - Err(err) => tracing::error!("Sync process ended unexpected; failed to join task handle: {:?}", err), - } - anyhow::bail!("Unexpected shutdown"); - } - result = rpc_handle => { - match result { - Ok(_) => tracing::error!("RPC server process ended unexpectedly"), - Err(err) => tracing::error!(error=%err, "RPC server process ended unexpectedly"), - } - anyhow::bail!("Unexpected shutdown"); - } - result = p2p_handle => { - match result { - Ok(_) => tracing::error!("P2P process ended unexpectedly"), - Err(err) => tracing::error!(error=%err, "P2P process ended unexpectedly"), - } - anyhow::bail!("Unexpected shutdown"); - } + let main_result = tokio::select! { + result = sync_handle => handle_critical_task_result("Sync", result), + result = rpc_handle => handle_critical_task_result("RPC", result), + result = p2p_handle => handle_critical_task_result("P2P", result), _ = term_signal.recv() => { - tracing::info!("TERM signal received, exiting gracefully"); + tracing::info!("TERM signal received"); + Ok(()) } _ = int_signal.recv() => { - tracing::info!("INT signal received, exiting gracefully"); + tracing::info!("INT signal received"); + Ok(()) } - } + }; + // If we get here either a signal was received or a task ended unexpectedly, + // which means we need to cancel all the remaining tasks. + tracing::info!("Shutdown started, waiting for tasks to finish..."); util::task::tracker::close(); - tracing::info!("Waiting for all tasks to finish..."); // Force exit after a grace period match tokio::time::timeout(config.shutdown_grace_period, util::task::tracker::wait()).await { Ok(_) => { - tracing::info!("All tasks finished successfully"); - Ok(()) + tracing::info!("Shutdown finished successfully") } Err(_) => { - tracing::warn!("Graceful shutdown timed out"); - Err(anyhow::anyhow!("Graceful shutdown timed out")) + tracing::error!("Some tasks failed to finish in time, forcing exit"); } } + + main_result } #[cfg(feature = "tokio-console")] @@ -438,7 +425,7 @@ async fn start_p2p( storage: Storage, config: config::P2PConfig, ) -> anyhow::Result<( - tokio::task::JoinHandle<()>, + tokio::task::JoinHandle>, state::Gossiper, Option, )> { @@ -522,7 +509,7 @@ async fn start_p2p( _: Storage, _: config::P2PConfig, ) -> anyhow::Result<( - tokio::task::JoinHandle<()>, + tokio::task::JoinHandle>, state::Gossiper, Option, )> { @@ -950,3 +937,28 @@ async fn verify_database( Ok(()) } + +fn handle_critical_task_result( + task_name: &str, + task_result: Result, JoinError>, +) -> anyhow::Result<()> { + match task_result { + Ok(task_result) => { + tracing::error!(?task_result, "{} task ended unexpectedly", task_name); + task_result + } + Err(error) if error.is_panic() => { + tracing::error!(%error, "{} task panicked", task_name); + Err(anyhow::anyhow!("{} task panicked", task_name)) + } + // Cancelling all tracked tasks via [`util::task::tracker::close()`] does not cause join + // errors on registered task handles, so this is unexpected and we should threat it as error + Err(_) => { + tracing::error!("{} task was cancelled unexpectedly", task_name); + Err(anyhow::anyhow!( + "{} task was cancelled unexpectedly", + task_name + )) + } + } +} diff --git a/crates/pathfinder/src/p2p_network.rs b/crates/pathfinder/src/p2p_network.rs index fc47e67510..cab8a2b717 100644 --- a/crates/pathfinder/src/p2p_network.rs +++ b/crates/pathfinder/src/p2p_network.rs @@ -13,7 +13,11 @@ mod sync_handlers; use sync_handlers::{get_classes, get_events, get_headers, get_state_diffs, get_transactions}; // Silence clippy -pub type P2PNetworkHandle = (peer_agnostic::Client, HeadRx, tokio::task::JoinHandle<()>); +pub type P2PNetworkHandle = ( + peer_agnostic::Client, + HeadRx, + tokio::task::JoinHandle>, +); pub struct P2PContext { pub cfg: p2p::Config, @@ -102,7 +106,7 @@ pub async fn start(context: P2PContext) -> anyhow::Result { tokio::select! { _ = &mut main_loop_handle => { tracing::error!("p2p task ended unexpectedly"); - break; + anyhow::bail!("p2p task ended unexpectedly"); } Some(event) = p2p_events.recv() => { match handle_p2p_event(event, storage.clone(), &mut tx).await { From 0900358cb52081a90a7cd13d9b58e53f3471c915 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 23 Dec 2024 13:45:23 +0100 Subject: [PATCH 20/25] feat: make sure the last RW db connection pool is dropped as the last ...if shutdown in graceful. --- crates/pathfinder/src/bin/pathfinder/main.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index cc02a56afd..fbc88331b3 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -41,10 +41,13 @@ fn main() -> anyhow::Result<()> { .thread_stack_size(8 * 1024 * 1024) .build() .unwrap() - .block_on(async { async_main().await }) + .block_on(async { + async_main().await?; + Ok(()) + }) } -async fn async_main() -> anyhow::Result<()> { +async fn async_main() -> anyhow::Result { // All of the following code is either sync and blocking or async but called // sequentially down to the point where the monitoring server is spawned. No // tokio tasks are spawned to that point. If an error occurs before that @@ -182,7 +185,6 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst Hint: This is usually caused by exceeding the file descriptor limit of your system. Try increasing the file limit to using `ulimit` or similar tooling.", )?; - // 5 is enough for normal sync operations, and then `available_parallelism` for // the rayon thread pool workers to use. let p2p_storage = storage_manager @@ -193,7 +195,6 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst Hint: This is usually caused by exceeding the file descriptor limit of your system. Try increasing the file limit to using `ulimit` or similar tooling.", )?; - info!(location=?pathfinder_context.database, "Database migrated."); verify_database( &sync_storage, @@ -284,7 +285,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst let sync_handle = if config.is_sync_enabled { start_sync( - sync_storage, + sync_storage.clone(), pathfinder_context, ethereum.client, sync_state.clone(), @@ -349,7 +350,11 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst } } - main_result + // If a RO db connection pool remains after all RW connection pools have been + // dropped, WAL & SHM files are never cleaned up. To avoid this, we make sure + // that all RO pools and all but one RW pools are dropped when task tracker + // finishes waiting, and then we drop the last RW pool. + main_result.map(|_| sync_storage) } #[cfg(feature = "tokio-console")] From 8bed9c2aa8517f2bac4ce4607fd0cdbcd5eea1e1 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 23 Dec 2024 15:32:37 +0100 Subject: [PATCH 21/25] feat: force orderly cancellation even if rpc server or p2p fail to start --- crates/pathfinder/src/bin/pathfinder/main.rs | 25 ++++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index fbc88331b3..9dcc4a0867 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -281,7 +281,16 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst p2p_storage, config.p2p.clone(), ) - .await?; + .await + .unwrap_or_else(|error| { + ( + tokio::task::spawn(std::future::ready( + Err(error.context("P2P failed to start")), + )), + Default::default(), + None, + ) + }); let sync_handle = if config.is_sync_enabled { start_sync( @@ -303,13 +312,19 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst }; let rpc_handle = if config.is_rpc_enabled { - let (rpc_handle, local_addr) = rpc_server + match rpc_server .with_max_connections(config.max_rpc_connections.get()) .spawn() .await - .context("Starting the RPC server")?; - info!("📡 HTTP-RPC server started on: {}", local_addr); - rpc_handle + { + Ok((rpc_handle, on)) => { + info!(%on, "📡 RPC server started"); + rpc_handle + } + Err(error) => tokio::task::spawn(std::future::ready(Err( + error.context("RPC server failed to start") + ))), + } } else { tokio::spawn(std::future::pending()) }; From 13643ee56691b4041dc0c5f230a0802a72d8dcb0 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 23 Dec 2024 15:53:52 +0100 Subject: [PATCH 22/25] fixup: allow interrupting migrations and trie pruning --- crates/pathfinder/src/bin/pathfinder/main.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index 9dcc4a0867..23298e9be2 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -48,14 +48,6 @@ fn main() -> anyhow::Result<()> { } async fn async_main() -> anyhow::Result { - // All of the following code is either sync and blocking or async but called - // sequentially down to the point where the monitoring server is spawned. No - // tokio tasks are spawned to that point. If an error occurs before that - // point, the process will just exit with that error, regardless of whether - // any of the signals were received and no tasks will be left detached. - let mut term_signal = signal(SignalKind::terminate())?; - let mut int_signal = signal(SignalKind::interrupt())?; - if std::env::var_os("RUST_LOG").is_none() { // Disable all dependency logs by default. std::env::set_var("RUST_LOG", "pathfinder=info"); @@ -145,6 +137,7 @@ async fn async_main() -> anyhow::Result { None => None, }) .migrate()?; + let sync_storage = storage_manager // 5 is enough for normal sync operations, and then `available_parallelism` for // the rayon thread pool workers to use. @@ -212,6 +205,12 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst .prune_tries() .context("Pruning tries on startup")?; + // Register signal handlers here, because we want to be able to interrupt long + // running migrations or trie pruning. No tasks are spawned before this point so + // we don't worry about detachment. + let mut term_signal = signal(SignalKind::terminate())?; + let mut int_signal = signal(SignalKind::interrupt())?; + let (tx_pending, rx_pending) = tokio::sync::watch::channel(Default::default()); let rpc_config = pathfinder_rpc::context::RpcConfig { @@ -269,7 +268,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst .context("Starting monitoring task")?; } - // From this point onwards, until the final select, we cannot exit the process + // From this point onwards, until the final select, we don't exit the process // even if some error is encountered or a signal is received as it would result // in tasks being detached and cancelled abruptly without a chance to clean // up. We need to wait for the final select where we can cancel all the tasks From e4ef8e790b969bf6df224225b398bbd16771a694 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Mon, 23 Dec 2024 17:45:22 +0100 Subject: [PATCH 23/25] chore: update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 633e67d6cd..4c638678a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- Graceful shutdown upon SIGINT and SIGTERM with a default grace period of 10 seconds, configurable via `--shutdown.grace-period`. + ### Removed - `storage_commitment` and `class_commitment` fields from the `pathfinder_subscribe_newHeads` method response. From 423d84a596a32cc604033b50d9fa703f6b6dc392 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 27 Dec 2024 11:06:23 +0100 Subject: [PATCH 24/25] chore: remove surplus comments from cargo.toml files Co-authored-by: sistemd --- crates/common/Cargo.toml | 1 - crates/compiler/Cargo.toml | 1 - crates/crypto/Cargo.toml | 1 - crates/executor/Cargo.toml | 1 - crates/gateway-client/Cargo.toml | 1 - crates/gateway-test-fixtures/Cargo.toml | 1 - crates/gateway-types/Cargo.toml | 1 - crates/load-test/Cargo.toml | 1 - crates/merkle-tree/Cargo.toml | 1 - crates/p2p/Cargo.toml | 1 - crates/pathfinder/Cargo.toml | 1 - crates/retry/Cargo.toml | 1 - crates/rpc/Cargo.toml | 1 - crates/serde/Cargo.toml | 1 - crates/stark_hash_python/Cargo.toml | 1 - crates/storage/Cargo.toml | 1 - crates/util/Cargo.toml | 1 - 17 files changed, 17 deletions(-) diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 05b4fcd8c0..5b7e9715eb 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] full-serde = [] diff --git a/crates/compiler/Cargo.toml b/crates/compiler/Cargo.toml index f826f63553..f9e63d69fd 100644 --- a/crates/compiler/Cargo.toml +++ b/crates/compiler/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/crypto/Cargo.toml b/crates/crypto/Cargo.toml index e02ea436c2..e4b3f97063 100644 --- a/crates/crypto/Cargo.toml +++ b/crates/crypto/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] name = "pathfinder_crypto" diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 9c33a3662b..2984a2131a 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/gateway-client/Cargo.toml b/crates/gateway-client/Cargo.toml index dc05f9d375..97bbc994fc 100644 --- a/crates/gateway-client/Cargo.toml +++ b/crates/gateway-client/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/gateway-test-fixtures/Cargo.toml b/crates/gateway-test-fixtures/Cargo.toml index ceda03f0fd..52fd52d047 100644 --- a/crates/gateway-test-fixtures/Cargo.toml +++ b/crates/gateway-test-fixtures/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] pathfinder-common = { path = "../common" } diff --git a/crates/gateway-types/Cargo.toml b/crates/gateway-types/Cargo.toml index ec5be8c247..ef62b04423 100644 --- a/crates/gateway-types/Cargo.toml +++ b/crates/gateway-types/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/load-test/Cargo.toml b/crates/load-test/Cargo.toml index ba4983ee14..cd1960624b 100644 --- a/crates/load-test/Cargo.toml +++ b/crates/load-test/Cargo.toml @@ -6,7 +6,6 @@ authors = ["Equilibrium Labs "] edition = "2021" license = "MIT OR Apache-2.0" rust-version = "1.73" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] goose = "0.17.2" diff --git a/crates/merkle-tree/Cargo.toml b/crates/merkle-tree/Cargo.toml index 54daf4e1c6..ef1a42a034 100644 --- a/crates/merkle-tree/Cargo.toml +++ b/crates/merkle-tree/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index e2e530256a..1aa971a202 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -6,7 +6,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml index 5b370516ef..c667023444 100644 --- a/crates/pathfinder/Cargo.toml +++ b/crates/pathfinder/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] name = "pathfinder_lib" diff --git a/crates/retry/Cargo.toml b/crates/retry/Cargo.toml index 18738fd135..a831396e4e 100644 --- a/crates/retry/Cargo.toml +++ b/crates/retry/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] tokio = { workspace = true } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index b7dc771ea3..43cc862266 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/serde/Cargo.toml b/crates/serde/Cargo.toml index c3cb703382..5da68f99a4 100644 --- a/crates/serde/Cargo.toml +++ b/crates/serde/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/stark_hash_python/Cargo.toml b/crates/stark_hash_python/Cargo.toml index 01f73ee27c..3b1c9b9ae7 100644 --- a/crates/stark_hash_python/Cargo.toml +++ b/crates/stark_hash_python/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" license = "MIT OR Apache-2.0" rust-version = "1.73" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] name = "starknet_pathfinder_crypto" crate-type = ["cdylib"] diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 4ce401146c..f494ab25cf 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -5,7 +5,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } diff --git a/crates/util/Cargo.toml b/crates/util/Cargo.toml index 134b7e7661..acb15e3ace 100644 --- a/crates/util/Cargo.toml +++ b/crates/util/Cargo.toml @@ -6,7 +6,6 @@ authors = { workspace = true } edition = { workspace = true } license = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] anyhow = { workspace = true } From 4b8cbd491b9ab97469f88b55c21b756dd19a1469 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 27 Dec 2024 11:37:07 +0100 Subject: [PATCH 25/25] fix: post rebase build error --- crates/rpc/src/pathfinder/methods/get_proof.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/rpc/src/pathfinder/methods/get_proof.rs b/crates/rpc/src/pathfinder/methods/get_proof.rs index f3cbc5029b..3f7ebbac08 100644 --- a/crates/rpc/src/pathfinder/methods/get_proof.rs +++ b/crates/rpc/src/pathfinder/methods/get_proof.rs @@ -198,7 +198,6 @@ impl crate::dto::serialize::SerializeForVersion for GetProofOutput { ) -> Result { let mut serializer = serializer.serialize_struct()?; serializer.serialize_optional("state_commitment", self.state_commitment)?; - serializer.serialize_optional("class_commitment", self.class_commitment)?; serializer.serialize_field("contract_proof", &self.contract_proof)?; serializer.serialize_optional("contract_data", self.contract_data.clone())?; serializer.end() @@ -217,7 +216,6 @@ impl crate::dto::serialize::SerializeForVersion for GetClassProofOutput { serializer: crate::dto::serialize::Serializer, ) -> Result { let mut serializer = serializer.serialize_struct()?; - serializer.serialize_optional("class_commitment", self.class_commitment)?; serializer.serialize_field("class_proof", &self.class_proof)?; serializer.end() }