Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload proposals' blobs separately. #3204

Merged
merged 8 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ View or update the resource control policy
* `--maximum-fuel-per-block <MAXIMUM_FUEL_PER_BLOCK>` — Set the maximum amount of fuel per block
* `--maximum-executed-block-size <MAXIMUM_EXECUTED_BLOCK_SIZE>` — Set the maximum size of an executed block, in bytes
* `--maximum-blob-size <MAXIMUM_BLOB_SIZE>` — Set the maximum size of data blobs, compressed bytecode and other binary blobs, in bytes
* `--maximum-published-blobs <MAXIMUM_PUBLISHED_BLOBS>` — Set the maximum number of published blobs per block
* `--maximum-bytecode-size <MAXIMUM_BYTECODE_SIZE>` — Set the maximum size of decompressed contract or service bytecode, in bytes
* `--maximum-block-proposal-size <MAXIMUM_BLOCK_PROPOSAL_SIZE>` — Set the maximum size of a block proposal, in bytes
* `--maximum-bytes-read-per-block <MAXIMUM_BYTES_READ_PER_BLOCK>` — Set the maximum read data per block
Expand Down Expand Up @@ -501,6 +502,7 @@ Create genesis configuration for a Linera deployment. Create initial user chains
* `--maximum-executed-block-size <MAXIMUM_EXECUTED_BLOCK_SIZE>` — Set the maximum size of an executed block
* `--maximum-bytecode-size <MAXIMUM_BYTECODE_SIZE>` — Set the maximum size of decompressed contract or service bytecode, in bytes
* `--maximum-blob-size <MAXIMUM_BLOB_SIZE>` — Set the maximum size of data blobs, compressed bytecode and other binary blobs, in bytes
* `--maximum-published-blobs <MAXIMUM_PUBLISHED_BLOBS>` — Set the maximum number of published blobs per block
* `--maximum-block-proposal-size <MAXIMUM_BLOCK_PROPOSAL_SIZE>` — Set the maximum size of a block proposal, in bytes
* `--maximum-bytes-read-per-block <MAXIMUM_BYTES_READ_PER_BLOCK>` — Set the maximum read data per block
* `--maximum-bytes-written-per-block <MAXIMUM_BYTES_WRITTEN_PER_BLOCK>` — Set the maximum write data per block
Expand Down
7 changes: 7 additions & 0 deletions linera-chain/src/certificate/lite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl<'a> LiteCertificate<'a> {
Ok(&self.value)
}

/// Checks whether the value matches this certificate.
pub fn check_value<T: CertificateValue>(&self, value: &Hashed<T>) -> bool {
self.value.chain_id == value.inner().chain_id()
&& T::KIND == self.value.kind
&& self.value.value_hash == value.hash()
}

/// Returns the [`GenericCertificate`] with the specified value, if it matches.
pub fn with_value<T: CertificateValue>(
self,
Expand Down
44 changes: 27 additions & 17 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@ use futures::stream::{self, StreamExt, TryStreamExt};
use linera_base::{
crypto::CryptoHash,
data_types::{
Amount, ArithmeticError, Blob, BlockHeight, OracleResponse, Timestamp,
UserApplicationDescription,
Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp, UserApplicationDescription,
},
ensure,
identifiers::{
BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner,
StreamId, UserApplicationId,
ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, StreamId,
UserApplicationId,
},
ownership::ChainOwnership,
};
use linera_execution::{
committee::ValidatorName, system::OpenChainConfig, ExecutionOutcome, ExecutionRuntimeContext,
ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext,
QueryOutcome, RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker,
ServiceRuntimeEndpoint, TransactionTracker,
committee::{Committee, Epoch, ValidatorName},
system::OpenChainConfig,
ExecutionOutcome, ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext,
Operation, OperationContext, Query, QueryContext, QueryOutcome, RawExecutionOutcome,
RawOutgoingMessage, ResourceController, ResourceTracker, ServiceRuntimeEndpoint,
TransactionTracker,
};
use linera_views::{
context::Context,
log_view::LogView,
map_view::MapView,
queue_view::QueueView,
reentrant_collection_view::ReentrantCollectionView,
register_view::RegisterView,
Expand All @@ -49,7 +50,7 @@ use crate::{
inbox::{Cursor, InboxError, InboxStateView},
manager::ChainManager,
outbox::OutboxStateView,
types::ValidatedBlockCertificate,
pending_blobs::PendingBlobsView,
ChainError, ChainExecutionContext, ExecutionResultExt,
};

Expand Down Expand Up @@ -201,10 +202,10 @@ where
/// Consensus state.
pub manager: ChainManager<C>,
/// Pending validated block that is still missing blobs.
#[graphql(skip)]
pub pending_validated_block: RegisterView<C, Option<ValidatedBlockCertificate>>,
/// The incomplete set of blobs for the pending validated block.
pub pending_validated_blobs: MapView<C, BlobId, Option<Blob>>,
pub pending_validated_blobs: PendingBlobsView<C>,
/// The incomplete sets of blobs for upcoming proposals.
pub pending_proposed_blobs: ReentrantCollectionView<C, Owner, PendingBlobsView<C>>,

/// Hashes of all certified blocks for this sender.
/// This ends with `block_hash` and has length `usize::from(next_block_height)`.
Expand Down Expand Up @@ -589,6 +590,17 @@ where
Ok(true)
}

pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
self.execution_state
.system
.current_committee()
.ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
}

