Skip to content

Commit

Permalink
ValidatorNetwork: resolve validator peer ids from peer contact book
Browse files Browse the repository at this point in the history
  • Loading branch information
styppo committed Nov 25, 2024
1 parent e87f645 commit 2a6b9b8
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 21 deletions.
4 changes: 2 additions & 2 deletions handel/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl std::fmt::Debug for Protocol {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound = "C: AggregatableContribution")]
struct SerializableLevelUpdate<C: AggregatableContribution> {
aggregate: C,
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<C: AggregatableContribution> From<LevelUpdate<C>> for SerializableLevelUpda
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound = "C: AggregatableContribution")]
struct Update<C: AggregatableContribution>(pub SerializableLevelUpdate<C>);

Expand Down
2 changes: 1 addition & 1 deletion network-interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub trait Network: Send + Sync + Unpin + 'static {

/// Returns all peer ids that are known for the given validator.
/// The returned list might contain unverified mappings.
fn get_peers_by_validator(&self, validator_address: Address) -> Vec<Self::PeerId>;
fn get_peers_by_validator(&self, validator_address: &Address) -> Vec<Self::PeerId>;

/// Returns true when the given peer provides the services flags that are required by us
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;
Expand Down
2 changes: 1 addition & 1 deletion network-interface/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl RequestKind for MessageMarker {
}

pub trait RequestCommon:
Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + 'static
Serialize + Deserialize + Send + Sync + Unpin + fmt::Debug + Clone + 'static
{
type Kind: RequestKind;
const TYPE_ID: u16;
Expand Down
6 changes: 4 additions & 2 deletions network-libp2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,10 @@ impl NetworkInterface for Network {
Ok(filtered_peers)
}

fn get_peers_by_validator(&self, validator_address: Address) -> Vec<Self::PeerId> {
todo!()
fn get_peers_by_validator(&self, validator_address: &Address) -> Vec<Self::PeerId> {
self.contacts
.read()
.get_validator_peer_ids(validator_address)
}

fn peer_provides_required_services(&self, peer_id: PeerId) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion network-mock/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ impl Network for MockNetwork {
Ok(self.get_peers())
}

fn get_peers_by_validator(&self, _validator_address: Address) -> Vec<Self::PeerId> {
fn get_peers_by_validator(&self, _validator_address: &Address) -> Vec<Self::PeerId> {
// TODO
vec![]
}
Expand Down
55 changes: 42 additions & 13 deletions validator-network/src/network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use nimiq_network_interface::{
};
use nimiq_primitives::slots_allocation::{Validator, Validators};
use nimiq_serde::{Deserialize, Serialize};
use nimiq_utils::spawn;
use nimiq_utils::{spawn, stream::FuturesUnordered};
use parking_lot::RwLock;
use time::OffsetDateTime;

use super::{MessageStream, NetworkError, PubsubId, ValidatorNetwork};
use crate::NetworkError::UnknownValidator;

/// Validator `PeerId` cache state
#[derive(Clone, Copy)]
Expand Down Expand Up @@ -275,7 +276,7 @@ where
///
/// This makes it easier for the recipient to check that the sender is indeed a
/// currently elected validator.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ValidatorMessage<M> {
validator_id: u16,
inner: M,
Expand Down Expand Up @@ -350,25 +351,53 @@ where
validator_id: self.local_validator_id()?,
inner: msg,
};

// Use the last known peer ID, knowing that it might be already outdated.
// The network doesn't have a way to know if a record is outdated but we mark
// The network doesn't have a way to know if a record is outdated, but we mark
// them as potentially outdated when a request/response error happens.
// If the cache has a potentially outdated value, it will be updated soon
// and then available to use by future calls to this function.
let peer_id = self
let cached_peer_id = self
.get_validator_cache(validator_id)
.potentially_outdated_peer_id()
.ok_or_else(|| NetworkError::UnknownValidator(validator_id))?;

self.network
.message(msg, peer_id)
.map_err(|e| {
.potentially_outdated_peer_id();
if let Some(peer_id) = cached_peer_id {
if let Err(e) = self.network.message(msg.clone(), peer_id).await {
// The validator peer id might have changed and thus caused a connection failure.
self.clear_validator_peer_id_cache_on_error(validator_id, &e, &peer_id);
} else {
return Ok(());
}
}

NetworkError::Request(e)
})
.await
// Try all validator peer_ids from our peer contact book.
let our_address = {
let own_validator_id = self.own_validator_id.read();
let Some(own_validator_id) = *own_validator_id else {
return Err(UnknownValidator(validator_id));
};
let validators = self.validators.read();
let Some(validators) = validators.as_ref() else {
return Err(UnknownValidator(validator_id));
};
validators
.get_validator_by_slot_band(own_validator_id)
.address
.clone()
};

let mut futures = FuturesUnordered::new();
for peer_id in self.network.get_peers_by_validator(&our_address) {
let network = Arc::clone(&self.network);
let msg = msg.clone();
futures.push(async move { network.message(msg, peer_id).await });
}

let results = futures.collect::<Vec<Result<(), RequestError>>>().await;
if results.iter().any(|result| result.is_ok()) {
Ok(())
} else {
Err(UnknownValidator(validator_id))
}
}

async fn request<TRequest: Request>(
Expand Down
2 changes: 1 addition & 1 deletion validator/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use nimiq_validator::aggregation::{
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
struct SkipBlockMessage(SerializableLevelUpdate<SignedSkipBlockMessage>);

impl RequestCommon for SkipBlockMessage {
Expand Down

0 comments on commit 2a6b9b8

Please sign in to comment.