Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Merge #398
Browse files Browse the repository at this point in the history
398: Don't fail for loops r=da-kami a=da-kami

We should not fail for loops where we loop over cfds to process state updates based on information we learned / restart. 
We use a macro to wrap all fallible code in the loop to just print an error and continue. 

Note: I went over the codebase in search of other for loops, maybe someone else can do another grep :)

Co-authored-by: Daniel Karzel <[email protected]>
  • Loading branch information
bors[bot] and da-kami authored Oct 20, 2021
2 parents 3734ffc + c9c03b7 commit 5891ad6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 25 deletions.
36 changes: 18 additions & 18 deletions daemon/src/cfd_actors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::wallet::Wallet;
use crate::{db, monitor, oracle};
use anyhow::{bail, Result};
use crate::{db, monitor, oracle, try_continue};
use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use tokio::sync::watch;
Expand Down Expand Up @@ -128,28 +128,28 @@ pub async fn handle_oracle_attestation(
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
)?))?
.is_none()
{
let attestation = try_continue!(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
));

let new_state =
try_continue!(cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation)));

if new_state.is_none() {
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}

append_cfd_state(cfd, conn, update_sender).await?;

if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
try_continue!(append_cfd_state(cfd, conn, update_sender).await);
try_continue!(try_cet_publication(cfd, conn, wallet, update_sender)
.await
.context("Error when trying to publish CET"));
}

Ok(())
Expand Down
13 changes: 6 additions & 7 deletions daemon/src/housekeeping.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::db::{append_cfd_state, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState};
use crate::try_continue;
use crate::wallet::Wallet;
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;

pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
Expand All @@ -29,29 +29,28 @@ pub async fn rebroadcast_transactions(
let cfds = load_all_cfds(conn).await?;

for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;

let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await);
tracing::info!("Lock transaction published with txid {}", txid);
}

for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await);

tracing::info!("Refund transaction published on chain: {}", txid);
}

for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await);

tracing::info!("Commit transaction published on chain: {}", txid);
}

for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) {
// Double question-mark OK because if we are in PendingCet we must have been Ready before
// Double question mark OK because if we are in PendingCet we must have been Ready before
let signed_cet = cfd.cet()??;
let txid = wallet.try_broadcast_transaction(signed_cet).await?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await);

tracing::info!("CET published on chain: {}", txid);
}
Expand Down
1 change: 1 addition & 0 deletions daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod setup_contract;
pub mod taker_cfd;
pub mod to_sse_event;
pub mod tokio_ext;
pub mod try_continue;
pub mod wallet;
pub mod wallet_sync;
pub mod wire;
13 changes: 13 additions & 0 deletions daemon/src/try_continue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/// Wrapper for errors in loop that logs error and continues
#[macro_export]
macro_rules! try_continue {
($result:expr) => {
match $result {
Ok(value) => value,
Err(e) => {
tracing::error!("{:#}", e);
continue;
}
}
};
}

0 comments on commit 5891ad6

Please sign in to comment.