diff --git a/Cargo.lock b/Cargo.lock index 03634ce8fe..2816c4fc7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -867,6 +867,7 @@ dependencies = [ "hex", "http", "humantime", + "humantime-serde", "hyper", "itertools 0.12.1", "itoa", @@ -3624,6 +3625,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.28" diff --git a/charts/sequencer-relayer/Chart.yaml b/charts/sequencer-relayer/Chart.yaml index bcebe7dc5f..92b190fa92 100644 --- a/charts/sequencer-relayer/Chart.yaml +++ b/charts/sequencer-relayer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.11.0 +version: 0.11.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/sequencer-relayer/files/scripts/start-relayer.sh b/charts/sequencer-relayer/files/scripts/start-relayer.sh index 9ad150c2d2..746d71022f 100644 --- a/charts/sequencer-relayer/files/scripts/start-relayer.sh +++ b/charts/sequencer-relayer/files/scripts/start-relayer.sh @@ -4,14 +4,19 @@ set -o errexit -o nounset -o pipefail echo "Starting the Astria Sequencer Relayer..." -if ! [ -f "$ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH" ]; then +if ! [ -z ${ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH+x} ] && ! [ -f "$ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH" ]; then echo "Pre-submit storage file not found, instantiating with ignore state. Post submit storage file will be created on first submit." echo "{\"state\": \"ignore\"}" > $ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH fi -if ! [ -f "$ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH" ]; then +if ! [ -z ${ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH+x} ] && ! [ -f "$ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH" ]; then echo "Post-submit storage file does not exist, instantiating with fresh state. Will start relaying from first sequencer block." echo "{\"state\": \"fresh\"}" > $ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH fi +if ! [ -z ${ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH+x} ] && ! [ -f "$ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH" ]; then + echo "Submission state file does not exist, instantiating with fresh state. Will start relaying from first sequencer block." + echo "{\"state\": \"fresh\"}" > $ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH +fi + exec /usr/local/bin/astria-sequencer-relayer diff --git a/charts/sequencer-relayer/templates/_helpers.tpl b/charts/sequencer-relayer/templates/_helpers.tpl index 1185798745..1dccd0a2bb 100644 --- a/charts/sequencer-relayer/templates/_helpers.tpl +++ b/charts/sequencer-relayer/templates/_helpers.tpl @@ -20,3 +20,7 @@ Namepsace to deploy elements into. {{- define "sequencer-relayer.storage.postSubmitPath" -}} {{ include "sequencer-relayer.storage.mountPath" . }}/postsubmit.json {{- end }} + +{{- define "sequencer-relayer.storage.submissionStatePath" -}} +{{ include "sequencer-relayer.storage.mountPath" . 
}}/submission-state.json +{{- end }} diff --git a/charts/sequencer-relayer/templates/configmaps.yaml b/charts/sequencer-relayer/templates/configmaps.yaml index 50917b74fb..b76f6c4029 100644 --- a/charts/sequencer-relayer/templates/configmaps.yaml +++ b/charts/sequencer-relayer/templates/configmaps.yaml @@ -11,8 +11,6 @@ data: ASTRIA_SEQUENCER_RELAYER_CELESTIA_APP_GRPC_ENDPOINT: "{{ .Values.config.relayer.celestiaAppGrpc }}" ASTRIA_SEQUENCER_RELAYER_CELESTIA_APP_KEY_FILE: "/celestia-key/{{ .Values.config.celestiaAppPrivateKey.secret.filename }}" ASTRIA_SEQUENCER_RELAYER_API_ADDR: "0.0.0.0:{{ .Values.ports.healthAPI }}" - ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.preSubmitPath" . }}" - ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.postSubmitPath" . }}" ASTRIA_SEQUENCER_RELAYER_NO_METRICS: "{{ not .Values.config.relayer.metrics.enabled }}" ASTRIA_SEQUENCER_RELAYER_METRICS_HTTP_LISTENER_ADDR: "0.0.0.0:{{ .Values.ports.metrics }}" ASTRIA_SEQUENCER_RELAYER_FORCE_STDOUT: "{{ .Values.global.useTTY }}" @@ -30,7 +28,10 @@ data: ASTRIA_SEQUENCER_RELAYER_SEQUENCER_CHAIN_ID: "{{ .Values.config.relayer.sequencerChainId }}" ASTRIA_SEQUENCER_RELAYER_CELESTIA_CHAIN_ID: "{{ .Values.config.relayer.celestiaChainId }}" {{- if not .Values.global.dev }} + ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.preSubmitPath" . }}" + ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH: "{{ include "sequencer-relayer.storage.postSubmitPath" . }}" {{- else }} + ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH: "{{ include "sequencer-relayer.storage.submissionStatePath" . }}" {{- end }} --- apiVersion: v1 diff --git a/charts/sequencer/Chart.lock b/charts/sequencer/Chart.lock index 7807d9b493..0e2b3ea35f 100644 --- a/charts/sequencer/Chart.lock +++ b/charts/sequencer/Chart.lock @@ -1,6 +1,6 @@ dependencies: - name: sequencer-relayer repository: file://../sequencer-relayer - version: 0.11.0 -digest: sha256:70434f4e37c36660ff9b89258d4de6770f206712020bda7398a22772e8f74fa8 -generated: "2024-07-19T12:21:51.250339+02:00" + version: 0.11.1 +digest: sha256:9c44f4901c4b89bbf6261f1a92bb18f71aef6d95e536aadc8a4a275a01eec25b +generated: "2024-07-23T20:01:42.179395482+01:00" diff --git a/charts/sequencer/Chart.yaml b/charts/sequencer/Chart.yaml index 9982dc29ca..bb8d4fc527 100644 --- a/charts/sequencer/Chart.yaml +++ b/charts/sequencer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.19.0 +version: 0.19.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. 
@@ -24,7 +24,7 @@ appVersion: "0.14.0"
 
 dependencies:
   - name: sequencer-relayer
-    version: "0.11.0"
+    version: "0.11.1"
     repository: "file://../sequencer-relayer"
     condition: sequencer-relayer.enabled
diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml
index 3c4a687f98..c6cf8cedd0 100644
--- a/crates/astria-sequencer-relayer/Cargo.toml
+++ b/crates/astria-sequencer-relayer/Cargo.toml
@@ -25,6 +25,7 @@ const_format = { workspace = true }
 futures = { workspace = true }
 hex = { workspace = true, features = ["serde"] }
 humantime = { workspace = true }
+humantime-serde = "1.1.1"
 hyper = { workspace = true }
 itoa = { workspace = true }
 metrics = { workspace = true }
@@ -39,7 +40,12 @@ tendermint-config = { workspace = true }
 thiserror = { workspace = true }
 tracing = { workspace = true }
 tryhard = { workspace = true }
-tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
+tokio = { workspace = true, features = [
+  "fs",
+  "macros",
+  "rt-multi-thread",
+  "signal",
+] }
 tokio-stream = { workspace = true }
 tokio-util = { workspace = true }
 tonic = { workspace = true }
diff --git a/crates/astria-sequencer-relayer/local.env.example b/crates/astria-sequencer-relayer/local.env.example
index 62073e9834..81b5c75d5f 100644
--- a/crates/astria-sequencer-relayer/local.env.example
+++ b/crates/astria-sequencer-relayer/local.env.example
@@ -60,23 +60,17 @@ ASTRIA_SEQUENCER_RELAYER_ONLY_INCLUDE_ROLLUPS=
 # The socket address at which sequencer relayer will server healthz, readyz, and status calls.
 ASTRIA_SEQUENCER_RELAYER_API_ADDR=127.0.0.1:2450
 
-# The path to which relayer will write its state prior to submitting to Celestia.
-# A file must exist at this path, be readable and writable, and contain one of:
-# 1. {"state": "ignore"}
-# to ignore the pre-submit state entirely and only consider the object stored in
-# ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH.
-# 2. {"state": "started", "last_submission": <last_submission>}
-# which is usually only written by sequencer-relayer during normal operation and
-# is checked for consistency with ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH at startup.
-ASTRIA_SEQUENCER_RELAYER_PRE_SUBMIT_PATH=/path/to/presubmit.json
-
-# The path to which relayer will write its state after submitting to Celestia.
+# The path to which relayer will write its state while submitting to Celestia.
 # A file must exist at this path, be readable and writable, and contain one of:
 # 1. {"state": "fresh"}
 # for relaying sequencer blocks starting at sequencer height 1.
-# 2. {"state": "submitted", "celestia_height": <int>, "sequencer_height": <int>}}
-# for relaying blocks starting at `<sequencer_height> + 1`.
-ASTRIA_SEQUENCER_RELAYER_POST_SUBMIT_PATH=/path/to/postsubmit.json
+# 2. {"state":"started","last_submission":{"celestia_height":<int>,"sequencer_height":<int>}}
+# for relaying blocks starting at `[last_submission.sequencer_height] + 1`.
+# 3. {"state":"prepared","sequencer_height":<int>,"last_submission":{"celestia_height":<int>,"sequencer_height":<int>},"blob_tx_hash":"<hex>","at":"<timestamp>"}
+# for trying to continue from the last submission attempt. Checks if the given blob tx is stored
+# on Celestia, and if so, begins relaying blocks starting at `[sequencer_height] + 1`, otherwise
+# begins relaying blocks starting at `[last_submission.sequencer_height] + 1`.
+ASTRIA_SEQUENCER_RELAYER_SUBMISSION_STATE_PATH=/path/to/submission-state.json
 
 # Set to true to enable prometheus metrics.
ASTRIA_SEQUENCER_RELAYER_NO_METRICS=true diff --git a/crates/astria-sequencer-relayer/src/config.rs b/crates/astria-sequencer-relayer/src/config.rs index 5d1186b927..b884634e81 100644 --- a/crates/astria-sequencer-relayer/src/config.rs +++ b/crates/astria-sequencer-relayer/src/config.rs @@ -48,10 +48,8 @@ pub struct Config { pub metrics_http_listener_addr: String, /// Writes a human readable format to stdout instead of JSON formatted OTEL trace data. pub pretty_print: bool, - /// The path to which relayer will write its state prior to submitting to Celestia. - pub pre_submit_path: PathBuf, - /// The path to which relayer will write its state after submitting to Celestia. - pub post_submit_path: PathBuf, + /// The path to which relayer will write its state while submitting to Celestia. + pub submission_state_path: PathBuf, } impl Config { diff --git a/crates/astria-sequencer-relayer/src/relayer/builder.rs b/crates/astria-sequencer-relayer/src/relayer/builder.rs index b43b343a4c..6312cd5caa 100644 --- a/crates/astria-sequencer-relayer/src/relayer/builder.rs +++ b/crates/astria-sequencer-relayer/src/relayer/builder.rs @@ -35,8 +35,7 @@ pub(crate) struct Builder { pub(crate) sequencer_poll_period: Duration, pub(crate) sequencer_grpc_endpoint: String, pub(crate) rollup_filter: IncludeRollup, - pub(crate) pre_submit_path: PathBuf, - pub(crate) post_submit_path: PathBuf, + pub(crate) submission_state_path: PathBuf, pub(crate) metrics: &'static Metrics, } @@ -53,8 +52,7 @@ impl Builder { sequencer_poll_period, sequencer_grpc_endpoint, rollup_filter, - pre_submit_path, - post_submit_path, + submission_state_path, metrics, } = self; @@ -93,8 +91,7 @@ impl Builder { celestia_client_builder, rollup_filter, state, - pre_submit_path, - post_submit_path, + submission_state_path, metrics, }) } diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs index b8848d53a0..9309694d31 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs @@ -103,6 +103,12 @@ pub(in crate::relayer) enum TrySubmitError { #[derive(Clone, Debug)] pub(in crate::relayer) struct GrpcResponseError(Status); +impl GrpcResponseError { + pub(in crate::relayer) fn is_timeout(&self) -> bool { + self.0.code() == tonic::Code::Cancelled + } +} + impl Display for GrpcResponseError { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { write!( diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs index ac504dbb8d..2f850c95d4 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs @@ -6,7 +6,14 @@ mod error; mod tests; use std::{ + borrow::Cow, convert::TryInto, + fmt::{ + self, + Debug, + Display, + Formatter, + }, sync::Arc, time::{ Duration, @@ -77,12 +84,27 @@ pub(super) use error::{ ProtobufDecodeError, TrySubmitError, }; +use hex::{ + FromHex, + FromHexError, +}; use prost::{ bytes::Bytes, Message as _, Name as _, }; -use tokio::sync::watch; +use serde::{ + Deserialize, + Deserializer, + Serialize, + Serializer, +}; +use sha2::{ + Digest as _, + Sha256, +}; +use telemetry::display::hex; +use thiserror::Error; use tonic::{ transport::Channel, Response, @@ -90,7 +112,9 @@ use tonic::{ }; use tracing::{ debug, + error, info, + instrument, trace, warn, 
 };
@@ -124,11 +148,11 @@ impl CelestiaClient {
     /// used to obtain the appropriate fee in the case that the previous attempt failed due to a
     /// low fee.
     // Copied from https://github.com/celestiaorg/celestia-app/blob/v1.4.0/x/blob/payforblob.go
-    pub(super) async fn try_submit(
-        mut self,
+    pub(super) async fn try_prepare(
+        &mut self,
         blobs: Arc<Vec<Blob>>,
-        last_error_receiver: watch::Receiver<Option<TrySubmitError>>,
-    ) -> Result<u64, TrySubmitError> {
+        maybe_last_error: Option<TrySubmitError>,
+    ) -> Result<BlobTx, TrySubmitError> {
         info!("fetching cost params and account info from celestia app");
         let (blob_params, auth_params, min_gas_price, base_account) = tokio::try_join!(
             self.fetch_blob_params(),
@@ -153,8 +177,6 @@ impl CelestiaClient {
         let cost_params =
             CelestiaCostParams::new(gas_per_blob_byte, tx_size_cost_per_byte, min_gas_price);
         let gas_limit = estimate_gas(&msg_pay_for_blobs.blob_sizes, cost_params);
-        // Get the error from the last attempt to `try_submit`.
-        let maybe_last_error = last_error_receiver.borrow().clone();
         let fee = calculate_fee(cost_params, gas_limit, maybe_last_error);
 
         let signed_tx = new_signed_tx(
@@ -166,20 +188,52 @@ impl CelestiaClient {
             &self.signing_keys,
         );
 
-        let blob_tx = new_blob_tx(&signed_tx, blobs.iter());
-
         info!(
             gas_limit = gas_limit.0,
             fee_utia = fee,
-            "broadcasting blob transaction to celestia app"
+            "prepared blob transaction for celestia app"
         );
-        let tx_hash = self.broadcast_tx(blob_tx).await?;
-        info!(tx_hash = %tx_hash.0, "broadcast blob transaction succeeded");
-        let height = self.confirm_submission(tx_hash).await;
+        Ok(new_blob_tx(&signed_tx, blobs.iter()))
+    }
+
+    pub(super) async fn try_submit(
+        &mut self,
+        blob_tx_hash: BlobTxHash,
+        blob_tx: BlobTx,
+    ) -> Result<u64, TrySubmitError> {
+        info!("broadcasting blob transaction to celestia app");
+        let hex_encoded_tx_hash = self.broadcast_tx(blob_tx).await?;
+        if hex_encoded_tx_hash != blob_tx_hash.to_hex() {
+            // This is not a critical error. Worst case, we restart the process now and try for a
+            // short while to `GetTx` for this tx using the wrong hash, resulting in a likely
+            // duplicate submission of this set of blobs.
+            warn!(
+                "tx hash `{hex_encoded_tx_hash}` returned from celestia app is not the same as \
                 the locally calculated one `{blob_tx_hash}`; submission file has invalid data"
+            );
+        }
+        info!(tx_hash = %hex_encoded_tx_hash, "broadcast blob transaction succeeded");
+
+        let height = self.confirm_submission(hex_encoded_tx_hash).await;
         Ok(height)
     }
 
+    /// Repeatedly sends `GetTx` until a successful response is received or `timeout` duration has
+    /// elapsed.
+    ///
+    /// Returns the height of the Celestia block in which the blobs were submitted, or `None` if
+    /// timed out.
+    pub(super) async fn confirm_submission_with_timeout(
+        &mut self,
+        blob_tx_hash: &BlobTxHash,
+        timeout: Duration,
+    ) -> Option<u64> {
+        tokio::time::timeout(timeout, self.confirm_submission(blob_tx_hash.to_hex()))
+            .await
+            .ok()
+    }
+
     async fn fetch_account(&self) -> Result<BaseAccount, TrySubmitError> {
         let mut auth_query_client = AuthQueryClient::new(self.grpc_channel.clone());
         let request = QueryAccountRequest {
@@ -245,7 +299,7 @@ impl CelestiaClient {
     /// [`CometBFT`][cometbft].
/// /// [cometbft]: https://github.com/cometbft/cometbft/blob/b139e139ad9ae6fccb9682aa5c2de4aa952fd055/rpc/openapi/openapi.yaml#L201-L204 - async fn broadcast_tx(&mut self, blob_tx: BlobTx) -> Result { + async fn broadcast_tx(&mut self, blob_tx: BlobTx) -> Result { let request = BroadcastTxRequest { tx_bytes: Bytes::from(blob_tx.encode_to_vec()), mode: i32::from(BroadcastMode::Sync), @@ -256,14 +310,14 @@ impl CelestiaClient { { trace!(?response); } - tx_hash_from_response(response) + lowercase_hex_encoded_tx_hash_from_response(response) } /// Returns `Some(height)` if the tx submission has completed, or `None` if it is still /// pending. - async fn get_tx(&mut self, tx_hash: TxHash) -> Result, TrySubmitError> { + async fn get_tx(&mut self, hex_encoded_tx_hash: String) -> Result, TrySubmitError> { let request = GetTxRequest { - hash: tx_hash.0.clone(), + hash: hex_encoded_tx_hash, }; let response = self.tx_client.get_tx(request).await; // trace-level logging, so using Debug format is ok. @@ -276,7 +330,8 @@ impl CelestiaClient { /// Repeatedly sends `GetTx` until a successful response is received. Returns the height of the /// Celestia block in which the blobs were submitted. - async fn confirm_submission(&mut self, tx_hash: TxHash) -> u64 { + #[instrument(skip_all, fields(hex_encoded_tx_hash))] + async fn confirm_submission(&mut self, hex_encoded_tx_hash: String) -> u64 { // The min seconds to sleep after receiving a GetTx response and sending the next request. const MIN_POLL_INTERVAL_SECS: u64 = 1; // The max seconds to sleep after receiving a GetTx response and sending the next request. @@ -296,7 +351,7 @@ impl CelestiaClient { let reason = maybe_error.map_or(Report::msg("transaction still pending"), Report::new); warn!( %reason, - tx_hash = tx_hash.0, + tx_hash = %hex_encoded_tx_hash, elapsed_seconds = start.elapsed().as_secs_f32(), "waiting to confirm blob submission" ); @@ -306,7 +361,7 @@ impl CelestiaClient { let mut sleep_secs = MIN_POLL_INTERVAL_SECS; loop { tokio::time::sleep(Duration::from_secs(sleep_secs)).await; - match self.get_tx(tx_hash.clone()).await { + match self.get_tx(hex_encoded_tx_hash.clone()).await { Ok(Some(height)) => return height, Ok(None) => { sleep_secs = MIN_POLL_INTERVAL_SECS; @@ -407,11 +462,11 @@ fn min_gas_price_from_response( }) } -/// Extracts the tx hash from the given response. -fn tx_hash_from_response( +/// Extracts the tx hash from the given response and converts to lowercase. +fn lowercase_hex_encoded_tx_hash_from_response( response: Result, Status>, -) -> Result { - let tx_response = response +) -> Result { + let mut tx_response = response .map_err(|status| TrySubmitError::FailedToBroadcastTx(GrpcResponseError::from(status)))? .into_inner() .tx_response @@ -425,7 +480,8 @@ fn tx_hash_from_response( }; return Err(error); } - Ok(TxHash(tx_response.txhash)) + tx_response.txhash.make_ascii_lowercase(); + Ok(tx_response.txhash) } /// Extracts the block height from the given response if available, or `None` if the transaction is @@ -736,6 +792,62 @@ struct Bech32Address(String); #[derive(Copy, Clone, Debug)] struct GasLimit(u64); -/// A hex-encoded transaction hash. -#[derive(Clone, Debug)] -struct TxHash(String); +/// A blob transaction hash. +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] +pub(super) struct BlobTxHash([u8; 32]); + +impl BlobTxHash { + /// Computes the SHA256 digest of the given blob transaction. 
+ pub(super) fn compute(blob_tx: &BlobTx) -> Self { + let sha2 = Sha256::digest(&blob_tx.tx); + Self(sha2.into()) + } + + /// Converts `self` to a hex-encoded string. + pub(super) fn to_hex(self) -> String { + hex::encode(self.0) + } + + #[cfg(test)] + pub(super) const fn from_raw(hash: [u8; 32]) -> Self { + Self(hash) + } +} + +impl Display for BlobTxHash { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!(formatter, "{}", hex(&self.0)) + } +} + +impl Debug for BlobTxHash { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + formatter + .debug_tuple("BlobTxHash") + .field(&format_args!("{}", hex(&self.0))) + .finish() + } +} + +impl Serialize for BlobTxHash { + fn serialize(&self, serializer: S) -> Result { + serializer.collect_str(self) + } +} + +impl<'de> Deserialize<'de> for BlobTxHash { + fn deserialize>(deserializer: D) -> Result { + let hex = Cow::<'_, str>::deserialize(deserializer)?; + let raw_hash = <[u8; 32]>::from_hex(hex.as_bytes()).map_err(|error: FromHexError| { + serde::de::Error::custom(DeserializeBlobTxHashError::Hex(error.to_string())) + })?; + Ok(BlobTxHash(raw_hash)) + } +} + +#[derive(Error, Clone, Debug)] +#[non_exhaustive] +pub(in crate::relayer) enum DeserializeBlobTxHashError { + #[error("failed to decode as hex for blob tx hash: {0}")] + Hex(String), +} diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs index 2b85001314..7d59fdf02c 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs @@ -267,14 +267,14 @@ fn tx_hash_from_good_response_should_succeed() { tx_response: Some(tx_response), }); - let extracted_tx_hash = tx_hash_from_response(Ok(response)).unwrap(); - assert_eq!(tx_hash, extracted_tx_hash.0); + let extracted_tx_hash = lowercase_hex_encoded_tx_hash_from_response(Ok(response)).unwrap(); + assert_eq!(tx_hash, extracted_tx_hash); } #[test] fn tx_hash_from_bad_response_should_fail() { // Should return `FailedToBroadcastTx` if outer response is an error. - let error = tx_hash_from_response(Err(Status::internal(""))).unwrap_err(); + let error = lowercase_hex_encoded_tx_hash_from_response(Err(Status::internal(""))).unwrap_err(); // allow: `assert!(matches!(..))` provides poor feedback on failure. #[allow(clippy::manual_assert)] if !matches!(error, TrySubmitError::FailedToBroadcastTx(_)) { @@ -285,7 +285,7 @@ fn tx_hash_from_bad_response_should_fail() { let response = Ok(Response::new(BroadcastTxResponse { tx_response: None, })); - let error = tx_hash_from_response(response).unwrap_err(); + let error = lowercase_hex_encoded_tx_hash_from_response(response).unwrap_err(); // allow: `assert!(matches!(..))` provides poor feedback on failure. 
#[allow(clippy::manual_assert)] if !matches!(error, TrySubmitError::EmptyBroadcastTxResponse) { @@ -307,7 +307,7 @@ fn tx_hash_from_bad_response_should_fail() { let response = Ok(Response::new(BroadcastTxResponse { tx_response: Some(tx_response), })); - let error = tx_hash_from_response(response).unwrap_err(); + let error = lowercase_hex_encoded_tx_hash_from_response(response).unwrap_err(); match error { TrySubmitError::BroadcastTxResponseErrorCode { tx_hash: received_tx_hash, @@ -513,3 +513,11 @@ fn extract_required_fee_from_log_should_fail() { let bad_value = "insufficient fees; got: 1utia required: 2mutia: insufficient fee".to_string(); assert!(extract_required_fee_from_log(&bad_value).is_none()); } + +#[test] +fn blob_tx_hash_should_round_trip_json() { + let blob_tx_hash = BlobTxHash::from_raw([1; 32]); + let json_encoded = serde_json::to_string(&blob_tx_hash).unwrap(); + let decoded = serde_json::from_str(&json_encoded).unwrap(); + assert_eq!(blob_tx_hash, decoded); +} diff --git a/crates/astria-sequencer-relayer/src/relayer/mod.rs b/crates/astria-sequencer-relayer/src/relayer/mod.rs index 0c261f89d8..882aad521c 100644 --- a/crates/astria-sequencer-relayer/src/relayer/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/mod.rs @@ -1,8 +1,5 @@ use std::{ - path::{ - Path, - PathBuf, - }, + path::PathBuf, sync::Arc, time::Duration, }; @@ -62,6 +59,7 @@ mod write; pub(crate) use builder::Builder; use celestia_client::{ + BlobTxHash, BuilderError, CelestiaClientBuilder, CelestiaKeys, @@ -69,7 +67,11 @@ use celestia_client::{ }; use state::State; pub(crate) use state::StateSnapshot; -use submission::SubmissionState; +use submission::{ + PreparedSubmission, + StartedSubmission, + SubmissionStateAtStartup, +}; use crate::{ metrics::Metrics, @@ -106,8 +108,7 @@ pub(crate) struct Relayer { /// A watch channel to track the state of the relayer. Used by the API service. state: Arc, - pre_submit_path: PathBuf, - post_submit_path: PathBuf, + submission_state_path: PathBuf, metrics: &'static Metrics, } @@ -124,9 +125,9 @@ impl Relayer { /// failed catastrophically (after `u32::MAX` retries). #[instrument(skip_all)] pub(crate) async fn run(self) -> eyre::Result<()> { - let submission_state = read_submission_state(&self.pre_submit_path, &self.post_submit_path) - .await - .wrap_err("failed reading submission state from files")?; + // No need to add `wrap_err` as `new_from_path` already reports the path on error. 
+ let submission_state_at_startup = + SubmissionStateAtStartup::new_from_path(&self.submission_state_path).await?; select!( () = self.relayer_shutdown_token.cancelled() => return Ok(()), @@ -136,7 +137,8 @@ impl Relayer { ) => init_result, )?; - let last_submitted_sequencer_height = submission_state.last_submitted_height(); + let last_completed_sequencer_height = + submission_state_at_startup.last_completed_sequencer_height(); let mut latest_height_stream = { use sequencer_client::StreamLatestHeight as _; @@ -148,7 +150,7 @@ impl Relayer { self.celestia_client_builder.clone(), self.rollup_filter.clone(), self.state.clone(), - submission_state, + submission_state_at_startup, self.submitter_shutdown_token.clone(), self.metrics, ); @@ -156,7 +158,7 @@ impl Relayer { let mut block_stream = read::BlockStream::builder(self.metrics) .block_time(self.sequencer_poll_period) .client(self.sequencer_grpc_client.clone()) - .set_last_fetched_height(last_submitted_sequencer_height) + .set_last_fetched_height(last_completed_sequencer_height) .state(self.state.clone()) .build(); @@ -348,30 +350,11 @@ async fn fetch_sequencer_chain_id( response.map(|status_response| status_response.node_info.network.to_string()) } -async fn read_submission_state, P2: AsRef>( - pre: P1, - post: P2, -) -> eyre::Result { - const LENIENT_CONSISTENCY_CHECK: bool = true; - let pre = pre.as_ref().to_path_buf(); - let post = post.as_ref().to_path_buf(); - crate::utils::flatten( - tokio::task::spawn_blocking(move || { - SubmissionState::from_paths::(pre, post) - }) - .await, - ) - .wrap_err( - "failed reading submission state from the configured pre- and post-submit files. Refer to \ - the values documented in `local.env.example` of the astria-sequencer-relayer service", - ) -} - fn spawn_submitter( client_builder: CelestiaClientBuilder, rollup_filter: IncludeRollup, state: Arc, - submission_state: SubmissionState, + submission_state_at_startup: SubmissionStateAtStartup, submitter_shutdown_token: CancellationToken, metrics: &'static Metrics, ) -> ( @@ -382,7 +365,7 @@ fn spawn_submitter( client_builder, rollup_filter, state, - submission_state, + submission_state_at_startup, submitter_shutdown_token, metrics, ); diff --git a/crates/astria-sequencer-relayer/src/relayer/submission.rs b/crates/astria-sequencer-relayer/src/relayer/submission.rs index 71654a8501..075e8d4b5b 100644 --- a/crates/astria-sequencer-relayer/src/relayer/submission.rs +++ b/crates/astria-sequencer-relayer/src/relayer/submission.rs @@ -1,366 +1,456 @@ //! Tracks the current submission state of sequencer-relayer and syncs it to disk. -use std::path::{ - Path, - PathBuf, +use std::{ + fmt::{ + self, + Display, + Formatter, + }, + path::{ + Path, + PathBuf, + }, + time::{ + Duration, + SystemTime, + }, }; use astria_eyre::eyre::{ self, - bail, ensure, WrapErr as _, }; -use sequencer_client::tendermint::block::Height as SequencerHeight; use serde::{ Deserialize, Serialize, }; -use tracing::{ - debug, - warn, -}; +use tendermint::block::Height as SequencerHeight; +use tracing::debug; + +use super::BlobTxHash; + +/// Represents a submission made to Celestia which has been confirmed as stored via a successful +/// `GetTx` call. +#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub(super) struct CompletedSubmission { + /// The height of the Celestia block in which the submission was stored. + celestia_height: u64, + /// The highest sequencer block height contained in the submission. 
+ #[serde(with = "as_number")] + sequencer_height: SequencerHeight, +} + +impl CompletedSubmission { + fn new(celestia_height: u64, sequencer_height: SequencerHeight) -> Self { + Self { + celestia_height, + sequencer_height, + } + } +} + +/// Newtype wrapper for the file path of the submission state. +#[derive(Clone, Debug)] +struct StateFilePath(PathBuf); -#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "state")] -enum PostSubmission { +/// Newtype wrapper for the file path of the temp file used when writing submission state to disk. +#[derive(Clone, Debug)] +struct TempFilePath(PathBuf); + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case", tag = "state")] +enum State { + /// Indicates the first run of the sequencer, i.e. no previous session occurred. Fresh, - Submitted { - celestia_height: u64, + /// Indicates that we have started to prepare a new submission. Preparation involves fetching + /// information from the Celestia app (nonce, prices, etc.), and using that to create a signed + /// blob transaction. + Started { + last_submission: CompletedSubmission, + }, + /// Indicates that preparation of a signed blob transaction has happened, and we are now in the + /// process of submitting the transaction (sending a `broadcast_tx` gRPC) and confirming its + /// submission (polling via `get_tx` gRPCs). + Prepared { #[serde(with = "as_number")] sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + #[serde(with = "humantime_serde")] + at: SystemTime, }, } -impl PostSubmission { - fn is_fresh(&self) -> bool { - matches!(self, PostSubmission::Fresh) +impl State { + fn new_started(last_submission: CompletedSubmission) -> Self { + Self::Started { + last_submission, + } } - fn is_submitted(&self) -> bool { - matches!(self, PostSubmission::Submitted { .. }) + fn new_prepared( + sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + at: SystemTime, + ) -> Self { + Self::Prepared { + sequencer_height, + last_submission, + blob_tx_hash, + at, + } } - fn from_path>(path: P) -> eyre::Result { - let file = std::fs::File::open(&path) - .wrap_err("failed opening provided file path for reading post-submission state")?; - let state = serde_json::from_reader(file) - .wrap_err("failed reading contents of post-submission file")?; + /// Constructs an instance of `State` by parsing from `source`: a JSON-encoded file. + async fn read(source: &StateFilePath) -> eyre::Result { + let contents = tokio::fs::read_to_string(&source.0) + .await + .wrap_err_with(|| { + format!( + "failed reading submission state file at `{}`", + source.0.display() + ) + })?; + let state: State = serde_json::from_str(&contents) + .wrap_err_with(|| format!("failed parsing the contents of `{}`", source.0.display()))?; + + // Ensure the parsed values are sane. + match &state { + State::Fresh + | State::Started { + .. + } => {} + State::Prepared { + sequencer_height, + last_submission, + .. 
+ } => ensure!( + *sequencer_height > last_submission.sequencer_height, + "submission state file `{}` invalid: current sequencer height \ + ({sequencer_height}) should be greater than last successful submission sequencer \ + height ({})", + source.0.display(), + last_submission.sequencer_height + ), + } + Ok(state) } - fn write_to_path>(&self, path: P) -> eyre::Result<()> { - let f = std::fs::File::options() - .write(true) - .truncate(true) - .open(path) - .wrap_err("failed opening file for writing state")?; - serde_json::to_writer(&f, self).wrap_err("failed writing json-serialized state to file")?; - f.sync_all() - .wrap_err("failed fully syncing state write to disk")?; - Ok(()) + /// Writes JSON-encoded `self` to `temp_file`, then renames `temp_file` to `destination`. + async fn write( + &self, + destination: &StateFilePath, + temp_file: &TempFilePath, + ) -> eyre::Result<()> { + let contents = + serde_json::to_string_pretty(self).wrap_err("failed json-encoding submission state")?; + tokio::fs::write(&temp_file.0, &contents) + .await + .wrap_err_with(|| { + format!( + "failed writing submission state to `{}`", + temp_file.0.display() + ) + })?; + tokio::fs::rename(&temp_file.0, &destination.0) + .await + .wrap_err_with(|| { + format!( + "failed moving `{}` to `{}`", + temp_file.0.display(), + destination.0.display() + ) + }) } } -#[derive(Clone, Copy, Debug, Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "state")] -enum PreSubmission { - Ignore, - Started { - #[serde(with = "as_number")] - sequencer_height: SequencerHeight, - last_submission: PostSubmission, - }, -} -impl PreSubmission { - fn from_path>(path: P) -> eyre::Result { - let file = std::fs::File::open(&path) - .wrap_err("failed opening provided file path for reading pre-submission state")?; - let state = serde_json::from_reader(file) - .wrap_err("failed reading contents of pre-submission file")?; - Ok(state) +impl Display for State { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + if formatter.alternate() { + write!(formatter, "{}", serde_json::to_string_pretty(self).unwrap()) + } else { + write!(formatter, "{}", serde_json::to_string(self).unwrap()) + } } +} - fn write_to_path>(&self, path: P) -> eyre::Result<()> { - let f = std::fs::File::options() - .write(true) - .truncate(true) - .open(path) - .wrap_err("failed opening file for writing state")?; - serde_json::to_writer(&f, self).wrap_err("failed writing json-serialized state to file")?; - f.sync_all() - .wrap_err("failed fully syncing state write to disk")?; - Ok(()) +/// State indicating the relayer has not performed any submissions previously. +#[derive(Debug)] +pub(super) struct FreshSubmission { + state_file_path: StateFilePath, + temp_file_path: TempFilePath, +} + +impl FreshSubmission { + /// Converts `self` into a `StartedSubmission` with last submission being given celestia and + /// sequencer heights of 0. + /// + /// The new state is not written to disk, as it is not required - a restart will perform the + /// correct operation if the state on disk is left as `fresh`. + pub(super) fn into_started(self) -> StartedSubmission { + let last_submission = CompletedSubmission::new(0, SequencerHeight::from(0_u8)); + StartedSubmission { + last_submission, + state_file_path: self.state_file_path, + temp_file_path: self.temp_file_path, + } } } +/// State indicating the relayer has started to prepare a new submission. 
#[derive(Clone, Debug)] -pub(super) struct SubmissionState { - pre: PreSubmission, - post: PostSubmission, - pre_path: PathBuf, - post_path: PathBuf, +pub(super) struct StartedSubmission { + last_submission: CompletedSubmission, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, } -#[derive(Debug)] -pub(super) struct Started(SubmissionState); - -impl Started { - pub(super) fn finalize(self, celestia_height: u64) -> eyre::Result { - let Self(SubmissionState { - pre, - pre_path, - post_path, - .. - }) = self; - - let PreSubmission::Started { - sequencer_height, .. - } = pre - else { - panic!( - "once initialized, a submission's `pre` field must always be `Started`. Here it \ - is not. This is a bug" - ); - }; +impl StartedSubmission { + /// Constructs a new `StartedSubmission` and writes the state to disk. + async fn construct_and_write( + last_submission: CompletedSubmission, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, + ) -> eyre::Result { + let state = State::new_started(last_submission); + debug!(%state, "writing submission started state to file"); + state + .write(&state_file_path, &temp_file_path) + .await + .wrap_err("failed commiting submission started state to disk")?; + Ok(Self { + last_submission, + state_file_path, + temp_file_path, + }) + } - let new = SubmissionState { - pre, - post: PostSubmission::Submitted { - celestia_height, - sequencer_height, - }, - pre_path, - post_path, - }; + /// Returns the celestia block height from the last completed submission. + pub(super) fn last_submission_celestia_height(&self) -> u64 { + self.last_submission.celestia_height + } - debug!( - state = serde_json::to_string(&new.post).expect("type contains no non-ascii keys"), - "finalizing submission by writing post-submit state to file", - ); - new.post - .write_to_path(&new.post_path) - .wrap_err("failed committing post-submission state to disk")?; - Ok(new) + /// Returns the sequencer block height from the last completed submission. + pub(super) fn last_submission_sequencer_height(&self) -> SequencerHeight { + self.last_submission.sequencer_height + } + + /// Converts `self` into a `PreparedSubmission` and writes the new state to disk. + pub(super) async fn into_prepared( + self, + new_sequencer_height: SequencerHeight, + blob_tx_hash: BlobTxHash, + ) -> eyre::Result { + PreparedSubmission::construct_and_write( + new_sequencer_height, + self.last_submission, + blob_tx_hash, + self.state_file_path, + self.temp_file_path, + ) + .await } } -impl SubmissionState { - pub(super) fn last_submitted_height(&self) -> Option { - match self.post { - PostSubmission::Fresh => None, - PostSubmission::Submitted { - sequencer_height, .. - } => Some(sequencer_height), - } +impl Display for StartedSubmission { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!( + formatter, + "file: {}, {}", + self.state_file_path.0.display(), + State::new_started(self.last_submission) + ) } +} - pub(super) fn initialize(self, sequencer_height: SequencerHeight) -> eyre::Result { - if let PostSubmission::Submitted { - sequencer_height: latest_submitted, - .. 
- } = self.post - { - ensure!( - sequencer_height > latest_submitted, - "refusing to submit a sequencer block at heights below or at what was already \ - submitted" - ); - } - let new = Self { - pre: PreSubmission::Started { - sequencer_height, - last_submission: self.post, - }, - ..self - }; - debug!( - state = serde_json::to_string(&new.pre).expect("type contains no non-ascii keys"), - "initializing submission by writing pre-submit state to file", +/// State indicating the relayer has prepared a new submission and is about to broadcast it to the +/// Celestia app. +#[derive(Clone, Debug)] +pub(super) struct PreparedSubmission { + sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + created_at: SystemTime, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, +} + +impl PreparedSubmission { + /// Constructs a new `PreparedSubmission` and writes the state to disk. + async fn construct_and_write( + sequencer_height: SequencerHeight, + last_submission: CompletedSubmission, + blob_tx_hash: BlobTxHash, + state_file_path: StateFilePath, + temp_file_path: TempFilePath, + ) -> eyre::Result { + ensure!( + sequencer_height > last_submission.sequencer_height, + "cannot submit a sequencer block at height below or equal to what was already \ + successfully submitted" ); - new.pre - .write_to_path(&new.pre_path) - .wrap_err("failed commiting pre-submission state to disk")?; - Ok(Started(new)) + let created_at = SystemTime::now(); + let state = + State::new_prepared(sequencer_height, last_submission, blob_tx_hash, created_at); + state + .write(&state_file_path, &temp_file_path) + .await + .wrap_err("failed commiting submission prepared state to disk")?; + Ok(Self { + sequencer_height, + last_submission, + blob_tx_hash, + created_at, + state_file_path, + temp_file_path, + }) } - pub(super) fn from_paths, P2: AsRef>( - pre_path: P1, - post_path: P2, - ) -> eyre::Result { - let pre_path = pre_path.as_ref().to_path_buf(); - let post_path = post_path.as_ref().to_path_buf(); - - let mut pre = PreSubmission::from_path(&pre_path).wrap_err( - "failed constructing post submit state from provided path; if the post-submit state is - otherwise present and correctly formatted - and contains the correct information \ - about the - last submitted sequencer height and its inclusion height on Celestia - then \ - override the - contents of the pre-submission file with `{{\"state\": \"ignore\"}}`", - )?; - let post = PostSubmission::from_path(&post_path).wrap_err( - "failed constructing post submit state from provided path; if the pre-submit state is \ - otherwise present and correctly formatted then this indicates a file corruption. \ - Because the post-submit state is critical for determining the starting height for \ - relaying sequencer blocks, make sure it's set correctly.", - )?; - - let state = match (pre, post) { - (PreSubmission::Ignore, post) => Self { - pre, - post, - pre_path, - post_path, - }, + /// Returns the transaction hash of the prepared `BlobTx`. + pub(super) fn blob_tx_hash(&self) -> &BlobTxHash { + &self.blob_tx_hash + } - ( - PreSubmission::Started { - sequencer_height, - last_submission, - }, - post, - ) => { - if let Err(error) = ensure_consistent(sequencer_height, last_submission, post) { - if LENIENT { - warn!(%error, "pre- and post-submission states were inconsistent. Setting pre-state to `ignore` and continuing from last post-state. 
This could to double submission!!"); - pre = PreSubmission::Ignore; - } else { - return Err(error).wrap_err("on-disk states are inconsistent"); - } - } - Self { - pre, - post, - pre_path, - post_path, - } - } - }; + /// Returns the maximum duration for which the Celestia app should be polled with `GetTx` + /// requests to confirm successful storage of the associated `BlobTx`. + /// + /// This is at least 15 seconds, but up to a maximum of a minute from when the submission was + /// first attempted. + pub(super) fn confirmation_timeout(&self) -> Duration { + std::cmp::max( + Duration::from_secs(15), + Duration::from_secs(60).saturating_sub(self.created_at.elapsed().unwrap_or_default()), + ) + } - // testing if the states can be written to the provided paths - state.pre.write_to_path(&state.pre_path).wrap_err_with(|| { - format!( - "failed writing just-read pre-submission state to disk; is the file writable? \ - Write destination: {}", - state.pre_path.display(), - ) - })?; - state - .post - .write_to_path(&state.post_path) - .wrap_err_with(|| { - format!( - "failed writing just-read post-submission state to disk; is the file \ - writable? Write destination: {}", - state.post_path.display(), - ) - })?; + /// Converts `self` into a `StartedSubmission` with last submission being recorded using the + /// provided celestia height and the sequencer height from `self`. Writes the new state to disk. + pub(super) async fn into_started( + self, + celestia_height: u64, + ) -> eyre::Result { + let last_submission = CompletedSubmission::new(celestia_height, self.sequencer_height); + StartedSubmission::construct_and_write( + last_submission, + self.state_file_path, + self.temp_file_path, + ) + .await + } - Ok(state) + /// Reverts `self` into a `StartedSubmission` retaining the last submission from `self` as the + /// last submission. Writes the new state to disk. + pub(super) async fn revert(self) -> eyre::Result { + StartedSubmission::construct_and_write( + self.last_submission, + self.state_file_path, + self.temp_file_path, + ) + .await } } -fn ensure_consistent( - sequencer_height_started: SequencerHeight, - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - ensure_height_pre_submission_is_height_post_submission( - sequencer_height_started, - current_submission, - )?; - ensure_last_and_current_are_different(last_submission, current_submission)?; - ensure_last_is_not_submitted_while_current_is_fresh(last_submission, current_submission)?; - ensure_height_in_last_is_less_than_height_in_current(last_submission, current_submission)?; - Ok(()) +impl Display for PreparedSubmission { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!( + formatter, + "file: {}, {}", + self.state_file_path.0.display(), + State::new_prepared( + self.sequencer_height, + self.last_submission, + self.blob_tx_hash, + self.created_at + ) + ) + } } -fn ensure_height_pre_submission_is_height_post_submission( - sequencer_height_started: SequencerHeight, - current_submission: PostSubmission, -) -> eyre::Result<()> { - let PostSubmission::Submitted { - sequencer_height, .. - } = current_submission - else { - bail!( - "the pre-submit file indicated that a new submission was started, but the post-submit \ - file still contained a \"fresh\" state. This indicates that the submission was not \ - finalized." 
- ); - }; - ensure!( - sequencer_height_started == sequencer_height, - "the initialized `sequencer_height` in the pre-submit file does not match the sequencer \ - height in the post-submit file. This indicates that a new submission to Celestia was \ - started but not finalized. This is because a successful submission records the very same \ - `sequencer_height` in the post-submit file." - ); - Ok(()) +#[derive(Debug)] +pub(super) enum SubmissionStateAtStartup { + Fresh(FreshSubmission), + Started(StartedSubmission), + Prepared(PreparedSubmission), } -fn ensure_last_and_current_are_different( - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - ensure!( - last_submission != current_submission, - "the `last_submission` field of the pre-submit file matches the object found in the \ - post-submit file. This indicates that a new submission to Celestia was started but not \ - finalized. This is because when starting a new submission the object in the post-submit \ - file is written to `last_submission`." - ); - Ok(()) -} +impl SubmissionStateAtStartup { + /// Constructs a new `SubmissionStateAtStartup` by reading from the given `source`. + /// + /// `source` should be a JSON-encoded `State`, and should be writable. + pub(super) async fn new_from_path>(source: P) -> eyre::Result { + let file_path = source.as_ref(); + let state_file_path = StateFilePath(file_path.to_path_buf()); + let state = State::read(&state_file_path).await?; + let temp_file_path = match file_path.extension().and_then(|extn| extn.to_str()) { + Some(extn) => TempFilePath(file_path.with_extension(format!("{extn}.tmp"))), + None => TempFilePath(file_path.with_extension("tmp")), + }; -fn ensure_last_is_not_submitted_while_current_is_fresh( - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - ensure!( - !(last_submission.is_submitted() && current_submission.is_fresh()), - "the submission recorded in the post-submit file cannot be `fresh` while \ - `last_submission` in the pre-submit file is `submitted`", - ); - Ok(()) -} + // Ensure the state can be written. + state + .write(&state_file_path, &temp_file_path) + .await + .wrap_err_with(|| { + format!( + "failed writing just-read submission state to disk at `{}`", + state_file_path.0.display() + ) + })?; -fn ensure_height_in_last_is_less_than_height_in_current( - last_submission: PostSubmission, - current_submission: PostSubmission, -) -> eyre::Result<()> { - let PostSubmission::Submitted { - sequencer_height: height_in_last, - .. - } = last_submission - else { - return Ok(()); - }; - let PostSubmission::Submitted { - sequencer_height: height_in_current, - .. - } = current_submission - else { - return Ok(()); - }; - ensure!( - height_in_last < height_in_current, - "the `sequencer_height` in the post-submit file is not greater than the \ - `sequencer_height` stored in the `last_submission` field of the pre-submit file. - This indicates that a new submission was started not but finalized." 
- ); - Ok(()) + match state { + State::Fresh => Ok(Self::Fresh(FreshSubmission { + state_file_path, + temp_file_path, + })), + State::Started { + last_submission, + } => Ok(Self::Started(StartedSubmission { + last_submission, + state_file_path, + temp_file_path, + })), + State::Prepared { + sequencer_height, + last_submission, + blob_tx_hash, + at, + } => Ok(Self::Prepared(PreparedSubmission { + sequencer_height, + last_submission, + blob_tx_hash, + created_at: at, + state_file_path, + temp_file_path, + })), + } + } + + /// Returns the sequencer height of the last completed submission, or `None` if the state is + /// `Fresh`. + pub(super) fn last_completed_sequencer_height(&self) -> Option { + match &self { + SubmissionStateAtStartup::Fresh { + .. + } => None, + SubmissionStateAtStartup::Started(StartedSubmission { + last_submission, .. + }) + | SubmissionStateAtStartup::Prepared(PreparedSubmission { + last_submission, .. + }) => Some(last_submission.sequencer_height), + } + } } mod as_number { //! Logic to serialize sequencer heights as number, deserialize numbers as sequencer heights. //! //! This is unfortunately necessary because the [`serde::Serialize`], [`serde::Deserialize`] - //! implementations for [`sequencer_client::tendermint::block::Height`] write the integer as - //! string, probably due to tendermint's/cometbft's go-legacy. + //! implementations for [`tendermint::block::Height`] write the integer as a string, probably + //! due to tendermint's/cometbft's go-legacy. use serde::{ Deserialize as _, Deserializer, @@ -368,6 +458,7 @@ mod as_number { }; use super::SequencerHeight; + // Allow: the function signature is dictated by the serde(with) attribute. #[allow(clippy::trivially_copy_pass_by_ref)] pub(super) fn serialize(height: &SequencerHeight, serializer: S) -> Result @@ -381,239 +472,440 @@ mod as_number { where D: Deserializer<'de>, { - use serde::de::Error; let height = u64::deserialize(deserializer)?; - SequencerHeight::try_from(height).map_err(Error::custom) + SequencerHeight::try_from(height).map_err(serde::de::Error::custom) } } #[cfg(test)] mod tests { + use std::time::Duration; + use serde_json::json; use tempfile::NamedTempFile; - use super::SubmissionState; - use crate::relayer::submission::PostSubmission; + use super::*; - const STRICT_CONSISTENCY_CHECK: bool = false; + const CELESTIA_HEIGHT: u64 = 1234; + const SEQUENCER_HEIGHT_LOW: u32 = 111; + const SEQUENCER_HEIGHT_HIGH: u32 = 222; + const BLOB_TX_HASH_STR: &str = + "0909090909090909090909090909090909090909090909090909090909090909"; + const BLOB_TX_HASH: BlobTxHash = BlobTxHash::from_raw([9; 32]); + const AT_STR: &str = "2024-06-24T22:22:22.222222222Z"; + const AT_DURATION_SINCE_EPOCH: Duration = Duration::from_nanos(1_719_267_742_222_222_222); #[track_caller] - fn create_files() -> (NamedTempFile, NamedTempFile) { - let pre = NamedTempFile::new() - .expect("must be able to create an empty pre submit state file to run tests"); - let post = NamedTempFile::new() - .expect("must be able to create an empty post submit state file to run tests"); - (pre, post) + fn write(val: &serde_json::Value) -> NamedTempFile { + let file = NamedTempFile::new().unwrap(); + serde_json::to_writer(&file, val).unwrap(); + file } - fn write(f: &NamedTempFile, val: &serde_json::Value) { - serde_json::to_writer(f, val).expect("must be able to write state to run tests"); + fn write_fresh_state() -> NamedTempFile { + write(&json!({ "state": "fresh" })) } - #[test] - fn fresh_with_ignored_is_ok() { - let (pre, post) = create_files(); - 
write(&pre, &json!({ "state": "ignore" })); - write(&post, &json!({ "state": "fresh" })); - SubmissionState::from_paths::(pre.path(), post.path()) - .expect("states `ignore` and `fresh` give a working submission state"); + fn write_started_state() -> NamedTempFile { + write(&json!({ + "state": "started", + "last_submission": { + "celestia_height": CELESTIA_HEIGHT, + "sequencer_height": SEQUENCER_HEIGHT_LOW + } + })) } - #[test] - fn submitted_with_ignored_is_ok() { - let (pre, post) = create_files(); - write(&pre, &json!({ "state": "ignore" })); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - SubmissionState::from_paths::(pre.path(), post.path()) - .expect("states `ignore` and `submitted` give a working submission state"); + fn write_prepared_state() -> NamedTempFile { + write(&json!({ + "state": "prepared", + "sequencer_height": SEQUENCER_HEIGHT_HIGH, + "last_submission": { + "celestia_height": CELESTIA_HEIGHT, + "sequencer_height": SEQUENCER_HEIGHT_LOW + }, + "blob_tx_hash": BLOB_TX_HASH_STR, + "at": AT_STR + })) } - #[test] - fn started_with_same_fresh_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write(&post, &json!({ "state": "fresh" })); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect_err("started state with `fresh` in last and current gives error"); + #[tokio::test] + async fn should_read_fresh_state() { + let file = write_fresh_state(); + let parsed = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap(); + match parsed { + State::Fresh => (), + _ => panic!("expected fresh state, got:\n{parsed:#}"), + } } - #[test] - fn started_with_height_before_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write( - &post, - &json!({ "state": "submitted", "sequencer_height": 6, "celestia_height": 2 }), - ); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect_err( - "started state with sequencer height less then sequencer height recorded \ - submitted gives error", + #[tokio::test] + async fn should_read_started_state() { + let file = write_started_state(); + let parsed = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap(); + match parsed { + State::Started { + last_submission, + } => { + let expected_submission = CompletedSubmission::new( + CELESTIA_HEIGHT, + SequencerHeight::from(SEQUENCER_HEIGHT_LOW), ); + assert_eq!(last_submission, expected_submission); + } + _ => panic!("expected started state, got:\n{parsed:#}"), + } } - #[test] - fn started_with_same_submitted_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "submitted", "celestia_height": 5, "sequencer_height": 2} }), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect_err( - "started state with the same `submitted` in last and current give an error", + #[tokio::test] + async fn should_read_prepared_state() { + let file = write_prepared_state(); + let parsed = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap(); + match parsed { + State::Prepared 
{ + sequencer_height, + last_submission, + blob_tx_hash, + at, + } => { + assert_eq!( + sequencer_height, + SequencerHeight::from(SEQUENCER_HEIGHT_HIGH) + ); + let expected_submission = CompletedSubmission::new( + CELESTIA_HEIGHT, + SequencerHeight::from(SEQUENCER_HEIGHT_LOW), ); + assert_eq!(last_submission, expected_submission); + assert_eq!(blob_tx_hash, BLOB_TX_HASH); + assert_eq!(at, SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH); + } + _ => panic!("expected prepared state, got:\n{parsed:?}"), + } } - #[test] - fn started_with_different_last_fresh_and_current_submitted_is_ok() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "fresh" }}), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let _ = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect( - "started state with the `fresh` in last and `submitted` in current gives \ - working submission state", - ); + #[tokio::test] + async fn should_fail_to_read_missing_state_file() { + let bad_path = "bad path"; + let error = State::read(&StateFilePath(Path::new(bad_path).to_path_buf())) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(bad_path)); + assert!(full_error.contains("failed reading submission state file")); } - #[test] - fn submit_initialize_finalize_flow_works() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "fresh" }}), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let state = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect( - "started state with the `fresh` in last and `submitted` in current gives \ - working submission state", - ); - let started = state.initialize(3u32.into()).unwrap(); - let finalized = started.finalize(6).unwrap(); - let PostSubmission::Submitted { - celestia_height, + #[tokio::test] + async fn should_fail_to_read_invalid_state_file() { + let file = write(&json!({ "state": "invalid" })); + let error = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(&file.path().display().to_string())); + assert!(full_error.contains("failed parsing the contents")); + } + + #[tokio::test] + async fn should_fail_to_read_state_file_with_broken_invariant() { + // The current sequencer height must be greater than the last submission's. 
+ let file = write(&json!({ + "state": "prepared", + "sequencer_height": SEQUENCER_HEIGHT_HIGH, + "last_submission": { + "celestia_height": CELESTIA_HEIGHT, + "sequencer_height": SEQUENCER_HEIGHT_HIGH + }, + "blob_tx_hash": BLOB_TX_HASH_STR, + "at": AT_STR + })); + let error = State::read(&StateFilePath(file.path().to_path_buf())) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(&file.path().display().to_string())); + assert!(full_error.contains("should be greater than last successful submission sequencer")); + } + + async fn should_write_state(state: State) { + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + state.write(&destination, &temp_file).await.unwrap(); + + let parsed_state = State::read(&destination).await.unwrap(); + assert_eq!(state, parsed_state); + assert!(!temp_file.0.exists()); + } + + #[tokio::test] + async fn should_write_fresh_state() { + should_write_state(State::Fresh).await; + } + + #[tokio::test] + async fn should_write_started_state() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + should_write_state(State::new_started(last_submission)).await; + } + + #[tokio::test] + async fn should_write_prepared_state() { + let sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let at = SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH; + let state = State::new_prepared(sequencer_height, last_submission, BLOB_TX_HASH, at); + should_write_state(state).await; + } + + #[tokio::test] + async fn started_submission_should_transition_to_prepared() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let started_submission = StartedSubmission { + last_submission, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), + }; + + // Transition to prepared. + let new_sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let prepared_submission = started_submission + .into_prepared(new_sequencer_height, BLOB_TX_HASH) + .await + .unwrap(); + assert_eq!(prepared_submission.sequencer_height, new_sequencer_height); + assert_eq!(prepared_submission.last_submission, last_submission); + assert_eq!(prepared_submission.blob_tx_hash, BLOB_TX_HASH); + assert_eq!(prepared_submission.state_file_path.0, destination.0); + assert_eq!(prepared_submission.temp_file_path.0, temp_file.0); + + // Ensure the new state was written to disk. + let parsed_state = State::read(&destination).await.unwrap(); + match parsed_state { + State::Prepared { + .. 
+ } => (), + _ => panic!("expected prepared state, got:\n{parsed_state:?}"), + } + } + + #[tokio::test] + async fn started_submission_should_not_transition_with_broken_invariant() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let started_submission = StartedSubmission { + last_submission, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), + }; + + // Try to transition to prepared - should fail as new sequencer height == last sequencer + // height. + let new_sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_LOW); + let error = started_submission + .into_prepared(new_sequencer_height, BLOB_TX_HASH) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains("cannot submit a sequencer block at height below or")); + + // Ensure the new state was not written to disk. + let error = State::read(&destination).await.unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains("failed reading submission state file")); + } + + #[tokio::test] + async fn prepared_submission_should_transition_to_started() { + let sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let created_at = SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH; + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let prepared_submission = PreparedSubmission { sequencer_height, - } = finalized.post - else { - panic!("the post submission state should be `submitted`"); + last_submission, + blob_tx_hash: BLOB_TX_HASH, + created_at, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), }; - assert_eq!(celestia_height, 6); - assert_eq!(sequencer_height.value(), 3); + + // Transition to started. + let new_celestia_height = CELESTIA_HEIGHT + 1; + let started_submission = prepared_submission + .into_started(new_celestia_height) + .await + .unwrap(); + let expected_last_submission = + CompletedSubmission::new(new_celestia_height, sequencer_height); + assert_eq!(started_submission.last_submission, expected_last_submission); + assert_eq!(started_submission.state_file_path.0, destination.0); + assert_eq!(started_submission.temp_file_path.0, temp_file.0); + + // Ensure the new state was written to disk. + let parsed_state = State::read(&destination).await.unwrap(); + match parsed_state { + State::Started { + .. 
+ } => (), + _ => panic!("expected started state, got:\n{parsed_state:?}"), + } + } + + #[tokio::test] + async fn prepared_submission_should_revert_to_started() { + let sequencer_height = SequencerHeight::from(SEQUENCER_HEIGHT_HIGH); + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let created_at = SystemTime::UNIX_EPOCH + AT_DURATION_SINCE_EPOCH; + let tempdir = tempfile::tempdir().unwrap(); + let destination = StateFilePath(tempdir.path().join("state.json")); + let temp_file = TempFilePath(tempdir.path().join("state.json.tmp")); + let prepared_submission = PreparedSubmission { + sequencer_height, + last_submission, + blob_tx_hash: BLOB_TX_HASH, + created_at, + state_file_path: destination.clone(), + temp_file_path: temp_file.clone(), + }; + + // Revert to started - should hold last submission. + let reverted_submission = prepared_submission.revert().await.unwrap(); + assert_eq!(reverted_submission.last_submission, last_submission); + assert_eq!(reverted_submission.state_file_path.0, destination.0); + assert_eq!(reverted_submission.temp_file_path.0, temp_file.0); + + // Ensure the new state was written to disk. + let parsed_state = State::read(&destination).await.unwrap(); + match parsed_state { + State::Started { + .. + } => (), + _ => panic!("expected started state, got:\n{parsed_state:?}"), + } } #[test] - fn submit_old_blocks_gives_error() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "fresh" }}), + fn confirmation_timeout_should_respect_limits() { + let last_submission = + CompletedSubmission::new(CELESTIA_HEIGHT, SequencerHeight::from(SEQUENCER_HEIGHT_LOW)); + let mut prepared_submission = PreparedSubmission { + sequencer_height: SequencerHeight::from(SEQUENCER_HEIGHT_HIGH), + last_submission, + blob_tx_hash: BLOB_TX_HASH, + created_at: SystemTime::UNIX_EPOCH, + state_file_path: StateFilePath(PathBuf::new()), + temp_file_path: TempFilePath(PathBuf::new()), + }; + + // With a creation time far in the past, timeout should be 15 seconds. + assert_eq!( + prepared_submission.confirmation_timeout(), + Duration::from_secs(15) ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), + + // With a creation time in the future, timeout should be 60 seconds. + prepared_submission.created_at = SystemTime::now() + Duration::from_secs(1000); + assert_eq!( + prepared_submission.confirmation_timeout(), + Duration::from_secs(60) ); - let state = - SubmissionState::from_paths::(pre.path(), post.path()) - .expect( - "started state with `fresh` in last and `submitted` in current gives working \ - submission state", - ); - let _ = state - .initialize(2u32.into()) - .expect_err("trying to submit the same sequencer height is an error"); } - mod lenient { - //! These test the same scenarios as the tests of the same name in the super module, but - //! 
whereas those are strict and should fail, the tests in this module should pass - - use super::{ - create_files, - json, - write, - SubmissionState, - }; - - const LENIENT_CONSISTENCY_CHECK: bool = true; - - #[test] - fn started_with_same_fresh_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write(&post, &json!({ "state": "fresh" })); - let _ = SubmissionState::from_paths::( - pre.path(), - post.path(), - ) - .expect("this test should not fail when doing a leaning consistency check"); + #[tokio::test] + async fn should_construct_fresh_submission_state_at_startup() { + let file = write_fresh_state(); + let parsed = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap(); + match parsed { + SubmissionStateAtStartup::Fresh(FreshSubmission { + state_file_path, + temp_file_path, + }) => { + assert_eq!(state_file_path.0, file.path()); + assert_eq!( + temp_file_path.0.display().to_string(), + format!("{}.tmp", file.path().display()) + ); + } + _ => panic!("expected fresh state, got: {parsed:?}"), } + } - #[test] - fn started_with_height_before_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 5, "last_submission": { "state": "fresh"} }), - ); - write( - &post, - &json!({ "state": "submitted", "sequencer_height": 6, "celestia_height": 2 }), - ); - let _ = SubmissionState::from_paths::( - pre.path(), - post.path(), - ) - .expect("this test should not fail when doing a leaning consistency check"); + #[tokio::test] + async fn should_construct_started_submission_state_at_startup() { + let file = write_started_state(); + let parsed = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap(); + match parsed { + SubmissionStateAtStartup::Started(StartedSubmission { + state_file_path, + temp_file_path, + .. + }) => { + assert_eq!(state_file_path.0, file.path()); + assert_eq!( + temp_file_path.0.display().to_string(), + format!("{}.tmp", file.path().display()) + ); + } + _ => panic!("expected started state, got: {parsed:?}"), } + } - #[test] - fn started_with_same_submitted_in_last_and_current_is_err() { - let (pre, post) = create_files(); - write( - &pre, - &json!({ "state": "started", "sequencer_height": 2, "last_submission": { "state": "submitted", "celestia_height": 5, "sequencer_height": 2} }), - ); - write( - &post, - &json!({ "state": "submitted", "celestia_height": 5, "sequencer_height": 2 }), - ); - let _ = SubmissionState::from_paths::( - pre.path(), - post.path(), - ) - .expect("this test should not fail when doing a leaning consistency check"); + #[tokio::test] + async fn should_construct_prepared_submission_state_at_startup() { + let file = write_prepared_state(); + let parsed = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap(); + match parsed { + SubmissionStateAtStartup::Prepared(PreparedSubmission { + state_file_path, + temp_file_path, + .. + }) => { + assert_eq!(state_file_path.0, file.path()); + assert_eq!( + temp_file_path.0.display().to_string(), + format!("{}.tmp", file.path().display()) + ); + } + _ => panic!("expected prepared state, got: {parsed:?}"), } } + + #[tokio::test] + async fn should_fail_to_construct_if_not_writable() { + let file = write_prepared_state(); + // Create a folder at the path where the temp file would be written. 
+ std::fs::create_dir(format!("{}.tmp", file.path().display())).unwrap(); + let error = SubmissionStateAtStartup::new_from_path(file.path()) + .await + .unwrap_err(); + let full_error = format!("{error:#}"); + assert!(full_error.contains(&file.path().display().to_string())); + assert!(full_error.contains("failed writing just-read submission state to disk at")); + } } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index d3bf5b352e..3054f2b7d3 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -15,6 +15,8 @@ use std::{ use astria_eyre::eyre::{ self, + bail, + Report, WrapErr as _, }; use celestia_types::Blob; @@ -26,6 +28,8 @@ use futures::{ FutureExt as _, }; use sequencer_client::SequencerBlock; +use tendermint::block::Height as SequencerHeight; +use thiserror::Error; use tokio::{ select, sync::{ @@ -52,9 +56,12 @@ use tracing::{ use super::{ celestia_client::CelestiaClient, + BlobTxHash, BuilderError, CelestiaClientBuilder, - SubmissionState, + PreparedSubmission, + StartedSubmission, + SubmissionStateAtStartup, TrySubmitError, }; use crate::{ @@ -109,9 +116,8 @@ pub(super) struct BlobSubmitter { /// The state of the relayer. state: Arc, - /// Tracks the submission state and writes it to disk before and after each Celestia - /// submission. - submission_state: SubmissionState, + /// Provides the initial submission state. + submission_state_at_startup: Option, /// The shutdown token to signal that blob submitter should finish its current submission and /// exit. @@ -129,7 +135,7 @@ impl BlobSubmitter { client_builder: CelestiaClientBuilder, rollup_filter: IncludeRollup, state: Arc, - submission_state: SubmissionState, + submission_state_at_startup: SubmissionStateAtStartup, submitter_shutdown_token: CancellationToken, metrics: &'static Metrics, ) -> (Self, BlobSubmitterHandle) { @@ -141,7 +147,7 @@ impl BlobSubmitter { blocks: rx, next_submission: NextSubmission::new(rollup_filter, metrics), state, - submission_state, + submission_state_at_startup: Some(submission_state_at_startup), submitter_shutdown_token, pending_block: None, metrics, @@ -163,6 +169,27 @@ impl BlobSubmitter { error.wrap_err(message) })?; + let Some(submission_state_at_startup) = self.submission_state_at_startup.take() else { + bail!("submission state must be provided at startup"); + }; + let mut started_submission = match submission_state_at_startup { + SubmissionStateAtStartup::Fresh(fresh) => fresh.into_started(), + SubmissionStateAtStartup::Started(started) => started, + SubmissionStateAtStartup::Prepared(prepared) => { + try_confirm_submission_from_last_session( + client.clone(), + prepared, + self.state.clone(), + self.metrics, + ) + .await + .wrap_err( + "failed to confirm the unfinished submission state of the previously loaded \ + session", + )? + } + }; + // A submission to Celestia that is currently in-flight. let mut ongoing_submission = Fuse::terminated(); @@ -182,12 +209,11 @@ impl BlobSubmitter { { // XXX: Breaks the select-loop and returns. With the current retry-logic in // `submit_blobs` this happens after u32::MAX retries which is effectively never. - // self.submission_state = match submission_result.wrap_err("failed submitting blocks to Celestia") - self.submission_state = match submission_result { + started_submission = match submission_result { Ok(state) => state, Err(err) => { - // Use `wrap_err` on the return break value. 
Using it on the match-value causes - // type inference to fail. + // Use `wrap_err` on the return break value. Using it on the match-value + // causes type inference to fail. break Err(err).wrap_err("failed submitting blocks to Celestia"); } }; @@ -201,7 +227,7 @@ impl BlobSubmitter { client.clone(), submission, self.state.clone(), - self.submission_state.clone(), + started_submission.clone(), self.metrics, ).boxed().fuse(); if let Some(block) = self.pending_block.take() { @@ -215,7 +241,12 @@ impl BlobSubmitter { // add new blocks to the next submission if there is space. Some(block) = self.blocks.recv(), if self.has_capacity() => { - if let Err(error) = self.add_sequencer_block_to_next_submission(block) { + if block.height() <= started_submission.last_submission_sequencer_height() { + info!( + sequencer_height = %block.height(), + "skipping sequencer block as already included in previous submission" + ); + } else if let Err(error) = self.add_sequencer_block_to_next_submission(block) { break Err(error).wrap_err( "critically failed adding Sequencer block to next submission" ); @@ -269,15 +300,64 @@ impl BlobSubmitter { } } -/// Submits new blobs Celestia. +/// Tries to confirm the last attempted submission of the previous session. +/// +/// This should only be called where submission state on startup is `Prepared`, meaning we don't yet +/// know whether that final submission attempt succeeded or not. +/// +/// Internally, this polls `GetTx` for up to one minute. The returned `SubmissionState` is +/// guaranteed to be in `Started` state, either holding the heights of the previously prepared +/// submission if confirmed by Celestia, or holding the heights of the last known confirmed +/// submission in the case of timing out. #[instrument(skip_all)] +async fn try_confirm_submission_from_last_session( + mut client: CelestiaClient, + prepared_submission: PreparedSubmission, + state: Arc, + metrics: &'static Metrics, +) -> eyre::Result { + let blob_tx_hash = prepared_submission.blob_tx_hash(); + info!(%blob_tx_hash, "confirming submission of last `BlobTx` from previous session"); + + let timeout = prepared_submission.confirmation_timeout(); + let new_state = if let Some(celestia_height) = client + .confirm_submission_with_timeout(blob_tx_hash, timeout) + .await + { + info!(%celestia_height, "confirmed previous session submitted blobs to Celestia"); + prepared_submission + .into_started(celestia_height) + .await + .wrap_err("failed to convert previous session's state into `started`")? + } else { + info!( + "previous session's last submission was not completed; continuing from last confirmed \ + submission" + ); + prepared_submission + .revert() + .await + .wrap_err("failed to revert previous session's state into `started`")? + }; + + metrics.absolute_set_sequencer_submission_height( + new_state.last_submission_sequencer_height().value(), + ); + metrics.absolute_set_celestia_submission_height(new_state.last_submission_celestia_height()); + state.set_latest_confirmed_celestia_height(new_state.last_submission_celestia_height()); + + Ok(new_state) +} + +/// Submits new blobs Celestia. 
+#[instrument(skip_all, err)] async fn submit_blobs( client: CelestiaClient, data: conversion::Submission, state: Arc, - submission_state: SubmissionState, + started_submission: StartedSubmission, metrics: &'static Metrics, -) -> eyre::Result { +) -> eyre::Result { info!( blocks = %telemetry::display::json(&data.input_metadata()), total_data_uncompressed_size = data.uncompressed_size(), @@ -297,26 +377,18 @@ async fn submit_blobs( let largest_sequencer_height = data.greatest_sequencer_height(); let blobs = data.into_blobs(); - let submission_started = match crate::utils::flatten( - tokio::task::spawn_blocking(move || submission_state.initialize(largest_sequencer_height)) - .in_current_span() - .await, - ) { - Err(error) => { - error!(%error, "failed to initialize submission; abandoning"); - return Err(error); - } - Ok(state) => state, - }; + let new_state = submit_with_retry( + client, + blobs, + state.clone(), + started_submission, + largest_sequencer_height, + metrics, + ) + .await + .wrap_err("failed submitting blobs to Celestia")?; - let celestia_height = match submit_with_retry(client, blobs, state.clone(), metrics).await { - Err(error) => { - let message = "failed submitting blobs to Celestia"; - error!(%error, message); - return Err(error.wrap_err(message)); - } - Ok(height) => height, - }; + let celestia_height = new_state.last_submission_celestia_height(); metrics.absolute_set_sequencer_submission_height(largest_sequencer_height.value()); metrics.absolute_set_celestia_submission_height(celestia_height); metrics.record_celestia_submission_latency(start.elapsed()); @@ -326,18 +398,7 @@ async fn submit_blobs( state.set_celestia_connected(true); state.set_latest_confirmed_celestia_height(celestia_height); - let final_state = match crate::utils::flatten( - tokio::task::spawn_blocking(move || submission_started.finalize(celestia_height)) - .in_current_span() - .await, - ) { - Err(error) => { - error!(%error, "failed to finalize submission; abandoning"); - return Err(error); - } - Ok(state) => state, - }; - Ok(final_state) + Ok(new_state) } #[instrument(skip_all)] @@ -383,12 +444,25 @@ async fn init_with_retry(client_builder: CelestiaClientBuilder) -> eyre::Result< Ok(celestia_client) } +#[derive(Error, Clone, Debug)] +enum SubmissionError { + #[error(transparent)] + TrySubmit(#[from] TrySubmitError), + #[error("unrecoverable submission error")] + Unrecoverable(#[source] Arc), + #[error("broadcast tx timed out")] + BroadcastTxTimedOut(PreparedSubmission), +} + +#[instrument(skip_all)] async fn submit_with_retry( client: CelestiaClient, blobs: Vec, state: Arc, + started_submission: StartedSubmission, + largest_sequencer_height: SequencerHeight, metrics: &'static Metrics, -) -> eyre::Result { +) -> eyre::Result { // Moving the span into `on_retry`, because tryhard spawns these in a tokio // task, losing the span. let span = Span::current(); @@ -397,12 +471,22 @@ async fn submit_with_retry( // `TrySubmitError` to the next attempt of the `retry_fn`. let (last_error_sender, last_error_receiver) = watch::channel(None); + let initial_retry_delay = Duration::from_millis(100); let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - // 12 seconds is the Celestia block time + // 12 seconds is the Celestia block time. 
.max_delay(Duration::from_secs(12)) + .custom_backoff(|attempt: u32, error: &SubmissionError| { + if matches!(error, SubmissionError::Unrecoverable(_)) { + return tryhard::RetryPolicy::Break; + } + // This is equivalent to the `exponential_backoff` policy. Note that `max_delay` + // above is still respected regardless of what we return here. + let delay = + initial_retry_delay.saturating_mul(2_u32.saturating_pow(attempt.saturating_sub(1))); + tryhard::RetryPolicy::Delay(delay) + }) .on_retry( - |attempt: u32, next_delay: Option, error: &TrySubmitError| { + |attempt: u32, next_delay: Option, error: &SubmissionError| { metrics.increment_celestia_submission_failure_count(); let state = Arc::clone(&state); @@ -426,14 +510,95 @@ async fn submit_with_retry( let blobs = Arc::new(blobs); - let height = tryhard::retry_fn(move || { - client - .clone() - .try_submit(blobs.clone(), last_error_receiver.clone()) + let final_state = tryhard::retry_fn(move || { + try_submit( + client.clone(), + blobs.clone(), + started_submission.clone(), + largest_sequencer_height, + last_error_receiver.clone(), + ) }) .with_config(retry_config) .in_current_span() .await - .wrap_err("retry attempts exhausted; bailing")?; - Ok(height) + .wrap_err("finished trying to submit")?; + Ok(final_state) +} + +#[instrument(skip_all)] +async fn try_submit( + mut client: CelestiaClient, + blobs: Arc>, + started_submission: StartedSubmission, + largest_sequencer_height: SequencerHeight, + last_error_receiver: watch::Receiver>, +) -> Result { + // Get the error from the last attempt to `try_submit`. + let maybe_last_error = last_error_receiver.borrow().clone(); + let maybe_try_submit_error = match maybe_last_error { + // If error is broadcast timeout, try to confirm submission from last attempt. + Some(SubmissionError::BroadcastTxTimedOut(prepared_submission)) => { + if let Some(new_state) = + try_confirm_submission_from_failed_attempt(client.clone(), prepared_submission) + .await? + { + return Ok(new_state); + } + None + } + Some(SubmissionError::TrySubmit(error)) => Some(error), + Some(SubmissionError::Unrecoverable(error)) => { + unreachable!("this error should not make it past `custom_backoff`: {error:#}"); + } + None => None, + }; + + let blob_tx = client.try_prepare(blobs, maybe_try_submit_error).await?; + let blob_tx_hash = BlobTxHash::compute(&blob_tx); + + let prepared_submission = started_submission + .into_prepared(largest_sequencer_height, blob_tx_hash) + .await + .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error)))?; + + match client.try_submit(blob_tx_hash, blob_tx).await { + Ok(celestia_height) => prepared_submission + .into_started(celestia_height) + .await + .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error))), + Err(TrySubmitError::FailedToBroadcastTx(error)) if error.is_timeout() => { + Err(SubmissionError::BroadcastTxTimedOut(prepared_submission)) + } + Err(error) => Err(SubmissionError::TrySubmit(error)), + } +} + +/// Tries to confirm the submission from a failed previous attempt. Returns `Some` if the +/// submission is confirmed, or `None` if not. +/// +/// This should only be called where submission state is `Prepared`, meaning we don't yet +/// know whether that previous submission attempt succeeded or not. 
+#[instrument(skip_all)] +async fn try_confirm_submission_from_failed_attempt( + mut client: CelestiaClient, + prepared_submission: PreparedSubmission, +) -> Result, SubmissionError> { + let blob_tx_hash = prepared_submission.blob_tx_hash(); + info!(%blob_tx_hash, "confirming submission of last `BlobTx` from previous attempt"); + + if let Some(celestia_height) = client + .confirm_submission_with_timeout(blob_tx_hash, prepared_submission.confirmation_timeout()) + .await + { + info!(%celestia_height, "confirmed previous attempt submitted blobs to Celestia"); + let new_state = prepared_submission + .into_started(celestia_height) + .await + .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error)))?; + return Ok(Some(new_state)); + } + + info!("previous attempt's last submission was not completed; starting resubmission"); + Ok(None) } diff --git a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs index c899ea09e8..d04b7c159c 100644 --- a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs @@ -63,8 +63,7 @@ impl SequencerRelayer { celestia_app_key_file, block_time, api_addr, - pre_submit_path, - post_submit_path, + submission_state_path, .. } = cfg; @@ -78,8 +77,7 @@ impl SequencerRelayer { sequencer_poll_period: Duration::from_millis(block_time), sequencer_grpc_endpoint, rollup_filter, - pre_submit_path, - post_submit_path, + submission_state_path, metrics, } .build() diff --git a/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs b/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs index 27edf2aa37..75308751f2 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs @@ -5,6 +5,7 @@ use std::{ Display, Formatter, }, + fs, future::Future, io::Write, mem, @@ -170,8 +171,7 @@ pub struct TestSequencerRelayer { pub signing_key: SigningKey, - pub pre_submit_file: NamedTempFile, - pub post_submit_file: NamedTempFile, + pub submission_state_file: NamedTempFile, /// The sequencer chain ID which will be returned by the mock `cometbft` instance, and set via /// `TestSequencerRelayerConfig`. 
pub actual_sequencer_chain_id: String, @@ -614,29 +614,18 @@ impl TestSequencerRelayer { } #[track_caller] - pub fn assert_state_files_are_as_expected( - &self, - pre_sequencer_height: u32, - post_sequencer_height: u32, - ) { - let pre_submit_state: serde_json::Value = - serde_json::from_str(&std::fs::read_to_string(&self.config.pre_submit_path).unwrap()) + pub fn check_state_file(&self, last_sequencer_height: u32, current_sequencer_height: u32) { + let submission_state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&self.config.submission_state_path).unwrap()) .unwrap(); assert_json_include!( - actual: pre_submit_state, - expected: json!({ - "sequencer_height": pre_sequencer_height - }), + actual: submission_state, + expected: json!({ "sequencer_height": last_sequencer_height }), ); - let post_submit_state: serde_json::Value = - serde_json::from_str(&std::fs::read_to_string(&self.config.post_submit_path).unwrap()) - .unwrap(); assert_json_include!( - actual: post_submit_state, - expected: json!({ - "sequencer_height": post_sequencer_height, - }), + actual: submission_state, + expected: json!({ "sequencer_height": current_sequencer_height }), ); } @@ -662,7 +651,7 @@ impl TestSequencerRelayer { // allow: want the name to reflect this is a test config. #[allow(clippy::module_name_repetitions)] pub struct TestSequencerRelayerConfig { - /// Sets the start height of relayer and configures the on-disk pre- and post-submit files to + /// Sets the start height of relayer and configures the on-disk submission-state file to /// look accordingly. pub last_written_sequencer_height: Option, /// The rollup ID filter, to be stringified and provided as `Config::only_include_rollups` @@ -707,11 +696,11 @@ impl TestSequencerRelayerConfig { let sequencer = MockSequencerServer::spawn().await; let sequencer_grpc_endpoint = format!("http://{}", sequencer.local_addr); - let (pre_submit_file, post_submit_file) = + let submission_state_file = if let Some(last_written_sequencer_height) = self.last_written_sequencer_height { - create_files_for_start_at_height(last_written_sequencer_height) + create_file_for_start_at_height(last_written_sequencer_height) } else { - create_files_for_fresh_start() + create_file_for_fresh_start() }; let only_include_rollups = self.only_include_rollups.iter().join(",").to_string(); @@ -732,8 +721,7 @@ impl TestSequencerRelayerConfig { no_metrics: false, metrics_http_listener_addr: String::new(), pretty_print: true, - pre_submit_path: pre_submit_file.path().to_owned(), - post_submit_path: post_submit_file.path().to_owned(), + submission_state_path: submission_state_file.path().to_owned(), }; info!(config = serde_json::to_string(&config).unwrap()); @@ -751,8 +739,7 @@ impl TestSequencerRelayerConfig { relayer_shutdown_handle: Some(relayer_shutdown_handle), sequencer_relayer, signing_key, - pre_submit_file, - post_submit_file, + submission_state_file, actual_sequencer_chain_id: self.sequencer_chain_id, actual_celestia_chain_id: self.celestia_chain_id, }; @@ -816,49 +803,28 @@ async fn write_file(data: &'static [u8]) -> NamedTempFile { .unwrap() } -fn create_files_for_fresh_start() -> (NamedTempFile, NamedTempFile) { - let pre = NamedTempFile::new() - .expect("must be able to create an empty pre submit state file to run tests"); - let post = NamedTempFile::new() - .expect("must be able to create an empty post submit state file to run tests"); - serde_json::to_writer( - &pre, - &json!({ - "state": "ignore" - }), - ) - .expect("must be able to write pre-submit state to run 
tests"); - serde_json::to_writer( - &post, - &json!({ - "state": "fresh" - }), - ) - .expect("must be able to write post-submit state to run tests"); - (pre, post) +fn create_file_for_fresh_start() -> NamedTempFile { + let temp_file = NamedTempFile::new() + .expect("must be able to create an empty submission state file to run tests"); + serde_json::to_writer(&temp_file, &json!({ "state": "fresh" })) + .expect("must be able to write submission state to run tests"); + temp_file } -fn create_files_for_start_at_height(height: u64) -> (NamedTempFile, NamedTempFile) { - let pre = NamedTempFile::new() - .expect("must be able to create an empty pre submit state file to run tests"); - let post = NamedTempFile::new() - .expect("must be able to create an empty post submit state file to run tests"); - - serde_json::to_writer( - &pre, - &json!({ - "state": "ignore", - }), - ) - .expect("must be able to write pre state to file to run tests"); +fn create_file_for_start_at_height(height: u64) -> NamedTempFile { + let temp_file = NamedTempFile::new() + .expect("must be able to create an empty submission state file to run tests"); serde_json::to_writer_pretty( - &post, + &temp_file, &json!({ - "state": "submitted", - "celestia_height": 5, - "sequencer_height": height + "state": "started", + "sequencer_height": height.saturating_add(10), + "last_submission": { + "celestia_height": 5, + "sequencer_height": height + } }), ) - .expect("must be able to write post state to file to run tests"); - (pre, post) + .expect("must be able to write submission state to run tests"); + temp_file } diff --git a/specs/sequencer-relayer.md b/specs/sequencer-relayer.md index e5e5dd024e..134218379e 100644 --- a/specs/sequencer-relayer.md +++ b/specs/sequencer-relayer.md @@ -140,48 +140,84 @@ which point backpressure will cause the reader task to pause as detailed above. ## Further Details -### Pre- and Post-Submit Files +### Submission State File -At the start and end of each successful attempt to put data onto Celestia, the -relayer writes some pertinent information to disk in the form of two JSON files; -the pre-submit file and post-submit file. These allow the relayer to restart and -continue submitting from where it left off. +During attempts to put data onto Celestia, the relayer writes some pertinent +information to disk in the form of a JSON file; the submission-state file. This +allows the relayer to restart and continue submitting from where it left off. -#### Pre-Submit File +This file needs to exist and be writable whenever the relayer starts, even on +first run. -The contents of the pre-submit file are one of either: +The submission state is one of three variants; `fresh`, `started` or `prepared`. -```json -{"state": "started", "sequencer_height": , "last_submission": } -``` +#### `fresh` State -or +The contents of the submission-state file when in `fresh` state are: ```json -{"state": "ignore"} +{"state": "fresh"} ``` -The former is the normal case, with the file updated at the start of every new -submission. The latter is used to force the relayer to ignore the pre-submit -state entirely and only consider the post-submit state. +This state is never written by the relayer: it needs to be provided externally +before the relayer is started. It indicates that the relayer should start +relaying from sequencer block 1. 
-#### Post-Submit File
+#### `started` State
 
-The contents of the post-submit file are one of either:
+The contents of the submission-state file when in `started` state are:
 
 ```json
-{"state": "fresh"}
+{
+  "state": "started",
+  "last_submission": {
+    "celestia_height": <celestia block height>,
+    "sequencer_height": <sequencer block height>
+  }
+}
 ```
 
-or
+This state is written by the relayer at the start of each new submission.
+`last_submission` records the last successful submission: the Celestia block
+height at which the submission was stored, and the highest sequencer block
+included in the submission.
+
+With the file in this state, on startup the relayer will begin submitting
+sequencer blocks starting from `[last_submission.sequencer_height] + 1`.
+
+#### `prepared` State
+
+The contents of the submission-state file when in `prepared` state are:
 
 ```json
-{"state": "submitted", "celestia_height": <celestia block height>, "sequencer_height": <sequencer block height>}
+{
+  "state": "prepared",
+  "sequencer_height": <sequencer block height>,
+  "last_submission": {
+    "celestia_height": <celestia block height>,
+    "sequencer_height": <sequencer block height>
+  },
+  "blob_tx_hash": "<64-character hex string>",
+  "at": "<timestamp>"
+}
 ```
 
-The former indicates the relayer should start relaying from sequencer block 1,
-while the latter records the relevant block heights of the last successful
-submission.
+This state is written by the relayer when a submission has been prepared and is
+about to be sent to the Celestia app for execution. `sequencer_height` indicates
+the highest sequencer block included in the attempt, while `last_submission` is
+as described for the `started` state. `blob_tx_hash` is the hex-encoded SHA-256
+digest of the `BlobTx` containing the data to be submitted, and `at` is the time
+at which the `BlobTx` was created.
+
+With the file in this state, on startup the relayer must first establish whether
+that submission succeeded. It queries the Celestia app for the given blob
+transaction hash and, if the submission is not confirmed as stored, repeats the
+query once per second until it is confirmed or a timeout is hit. The timeout is
+one minute after the `at` timestamp or 15 seconds, whichever is the greater.
+
+If the submission is confirmed, the relayer will then begin submitting sequencer
+blocks starting from `[sequencer_height] + 1`, otherwise it begins from
+`[last_submission.sequencer_height] + 1`.
 
 ### HTTP Servers