diff --git a/CLI.md b/CLI.md index b5ca9ee1814..a85edf1e5e6 100644 --- a/CLI.md +++ b/CLI.md @@ -436,6 +436,7 @@ View or update the resource control policy * `--maximum-fuel-per-block ` — Set the maximum amount of fuel per block * `--maximum-executed-block-size ` — Set the maximum size of an executed block, in bytes * `--maximum-blob-size ` — Set the maximum size of data blobs, compressed bytecode and other binary blobs, in bytes +* `--maximum-published-blobs ` — Set the maximum number of published blobs per block * `--maximum-bytecode-size ` — Set the maximum size of decompressed contract or service bytecode, in bytes * `--maximum-block-proposal-size ` — Set the maximum size of a block proposal, in bytes * `--maximum-bytes-read-per-block ` — Set the maximum read data per block @@ -501,6 +502,7 @@ Create genesis configuration for a Linera deployment. Create initial user chains * `--maximum-executed-block-size ` — Set the maximum size of an executed block * `--maximum-bytecode-size ` — Set the maximum size of decompressed contract or service bytecode, in bytes * `--maximum-blob-size ` — Set the maximum size of data blobs, compressed bytecode and other binary blobs, in bytes +* `--maximum-published-blobs ` — Set the maximum number of published blobs per block * `--maximum-block-proposal-size ` — Set the maximum size of a block proposal, in bytes * `--maximum-bytes-read-per-block ` — Set the maximum read data per block * `--maximum-bytes-written-per-block ` — Set the maximum write data per block diff --git a/linera-chain/src/certificate/lite.rs b/linera-chain/src/certificate/lite.rs index f78c9c9c997..7a34f445294 100644 --- a/linera-chain/src/certificate/lite.rs +++ b/linera-chain/src/certificate/lite.rs @@ -74,6 +74,13 @@ impl<'a> LiteCertificate<'a> { Ok(&self.value) } + /// Checks whether the value matches this certificate. + pub fn check_value(&self, value: &Hashed) -> bool { + self.value.chain_id == value.inner().chain_id() + && T::KIND == self.value.kind + && self.value.value_hash == value.hash() + } + /// Returns the [`GenericCertificate`] with the specified value, if it matches. pub fn with_value( self, diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index b9080327a13..48251d6bfd3 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -13,25 +13,26 @@ use futures::stream::{self, StreamExt, TryStreamExt}; use linera_base::{ crypto::CryptoHash, data_types::{ - Amount, ArithmeticError, Blob, BlockHeight, OracleResponse, Timestamp, - UserApplicationDescription, + Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp, UserApplicationDescription, }, ensure, identifiers::{ - BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, - StreamId, UserApplicationId, + ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, StreamId, + UserApplicationId, }, + ownership::ChainOwnership, }; use linera_execution::{ - committee::ValidatorName, system::OpenChainConfig, ExecutionOutcome, ExecutionRuntimeContext, - ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext, - QueryOutcome, RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, - ServiceRuntimeEndpoint, TransactionTracker, + committee::{Committee, Epoch, ValidatorName}, + system::OpenChainConfig, + ExecutionOutcome, ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, + Operation, OperationContext, Query, QueryContext, QueryOutcome, RawExecutionOutcome, + RawOutgoingMessage, ResourceController, ResourceTracker, ServiceRuntimeEndpoint, + TransactionTracker, }; use linera_views::{ context::Context, log_view::LogView, - map_view::MapView, queue_view::QueueView, reentrant_collection_view::ReentrantCollectionView, register_view::RegisterView, @@ -49,7 +50,7 @@ use crate::{ inbox::{Cursor, InboxError, InboxStateView}, manager::ChainManager, outbox::OutboxStateView, - types::ValidatedBlockCertificate, + pending_blobs::PendingBlobsView, ChainError, ChainExecutionContext, ExecutionResultExt, }; @@ -201,10 +202,10 @@ where /// Consensus state. pub manager: ChainManager, /// Pending validated block that is still missing blobs. - #[graphql(skip)] - pub pending_validated_block: RegisterView>, /// The incomplete set of blobs for the pending validated block. - pub pending_validated_blobs: MapView>, + pub pending_validated_blobs: PendingBlobsView, + /// The incomplete sets of blobs for upcoming proposals. + pub pending_proposed_blobs: ReentrantCollectionView>, /// Hashes of all certified blocks for this sender. /// This ends with `block_hash` and has length `usize::from(next_block_height)`. @@ -589,6 +590,17 @@ where Ok(true) } + pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> { + self.execution_state + .system + .current_committee() + .ok_or_else(|| ChainError::InactiveChain(self.chain_id())) + } + + pub fn ownership(&self) -> &ChainOwnership { + self.execution_state.system.ownership.get() + } + /// Removes the incoming message bundles in the block from the inboxes. pub async fn remove_bundles_from_inboxes( &mut self, @@ -703,9 +715,7 @@ where ChainError::InvalidBlockTimestamp ); self.execution_state.system.timestamp.set(block.timestamp); - let Some((_, committee)) = self.execution_state.system.current_committee() else { - return Err(ChainError::InactiveChain(chain_id)); - }; + let (_, committee) = self.current_committee()?; let mut resource_controller = ResourceController { policy: Arc::new(committee.policy().clone()), tracker: ResourceTracker::default(), @@ -891,7 +901,7 @@ where let maybe_committee = self.execution_state.system.current_committee().into_iter(); self.pending_validated_blobs.clear(); - self.pending_validated_block.set(None); + self.pending_proposed_blobs.clear(); self.manager.reset( self.execution_state.system.ownership.get().clone(), block.height.try_add_one()?, diff --git a/linera-chain/src/data_types.rs b/linera-chain/src/data_types.rs index 45d31419c39..690de73d92a 100644 --- a/linera-chain/src/data_types.rs +++ b/linera-chain/src/data_types.rs @@ -12,7 +12,7 @@ use custom_debug_derive::Debug; use linera_base::{ bcs, crypto::{BcsHashable, BcsSignable, CryptoError, CryptoHash, KeyPair, PublicKey, Signature}, - data_types::{Amount, Blob, BlockHeight, OracleResponse, Round, Timestamp}, + data_types::{Amount, BlockHeight, OracleResponse, Round, Timestamp}, doc_scalar, ensure, hashed::Hashed, hex_debug, @@ -29,6 +29,7 @@ use linera_execution::{ use serde::{Deserialize, Serialize}; use crate::{ + block::ValidatedBlock, types::{ CertificateKind, CertificateValue, GenericCertificate, LiteCertificate, ValidatedBlockCertificate, @@ -144,12 +145,8 @@ impl ProposedBlock { Some((in_bundle, posted_message, config)) } - pub fn check_proposal_size( - &self, - maximum_block_proposal_size: u64, - blobs: &[Blob], - ) -> Result<(), ChainError> { - let size = bcs::serialized_size(&(self, blobs))?; + pub fn check_proposal_size(&self, maximum_block_proposal_size: u64) -> Result<(), ChainError> { + let size = bcs::serialized_size(self)?; ensure!( size <= usize::try_from(maximum_block_proposal_size).unwrap_or(usize::MAX), ChainError::BlockProposalTooLarge @@ -312,8 +309,6 @@ pub struct BlockProposal { pub owner: Owner, pub public_key: PublicKey, pub signature: Signature, - #[debug(skip_if = Vec::is_empty)] - pub blobs: Vec, #[debug(skip_if = Option::is_none)] pub validated_block_certificate: Option>, } @@ -744,7 +739,7 @@ impl BlockExecutionOutcome { } } - fn oracle_blob_ids(&self) -> HashSet { + pub fn oracle_blob_ids(&self) -> HashSet { let mut required_blob_ids = HashSet::new(); for responses in &self.oracle_responses { for response in responses { @@ -777,12 +772,7 @@ pub struct ProposalContent { } impl BlockProposal { - pub fn new_initial( - round: Round, - block: ProposedBlock, - secret: &KeyPair, - blobs: Vec, - ) -> Self { + pub fn new_initial(round: Round, block: ProposedBlock, secret: &KeyPair) -> Self { let content = ProposalContent { round, block, @@ -794,7 +784,6 @@ impl BlockProposal { public_key: secret.public(), owner: secret.public().into(), signature, - blobs, validated_block_certificate: None, } } @@ -803,7 +792,6 @@ impl BlockProposal { round: Round, validated_block_certificate: ValidatedBlockCertificate, secret: &KeyPair, - blobs: Vec, ) -> Self { let lite_cert = validated_block_certificate.lite_certificate().cloned(); let block = validated_block_certificate.into_inner().into_inner(); @@ -819,13 +807,46 @@ impl BlockProposal { public_key: secret.public(), owner: secret.public().into(), signature, - blobs, validated_block_certificate: Some(lite_cert), } } - pub fn check_signature(&self, public_key: PublicKey) -> Result<(), CryptoError> { - self.signature.check(&self.content, public_key) + pub fn check_signature(&self) -> Result<(), CryptoError> { + self.signature.check(&self.content, self.public_key) + } + + pub fn required_blob_ids(&self) -> impl Iterator + '_ { + self.content.block.published_blob_ids().into_iter().chain( + self.content + .outcome + .iter() + .flat_map(|outcome| outcome.oracle_blob_ids()), + ) + } + + /// Checks that the public key matches the owner and that the optional certificate matches + /// the outcome. + pub fn check_invariants(&self) -> Result<(), &'static str> { + ensure!( + self.owner == Owner::from(&self.public_key), + "Public key does not match owner" + ); + match (&self.validated_block_certificate, &self.content.outcome) { + (None, None) => {} + (None, Some(_)) | (Some(_), None) => { + return Err("Must contain a validation certificate if and only if \ + it contains the execution outcome from a previous round"); + } + (Some(lite_certificate), Some(outcome)) => { + let executed_block = outcome.clone().with(self.content.block.clone()); + let value = Hashed::new(ValidatedBlock::new(executed_block)); + ensure!( + lite_certificate.check_value(&value), + "Lite certificate must match the given block and execution outcome" + ); + } + } + Ok(()) } } diff --git a/linera-chain/src/lib.rs b/linera-chain/src/lib.rs index 6c59ffac0d3..dfea43a4d2c 100644 --- a/linera-chain/src/lib.rs +++ b/linera-chain/src/lib.rs @@ -17,6 +17,7 @@ pub mod data_types; mod inbox; pub mod manager; mod outbox; +mod pending_blobs; #[cfg(with_testing)] pub mod test; diff --git a/linera-chain/src/manager.rs b/linera-chain/src/manager.rs index 05ac307db7a..bab4a5ca86c 100644 --- a/linera-chain/src/manager.rs +++ b/linera-chain/src/manager.rs @@ -159,6 +159,8 @@ where /// proposals in the same round, this contains only the first one. #[graphql(skip)] pub proposed: RegisterView>, + /// These are blobs published or read by the proposed block. + pub proposed_blobs: MapView, /// Latest validated proposal that a validator may have voted to confirm. This is either the /// latest `ValidatedBlock` we have seen, or the proposal from the `Fast` round. #[graphql(skip)] @@ -460,16 +462,16 @@ where { let value = Hashed::new(ValidatedBlock::new(executed_block.clone())); if let Some(certificate) = lite_cert.clone().with_value(value) { - self.update_locking(LockingBlock::Regular(certificate), blobs)?; + self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?; } } } else if round.is_fast() && self.locking_block.get().is_none() { // The fast block also counts as locking. - self.update_locking(LockingBlock::Fast(proposal.clone()), blobs)?; + self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?; } // We record the proposed block, in case it affects the current round number. - self.update_proposed(proposal.clone()); + self.update_proposed(proposal.clone(), blobs)?; self.update_current_round(local_time); let Some(key_pair) = key_pair else { @@ -521,10 +523,8 @@ where /// Returns the requested blob if it belongs to the proposal or the locking block. pub async fn pending_blob(&self, blob_id: &BlobId) -> Result, ViewError> { - if let Some(proposal) = self.proposed.get() { - if let Some(blob) = proposal.blobs.iter().find(|blob| blob.id() == *blob_id) { - return Ok(Some(blob.clone())); - } + if let Some(blob) = self.proposed_blobs.get(blob_id).await? { + return Ok(Some(blob)); } self.locking_blobs.get(blob_id).await } @@ -651,13 +651,22 @@ where } /// Sets the proposed block, if it is newer than our known latest proposal. - fn update_proposed(&mut self, proposal: BlockProposal) { + fn update_proposed( + &mut self, + proposal: BlockProposal, + blobs: BTreeMap, + ) -> Result<(), ViewError> { if let Some(old_proposal) = self.proposed.get() { if old_proposal.content.round >= proposal.content.round { - return; + return Ok(()); } } self.proposed.set(Some(proposal)); + self.proposed_blobs.clear(); + for (blob_id, blob) in blobs { + self.proposed_blobs.insert(&blob_id, blob)?; + } + Ok(()) } /// Sets the locking block and the associated blobs, if it is newer than the known one. diff --git a/linera-chain/src/pending_blobs.rs b/linera-chain/src/pending_blobs.rs new file mode 100644 index 00000000000..7b71ec3079f --- /dev/null +++ b/linera-chain/src/pending_blobs.rs @@ -0,0 +1,78 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::BTreeMap; + +use async_graphql::SimpleObject; +use linera_base::{ + data_types::{Blob, Round}, + ensure, + identifiers::BlobId, +}; +use linera_views::{ + context::Context, + map_view::MapView, + register_view::RegisterView, + views::{ClonableView, View, ViewError}, +}; + +use crate::ChainError; + +/// The pending blobs belonging to a block that can't be processed without them. +#[derive(Debug, View, ClonableView, SimpleObject)] +pub struct PendingBlobsView +where + C: Clone + Context + Send + Sync + 'static, +{ + /// The round in which the block is validated. + pub round: RegisterView, + /// Whether these blobs were already validated. + /// + /// This is only `false` for _new_ block proposals, not when re-proposing blocks from earlier + /// rounds or when handling validated block certificates. If it is false, the pending blobs are + /// only the ones published by the new block, not the ones that are only read. + pub validated: RegisterView, + /// The map of blobs needed to process the block. + pub pending_blobs: MapView>, +} + +impl PendingBlobsView +where + C: Clone + Context + Send + Sync + 'static, +{ + pub async fn get(&self, blob_id: &BlobId) -> Result, ViewError> { + Ok(self.pending_blobs.get(blob_id).await?.flatten()) + } + + pub async fn maybe_insert(&mut self, blob: &Blob) -> Result<(), ViewError> { + let blob_id = blob.id(); + if let Some(maybe_blob) = self.pending_blobs.get_mut(&blob_id).await? { + if maybe_blob.is_none() { + *maybe_blob = Some(blob.clone()); + } + } + Ok(()) + } + + pub async fn update( + &mut self, + round: Round, + validated: bool, + maybe_blobs: BTreeMap>, + ) -> Result<(), ChainError> { + let existing_round = *self.round.get(); + ensure!( + existing_round <= round, + ChainError::InsufficientRound(existing_round) + ); + if existing_round < round { + self.clear(); + self.round.set(round); + self.validated.set(validated); + } + for (blob_id, maybe_blob) in maybe_blobs { + self.pending_blobs.insert(&blob_id, maybe_blob)?; + } + Ok(()) + } +} diff --git a/linera-chain/src/test.rs b/linera-chain/src/test.rs index c024834ce6a..5914073c306 100644 --- a/linera-chain/src/test.rs +++ b/linera-chain/src/test.rs @@ -125,7 +125,7 @@ impl BlockTestExt for ProposedBlock { } fn into_proposal_with_round(self, key_pair: &KeyPair, round: Round) -> BlockProposal { - BlockProposal::new_initial(round, self, key_pair, vec![]) + BlockProposal::new_initial(round, self, key_pair) } } diff --git a/linera-chain/src/unit_tests/chain_tests.rs b/linera-chain/src/unit_tests/chain_tests.rs index 265c598b7c2..ca9dcc7c8b9 100644 --- a/linera-chain/src/unit_tests/chain_tests.rs +++ b/linera-chain/src/unit_tests/chain_tests.rs @@ -112,7 +112,7 @@ async fn test_block_size_limit() { let mut chain = ChainStateView::new(chain_id).await; // The size of the executed valid block below. - let maximum_executed_block_size = 677; + let maximum_executed_block_size = 685; // Initialize the chain. let mut config = make_open_chain_config(); diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index cec5296e592..ee663f1186c 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -800,7 +800,6 @@ where linera_base::data_types::Round::Fast, block.clone(), key_pair, - vec![], ); proposals.push(RpcMessage::BlockProposal(Box::new(proposal))); next_recipient = chain.chain_id; diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index b8ef5f69351..415bc960b3e 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -543,6 +543,10 @@ pub enum ClientCommand { #[arg(long)] maximum_blob_size: Option, + /// Set the maximum number of published blobs per block. + #[arg(long)] + maximum_published_blobs: Option, + /// Set the maximum size of decompressed contract or service bytecode, in bytes. #[arg(long)] maximum_bytecode_size: Option, @@ -674,6 +678,10 @@ pub enum ClientCommand { #[arg(long)] maximum_blob_size: Option, + /// Set the maximum number of published blobs per block. + #[arg(long)] + maximum_published_blobs: Option, + /// Set the maximum size of a block proposal, in bytes. #[arg(long)] maximum_block_proposal_size: Option, diff --git a/linera-core/src/chain_worker/state/attempted_changes.rs b/linera-core/src/chain_worker/state/attempted_changes.rs index 61788fb7757..a14cd348198 100644 --- a/linera-core/src/chain_worker/state/attempted_changes.rs +++ b/linera-core/src/chain_worker/state/attempted_changes.rs @@ -7,13 +7,14 @@ use std::{borrow::Cow, collections::BTreeMap}; use futures::future::Either; use linera_base::{ - data_types::{Blob, BlockHeight, Timestamp}, + data_types::{Blob, BlobContent, BlockHeight, CompressedBytecode, Timestamp}, ensure, - identifiers::{ChainId, MessageId}, + identifiers::{BlobType, ChainId, MessageId}, }; use linera_chain::{ data_types::{ - BlockExecutionOutcome, BlockProposal, ExecutedBlock, MessageBundle, Origin, Target, + BlockExecutionOutcome, BlockProposal, ExecutedBlock, MessageBundle, Origin, + ProposalContent, Target, }, manager, types::{ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate}, @@ -21,7 +22,7 @@ use linera_chain::{ }; use linera_execution::{ committee::{Committee, Epoch, ValidatorName}, - BlobState, + BlobState, ResourceControlPolicy, }; use linera_storage::{Clock as _, Storage}; use linera_views::{ @@ -73,13 +74,7 @@ where // Check that the chain is active and ready for this timeout. // Verify the certificate. Returns a catch-all error to make client code more robust. self.state.ensure_is_active()?; - let (chain_epoch, committee) = self - .state - .chain - .execution_state - .system - .current_committee() - .expect("chain is active"); + let (chain_epoch, committee) = self.state.chain.current_committee()?; ensure!( certificate.inner().epoch == chain_epoch, WorkerError::InvalidEpoch { @@ -124,6 +119,70 @@ where Ok((info, actions)) } + /// Validateds a proposal's signatures and blobs. + pub(super) async fn validate_block( + &mut self, + proposal: &BlockProposal, + ) -> Result<(), WorkerError> { + let BlockProposal { + content: + ProposalContent { + block, + round, + outcome: _, + }, + public_key: _, + owner, + validated_block_certificate, + signature: _, + } = proposal; + + let chain = &self.state.chain; + // Check the epoch. + let (epoch, committee) = chain.current_committee()?; + check_block_epoch(epoch, block.chain_id, block.epoch)?; + let policy = committee.policy().clone(); + block.check_proposal_size(policy.maximum_block_proposal_size)?; + // Check the authentication of the block. + ensure!( + chain.manager.verify_owner(proposal), + WorkerError::InvalidOwner + ); + if let Some(lite_certificate) = validated_block_certificate { + // Verify that this block has been validated by a quorum before. + lite_certificate.check(committee)?; + } else if let Some(signer) = block.authenticated_signer { + // Check the authentication of the operations in the new block. + ensure!(signer == *owner, WorkerError::InvalidSigner(signer)); + } + // Check if the chain is ready for this new block proposal. + chain.tip_state.get().verify_block_chaining(block)?; + if chain.manager.check_proposed_block(proposal)? == manager::Outcome::Skip { + return Ok(()); + } + let maybe_blobs = self + .state + .maybe_get_required_blobs(proposal.required_blob_ids()) + .await?; + let missing_blob_ids = super::missing_blob_ids(&maybe_blobs); + if !missing_blob_ids.is_empty() { + let chain = &mut self.state.chain; + if chain.ownership().open_multi_leader_rounds { + // TODO(#3203): Allow multiple pending proposals on permissionless chains. + chain.pending_proposed_blobs.clear(); + } + chain + .pending_proposed_blobs + .try_load_entry_mut(owner) + .await? + .update(*round, validated_block_certificate.is_some(), maybe_blobs) + .await?; + self.save().await?; + return Err(WorkerError::BlobsNotFound(missing_blob_ids)); + } + Ok(()) + } + /// Votes for a block proposal for the next block for this chain. pub(super) async fn vote_for_block_proposal( &mut self, @@ -133,13 +192,10 @@ where ) -> Result<(), WorkerError> { // Create the vote and store it in the chain state. let executed_block = outcome.with(proposal.content.block.clone()); - let blobs = if proposal.validated_block_certificate.is_some() { - self.state - .get_required_blobs(executed_block.required_blob_ids(), &proposal.blobs) - .await? - } else { - BTreeMap::new() - }; + let blobs = self + .state + .get_required_blobs(proposal.required_blob_ids()) + .await?; let key_pair = self.state.config.key_pair(); let manager = &mut self.state.chain.manager; match manager.create_vote(proposal, executed_block, key_pair, local_time, blobs)? { @@ -172,13 +228,7 @@ where // Check that the chain is active and ready for this validated block. // Verify the certificate. Returns a catch-all error to make client code more robust. self.state.ensure_is_active()?; - let (epoch, committee) = self - .state - .chain - .execution_state - .system - .current_committee() - .expect("chain is active"); + let (epoch, committee) = self.state.chain.current_committee()?; check_block_epoch(epoch, header.chain_id, header.epoch)?; certificate.check(committee)?; let mut actions = NetworkActions::default(); @@ -210,22 +260,16 @@ where let required_blob_ids = block.required_blob_ids(); let maybe_blobs = self .state - .maybe_get_required_blobs(required_blob_ids, &[]) + .maybe_get_required_blobs(required_blob_ids) .await?; let missing_blob_ids = super::missing_blob_ids(&maybe_blobs); if !missing_blob_ids.is_empty() { - let chain = &mut self.state.chain; - let pending_validated_block = chain.pending_validated_block.get_mut(); - if !pending_validated_block - .as_ref() - .is_some_and(|existing_cert| existing_cert.round > certificate.round) - { - for (blob_id, maybe_blob) in maybe_blobs { - chain.pending_validated_blobs.insert(&blob_id, maybe_blob)?; - } - *pending_validated_block = Some(certificate); - self.save().await?; - } + self.state + .chain + .pending_validated_blobs + .update(certificate.round, true, maybe_blobs) + .await?; + self.save().await?; return Err(WorkerError::BlobsNotFound(missing_blob_ids)); } let blobs = maybe_blobs @@ -296,13 +340,7 @@ where } self.state.ensure_is_active()?; // Verify the certificate. - let (epoch, committee) = self - .state - .chain - .execution_state - .system - .current_committee() - .expect("chain is active"); + let (epoch, committee) = self.state.chain.current_committee()?; check_block_epoch( epoch, executed_block.block.chain_id, @@ -318,7 +356,7 @@ where let required_blob_ids = executed_block.required_blob_ids(); let blobs_result = self .state - .get_required_blobs(executed_block.required_blob_ids(), &[]) + .get_required_blobs(executed_block.required_blob_ids()) .await .map(|blobs| blobs.into_values().collect::>()); @@ -565,14 +603,13 @@ where chain.execution_state.system.epoch.get(), chain.unskippable_bundles.front().await?, ) { - let ownership = chain.execution_state.system.ownership.get(); let elapsed = self .state .storage .clock() .current_time() .delta_since(entry.seen); - if elapsed >= ownership.timeout_config.fallback_duration { + if elapsed >= chain.ownership().timeout_config.fallback_duration { let chain_id = chain.chain_id(); let height = chain.tip_state.get().next_block_height; let key_pair = self.state.config.key_pair(); @@ -591,12 +628,29 @@ where &mut self, blob: Blob, ) -> Result { - let chain = &mut self.state.chain; - let blob_id = blob.id(); - if let Some(maybe_blob) = chain.pending_validated_blobs.get_mut(&blob_id).await? { - if maybe_blob.is_none() { - *maybe_blob = Some(blob); + self.state + .chain + .pending_validated_blobs + .maybe_insert(&blob) + .await?; + for (_, mut pending_blobs) in self + .state + .chain + .pending_proposed_blobs + .try_load_all_entries_mut() + .await? + { + if !pending_blobs.validated.get() { + let (_, committee) = self.state.chain.current_committee()?; + let policy = committee.policy(); + Self::check_blob_size(blob.content(), policy)?; + ensure!( + u64::try_from(pending_blobs.pending_blobs.count().await?) + .is_ok_and(|count| count < policy.maximum_published_blobs), + WorkerError::TooManyPublishedBlobs(policy.maximum_published_blobs) + ); } + pending_blobs.maybe_insert(&blob).await?; } self.save().await?; Ok(ChainInfoResponse::new( @@ -605,6 +659,31 @@ where )) } + fn check_blob_size( + content: &BlobContent, + policy: &ResourceControlPolicy, + ) -> Result<(), WorkerError> { + ensure!( + u64::try_from(content.bytes().len()) + .ok() + .is_some_and(|size| size <= policy.maximum_blob_size), + WorkerError::BlobTooLarge + ); + match content.blob_type() { + BlobType::ContractBytecode | BlobType::ServiceBytecode => { + ensure!( + CompressedBytecode::decompressed_size_at_most( + content.bytes(), + policy.maximum_bytecode_size + )?, + WorkerError::BytecodeTooLarge + ); + } + BlobType::Data => {} + } + Ok(()) + } + /// Stores the chain state in persistent storage. /// /// Waits until the [`ChainStateView`] is no longer shared before persisting the changes. diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index 5342ff202b0..d1b94a5d487 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -8,6 +8,7 @@ mod temporary_changes; use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + iter, sync::{self, Arc}, }; @@ -203,10 +204,19 @@ where &mut self, proposal: BlockProposal, ) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> { - let validation_outcome = ChainWorkerStateWithTemporaryChanges::new(self) + self.ensure_is_active()?; + proposal + .check_invariants() + .map_err(|msg| WorkerError::InvalidBlockProposal(msg.to_string()))?; + proposal.check_signature()?; + ChainWorkerStateWithAttemptedChanges::new(&mut *self) .await .validate_block(&proposal) .await?; + let validation_outcome = ChainWorkerStateWithTemporaryChanges::new(self) + .await + .validate_proposal_content(&proposal.content) + .await?; let actions = if let Some((outcome, local_time)) = validation_outcome { ChainWorkerStateWithAttemptedChanges::new(&mut *self) @@ -298,8 +308,10 @@ where /// Returns the requested blob, if it belongs to the current locking block or pending proposal. pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { - let maybe_blob = self.chain.manager.pending_blob(&blob_id).await?; - maybe_blob.ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id])) + if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? { + return Ok(blob); + } + Ok(self.storage.read_blob(blob_id).await?) } /// Adds the blob to pending blocks or validated block certificates that are missing it. @@ -322,16 +334,13 @@ where Ok(()) } - /// Returns the blobs required by the given executed block. The ones that are not passed in - /// are read from the chain manager or from storage. + /// Reads the blobs from the chain manager or from storage. Returns an error if any are + /// missing. async fn get_required_blobs( &self, - required_blob_ids: HashSet, - blobs: &[Blob], + required_blob_ids: impl IntoIterator, ) -> Result, WorkerError> { - let maybe_blobs = self - .maybe_get_required_blobs(required_blob_ids, blobs) - .await?; + let maybe_blobs = self.maybe_get_required_blobs(required_blob_ids).await?; let not_found_blob_ids = missing_blob_ids(&maybe_blobs); ensure!( not_found_blob_ids.is_empty(), @@ -343,30 +352,30 @@ where .collect()) } - /// Returns the blobs required by the given executed block. The ones that are not passed in - /// are read from the chain manager or from storage. + /// Tries to read the blobs from the chain manager or storage. Returns `None` if not found. async fn maybe_get_required_blobs( &self, - required_blob_ids: HashSet, - provided_blobs: &[Blob], + blob_ids: impl IntoIterator, ) -> Result>, WorkerError> { - let required_blob_ids = required_blob_ids.into_iter(); - let mut maybe_blobs = BTreeMap::from_iter(required_blob_ids.map(|blob_id| (blob_id, None))); + let mut maybe_blobs = BTreeMap::from_iter(blob_ids.into_iter().zip(iter::repeat(None))); - for blob in provided_blobs { - if let Some(maybe_blob) = maybe_blobs.get_mut(&blob.id()) { - *maybe_blob = Some(blob.clone()); - } - } for (blob_id, maybe_blob) in &mut maybe_blobs { - if maybe_blob.is_some() { - continue; - } if let Some(blob) = self.chain.manager.pending_blob(blob_id).await? { *maybe_blob = Some(blob); - } else if let Some(Some(blob)) = self.chain.pending_validated_blobs.get(blob_id).await? - { + } else if let Some(blob) = self.chain.pending_validated_blobs.get(blob_id).await? { *maybe_blob = Some(blob); + } else { + for (_, pending_blobs) in self + .chain + .pending_proposed_blobs + .try_load_all_entries() + .await? + { + if let Some(blob) = pending_blobs.get(blob_id).await? { + *maybe_blob = Some(blob); + break; + } + } } } let missing_blob_ids = missing_blob_ids(&maybe_blobs); diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index 53128f1b202..31096549ced 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -4,23 +4,15 @@ //! Operations that don't persist any changes to the chain state. use linera_base::{ - data_types::{ - ArithmeticError, Blob, BlobContent, CompressedBytecode, Timestamp, - UserApplicationDescription, - }, + data_types::{ArithmeticError, Timestamp, UserApplicationDescription}, ensure, - hashed::Hashed, - identifiers::{AccountOwner, BlobType, GenericApplicationId, Owner, UserApplicationId}, + identifiers::{AccountOwner, GenericApplicationId, UserApplicationId}, }; -use linera_chain::{ - data_types::{ - BlockExecutionOutcome, BlockProposal, ChannelFullName, ExecutedBlock, IncomingBundle, - Medium, MessageAction, ProposalContent, ProposedBlock, - }, - manager, - types::ValidatedBlock, +use linera_chain::data_types::{ + BlockExecutionOutcome, ChannelFullName, ExecutedBlock, IncomingBundle, Medium, MessageAction, + ProposalContent, ProposedBlock, }; -use linera_execution::{ChannelSubscription, Query, QueryOutcome, ResourceControlPolicy}; +use linera_execution::{ChannelSubscription, Query, QueryOutcome}; use linera_storage::{Clock as _, Storage}; use linera_views::views::View; #[cfg(with_testing)] @@ -32,7 +24,7 @@ use { }, }; -use super::{check_block_epoch, ChainWorkerState}; +use super::ChainWorkerState; use crate::{ data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse}, worker::WorkerError, @@ -152,82 +144,15 @@ where } /// Validates a block proposed to extend this chain. - pub(super) async fn validate_block( + pub(super) async fn validate_proposal_content( &mut self, - proposal: &BlockProposal, + content: &ProposalContent, ) -> Result, WorkerError> { - let BlockProposal { - content: - ProposalContent { - block, - round, - outcome, - }, - public_key, - owner, - blobs, - validated_block_certificate, - signature: _, - } = proposal; - ensure!( - validated_block_certificate.is_some() == outcome.is_some(), - WorkerError::InvalidBlockProposal( - "Must contain a validation certificate if and only if \ - it contains the execution outcome from a previous round" - .to_string() - ) - ); - ensure!( - *owner == Owner::from(public_key), - WorkerError::InvalidBlockProposal("Public key does not match owner".into()) - ); - self.0.ensure_is_active()?; - // Check the epoch. - let (epoch, committee) = self - .0 - .chain - .execution_state - .system - .current_committee() - .expect("chain is active"); - check_block_epoch(epoch, block.chain_id, block.epoch)?; - let policy = committee.policy().clone(); - // Check the authentication of the block. - ensure!( - self.0.chain.manager.verify_owner(proposal), - WorkerError::InvalidOwner - ); - proposal.check_signature(*public_key)?; - if let Some(lite_certificate) = validated_block_certificate { - // Verify that this block has been validated by a quorum before. - lite_certificate.check(committee)?; - } else if let Some(signer) = block.authenticated_signer { - // Check the authentication of the operations in the new block. - ensure!(signer == *owner, WorkerError::InvalidSigner(signer)); - } - // Check if the chain is ready for this new block proposal. - // This should always pass for nodes without voting key. - self.0.chain.tip_state.get().verify_block_chaining(block)?; - if self.0.chain.manager.check_proposed_block(proposal)? == manager::Outcome::Skip { - return Ok(None); - } - // Update the inboxes so that we can verify the provided hashed certificate values are - // legitimately required. - // Actual execution happens below, after other validity checks. - self.0 - .chain - .remove_bundles_from_inboxes(block.timestamp, &block.incoming_bundles) - .await?; - // Verify that no unrelated blobs were provided. - let published_blob_ids = block.published_blob_ids(); - let provided_blob_ids = blobs.iter().map(Blob::id); - ensure!( - published_blob_ids.iter().copied().eq(provided_blob_ids), - WorkerError::WrongBlobsInProposal - ); - for blob in blobs { - Self::check_blob_size(blob.content(), &policy)?; - } + let ProposalContent { + block, + round, + outcome, + } = content; let local_time = self.0.storage.clock().current_time(); ensure!( @@ -236,44 +161,29 @@ where ); self.0.storage.clock().sleep_until(block.timestamp).await; let local_time = self.0.storage.clock().current_time(); + + let chain = &mut self.0.chain; + chain + .remove_bundles_from_inboxes(block.timestamp, &block.incoming_bundles) + .await?; let outcome = if let Some(outcome) = outcome { outcome.clone() } else { - Box::pin( - self.0 - .chain - .execute_block(block, local_time, round.multi_leader(), None), - ) - .await? + Box::pin(chain.execute_block(block, local_time, round.multi_leader(), None)).await? }; let executed_block = outcome.with(block.clone()); - let required_blobs = self - .0 - .get_required_blobs(executed_block.required_blob_ids(), blobs) - .await? - .into_values() - .collect::>(); - block.check_proposal_size(policy.maximum_block_proposal_size, &required_blobs)?; - if let Some(lite_certificate) = &validated_block_certificate { - let value = Hashed::new(ValidatedBlock::new(executed_block.clone())); - lite_certificate - .clone() - .with_value(value) - .ok_or_else(|| WorkerError::InvalidLiteCertificate)?; - } ensure!( !round.is_fast() || !executed_block.outcome.has_oracle_responses(), WorkerError::FastBlockUsingOracles ); // Check if the counters of tip_state would be valid. - self.0 - .chain + chain .tip_state .get() .verify_counters(block, &executed_block.outcome)?; // Verify that the resulting chain would have no unconfirmed incoming messages. - self.0.chain.validate_incoming_bundles().await?; + chain.validate_incoming_bundles().await?; Ok(Some((executed_block.outcome, local_time))) } @@ -355,31 +265,6 @@ where } Ok(ChainInfoResponse::new(info, self.0.config.key_pair())) } - - fn check_blob_size( - content: &BlobContent, - policy: &ResourceControlPolicy, - ) -> Result<(), WorkerError> { - ensure!( - u64::try_from(content.bytes().len()) - .ok() - .is_some_and(|size| size <= policy.maximum_blob_size), - WorkerError::BlobTooLarge - ); - match content.blob_type() { - BlobType::ContractBytecode | BlobType::ServiceBytecode => { - ensure!( - CompressedBytecode::decompressed_size_at_most( - content.bytes(), - policy.maximum_bytecode_size - )?, - WorkerError::BytecodeTooLarge - ); - } - BlobType::Data => {} - } - Ok(()) - } } impl Drop for ChainWorkerStateWithTemporaryChanges<'_, StorageClient> diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index d10768311aa..4cb4e481086 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -46,9 +46,8 @@ use linera_chain::{ }, manager::LockingBlock, types::{ - CertificateKind, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, - GenericCertificate, LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, - ValidatedBlockCertificate, + CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, + LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate, }, ChainError, ChainExecutionContext, ChainStateView, }; @@ -372,11 +371,12 @@ where warn!("Failed to process network certificate {}", hash); return info; } - let mut result = self.handle_certificate(certificate.clone(), vec![]).await; + let mut result = self.handle_certificate(certificate.clone()).await; if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result { if let Some(blobs) = remote_node.try_download_blobs(blob_ids).await { - result = self.handle_certificate(certificate, blobs).await; + let _ = self.local_node.store_blobs(&blobs).await; + result = self.handle_certificate(certificate).await; } } @@ -396,29 +396,10 @@ where async fn handle_certificate( &self, certificate: GenericCertificate, - blobs: Vec, ) -> Result { - let chain_id = certificate.inner().chain_id(); - let result = self - .local_node + self.local_node .handle_certificate(certificate.clone(), &self.notifier) - .await; - if let Err(LocalNodeError::BlobsNotFound(_)) = &result { - match T::KIND { - CertificateKind::Confirmed => self.local_node.store_blobs(&blobs).await?, - CertificateKind::Validated => { - self.local_node - .handle_pending_blobs(chain_id, blobs) - .await? - } - CertificateKind::Timeout => return result, - } - return self - .local_node - .handle_certificate(certificate, &self.notifier) - .await; - } - result + .await } } @@ -1064,16 +1045,18 @@ where self.state_mut() .set_pending_proposal(proposal.content.block.clone()); let required_blob_ids = value.inner().required_blob_ids(); - let proposed_blobs = proposal.blobs.clone(); + let published_blobs = self + .read_local_blobs(proposal.content.block.published_blob_ids()) + .await?; let submit_action = CommunicateAction::SubmitBlock { proposal, blob_ids: required_blob_ids, + published_blobs, }; let certificate = self .communicate_chain_action(committee, submit_action, value) .await?; - self.process_certificate(certificate.clone(), proposed_blobs) - .await?; + self.process_certificate(certificate.clone()).await?; Ok(certificate) } @@ -1261,13 +1244,14 @@ where .await?; // Process the received operations. Download required hashed certificate values if // necessary. - if let Err(err) = self.process_certificate(certificate.clone(), vec![]).await { + if let Err(err) = self.process_certificate(certificate.clone()).await { match &err { LocalNodeError::BlobsNotFound(blob_ids) => { let blobs = RemoteNode::download_blobs(blob_ids, &nodes) .await .ok_or(err)?; - self.process_certificate(certificate, blobs).await?; + self.client.local_node.store_blobs(&blobs).await?; + self.process_certificate(certificate).await?; } _ => { // The certificate is not as expected. Give up. @@ -1622,13 +1606,8 @@ where async fn process_certificate( &self, certificate: GenericCertificate, - blobs: Vec, ) -> Result<(), LocalNodeError> { - let info = self - .client - .handle_certificate(certificate, blobs) - .await? - .info; + let info = self.client.handle_certificate(certificate).await?.info; self.update_from_info(&info); Ok(()) } @@ -1670,8 +1649,7 @@ where let certificate = self .communicate_chain_action(&committee, action, value) .await?; - self.process_certificate(certificate.clone(), vec![]) - .await?; + self.process_certificate(certificate.clone()).await?; // The block height didn't increase, but this will communicate the timeout as well. self.communicate_chain_updates( &committee, @@ -1776,19 +1754,55 @@ where .handle_block_proposal(proposal.clone()) .await { - if let LocalNodeError::BlobsNotFound(blob_ids) = &err { - self.update_local_node_with_blobs_from(blob_ids.clone(), remote_node) - .await?; - // We found the missing blobs: retry. - if let Err(new_err) = self - .client - .local_node - .handle_block_proposal(proposal.clone()) - .await - { - err = new_err; - } else { - continue; + if let LocalNodeError::BlobsNotFound(_) = &err { + let required_blob_ids = proposal.required_blob_ids().collect::>(); + if !required_blob_ids.is_empty() { + let mut blobs = Vec::new(); + for blob_id in required_blob_ids { + let blob_content = match remote_node + .node + .download_pending_blob(chain_id, blob_id) + .await + { + Ok(content) => content, + Err(err) => { + let name = &remote_node.name; + warn!("Skipping proposal from {owner} and validator {name}: {err}"); + continue; + } + }; + blobs.push(Blob::new(blob_content)); + } + self.client + .local_node + .handle_pending_blobs(chain_id, blobs) + .await?; + // We found the missing blobs: retry. + if let Err(new_err) = self + .client + .local_node + .handle_block_proposal(proposal.clone()) + .await + { + err = new_err; + } else { + continue; + } + } + if let LocalNodeError::BlobsNotFound(blob_ids) = &err { + self.update_local_node_with_blobs_from(blob_ids.clone(), remote_node) + .await?; + // We found the missing blobs: retry. + if let Err(new_err) = self + .client + .local_node + .handle_block_proposal(proposal.clone()) + .await + { + err = new_err; + } else { + continue; + } } } @@ -1805,7 +1819,7 @@ where certificate: GenericCertificate, ) -> Result<(), ChainClientError> { let chain_id = certificate.inner().chain_id(); - match self.process_certificate(certificate.clone(), vec![]).await { + match self.process_certificate(certificate.clone()).await { Err(LocalNodeError::BlobsNotFound(blob_ids)) => { let mut blobs = Vec::new(); for blob_id in blob_ids { @@ -1815,7 +1829,11 @@ where .await?; blobs.push(Blob::new(blob_content)); } - self.process_certificate(certificate, blobs).await?; + self.client + .local_node + .handle_pending_blobs(chain_id, blobs) + .await?; + self.process_certificate(certificate).await?; Ok(()) } Err(err) => Err(err.into()), @@ -2138,13 +2156,10 @@ where let (executed_block, _) = self .stage_block_execution_and_discard_failing_messages(block, round) .await?; - let blobs = self - .read_local_blobs(executed_block.required_blob_ids()) - .await?; let block = &executed_block.block; let committee = self.local_committee().await?; let max_size = committee.policy().maximum_block_proposal_size; - block.check_proposal_size(max_size, &blobs)?; + block.check_proposal_size(max_size)?; self.state_mut().set_pending_proposal(block.clone()); Ok(Hashed::new(ConfirmedBlock::new(executed_block))) } @@ -2487,10 +2502,6 @@ where Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)), }; - // Collect the blobs required for execution. - let blobs = self - .read_local_blobs(executed_block.block.published_blob_ids()) - .await?; let already_handled_locally = info .manager .already_handled_proposal(round, &executed_block.block); @@ -2498,23 +2509,38 @@ where // Create the final block proposal. let proposal = if let Some(locking) = info.manager.requested_locking { Box::new(match *locking { - LockingBlock::Regular(cert) => { - BlockProposal::new_retry(round, cert, &key_pair, blobs) - } + LockingBlock::Regular(cert) => BlockProposal::new_retry(round, cert, &key_pair), LockingBlock::Fast(proposal) => { - BlockProposal::new_initial(round, proposal.content.block, &key_pair, blobs) + BlockProposal::new_initial(round, proposal.content.block, &key_pair) } }) } else { let block = executed_block.block.clone(); - Box::new(BlockProposal::new_initial(round, block, &key_pair, blobs)) + Box::new(BlockProposal::new_initial(round, block, &key_pair)) }; if !already_handled_locally { // Check the final block proposal. This will be cheaper after #1401. - self.client + if let Err(err) = self + .client .local_node .handle_block_proposal(*proposal.clone()) - .await?; + .await + { + match &err { + LocalNodeError::BlobsNotFound(blob_ids) => { + let blobs = self.read_local_blobs(blob_ids.iter().copied()).await?; + self.client + .local_node + .handle_pending_blobs(self.chain_id, blobs) + .await?; + self.client + .local_node + .handle_block_proposal(*proposal.clone()) + .await?; + } + _ => return Err(err.into()), + } + } } let committee = self.local_committee().await?; // Send the query to validators. diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index 9d4a418d434..cf7b5b1ce6a 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -3362,12 +3362,8 @@ where // But with the validated block certificate for block2, it is allowed. let certificate2 = make_certificate_with_round(&committee, &worker, value2.clone(), Round::SingleLeader(4)); - let proposal = BlockProposal::new_retry( - Round::SingleLeader(5), - certificate2.clone(), - &key_pairs[1], - Vec::new(), - ); + let proposal = + BlockProposal::new_retry(Round::SingleLeader(5), certificate2.clone(), &key_pairs[1]); let lite_value2 = LiteValue::new(&value2); let (_, _) = worker.handle_block_proposal(proposal).await?; let (response, _) = worker.handle_chain_info_query(query_values.clone()).await?; @@ -3670,12 +3666,8 @@ where let value2 = Hashed::new(ValidatedBlock::new(executed_block2.clone())); let certificate2 = make_certificate_with_round(&committee, &worker, value2.clone(), Round::MultiLeader(0)); - let proposal = BlockProposal::new_retry( - Round::MultiLeader(3), - certificate2.clone(), - &key_pairs[1], - Vec::new(), - ); + let proposal = + BlockProposal::new_retry(Round::MultiLeader(3), certificate2.clone(), &key_pairs[1]); let lite_value2 = LiteValue::new(&value2); let (_, _) = worker.handle_block_proposal(proposal).await?; let query_values = ChainInfoQuery::new(chain_id).with_manager_values(); diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 2bcf429d0bf..7dd4b433407 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -6,12 +6,13 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, fmt, hash::Hash, + mem, ops::Range, }; use futures::{stream, stream::TryStreamExt, Future, StreamExt}; use linera_base::{ - data_types::{BlockHeight, Round}, + data_types::{Blob, BlockHeight, Round}, identifiers::{BlobId, ChainId}, time::{timer::timeout, Duration, Instant}, }; @@ -43,6 +44,7 @@ pub enum CommunicateAction { SubmitBlock { proposal: Box, blob_ids: HashSet, + published_blobs: Vec, }, FinalizeBlock { certificate: ValidatedBlockCertificate, @@ -273,12 +275,10 @@ where &mut self, proposal: Box, mut blob_ids: HashSet, + mut published_blobs: Vec, ) -> Result, ChainClientError> { let chain_id = proposal.content.block.chain_id; let mut sent_cross_chain_updates = false; - for blob in &proposal.blobs { - blob_ids.remove(&blob.id()); // Keep only blobs we may need to resend. - } loop { match self .remote_node @@ -300,9 +300,15 @@ where // For `BlobsNotFound`, we assume that the local node should already be // updated with the needed blobs, so sending the chain information about the // certificates that last used the blobs to the validator node should be enough. - let blob_ids = blob_ids.drain().collect::>(); + let blob_ids = blob_ids + .drain() + .filter(|blob_id| !published_blobs.iter().any(|blob| blob.id() == *blob_id)) + .collect::>(); let missing_blob_ids = self.remote_node.node.missing_blob_ids(blob_ids).await?; let local_storage = self.local_node.storage_client(); + self.remote_node + .send_pending_blobs(chain_id, mem::take(&mut published_blobs)) + .await?; let blob_states = local_storage.read_blob_states(&missing_blob_ids).await?; let mut chain_heights = BTreeMap::new(); for blob_state in blob_states { @@ -428,8 +434,14 @@ where .await?; // Send the block proposal, certificate or timeout request and return a vote. let vote = match action { - CommunicateAction::SubmitBlock { proposal, blob_ids } => { - let info = self.send_block_proposal(proposal, blob_ids).await?; + CommunicateAction::SubmitBlock { + proposal, + blob_ids, + published_blobs, + } => { + let info = self + .send_block_proposal(proposal, blob_ids, published_blobs) + .await?; info.manager.pending } CommunicateAction::FinalizeBlock { diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 196e621ec30..68130a294a6 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -209,10 +209,6 @@ pub enum WorkerError { MissingCertificateValue, #[error("The hash certificate doesn't match its value.")] InvalidLiteCertificate, - #[error("An additional blob was provided that is not required: {blob_id}.")] - UnneededBlob { blob_id: BlobId }, - #[error("The blobs provided in the proposal were not the published ones, in order.")] - WrongBlobsInProposal, #[error("Fast blocks cannot query oracles")] FastBlockUsingOracles, #[error("Blobs not found: {0:?}")] @@ -227,6 +223,8 @@ pub enum WorkerError { BlobTooLarge, #[error("Bytecode exceeds size limit")] BytecodeTooLarge, + #[error("Number of published blobs per block must not exceed {0}")] + TooManyPublishedBlobs(u64), #[error(transparent)] Decompression(#[from] DecompressionError), } diff --git a/linera-execution/src/policy.rs b/linera-execution/src/policy.rs index cbb60a2351b..72d5f59b21f 100644 --- a/linera-execution/src/policy.rs +++ b/linera-execution/src/policy.rs @@ -47,6 +47,8 @@ pub struct ResourceControlPolicy { pub maximum_bytecode_size: u64, /// The maximum size of a blob. pub maximum_blob_size: u64, + /// The maximum number of published blobs per block. + pub maximum_published_blobs: u64, /// The maximum size of a block proposal. pub maximum_block_proposal_size: u64, /// The maximum data to read per block @@ -72,6 +74,7 @@ impl fmt::Display for ResourceControlPolicy { maximum_fuel_per_block, maximum_executed_block_size, maximum_blob_size, + maximum_published_blobs, maximum_bytecode_size, maximum_block_proposal_size, maximum_bytes_read_per_block, @@ -94,6 +97,7 @@ impl fmt::Display for ResourceControlPolicy { {maximum_fuel_per_block} maximum fuel per block\n\ {maximum_executed_block_size} maximum size of an executed block\n\ {maximum_blob_size} maximum size of a data blob, bytecode or other binary blob\n\ + {maximum_published_blobs} maximum number of blobs published per block\n\ {maximum_bytecode_size} maximum size of service and contract bytecode\n\ {maximum_block_proposal_size} maximum size of a block proposal\n\ {maximum_bytes_read_per_block} maximum number bytes read per block\n\ @@ -119,6 +123,7 @@ impl Default for ResourceControlPolicy { maximum_fuel_per_block: u64::MAX, maximum_executed_block_size: u64::MAX, maximum_blob_size: u64::MAX, + maximum_published_blobs: u64::MAX, maximum_bytecode_size: u64::MAX, maximum_block_proposal_size: u64::MAX, maximum_bytes_read_per_block: u64::MAX, @@ -241,6 +246,7 @@ impl ResourceControlPolicy { maximum_fuel_per_block: 100_000_000, maximum_executed_block_size: 1_000_000, maximum_blob_size: 1_000_000, + maximum_published_blobs: 10, maximum_bytecode_size: 10_000_000, maximum_block_proposal_size: 13_000_000, maximum_bytes_read_per_block: 100_000_000, diff --git a/linera-execution/tests/fee_consumption.rs b/linera-execution/tests/fee_consumption.rs index 001c6e42368..de31db58bf7 100644 --- a/linera-execution/tests/fee_consumption.rs +++ b/linera-execution/tests/fee_consumption.rs @@ -146,10 +146,11 @@ async fn test_fee_consumption( maximum_fuel_per_block: 4_868_145_137, maximum_executed_block_size: 37, maximum_blob_size: 41, - maximum_bytecode_size: 43, - maximum_block_proposal_size: 47, - maximum_bytes_read_per_block: 53, - maximum_bytes_written_per_block: 59, + maximum_published_blobs: 43, + maximum_bytecode_size: 47, + maximum_block_proposal_size: 53, + maximum_bytes_read_per_block: 59, + maximum_bytes_written_per_block: 61, }; let consumed_fees = spends diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index da0a4b97e14..8d9f33fdbd5 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -215,9 +215,6 @@ message BlockProposal { // A lite certificate for a validated block that justifies the proposal in this round. optional bytes validated_block_certificate = 6; - - // Required blob - bytes blobs = 7; } // A certified statement from the committee, without the value. diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index 2c48ce7705c..001e251187b 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -199,7 +199,6 @@ impl TryFrom for api::BlockProposal { public_key: Some(block_proposal.public_key.into()), owner: Some(block_proposal.owner.into()), signature: Some(block_proposal.signature.into()), - blobs: bincode::serialize(&block_proposal.blobs)?, validated_block_certificate: block_proposal .validated_block_certificate .map(|cert| bincode::serialize(&cert)) @@ -222,7 +221,6 @@ impl TryFrom for BlockProposal { public_key: try_proto_convert(block_proposal.public_key)?, owner: try_proto_convert(block_proposal.owner)?, signature: try_proto_convert(block_proposal.signature)?, - blobs: bincode::deserialize(&block_proposal.blobs)?, validated_block_certificate: block_proposal .validated_block_certificate .map(|bytes| bincode::deserialize(&bytes)) @@ -1224,7 +1222,6 @@ pub mod tests { owner: Owner::from(public_key), public_key, signature: Signature::new(&Foo("test".into()), &KeyPair::generate()), - blobs: vec![], validated_block_certificate: Some(cert), }; diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index 3da32b6bead..629c310228f 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -153,9 +153,6 @@ BlockProposal: TYPENAME: PublicKey - signature: TYPENAME: Signature - - blobs: - SEQ: - TYPENAME: BlobContent - validated_block_certificate: OPTION: TYPENAME: LiteCertificate @@ -801,6 +798,7 @@ ResourceControlPolicy: - maximum_executed_block_size: U64 - maximum_bytecode_size: U64 - maximum_blob_size: U64 + - maximum_published_blobs: U64 - maximum_block_proposal_size: U64 - maximum_bytes_read_per_block: U64 - maximum_bytes_written_per_block: U64 diff --git a/linera-service-graphql-client/gql/service_schema.graphql b/linera-service-graphql-client/gql/service_schema.graphql index f7517624482..f43bb152252 100644 --- a/linera-service-graphql-client/gql/service_schema.graphql +++ b/linera-service-graphql-client/gql/service_schema.graphql @@ -225,6 +225,10 @@ type ChainManager { """ seed: Int! """ + These are blobs published or read by the proposed block. + """ + proposedBlobs: MapView_BlobId_Blob_3711e760! + """ These are blobs published or read by the locking block. """ lockingBlobs: MapView_BlobId_Blob_3711e760! @@ -271,9 +275,14 @@ type ChainStateExtendedView { """ manager: ChainManager! """ + Pending validated block that is still missing blobs. The incomplete set of blobs for the pending validated block. """ - pendingValidatedBlobs: MapView_BlobId_Blob_9f0b41f3! + pendingValidatedBlobs: PendingBlobsView! + """ + The incomplete sets of blobs for upcoming proposals. + """ + pendingProposedBlobs: ReentrantCollectionView_Owner_PendingBlobsView_3247061959! """ Hashes of all certified blocks for this sender. This ends with `block_hash` and has length `usize::from(next_block_height)`. @@ -474,6 +483,14 @@ type Entry_Origin_InboxStateView_c4db01d6 { value: InboxStateView! } +""" +A GraphQL-visible map item, complete with key. +""" +type Entry_Owner_PendingBlobsView_c4f6af6f { + key: Owner! + value: PendingBlobsView! +} + """ A GraphQL-visible map item, complete with key. """ @@ -607,6 +624,10 @@ input MapFilters_Origin_742d451b { keys: [Origin!] } +input MapFilters_Owner_6898ce22 { + keys: [Owner!] +} + input MapFilters_Target_7aac1e1c { keys: [Target!] } @@ -627,6 +648,10 @@ input MapInput_Origin_742d451b { filters: MapFilters_Origin_742d451b } +input MapInput_Owner_6898ce22 { + filters: MapFilters_Owner_6898ce22 +} + input MapInput_Target_7aac1e1c { filters: MapFilters_Target_7aac1e1c } @@ -889,6 +914,28 @@ The owner of a chain. This is currently the hash of the owner's public key used """ scalar Owner +""" +The pending blobs belonging to a block that can't be processed without them. +""" +type PendingBlobsView { + """ + The round in which the block is validated. + """ + round: Round! + """ + Whether these blobs were already validated. + + This is only `false` for _new_ block proposals, not when re-proposing blocks from earlier + rounds or when handling validated block certificates. If it is false, the pending blobs are + only the ones published by the new block, not the ones that are only read. + """ + validated: Boolean! + """ + The map of blobs needed to process the block. + """ + pendingBlobs: MapView_BlobId_Blob_9f0b41f3! +} + """ A message together with kind, authentication and grant information. """ @@ -960,6 +1007,12 @@ type ReentrantCollectionView_Origin_InboxStateView_3699835794 { entries(input: MapInput_Origin_742d451b): [Entry_Origin_InboxStateView_c4db01d6!]! } +type ReentrantCollectionView_Owner_PendingBlobsView_3247061959 { + keys: [Owner!]! + entry(key: Owner!): Entry_Owner_PendingBlobsView_c4f6af6f! + entries(input: MapInput_Owner_6898ce22): [Entry_Owner_PendingBlobsView_c4f6af6f!]! +} + type ReentrantCollectionView_Target_OutboxStateView_2789119133 { keys: [Target!]! entry(key: Target!): Entry_Target_OutboxStateView_50a86149! @@ -1032,6 +1085,10 @@ input ResourceControlPolicy { """ maximumBlobSize: Int! """ + The maximum number of published blobs per block. + """ + maximumPublishedBlobs: Int! + """ The maximum size of a block proposal. """ maximumBlockProposalSize: Int! diff --git a/linera-service/src/cli_wrappers/wallet.rs b/linera-service/src/cli_wrappers/wallet.rs index 718378a9e9b..355e1dc51c8 100644 --- a/linera-service/src/cli_wrappers/wallet.rs +++ b/linera-service/src/cli_wrappers/wallet.rs @@ -249,6 +249,7 @@ impl ClientWrapper { maximum_fuel_per_block, maximum_executed_block_size, maximum_blob_size, + maximum_published_blobs, maximum_bytecode_size, maximum_block_proposal_size, maximum_bytes_read_per_block, @@ -283,6 +284,10 @@ impl ClientWrapper { &maximum_executed_block_size.to_string(), ]) .args(["--maximum-blob-size", &maximum_blob_size.to_string()]) + .args([ + "--maximum-published-blobs", + &maximum_published_blobs.to_string(), + ]) .args([ "--maximum-bytecode-size", &maximum_bytecode_size.to_string(), diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 92619f47358..ba877410526 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -617,6 +617,7 @@ impl Runnable for Job { maximum_fuel_per_block, maximum_executed_block_size, maximum_blob_size, + maximum_published_blobs, maximum_bytecode_size, maximum_block_proposal_size, maximum_bytes_read_per_block, @@ -670,6 +671,9 @@ impl Runnable for Job { if let Some(maximum_blob_size) = maximum_blob_size { policy.maximum_blob_size = maximum_blob_size; } + if let Some(maximum_published_blobs) = maximum_published_blobs { + policy.maximum_published_blobs = maximum_published_blobs; + } if let Some(maximum_block_proposal_size) = maximum_block_proposal_size { @@ -1495,6 +1499,7 @@ async fn run(options: &ClientOptions) -> Result { maximum_fuel_per_block, maximum_executed_block_size, maximum_blob_size, + maximum_published_blobs, maximum_bytecode_size, maximum_block_proposal_size, maximum_bytes_read_per_block, @@ -1511,6 +1516,7 @@ async fn run(options: &ClientOptions) -> Result { maximum_bytes_written_per_block.unwrap_or(u64::MAX); let maximum_executed_block_size = maximum_executed_block_size.unwrap_or(u64::MAX); let maximum_blob_size = maximum_blob_size.unwrap_or(u64::MAX); + let maximum_published_blobs = maximum_published_blobs.unwrap_or(u64::MAX); let maximum_bytecode_size = maximum_bytecode_size.unwrap_or(u64::MAX); let maximum_block_proposal_size = maximum_block_proposal_size.unwrap_or(u64::MAX); let policy = ResourceControlPolicy { @@ -1528,6 +1534,7 @@ async fn run(options: &ClientOptions) -> Result { maximum_fuel_per_block, maximum_executed_block_size, maximum_blob_size, + maximum_published_blobs, maximum_bytecode_size, maximum_block_proposal_size, maximum_bytes_read_per_block,