Skip to content

Commit

Permalink
In prepare_chain, ensure validate_incoming_bundles is OK. (#2835)
Browse files Browse the repository at this point in the history
* In prepare_chain, ensure validate_incoming_bundles is OK.

* Add two epochs in CI before starting the faucet.

* Box the largest error variants.
  • Loading branch information
afck committed Nov 11, 2024
1 parent 63502a1 commit 1fb6b00
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 122 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ jobs:
cargo build --features storage-service
mkdir /tmp/local-linera-net
cargo run --features storage-service --bin linera -- net up --storage service:tcp:localhost:1235:table --policy-config devnet --path /tmp/local-linera-net --validators 4 --shards 4 &
- name: Run the faucet
- name: Create two epochs and run the faucet
run: |
cargo build --bin linera
cargo run --bin linera -- faucet --amount 1000 --port 8079 &
cargo run --bin linera -- resource-control-policy --block 0.0000001
cargo run --bin linera -- resource-control-policy --block 0.000000
cargo run --bin linera -- faucet --amount 1000 --port 8079 69705f85ac4c9fef6c02b4d83426aaaf05154c645ec1c61665f8e450f0468bc0 &
- name: Run the remote-net tests
run: |
cargo test -p linera-service remote_net_grpc --features remote-net
Expand Down
70 changes: 30 additions & 40 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use linera_base::{
},
};
use linera_execution::{
system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRuntimeContext,
ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext,
RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response,
ServiceRuntimeEndpoint, TransactionTracker,
system::OpenChainConfig, ExecutionOutcome, ExecutionRuntimeContext, ExecutionStateView,
Message, MessageContext, Operation, OperationContext, Query, QueryContext, RawExecutionOutcome,
RawOutgoingMessage, ResourceController, ResourceTracker, Response, ServiceRuntimeEndpoint,
TransactionTracker,
};
use linera_views::{
context::Context,
Expand All @@ -46,7 +46,7 @@ use crate::{
inbox::{Cursor, InboxError, InboxStateView},
manager::ChainManager,
outbox::OutboxStateView,
ChainError, ChainExecutionContext,
ChainError, ChainExecutionContext, ExecutionResultExt,
};

#[cfg(test)]
Expand Down Expand Up @@ -385,12 +385,10 @@ where
next_block_height: self.tip_state.get().next_block_height,
local_time,
};
let response = self
.execution_state
self.execution_state
.query_application(context, query, service_runtime_endpoint)
.await
.map_err(|error| ChainError::ExecutionError(error, ChainExecutionContext::Query))?;
Ok(response)
.with_execution_context(ChainExecutionContext::Query)
}

pub async fn describe_application(
Expand All @@ -402,9 +400,7 @@ where
.registry
.describe_application(application_id)
.await
.map_err(|err| {
ChainError::ExecutionError(err.into(), ChainExecutionContext::DescribeApplication)
})
.with_execution_context(ChainExecutionContext::DescribeApplication)
}

pub async fn mark_messages_as_received(
Expand Down Expand Up @@ -745,7 +741,7 @@ where
resource_controller
.track_executed_block_size_sequence_extension(0, block.operations.len())
})
.map_err(|err| ChainError::ExecutionError(err, ChainExecutionContext::Block))?;
.with_execution_context(ChainExecutionContext::Block)?;

