From b1699d8e56166443b22b33f8e793a9b5247d504e Mon Sep 17 00:00:00 2001 From: AJ Date: Thu, 12 Dec 2024 15:38:24 +0100 Subject: [PATCH] fix(publisher): Don't defer error logging --- .../src/publisher/mod.rs | 59 ++++++++++--------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/crates/fuel-streams-publisher/src/publisher/mod.rs b/crates/fuel-streams-publisher/src/publisher/mod.rs index 9666053a..d67e607c 100644 --- a/crates/fuel-streams-publisher/src/publisher/mod.rs +++ b/crates/fuel-streams-publisher/src/publisher/mod.rs @@ -133,35 +133,37 @@ impl Publisher { loop { tokio::select! { - Some(sealed_block) = blocks_stream.next() => { - let sealed_block = sealed_block.context("block streams failed to produce sealed block")?; - - tracing::info!("Processing blocks stream"); - - let fuel_core = &self.fuel_core; - let (block, block_producer) = - fuel_core.get_block_and_producer(sealed_block); - - // TODO: Avoid awaiting Offchain DB sync for all streams by grouping in their own service - fuel_core - .await_offchain_db_sync(&block.id()) - .await - .context("Failed to await Offchain DB sync")?; - - if let Err(err) = self.publish(&block, &block_producer).await { - tracing::error!("Failed to publish block data: {}", err); - self.telemetry.record_failed_publishing(self.fuel_core.chain_id(), &block_producer); + Some(sealed_block) = blocks_stream.next() => { + let sealed_block = sealed_block.context("block streams failed to produce sealed block")?; + + tracing::info!("Processing blocks stream"); + + let fuel_core = &self.fuel_core; + let (block, block_producer) = + fuel_core.get_block_and_producer(sealed_block); + + // TODO: Avoid awaiting Offchain DB sync for all streams by grouping in their own service + fuel_core + .await_offchain_db_sync(&block.id()) + .await + .context("Failed to await Offchain DB sync")?; + + if let Err(err) = self.publish(&block, &block_producer).await { + tracing::error!("Publisher encountered an error: {:?}", err); + + tracing::error!("Failed to publish block data: {}", err); + self.telemetry.record_failed_publishing(self.fuel_core.chain_id(), &block_producer); + } + + }, + shutdown = shutdown_token.wait_for_shutdown() => { + if shutdown { + tracing::info!("Shutdown signal received. Stopping services ..."); + self.shutdown_services_with_timeout().await?; + break; } - - }, - shutdown = shutdown_token.wait_for_shutdown() => { - if shutdown { - tracing::info!("Shutdown signal received. Stopping services ..."); - self.shutdown_services_with_timeout().await?; - break; - } - }, - }; + }, + }; } tracing::info!("Publishing stopped successfully!"); @@ -281,6 +283,7 @@ pub fn publish( Ok(()) } Err(e) => { + tracing::error!("Failed to publish: {:?}", e); telemetry.log_error(&e.to_string()); telemetry.update_publisher_error_metrics( wildcard,