Skip to content

Commit

Permalink
Parallelize make_benchmark_chains
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 6, 2025
1 parent 09aeee8 commit ccfbb1b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 31 deletions.
120 changes: 89 additions & 31 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,79 +614,137 @@ where
num_chains: usize,
balance: Amount,
) -> Result<HashMap<ChainId, KeyPair>, Error> {
let mut key_pairs = HashMap::new();
let mut benchmark_chains = HashMap::new();
let start = Instant::now();
for chain_id in self.wallet.owned_chain_ids() {
if key_pairs.len() == num_chains {
if benchmark_chains.len() == num_chains {
break;
}
let Some(key_pair) = self
// This should never panic, because `owned_chain_ids` only returns the owned chains that
// we have a key pair for.
let key_pair = self
.wallet
.get(chain_id)
.and_then(|chain| chain.key_pair.as_ref().map(|kp| kp.copy()))
else {
continue;
};
.unwrap();
let chain_client = self.make_chain_client(chain_id)?;

let ownership = chain_client.chain_info().await?.manager.ownership;
if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
continue;
}
key_pairs.insert(chain_id, key_pair);
benchmark_chains.insert(chain_client.chain_id(), key_pair);
}

info!(
"Got {} chains from the wallet in {} ms",
benchmark_chains.len(),
start.elapsed().as_millis()
);

let chains_from_wallet = benchmark_chains.len();
let start = Instant::now();
let default_chain_id = self
.wallet
.default_chain()
.expect("should have default chain");
let num_chains_to_create = num_chains - benchmark_chains.len();
let operations_per_block = 900; // Over this we seem to hit the block size limits.

let mut key_pairs = Vec::new();
for _ in (0..num_chains_to_create).step_by(operations_per_block) {
key_pairs.push(self.wallet.generate_key_pair());
}
let mut key_pairs_iter = key_pairs.into_iter();
let admin_id = self.wallet.genesis_admin_chain();
let chain_client = self.make_chain_client(default_chain_id)?;
while key_pairs.len() < num_chains {
let key_pair = self.wallet.generate_key_pair();
let (epoch, committees) = chain_client.epoch_and_committees(default_chain_id).await?;
let epoch = epoch.expect("default chain should be active");
// Put at most 1000 OpenChain operations in each block.
let num_new_chains = (num_chains - key_pairs.len()).min(1000);
let config = OpenChainConfig {
ownership: ChainOwnership::single_super(key_pair.public().into()),
committees,
admin_id: self.wallet.genesis_admin_chain(),
epoch,
for i in (0..num_chains_to_create).step_by(operations_per_block) {
let num_new_chains = operations_per_block.min(num_chains_to_create - i);
let key_pair = key_pairs_iter.next().unwrap();

let certificate = Self::execute_open_chains_operations(
num_new_chains,
&chain_client,
balance,
application_permissions: Default::default(),
};
let operations = iter::repeat(Operation::System(SystemOperation::OpenChain(config)))
.take(num_new_chains)
.collect();
let certificate = chain_client
.execute_operations(operations, vec![])
.await?
.expect("should execute block with OpenChain operations");
&key_pair,
admin_id,
)
.await?;
info!("Block executed successfully");

let block = certificate.block();
let timestamp = block.header.timestamp;

info!("Will update wallet for {} new chains", num_new_chains);
for i in 0..num_new_chains {
let message_id = block
.message_id_for_operation(i, OPEN_CHAIN_MESSAGE_INDEX)
.expect("failed to create new chain");
let chain_id = ChainId::child(message_id);
key_pairs.insert(chain_id, key_pair.copy());
benchmark_chains.insert(chain_id, key_pair.copy());
self.client.track_chain(chain_id);
self.update_wallet_for_new_chain(chain_id, Some(key_pair.copy()), timestamp)
.await?;
}
info!("Updated wallet for {} new chains", num_new_chains);
}

if num_chains - chains_from_wallet > 0 {
info!(
"Created {} chains in {} ms",
num_chains - chains_from_wallet,
start.elapsed().as_millis()
);
}

info!("Updating wallet from client");
self.update_wallet_from_client(&chain_client).await?;
info!("Retrying pending outgoing messages");
let updated_chain_client = self.make_chain_client(default_chain_id)?;
updated_chain_client
.retry_pending_outgoing_messages()
.await
.context("outgoing messages to create the new chains should be delivered")?;

for chain_id in key_pairs.keys() {
info!("Processing inboxes");
let start = Instant::now();
for chain_id in benchmark_chains.keys() {
let child_client = self.make_chain_client(*chain_id)?;
child_client.process_inbox().await?;
self.wallet.as_mut().update_from_state(&child_client).await;
self.save_wallet().await?;
}
Ok(key_pairs)
info!("Processed inboxes in {} ms", start.elapsed().as_millis());
info!("Saving wallet");
self.save_wallet().await?;
Ok(benchmark_chains)
}

async fn execute_open_chains_operations(
num_new_chains: usize,
chain_client: &ChainClient<NodeProvider, S>,
balance: Amount,
key_pair: &KeyPair,
admin_id: ChainId,
) -> Result<ConfirmedBlockCertificate, Error> {
let chain_id = chain_client.chain_id();
let (epoch, committees) = chain_client.epoch_and_committees(chain_id).await?;
let epoch = epoch.expect("default chain should be active");
let config = OpenChainConfig {
ownership: ChainOwnership::single_super(key_pair.public().into()),
committees,
admin_id,
epoch,
balance,
application_permissions: Default::default(),
};
let operations = iter::repeat(Operation::System(SystemOperation::OpenChain(config)))
.take(num_new_chains)
.collect();
info!("Executing {} OpenChain operations", num_new_chains);
Ok(chain_client
.execute_operations(operations, vec![])
.await?
.expect("should execute block with OpenChain operations"))
}

/// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
Expand Down
6 changes: 6 additions & 0 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,15 @@ impl Runnable for Job {
start.elapsed().as_millis()
);

let start = Instant::now();
let key_pairs = context
.make_benchmark_chains(num_chains, tokens_per_chain)
.await?;
info!(
"Got {} chains in {} ms",
key_pairs.len(),
start.elapsed().as_millis()
);

if let Some(id) = fungible_application_id {
context.supply_fungible_tokens(&key_pairs, id).await?;
Expand Down

0 comments on commit ccfbb1b

Please sign in to comment.