if self.is_closed() {
ensure!(
Expand Down Expand Up @@ -793,8 +789,6 @@ where
Transaction::ReceiveMessages(_) => ChainExecutionContext::IncomingBundle(txn_index),
Transaction::ExecuteOperation(_) => ChainExecutionContext::Operation(txn_index),
};
let with_context =
|error: ExecutionError| ChainError::ExecutionError(error, chain_execution_context);
let maybe_responses = match replaying_oracle_responses.as_mut().map(Iterator::next) {
Some(Some(responses)) => Some(responses),
Some(None) => return Err(ChainError::MissingOracleResponseList),
Expand All @@ -805,7 +799,7 @@ where
Transaction::ReceiveMessages(incoming_bundle) => {
resource_controller
.track_executed_block_size_of(&incoming_bundle)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
self.execute_message_in_block(
message_id,
Expand All @@ -823,7 +817,7 @@ where
Transaction::ExecuteOperation(operation) => {
resource_controller
.track_executed_block_size_of(&operation)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
#[cfg(with_metrics)]
let _operation_latency = OPERATION_EXECUTION_LATENCY.measure_latency();
let context = OperationContext {
Expand All @@ -842,21 +836,22 @@ where
&mut resource_controller,
)
.await
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
resource_controller
.with_state(&mut self.execution_state)
.await?
.track_operation(operation)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
}
}

self.execution_state
.update_execution_outcomes_with_app_registrations(&mut txn_tracker)
.await
.map_err(with_context)?;
let (txn_outcomes, txn_oracle_responses, new_next_message_index) =
txn_tracker.destructure().map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
let (txn_outcomes, txn_oracle_responses, new_next_message_index) = txn_tracker
.destructure()
.with_execution_context(chain_execution_context)?;
next_message_index = new_next_message_index;
let (txn_messages, txn_events) = self
.process_execution_outcomes(block.height, txn_outcomes)
Expand All @@ -874,21 +869,21 @@ where
.with_state(&mut self.execution_state)
.await?
.track_message(&message_out.message)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
}
}
resource_controller
.track_executed_block_size_of(&(&txn_oracle_responses, &txn_messages, &txn_events))
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
resource_controller
.track_executed_block_size_sequence_extension(oracle_responses.len(), 1)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
resource_controller
.track_executed_block_size_sequence_extension(messages.len(), 1)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
resource_controller
.track_executed_block_size_sequence_extension(events.len(), 1)
.map_err(with_context)?;
.with_execution_context(chain_execution_context)?;
oracle_responses.push(txn_oracle_responses);
messages.push(txn_messages);
events.push(txn_events);
Expand All @@ -901,7 +896,7 @@ where
.with_state(&mut self.execution_state)
.await?
.track_block()
.map_err(|err| ChainError::ExecutionError(err, ChainExecutionContext::Block))?;
.with_execution_context(ChainExecutionContext::Block)?;
}

