Skip to content

Commit

Permalink
Major updates:
Browse files Browse the repository at this point in the history
- Make taker send wait_notif to makjer.
- Fix maker and taker unfinished swapcoin detection logic.
- Remove harderrors at RPC while recovering.
- Differentiate between incoming/outgoing swapcoins. Fix the app commands to
give relevant balances only.
- Remove hard error on DNS connection to handle transient network errors.
- Break down balance and list utxo APIs for swaps and contracts. Dedicated api for
incoming swap and timelock contracts as thats what users are most interested in.
  • Loading branch information
mojoX911 committed Jan 21, 2025
1 parent d04502a commit 47d0204
Show file tree
Hide file tree
Showing 10 changed files with 447 additions and 190 deletions.
50 changes: 31 additions & 19 deletions src/bin/taker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,21 @@ struct Cli {
#[derive(Parser, Debug)]
enum Commands {
// TODO: Design a better structure to display different utxos and balance groups.
/// Lists all currently spendable utxos
/// Lists all utxos we know about along with their spend info. This is useful for debugging
ListUtxo,
/// List all signle signature wallet Utxos. These are all non-swap regular wallet utxos.
ListUtxoRegular,
/// Lists all utxos received in incoming swaps
ListUtxoSwap,
/// Lists all HTLC utxos (if any)
/// Lists all utxos that we need to claim via timelock. If you see entries in this list, do a `taker recover` to claim them.
ListUtxoContract,
/// Get the total spendable wallet balance (sats)
/// Get the total spendable wallet balance in sats (regular + swap utxos)
GetBalance,
/// Get the total balance received from swaps (sats)
/// Get Balance of all single sig regular wallet utxos.
GetBalanceRegular,
/// Get the total balance received in incoming swaps (sats)
GetBalanceSwap,
/// Get the total amount stuck in HTLC contracts (sats)
/// Get the total amount stuck in timelock contracts (sats)
GetBalanceContract,
/// Returns a new address
GetNewAddress,
Expand Down Expand Up @@ -105,7 +109,13 @@ enum Commands {

fn main() -> Result<(), TakerError> {
let args = Cli::parse();
setup_taker_logger(LevelFilter::from_str(&args.verbosity).unwrap());
setup_taker_logger(
LevelFilter::from_str(&args.verbosity).unwrap(),
matches!(
args.command,
Commands::Recover | Commands::FetchOffers | Commands::Coinswap { .. }
),
);

let rpc_config = RPCConfig {
url: args.rpc,
Expand Down Expand Up @@ -133,9 +143,13 @@ fn main() -> Result<(), TakerError> {

match args.command {
Commands::ListUtxo => {
let utxos = taker.get_wallet().list_all_utxo_spend_info(None)?;
println!("{:#?}", utxos);
}
Commands::ListUtxoRegular => {
let utxos: Vec<ListUnspentResultEntry> = taker
.get_wallet()
.list_all_utxo_spend_info(None)?
.list_descriptor_utxo_spend_info(None)?
.iter()
.map(|(l, _)| l.clone())
.collect();
Expand All @@ -144,31 +158,35 @@ fn main() -> Result<(), TakerError> {
Commands::ListUtxoSwap => {
let utxos: Vec<ListUnspentResultEntry> = taker
.get_wallet()
.list_swap_coin_utxo_spend_info(None)?
.list_incoming_swap_coin_utxo_spend_info(None)?
.iter()
.map(|(l, _)| l.clone())
.collect();
println!("{:#?}", utxos);
}
Commands::ListUtxoContract => {
let utxos: Vec<ListUnspentResultEntry> = taker
let utxos = taker
.get_wallet()
.list_live_contract_spend_info(None)?
.list_live_timelock_contract_spend_info(None)?
.iter()
.map(|(l, _)| l.clone())
.collect();
.collect::<Vec<_>>();
println!("{:#?}", utxos);
}
Commands::GetBalanceContract => {
let balance = taker.get_wallet().balance_live_contract(None)?;
println!("{:?}", balance);
}
Commands::GetBalanceSwap => {
let balance = taker.get_wallet().balance_swap_coins(None)?;
let balance = taker.get_wallet().balance_incoming_swap_coins(None)?;
println!("{:?}", balance);
}
Commands::GetBalance => {
let balance = taker.get_wallet().spendable_balance()?;
let balance = taker.get_wallet().spendable_balance(None)?;
println!("{:?}", balance);
}
Commands::GetBalanceRegular => {
let balance = taker.get_wallet().balance_descriptor_utxo(None)?;
println!("{:?}", balance);
}
Commands::GetNewAddress => {
Expand Down Expand Up @@ -213,7 +231,6 @@ fn main() -> Result<(), TakerError> {
}

Commands::FetchOffers => {
println!("Fetching offerdata from the market. use `tail -f <data-dir>/debug.log` to see progress.");
let offerbook = taker.fetch_offers()?;
println!("{:#?}", offerbook)
}
Expand All @@ -224,16 +241,11 @@ fn main() -> Result<(), TakerError> {
tx_count: 1,
required_confirms: REQUIRED_CONFIRMS,
};

println!("Starting coinswap with swap params : {:?}. use `tail -f <data-dir>/debug.log` to see progress.", swap_params);
taker.do_coinswap(swap_params)?;
println!("succesfully completed coinswap!! Check `list-utxo` to see the new coins");
}

Commands::Recover => {
println!("Starting recovery. use `tail -f <data-dir>/debug.log` to see progress.");
taker.recover_from_swap()?;
println!("Recovery completed succesfully.");
}
}

Expand Down
82 changes: 57 additions & 25 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub const RPC_PING_INTERVAL: Duration = Duration::from_secs(10);
// TODO: Make the maker repost their address to DNS once a day in spawned thread.
// pub const DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS: u64 = Duartion::from_days(1); // Once a day.

/// Maker triggers the recovery mechanism, if Taker is idle for more than 1 hour.
pub const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60 * 60);
/// Maker triggers the recovery mechanism, if Taker is idle for more than 15 mins during a swap.
pub const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60 * 15);

/// The minimum difference in locktime (in blocks) between the incoming and outgoing swaps.
///
Expand Down Expand Up @@ -225,7 +225,7 @@ pub struct Maker {
/// A flag to trigger shutdown event
pub shutdown: AtomicBool,
/// Map of IP address to Connection State + last Connected instant
pub(crate) connection_state: Mutex<HashMap<String, (ConnectionState, Instant)>>,
pub(crate) ongoing_swap_state: Mutex<HashMap<String, (ConnectionState, Instant)>>,
/// Highest Value Fidelity Proof
pub(crate) highest_fidelity_proof: RwLock<Option<FidelityProof>>,
/// Is setup complete
Expand Down Expand Up @@ -317,7 +317,7 @@ impl Maker {
config,
wallet: RwLock::new(wallet),
shutdown: AtomicBool::new(false),
connection_state: Mutex::new(HashMap::new()),
ongoing_swap_state: Mutex::new(HashMap::new()),
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: AtomicBool::new(false),
data_dir,
Expand Down Expand Up @@ -495,7 +495,7 @@ pub(crate) fn check_for_broadcasted_contracts(maker: Arc<Maker>) -> Result<(), M
}
// An extra scope to release all locks when done.
{
let mut lock_onstate = maker.connection_state.lock()?;
let mut lock_onstate = maker.ongoing_swap_state.lock()?;
for (ip, (connection_state, _)) in lock_onstate.iter_mut() {
let txids_to_watch = connection_state
.incoming_swapcoins
Expand Down Expand Up @@ -662,7 +662,7 @@ pub(crate) fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError>

// Extra scope to release all locks when done.
{
let mut lock_on_state = maker.connection_state.lock()?;
let mut lock_on_state = maker.ongoing_swap_state.lock()?;
for (ip, (state, last_connected_time)) in lock_on_state.iter_mut() {
let mut outgoings = Vec::new();
let mut incomings = Vec::new();
Expand Down Expand Up @@ -751,9 +751,13 @@ pub(crate) fn recover_from_swap(
"[{}] Incoming Contract Already Broadcasted",
maker.config.network_port
);
} else if let Err(e) = maker.wallet.read()?.send_tx(&tx) {
log::info!(
"Can't send incoming contract: {} | {:?}",
tx.compute_txid(),
e
);
} else {
maker.wallet.read()?.send_tx(&tx)?;

log::info!(
"[{}] Broadcasted Incoming Contract : {}",
maker.config.network_port,
Expand All @@ -774,30 +778,52 @@ pub(crate) fn recover_from_swap(
}

//broadcast all the outgoing contracts
for ((_, tx), _) in outgoings.iter() {
if maker
for ((og_rs, tx), _) in outgoings.iter() {
let check_tx_result = maker
.wallet
.read()?
.rpc
.get_raw_transaction_info(&tx.compute_txid(), None)
.is_ok()
{
log::info!(
"[{}] Outgoing Contract already broadcasted",
maker.config.network_port
);
} else {
let txid = maker.wallet.read()?.send_tx(tx)?;
log::info!(
"[{}] Broadcasted Outgoing Contract : {}",
maker.config.network_port,
txid
);
.get_raw_transaction_info(&tx.compute_txid(), None);

match check_tx_result {
Ok(_) => {
log::info!(
"[{}] Outgoing Contract already broadcasted",
maker.config.network_port
);
}
Err(_) => {
let send_tx_result = maker.wallet.read()?.send_tx(tx);
match send_tx_result {
Ok(_) => {
log::info!(
"[{}] Broadcasted Outgoing Contract : {}",
maker.config.network_port,
tx.compute_txid()
);
}
Err(e) => {
log::info!(
"Can't send ougoing contract: {} | {:?}",
tx.compute_txid(),
e
);
if format!("{:?}", e).contains("bad-txns-inputs-missingorspent") {
// This means the funding utxo doesn't exist anymore. Just remove this coin.
maker
.get_wallet()
.write()?
.remove_outgoing_swapcoin(og_rs)?;
log::info!("Removed outgoing swapcoin: {}", tx.compute_txid());
}
}
}
}
}
}

// Save the wallet here before going into the expensive loop.
maker.get_wallet().write()?.sync()?;
maker.get_wallet().write()?.sync_no_fail();
maker.get_wallet().read()?.save_to_disk()?;
log::info!("Wallet file synced and saved to disk.");

Expand Down Expand Up @@ -883,6 +909,12 @@ pub(crate) fn recover_from_swap(
}
}

log::info!(
"{} outgoing contracts detected | {} timelock txs broadcasted.",
outgoings.len(),
timelock_boardcasted.len()
);

if timelock_boardcasted.len() == outgoings.len() {
// For tests, terminate the maker at this stage.
#[cfg(feature = "integration-test")]
Expand Down
36 changes: 27 additions & 9 deletions src/maker/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ pub(crate) fn handle_message(
connection_state: &mut ConnectionState,
message: TakerToMakerMessage,
) -> Result<Option<MakerToTakerMessage>, MakerError> {
// If taker is waiting for funding confirmation, reset the timer.
if let TakerToMakerMessage::WaitingFundingConfirmation(id) = &message {
log::info!(
"[{}] Taker is waiting for funding confirmation. Reseting timer.",
maker.config.network_port
);
maker
.ongoing_swap_state
.lock()?
.entry(id.clone())
.and_modify(|(_, timer)| *timer = Instant::now());
return Ok(None);
}

let outgoing_message = match connection_state.allowed_message {
ExpectedMessage::TakerHello => {
if let TakerToMakerMessage::TakerHello(m) = message {
Expand Down Expand Up @@ -397,7 +411,7 @@ impl Maker {
.expect("This should not overflow as we just above.");

log::info!(
"[{}] Outgoing Funding Txids: {:?}.",
"[{}] Prepared outgoing funding txs: {:?}.",
self.config.network_port,
my_funding_txes
.iter()
Expand Down Expand Up @@ -453,7 +467,7 @@ impl Maker {
.collect::<Result<Vec<SenderContractTxInfo>, WalletError>>()?;

// Update the connection state.
self.connection_state.lock()?.insert(
self.ongoing_swap_state.lock()?.insert(
message.id.clone(),
(connection_state.clone(), Instant::now()),
);
Expand Down Expand Up @@ -514,7 +528,7 @@ impl Maker {
my_funding_txids.push(txid);
}
log::info!(
"[{}] Outgoing Funding Txids: {:?}",
"[{}] Broadcasted funding txs: {:?}",
self.config.network_port,
my_funding_txids
);
Expand All @@ -533,7 +547,7 @@ impl Maker {
}

// Update the connection state.
self.connection_state.lock()?.insert(
self.ongoing_swap_state.lock()?.insert(
message.id.clone(),
(connection_state.clone(), Instant::now()),
);
Expand Down Expand Up @@ -599,14 +613,17 @@ impl Maker {
);
let mut swapcoin_private_keys = Vec::<MultisigPrivkey>::new();

// Send our privkey and mark the outgoing swapcoin as "done".
for multisig_redeemscript in &message.receivers_multisig_redeemscripts {
let wallet_read = self.wallet.read()?;
let outgoing_swapcoin = wallet_read
.find_outgoing_swapcoin(multisig_redeemscript)
let mut wallet_write = self.wallet.write()?;
let outgoing_swapcoin = wallet_write
.find_outgoing_swapcoin_mut(multisig_redeemscript)
.expect("outgoing swapcoin expected");
if read_hashvalue_from_contract(&outgoing_swapcoin.contract_redeemscript)? != hashvalue
{
return Err(MakerError::General("not correct hash preimage"));
} else {
outgoing_swapcoin.hash_preimage.replace(message.preimage);
}

swapcoin_private_keys.push(MultisigPrivkey {
Expand All @@ -627,6 +644,7 @@ impl Maker {
&self,
message: PrivKeyHandover,
) -> Result<(), MakerError> {
// Mark the incoming swapcoins as "done", by adding their's privkey
for swapcoin_private_key in &message.multisig_privkeys {
self.wallet
.write()?
Expand All @@ -636,7 +654,7 @@ impl Maker {
}

// Reset the connection state so watchtowers are not triggered.
let mut conn_state = self.connection_state.lock()?;
let mut conn_state = self.ongoing_swap_state.lock()?;
*conn_state = HashMap::default();

log::info!("initializing Wallet Sync.");
Expand All @@ -652,7 +670,7 @@ impl Maker {
}

fn unexpected_recovery(maker: Arc<Maker>) -> Result<(), MakerError> {
let mut lock_on_state = maker.connection_state.lock()?;
let mut lock_on_state = maker.ongoing_swap_state.lock()?;
for (_, (state, _)) in lock_on_state.iter_mut() {
let mut outgoings = Vec::new();
let mut incomings = Vec::new();
Expand Down
Loading

0 comments on commit 47d0204

Please sign in to comment.