Skip to content

Commit

Permalink
fix(conductor): robust Celestia blob fetch, verify, convert (#946)
Browse files Browse the repository at this point in the history
## Summary
Fixes conductor so it works in the presence of duplicate sequencer blobs
or rollup blobs fetched from Celestia.

## Background
Duplicate sequencer block information can end up on Celestia. If a
single Sequencer block ends up as a duplicate at the same Celestia
height (for example, because two Sequencer nodes both relayed to
Celestia), then Conductor can potentially stop functioning correctly
because of code present as of commit
[`cd38d1d5458fd561696590417b647f1844d328f2`](https://github.com/astriaorg/astria/blob/cd38d1d5458fd561696590417b647f1844d328f2/crates/astria-conductor/src/celestia/mod.rs#L586-L589):
```rust
    ensure!(
        rollup_blobs.len() <= 1,
        "received more than one celestia rollup blob for the given namespace and height"
    );
```
This causes Conductor to drop reconstruction of a Sequencer block
information from blobs fetched from Celestia, even though it would be
perfectly acceptable to go on.

## Changes
- Split the celestia `Reader` object into a crate-public exposed Reader
and an internal `RunningReader`. `Reader::run_until_stopped` itself
initializes the `RunningReader` with information it has to await at
runtime before delegating to it.
- Determine the Sequencer chain ID (required to get its namespace) form
the `/genesis` endpoint (instead of `/latest_block`).
- Remove the stream of new blocks reconstructed from Celestia blobs
- Replace the stream with a `JoinMap` (one task per Celestia height)
- Each task performs the following steps
1. fetch Celestia blobs for a given height, sequencer namespace, rollup
namespace
2. decode the blobs according to their namespace (sequencer header or
rollup item)
    3. verify all sequencer headers blobs against sequencer
4. reconstruct rollup information by matching the verified sequencer
blobs to the rollup item blobs

## Testing
Blackbox tests are implemented for the following cases (more will be
implemented in a followup):

1. conductor fetches a block with sequencer height 2 from celestia
height 1 and executes it
2. conductor fetches blocks with sequencer height 2 from celestia height
1, sequencer height 3 from celestia height 2, and executes both in
succession
3. the remote rollup starts at block number 5. Conductor fetches
sequencer heights 5 and 6 from celestia heights 1 and 2, but only
forwards sequencer height 6.
4. the remote rollup has celestia height 4 in its genesis. Conductor
starts fetching at that block number.

## Related Issues
closes #94
closes #184 (together with
work done in #769)
closes #884
closes #947
closes #948
  • Loading branch information
SuperFluffy authored Apr 18, 2024
1 parent 11e0d39 commit 4357edd
Show file tree
Hide file tree
Showing 18 changed files with 2,297 additions and 688 deletions.
116 changes: 110 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion crates/astria-celestia-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ pub const fn celestia_namespace_v0_from_rollup_id(

#[must_use = "a celestia namespace must be used in order to be useful"]
pub fn celestia_namespace_v0_from_str(chain_id: &str) -> Namespace {
celestia_namespace_v0_from_bytes(chain_id.as_bytes())
}

#[must_use = "a celestia namespace must be used in order to be useful"]
pub fn celestia_namespace_v0_from_bytes(bytes: &[u8]) -> Namespace {
use sha2::{
Digest as _,
Sha256,
};
celestia_namespace_v0_from_array(Sha256::digest(chain_id.as_bytes()).into())
celestia_namespace_v0_from_array(Sha256::digest(bytes).into())
}
2 changes: 2 additions & 0 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ itertools = "0.12.1"
pin-project-lite = "0.2"
tokio-stream = "0.1.14"
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
moka = { version = "0.12.5", features = ["future"] }

[dev-dependencies]
astria-core = { path = "../astria-core", features = ["server", "test-utils"] }
Expand All @@ -68,6 +69,7 @@ wiremock = { workspace = true }

chrono = "0.4.35"
tokio-stream = { version = "0.1.15", features = ["net"] }
rand_chacha = "0.3.1"

[build-dependencies]
astria-build-info = { path = "../astria-build-info", features = ["build"] }
2 changes: 0 additions & 2 deletions crates/astria-conductor/src/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ impl<T> BlockCache<T> {
next_height,
})
}
}

impl<T> BlockCache<T> {
/// Returns the next sequential block if it exists in the cache.
pub(crate) fn pop(&mut self) -> Option<T> {
let block = self.inner.remove(&self.next_height)?;
Expand Down
Loading

0 comments on commit 4357edd

Please sign in to comment.