Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeytimoshin committed Jun 6, 2024
1 parent eb9a08b commit 2c2ec5b
Show file tree
Hide file tree
Showing 101 changed files with 1,158 additions and 569 deletions.
23 changes: 16 additions & 7 deletions forester/src/indexer/photon_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use log::info;
use crate::utils::decode_hash;
use account_compression::initialize_address_merkle_tree::Pubkey;
use light_test_utils::indexer::{
Indexer, IndexerError, MerkleProof, MerkleProofWithAddressContext,
};
use log::info;
use photon_api::apis::configuration::Configuration;
use photon_api::models::GetCompressedAccountsByOwnerPostRequestParams;

Expand Down Expand Up @@ -31,16 +31,21 @@ impl Clone for PhotonIndexer {
}

impl Indexer for PhotonIndexer {

async fn get_multiple_compressed_account_proofs_for_forester(&self, hashes: Vec<String>) -> Result<Vec<MerkleProof>, IndexerError> {
async fn get_multiple_compressed_account_proofs_for_forester(
&self,
hashes: Vec<String>,
) -> Result<Vec<MerkleProof>, IndexerError> {
self.get_multiple_compressed_account_proofs(hashes).await
}

async fn get_multiple_compressed_account_proofs(
&self,
hashes: Vec<String>,
) -> Result<Vec<MerkleProof>, IndexerError> {
info!("PhotonIndexer: Getting proofs for {} accounts", hashes.len());
info!(
"PhotonIndexer: Getting proofs for {} accounts",
hashes.len()
);
let request = photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
params: hashes,
..Default::default()
Expand Down Expand Up @@ -86,7 +91,10 @@ impl Indexer for PhotonIndexer {
}
}

async fn get_rpc_compressed_accounts_by_owner(&self, owner: &Pubkey) -> Result<Vec<String>, IndexerError> {
async fn get_rpc_compressed_accounts_by_owner(
&self,
owner: &Pubkey,
) -> Result<Vec<String>, IndexerError> {
let request = photon_api::models::GetCompressedAccountsByOwnerPostRequest {
params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
cursor: None,
Expand All @@ -99,7 +107,9 @@ impl Indexer for PhotonIndexer {
let result = photon_api::apis::default_api::get_compressed_accounts_by_owner_post(
&self.configuration,
request,
).await.unwrap();
)
.await
.unwrap();

info!("PhotonIndexer: Got response: {:?}", result);

Expand All @@ -112,7 +122,6 @@ impl Indexer for PhotonIndexer {
Ok(hashes)
}


fn get_address_tree_proof(
&self,
_merkle_tree_pubkey: [u8; 32],
Expand Down
59 changes: 34 additions & 25 deletions forester/src/nullifier/nullify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ use light_hash_set::HashSet;
use light_registry::sdk::get_cpi_authority_pda;
use light_system_program::utils::get_registered_program_pda;
use light_test_utils::indexer::Indexer;
use light_test_utils::rpc::rpc_connection::RpcConnection;
use light_test_utils::test_env::NOOP_PROGRAM_ID;
use log::{info, warn};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signer;
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
use futures::FutureExt;
use futures::TryFutureExt;
use tokio::signal;
use tokio::sync::{Mutex, Semaphore};
use tokio_util::sync::CancellationToken;
use light_test_utils::rpc::rpc_connection::RpcConnection;

pub async fn nullify<T: Indexer, R: RpcConnection>(indexer: &mut T, rpc: &mut R, config: &Config) -> Result<(), ForesterError> {
pub async fn nullify<T: Indexer, R: RpcConnection>(
indexer: &mut T,
rpc: &mut R,
config: &Config,
) -> Result<(), ForesterError> {
let concurrency_limit = config.concurrency_limit;
let semaphore = Arc::new(Semaphore::new(concurrency_limit));
let successful_nullifications = Arc::new(Mutex::new(1));
Expand Down Expand Up @@ -104,37 +106,34 @@ pub async fn nullify<T: Indexer, R: RpcConnection>(indexer: &mut T, rpc: &mut R,
leaf_index,
&config_clone,
rpc,
indexer
indexer,
)
.await
.await
{
Ok(_) => {
let mut successful_nullifications =
successful_nullifications.lock().await;
let mut successful_nullifications = successful_nullifications.lock().await;
*successful_nullifications += 1;
break;
}
Err(e) => {
if retries >= max_retries {
warn!(
"Max retries reached for account {}: {:?}",
account.hash_string(),
e
);
"Max retries reached for account {}: {:?}",
account.hash_string(),
e
);
break;
}
retries += 1;
warn!(
"Retrying account {} due to error: {:?}",
account.hash_string(),
e
);
"Retrying account {} due to error: {:?}",
account.hash_string(),
e
);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
}
}


} else {
warn!("No proof found for account: {}", account.hash_string());
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Expand All @@ -154,9 +153,8 @@ async fn fetch_queue_data<T: Indexer, R: RpcConnection>(
rpc: &mut R,
config: &Config,
) -> Result<Option<StateQueueData>, ForesterError> {
let (change_log_index, sequence_number) = {
get_changelog_index(&config.state_merkle_tree_pubkey, rpc).await?
};
let (change_log_index, sequence_number) =
{ get_changelog_index(&config.state_merkle_tree_pubkey, rpc).await? };
let compressed_accounts_to_nullify = {
let queue = get_nullifier_queue(&config.nullifier_queue_pubkey, rpc).await?;
info!(
Expand Down Expand Up @@ -246,7 +244,13 @@ pub async fn nullify_compressed_account<T: Indexer, R: RpcConnection>(
ix,
];

let signature = rpc.create_and_send_transaction(&instructions, &config.payer_keypair.pubkey(), &[&config.payer_keypair]).await;
let signature = rpc
.create_and_send_transaction(
&instructions,
&config.payer_keypair.pubkey(),
&[&config.payer_keypair],
)
.await;
info!("Transaction: {:?}", signature);
indexer.account_nullified(config.state_merkle_tree_pubkey, &account.hash_string());
Ok(())
Expand All @@ -256,11 +260,14 @@ pub async fn get_nullifier_queue<R: RpcConnection>(
nullifier_queue_pubkey: &Pubkey,
rpc: &mut R,
) -> Result<Vec<Account>, ForesterError> {
let mut nullifier_queue_account = rpc.get_account(*nullifier_queue_pubkey).await
let mut nullifier_queue_account = rpc
.get_account(*nullifier_queue_pubkey)
.await
.map_err(|e| {
warn!("Error fetching nullifier queue account: {:?}", e);
ForesterError::Custom("Error fetching nullifier queue account".to_string())
})?.unwrap();
})?
.unwrap();

let nullifier_queue: HashSet = unsafe {
HashSet::from_bytes_copy(
Expand All @@ -287,7 +294,9 @@ pub async fn get_changelog_index<R: RpcConnection>(
merkle_tree_pubkey: &Pubkey,
rpc: &mut R,
) -> Result<(usize, usize), ForesterError> {
let merkle_tree_account = rpc.get_anchor_account::<StateMerkleTreeAccount>(merkle_tree_pubkey).await;
let merkle_tree_account = rpc
.get_anchor_account::<StateMerkleTreeAccount>(merkle_tree_pubkey)
.await;
let merkle_tree = merkle_tree_account.copy_merkle_tree()?;
Ok((
merkle_tree.current_changelog_index,
Expand Down
4 changes: 2 additions & 2 deletions forester/src/nullifier/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::{nullify, Config};
use crate::constants::{INDEXER_URL, WS_SERVER_URL};
use crate::indexer::PhotonIndexer;
use light_test_utils::rpc::rpc_connection::RpcConnection;
use log::{info, warn};
use solana_client::pubsub_client::PubsubClient;
use solana_client::rpc_config::RpcAccountInfoConfig;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::time::{sleep, Duration};
use light_test_utils::rpc::rpc_connection::RpcConnection;
use super::{nullify, Config};

pub async fn subscribe_nullify<R: RpcConnection>(config: &Config, rpc: &mut R) {
let mut indexer = PhotonIndexer::new(INDEXER_URL.to_string());
Expand Down
4 changes: 3 additions & 1 deletion forester/tests/empty_address_tree_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ async fn empty_address_tree_test() {

async fn get_state_queue_length(config: &Config) -> usize {
let mut rpc = SolanaRpcConnection::new(None).await;
let queue = get_nullifier_queue(&config.address_merkle_tree_queue_pubkey, &mut rpc).await.unwrap();
let queue = get_nullifier_queue(&config.address_merkle_tree_queue_pubkey, &mut rpc)
.await
.unwrap();
queue.len()
}
16 changes: 11 additions & 5 deletions forester/tests/nullifier_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use forester::constants::{INDEXER_URL, SERVER_URL};
use forester::indexer::PhotonIndexer;
use forester::nullifier::{get_nullifier_queue, nullify, subscribe_nullify, Config};
use forester::utils::u8_arr_to_hex_string;
use light_test_utils::rpc::rpc_connection::RpcConnection;
use light_test_utils::rpc::SolanaRpcConnection;
use light_test_utils::test_env::{get_test_env_accounts, REGISTRY_ID_TEST_KEYPAIR};
use log::{info, warn};
use solana_client::rpc_client::RpcClient;
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use light_test_utils::rpc::rpc_connection::RpcConnection;
use light_test_utils::rpc::SolanaRpcConnection;

fn test_config() -> Config {
let registry_keypair = Keypair::from_bytes(&REGISTRY_ID_TEST_KEYPAIR).unwrap();
Expand All @@ -35,7 +35,9 @@ fn test_config() -> Config {
async fn queue_info_test() {
let config = test_config();
let mut rpc = SolanaRpcConnection::new(None).await;
let queue = get_nullifier_queue(&config.nullifier_queue_pubkey, &mut rpc).await.unwrap();
let queue = get_nullifier_queue(&config.nullifier_queue_pubkey, &mut rpc)
.await
.unwrap();
info!("Nullifier queue length: {}", queue.len());
}

Expand Down Expand Up @@ -89,7 +91,9 @@ async fn test_nullify_leaves() {
let mut indexer = PhotonIndexer::new(INDEXER_URL.to_string());
let config = test_config();
let mut rpc = SolanaRpcConnection::new(None).await;
let _ = rpc.airdrop_lamports(&config.payer_keypair.pubkey(), LAMPORTS_PER_SOL * 1000).await;
let _ = rpc
.airdrop_lamports(&config.payer_keypair.pubkey(), LAMPORTS_PER_SOL * 1000)
.await;

let time = std::time::Instant::now();
match nullify(&mut indexer, &mut rpc, &config).await {
Expand All @@ -108,6 +112,8 @@ async fn test_nullify_leaves() {
async fn test_subscribe_nullify() {
let config = test_config();
let mut rpc = SolanaRpcConnection::new(None).await;
let _ = rpc.airdrop_lamports(&config.payer_keypair.pubkey(), LAMPORTS_PER_SOL * 1000).await;
let _ = rpc
.airdrop_lamports(&config.payer_keypair.pubkey(), LAMPORTS_PER_SOL * 1000)
.await;
subscribe_nullify(&config, &mut rpc).await;
}
Loading

0 comments on commit 2c2ec5b

Please sign in to comment.