From 961294c82c2484423653fea1d690f57ec08cf2e8 Mon Sep 17 00:00:00 2001
From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com>
Date: Thu, 25 Jul 2024 10:21:57 +0100
Subject: [PATCH] chore(sequencer-relayer)!: minimize resubmissions to Celestia (#1234)

## Summary
The relayer has been updated to avoid resubmitting the same data to Celestia on restart or on a timeout of the `BroadcastTx` gRPC.

## Background
Currently the relayer will likely resubmit the same data when it restarts, since the majority of the time spent in the submit loop is waiting for confirmation of the `BlobTx` having been stored. While in this state, the on-disk files aren't updated to indicate that we're waiting for confirmation. Hence if we restart, the new session begins by submitting all data from the last-confirmed point, i.e. the same data which was already likely successfully submitted in the final attempt of the previous session.

A similar situation happens if we timeout while waiting for the `BroadcastTx` response - the retry loop will attempt to resubmit the same data without first checking if the previous attempt succeeded.

## Changes
* Replaced the two state files (presubmit and postsubmit) with a single one (submission-state) and similarly for their respective env vars. See [the updated spec](https://github.com/astriaorg/astria/blob/c110bdbf7d0fd05fba04775d07d046c37bbd7372/specs/sequencer-relayer.md#submission-state-file) for further details.
* The `submission` module was heavily changed: there are now two primary state structs `StartedSubmission` and `PreparedSubmission`, between which the blob submitter toggles during normal operation. There is also `FreshSubmission` which is only relevant at startup, and an enum covering these three (`SubmissionStateAtStartup`) which is also only used during startup. Finally, the `State` enum is an ephemeral object only used to read/write the relevant state from/to disk.
* `BlobSubmitter::run` was modified to try to confirm the last submission attempt from the previous session if the state file indicated the relayer exited while in `prepared` state. If that submission is confirmed (the most likely outcome), then the sequencer blocks in that submission are simply skipped in the write loop. (We could try to avoid even fetching these sequencer blocks, but that would be a significantly more pervasive change, and is probably not worth the complexity).
* The relayer's celestia client was changed to split `try_submit` into `try_prepare` and `try_submit` so that the hash of the prepared `BlobTx` can be returned from `try_prepare` to be recorded in the state file before the transaction is broadcast to the Celestia app.
* `submit_with_retry` was updated to check for a broadcast timeout error in the previous attempt, and in that case, attempts to just confirm that submission rather than automatically resubmitting the same data.

## Testing
* Unit tests for the new `submission` types.
* Black box test `later_height_in_state_leads_to_expected_relay` was updated.
* Manually observed expected behaviour against a locally-running sequencer and Celestia app.

## Breaking Changelist
* Removed `ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH` and `ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH` config env vars.
* Added `ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH` config env var.

## Related Issues
Closes #1200.
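
## Illustrative Sketch
As a rough illustration of the startup behaviour described under Changes (this is not code from the patch): a minimal, std-only Rust sketch of how the first sequencer height to relay could be chosen from the contents of the state file and the outcome of the confirmation attempt. The names `SubmissionState` and `first_height_to_relay` are hypothetical, and the heights are plain `u64` values for simplicity; the real types in the `submission` module also carry the Celestia height, the blob tx hash and a timestamp.

```rust
/// Hypothetical, simplified mirror of the three states stored in the
/// submission state file (`fresh`, `started`, `prepared`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SubmissionState {
    /// No previous session occurred.
    Fresh,
    /// A new submission was being prepared; only the last confirmed
    /// submission's sequencer height is known.
    Started { last_sequencer_height: u64 },
    /// A blob tx was prepared (and possibly broadcast) but not yet
    /// confirmed when the relayer exited.
    Prepared {
        sequencer_height: u64,
        last_sequencer_height: u64,
    },
}

/// Returns the first sequencer height to relay after startup.
///
/// `confirmed` is an assumption of this sketch: it stands for whether the
/// prepared blob tx was found on Celestia before the confirmation attempt
/// timed out. It is only consulted for the `Prepared` state.
fn first_height_to_relay(state: SubmissionState, confirmed: bool) -> u64 {
    match state {
        // Fresh state: relay from sequencer height 1.
        SubmissionState::Fresh => 1,
        // Nothing was in flight: continue after the last confirmed height.
        SubmissionState::Started {
            last_sequencer_height,
        } => last_sequencer_height + 1,
        SubmissionState::Prepared {
            sequencer_height,
            last_sequencer_height,
        } => {
            if confirmed {
                // The in-flight submission landed: skip its blocks.
                sequencer_height + 1
            } else {
                // Not found: fall back to the last confirmed submission.
                last_sequencer_height + 1
            }
        }
    }
}
```

With a `prepared` state holding `sequencer_height` 9 and a last confirmed sequencer height of 5, this sketch resumes at height 10 when the blob tx is confirmed and at height 6 otherwise.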
--- Cargo.lock | 11 + charts/sequencer-relayer/Chart.yaml | 2 +- .../files/scripts/start-relayer.sh | 9 +- .../sequencer-relayer/templates/_helpers.tpl | 4 + .../templates/configmaps.yaml | 5 +- charts/sequencer/Chart.lock | 6 +- charts/sequencer/Chart.yaml | 4 +- crates/astria-sequencer-relayer/Cargo.toml | 8 +- .../local.env.example | 22 +- crates/astria-sequencer-relayer/src/config.rs | 6 +- .../src/relayer/builder.rs | 9 +- .../src/relayer/celestia_client/error.rs | 6 + .../src/relayer/celestia_client/mod.rs | 168 ++- .../src/relayer/celestia_client/tests.rs | 18 +- .../src/relayer/mod.rs | 51 +- .../src/relayer/submission.rs | 1266 ++++++++++------- .../src/relayer/write/mod.rs | 277 +++- .../src/sequencer_relayer.rs | 6 +- .../helpers/test_sequencer_relayer.rs | 100 +- specs/sequencer-relayer.md | 82 +- 20 files changed, 1321 insertions(+), 739 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03634ce8fe..2816c4fc7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -867,6 +867,7 @@ dependencies = [ "hex", "http", "humantime", + "humantime-serde", "hyper", "itertools 0.12.1", "itoa", @@ -3624,6 +3625,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.28" diff --git a/charts/sequencer-relayer/Chart.yaml b/charts/sequencer-relayer/Chart.yaml index bcebe7dc5f..92b190fa92 100644 --- a/charts/sequencer-relayer/Chart.yaml +++ b/charts/sequencer-relayer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. 
# Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.11.0 +version: 0.11.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/sequencer-relayer/files/scripts/start-relayer.sh b/charts/sequencer-relayer/files/scripts/start-relayer.sh index 9ad150c2d2..746d71022f 100644 --- a/charts/sequencer-relayer/files/scripts/start-relayer.sh +++ b/charts/sequencer-relayer/files/scripts/start-relayer.sh @@ -4,14 +4,19 @@ set -o errexit -o nounset -o pipefail echo "Starting the Astria Sequencer Relayer..." -if ! [ -f "$ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH" ]; then +if ! [ -z ${ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH+x} ] && ! [ -f "$ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH" ]; then echo "Pre-submit storage file not found, instantiating with ignore state. Post submit storage file will be created on first submit." echo "{\"state\": \"ignore\"}" > $ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH fi -if ! [ -f "$ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH" ]; then +if ! [ -z ${ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH+x} ] && ! [ -f "$ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH" ]; then echo "Post-submit storage file does not exist, instantiating with fresh state. Will start relaying from first sequencer block." echo "{\"state\": \"fresh\"}" > $ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH fi +if ! [ -z ${ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH+x} ] && ! [ -f "$ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH" ]; then + echo "Submission state file does not exist, instantiating with fresh state. Will start relaying from first sequencer block." 
+ echo "{\"state\": \"fresh\"}" > $ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH +fi + exec /usr/local/bin/astria-sequencer-relayer diff --git a/charts/sequencer-relayer/templates/_helpers.tpl b/charts/sequencer-relayer/templates/_helpers.tpl index 1185798745..1dccd0a2bb 100644 --- a/charts/sequencer-relayer/templates/_helpers.tpl +++ b/charts/sequencer-relayer/templates/_helpers.tpl @@ -20,3 +20,7 @@ Namepsace to deploy elements into. {{- define "sequencer-relayer.storage.postSubmitPath" -}} {{ include "sequencer-relayer.storage.mountPath" . }}/postsubmit.json {{- end }} + +{{- define "sequencer-relayer.storage.submissionStatePath" -}} +{{ include "sequencer-relayer.storage.mountPath" . }}/submission-state.json +{{- end }} diff --git a/charts/sequencer-relayer/templates/configmaps.yaml b/charts/sequencer-relayer/templates/configmaps.yaml index 50917b74fb..b76f6c4029 100644 --- a/charts/sequencer-relayer/templates/configmaps.yaml +++ b/charts/sequencer-relayer/templates/configmaps.yaml @@ -11,8 +11,6 @@ data: ASTRIA_SEQUENCER_RELAYER_CELESTIA_APP_GRPC_ENDPOINT: "{{ .Values.config.relayer.celestiaAppGrpc }}" ASTRIA_SEQUENCER_RELAYER_CELESTIA_APP_KEY_FILE: "/celestia-key/{{ .Values.config.celestiaAppPrivateKey.secret.filename }}" ASTRIA_SEQUENCER_RELAYER_API_ADDR: "0.0.0.0:{{ .Values.ports.healthAPI }}" - ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.preSubmitPath" . }}" - ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.postSubmitPath" . 
}}" ASTRIA_SEQUENCER_RELAYER_NO_METRICS: "{{ not .Values.config.relayer.metrics.enabled }}" ASTRIA_SEQUENCER_RELAYER_METRICS_HTTP_LISTENER_ADDR: "0.0.0.0:{{ .Values.ports.metrics }}" ASTRIA_SEQUENCER_RELAYER_FORCE_STDOUT: "{{ .Values.global.useTTY }}" @@ -30,7 +28,10 @@ data: ASTRIA_SEQUENCER_RELAYER_SEQUENCER_CHAIN_ID: "{{ .Values.config.relayer.sequencerChainId }}" ASTRIA_SEQUENCER_RELAYER_CELESTIA_CHAIN_ID: "{{ .Values.config.relayer.celestiaChainId }}" {{- if not .Values.global.dev }} + ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.preSubmitPath" . }}" + ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.postSubmitPath" . }}" {{- else }} + ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH: "{{ include "sequencer-relayer.storage.submissionStatePath" . }}" {{- end }} --- apiVersion: v1 diff --git a/charts/sequencer/Chart.lock b/charts/sequencer/Chart.lock index 7807d9b493..0e2b3ea35f 100644 --- a/charts/sequencer/Chart.lock +++ b/charts/sequencer/Chart.lock @@ -1,6 +1,6 @@ dependencies: - name: sequencer-relayer repository: file://../sequencer-relayer - version: 0.11.0 -digest: sha256:70434f4e37c36660ff9b89258d4de6770f206712020bda7398a22772e8f74fa8 -generated: "2024-07-19T12:21:51.250339+02:00" + version: 0.11.1 +digest: sha256:9c44f4901c4b89bbf6261f1a92bb18f71aef6d95e536aadc8a4a275a01eec25b +generated: "2024-07-23T20:01:42.179395482+01:00" diff --git a/charts/sequencer/Chart.yaml b/charts/sequencer/Chart.yaml index 9982dc29ca..bb8d4fc527 100644 --- a/charts/sequencer/Chart.yaml +++ b/charts/sequencer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.19.0 +version: 0.19.1 # This is the version number of the application being deployed. 
This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. @@ -24,7 +24,7 @@ appVersion: "0.14.0" dependencies: - name: sequencer-relayer - version: "0.11.0" + version: "0.11.1" repository: "file://../sequencer-relayer" condition: sequencer-relayer.enabled diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index 3c4a687f98..c6cf8cedd0 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -25,6 +25,7 @@ const_format = { workspace = true } futures = { workspace = true } hex = { workspace = true, features = ["serde"] } humantime = { workspace = true } +humantime-serde = "1.1.1" hyper = { workspace = true } itoa = { workspace = true } metrics = { workspace = true } @@ -39,7 +40,12 @@ tendermint-config = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } tryhard = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tokio = { workspace = true, features = [ + "fs", + "macros", + "rt-multi-thread", + "signal", +] } tokio-stream = { workspace = true } tokio-util = { workspace = true } tonic = { workspace = true } diff --git a/crates/astria-sequencer-relayer/local.env.example b/crates/astria-sequencer-relayer/local.env.example index 62073e9834..81b5c75d5f 100644 --- a/crates/astria-sequencer-relayer/local.env.example +++ b/crates/astria-sequencer-relayer/local.env.example @@ -60,23 +60,17 @@ ASTRIA_SEQUENCER_RELAYER_ONLY_INCLUDE_ROLLUPS= # The socket address at which sequencer relayer will server healthz, readyz, and status calls. ASTRIA_SEQUENCER_RELAYER_API_ADDR=127.0.0.1:2450 -# The path to which relayer will write its state prior to submitting to Celestia. 
-# A file must exist at this path, be readable and writable, and contain one of: -# 1. {"state": "ignore"} -# to ignore the pre-submit state entirely and only consider the object stored in -# ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH. -# 2. {"state": "started", "last_submission": } -# which is usually only written by sequencer-relayer during normal operation and -# is checked for consistency with ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH at startup. -ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH=/path/to/presubmit.json - -# The path to which relayer will write its state after submitting to Celestia. +# The path to which relayer will write its state while submitting to Celestia. # A file must exist at this path, be readable and writable, and contain one of: # 1. {"state": "fresh"} # for relaying sequencer blocks starting at sequencer height 1. -# 2. {"state": "submitted", "celestia_height": , "sequencer_height": }} -# for relaying blocks starting at ` + 1`. -ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH=/path/to/postsubmit.json +# 2. {"state":"started","last_submission":{"celestia_height":,"sequencer_height":}} +# for relaying blocks starting at `[last_submission.sequencer_height] + 1`. +# 3. {"state":"prepared","sequencer_height":,"last_submission":{"celestia_height":,"sequencer_height":},"blob_tx_hash":"","at":""} +# for trying to continue from the last submission attempt. Checks if the given blob tx is stored +# on Celestia, and if so, begins relaying blocks starting at `[sequencer_height] + 1`, otherwise +# begins relaying blocks starting at `[last_submission.sequencer_height] + 1`. +ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH=/path/to/submission-state.json # Set to true to enable prometheus metrics. 
ASTRIA_SEQUENCER_RELAYER_NO_METRICS=true diff --git a/crates/astria-sequencer-relayer/src/config.rs b/crates/astria-sequencer-relayer/src/config.rs index 5d1186b927..b884634e81 100644 --- a/crates/astria-sequencer-relayer/src/config.rs +++ b/crates/astria-sequencer-relayer/src/config.rs @@ -48,10 +48,8 @@ pub struct Config { pub metrics_http_listener_addr: String, /// Writes a human readable format to stdout instead of JSON formatted OTEL trace data. pub pretty_print: bool, - /// The path to which relayer will write its state prior to submitting to Celestia. - pub pre_submit_path: PathBuf, - /// The path to which relayer will write its state after submitting to Celestia. - pub post_submit_path: PathBuf, + /// The path to which relayer will write its state while submitting to Celestia. + pub submission_state_path: PathBuf, } impl Config { diff --git a/crates/astria-sequencer-relayer/src/relayer/builder.rs b/crates/astria-sequencer-relayer/src/relayer/builder.rs index b43b343a4c..6312cd5caa 100644 --- a/crates/astria-sequencer-relayer/src/relayer/builder.rs +++ b/crates/astria-sequencer-relayer/src/relayer/builder.rs @@ -35,8 +35,7 @@ pub(crate) struct Builder { pub(crate) sequencer_poll_period: Duration, pub(crate) sequencer_grpc_endpoint: String, pub(crate) rollup_filter: IncludeRollup, - pub(crate) pre_submit_path: PathBuf, - pub(crate) post_submit_path: PathBuf, + pub(crate) submission_state_path: PathBuf, pub(crate) metrics: &'static Metrics, } @@ -53,8 +52,7 @@ impl Builder { sequencer_poll_period, sequencer_grpc_endpoint, rollup_filter, - pre_submit_path, - post_submit_path, + submission_state_path, metrics, } = self; @@ -93,8 +91,7 @@ impl Builder { celestia_client_builder, rollup_filter, state, - pre_submit_path, - post_submit_path, + submission_state_path, metrics, }) } diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs index b8848d53a0..9309694d31 100644 
--- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs @@ -103,6 +103,12 @@ pub(in crate::relayer) enum TrySubmitError { #[derive(Clone, Debug)] pub(in crate::relayer) struct GrpcResponseError(Status); +impl GrpcResponseError { + pub(in crate::relayer) fn is_timeout(&self) -> bool { + self.0.code() == tonic::Code::Cancelled + } +} + impl Display for GrpcResponseError { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { write!( diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs index ac504dbb8d..2f850c95d4 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs @@ -6,7 +6,14 @@ mod error; mod tests; use std::{ + borrow::Cow, convert::TryInto, + fmt::{ + self, + Debug, + Display, + Formatter, + }, sync::Arc, time::{ Duration, @@ -77,12 +84,27 @@ pub(super) use error::{ ProtobufDecodeError, TrySubmitError, }; +use hex::{ + FromHex, + FromHexError, +}; use prost::{ bytes::Bytes, Message as _, Name as _, }; -use tokio::sync::watch; +use serde::{ + Deserialize, + Deserializer, + Serialize, + Serializer, +}; +use sha2::{ + Digest as _, + Sha256, +}; +use telemetry::display::hex; +use thiserror::Error; use tonic::{ transport::Channel, Response, @@ -90,7 +112,9 @@ use tonic::{ }; use tracing::{ debug, + error, info, + instrument, trace, warn, }; @@ -124,11 +148,11 @@ impl CelestiaClient { /// used to obtain the appropriate fee in the case that the previous attempt failed due to a /// low fee. 
// Copied from https://github.com/celestiaorg/celestia-app/blob/v1.4.0/x/blob/payforblob.go - pub(super) async fn try_submit( - mut self, + pub(super) async fn try_prepare( + &mut self, blobs: Arc>, - last_error_receiver: watch::Receiver>, - ) -> Result { + maybe_last_error: Option, + ) -> Result { info!("fetching cost params and account info from celestia app"); let (blob_params, auth_params, min_gas_price, base_account) = tokio::try_join!( self.fetch_blob_params(), @@ -153,8 +177,6 @@ impl CelestiaClient { let cost_params = CelestiaCostParams::new(gas_per_blob_byte, tx_size_cost_per_byte, min_gas_price); let gas_limit = estimate_gas(&msg_pay_for_blobs.blob_sizes, cost_params); - // Get the error from the last attempt to `try_submit`. - let maybe_last_error = last_error_receiver.borrow().clone(); let fee = calculate_fee(cost_params, gas_limit, maybe_last_error); let signed_tx = new_signed_tx( @@ -166,20 +188,52 @@ impl CelestiaClient { &self.signing_keys, ); - let blob_tx = new_blob_tx(&signed_tx, blobs.iter()); - info!( gas_limit = gas_limit.0, fee_utia = fee, - "broadcasting blob transaction to celestia app" + "prepared blob transaction for celestia app" ); - let tx_hash = self.broadcast_tx(blob_tx).await?; - info!(tx_hash = %tx_hash.0, "broadcast blob transaction succeeded"); - let height = self.confirm_submission(tx_hash).await; + Ok(new_blob_tx(&signed_tx, blobs.iter())) + } + + pub(super) async fn try_submit( + &mut self, + blob_tx_hash: BlobTxHash, + blob_tx: BlobTx, + ) -> Result { + info!("broadcasting blob transaction to celestia app"); + let hex_encoded_tx_hash = self.broadcast_tx(blob_tx).await?; + if hex_encoded_tx_hash != blob_tx_hash.to_hex() { + // This is not a critical error. Worst case, we restart the process now and try for a + // short while to `GetTx` for this tx using the wrong hash, resulting in a likely + // duplicate submission of this set of blobs. 
+ warn!( + "tx hash `{hex_encoded_tx_hash}` returned from celestia app is not the same as \ + the locally calculated one `{blob_tx_hash}`; submission file has invalid data" + ); + } + info!(tx_hash = %hex_encoded_tx_hash, "broadcast blob transaction succeeded"); + + let height = self.confirm_submission(hex_encoded_tx_hash).await; Ok(height) } + /// Repeatedly sends `GetTx` until a successful response is received or `timeout` duration has + /// elapsed. + /// + /// Returns the height of the Celestia block in which the blobs were submitted, or `None` if + /// timed out. + pub(super) async fn confirm_submission_with_timeout( + &mut self, + blob_tx_hash: &BlobTxHash, + timeout: Duration, + ) -> Option { + tokio::time::timeout(timeout, self.confirm_submission(blob_tx_hash.to_hex())) + .await + .ok() + } + async fn fetch_account(&self) -> Result { let mut auth_query_client = AuthQueryClient::new(self.grpc_channel.clone()); let request = QueryAccountRequest { @@ -245,7 +299,7 @@ impl CelestiaClient { /// [`CometBFT`][cometbft]. /// /// [cometbft]: https://github.com/cometbft/cometbft/blob/b139e139ad9ae6fccb9682aa5c2de4aa952fd055/rpc/openapi/openapi.yaml#L201-L204 - async fn broadcast_tx(&mut self, blob_tx: BlobTx) -> Result { + async fn broadcast_tx(&mut self, blob_tx: BlobTx) -> Result { let request = BroadcastTxRequest { tx_bytes: Bytes::from(blob_tx.encode_to_vec()), mode: i32::from(BroadcastMode::Sync), @@ -256,14 +310,14 @@ impl CelestiaClient { { trace!(?response); } - tx_hash_from_response(response) + lowercase_hex_encoded_tx_hash_from_response(response) } /// Returns `Some(height)` if the tx submission has completed, or `None` if it is still /// pending. 
- async fn get_tx(&mut self, tx_hash: TxHash) -> Result, TrySubmitError> { + async fn get_tx(&mut self, hex_encoded_tx_hash: String) -> Result, TrySubmitError> { let request = GetTxRequest { - hash: tx_hash.0.clone(), + hash: hex_encoded_tx_hash, }; let response = self.tx_client.get_tx(request).await; // trace-level logging, so using Debug format is ok. @@ -276,7 +330,8 @@ impl CelestiaClient { /// Repeatedly sends `GetTx` until a successful response is received. Returns the height of the /// Celestia block in which the blobs were submitted. - async fn confirm_submission(&mut self, tx_hash: TxHash) -> u64 { + #[instrument(skip_all, fields(hex_encoded_tx_hash))] + async fn confirm_submission(&mut self, hex_encoded_tx_hash: String) -> u64 { // The min seconds to sleep after receiving a GetTx response and sending the next request. const MIN_POLL_INTERVAL_SECS: u64 = 1; // The max seconds to sleep after receiving a GetTx response and sending the next request. @@ -296,7 +351,7 @@ impl CelestiaClient { let reason = maybe_error.map_or(Report::msg("transaction still pending"), Report::new); warn!( %reason, - tx_hash = tx_hash.0, + tx_hash = %hex_encoded_tx_hash, elapsed_seconds = start.elapsed().as_secs_f32(), "waiting to confirm blob submission" ); @@ -306,7 +361,7 @@ impl CelestiaClient { let mut sleep_secs = MIN_POLL_INTERVAL_SECS; loop { tokio::time::sleep(Duration::from_secs(sleep_secs)).await; - match self.get_tx(tx_hash.clone()).await { + match self.get_tx(hex_encoded_tx_hash.clone()).await { Ok(Some(height)) => return height, Ok(None) => { sleep_secs = MIN_POLL_INTERVAL_SECS; @@ -407,11 +462,11 @@ fn min_gas_price_from_response( }) } -/// Extracts the tx hash from the given response. -fn tx_hash_from_response( +/// Extracts the tx hash from the given response and converts to lowercase. 
+fn lowercase_hex_encoded_tx_hash_from_response( response: Result, Status>, -) -> Result { - let tx_response = response +) -> Result { + let mut tx_response = response .map_err(|status| TrySubmitError::FailedToBroadcastTx(GrpcResponseError::from(status)))? .into_inner() .tx_response @@ -425,7 +480,8 @@ fn tx_hash_from_response( }; return Err(error); } - Ok(TxHash(tx_response.txhash)) + tx_response.txhash.make_ascii_lowercase(); + Ok(tx_response.txhash) } /// Extracts the block height from the given response if available, or `None` if the transaction is @@ -736,6 +792,62 @@ struct Bech32Address(String); #[derive(Copy, Clone, Debug)] struct GasLimit(u64); -/// A hex-encoded transaction hash. -#[derive(Clone, Debug)] -struct TxHash(String); +/// A blob transaction hash. +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] +pub(super) struct BlobTxHash([u8; 32]); + +impl BlobTxHash { + /// Computes the SHA256 digest of the given blob transaction. + pub(super) fn compute(blob_tx: &BlobTx) -> Self { + let sha2 = Sha256::digest(&blob_tx.tx); + Self(sha2.into()) + } + + /// Converts `self` to a hex-encoded string. 
+ pub(super) fn to_hex(self) -> String { + hex::encode(self.0) + } + + #[cfg(test)] + pub(super) const fn from_raw(hash: [u8; 32]) -> Self { + Self(hash) + } +} + +impl Display for BlobTxHash { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!(formatter, "{}", hex(&self.0)) + } +} + +impl Debug for BlobTxHash { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + formatter + .debug_tuple("BlobTxHash") + .field(&format_args!("{}", hex(&self.0))) + .finish() + } +} + +impl Serialize for BlobTxHash { + fn serialize(&self, serializer: S) -> Result { + serializer.collect_str(self) + } +} + +impl<'de> Deserialize<'de> for BlobTxHash { + fn deserialize>(deserializer: D) -> Result { + let hex = Cow::<'_, str>::deserialize(deserializer)?; + let raw_hash = <[u8; 32]>::from_hex(hex.as_bytes()).map_err(|error: FromHexError| { + serde::de::Error::custom(DeserializeBlobTxHashError::Hex(error.to_string())) + })?; + Ok(BlobTxHash(raw_hash)) + } +} + +#[derive(Error, Clone, Debug)] +#[non_exhaustive] +pub(in crate::relayer) enum DeserializeBlobTxHashError { + #[error("failed to decode as hex for blob tx hash: {0}")] + Hex(String), +} diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs index 2b85001314..7d59fdf02c 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs @@ -267,14 +267,14 @@ fn tx_hash_from_good_response_should_succeed() { tx_response: Some(tx_response), }); - let extracted_tx_hash = tx_hash_from_response(Ok(response)).unwrap(); - assert_eq!(tx_hash, extracted_tx_hash.0); + let extracted_tx_hash = lowercase_hex_encoded_tx_hash_from_response(Ok(response)).unwrap(); + assert_eq!(tx_hash, extracted_tx_hash); } #[test] fn tx_hash_from_bad_response_should_fail() { // Should return `FailedToBroadcastTx` if outer response is an error. 
- let error = tx_hash_from_response(Err(Status::internal(""))).unwrap_err(); + let error = lowercase_hex_encoded_tx_hash_from_response(Err(Status::internal(""))).unwrap_err(); // allow: `assert!(matches!(..))` provides poor feedback on failure. #[allow(clippy::manual_assert)] if !matches!(error, TrySubmitError::FailedToBroadcastTx(_)) { @@ -285,7 +285,7 @@ fn tx_hash_from_bad_response_should_fail() { let response = Ok(Response::new(BroadcastTxResponse { tx_response: None, })); - let error = tx_hash_from_response(response).unwrap_err(); + let error = lowercase_hex_encoded_tx_hash_from_response(response).unwrap_err(); // allow: `assert!(matches!(..))` provides poor feedback on failure. #[allow(clippy::manual_assert)] if !matches!(error, TrySubmitError::EmptyBroadcastTxResponse) { @@ -307,7 +307,7 @@ fn tx_hash_from_bad_response_should_fail() { let response = Ok(Response::new(BroadcastTxResponse { tx_response: Some(tx_response), })); - let error = tx_hash_from_response(response).unwrap_err(); + let error = lowercase_hex_encoded_tx_hash_from_response(response).unwrap_err(); match error { TrySubmitError::BroadcastTxResponseErrorCode { tx_hash: received_tx_hash, @@ -513,3 +513,11 @@ fn extract_required_fee_from_log_should_fail() { let bad_value = "insufficient fees; got: 1utia required: 2mutia: insufficient fee".to_string(); assert!(extract_required_fee_from_log(&bad_value).is_none()); } + +#[test] +fn blob_tx_hash_should_round_trip_json() { + let blob_tx_hash = BlobTxHash::from_raw([1; 32]); + let json_encoded = serde_json::to_string(&blob_tx_hash).unwrap(); + let decoded = serde_json::from_str(&json_encoded).unwrap(); + assert_eq!(blob_tx_hash, decoded); +} diff --git a/crates/astria-sequencer-relayer/src/relayer/mod.rs b/crates/astria-sequencer-relayer/src/relayer/mod.rs index 0c261f89d8..882aad521c 100644 --- a/crates/astria-sequencer-relayer/src/relayer/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/mod.rs @@ -1,8 +1,5 @@ use std::{ - path::{ - Path, - 
PathBuf, - }, + path::PathBuf, sync::Arc, time::Duration, }; @@ -62,6 +59,7 @@ mod write; pub(crate) use builder::Builder; use celestia_client::{ + BlobTxHash, BuilderError, CelestiaClientBuilder, CelestiaKeys, @@ -69,7 +67,11 @@ use celestia_client::{ }; use state::State; pub(crate) use state::StateSnapshot; -use submission::SubmissionState; +use submission::{ + PreparedSubmission, + StartedSubmission, + SubmissionStateAtStartup, +}; use crate::{ metrics::Metrics, @@ -106,8 +108,7 @@ pub(crate) struct Relayer { /// A watch channel to track the state of the relayer. Used by the API service. state: Arc, - pre_submit_path: PathBuf, - post_submit_path: PathBuf, + submission_state_path: PathBuf, metrics: &'static Metrics, } @@ -124,9 +125,9 @@ impl Relayer { /// failed catastrophically (after `u32::MAX` retries). #[instrument(skip_all)] pub(crate) async fn run(self) -> eyre::Result<()> { - let submission_state = read_submission_state(&self.pre_submit_path, &self.post_submit_path) - .await - .wrap_err("failed reading submission state from files")?; + // No need to add `wrap_err` as `new_from_path` already reports the path on error. 
+ let submission_state_at_startup = + SubmissionStateAtStartup::new_from_path(&self.submission_state_path).await?; select!( () = self.relayer_shutdown_token.cancelled() => return Ok(()), @@ -136,7 +137,8 @@ impl Relayer { ) => init_result, )?; - let last_submitted_sequencer_height = submission_state.last_submitted_height(); + let last_completed_sequencer_height = + submission_state_at_startup.last_completed_sequencer_height(); let mut latest_height_stream = { use sequencer_client::StreamLatestHeight as _; @@ -148,7 +150,7 @@ impl Relayer { self.celestia_client_builder.clone(), self.rollup_filter.clone(), self.state.clone(), - submission_state, + submission_state_at_startup, self.submitter_shutdown_token.clone(), self.metrics, ); @@ -156,7 +158,7 @@ impl Relayer { let mut block_stream = read::BlockStream::builder(self.metrics) .block_time(self.sequencer_poll_period) .client(self.sequencer_grpc_client.clone()) - .set_last_fetched_height(last_submitted_sequencer_height) + .set_last_fetched_height(last_completed_sequencer_height) .state(self.state.clone()) .build(); @@ -348,30 +350,11 @@ async fn fetch_sequencer_chain_id( response.map(|status_response| status_response.node_info.network.to_string()) } -async fn read_submission_state, P2: AsRef>( - pre: P1, - post: P2, -) -> eyre::Result { - const LENIENT_CONSISTENCY_CHECK: bool = true; - let pre = pre.as_ref().to_path_buf(); - let post = post.as_ref().to_path_buf(); - crate::utils::flatten( - tokio::task::spawn_blocking(move || { - SubmissionState::from_paths::(pre, post) - }) - .await, - ) - .wrap_err( - "failed reading submission state from the configured pre- and post-submit files. 
Refer to \ - the values documented in `local.env.example` of the astria-sequencer-relayer service", - ) -} - fn spawn_submitter( client_builder: CelestiaClientBuilder, rollup_filter: IncludeRollup, state: Arc, - submission_state: SubmissionState, + submission_state_at_startup: SubmissionStateAtStartup, submitter_shutdown_token: CancellationToken, metrics: &'static Metrics, ) -> ( @@ -382,7 +365,7 @@ fn spawn_submitter( client_builder, rollup_filter, state, - submission_state, + submission_state_at_startup, submitter_shutdown_token, metrics, ); diff --git a/crates/astria-sequencer-relayer/src/relayer/submission.rs b/crates/astria-sequencer-relayer/src/relayer/submission.rs index 71654a8501..075e8d4b5b 100644 --- a/crates/astria-sequencer-relayer/src/relayer/submission.rs +++ b/crates/astria-sequencer-relayer/src/relayer/submission.rs @@ -1,366 +1,456 @@ //! Tracks the current submission state of sequencer-relayer and syncs it to disk. -use std::path::{ - Path, - PathBuf, +use std::{ + fmt::{ + self, + Display, + Formatter, + }, + path::{ + Path, + PathBuf, + }, + time::{ + Duration, + SystemTime, + }, }; use astria_eyre::eyre::{ self, - bail, ensure, WrapErr as _, }; -use sequencer_client::tendermint::block::Height as SequencerHeight; use serde::{ Deserialize, Serialize, }; -use tracing::{ - debug, - warn, -}; +use tendermint::block::Height as SequencerHeight; +use tracing::debug; + +use super::BlobTxHash; + +/// Represents a submission made to Celestia which has been confirmed as stored via a successful +/// `GetTx` call. +#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub(super) struct CompletedSubmission { + /// The height of the Celestia block in which the submission was stored. + celestia_height: u64, + /// The highest sequencer block height contained in the submission. 
+ #[serde(with = "as_number")] + sequencer_height: SequencerHeight, +} + +impl CompletedSubmission { + fn new(celestia_height: u64, sequencer_height: SequencerHeight) -> Self { + Self { + celestia_height, + sequencer_height, + } + } +} + +/// Newtype wrapper for the file path of the submission state. +#[derive(Clone, Debug)] +struct StateFilePath(PathBuf); -#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "state")] -enum PostSubmission { +/// Newtype wrapper for the file path of the temp file used when writing submission state to disk. +#[derive(Clone, Debug)] +struct TempFilePath(PathBuf); + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case", tag = "state")] +enum State { + /// Indicates the first run of the sequencer, i.e. no previous session occurred. Fresh, - Submitted { - celestia_height: u64, + /// Indicates that we have started to prepare a new submission. Preparation involves fetching + /// information from the Celestia app (nonce, prices, etc.), and using that to create a signed + /// blob transaction. + Started { + last_submission: CompletedSubmission, + }, + /// Indicates that preparation of a signed blob transaction has happened, and we are now in the + /// process of submitting the transaction (sending a `broadcast_tx` gRPC) and confirming its + /// submission (polling via `get_tx` gRPCs). + Prepared { #[serde(with = "as_number")] sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + #[serde(with = "humantime_serde")] + at: SystemTime, }, } -impl PostSubmission { - fn is_fresh(&self) -> bool { - matches!(self, PostSubmission::Fresh) +impl State { + fn new_started(last_submission: CompletedSubmission) -> Self { + Self::Started { + last_submission, + } } - fn is_submitted(&self) -> bool { - matches!(self, PostSubmission::Submitted { .. 
}) + fn new_prepared( + sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + at: SystemTime, + ) -> Self { + Self::Prepared { + sequencer_height, + last_submission, + blob_tx_hash, + at, + } } - fn from_path>(path: P) -> eyre::Result { - let file = std::fs::File::open(&path) - .wrap_err("failed opening provided file path for reading post-submission state")?; - let state = serde_json::from_reader(file) - .wrap_err("failed reading contents of post-submission file")?; + /// Constructs an instance of `State` by parsing from `source`: a JSON-encoded file. + async fn read(source: &StateFilePath) -> eyre::Result { + let contents = tokio::fs::read_to_string(&source.0) + .await + .wrap_err_with(|| { + format!( + "failed reading submission state file at `{}`", + source.0.display() + ) + })?; + let state: State = serde_json::from_str(&contents) + .wrap_err_with(|| format!("failed parsing the contents of `{}`", source.0.display()))?; + + // Ensure the parsed values are sane. + match &state { + State::Fresh + | State::Started { + .. + } => {} + State::Prepared { + sequencer_height, + last_submission, + .. + } => ensure!( + *sequencer_height > last_submission.sequencer_height, + "submission state file `{}` invalid: current sequencer height \ + ({sequencer_height}) should be greater than last successful submission sequencer \ + height ({})", + source.0.display(), + last_submission.sequencer_height + ), + } + Ok(state) } - fn write_to_path>(&self, path: P) -> eyre::Result<()> { - let f = std::fs::File::options() - .write(true) - .truncate(true) - .open(path) - .wrap_err("failed opening file for writing state")?; - serde_json::to_writer(&f, self).wrap_err("failed writing json-serialized state to file")?; - f.sync_all() - .wrap_err("failed fully syncing state write to disk")?; - Ok(()) + /// Writes JSON-encoded `self` to `temp_file`, then renames `temp_file` to `destination`. 
+ async fn write( + &self, + destination: &StateFilePath, + temp_file: &TempFilePath, + ) -> eyre::Result<()> { + let contents = + serde_json::to_string_pretty(self).wrap_err("failed json-encoding submission state")?; + tokio::fs::write(&temp_file.0, &contents) + .await + .wrap_err_with(|| { + format!( + "failed writing submission state to `{}`", + temp_file.0.display() + ) + })?; + tokio::fs::rename(&temp_file.0, &destination.0) + .await + .wrap_err_with(|| { + format!( + "failed moving `{}` to `{}`", + temp_file.0.display(), + destination.0.display() + ) + }) } } -#[derive(Clone, Copy, Debug, Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "state")] -enum PreSubmission { - Ignore, - Started { - #[serde(with = "as_number")] - sequencer_height: SequencerHeight, - last_submission: PostSubmission, - }, -} -impl PreSubmission { - fn from_path>(path: P) -> eyre::Result { - let file = std::fs::File::open(&path) - .wrap_err("failed opening provided file path for reading pre-submission state")?; - let state = serde_json::from_reader(file) - .wrap_err("failed reading contents of pre-submission file")?; - Ok(state) +impl Display for State { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + if formatter.alternate() { + write!(formatter, "{}", serde_json::to_string_pretty(self).unwrap()) + } else { + write!(formatter, "{}", serde_json::to_string(self).unwrap()) + } } +} - fn write_to_path>(&self, path: P) -> eyre::Result<()> { - let f = std::fs::File::options() - .write(true) - .truncate(true) - .open(path) - .wrap_err("failed opening file for writing state")?; - serde_json::to_writer(&f, self).wrap_err("failed writing json-serialized state to file")?; - f.sync_all() - .wrap_err("failed fully syncing state write to disk")?; - Ok(()) +/// State indicating the relayer has not performed any submissions previously. 
+#[derive(Debug)] +pub(super) struct FreshSubmission { + state_file_path: StateFilePath, + temp_file_path: TempFilePath, +} + +impl FreshSubmission { + /// Converts `self` into a `StartedSubmission` with last submission being given celestia and + /// sequencer heights of 0. + /// + /// The new state is not written to disk, as it is not required - a restart will perform the + /// correct operation if the state on disk is left as `fresh`. + pub(super) fn into_started(self) -> StartedSubmission { + let last_submission = CompletedSubmission::new(0, SequencerHeight::from(0_u8)); + StartedSubmission { + last_submission, + state_file_path: self.state_file_path, + temp_file_path: self.temp_file_path, + } } } +/// State indicating the relayer has started to prepare a new submission. #[derive(Clone, Debug)] -pub(super) struct SubmissionState { - pre: PreSubmission, - post: PostSubmission, - pre_path: PathBuf, - post_path: PathBuf, +pub(super) struct StartedSubmission { + last_submission: CompletedSubmission, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, } -#[derive(Debug)] -pub(super) struct Started(SubmissionState); - -impl Started { - pub(super) fn finalize(self, celestia_height: u64) -> eyre::Result { - let Self(SubmissionState { - pre, - pre_path, - post_path, - .. - }) = self; - - let PreSubmission::Started { - sequencer_height, .. - } = pre - else { - panic!( - "once initialized, a submission's `pre` field must always be `Started`. Here it \ - is not. This is a bug" - ); - }; +impl StartedSubmission { + /// Constructs a new `StartedSubmission` and writes the state to disk. 
+ async fn construct_and_write( + last_submission: CompletedSubmission, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, + ) -> eyre::Result<Self> { + let state = State::new_started(last_submission); + debug!(%state, "writing submission started state to file"); + state + .write(&state_file_path, &temp_file_path) + .await + .wrap_err("failed committing submission started state to disk")?; + Ok(Self { + last_submission, + state_file_path, + temp_file_path, + }) + } - let new = SubmissionState { - pre, - post: PostSubmission::Submitted { - celestia_height, - sequencer_height, - }, - pre_path, - post_path, - }; + /// Returns the celestia block height from the last completed submission. + pub(super) fn last_submission_celestia_height(&self) -> u64 { + self.last_submission.celestia_height + } - debug!( - state = serde_json::to_string(&new.post).expect("type contains no non-ascii keys"), - "finalizing submission by writing post-submit state to file", - ); - new.post - .write_to_path(&new.post_path) - .wrap_err("failed committing post-submission state to disk")?; - Ok(new) + /// Returns the sequencer block height from the last completed submission. + pub(super) fn last_submission_sequencer_height(&self) -> SequencerHeight { + self.last_submission.sequencer_height + } + + /// Converts `self` into a `PreparedSubmission` and writes the new state to disk. + pub(super) async fn into_prepared( + self, + new_sequencer_height: SequencerHeight, + blob_tx_hash: BlobTxHash, + ) -> eyre::Result<PreparedSubmission> { + PreparedSubmission::construct_and_write( + new_sequencer_height, + self.last_submission, + blob_tx_hash, + self.state_file_path, + self.temp_file_path, + ) + .await } } -impl SubmissionState { - pub(super) fn last_submitted_height(&self) -> Option<SequencerHeight> { - match self.post { - PostSubmission::Fresh => None, - PostSubmission::Submitted { - sequencer_height, ..
- } => Some(sequencer_height), - } +impl Display for StartedSubmission { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!( + formatter, + "file: {}, {}", + self.state_file_path.0.display(), + State::new_started(self.last_submission) + ) } +} - pub(super) fn initialize(self, sequencer_height: SequencerHeight) -> eyre::Result<Started> { - if let PostSubmission::Submitted { - sequencer_height: latest_submitted, - .. - } = self.post - { - ensure!( - sequencer_height > latest_submitted, - "refusing to submit a sequencer block at heights below or at what was already \ - submitted" - ); - } - let new = Self { - pre: PreSubmission::Started { - sequencer_height, - last_submission: self.post, - }, - ..self - }; - debug!( - state = serde_json::to_string(&new.pre).expect("type contains no non-ascii keys"), - "initializing submission by writing pre-submit state to file", +/// State indicating the relayer has prepared a new submission and is about to broadcast it to the +/// Celestia app. +#[derive(Clone, Debug)] +pub(super) struct PreparedSubmission { + sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + created_at: SystemTime, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, +} + +impl PreparedSubmission { + /// Constructs a new `PreparedSubmission` and writes the state to disk.
+ async fn construct_and_write( + sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, + ) -> eyre::Result<Self> { + ensure!( + sequencer_height > last_submission.sequencer_height, + "cannot submit a sequencer block at height below or equal to what was already \ + successfully submitted" ); - new.pre - .write_to_path(&new.pre_path) - .wrap_err("failed commiting pre-submission state to disk")?; - Ok(Started(new)) + let created_at = SystemTime::now(); + let state = + State::new_prepared(sequencer_height, last_submission, blob_tx_hash, created_at); + state + .write(&state_file_path, &temp_file_path) + .await + .wrap_err("failed committing submission prepared state to disk")?; + Ok(Self { + sequencer_height, + last_submission, + blob_tx_hash, + created_at, + state_file_path, + temp_file_path, + }) } - pub(super) fn from_paths<const LENIENT: bool, P1: AsRef<Path>, P2: AsRef<Path>>( - pre_path: P1, - post_path: P2, - ) -> eyre::Result<Self> { - let pre_path = pre_path.as_ref().to_path_buf(); - let post_path = post_path.as_ref().to_path_buf(); - - let mut pre = PreSubmission::from_path(&pre_path).wrap_err( - "failed constructing post submit state from provided path; if the post-submit state is - otherwise present and correctly formatted - and contains the correct information \ - about the - last submitted sequencer height and its inclusion height on Celestia - then \ - override the - contents of the pre-submission file with `{{\"state\": \"ignore\"}}`", - )?; - let post = PostSubmission::from_path(&post_path).wrap_err( - "failed constructing post submit state from provided path; if the pre-submit state is \ - otherwise present and correctly formatted then this indicates a file corruption.
\ - Because the post-submit state is critical for determining the starting height for \ - relaying sequencer blocks, make sure it's set correctly.", - )?; - - let state = match (pre, post) { - (PreSubmission::Ignore, post) => Self { - pre, - post, - pre_path, - post_path, - }, + /// Returns the transaction hash of the prepared `BlobTx`. + pub(super) fn blob_tx_hash(&self) -> &BlobTxHash { + &self.blob_tx_hash + } - ( - PreSubmission::Started { - sequencer_height, - last_submission, - }, - post, - ) => { - if let Err(error) = ensure_consistent(sequencer_height, last_submission, post) { - if LENIENT { - warn!(%error, "pre- and post-submission states were inconsistent. Setting pre-state to `ignore` and continuing from last post-state. This could to double submission!!"); - pre = PreSubmission::Ignore; - } else { - return Err(error).wrap_err("on-disk states are inconsistent"); - } - } - Self { - pre, - post, - pre_path, - post_path, - } - } - }; + /// Returns the maximum duration for which the Celestia app should be polled with `GetTx` + /// requests to confirm successful storage of the associated `BlobTx`. + /// + /// This is at least 15 seconds, but up to a maximum of a minute from when the submission was + /// first attempted. + pub(super) fn confirmation_timeout(&self) -> Duration { + std::cmp::max( + Duration::from_secs(15), + Duration::from_secs(60).saturating_sub(self.created_at.elapsed().unwrap_or_default()), + ) + } - // testing if the states can be written to the provided paths - state.pre.write_to_path(&state.pre_path).wrap_err_with(|| { - format!( - "failed writing just-read pre-submission state to disk; is the file writable? \ - Write destination: {}", - state.pre_path.display(), - ) - })?; - state - .post - .write_to_path(&state.post_path) - .wrap_err_with(|| { - format!( - "failed writing just-read post-submission state to disk; is the file \ - writable? 
Write destination: {}", - state.post_path.display(), - ) - })?; + /// Converts `self` into a `StartedSubmission` with last submission being recorded using the + /// provided celestia height and the sequencer height from `self`. Writes the new state to disk. + pub(super) async fn into_started( + self, + celestia_height: u64, + ) -> eyre::Result { + let last_submission = CompletedSubmission::new(celestia_height, self.sequencer_height); + StartedSubmission::construct_and_write( + last_submission, + self.state_file_path, + self.temp_file_path, + ) + .await + } - Ok(state) + /// Reverts `self` into a `StartedSubmission` retaining the last submission from `self` as the + /// last submission. Writes the new state to disk. + pub(super) async fn revert(self) -> eyre::Result { + StartedSubmission::construct_and_write( + self.last_submission, + self.state_file_path, + self.temp_file_path, + ) + .await } } -fn ensure_consistent( - sequencer_height_started: SequencerHeight, - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - ensure_height_pre_submission_is_height_post_submission( - sequencer_height_started, - current_submission, - )?; - ensure_last_and_current_are_different(last_submission, current_submission)?; - ensure_last_is_not_submitted_while_current_is_fresh(last_submission, current_submission)?; - ensure_height_in_last_is_less_than_height_in_current(last_submission, current_submission)?; - Ok(()) +impl Display for PreparedSubmission { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!( + formatter, + "file: {}, {}", + self.state_file_path.0.display(), + State::new_prepared( + self.sequencer_height, + self.last_submission, + self.blob_tx_hash, + self.created_at + ) + ) + } } -fn ensure_height_pre_submission_is_height_post_submission( - sequencer_height_started: SequencerHeight, - current_submission: PostSubmission, -) -> eyre::Result<()> { - let PostSubmission::Submitted { - sequencer_height, .. 
- } = current_submission - else { - bail!( - "the pre-submit file indicated that a new submission was started, but the post-submit \ - file still contained a \"fresh\" state. This indicates that the submission was not \ - finalized." - ); - }; - ensure!( - sequencer_height_started == sequencer_height, - "the initialized `sequencer_height` in the pre-submit file does not match the sequencer \ - height in the post-submit file. This indicates that a new submission to Celestia was \ - started but not finalized. This is because a successful submission records the very same \ - `sequencer_height` in the post-submit file." - ); - Ok(()) +#[derive(Debug)] +pub(super) enum SubmissionStateAtStartup { + Fresh(FreshSubmission), + Started(StartedSubmission), + Prepared(PreparedSubmission), } -fn ensure_last_and_current_are_different( - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - ensure!( - last_submission != current_submission, - "the `last_submission` field of the pre-submit file matches the object found in the \ - post-submit file. This indicates that a new submission to Celestia was started but not \ - finalized. This is because when starting a new submission the object in the post-submit \ - file is written to `last_submission`." - ); - Ok(()) -} +impl SubmissionStateAtStartup { + /// Constructs a new `SubmissionStateAtStartup` by reading from the given `source`. + /// + /// `source` should be a JSON-encoded `State`, and should be writable. 
+ pub(super) async fn new_from_path<P: AsRef<Path>>(source: P) -> eyre::Result<Self> { + let file_path = source.as_ref(); + let state_file_path = StateFilePath(file_path.to_path_buf()); + let state = State::read(&state_file_path).await?; + let temp_file_path = match file_path.extension().and_then(|extn| extn.to_str()) { + Some(extn) => TempFilePath(file_path.with_extension(format!("{extn}.tmp"))), + None => TempFilePath(file_path.with_extension("tmp")), + }; -fn ensure_last_is_not_submitted_while_current_is_fresh( - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - ensure!( - !(last_submission.is_submitted() && current_submission.is_fresh()), - "the submission recorded in the post-submit file cannot be `fresh` while \ - `last_submission` in the pre-submit file is `submitted`", - ); - Ok(()) -} + // Ensure the state can be written. + state + .write(&state_file_path, &temp_file_path) + .await + .wrap_err_with(|| { + format!( + "failed writing just-read submission state to disk at `{}`", + state_file_path.0.display() + ) + })?; -fn ensure_height_in_last_is_less_than_height_in_current( - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - let PostSubmission::Submitted { - sequencer_height: height_in_last, - .. - } = last_submission - else { - return Ok(()); - }; - let PostSubmission::Submitted { - sequencer_height: height_in_current, - .. - } = current_submission - else { - return Ok(()); - }; - ensure!( - height_in_last < height_in_current, - "the `sequencer_height` in the post-submit file is not greater than the \ - `sequencer_height` stored in the `last_submission` field of the pre-submit file. - This indicates that a new submission was started not but finalized.
- ); - Ok(()) + match state { + State::Fresh => Ok(Self::Fresh(FreshSubmission { + state_file_path, + temp_file_path, + })), + State::Started { + last_submission, + } => Ok(Self::Started(StartedSubmission { + last_submission, + state_file_path, + temp_file_path, + })), + State::Prepared { + sequencer_height, + last_submission, + blob_tx_hash, + at, + } => Ok(Self::Prepared(PreparedSubmission { + sequencer_height, + last_submission, + blob_tx_hash, + created_at: at, + state_file_path, + temp_file_path, + })), + } + } + + /// Returns the sequencer height of the last completed submission, or `None` if the state is + /// `Fresh`. + pub(super) fn last_completed_sequencer_height(&self) -> Option<SequencerHeight> { + match &self { + SubmissionStateAtStartup::Fresh { + .. + } => None, + SubmissionStateAtStartup::Started(StartedSubmission { + last_submission, .. + }) + | SubmissionStateAtStartup::Prepared(PreparedSubmission { + last_submission, .. + }) => Some(last_submission.sequencer_height), + } + } } mod as_number { //! Logic to serialize sequencer heights as number, deserialize numbers as sequencer heights. //! //! This is unfortunately necessary because the [`serde::Serialize`], [`serde::Deserialize`] - //! implementations for [`sequencer_client::tendermint::block::Height`] write the integer as - //! string, probably due to tendermint's/cometbft's go-legacy. + //! implementations for [`tendermint::block::Height`] write the integer as a string, probably + //! due to tendermint's/cometbft's go-legacy. use serde::{ Deserialize as _, Deserializer, @@ -368,6 +458,7 @@ mod as_number { }; use super::SequencerHeight; + // Allow: the function signature is dictated by the serde(with) attribute.
#[allow(clippy::trivially_copy_pass_by_ref)] pub(super) fn serialize(height: &SequencerHeight, serializer: S) -> Result @@ -381,239 +472,440 @@ mod as_number { where D: Deserializer<'de>, { - use serde::de::Error; let height = u64::deserialize(deserializer)?; - SequencerHeight::try_from(height).map_err(Error::custom) + SequencerHeight::try_from(height).map_err(serde::de::Error::custom) } } #[cfg(test)] mod tests { + use std::time::Duration; + use serde_json::json; use tempfile::NamedTempFile; - use super::SubmissionState; - use crate::relayer::submission::PostSubmission; + use super::*; - const STRICT_CONSISTENCY_CHECK: bool = false; + const CELESTIA_HEIGHT: u64 = 1234; + const SEQUENCER_HEIGHT_LOW: u32 = 111; + const SEQUENCER_HEIGHT_HIGH: u32 = 222; + const BLOB_TX_HASH_STR: &str = + "0909090909090909090909090909090909090909090909090909090909090909"; + const BLOB_TX_HASH: BlobTxHash = BlobTxHash::from_raw([9; 32]); + const AT_STR: &str = "2024-06-24T22:22:22.222222222Z"; + const AT_DURATION_SINCE_EPOCH: Duration = Duration::from_nanos(1_719_267_742_222_222_222); #[track_caller] - fn create_files() -> (NamedTempFile, NamedTempFile) { - let pre = NamedTempFile::new() - .expect("must be able to create an empty pre submit state file to run tests"); - let post = NamedTempFile::new() - .expect("must be able to create an empty post submit state file to run tests"); - (pre, post) + fn write(val: &serde_json::Value) -> NamedTempFile { + let file = NamedTempFile::new().unwrap(); + serde_json::to_writer(&file, val).unwrap(); + file } - fn write(f: &NamedTempFile, val: &serde_json::Value) { - serde_json::to_writer(f, val).expect("must be able to write state to run tests"); + fn write_fresh_state() -> NamedTempFile { + write(&json!({ "state": "fresh" })) } - #[test] - fn fresh_with_ignored_is_ok() { - let (pre, post) = create_files(); - write(&pre, &json!({ "state": "ignore" })); - write(&post, &json!({ "state": "fresh" })); - SubmissionState::from_paths::(pre.path(), 
post.path()) - .expect("states `ignore` and `fresh` give a working submission state"); + fn write_started_state() -> NamedTempFile { + write(&json!({ + "state": "started", + "last_submission": { + "celestia_height": CELESTIA_HEIGHT, + "sequencer_height": SEQUENCER_HEIGHT_LOW + } + })) } - #[test] - fn submitted_with_ignored_is_ok() { - let (pre, post) = create_files(); - write(&pre, &json!({ "state": "ignore" })); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - SubmissionState::from_paths::(pre.path(), post.path()) - .expect("states `ignore` and `submitted` give a working submission state"); + fn write_prepared_state() -> NamedTempFile { + write(&json!({ + "state": "prepared", + "sequencer_height": SEQUENCER_HEIGHT_HIGH, + "last_submission": { + "celestia_height": CELESTIA_HEIGHT, + "sequencer_height": SEQUENCER_HEIGHT_LOW + }, + "blob_tx_hash": BLOB_TX_HASH_STR, + "at": AT_STR + })) } - #[test] - fn started_with_same_fresh_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write(&post, &json!({ "state": "fresh" })); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect_err("started state with `fresh` in last and current gives error"); + #[tokio::test] + async fn should_read_fresh_state() { + let file = write_fresh_state(); + let parsed = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap(); + match parsed { + State::Fresh => (), + _ => panic!("expected fresh state, got:\n{parsed:#}"), + } } - #[test] - fn started_with_height_before_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write( - &post, - &json!({ "state": "submitted", "sequencer_height": 6, "celestia_height": 2 }), - ); - let _ = - 
SubmissionState::from_paths::(pre.path(), post.path()) - .expect_err( - "started state with sequencer height less then sequencer height recorded \ - submitted gives error", + #[tokio::test] + async fn should_read_started_state() { + let file = write_started_state(); + let parsed = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap(); + match parsed { + State::Started { + last_submission, + } => { + let expected_submission = CompletedSubmission::new( + CELESTIA_HEIGHT, + SequencerHeight::from(SEQUENCER_HEIGHT_LOW), ); + assert_eq!(last_submission, expected_submission); + } + _ => panic!("expected started state, got:\n{parsed:#}"), + } } - #[test] - fn started_with_same_submitted_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "submitted", "celestia_height": 5, "sequencer_height": 2} }), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect_err( - "started state with the same `submitted` in last and current give an error", + #[tokio::test] + async fn should_read_prepared_state() { + let file = write_prepared_state(); + let parsed = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap(); + match parsed { + State::Prepared { + sequencer_height, + last_submission, + blob_tx_hash, + at, + } => { + assert_eq!( + sequencer_height, + SequencerHeight::from(SEQUENCER_HEIGHT_HIGH) + ); + let expected_submission = CompletedSubmission::new( + CELESTIA_HEIGHT, + SequencerHeight::from(SEQUENCER_HEIGHT_LOW), ); + assert_eq!(last_submission, expected_submission); + assert_eq!(blob_tx_hash, BLOB_TX_HASH); + assert_eq!(at, SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH); + } + _ => panic!("expected prepared state, got:\n{parsed:?}"), + } } - #[test] - fn 
started_with_different_last_fresh_and_current_submitted_is_ok() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "fresh" }}), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect( - "started state with the `fresh` in last and `submitted` in current gives \ - working submission state", - ); + #[tokio::test] + async fn should_fail_to_read_missing_state_file() { + let bad_path = "bad path"; + let error = State::read(&StateFilePath(Path::new(bad_path).to_path_buf())) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(bad_path)); + assert!(full_error.contains("failed reading submission state file")); } - #[test] - fn submit_initialize_finalize_flow_works() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "fresh" }}), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let state = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect( - "started state with the `fresh` in last and `submitted` in current gives \ - working submission state", - ); - let started = state.initialize(3u32.into()).unwrap(); - let finalized = started.finalize(6).unwrap(); - let PostSubmission::Submitted { - celestia_height, + #[tokio::test] + async fn should_fail_to_read_invalid_state_file() { + let file = write(&json!({ "state": "invalid" })); + let error = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(&file.path().display().to_string())); + assert!(full_error.contains("failed parsing the contents")); + } + + #[tokio::test] + async fn 
should_fail_to_read_state_file_with_broken_invariant() { + // The current sequencer height must be greater than the last submission's. + let file = write(&json!({ + "state": "prepared", + "sequencer_height": SEQUENCER_HEIGHT_HIGH, + "last_submission": { + "celestia_height": CELESTIA_HEIGHT, + "sequencer_height": SEQUENCER_HEIGHT_HIGH + }, + "blob_tx_hash": BLOB_TX_HASH_STR, + "at": AT_STR + })); + let error = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(&file.path().display().to_string())); + assert!(full_error.contains("should be greater than last successful submission sequencer")); + } + + async fn should_write_state(state: State) { + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + state.write(&destination, &temp_file).await.unwrap(); + + let parsed_state = State::read(&destination).await.unwrap(); + assert_eq!(state, parsed_state); + assert!(!temp_file.0.exists()); + } + + #[tokio::test] + async fn should_write_fresh_state() { + should_write_state(State::Fresh).await; + } + + #[tokio::test] + async fn should_write_started_state() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + should_write_state(State::new_started(last_submission)).await; + } + + #[tokio::test] + async fn should_write_prepared_state() { + let sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let at = SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH; + let state = State::new_prepared(sequencer_height, last_submission, BLOB_TX_HASH, at); + should_write_state(state).await; + } + + #[tokio::test] + async fn started_submission_should_transition_to_prepared() { + 
let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let started_submission = StartedSubmission { + last_submission, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), + }; + + // Transition to prepared. + let new_sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let prepared_submission = started_submission + .into_prepared(new_sequencer_height, BLOB_TX_HASH) + .await + .unwrap(); + assert_eq!(prepared_submission.sequencer_height, new_sequencer_height); + assert_eq!(prepared_submission.last_submission, last_submission); + assert_eq!(prepared_submission.blob_tx_hash, BLOB_TX_HASH); + assert_eq!(prepared_submission.state_file_path.0, destination.0); + assert_eq!(prepared_submission.temp_file_path.0, temp_file.0); + + // Ensure the new state was written to disk. + let parsed_state = State::read(&destination).await.unwrap(); + match parsed_state { + State::Prepared { + .. + } => (), + _ => panic!("expected prepared state, got:\n{parsed_state:?}"), + } + } + + #[tokio::test] + async fn started_submission_should_not_transition_with_broken_invariant() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let started_submission = StartedSubmission { + last_submission, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), + }; + + // Try to transition to prepared - should fail as new sequencer height == last sequencer + // height. 
+ let new_sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_LOW); + let error = started_submission + .into_prepared(new_sequencer_height, BLOB_TX_HASH) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains("cannot submit a sequencer block at height below or")); + + // Ensure the new state was not written to disk. + let error = State::read(&destination).await.unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains("failed reading submission state file")); + } + + #[tokio::test] + async fn prepared_submission_should_transition_to_started() { + let sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let created_at = SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH; + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let prepared_submission = PreparedSubmission { sequencer_height, - } = finalized.post - else { - panic!("the post submission state should be `submitted`"); + last_submission, + blob_tx_hash: BLOB_TX_HASH, + created_at, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), }; - assert_eq!(celestia_height, 6); - assert_eq!(sequencer_height.value(), 3); + + // Transition to started. + let new_celestia_height = CELESTIA_HEIGHT + 1; + let started_submission = prepared_submission + .into_started(new_celestia_height) + .await + .unwrap(); + let expected_last_submission = + CompletedSubmission::new(new_celestia_height, sequencer_height); + assert_eq!(started_submission.last_submission, expected_last_submission); + assert_eq!(started_submission.state_file_path.0, destination.0); + assert_eq!(started_submission.temp_file_path.0, temp_file.0); + + // Ensure the new state was written to disk. 
+ let parsed_state = State::read(&destination).await.unwrap(); + match parsed_state { + State::Started { + .. + } => (), + _ => panic!("expected started state, got:\n{parsed_state:?}"), + } + } + + #[tokio::test] + async fn prepared_submission_should_revert_to_started() { + let sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let created_at = SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH; + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let prepared_submission = PreparedSubmission { + sequencer_height, + last_submission, + blob_tx_hash: BLOB_TX_HASH, + created_at, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), + }; + + // Revert to started - should hold last submission. + let reverted_submission = prepared_submission.revert().await.unwrap(); + assert_eq!(reverted_submission.last_submission, last_submission); + assert_eq!(reverted_submission.state_file_path.0, destination.0); + assert_eq!(reverted_submission.temp_file_path.0, temp_file.0); + + // Ensure the new state was written to disk. + let parsed_state = State::read(&destination).await.unwrap(); + match parsed_state { + State::Started { + .. 
+ } => (), + _ => panic!("expected started state, got:\n{parsed_state:?}"), + } } #[test] - fn submit_old_blocks_gives_error() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "fresh" }}), + fn confirmation_timeout_should_respect_limits() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let mut prepared_submission = PreparedSubmission { + sequencer_height: SequencerHeight::from(SEQUENCER_HEIGHT_HIGH), + last_submission, + blob_tx_hash: BLOB_TX_HASH, + created_at: SystemTime::UNIX_EPOCH, + state_file_path: StateFilePath(PathBuf::new()), + temp_file_path: TempFilePath(PathBuf::new()), + }; + + // With a creation time far in the past, timeout should be 15 seconds. + assert_eq!( + prepared_submission.confirmation_timeout(), + Duration::from_secs(15) ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), + + // With a creation time in the future, timeout should be 60 seconds. + prepared_submission.created_at = SystemTime::now() + Duration::from_secs(1000); + assert_eq!( + prepared_submission.confirmation_timeout(), + Duration::from_secs(60) ); - let state = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect( - "started state with `fresh` in last and `submitted` in current gives working \ - submission state", - ); - let _ = state - .initialize(2u32.into()) - .expect_err("trying to submit the same sequencer height is an error"); } - mod lenient { - //! These test the same scenarios as the tests of the same name in the super module, but - //! 
whereas those are strict and should fail, the tests in this module should pass - - use super::{ - create_files, - json, - write, - SubmissionState, - }; - - const LENIENT_CONSISTENCY_CHECK: bool = true; - - #[test] - fn started_with_same_fresh_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write(&post, &json!({ "state": "fresh" })); - let _ = SubmissionState::from_paths::( - pre.path(), - post.path(), - ) - .expect("this test should not fail when doing a leaning consistency check"); + #[tokio::test] + async fn should_construct_fresh_submission_state_at_startup() { + let file = write_fresh_state(); + let parsed = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap(); + match parsed { + SubmissionStateAtStartup::Fresh(FreshSubmission { + state_file_path, + temp_file_path, + }) => { + assert_eq!(state_file_path.0, file.path()); + assert_eq!( + temp_file_path.0.display().to_string(), + format!("{}.tmp", file.path().display()) + ); + } + _ => panic!("expected fresh state, got: {parsed:?}"), } + } - #[test] - fn started_with_height_before_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write( - &post, - &json!({ "state": "submitted", "sequencer_height": 6, "celestia_height": 2 }), - ); - let _ = SubmissionState::from_paths::( - pre.path(), - post.path(), - ) - .expect("this test should not fail when doing a leaning consistency check"); + #[tokio::test] + async fn should_construct_started_submission_state_at_startup() { + let file = write_started_state(); + let parsed = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap(); + match parsed { + SubmissionStateAtStartup::Started(StartedSubmission { + state_file_path, + temp_file_path, + .. 
+ }) => { + assert_eq!(state_file_path.0, file.path()); + assert_eq!( + temp_file_path.0.display().to_string(), + format!("{}.tmp", file.path().display()) + ); + } + _ => panic!("expected started state, got: {parsed:?}"), } + } - #[test] - fn started_with_same_submitted_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "submitted", "celestia_height": 5, "sequencer_height": 2} }), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let _ = SubmissionState::from_paths::( - pre.path(), - post.path(), - ) - .expect("this test should not fail when doing a leaning consistency check"); + #[tokio::test] + async fn should_construct_prepared_submission_state_at_startup() { + let file = write_prepared_state(); + let parsed = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap(); + match parsed { + SubmissionStateAtStartup::Prepared(PreparedSubmission { + state_file_path, + temp_file_path, + .. + }) => { + assert_eq!(state_file_path.0, file.path()); + assert_eq!( + temp_file_path.0.display().to_string(), + format!("{}.tmp", file.path().display()) + ); + } + _ => panic!("expected prepared state, got: {parsed:?}"), } } + + #[tokio::test] + async fn should_fail_to_construct_if_not_writable() { + let file = write_prepared_state(); + // Create a folder at the path where the temp file would be written. 
+ std::fs::create_dir(format!("{}.tmp", file.path().display())).unwrap(); + let error = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(&file.path().display().to_string())); + assert!(full_error.contains("failed writing just-read submission state to disk at")); + } } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index d3bf5b352e..3054f2b7d3 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -15,6 +15,8 @@ use std::{ use astria_eyre::eyre::{ self, + bail, + Report, WrapErr as _, }; use celestia_types::Blob; @@ -26,6 +28,8 @@ use futures::{ FutureExt as _, }; use sequencer_client::SequencerBlock; +use tendermint::block::Height as SequencerHeight; +use thiserror::Error; use tokio::{ select, sync::{ @@ -52,9 +56,12 @@ use tracing::{ use super::{ celestia_client::CelestiaClient, + BlobTxHash, BuilderError, CelestiaClientBuilder, - SubmissionState, + PreparedSubmission, + StartedSubmission, + SubmissionStateAtStartup, TrySubmitError, }; use crate::{ @@ -109,9 +116,8 @@ pub(super) struct BlobSubmitter { /// The state of the relayer. state: Arc, - /// Tracks the submission state and writes it to disk before and after each Celestia - /// submission. - submission_state: SubmissionState, + /// Provides the initial submission state. + submission_state_at_startup: Option, /// The shutdown token to signal that blob submitter should finish its current submission and /// exit. 
@@ -129,7 +135,7 @@ impl BlobSubmitter { client_builder: CelestiaClientBuilder, rollup_filter: IncludeRollup, state: Arc, - submission_state: SubmissionState, + submission_state_at_startup: SubmissionStateAtStartup, submitter_shutdown_token: CancellationToken, metrics: &'static Metrics, ) -> (Self, BlobSubmitterHandle) { @@ -141,7 +147,7 @@ impl BlobSubmitter { blocks: rx, next_submission: NextSubmission::new(rollup_filter, metrics), state, - submission_state, + submission_state_at_startup: Some(submission_state_at_startup), submitter_shutdown_token, pending_block: None, metrics, @@ -163,6 +169,27 @@ impl BlobSubmitter { error.wrap_err(message) })?; + let Some(submission_state_at_startup) = self.submission_state_at_startup.take() else { + bail!("submission state must be provided at startup"); + }; + let mut started_submission = match submission_state_at_startup { + SubmissionStateAtStartup::Fresh(fresh) => fresh.into_started(), + SubmissionStateAtStartup::Started(started) => started, + SubmissionStateAtStartup::Prepared(prepared) => { + try_confirm_submission_from_last_session( + client.clone(), + prepared, + self.state.clone(), + self.metrics, + ) + .await + .wrap_err( + "failed to confirm the unfinished submission state of the previously loaded \ + session", + )? + } + }; + // A submission to Celestia that is currently in-flight. let mut ongoing_submission = Fuse::terminated(); @@ -182,12 +209,11 @@ impl BlobSubmitter { { // XXX: Breaks the select-loop and returns. With the current retry-logic in // `submit_blobs` this happens after u32::MAX retries which is effectively never. - // self.submission_state = match submission_result.wrap_err("failed submitting blocks to Celestia") - self.submission_state = match submission_result { + started_submission = match submission_result { Ok(state) => state, Err(err) => { - // Use `wrap_err` on the return break value. Using it on the match-value causes - // type inference to fail. 
+ // Use `wrap_err` on the return break value. Using it on the match-value + // causes type inference to fail. break Err(err).wrap_err("failed submitting blocks to Celestia"); } }; @@ -201,7 +227,7 @@ impl BlobSubmitter { client.clone(), submission, self.state.clone(), - self.submission_state.clone(), + started_submission.clone(), self.metrics, ).boxed().fuse(); if let Some(block) = self.pending_block.take() { @@ -215,7 +241,12 @@ impl BlobSubmitter { // add new blocks to the next submission if there is space. Some(block) = self.blocks.recv(), if self.has_capacity() => { - if let Err(error) = self.add_sequencer_block_to_next_submission(block) { + if block.height() <= started_submission.last_submission_sequencer_height() { + info!( + sequencer_height = %block.height(), + "skipping sequencer block as already included in previous submission" + ); + } else if let Err(error) = self.add_sequencer_block_to_next_submission(block) { break Err(error).wrap_err( "critically failed adding Sequencer block to next submission" ); @@ -269,15 +300,64 @@ impl BlobSubmitter { } } -/// Submits new blobs Celestia. +/// Tries to confirm the last attempted submission of the previous session. +/// +/// This should only be called where submission state on startup is `Prepared`, meaning we don't yet +/// know whether that final submission attempt succeeded or not. +/// +/// Internally, this polls `GetTx` for up to one minute. The returned `SubmissionState` is +/// guaranteed to be in `Started` state, either holding the heights of the previously prepared +/// submission if confirmed by Celestia, or holding the heights of the last known confirmed +/// submission in the case of timing out. 
#[instrument(skip_all)] +async fn try_confirm_submission_from_last_session( + mut client: CelestiaClient, + prepared_submission: PreparedSubmission, + state: Arc, + metrics: &'static Metrics, +) -> eyre::Result { + let blob_tx_hash = prepared_submission.blob_tx_hash(); + info!(%blob_tx_hash, "confirming submission of last `BlobTx` from previous session"); + + let timeout = prepared_submission.confirmation_timeout(); + let new_state = if let Some(celestia_height) = client + .confirm_submission_with_timeout(blob_tx_hash, timeout) + .await + { + info!(%celestia_height, "confirmed previous session submitted blobs to Celestia"); + prepared_submission + .into_started(celestia_height) + .await + .wrap_err("failed to convert previous session's state into `started`")? + } else { + info!( + "previous session's last submission was not completed; continuing from last confirmed \ + submission" + ); + prepared_submission + .revert() + .await + .wrap_err("failed to revert previous session's state into `started`")? + }; + + metrics.absolute_set_sequencer_submission_height( + new_state.last_submission_sequencer_height().value(), + ); + metrics.absolute_set_celestia_submission_height(new_state.last_submission_celestia_height()); + state.set_latest_confirmed_celestia_height(new_state.last_submission_celestia_height()); + + Ok(new_state) +} + +/// Submits new blobs to Celestia. 
+#[instrument(skip_all, err)] async fn submit_blobs( client: CelestiaClient, data: conversion::Submission, state: Arc, - submission_state: SubmissionState, + started_submission: StartedSubmission, metrics: &'static Metrics, -) -> eyre::Result { +) -> eyre::Result { info!( blocks = %telemetry::display::json(&data.input_metadata()), total_data_uncompressed_size = data.uncompressed_size(), @@ -297,26 +377,18 @@ async fn submit_blobs( let largest_sequencer_height = data.greatest_sequencer_height(); let blobs = data.into_blobs(); - let submission_started = match crate::utils::flatten( - tokio::task::spawn_blocking(move || submission_state.initialize(largest_sequencer_height)) - .in_current_span() - .await, - ) { - Err(error) => { - error!(%error, "failed to initialize submission; abandoning"); - return Err(error); - } - Ok(state) => state, - }; + let new_state = submit_with_retry( + client, + blobs, + state.clone(), + started_submission, + largest_sequencer_height, + metrics, + ) + .await + .wrap_err("failed submitting blobs to Celestia")?; - let celestia_height = match submit_with_retry(client, blobs, state.clone(), metrics).await { - Err(error) => { - let message = "failed submitting blobs to Celestia"; - error!(%error, message); - return Err(error.wrap_err(message)); - } - Ok(height) => height, - }; + let celestia_height = new_state.last_submission_celestia_height(); metrics.absolute_set_sequencer_submission_height(largest_sequencer_height.value()); metrics.absolute_set_celestia_submission_height(celestia_height); metrics.record_celestia_submission_latency(start.elapsed()); @@ -326,18 +398,7 @@ async fn submit_blobs( state.set_celestia_connected(true); state.set_latest_confirmed_celestia_height(celestia_height); - let final_state = match crate::utils::flatten( - tokio::task::spawn_blocking(move || submission_started.finalize(celestia_height)) - .in_current_span() - .await, - ) { - Err(error) => { - error!(%error, "failed to finalize submission; abandoning"); - return 
Err(error); - } - Ok(state) => state, - }; - Ok(final_state) + Ok(new_state) } #[instrument(skip_all)] @@ -383,12 +444,25 @@ async fn init_with_retry(client_builder: CelestiaClientBuilder) -> eyre::Result< Ok(celestia_client) } +#[derive(Error, Clone, Debug)] +enum SubmissionError { + #[error(transparent)] + TrySubmit(#[from] TrySubmitError), + #[error("unrecoverable submission error")] + Unrecoverable(#[source] Arc), + #[error("broadcast tx timed out")] + BroadcastTxTimedOut(PreparedSubmission), +} + +#[instrument(skip_all)] async fn submit_with_retry( client: CelestiaClient, blobs: Vec, state: Arc, + started_submission: StartedSubmission, + largest_sequencer_height: SequencerHeight, metrics: &'static Metrics, -) -> eyre::Result { +) -> eyre::Result { // Moving the span into `on_retry`, because tryhard spawns these in a tokio // task, losing the span. let span = Span::current(); @@ -397,12 +471,22 @@ async fn submit_with_retry( // `TrySubmitError` to the next attempt of the `retry_fn`. let (last_error_sender, last_error_receiver) = watch::channel(None); + let initial_retry_delay = Duration::from_millis(100); let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - // 12 seconds is the Celestia block time + // 12 seconds is the Celestia block time. .max_delay(Duration::from_secs(12)) + .custom_backoff(|attempt: u32, error: &SubmissionError| { + if matches!(error, SubmissionError::Unrecoverable(_)) { + return tryhard::RetryPolicy::Break; + } + // This is equivalent to the `exponential_backoff` policy. Note that `max_delay` + // above is still respected regardless of what we return here. 
+ let delay = + initial_retry_delay.saturating_mul(2_u32.saturating_pow(attempt.saturating_sub(1))); + tryhard::RetryPolicy::Delay(delay) + }) .on_retry( - |attempt: u32, next_delay: Option, error: &TrySubmitError| { + |attempt: u32, next_delay: Option, error: &SubmissionError| { metrics.increment_celestia_submission_failure_count(); let state = Arc::clone(&state); @@ -426,14 +510,95 @@ async fn submit_with_retry( let blobs = Arc::new(blobs); - let height = tryhard::retry_fn(move || { - client - .clone() - .try_submit(blobs.clone(), last_error_receiver.clone()) + let final_state = tryhard::retry_fn(move || { + try_submit( + client.clone(), + blobs.clone(), + started_submission.clone(), + largest_sequencer_height, + last_error_receiver.clone(), + ) }) .with_config(retry_config) .in_current_span() .await - .wrap_err("retry attempts exhausted; bailing")?; - Ok(height) + .wrap_err("finished trying to submit")?; + Ok(final_state) +} + +#[instrument(skip_all)] +async fn try_submit( + mut client: CelestiaClient, + blobs: Arc>, + started_submission: StartedSubmission, + largest_sequencer_height: SequencerHeight, + last_error_receiver: watch::Receiver>, +) -> Result { + // Get the error from the last attempt to `try_submit`. + let maybe_last_error = last_error_receiver.borrow().clone(); + let maybe_try_submit_error = match maybe_last_error { + // If error is broadcast timeout, try to confirm submission from last attempt. + Some(SubmissionError::BroadcastTxTimedOut(prepared_submission)) => { + if let Some(new_state) = + try_confirm_submission_from_failed_attempt(client.clone(), prepared_submission) + .await? 
+ { + return Ok(new_state); + } + None + } + Some(SubmissionError::TrySubmit(error)) => Some(error), + Some(SubmissionError::Unrecoverable(error)) => { + unreachable!("this error should not make it past `custom_backoff`: {error:#}"); + } + None => None, + }; + + let blob_tx = client.try_prepare(blobs, maybe_try_submit_error).await?; + let blob_tx_hash = BlobTxHash::compute(&blob_tx); + + let prepared_submission = started_submission + .into_prepared(largest_sequencer_height, blob_tx_hash) + .await + .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error)))?; + + match client.try_submit(blob_tx_hash, blob_tx).await { + Ok(celestia_height) => prepared_submission + .into_started(celestia_height) + .await + .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error))), + Err(TrySubmitError::FailedToBroadcastTx(error)) if error.is_timeout() => { + Err(SubmissionError::BroadcastTxTimedOut(prepared_submission)) + } + Err(error) => Err(SubmissionError::TrySubmit(error)), + } +} + +/// Tries to confirm the submission from a failed previous attempt. Returns `Some` if the +/// submission is confirmed, or `None` if not. +/// +/// This should only be called where submission state is `Prepared`, meaning we don't yet +/// know whether that previous submission attempt succeeded or not. 
+#[instrument(skip_all)] +async fn try_confirm_submission_from_failed_attempt( + mut client: CelestiaClient, + prepared_submission: PreparedSubmission, +) -> Result, SubmissionError> { + let blob_tx_hash = prepared_submission.blob_tx_hash(); + info!(%blob_tx_hash, "confirming submission of last `BlobTx` from previous attempt"); + + if let Some(celestia_height) = client + .confirm_submission_with_timeout(blob_tx_hash, prepared_submission.confirmation_timeout()) + .await + { + info!(%celestia_height, "confirmed previous attempt submitted blobs to Celestia"); + let new_state = prepared_submission + .into_started(celestia_height) + .await + .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error)))?; + return Ok(Some(new_state)); + } + + info!("previous attempt's last submission was not completed; starting resubmission"); + Ok(None) } diff --git a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs index c899ea09e8..d04b7c159c 100644 --- a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs @@ -63,8 +63,7 @@ impl SequencerRelayer { celestia_app_key_file, block_time, api_addr, - pre_submit_path, - post_submit_path, + submission_state_path, .. 
} = cfg; @@ -78,8 +77,7 @@ impl SequencerRelayer { sequencer_poll_period: Duration::from_millis(block_time), sequencer_grpc_endpoint, rollup_filter, - pre_submit_path, - post_submit_path, + submission_state_path, metrics, } .build() diff --git a/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs b/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs index 27edf2aa37..75308751f2 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs @@ -5,6 +5,7 @@ use std::{ Display, Formatter, }, + fs, future::Future, io::Write, mem, @@ -170,8 +171,7 @@ pub struct TestSequencerRelayer { pub signing_key: SigningKey, - pub pre_submit_file: NamedTempFile, - pub post_submit_file: NamedTempFile, + pub submission_state_file: NamedTempFile, /// The sequencer chain ID which will be returned by the mock `cometbft` instance, and set via /// `TestSequencerRelayerConfig`. 
pub actual_sequencer_chain_id: String, @@ -614,29 +614,18 @@ impl TestSequencerRelayer { } #[track_caller] - pub fn assert_state_files_are_as_expected( - &self, - pre_sequencer_height: u32, - post_sequencer_height: u32, - ) { - let pre_submit_state: serde_json::Value = - serde_json::from_str(&std::fs::read_to_string(&self.config.pre_submit_path).unwrap()) + pub fn check_state_file(&self, last_sequencer_height: u32, current_sequencer_height: u32) { + let submission_state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&self.config.submission_state_path).unwrap()) .unwrap(); assert_json_include!( - actual: pre_submit_state, - expected: json!({ - "sequencer_height": pre_sequencer_height - }), + actual: submission_state, + expected: json!({ "sequencer_height": last_sequencer_height }), ); - let post_submit_state: serde_json::Value = - serde_json::from_str(&std::fs::read_to_string(&self.config.post_submit_path).unwrap()) - .unwrap(); assert_json_include!( - actual: post_submit_state, - expected: json!({ - "sequencer_height": post_sequencer_height, - }), + actual: submission_state, + expected: json!({ "sequencer_height": current_sequencer_height }), ); } @@ -662,7 +651,7 @@ impl TestSequencerRelayer { // allow: want the name to reflect this is a test config. #[allow(clippy::module_name_repetitions)] pub struct TestSequencerRelayerConfig { - /// Sets the start height of relayer and configures the on-disk pre- and post-submit files to + /// Sets the start height of relayer and configures the on-disk submission-state file to /// look accordingly. 
pub last_written_sequencer_height: Option, /// The rollup ID filter, to be stringified and provided as `Config::only_include_rollups` @@ -707,11 +696,11 @@ impl TestSequencerRelayerConfig { let sequencer = MockSequencerServer::spawn().await; let sequencer_grpc_endpoint = format!("http://{}", sequencer.local_addr); - let (pre_submit_file, post_submit_file) = + let submission_state_file = if let Some(last_written_sequencer_height) = self.last_written_sequencer_height { - create_files_for_start_at_height(last_written_sequencer_height) + create_file_for_start_at_height(last_written_sequencer_height) } else { - create_files_for_fresh_start() + create_file_for_fresh_start() }; let only_include_rollups = self.only_include_rollups.iter().join(",").to_string(); @@ -732,8 +721,7 @@ impl TestSequencerRelayerConfig { no_metrics: false, metrics_http_listener_addr: String::new(), pretty_print: true, - pre_submit_path: pre_submit_file.path().to_owned(), - post_submit_path: post_submit_file.path().to_owned(), + submission_state_path: submission_state_file.path().to_owned(), }; info!(config = serde_json::to_string(&config).unwrap()); @@ -751,8 +739,7 @@ impl TestSequencerRelayerConfig { relayer_shutdown_handle: Some(relayer_shutdown_handle), sequencer_relayer, signing_key, - pre_submit_file, - post_submit_file, + submission_state_file, actual_sequencer_chain_id: self.sequencer_chain_id, actual_celestia_chain_id: self.celestia_chain_id, }; @@ -816,49 +803,28 @@ async fn write_file(data: &'static [u8]) -> NamedTempFile { .unwrap() } -fn create_files_for_fresh_start() -> (NamedTempFile, NamedTempFile) { - let pre = NamedTempFile::new() - .expect("must be able to create an empty pre submit state file to run tests"); - let post = NamedTempFile::new() - .expect("must be able to create an empty post submit state file to run tests"); - serde_json::to_writer( - &pre, - &json!({ - "state": "ignore" - }), - ) - .expect("must be able to write pre-submit state to run tests"); - 
serde_json::to_writer( - &post, - &json!({ - "state": "fresh" - }), - ) - .expect("must be able to write post-submit state to run tests"); - (pre, post) +fn create_file_for_fresh_start() -> NamedTempFile { + let temp_file = NamedTempFile::new() + .expect("must be able to create an empty submission state file to run tests"); + serde_json::to_writer(&temp_file, &json!({ "state": "fresh" })) + .expect("must be able to write submission state to run tests"); + temp_file } -fn create_files_for_start_at_height(height: u64) -> (NamedTempFile, NamedTempFile) { - let pre = NamedTempFile::new() - .expect("must be able to create an empty pre submit state file to run tests"); - let post = NamedTempFile::new() - .expect("must be able to create an empty post submit state file to run tests"); - - serde_json::to_writer( - &pre, - &json!({ - "state": "ignore", - }), - ) - .expect("must be able to write pre state to file to run tests"); +fn create_file_for_start_at_height(height: u64) -> NamedTempFile { + let temp_file = NamedTempFile::new() + .expect("must be able to create an empty submission state file to run tests"); serde_json::to_writer_pretty( - &post, + &temp_file, &json!({ - "state": "submitted", - "celestia_height": 5, - "sequencer_height": height + "state": "started", + "sequencer_height": height.saturating_add(10), + "last_submission": { + "celestia_height": 5, + "sequencer_height": height + } }), ) - .expect("must be able to write post state to file to run tests"); - (pre, post) + .expect("must be able to write submission state to run tests"); + temp_file } diff --git a/specs/sequencer-relayer.md b/specs/sequencer-relayer.md index e5e5dd024e..134218379e 100644 --- a/specs/sequencer-relayer.md +++ b/specs/sequencer-relayer.md @@ -140,48 +140,84 @@ which point backpressure will cause the reader task to pause as detailed above. 
## Further Details -### Pre- and Post-Submit Files +### Submission State File -At the start and end of each successful attempt to put data onto Celestia, the -relayer writes some pertinent information to disk in the form of two JSON files; -the pre-submit file and post-submit file. These allow the relayer to restart and -continue submitting from where it left off. +During attempts to put data onto Celestia, the relayer writes some pertinent +information to disk in the form of a JSON file; the submission-state file. This +allows the relayer to restart and continue submitting from where it left off. -#### Pre-Submit File +This file needs to exist and be writable whenever the relayer starts, even on +first run. -The contents of the pre-submit file are one of either: +The submission state is one of three variants; `fresh`, `started` or `prepared`. -```json -{"state": "started", "sequencer_height": , "last_submission": } -``` +#### `fresh` State -or +The contents of the submission-state file when in `fresh` state are: ```json -{"state": "ignore"} +{"state": "fresh"} ``` -The former is the normal case, with the file updated at the start of every new -submission. The latter is used to force the relayer to ignore the pre-submit -state entirely and only consider the post-submit state. +This state is never written by the relayer: it needs to be provided externally +before the relayer is started. It indicates that the relayer should start +relaying from sequencer block 1. -#### Post-Submit File +#### `started` State -The contents of the post-submit file are one of either: +The contents of the submission-state file when in `started` state are: ```json -{"state": "fresh"} +{ + "state": "started", + "last_submission": { + "celestia_height": , + "sequencer_height": + } +} ``` -or +This state is written by the relayer at the start of each new submission. 
+`last_submission` records the last successful submission: the Celestia block +height at which the submission was stored, and the highest sequencer block +included in the submission. + +With the file in this state, on startup the relayer will begin submitting +sequencer blocks starting from `[last_submission.sequencer_height] + 1`. + +#### `prepared` State + +The contents of the submission-state file when in `prepared` state are: ```json -{"state": "submitted", "celestia_height": , "sequencer_height": } +{ + "state": "prepared", + "sequencer_height": , + "last_submission": { + "celestia_height": , + "sequencer_height": + }, + "blob_tx_hash": "<64-character hex string>", + "at": "" +} ``` -The former indicates the relayer should start relaying from sequencer block 1, -while the latter records the relevant block heights of the last successful -submission. +This state is written by the relayer when a submission has been prepared and is +about to be sent to the Celestia app for execution. `sequencer_height` indicates +the highest sequencer block included in the attempt, while `last_submission` is +as per that in `started` state. `blob_tx_hash` is the hex-encoded SHA-256 digest +of the `BlobTx` containing the data to be submitted, and `at` is the time at +which the `BlobTx` was created. + +With the file in this state, on startup the relayer has to first establish +whether that submission succeeded or not. It queries the Celestia app for the +given blob hash, and if not confirmed as stored repeats the query at a rate of +once per second until it is confirmed or until a timeout is hit. The timeout is +one minute after the `at` timestamp or 15 seconds, whichever is the greater. + +If the submission is confirmed, the relayer will then begin submitting sequencer +blocks starting from `[sequencer_height] + 1`, otherwise it begins from +`[last_submission.sequencer_height] + 1`. ### HTTP Servers