Skip to content

Commit

Permalink
Improve reporting and fix endpoint duplication detection bug
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-casperlabs committed May 19, 2020
1 parent 4aaa7c7 commit 1bdd639
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions src/components/small_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ where
};
let our_fp = our_endpoint.cert.public_key_fingerprint();

// Run the server task.
info!(%our_endpoint, "starting server background task");
let mut effects = server_task(eq, tokio::net::TcpListener::from_std(listener)?)
.boxed()
.ignore();
let model = SmallNetwork {
cfg,
signed_endpoints: hashmap! { our_fp => Signed::new(&our_endpoint, &private_key)? },
Expand All @@ -191,12 +196,6 @@ where
outgoing: collections::HashMap::new(),
};

// Run the server task.
info!(?addr, "starting server background task");
let mut effects = server_task(eq, tokio::net::TcpListener::from_std(listener)?)
.boxed()
.ignore();

// Connect to the root node (even if we are the root node, just loopback).
effects.extend(model.connect_to_root());

Expand Down Expand Up @@ -366,15 +365,15 @@ where
fn update_endpoint(&mut self, endpoint: &Endpoint) -> Option<NodeId> {
let fp = endpoint.cert.public_key_fingerprint();

let mut rv = None;
self.endpoints.entry(fp).and_modify(|prev| {
if endpoint > prev {
rv = Some(fp);
*prev = endpoint.clone();
if let Some(prev) = self.endpoints.get(&fp) {
if prev >= endpoint {
// Still up to date or stale, do nothing.
return None;
}
});
}

rv
self.endpoints.insert(fp, endpoint.clone());
Some(fp)
}

/// Update internal endpoint store and if new, output a `BroadcastEndpoint` effect.
Expand All @@ -385,15 +384,15 @@ where
) -> Multiple<Effect<Event<P>>> {
match signed.validate_self_signed(|endpoint| Ok(endpoint.cert.public_key())) {
Ok(endpoint) => {
// Endpoint is valid, check if it was new.
if let Some(node_id) = self.update_endpoint(&endpoint) {
debug!("new endpoint {}", endpoint);
// We learned of a new endpoint. We store it and note whether it is the first
// endpoint for the node.
self.signed_endpoints.insert(node_id, signed.clone());
self.endpoints.insert(node_id, endpoint.clone());

self.broadcast_message(Message::BroadcastEndpoint(signed));

if self.outgoing.remove(&node_id).is_none() {
let effect = if self.outgoing.remove(&node_id).is_none() {
info!(%node_id, ?endpoint, "new outgoing channel");
// Initiate the connection process once we learn of a new node ID.
connect_outgoing(endpoint, self.cert.clone(), self.private_key.clone())
Expand All @@ -409,10 +408,15 @@ where
// There was a previous endpoint, whose sender has now been dropped. This
// will cause the sender task to exit and trigger a reconnect.

info!(%node_id, ?endpoint, "endpoint changed");
info!(%endpoint, "endpoint changed");
Multiple::new()
}
};

self.broadcast_message(Message::BroadcastEndpoint(signed));

effect
} else {
debug!("known endpoint: {}", endpoint);
Multiple::new()
}
}
Expand Down

0 comments on commit 1bdd639

Please sign in to comment.