diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index d4a9c3cf8ed..9f663de4814 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -18,8 +18,9 @@ use linera_chain::types::ConfirmedBlockCertificate; use linera_core::{ client::{BlanketMessagePolicy, ChainClient, Client, MessagePolicy}, data_types::ClientOutcome, - join_set_ext::{JoinSet, JoinSetExt as _}, + join_set_ext::JoinSet, node::CrossChainMessageDelivery, + JoinSetExt, }; use linera_rpc::node_provider::{NodeOptions, NodeProvider}; use linera_storage::Storage; @@ -566,14 +567,53 @@ where W: Persist, { pub async fn process_inboxes_and_force_validator_updates(&mut self) { - for chain_id in self.wallet.owned_chain_ids() { - let chain_client = self - .make_chain_client(chain_id) - .expect("chains in the wallet must exist"); - self.process_inbox(&chain_client).await.unwrap(); - chain_client.update_validators(None).await.unwrap(); - self.update_wallet_from_client(&chain_client).await.unwrap(); + let chain_clients = self + .wallet + .owned_chain_ids() + .iter() + .map(|chain_id| { + self.make_chain_client(*chain_id) + .expect("chains in the wallet must exist") + }) + .collect::>(); + + let mut join_set = task::JoinSet::new(); + for chain_client in chain_clients { + join_set.spawn(async move { + Self::process_inbox_without_updating_wallet(&chain_client) + .await + .expect("Processing inbox should not fail!"); + chain_client.update_validators(None).await.unwrap(); + chain_client + }); + } + + let chain_clients = join_set.join_all().await; + for chain_client in &chain_clients { + self.update_wallet_from_client(chain_client).await.unwrap(); } + if chain_clients.is_empty() { + self.save_wallet().await.unwrap(); + } + } + + async fn process_inbox_without_updating_wallet( + chain_client: &ChainClient, + ) -> Result, Error> { + let mut certificates = Vec::new(); + // Try processing the inbox optimistically without waiting for validator notifications. + let (new_certificates, maybe_timeout) = { + chain_client.synchronize_from_validators().await?; + let result = chain_client.process_inbox_without_prepare().await; + result? + }; + certificates.extend(new_certificates); + assert!( + maybe_timeout.is_none(), + "Should not timeout within benchmark!" + ); + + Ok(certificates) } /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index fa09f718a74..70c3bd29ffa 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -764,10 +764,15 @@ impl Runnable for Job { transactions_per_block, fungible_application_id, } => { + let start = Instant::now(); // Below all block proposals are supposed to succeed without retries, we // must make sure that all incoming payments have been accepted on-chain // and that no validator is missing user certificates. context.process_inboxes_and_force_validator_updates().await; + info!( + "Processed inboxes and forced validator updates in {} ms", + start.elapsed().as_millis() + ); let key_pairs = context .make_benchmark_chains(num_chains, tokens_per_chain) @@ -840,7 +845,7 @@ impl Runnable for Job { } }); info!( - "Confirmed {} valid certificates for {} block proposals.", + "Confirmed {} valid certificates for block proposals to {} chains.", num_valid, confirmed.len() );