Skip to content

Commit

Permalink
Mempool syncer: minor improvements
Browse files Browse the repository at this point in the history
- Only try to construct mempool transaction requests if new hashes are discovered
- Check the mempool state first before checking the blockchain if a transaction is already known/included
- Bound the amount of received hashes that will be processed per peer
- Fix tests
  • Loading branch information
Eligioo committed Oct 9, 2024
1 parent a41c16a commit 2011b40
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 34 deletions.
11 changes: 6 additions & 5 deletions mempool/mempool-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,20 @@ impl<N: Network> MempoolTask<N> {
) -> Self {
let consensus_event_rx = consensus.subscribe_events();
let network_event_rx = consensus.network.subscribe_events();
let blockchain_event_rx = blockchain.read().notifier_as_stream();

let proxy = consensus.proxy();
let sync_event_rx = proxy.subscribe_sync_events();

let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());
let mempool = Arc::new(Mempool::new(
Arc::clone(&blockchain),
mempool_config,
Arc::clone(&consensus.network),
));
let mempool_active = false;

let blockchain_event_rx = blockchain.read().notifier_as_stream();

let proxy = consensus.proxy();
let sync_event_rx = proxy.subscribe_sync_events();
let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());

Self {
consensus: proxy,
peers_in_live_sync,
Expand Down
48 changes: 29 additions & 19 deletions mempool/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<N: Network> HashRequestStatus<N> {
}

const MAX_HASHES_PER_REQUEST: usize = 500;
const MAX_TOTAL_TRANSACTIONS: usize = 5000;
const MAX_TOTAL_HASHES: usize = 25_000;
const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes

/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool
Expand All @@ -85,7 +85,7 @@ pub(crate) struct MempoolSyncer<N: Network> {
shutdown_timer: Pin<Box<Sleep>>,

/// Consensus sync event receiver
consensus_sync_event_rx: BroadcastStream<SyncEvent<<N as Network>::PeerId>>,
consensus_sync_event_rx: BroadcastStream<SyncEvent<<N>::PeerId>>,

/// Blockchain reference
blockchain: Arc<RwLock<Blockchain>>,
Expand Down Expand Up @@ -161,30 +161,34 @@ impl<N: Network> MempoolSyncer<N> {
}

/// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes
fn push_unknown_hashes(&mut self, hashes: Vec<Blake2bHash>, peer_id: N::PeerId) {
fn push_unknown_hashes(&mut self, hashes: Vec<Blake2bHash>, peer_id: N::PeerId) -> bool {
let blockchain = self.blockchain.read();
let state = self.mempool_state.read();

debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes");
let mut new_hashes_discovered = false;

hashes.into_iter().for_each(|hash| {
hashes.into_iter().take(MAX_TOTAL_HASHES).for_each(|hash| {
// Perform some basic checks to reduce the amount of transactions we are going to request later
// TODO: what if I respond with MAX_TOTAL_TRANSACTIONS fake hashes
if self.unknown_hashes.len() < MAX_TOTAL_TRANSACTIONS
&& !blockchain
if state.contains(&hash)
|| blockchain
.contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None)
&& !state.contains(&hash)
{
match self.unknown_hashes.entry(hash) {
Occupied(mut entry) => {
entry.get_mut().add_peer(peer_id);
}
Vacant(entry) => {
entry.insert(HashRequestStatus::new(vec![peer_id]));
}
};
return;
}
})

match self.unknown_hashes.entry(hash) {
Occupied(mut entry) => {
entry.get_mut().add_peer(peer_id);
}
Vacant(entry) => {
entry.insert(HashRequestStatus::new(vec![peer_id]));
new_hashes_discovered = true;
}
};
});

new_hashes_discovered
}

/// Add peer to discover its mempool
Expand All @@ -193,6 +197,7 @@ impl<N: Network> MempoolSyncer<N> {
return;
}

debug!(%peer_id, "Peer added to mempool sync");
self.peers.push(peer_id);
let network = Arc::clone(&self.network);
let transaction_type = self.mempool_transaction_type.clone();
Expand Down Expand Up @@ -346,10 +351,13 @@ impl<N: Network> Stream for MempoolSyncer<N> {
}

// Then we check our RequestMempoolHashes responses
let mut new_hashes_discovered = false;
while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) {
match result {
Ok(hashes) => {
self.push_unknown_hashes(hashes.hashes, peer_id);
if self.push_unknown_hashes(hashes.hashes, peer_id) {
new_hashes_discovered = true;
}
}
Err(err) => {
error!(%err, %peer_id, "Failed to fetch mempool hashes");
Expand All @@ -358,7 +366,9 @@ impl<N: Network> Stream for MempoolSyncer<N> {
}

// Then we construct our RequestMempoolTransactions requests and send them over the network to our peers
self.send_mempool_transactions_requests();
if new_hashes_discovered {
self.send_mempool_transactions_requests();
}

// Then we check our RequestMempoolTransactions responses
while let Poll::Ready(Some((peer_id, result))) =
Expand Down
4 changes: 2 additions & 2 deletions mempool/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ mod tests {

// Load known hashes into the mempool state
let mut handle = state.write();
known_txns.iter().for_each(|txn| {
handle.regular_transactions.insert(&txn, TxPriority::Medium);
known_txns.into_iter().for_each(|txn| {
handle.regular_transactions.insert(txn, TxPriority::Medium);
});
assert_eq!(handle.regular_transactions.len(), known_hashes.len());
drop(handle);
Expand Down
33 changes: 25 additions & 8 deletions mempool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use nimiq_mempool::{
config::MempoolConfig, executor::PubsubIdOrPeerId, mempool::Mempool,
mempool_transactions::TxPriority,
};
use nimiq_network_interface::network::PubsubId;
use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId};
use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy};
use nimiq_serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -871,10 +872,14 @@ async fn multiple_start_stop() {
let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64);

// Create mempool and subscribe with a custom txn stream.
let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default());
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());
let mempool = Mempool::new(
Arc::clone(&blockchain),
MempoolConfig::default(),
Arc::clone(&mock_network),
);
let mock_id = MockId::new(hub.new_address().into());

// Subscribe mempool with the mpsc stream created
mempool
Expand All @@ -891,7 +896,10 @@ async fn multiple_start_stop() {
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx1
.send((txn.clone(), mock_id1.clone()))
.send((
txn.clone(),
PubsubIdOrPeerId::PeerId(mock_id1.propagation_source().clone()),
))
.await
.unwrap();
}
Expand All @@ -915,7 +923,10 @@ async fn multiple_start_stop() {
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx
.send((txn.clone(), mock_id.clone()))
.send((
txn.clone(),
PubsubIdOrPeerId::PeerId(mock_id.propagation_source()),
))
.await
.expect_err("Send should fail, executor is stopped");
}
Expand All @@ -940,10 +951,14 @@ async fn multiple_start_stop() {
let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64);

// Create mempool and subscribe with a custom txn stream.
let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default());
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());
let mempool = Mempool::new(
Arc::clone(&blockchain),
MempoolConfig::default(),
mock_network.clone(),
);
let mock_id = MockId::new(hub.new_address().into());

// Subscribe mempool with the mpsc stream created
mempool
Expand All @@ -958,7 +973,10 @@ async fn multiple_start_stop() {
tokio::task::spawn(async move {
for txn in txns {
txn_stream_tx
.send((txn.clone(), mock_id.clone()))
.send((
txn.clone(),
PubsubIdOrPeerId::PeerId(mock_id.propagation_source()),
))
.await
.unwrap();
}
Expand Down Expand Up @@ -1947,7 +1965,6 @@ async fn applies_total_tx_size_limits() {
..Default::default()
};
let mut hub = MockHub::new();
let mock_id = MockId::new(hub.new_address().into());
let mock_network = Arc::new(hub.new_network());
let mempool = Mempool::new(blockchain, mempool_config, Arc::clone(&mock_network));

Expand Down

0 comments on commit 2011b40

Please sign in to comment.