Skip to content

Commit

Permalink
Merge pull request #374 from Wukong247/2025-01-09-heartbeat
Browse files Browse the repository at this point in the history
add address caching and heartbeat to dns and maker respectively
  • Loading branch information
Shourya742 authored Jan 9, 2025
2 parents 8784b05 + 16f9830 commit 84862cf
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 230 deletions.
160 changes: 83 additions & 77 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,95 +803,101 @@ pub(crate) fn recover_from_swap(

// Check for contract confirmations and broadcast timelocked transaction
let mut timelock_boardcasted = Vec::new();
while !maker.shutdown.load(Relaxed) {
for ((outgoing_reedemscript, contract), (timelock, timelocked_tx)) in outgoings.iter() {
// We have already broadcasted this tx, so skip
if timelock_boardcasted.contains(&timelocked_tx) {
continue;
}
// Check if the contract tx has reached required maturity
// Failure here means the transaction hasn't been broadcasted yet. So do nothing and try again.
let tx_from_chain = if let Ok(result) = maker
.wallet
.read()?
.rpc
.get_raw_transaction_info(&contract.compute_txid(), None)
{
log::info!(
"[{}] Contract Txid : {} reached confirmation : {:?}, Required Confirmation : {}",
maker.config.network_port,
contract.compute_txid(),
result.confirmations,
timelock
);
result
} else {
continue;
};

if let Some(confirmation) = tx_from_chain.confirmations {
// Now the transaction is confirmed in a block, check for required maturity
if confirmation > (*timelock as u32) {
log::info!(
"[{}] Timelock maturity of {} blocks reached for Contract Txid : {}",
maker.config.network_port,
timelock,
contract.compute_txid()
);
log::info!(
"[{}] Broadcasting timelocked tx: {}",
maker.config.network_port,
timelocked_tx.compute_txid()
);
maker
.wallet
.read()?
.rpc
.send_raw_transaction(timelocked_tx)
.map_err(WalletError::Rpc)?;
timelock_boardcasted.push(timelocked_tx);
let trigger_count = if cfg!(feature = "integration-test") {
10 / HEART_BEAT_INTERVAL.as_secs() // triggers every 10 secs for tests
} else {
60 / HEART_BEAT_INTERVAL.as_secs() // triggers every 60 secs for prod
};

let outgoing_removed = maker
.wallet
.write()?
.remove_outgoing_swapcoin(outgoing_reedemscript)?
.expect("outgoing swapcoin expected");
let mut i = 0;

while !maker.shutdown.load(Relaxed) {
if i >= trigger_count || i == 0 {
for ((outgoing_reedemscript, contract), (timelock, timelocked_tx)) in outgoings.iter() {
// We have already broadcasted this tx, so skip
if timelock_boardcasted.contains(&timelocked_tx) {
continue;
}
// Check if the contract tx has reached required maturity
// Failure here means the transaction hasn't been broadcasted yet. So do nothing and try again.
let tx_from_chain = if let Ok(result) = maker
.wallet
.read()?
.rpc
.get_raw_transaction_info(&contract.compute_txid(), None)
{
log::info!(
"[{}] Removed Outgoing Swapcoin from Wallet, Contract Txid: {}",
"[{}] Contract Txid : {} reached confirmation : {:?}, Required Confirmation : {}",
maker.config.network_port,
outgoing_removed.contract_tx.compute_txid()
contract.compute_txid(),
result.confirmations,
timelock
);
result
} else {
continue;
};

if let Some(confirmation) = tx_from_chain.confirmations {
// Now the transaction is confirmed in a block, check for required maturity
if confirmation > (*timelock as u32) {
log::info!(
"[{}] Timelock maturity of {} blocks reached for Contract Txid : {}",
maker.config.network_port,
timelock,
contract.compute_txid()
);
log::info!(
"[{}] Broadcasting timelocked tx: {}",
maker.config.network_port,
timelocked_tx.compute_txid()
);
maker
.wallet
.read()?
.rpc
.send_raw_transaction(timelocked_tx)
.map_err(WalletError::Rpc)?;
timelock_boardcasted.push(timelocked_tx);

let outgoing_removed = maker
.wallet
.write()?
.remove_outgoing_swapcoin(outgoing_reedemscript)?
.expect("outgoing swapcoin expected");

log::info!("initializing Wallet Sync.");
{
let mut wallet_write = maker.wallet.write()?;
wallet_write.sync()?;
wallet_write.save_to_disk()?;
log::info!(
"[{}] Removed Outgoing Swapcoin from Wallet, Contract Txid: {}",
maker.config.network_port,
outgoing_removed.contract_tx.compute_txid()
);

log::info!("initializing Wallet Sync.");
{
let mut wallet_write = maker.wallet.write()?;
wallet_write.sync()?;
wallet_write.save_to_disk()?;
}
log::info!("Completed Wallet Sync.");
}
log::info!("Completed Wallet Sync.");
}
}
}

if timelock_boardcasted.len() == outgoings.len() {
// For tests, terminate the maker at this stage.
#[cfg(feature = "integration-test")]
maker.shutdown.store(true, Relaxed);
if timelock_boardcasted.len() == outgoings.len() {
// For tests, terminate the maker at this stage.
#[cfg(feature = "integration-test")]
maker.shutdown.store(true, Relaxed);

log::info!(
"All outgoing transactions claimed back via timelock. Recovery loop exiting."
);
break;
log::info!(
"All outgoing transactions claimed back via timelock. Recovery loop exiting."
);
break;
}
// Reset counter
i = 0;
}

// Sleep before next blockchain scan
let block_lookup_interval = if cfg!(feature = "integration-test") {
Duration::from_secs(10)
} else {
Duration::from_secs(60)
};
std::thread::sleep(block_lookup_interval);
i += 1;
std::thread::sleep(HEART_BEAT_INTERVAL);
}
Ok(())
}
3 changes: 2 additions & 1 deletion src/maker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ impl Default for MakerConfig {
rpc_port: 6103,
min_swap_amount: MIN_SWAP_AMOUNT,
socks_port: 19050,
directory_server_address: "127.0.0.1:8080".to_string(),
directory_server_address:
"bhbzkndgad52ojm75w4goii7xsi6ou73fzyvorxas7swg2snlto4c4ad.onion:8080".to_string(),
#[cfg(feature = "integration-test")]
fidelity_amount: 5_000_000, // 0.05 BTC for tests
#[cfg(feature = "integration-test")]
Expand Down
142 changes: 77 additions & 65 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::utill::monitor_log_for_completion;
use crate::maker::error::MakerError;

// Default values for Maker configurations
pub(crate) const _DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS: u64 = 60 * 60 * 12; // 12 Hours
pub(crate) const DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS: u64 = 60 * 15; // 15 minutes

/// Fetches the Maker and DNS address, and sends maker address to the DNS server.
/// Depending upon ConnectionType and test/prod environment, different maker address and DNS addresses are returned.
Expand Down Expand Up @@ -157,54 +157,63 @@ fn network_bootstrap(maker: Arc<Maker>) -> Result<Option<Child>, MakerError> {
metadata: dns_metadata,
};

// Loop until shoutdown is initiated.
while !maker.shutdown.load(Relaxed) {
let stream = match maker.config.connection_type {
ConnectionType::CLEARNET => TcpStream::connect(&dns_address),
#[cfg(feature = "tor")]
ConnectionType::TOR => Socks5Stream::connect(
format!("127.0.0.1:{}", maker.config.socks_port),
dns_address.as_str(),
)
.map(|stream| stream.into_inner()),
};
thread::spawn(move || {
let trigger_count = DIRECTORY_SERVERS_REFRESH_INTERVAL_SECS / HEART_BEAT_INTERVAL.as_secs();
let mut i = 0;

log::info!(
"[{}] Connecting to DNS: {}",
maker.config.network_port,
dns_address
);
while !maker.shutdown.load(Relaxed) {
if i >= trigger_count || i == 0 {
let stream = match maker.config.connection_type {
ConnectionType::CLEARNET => TcpStream::connect(&dns_address),
#[cfg(feature = "tor")]
ConnectionType::TOR => Socks5Stream::connect(
format!("127.0.0.1:{}", maker.config.socks_port),
dns_address.as_str(),
)
.map(|stream| stream.into_inner()),
};

let mut stream = match stream {
Ok(s) => s,
Err(e) => {
log::warn!(
"[{}] TCP connection error with directory, reattempting: {}",
log::info!(
"[{}] Connecting to DNS: {}",
maker.config.network_port,
dns_address
);

let mut stream = match stream {
Ok(s) => s,
Err(e) => {
log::warn!(
"[{}] TCP connection error with directory, reattempting: {}",
maker_port,
e
);
thread::sleep(HEART_BEAT_INTERVAL);
continue;
}
};

if let Err(e) = send_message(&mut stream, &request) {
log::warn!(
"[{}] Failed to send our address to directory, reattempting: {}",
maker_port,
e
);
thread::sleep(HEART_BEAT_INTERVAL);
continue;
}

log::info!(
"[{}] Successfully sent our address to DNS at {}",
maker_port,
e
dns_address
);
thread::sleep(HEART_BEAT_INTERVAL);
continue;
// Reset counter when success
i = 0;
}
};

if let Err(e) = send_message(&mut stream, &request) {
log::warn!(
"[{}] Failed to send our address to directory, reattempting: {}",
maker_port,
e
);
i += 1;
thread::sleep(HEART_BEAT_INTERVAL);
continue;
};

log::info!(
"[{}] Successfully sent our address to dns at {}",
maker_port,
dns_address
);
break;
}
}
});

Ok(tor_handle)
}
Expand Down Expand Up @@ -337,34 +346,37 @@ fn check_connection_with_core(
accepting_clients: Arc<AtomicBool>,
) -> Result<(), MakerError> {
let mut rpc_ping_success = false;
let mut i = 0;
while !maker.shutdown.load(Relaxed) {
// If connection is disrupted keep trying at heart_beat_interval (3 sec).
// If connection is live, keep tring at rpc_ping_interval (60 sec).
match rpc_ping_success {
true => {
sleep(RPC_PING_INTERVAL);
}
false => {
sleep(HEART_BEAT_INTERVAL);
}
}
if let Err(e) = maker.wallet.read()?.rpc.get_blockchain_info() {
log::error!(
"[{}] RPC Connection failed. Reattempting {}",
maker.config.network_port,
e
);
rpc_ping_success = false;
} else {
if !rpc_ping_success {
log::info!(
"[{}] Bitcoin Core RPC connection is live.",
maker.config.network_port
let trigger_count = match rpc_ping_success {
true => RPC_PING_INTERVAL.as_secs() / HEART_BEAT_INTERVAL.as_secs(),
false => 1,
};

if i >= trigger_count || i == 0 {
if let Err(e) = maker.wallet.read()?.rpc.get_blockchain_info() {
log::error!(
"[{}] RPC Connection failed. Reattempting {}",
maker.config.network_port,
e
);
rpc_ping_success = false;
} else {
if !rpc_ping_success {
log::info!(
"[{}] Bitcoin Core RPC connection is back online.",
maker.config.network_port
);
}
rpc_ping_success = true;
}
rpc_ping_success = true;
accepting_clients.store(rpc_ping_success, Relaxed);
i = 0;
}
accepting_clients.store(rpc_ping_success, Relaxed);
i += 1;
thread::sleep(HEART_BEAT_INTERVAL);
}

Ok(())
Expand Down
Loading

0 comments on commit 84862cf

Please sign in to comment.