Skip to content

Commit

Permalink
fallback to node shieldsync
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraccaman committed Nov 25, 2024
1 parent 4019ab0 commit 7b1829b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 48 deletions.
3 changes: 2 additions & 1 deletion workload/src/build_checks/shielding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub async fn shielding(
source: Alias,
target: Alias,
amount: u64,
with_indexer: bool,
retry_config: RetryFutureConfig<ExponentialBackoff, NoOnRetry>,
state: &State,
) -> Vec<Check> {
Expand All @@ -19,7 +20,7 @@ pub async fn shielding(
};

let target_check = if let Ok(Some(pre_balance)) =
super::utils::get_shielded_balance(sdk, target.clone(), None).await
super::utils::get_shielded_balance(sdk, target.clone(), None, with_indexer).await
{
Check::BalanceShieldedTarget(target, pre_balance, amount, state.clone())
} else {
Expand Down
145 changes: 99 additions & 46 deletions workload/src/build_checks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use namada_sdk::{
address::Address,
control_flow::install_shutdown_signal,
io::DevNullProgressBar,
masp::{shielded_wallet::ShieldedApi, IndexerMaspClient, MaspLocalTaskEnv, ShieldedSyncConfig},
masp::{
shielded_wallet::ShieldedApi, IndexerMaspClient, LedgerMaspClient, MaspLocalTaskEnv,
ShieldedSyncConfig,
},
masp_primitives::{transaction::components::ValueSum, zip32},
rpc,
token::{self, MaspDigitPos},
Expand Down Expand Up @@ -50,15 +53,26 @@ pub async fn get_shielded_balance(
sdk: &Sdk,
source: Alias,
height: Option<u64>,
with_indexer: bool
) -> Result<Option<token::Amount>, StepError> {
let shiedsync_res = shield_sync(sdk, height).await;
antithesis_sdk::assert_always!(
shiedsync_res.is_ok(),
"Shieldsync was successful.",
&json!({
"source": source,
})
);
let shiedsync_res = shield_sync(sdk, height, with_indexer).await;
if with_indexer {
antithesis_sdk::assert_sometimes!(
shiedsync_res.is_ok(),
"Shieldsync (indexer) was successful.",
&json!({
"source": source,
})
);
} else {
antithesis_sdk::assert_always!(
shiedsync_res.is_ok(),
"Shieldsync (node) was successful.",
&json!({
"source": source,
})
);
}
if shiedsync_res.is_err() {
tracing::info!(
"Shieldsync error: {}",
Expand Down Expand Up @@ -150,7 +164,11 @@ pub async fn get_bond(
.ok()
}

pub async fn shield_sync(sdk: &Sdk, height: Option<u64>) -> Result<(), StepError> {
pub async fn shield_sync(
sdk: &Sdk,
height: Option<u64>,
with_indexer: bool,
) -> Result<(), StepError> {
let now = Instant::now();
tracing::info!("Started shieldsync...");

Expand All @@ -168,45 +186,80 @@ pub async fn shield_sync(sdk: &Sdk, height: Option<u64>) -> Result<(), StepError
let mut shielded_ctx = sdk.namada.shielded_mut().await;

let mut max_retries = 3;
loop {
// let masp_client = LedgerMaspClient::new(sdk.namada.clone_client(), 10);
let masp_client = IndexerMaspClient::new(
reqwest::Client::new(),
Url::parse(&sdk.masp_indexer_url).unwrap(),
true,
10,
);
let task_env = MaspLocalTaskEnv::new(4).map_err(|e| StepError::ShieldSync(e.to_string()))?;
let shutdown_signal = install_shutdown_signal(true);

let config = ShieldedSyncConfig::builder()
.client(masp_client)
.fetched_tracker(DevNullProgressBar)
.scanned_tracker(DevNullProgressBar)
.applied_tracker(DevNullProgressBar)
.shutdown_signal(shutdown_signal);

let config = if height.is_some() {
config.wait_for_last_query_height(true).build()
} else {
config.build()
};
if with_indexer {
loop {
// let masp_client = LedgerMaspClient::new(sdk.namada.clone_client(), 10);
let masp_client = IndexerMaspClient::new(
reqwest::Client::new(),
Url::parse(&sdk.masp_indexer_url).unwrap(),
true,
10,
);
let task_env =
MaspLocalTaskEnv::new(4).map_err(|e| StepError::ShieldSync(e.to_string()))?;
let shutdown_signal = install_shutdown_signal(true);

let config = ShieldedSyncConfig::builder()
.client(masp_client)
.fetched_tracker(DevNullProgressBar)
.scanned_tracker(DevNullProgressBar)
.applied_tracker(DevNullProgressBar)
.shutdown_signal(shutdown_signal);

let config = if height.is_some() {
config.wait_for_last_query_height(true).build()
} else {
config.build()
};

let height = height.map(|h| h.into());
let res = shielded_ctx
.sync(task_env, config, height, &[], &vks)
.await;
if res.is_err() {
tracing::info!("Retry shieldsyncing ({}/3)...", max_retries);
if max_retries == 0 {
res.map_err(|e| StepError::ShieldedSync(e.to_string()))?
let height = height.map(|h| h.into());
let res = shielded_ctx.sync(task_env, config, height, &[], &vks).await;
if res.is_err() {
tracing::info!("Retry shieldsyncing ({}/3)...", max_retries);
if max_retries == 0 {
res.map_err(|e| StepError::ShieldedSync(e.to_string()))?
}
max_retries -= 1;
sleep(Duration::from_secs(2)).await
} else {
break;
}
max_retries -= 1;
sleep(Duration::from_secs(2)).await
} else {
break
}
}
} else {
loop {
let masp_client =
LedgerMaspClient::new(sdk.namada.clone_client(), 10, Duration::from_secs(1));
let task_env =
MaspLocalTaskEnv::new(4).map_err(|e| StepError::ShieldSync(e.to_string()))?;
let shutdown_signal = install_shutdown_signal(true);

let config = ShieldedSyncConfig::builder()
.client(masp_client)
.fetched_tracker(DevNullProgressBar)
.scanned_tracker(DevNullProgressBar)
.applied_tracker(DevNullProgressBar)
.shutdown_signal(shutdown_signal);

let config = if height.is_some() {
config.wait_for_last_query_height(true).build()
} else {
config.build()
};

let height = height.map(|h| h.into());
let res = shielded_ctx.sync(task_env, config, height, &[], &vks).await;
if res.is_err() {
tracing::info!("Retry shieldsyncing ({}/3)...", max_retries);
if max_retries == 0 {
res.map_err(|e| StepError::ShieldedSync(e.to_string()))?
}
max_retries -= 1;
sleep(Duration::from_secs(2)).await
} else {
break;
}
}
};

shielded_ctx
.save()
Expand Down
4 changes: 3 additions & 1 deletion workload/src/steps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl WorkloadExecutor {
source,
target,
amount,
true,
retry_config,
state,
)
Expand Down Expand Up @@ -435,7 +436,7 @@ impl WorkloadExecutor {

for (alias, amount) in shielded_balances {
if let Ok(Some(pre_balance)) =
build_checks::utils::get_shielded_balance(sdk, alias.clone(), None).await
build_checks::utils::get_shielded_balance(sdk, alias.clone(), None, true).await
{
if amount >= 0 {
checks.push(Check::BalanceShieldedTarget(
Expand Down Expand Up @@ -607,6 +608,7 @@ impl WorkloadExecutor {
sdk,
target.clone(),
Some(execution_height),
false
)
.await
{
Expand Down

0 comments on commit 7b1829b

Please sign in to comment.