Skip to content

Commit

Permalink
fix(publisher): Don't defer error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Jurshsmith committed Dec 12, 2024
1 parent c165b19 commit b1699d8
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions crates/fuel-streams-publisher/src/publisher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,35 +133,37 @@ impl Publisher {

loop {
tokio::select! {
Some(sealed_block) = blocks_stream.next() => {
let sealed_block = sealed_block.context("block streams failed to produce sealed block")?;

tracing::info!("Processing blocks stream");

let fuel_core = &self.fuel_core;
let (block, block_producer) =
fuel_core.get_block_and_producer(sealed_block);

// TODO: Avoid awaiting Offchain DB sync for all streams by grouping in their own service
fuel_core
.await_offchain_db_sync(&block.id())
.await
.context("Failed to await Offchain DB sync")?;

if let Err(err) = self.publish(&block, &block_producer).await {
tracing::error!("Failed to publish block data: {}", err);
self.telemetry.record_failed_publishing(self.fuel_core.chain_id(), &block_producer);
Some(sealed_block) = blocks_stream.next() => {
let sealed_block = sealed_block.context("block streams failed to produce sealed block")?;

tracing::info!("Processing blocks stream");

let fuel_core = &self.fuel_core;
let (block, block_producer) =
fuel_core.get_block_and_producer(sealed_block);

// TODO: Avoid awaiting Offchain DB sync for all streams by grouping in their own service
fuel_core
.await_offchain_db_sync(&block.id())
.await
.context("Failed to await Offchain DB sync")?;

if let Err(err) = self.publish(&block, &block_producer).await {
tracing::error!("Publisher encountered an error: {:?}", err);

tracing::error!("Failed to publish block data: {}", err);
self.telemetry.record_failed_publishing(self.fuel_core.chain_id(), &block_producer);
}

},
shutdown = shutdown_token.wait_for_shutdown() => {
if shutdown {
tracing::info!("Shutdown signal received. Stopping services ...");
self.shutdown_services_with_timeout().await?;
break;
}

},
shutdown = shutdown_token.wait_for_shutdown() => {
if shutdown {
tracing::info!("Shutdown signal received. Stopping services ...");
self.shutdown_services_with_timeout().await?;
break;
}
},
};
},
};
}

tracing::info!("Publishing stopped successfully!");
Expand Down Expand Up @@ -281,6 +283,7 @@ pub fn publish<S: Streamable + 'static>(
Ok(())
}
Err(e) => {
tracing::error!("Failed to publish: {:?}", e);
telemetry.log_error(&e.to_string());
telemetry.update_publisher_error_metrics(
wildcard,
Expand Down

0 comments on commit b1699d8

Please sign in to comment.