Skip to content

Commit

Permalink
removed timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
nazgull08 committed Jan 7, 2025
1 parent 863d731 commit 21f22fb
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions src/indexer/pangea.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use tokio::time::sleep;

use crate::config::env::ev;
use crate::error::Error;
Expand Down Expand Up @@ -58,6 +58,7 @@ async fn start_pangea_indexer(
if last_processed_block == 0 {
last_processed_block = contract_start_block;
}

info!("Switching to listening for new orders (deltas)");

listen_for_new_deltas(
Expand Down Expand Up @@ -109,7 +110,6 @@ async fn fetch_historical_data(
"FUEL" => ChainId::FUEL,
_ => ChainId::FUELTESTNET,
};

let mut last_processed_block = contract_start_block;

let target_latest_block = get_latest_block(fuel_chain).await?;
Expand Down Expand Up @@ -145,7 +145,7 @@ async fn fetch_historical_data(

last_processed_block = target_latest_block;
info!(
"Processed events up to block {}. Moving to the next stage...",
"Processed events up to block {}. Moving to the next batch...",
last_processed_block
);

Expand All @@ -167,11 +167,8 @@ async fn listen_for_new_deltas(
contract_h256: H256,
) -> Result<(), Error> {
let mut retry_delay = Duration::from_secs(1);

let max_backoff = Duration::from_secs(60);

let inactivity_timeout = Duration::from_secs(3600);

loop {
let fuel_chain = match ev("CHAIN")?.as_str() {
"FUEL" => ChainId::FUEL,
Expand All @@ -187,7 +184,7 @@ async fn listen_for_new_deltas(
};

info!(
"Attempting to open stream from block: {} to Subscribe",
"Attempting to open realtime stream from block: {}",
last_processed_block + 1
);

Expand All @@ -202,8 +199,8 @@ async fn listen_for_new_deltas(

pangea_client::futures::pin_mut!(stream_deltas);

while let Ok(Some(item)) = timeout(inactivity_timeout, stream_deltas.next()).await {
match item {
while let Some(data_result) = stream_deltas.next().await {
match data_result {
Ok(data) => {
if let Err(e) = process_order_data(
&data,
Expand Down Expand Up @@ -247,12 +244,18 @@ async fn process_order_data(
let order_event: PangeaOrderEvent = serde_json::from_str(&data_str)?;

*last_processed_block = order_event.block_number;

handle_order_event(order_book.clone(), matching_orders.clone(), order_event).await;

timer.observe_duration();
BUY_ORDERS_TOTAL.set(order_book.get_buy_orders().len() as i64);
SELL_ORDERS_TOTAL.set(order_book.get_sell_orders().len() as i64);

info!("BUY_ORDERS_TOTAL: {}", BUY_ORDERS_TOTAL.get());
info!("SELL_ORDERS_TOTAL: {}", SELL_ORDERS_TOTAL.get());
info!(
"Processed block: {}, BUY_ORDERS_TOTAL: {}, SELL_ORDERS_TOTAL: {}",
last_processed_block,
BUY_ORDERS_TOTAL.get(),
SELL_ORDERS_TOTAL.get()
);
Ok(())
}

0 comments on commit 21f22fb

Please sign in to comment.