pub fn ownership(&self) -> &ChainOwnership {
self.execution_state.system.ownership.get()
}

/// Removes the incoming message bundles in the block from the inboxes.
pub async fn remove_bundles_from_inboxes(
&mut self,
Expand Down Expand Up @@ -703,9 +715,7 @@ where
ChainError::InvalidBlockTimestamp
);
self.execution_state.system.timestamp.set(block.timestamp);
let Some((_, committee)) = self.execution_state.system.current_committee() else {
return Err(ChainError::InactiveChain(chain_id));
};
let (_, committee) = self.current_committee()?;
let mut resource_controller = ResourceController {
policy: Arc::new(committee.policy().clone()),
tracker: ResourceTracker::default(),
Expand Down Expand Up @@ -891,7 +901,7 @@ where
let maybe_committee = self.execution_state.system.current_committee().into_iter();

self.pending_validated_blobs.clear();
self.pending_validated_block.set(None);
self.pending_proposed_blobs.clear();
self.manager.reset(
self.execution_state.system.ownership.get().clone(),
block.height.try_add_one()?,
Expand Down
63 changes: 42 additions & 21 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use custom_debug_derive::Debug;
use linera_base::{
bcs,
crypto::{BcsHashable, BcsSignable, CryptoError, CryptoHash, KeyPair, PublicKey, Signature},
data_types::{Amount, Blob, BlockHeight, OracleResponse, Round, Timestamp},
data_types::{Amount, BlockHeight, OracleResponse, Round, Timestamp},
doc_scalar, ensure,
hashed::Hashed,
hex_debug,
Expand All @@ -29,6 +29,7 @@ use linera_execution::{
use serde::{Deserialize, Serialize};

use crate::{
block::ValidatedBlock,
types::{
CertificateKind, CertificateValue, GenericCertificate, LiteCertificate,
ValidatedBlockCertificate,
Expand Down Expand Up @@ -144,12 +145,8 @@ impl ProposedBlock {
Some((in_bundle, posted_message, config))
}

pub fn check_proposal_size(
&self,
maximum_block_proposal_size: u64,
blobs: &[Blob],
) -> Result<(), ChainError> {
let size = bcs::serialized_size(&(self, blobs))?;
pub fn check_proposal_size(&self, maximum_block_proposal_size: u64) -> Result<(), ChainError> {
let size = bcs::serialized_size(self)?;
ensure!(
size <= usize::try_from(maximum_block_proposal_size).unwrap_or(usize::MAX),
ChainError::BlockProposalTooLarge
Expand Down Expand Up @@ -312,8 +309,6 @@ pub struct BlockProposal {
pub owner: Owner,
pub public_key: PublicKey,
pub signature: Signature,
#[debug(skip_if = Vec::is_empty)]
pub blobs: Vec<Blob>,
#[debug(skip_if = Option::is_none)]
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}
Expand Down Expand Up @@ -744,7 +739,7 @@ impl BlockExecutionOutcome {
}
}

fn oracle_blob_ids(&self) -> HashSet<BlobId> {
pub fn oracle_blob_ids(&self) -> HashSet<BlobId> {
let mut required_blob_ids = HashSet::new();
for responses in &self.oracle_responses {
for response in responses {
Expand Down Expand Up @@ -777,12 +772,7 @@ pub struct ProposalContent {
}

impl BlockProposal {
pub fn new_initial(
round: Round,
block: ProposedBlock,
secret: &KeyPair,
blobs: Vec<Blob>,
) -> Self {
pub fn new_initial(round: Round, block: ProposedBlock, secret: &KeyPair) -> Self {
let content = ProposalContent {
round,
block,
Expand All @@ -794,7 +784,6 @@ impl BlockProposal {
public_key: secret.public(),
owner: secret.public().into(),
signature,
blobs,
validated_block_certificate: None,
}
}
Expand All @@ -803,7 +792,6 @@ impl BlockProposal {
round: Round,
validated_block_certificate: ValidatedBlockCertificate,
secret: &KeyPair,
blobs: Vec<Blob>,
) -> Self {
let lite_cert = validated_block_certificate.lite_certificate().cloned();
let block = validated_block_certificate.into_inner().into_inner();
Expand All @@ -819,13 +807,46 @@ impl BlockProposal {
public_key: secret.public(),
owner: secret.public().into(),
signature,
blobs,
validated_block_certificate: Some(lite_cert),
}
}

pub fn check_signature(&self, public_key: PublicKey) -> Result<(), CryptoError> {
self.signature.check(&self.content, public_key)
pub fn check_signature(&self) -> Result<(), CryptoError> {
self.signature.check(&self.content, self.public_key)
}

pub fn required_blob_ids(&self) -> impl Iterator<Item = BlobId> + '_ {
self.content.block.published_blob_ids().into_iter().chain(
self.content
.outcome
.iter()
.flat_map(|outcome| outcome.oracle_blob_ids()),
)
}

/// Checks that the public key matches the owner and that the optional certificate matches
/// the outcome.
pub fn check_invariants(&self) -> Result<(), &'static str> {
ensure!(
self.owner == Owner::from(&self.public_key),
"Public key does not match owner"
);
match (&self.validated_block_certificate, &self.content.outcome) {
(None, None) => {}
(None, Some(_)) | (Some(_), None) => {
return Err("Must contain a validation certificate if and only if \
it contains the execution outcome from a previous round");
}
(Some(lite_certificate), Some(outcome)) => {
let executed_block = outcome.clone().with(self.content.block.clone());
let value = Hashed::new(ValidatedBlock::new(executed_block));
ensure!(
lite_certificate.check_value(&value),
"Lite certificate must match the given block and execution outcome"
);
}
}
Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions linera-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod data_types;
mod inbox;
pub mod manager;
mod outbox;
mod pending_blobs;
#[cfg(with_testing)]
pub mod test;

Expand Down
27 changes: 18 additions & 9 deletions linera-chain/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ where
/// proposals in the same round, this contains only the first one.
#[graphql(skip)]
pub proposed: RegisterView<C, Option<BlockProposal>>,
/// These are blobs published or read by the proposed block.
pub proposed_blobs: MapView<C, BlobId, Blob>,
/// Latest validated proposal that a validator may have voted to confirm. This is either the
/// latest `ValidatedBlock` we have seen, or the proposal from the `Fast` round.
#[graphql(skip)]
Expand Down Expand Up @@ -460,16 +462,16 @@ where
{
let value = Hashed::new(ValidatedBlock::new(executed_block.clone()));
if let Some(certificate) = lite_cert.clone().with_value(value) {
self.update_locking(LockingBlock::Regular(certificate), blobs)?;
self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?;
}
}
} else if round.is_fast() && self.locking_block.get().is_none() {
// The fast block also counts as locking.
self.update_locking(LockingBlock::Fast(proposal.clone()), blobs)?;
self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?;
}

// We record the proposed block, in case it affects the current round number.
self.update_proposed(proposal.clone());
self.update_proposed(proposal.clone(), blobs)?;
self.update_current_round(local_time);

let Some(key_pair) = key_pair else {
Expand Down Expand Up @@ -521,10 +523,8 @@ where

/// Returns the requested blob if it belongs to the proposal or the locking block.
pub async fn pending_blob(&self, blob_id: &BlobId) -> Result<Option<Blob>, ViewError> {
if let Some(proposal) = self.proposed.get() {
if let Some(blob) = proposal.blobs.iter().find(|blob| blob.id() == *blob_id) {
return Ok(Some(blob.clone()));
}
if let Some(blob) = self.proposed_blobs.get(blob_id).await? {
return Ok(Some(blob));
}
self.locking_blobs.get(blob_id).await
}
Expand Down Expand Up @@ -651,13 +651,22 @@ where
}

/// Sets the proposed block, if it is newer than our known latest proposal.
fn update_proposed(&mut self, proposal: BlockProposal) {
fn update_proposed(
&mut self,
proposal: BlockProposal,
blobs: BTreeMap<BlobId, Blob>,
) -> Result<(), ViewError> {
if let Some(old_proposal) = self.proposed.get() {
if old_proposal.content.round >= proposal.content.round {
return;
return Ok(());
}
}
self.proposed.set(Some(proposal));
self.proposed_blobs.clear();
for (blob_id, blob) in blobs {
self.proposed_blobs.insert(&blob_id, blob)?;
}
Ok(())
}

/// Sets the locking block and the associated blobs, if it is newer than the known one.
Expand Down
Loading
Loading