From 21f22fb59e8b72725e1649f930e475d95a6aff18 Mon Sep 17 00:00:00 2001 From: nazgull08 Date: Tue, 7 Jan 2025 13:39:35 +0300 Subject: [PATCH] removed timeout --- src/indexer/pangea.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/indexer/pangea.rs b/src/indexer/pangea.rs index cd8e028..9d9963f 100644 --- a/src/indexer/pangea.rs +++ b/src/indexer/pangea.rs @@ -10,7 +10,7 @@ use std::collections::HashSet; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use tokio::time::{sleep, timeout}; +use tokio::time::sleep; use crate::config::env::ev; use crate::error::Error; @@ -58,6 +58,7 @@ async fn start_pangea_indexer( if last_processed_block == 0 { last_processed_block = contract_start_block; } + info!("Switching to listening for new orders (deltas)"); listen_for_new_deltas( @@ -109,7 +110,6 @@ async fn fetch_historical_data( "FUEL" => ChainId::FUEL, _ => ChainId::FUELTESTNET, }; - let mut last_processed_block = contract_start_block; let target_latest_block = get_latest_block(fuel_chain).await?; @@ -145,7 +145,7 @@ async fn fetch_historical_data( last_processed_block = target_latest_block; info!( - "Processed events up to block {}. Moving to the next stage...", + "Processed events up to block {}. Moving to the next batch...", last_processed_block ); @@ -167,11 +167,8 @@ async fn listen_for_new_deltas( contract_h256: H256, ) -> Result<(), Error> { let mut retry_delay = Duration::from_secs(1); - let max_backoff = Duration::from_secs(60); - let inactivity_timeout = Duration::from_secs(3600); - loop { let fuel_chain = match ev("CHAIN")?.as_str() { "FUEL" => ChainId::FUEL, @@ -187,7 +184,7 @@ async fn listen_for_new_deltas( }; info!( - "Attempting to open stream from block: {} to Subscribe", + "Attempting to open realtime stream from block: {}", last_processed_block + 1 ); @@ -202,8 +199,8 @@ async fn listen_for_new_deltas( pangea_client::futures::pin_mut!(stream_deltas); - while let Ok(Some(item)) = timeout(inactivity_timeout, stream_deltas.next()).await { - match item { + while let Some(data_result) = stream_deltas.next().await { + match data_result { Ok(data) => { if let Err(e) = process_order_data( &data, @@ -247,12 +244,18 @@ async fn process_order_data( let order_event: PangeaOrderEvent = serde_json::from_str(&data_str)?; *last_processed_block = order_event.block_number; + handle_order_event(order_book.clone(), matching_orders.clone(), order_event).await; + timer.observe_duration(); BUY_ORDERS_TOTAL.set(order_book.get_buy_orders().len() as i64); SELL_ORDERS_TOTAL.set(order_book.get_sell_orders().len() as i64); - info!("BUY_ORDERS_TOTAL: {}", BUY_ORDERS_TOTAL.get()); - info!("SELL_ORDERS_TOTAL: {}", SELL_ORDERS_TOTAL.get()); + info!( + "Processed block: {}, BUY_ORDERS_TOTAL: {}, SELL_ORDERS_TOTAL: {}", + last_processed_block, + BUY_ORDERS_TOTAL.get(), + SELL_ORDERS_TOTAL.get() + ); Ok(()) }