Skip to content

Commit

Permalink
Implementation of a Mempool Syncer.
Browse files Browse the repository at this point in the history
The syncers get initiated once the Mempool Executors are spawned. One Syncer per Executor is started in order to sync the regular and control transactions separately.
First the Syncer starts to discover which transaction hashes other nodes with a mempool have and compare those with what we have locally.
Then we distribute those unknown hashes among the peers we know have those transactions and retrieve the actual transactions.
Lastly for every received transaction we do a full verification which should add it to our local mempool if everything checks out.
  • Loading branch information
Eligioo committed Aug 19, 2024
1 parent 0c4cbc6 commit e129d1c
Show file tree
Hide file tree
Showing 9 changed files with 749 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ nimiq-account = { workspace = true }
nimiq-block = { workspace = true }
nimiq-blockchain = { workspace = true }
nimiq-blockchain-interface = { workspace = true }
nimiq-consensus = { workspace = true }
nimiq-database = { workspace = true }
nimiq-hash = { workspace = true }
nimiq-keys = { workspace = true }
Expand Down
28 changes: 15 additions & 13 deletions mempool/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub(crate) struct MempoolExecutor<N: Network, T: Topic + Unpin + Sync> {
// Network ID, used for tx verification
network_id: NetworkId,

// Transaction stream that is used to listen to transactions from the network
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
// Transaction stream that is used to listen to transactions from the network and the Mempool Syncer
txn_stream: BoxStream<'static, (Transaction, Option<<N as Network>::PubsubId>)>,

// Phantom data for the unused type T
_phantom: PhantomData<T>,
Expand All @@ -58,7 +58,7 @@ impl<N: Network, T: Topic + Unpin + Sync> MempoolExecutor<N, T> {
state: Arc<RwLock<MempoolState>>,
filter: Arc<RwLock<MempoolFilter>>,
network: Arc<N>,
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
txn_stream: BoxStream<'static, (Transaction, Option<<N as Network>::PubsubId>)>,
verification_tasks: Arc<AtomicU32>,
) -> Self {
Self {
Expand Down Expand Up @@ -114,16 +114,18 @@ impl<N: Network, T: Topic + Unpin + Sync> Future for MempoolExecutor<N, T> {
)
.await;

let acceptance = match verify_tx_ret {
Ok(_) => MsgAcceptance::Accept,
// Reject the message if signature verification fails or transaction is invalid
// for current validation window
Err(VerifyErr::InvalidTransaction(_)) => MsgAcceptance::Reject,
Err(VerifyErr::AlreadyIncluded) => MsgAcceptance::Reject,
Err(_) => MsgAcceptance::Ignore,
};

network.validate_message::<T>(pubsub_id, acceptance);
if let Some(pubsub_id) = pubsub_id {
let acceptance = match verify_tx_ret {
Ok(_) => MsgAcceptance::Accept,
// Reject the message if signature verification fails or transaction is invalid
// for current validation window
Err(VerifyErr::InvalidTransaction(_)) => MsgAcceptance::Reject,
Err(VerifyErr::AlreadyIncluded) => MsgAcceptance::Reject,
Err(_) => MsgAcceptance::Ignore,
};

network.validate_message::<T>(pubsub_id, acceptance);
}

drop(decrement);
});
Expand Down
2 changes: 2 additions & 0 deletions mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ pub mod mempool;
mod mempool_metrics;
/// Mempool transaction module
pub mod mempool_transactions;
/// Mempool syncer module
mod sync;
/// Verify transaction module
pub mod verify;
44 changes: 36 additions & 8 deletions mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use futures::{
future::{AbortHandle, Abortable},
lock::{Mutex, MutexGuard},
stream::{BoxStream, StreamExt},
stream::{select, BoxStream, StreamExt},
};
use nimiq_account::ReservedBalance;
use nimiq_block::Block;
Expand All @@ -32,6 +32,7 @@ use crate::{
filter::{MempoolFilter, MempoolRules},
mempool_state::{EvictionReason, MempoolState},
mempool_transactions::{MempoolTransactions, TxPriority},
sync::{messages::MempoolTransactionType, MempoolSyncer},
verify::{verify_tx, VerifyErr},
};

Expand Down Expand Up @@ -90,7 +91,7 @@ impl Mempool {
network: Arc<N>,
monitor: Option<TaskMonitor>,
mut handle: MutexGuard<'_, Option<AbortHandle>>,
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
txn_stream: BoxStream<'static, (Transaction, Option<<N as Network>::PubsubId>)>,
) {
if handle.is_some() {
// If we already have an executor running, don't do anything
Expand Down Expand Up @@ -144,29 +145,56 @@ impl Mempool {
return;
}

// TODO: get correct peers
// TODO: only get peers that are synced with us
// Sync regular transactions with the mempool of other peers
let regular_transactions_syncer = MempoolSyncer::new(
network.get_peers(),
MempoolTransactionType::Regular,
Arc::clone(&network),
Arc::clone(&self.blockchain),
Arc::clone(&self.state),
);

// Subscribe to the network TX topic
let txn_stream = network.subscribe::<TransactionTopic>().await.unwrap();
let txn_stream = network
.subscribe::<TransactionTopic>()
.await
.unwrap()
.map(|(tx, pubsub_id)| (tx, Some(pubsub_id)))
.boxed();

self.start_executor::<N, TransactionTopic>(
Arc::clone(&network),
monitor,
executor_handle,
txn_stream,
select(regular_transactions_syncer, txn_stream).boxed(),
);

// TODO: get correct peers
// TODO: only get peers that are synced with us
// Sync control transactions with the mempool of other peers
let control_transactions_syncer = MempoolSyncer::new(
network.get_peers(),
MempoolTransactionType::Control,
Arc::clone(&network),
Arc::clone(&self.blockchain),
Arc::clone(&self.state),
);

// Subscribe to the control transaction topic
let txn_stream = network
.subscribe::<ControlTransactionTopic>()
.await
.unwrap()
.map(|(tx, pubsub_id)| (Transaction::from(tx), pubsub_id))
.map(|(tx, pubsub_id)| (Transaction::from(tx), Some(pubsub_id)))
.boxed();

self.start_executor::<N, ControlTransactionTopic>(
network,
control_monitor,
control_executor_handle,
txn_stream,
select(control_transactions_syncer, txn_stream).boxed(),
);
}

Expand All @@ -177,7 +205,7 @@ impl Mempool {
/// stream instead.
pub async fn start_executor_with_txn_stream<N: Network>(
&self,
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
txn_stream: BoxStream<'static, (Transaction, Option<<N as Network>::PubsubId>)>,
network: Arc<N>,
) {
self.start_executor::<N, TransactionTopic>(
Expand All @@ -195,7 +223,7 @@ impl Mempool {
/// stream instead.
pub async fn start_control_executor_with_txn_stream<N: Network>(
&self,
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
txn_stream: BoxStream<'static, (Transaction, Option<<N as Network>::PubsubId>)>,
network: Arc<N>,
) {
self.start_executor::<N, ControlTransactionTopic>(
Expand Down
85 changes: 85 additions & 0 deletions mempool/src/sync/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::sync::Arc;

use nimiq_hash::Blake2bHash;
use nimiq_network_interface::{
network::Network,
request::{Handle, RequestCommon, RequestMarker},
};
use nimiq_serde::{Deserialize, Serialize};
use nimiq_transaction::Transaction;
use parking_lot::RwLock;

use crate::mempool_state::MempoolState;

const MAX_REQUEST_RESPONSE_MEMPOOL_STATE: u32 = 1000;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MempoolTransactionType {
Control,
Regular,
}

/// Request the current transaction hashes in the mempool.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RequestMempoolHashes {
pub transaction_type: MempoolTransactionType,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ResponseMempoolHashes {
pub hashes: Vec<Blake2bHash>,
}

impl RequestCommon for RequestMempoolHashes {
type Kind = RequestMarker;
const TYPE_ID: u16 = 219;
type Response = ResponseMempoolHashes;
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_MEMPOOL_STATE;
}

/// Request transactions in the mempool based on the provided hashes.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RequestMempoolTransactions {
pub hashes: Vec<Blake2bHash>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ResponseMempoolTransactions {
pub transactions: Vec<Transaction>,
}

impl RequestCommon for RequestMempoolTransactions {
type Kind = RequestMarker;
const TYPE_ID: u16 = 220;
type Response = ResponseMempoolTransactions;
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_MEMPOOL_STATE;
}

impl<N: Network> Handle<N, Arc<RwLock<MempoolState>>> for RequestMempoolHashes {
fn handle(&self, _: N::PeerId, context: &Arc<RwLock<MempoolState>>) -> ResponseMempoolHashes {
let hashes: Vec<Blake2bHash> = match self.transaction_type {
MempoolTransactionType::Regular => context
.read()
.regular_transactions
.best_transactions
.iter()
.map(|txn| txn.0.clone())
.collect(),
MempoolTransactionType::Control => context
.read()
.control_transactions
.best_transactions
.iter()
.map(|txn| txn.0.clone())
.collect(),
};

ResponseMempoolHashes { hashes }
}
}

impl<N: Network> Handle<N, Arc<RwLock<MempoolState>>> for RequestMempoolTransactions {
fn handle(&self, _: N::PeerId, _: &Arc<RwLock<MempoolState>>) -> ResponseMempoolTransactions {
todo!()
}
}
Loading

0 comments on commit e129d1c

Please sign in to comment.