Skip to content

Commit

Permalink
Separate proposed blobs from block proposal.
Browse files Browse the repository at this point in the history
  • Loading branch information
afck committed Jan 29, 2025
1 parent 4cdcbc4 commit 67c6023
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 179 deletions.
20 changes: 13 additions & 7 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ use linera_base::{
},
};
use linera_execution::{
committee::ValidatorName, system::OpenChainConfig, ExecutionOutcome, ExecutionRuntimeContext,
ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext,
RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response,
ServiceRuntimeEndpoint, TransactionTracker,
committee::{Committee, Epoch, ValidatorName},
system::OpenChainConfig,
ExecutionOutcome, ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext,
Operation, OperationContext, Query, QueryContext, RawExecutionOutcome, RawOutgoingMessage,
ResourceController, ResourceTracker, Response, ServiceRuntimeEndpoint, TransactionTracker,
};
use linera_views::{
context::Context,
Expand Down Expand Up @@ -587,6 +588,13 @@ where
Ok(true)
}

pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
self.execution_state
.system
.current_committee()
.ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
}

/// Removes the incoming message bundles in the block from the inboxes.
pub async fn remove_bundles_from_inboxes(
&mut self,
Expand Down Expand Up @@ -701,9 +709,7 @@ where
ChainError::InvalidBlockTimestamp
);
self.execution_state.system.timestamp.set(block.timestamp);
let Some((_, committee)) = self.execution_state.system.current_committee() else {
return Err(ChainError::InactiveChain(chain_id));
};
let (_, committee) = self.current_committee()?;
let mut resource_controller = ResourceController {
policy: Arc::new(committee.policy().clone()),
tracker: ResourceTracker::default(),
Expand Down
23 changes: 11 additions & 12 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use custom_debug_derive::Debug;
use linera_base::{
bcs,
crypto::{BcsHashable, BcsSignable, CryptoError, CryptoHash, KeyPair, PublicKey, Signature},
data_types::{Amount, Blob, BlockHeight, OracleResponse, Round, Timestamp},
data_types::{Amount, BlockHeight, OracleResponse, Round, Timestamp},
doc_scalar, ensure,
hashed::Hashed,
hex_debug,
Expand Down Expand Up @@ -308,8 +308,6 @@ pub struct BlockProposal {
pub owner: Owner,
pub public_key: PublicKey,
pub signature: Signature,
#[debug(skip_if = Vec::is_empty)]
pub blobs: Vec<Blob>,
#[debug(skip_if = Option::is_none)]
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}
Expand Down Expand Up @@ -773,12 +771,7 @@ pub struct ProposalContent {
}

impl BlockProposal {
pub fn new_initial(
round: Round,
block: ProposedBlock,
secret: &KeyPair,
blobs: Vec<Blob>,
) -> Self {
pub fn new_initial(round: Round, block: ProposedBlock, secret: &KeyPair) -> Self {
let content = ProposalContent {
round,
block,
Expand All @@ -790,7 +783,6 @@ impl BlockProposal {
public_key: secret.public(),
owner: secret.public().into(),
signature,
blobs,
validated_block_certificate: None,
}
}
Expand All @@ -799,7 +791,6 @@ impl BlockProposal {
round: Round,
validated_block_certificate: ValidatedBlockCertificate,
secret: &KeyPair,
blobs: Vec<Blob>,
) -> Self {
let lite_cert = validated_block_certificate.lite_certificate().cloned();
let block = validated_block_certificate.into_inner().into_inner();
Expand All @@ -815,14 +806,22 @@ impl BlockProposal {
public_key: secret.public(),
owner: secret.public().into(),
signature,
blobs,
validated_block_certificate: Some(lite_cert),
}
}

pub fn check_signature(&self) -> Result<(), CryptoError> {
self.signature.check(&self.content, self.public_key)
}

pub fn required_blob_ids(&self) -> impl Iterator<Item = BlobId> + '_ {
self.content.block.published_blob_ids().into_iter().chain(
self.content
.outcome
.iter()
.flat_map(|outcome| outcome.oracle_blob_ids()),
)
}
}

impl LiteVote {
Expand Down
27 changes: 18 additions & 9 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ where
/// proposals in the same round, this contains only the first one.
#[graphql(skip)]
pub proposed: RegisterView<C, Option<BlockProposal>>,
/// These are blobs published or read by the proposed block.
pub proposed_blobs: MapView<C, BlobId, Blob>,
/// Latest validated proposal that a validator may have voted to confirm. This is either the
/// latest `ValidatedBlock` we have seen, or the proposal from the `Fast` round.
#[graphql(skip)]
Expand Down Expand Up @@ -460,16 +462,16 @@ where
{
let value = Hashed::new(ValidatedBlock::new(executed_block.clone()));
if let Some(certificate) = lite_cert.clone().with_value(value) {
self.update_locking(LockingBlock::Regular(certificate), blobs)?;
self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?;
}
}
} else if round.is_fast() && self.locking_block.get().is_none() {
// The fast block also counts as locking.
self.update_locking(LockingBlock::Fast(proposal.clone()), blobs)?;
self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?;
}

// We record the proposed block, in case it affects the current round number.
self.update_proposed(proposal.clone());
self.update_proposed(proposal.clone(), blobs)?;
self.update_current_round(local_time);

let Some(key_pair) = key_pair else {
Expand Down Expand Up @@ -521,10 +523,8 @@ where

/// Returns the requested blob if it belongs to the proposal or the locking block.
pub async fn pending_blob(&self, blob_id: &BlobId) -> Result<Option<Blob>, ViewError> {
if let Some(proposal) = self.proposed.get() {
if let Some(blob) = proposal.blobs.iter().find(|blob| blob.id() == *blob_id) {
return Ok(Some(blob.clone()));
}
if let Some(blob) = self.proposed_blobs.get(blob_id).await? {
return Ok(Some(blob));
}
self.locking_blobs.get(blob_id).await
}
Expand Down Expand Up @@ -651,13 +651,22 @@ where
}

/// Sets the proposed block, if it is newer than our known latest proposal.
fn update_proposed(&mut self, proposal: BlockProposal) {
fn update_proposed(
&mut self,
proposal: BlockProposal,
blobs: BTreeMap<BlobId, Blob>,
) -> Result<(), ViewError> {
if let Some(old_proposal) = self.proposed.get() {
if old_proposal.content.round >= proposal.content.round {
return;
return Ok(());
}
}
self.proposed.set(Some(proposal));
self.proposed_blobs.clear();
for (blob_id, blob) in blobs {
self.proposed_blobs.insert(&blob_id, blob)?;
}
Ok(())
}

/// Sets the locking block and the associated blobs, if it is newer than the known one.
Expand Down
2 changes: 1 addition & 1 deletion linera-chain/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl BlockTestExt for ProposedBlock {
}

fn into_proposal_with_round(self, key_pair: &KeyPair, round: Round) -> BlockProposal {
BlockProposal::new_initial(round, self, key_pair, vec![])
BlockProposal::new_initial(round, self, key_pair)
}
}

Expand Down
1 change: 0 additions & 1 deletion linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,6 @@ where
linera_base::data_types::Round::Fast,
block.clone(),
key_pair,
vec![],
);
proposals.push(RpcMessage::BlockProposal(Box::new(proposal)));
next_recipient = chain.chain_id;
Expand Down
72 changes: 21 additions & 51 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use linera_chain::{
},
manager,
types::{ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
ChainError, ChainStateView,
ChainStateView,
};
use linera_execution::{
committee::{Committee, Epoch, ValidatorName},
Expand Down Expand Up @@ -74,13 +74,7 @@ where
// Check that the chain is active and ready for this timeout.
// Verify the certificate. Returns a catch-all error to make client code more robust.
self.state.ensure_is_active()?;
let (chain_epoch, committee) = self
.state
.chain
.execution_state
.system
.current_committee()
.expect("chain is active");
let (chain_epoch, committee) = self.state.chain.current_committee()?;
ensure!(
certificate.inner().epoch == chain_epoch,
WorkerError::InvalidEpoch {
Expand Down Expand Up @@ -138,7 +132,6 @@ where
},
public_key,
owner,
blobs: _,
validated_block_certificate,
signature: _,
} = proposal;
Expand All @@ -157,13 +150,7 @@ where
);
self.state.ensure_is_active()?;
// Check the epoch.
let (epoch, committee) = self
.state
.chain
.execution_state
.system
.current_committee()
.expect("chain is active");
let (epoch, committee) = self.state.chain.current_committee()?;
check_block_epoch(epoch, block.chain_id, block.epoch)?;
let policy = committee.policy().clone();
// Check the authentication of the block.
Expand All @@ -189,13 +176,9 @@ where
if self.state.chain.manager.check_proposed_block(proposal)? == manager::Outcome::Skip {
return Ok(());
}
let required_blob_ids = block
.published_blob_ids()
.into_iter()
.chain(outcome.iter().flat_map(|outcome| outcome.oracle_blob_ids()));
let maybe_blobs = self
.state
.maybe_get_required_blobs(required_blob_ids)
.maybe_get_required_blobs(proposal.required_blob_ids())
.await?;
let missing_blob_ids = super::missing_blob_ids(&maybe_blobs);
if !missing_blob_ids.is_empty() {
Expand All @@ -222,13 +205,10 @@ where
) -> Result<(), WorkerError> {
// Create the vote and store it in the chain state.
let executed_block = outcome.with(proposal.content.block.clone());
let blobs = if proposal.validated_block_certificate.is_some() {
self.state
.get_required_blobs(executed_block.required_blob_ids())
.await?
} else {
BTreeMap::new()
};
let blobs = self
.state
.get_required_blobs(proposal.required_blob_ids())
.await?;
let key_pair = self.state.config.key_pair();
let manager = &mut self.state.chain.manager;
match manager.create_vote(proposal, executed_block, key_pair, local_time, blobs)? {
Expand Down Expand Up @@ -263,13 +243,7 @@ where
// Check that the chain is active and ready for this validated block.
// Verify the certificate. Returns a catch-all error to make client code more robust.
self.state.ensure_is_active()?;
let (epoch, committee) = self
.state
.chain
.execution_state
.system
.current_committee()
.expect("chain is active");
let (epoch, committee) = self.state.chain.current_committee()?;
check_block_epoch(epoch, header.chain_id, header.epoch)?;
certificate.check(committee)?;
let mut actions = NetworkActions::default();
Expand Down Expand Up @@ -382,13 +356,7 @@ where
}
self.state.ensure_is_active()?;
// Verify the certificate.
let (epoch, committee) = self
.state
.chain
.execution_state
.system
.current_committee()
.expect("chain is active");
let (epoch, committee) = self.state.chain.current_committee()?;
check_block_epoch(
epoch,
executed_block.block.chain_id,
Expand Down Expand Up @@ -678,20 +646,22 @@ where
&mut self,
blob: Blob,
) -> Result<ChainInfoResponse, WorkerError> {
let (_, committee) = self
.state
.chain
.execution_state
.system
.current_committee()
.ok_or_else(|| ChainError::InactiveChain(self.state.chain_id()))?;
let policy = committee.policy().clone();
Self::check_blob_size(blob.content(), &policy)?;
let (_, committee) = self.state.chain.current_committee()?;
Self::check_blob_size(blob.content(), committee.policy())?;
self.state
.chain
.pending_validated_blobs
.maybe_insert(&blob)
.await?;
for (_, mut pending_blobs) in self
.state
.chain
.pending_proposed_blobs
.try_load_all_entries_mut()
.await?
{
pending_blobs.maybe_insert(&blob).await?;
}
self.save().await?;
Ok(ChainInfoResponse::new(
&self.state.chain,
Expand Down
8 changes: 5 additions & 3 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,10 @@ where

/// Returns the requested blob, if it belongs to the current locking block or pending proposal.
pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result<Blob, WorkerError> {
let maybe_blob = self.chain.manager.pending_blob(&blob_id).await?;
maybe_blob.ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id]))
if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
return Ok(blob);
}
Ok(self.storage.read_blob(blob_id).await?)
}

/// Adds the blob to pending blocks or validated block certificates that are missing it.
Expand All @@ -331,7 +333,7 @@ where
/// missing.
async fn get_required_blobs(
&self,
required_blob_ids: HashSet<BlobId>,
required_blob_ids: impl IntoIterator<Item = BlobId>,
) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
let maybe_blobs = self.maybe_get_required_blobs(required_blob_ids).await?;
let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
Expand Down
1 change: 0 additions & 1 deletion linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ where
},
public_key: _,
owner: _,
blobs: _,
validated_block_certificate,
signature: _,
} = proposal;
Expand Down
Loading

0 comments on commit 67c6023

Please sign in to comment.