// Recompute the state hash.
Expand Down Expand Up @@ -978,10 +973,6 @@ where
let mut grant = posted_message.grant;
match incoming_bundle.action {
MessageAction::Accept => {
let with_context = |error: ExecutionError| {
let context = ChainExecutionContext::IncomingBundle(txn_index);
ChainError::ExecutionError(error, context)
};
// Once a chain is closed, accepting incoming messages is not allowed.
ensure!(!self.is_closed(), ChainError::ClosedChain);

Expand All @@ -995,36 +986,35 @@ where
resource_controller,
)
.await
.map_err(with_context)?;
.with_execution_context(ChainExecutionContext::IncomingBundle(txn_index))?;
if grant > Amount::ZERO {
if let Some(refund_grant_to) = posted_message.refund_grant_to {
self.execution_state
.send_refund(context, grant, refund_grant_to, txn_tracker)
.await
.map_err(with_context)?;
.with_execution_context(ChainExecutionContext::IncomingBundle(
txn_index,
))?;
}
}
}
MessageAction::Reject => {
let with_context = |error: ExecutionError| {
ChainError::ExecutionError(error, ChainExecutionContext::Block)
};
// If rejecting a message fails, the entire block proposal should be
// scrapped.
ensure!(
!posted_message.is_protected() || self.is_closed(),
ChainError::CannotRejectMessage {
chain_id: block.chain_id,
origin: Box::new(incoming_bundle.origin.clone()),
posted_message: posted_message.clone(),
posted_message: Box::new(posted_message.clone()),
}
);
if posted_message.is_tracked() {
// Bounce the message.
self.execution_state
.bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
.await
.map_err(with_context)?;
.with_execution_context(ChainExecutionContext::Block)?;
} else if grant > Amount::ZERO {
// Nothing to do except maybe refund the grant.
let Some(refund_grant_to) = posted_message.refund_grant_to else {
Expand All @@ -1037,7 +1027,7 @@ where
self.execution_state
.send_refund(context, posted_message.grant, refund_grant_to, txn_tracker)
.await
.map_err(with_context)?;
.with_execution_context(ChainExecutionContext::Block)?;
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions linera-chain/src/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,23 @@ impl From<(ChainId, Origin, InboxError)> for ChainError {
} => ChainError::UnexpectedMessage {
chain_id,
origin: origin.into(),
bundle,
previous_bundle,
bundle: Box::new(bundle),
previous_bundle: Box::new(previous_bundle),
},
InboxError::IncorrectOrder {
bundle,
next_cursor,
} => ChainError::IncorrectMessageOrder {
chain_id,
origin: origin.into(),
bundle,
bundle: Box::new(bundle),
next_height: next_cursor.height,
next_index: next_cursor.index,
},
InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
chain_id,
origin: origin.into(),
bundle,
bundle: Box::new(bundle),
},
}
}
Expand Down
25 changes: 19 additions & 6 deletions linera-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub enum ChainError {
#[error("Error in view operation: {0}")]
ViewError(#[from] ViewError),
#[error("Execution error: {0} during {1:?}")]
ExecutionError(ExecutionError, ChainExecutionContext),
ExecutionError(Box<ExecutionError>, ChainExecutionContext),

#[error("The chain being queried is not active {0:?}")]
InactiveChain(ChainId),
Expand All @@ -54,8 +54,8 @@ pub enum ChainError {
UnexpectedMessage {
chain_id: ChainId,
origin: Box<Origin>,
bundle: MessageBundle,
previous_bundle: MessageBundle,
bundle: Box<MessageBundle>,
previous_bundle: Box<MessageBundle>,
},
#[error(
"Message in block proposed to {chain_id:?} is out of order compared to previous messages \
Expand All @@ -65,7 +65,7 @@ pub enum ChainError {
IncorrectMessageOrder {
chain_id: ChainId,
origin: Box<Origin>,
bundle: MessageBundle,
bundle: Box<MessageBundle>,
next_height: BlockHeight,
next_index: u32,
},
Expand All @@ -76,7 +76,7 @@ pub enum ChainError {
CannotRejectMessage {
chain_id: ChainId,
origin: Box<Origin>,
posted_message: PostedMessage,
posted_message: Box<PostedMessage>,
},
#[error(
"Block proposed to {chain_id:?} is attempting to skip a message bundle \
Expand All @@ -85,7 +85,7 @@ pub enum ChainError {
CannotSkipMessage {
chain_id: ChainId,
origin: Box<Origin>,
bundle: MessageBundle,
bundle: Box<MessageBundle>,
},
#[error(
"Incoming message bundle in block proposed to {chain_id:?} has timestamp \
Expand Down Expand Up @@ -160,3 +160,16 @@ pub enum ChainExecutionContext {
Operation(u32),
Block,
}

trait ExecutionResultExt<T> {
fn with_execution_context(self, context: ChainExecutionContext) -> Result<T, ChainError>;
}

impl<T, E> ExecutionResultExt<T> for Result<T, E>
where
E: Into<ExecutionError>,
{
fn with_execution_context(self, context: ChainExecutionContext) -> Result<T, ChainError> {
self.map_err(|error| ChainError::ExecutionError(Box::new(error.into()), context))
}
}
4 changes: 2 additions & 2 deletions linera-chain/src/unit_tests/chain_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ async fn test_block_size_limit() {
assert_matches!(
result,
Err(ChainError::ExecutionError(
ExecutionError::ExecutedBlockTooLarge,
execution_error,
ChainExecutionContext::Operation(1),
))
)) if matches!(*execution_error, ExecutionError::ExecutedBlockTooLarge)
);

// The valid block is accepted...
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ where
ensure!(
executed_block.outcome == verified_outcome,
WorkerError::IncorrectOutcome {
submitted: executed_block.outcome.clone(),
computed: verified_outcome,
submitted: Box::new(executed_block.outcome.clone()),
computed: Box::new(verified_outcome),
}
);
// Advance to next block height.
Expand Down
21 changes: 15 additions & 6 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ where
}

/// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
/// its current height.
/// its current height and are not missing any received messages from the inbox.
#[instrument(level = "trace")]
async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
#[cfg(with_metrics)]
Expand All @@ -1002,6 +1002,15 @@ where
let nodes = self.validator_nodes().await?;
info = self.synchronize_chain_state(&nodes, self.chain_id).await?;
}

let result = self
.chain_state_view()
.await?
.validate_incoming_bundles()
.await;
if matches!(result, Err(ChainError::MissingCrossChainUpdate { .. })) {
self.find_received_certificates().await?;
}
self.update_from_info(&info);
Ok(info)
}
Expand Down Expand Up @@ -2308,13 +2317,13 @@ where
Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(error),
))) if matches!(
*error,
&*error,
ChainError::ExecutionError(
ExecutionError::SystemError(
SystemExecutionError::InsufficientFundingForFees { .. }
),
execution_error,
ChainExecutionContext::Block
)
) if matches!(**execution_error, ExecutionError::SystemError(
SystemExecutionError::InsufficientFundingForFees { .. }
))
) =>
{
// We can't even pay for the execution of one empty block. Let's return zero.
Expand Down
1 change: 0 additions & 1 deletion linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ impl<T> ClientOutcome<T> {
}
}

#[expect(clippy::result_large_err)]
pub fn try_map<F, S>(self, f: F) -> Result<ClientOutcome<S>, ChainClientError>
where
F: FnOnce(T) -> Result<S, ChainClientError>,
Expand Down
Loading

0 comments on commit 1fb6b00

Please sign in to comment.