Skip to content

Commit

Permalink
Parallelize process_inboxes_and_force_validator_updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 6, 2025
1 parent 96d6a58 commit f7cca92
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
56 changes: 48 additions & 8 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use linera_chain::types::ConfirmedBlockCertificate;
use linera_core::{
client::{BlanketMessagePolicy, ChainClient, Client, MessagePolicy},
data_types::ClientOutcome,
join_set_ext::{JoinSet, JoinSetExt as _},
join_set_ext::JoinSet,
node::CrossChainMessageDelivery,
JoinSetExt,
};
use linera_rpc::node_provider::{NodeOptions, NodeProvider};
use linera_storage::Storage;
Expand Down Expand Up @@ -566,16 +567,55 @@ where
W: Persist<Target = Wallet>,
{
pub async fn process_inboxes_and_force_validator_updates(&mut self) {
for chain_id in self.wallet.owned_chain_ids() {
let chain_client = self
.make_chain_client(chain_id)
.expect("chains in the wallet must exist");
self.process_inbox(&chain_client).await.unwrap();
chain_client.update_validators(None).await.unwrap();
self.update_wallet_from_client(&chain_client).await.unwrap();
let chain_clients = self
.wallet
.owned_chain_ids()
.iter()
.map(|chain_id| {
self.make_chain_client(*chain_id)
.expect("chains in the wallet must exist")
})
.collect::<Vec<_>>();

let mut join_set = task::JoinSet::new();
for chain_client in chain_clients {
join_set.spawn(async move {
Self::process_inbox_without_updating_wallet(&chain_client)
.await
.expect("Processing inbox should not fail!");
chain_client.update_validators(None).await.unwrap();
chain_client
});
}

let chain_clients = join_set.join_all().await;
for chain_client in &chain_clients {
self.update_wallet_from_client(chain_client).await.unwrap();
}
if chain_clients.is_empty() {
self.save_wallet().await.unwrap();
}
}

async fn process_inbox_without_updating_wallet(
chain_client: &ChainClient<NodeProvider, S>,
) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
let mut certificates = Vec::new();
// Try processing the inbox optimistically without waiting for validator notifications.
let (new_certificates, maybe_timeout) = {
chain_client.synchronize_from_validators().await?;
let result = chain_client.process_inbox_without_prepare().await;
result?
};
certificates.extend(new_certificates);
assert!(
maybe_timeout.is_none(),
"Should not timeout within benchmark!"
);

Ok(certificates)
}

/// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
/// with key pairs.
pub async fn make_benchmark_chains(
Expand Down
7 changes: 6 additions & 1 deletion linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,15 @@ impl Runnable for Job {
transactions_per_block,
fungible_application_id,
} => {
let start = Instant::now();
// Below all block proposals are supposed to succeed without retries, we
// must make sure that all incoming payments have been accepted on-chain
// and that no validator is missing user certificates.
context.process_inboxes_and_force_validator_updates().await;
info!(
"Processed inboxes and forced validator updates in {} ms",
start.elapsed().as_millis()
);

let key_pairs = context
.make_benchmark_chains(num_chains, tokens_per_chain)
Expand Down Expand Up @@ -840,7 +845,7 @@ impl Runnable for Job {
}
});
info!(
"Confirmed {} valid certificates for {} block proposals.",
"Confirmed {} valid certificates for block proposals to {} chains.",
num_valid,
confirmed.len()
);
Expand Down

0 comments on commit f7cca92

Please sign in to comment.