Skip to content

Commit

Permalink
chore: cleaning up message protocol for better consistency (#686)
Browse files Browse the repository at this point in the history
* Look at all messages for timeout

* Added garbage collection to messages

* Made presignature message not have leftover messages

* Made signature message not have leftover messages

* Make signature manager be consistent with garbage_collect

* Clippy

* Removed unused code
  • Loading branch information
ChaoticTempest authored Jul 18, 2024
1 parent 80fab52 commit 522abfb
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 210 deletions.
340 changes: 196 additions & 144 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::cryptography::CryptographicError;
use super::presignature::{self, PresignatureId};
use super::presignature::{GenerationError, PresignatureId};
use super::state::{GeneratingState, NodeState, ResharingState, RunningState};
use super::triple::TripleId;
use crate::gcp::error::SecretStorageError;
Expand All @@ -17,7 +17,6 @@ use near_primitives::hash::CryptoHash;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;

#[async_trait::async_trait]
Expand Down Expand Up @@ -236,45 +235,24 @@ impl MessageHandler for RunningState {

// remove the triple_id that has already failed or taken from the triple_bins
// and refresh the timestamp of failed and taken
queue
.triple_bins
.entry(self.epoch)
.or_default()
.retain(|id, queue| {
if let Some(first_msg) = queue.front() {
// Skip this triple if its message already timed out
if util::is_elapsed_longer_than_timeout(
first_msg.timestamp,
let triple_messages = queue.triple_bins.entry(self.epoch).or_default();
triple_messages.retain(|id, queue| {
if queue.is_empty()
|| queue.iter().any(|msg| {
util::is_elapsed_longer_than_timeout(
msg.timestamp,
crate::types::PROTOCOL_TRIPLE_TIMEOUT,
) {
return false;
}
} else {
return false;
}
let has_failed = triple_manager.failed_triples.contains_key(id);
if has_failed {
triple_manager.failed_triples.insert(*id, Instant::now());
}
let is_taken = triple_manager.taken.contains_key(id);
if is_taken {
triple_manager.taken.insert(*id, Instant::now());
}
!has_failed && !is_taken
});

for (id, queue) in queue.triple_bins.entry(self.epoch).or_default() {
if let Some(first_msg) = queue.front() {
// Skip this triple if its message already timed out
if util::is_elapsed_longer_than_timeout(
first_msg.timestamp,
crate::types::PROTOCOL_TRIPLE_TIMEOUT,
) {
continue;
}
} else {
continue;
)
})
{
return false;
}

// if triple id is in GC, remove these messages because the triple is currently
// being GC'ed, where this particular triple has previously failed or been utilized.
!triple_manager.refresh_gc(id)
});
for (id, queue) in triple_messages {
let protocol = match triple_manager.get_or_generate(*id, participants) {
Ok(protocol) => protocol,
Err(err) => {
Expand All @@ -293,126 +271,200 @@ impl MessageHandler for RunningState {
}

let mut presignature_manager = self.presignature_manager.write().await;
for (id, queue) in queue.presignature_bins.entry(self.epoch).or_default() {
let mut leftover_messages = Vec::new();
while let Some(message) = queue.pop_front() {
// Skip message if it already timed out
if util::is_elapsed_longer_than_timeout(
message.timestamp,
crate::types::PROTOCOL_PRESIG_TIMEOUT,
) {
let presignature_messages = queue.presignature_bins.entry(self.epoch).or_default();
presignature_messages.retain(|id, queue| {
// Skip message if it already timed out
if queue.is_empty()
|| queue.iter().any(|msg| {
util::is_elapsed_longer_than_timeout(
msg.timestamp,
crate::types::PROTOCOL_PRESIG_TIMEOUT,
)
})
{
return false;
}

// if presignature id is in GC, remove these messages because the presignature is currently
// being GC'ed, where this particular presignature has previously failed or been utilized.
!presignature_manager.refresh_gc(id)
});
for (id, queue) in presignature_messages {
// SAFETY: this unwrap() is safe since we have already checked that the queue is not empty.
let PresignatureMessage {
triple0, triple1, ..
} = queue.front().unwrap();

if !queue
.iter()
.all(|msg| triple0 == &msg.triple0 && triple1 == &msg.triple1)
{
// Check that all messages in the queue have the same triple0 and triple1, otherwise this is an
// invalid message, so we should just bin the whole entire protocol and its message for this presignature id.
queue.clear();
continue;
}

let protocol = match presignature_manager
.get_or_generate(
participants,
*id,
*triple0,
*triple1,
&mut triple_manager,
&self.public_key,
&self.private_share,
)
.await
{
Ok(protocol) => protocol,
Err(GenerationError::TripleIsGenerating(_)) => {
// We will go back to this presignature bin later when the triple is generated.
continue;
}

match presignature_manager
.get_or_generate(
participants,
*id,
message.triple0,
message.triple1,
&mut triple_manager,
&self.public_key,
&self.private_share,
)
.await
{
Ok(protocol) => protocol.message(message.from, message.data),
Err(presignature::GenerationError::AlreadyGenerated) => {
tracing::debug!(id, "presignature already generated, nothing left to do")
}
Err(presignature::GenerationError::TripleIsGenerating(_)) => {
// Store the message until triple gets generated
leftover_messages.push(message)
}
Err(presignature::GenerationError::TripleIsMissing(_)) => {
// If a triple is missing, that means our system cannot process this presignature. We will have to bin
// this message and have the other node timeout on that generation.
tracing::warn!(
presignature_id = id,
triple0 = message.triple0,
triple1 = message.triple1,
"unable to process presignature: one or more triples are missing",
);
}
Err(presignature::GenerationError::CaitSithInitializationError(error)) => {
// ignore the message since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
presignature_id = id,
?error,
"unable to initialize incoming presignature protocol"
);
continue;
}
Err(
err @ (GenerationError::AlreadyGenerated
| GenerationError::TripleIsGarbageCollected(_)
| GenerationError::TripleIsMissing(_)),
) => {
// This triple has already been generated or removed from the triple manager, so we will have to bin
// the entirety of the messages we received for this presignature id, and have the other nodes timeout
tracing::debug!(id, ?err, "presignature cannot be generated");
queue.clear();
continue;
}
}
if !leftover_messages.is_empty() {
tracing::warn!(
msg_count = leftover_messages.len(),
"unable to process messages, storing for future"
);
queue.extend(leftover_messages);
Err(GenerationError::CaitSithInitializationError(error)) => {
// ignore these messages since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
presignature_id = id,
?error,
"unable to initialize incoming presignature protocol"
);
queue.clear();
continue;
}
Err(err) => {
tracing::warn!(
presignature_id = id,
?err,
"Unexpected error encounted while generating presignature"
);
queue.clear();
continue;
}
};

while let Some(message) = queue.pop_front() {
protocol.message(message.from, message.data);
}
}

let mut signature_manager = self.signature_manager.write().await;
for (receipt_id, queue) in queue.signature_bins.entry(self.epoch).or_default() {
let mut leftover_messages = Vec::new();
while let Some(message) = queue.pop_front() {
// Skip message if it already timed out
if util::is_elapsed_longer_than_timeout(
message.timestamp,
crate::types::PROTOCOL_SIGNATURE_TIMEOUT,
) {
let signature_messages = queue.signature_bins.entry(self.epoch).or_default();
signature_messages.retain(|_, queue| {
// Skip message if it already timed out
if queue.is_empty()
|| queue.iter().any(|msg| {
util::is_elapsed_longer_than_timeout(
msg.timestamp,
crate::types::PROTOCOL_SIGNATURE_TIMEOUT,
)
})
{
return false;
}
true
});
for (receipt_id, queue) in signature_messages {
// SAFETY: this unwrap() is safe since we have already checked that the queue is not empty.
let SignatureMessage {
proposer,
presignature_id,
request,
epsilon,
delta,
..
} = queue.front().unwrap();

if !queue
.iter()
.all(|msg| presignature_id == &msg.presignature_id)
{
// Check that all messages in the queue have the same triple0 and triple1, otherwise this is an
// invalid message, so we should just bin the whole entire protocol and its message for this presignature id.
queue.clear();
continue;
}

// if !self
// .sign_queue
// .read()
// .await
// .contains(message.proposer, receipt_id.clone())
// {
// leftover_messages.push(message);
// continue;
// };
// TODO: Validate that the message matches our sign_queue
let protocol = match signature_manager.get_or_generate(
participants,
*receipt_id,
*proposer,
*presignature_id,
request,
*epsilon,
*delta,
&mut presignature_manager,
) {
Ok(protocol) => protocol,
Err(GenerationError::PresignatureIsGenerating(_)) => {
// We will revisit this this signature request later when the presignature has been generated.
continue;
}

// TODO: make consistent with presignature manager AlreadyGenerated.
if signature_manager.has_completed(&message.presignature_id) {
tracing::info!(
presignature_id = message.presignature_id,
"signature already generated, nothing left to do"
Err(
err @ (GenerationError::AlreadyGenerated
| GenerationError::PresignatureIsGarbageCollected(_)
| GenerationError::PresignatureIsMissing(_)),
) => {
// We will have to remove the entirety of the messages we received for this signature request,
// and have the other nodes timeout in the following cases:
// - If a presignature is in GC, then it was used already or failed to be produced.
// - If a presignature is missing, that means our system cannot process this signature.
tracing::debug!(%receipt_id, ?err, "signature cannot be generated");
queue.clear();
continue;
}
Err(GenerationError::CaitSithInitializationError(error)) => {
// ignore the whole of the messages since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
?receipt_id,
presignature_id,
?error,
"unable to initialize incoming signature protocol"
);
queue.clear();
continue;
}
// if !self
// .sign_queue
// .read()
// .await
// .contains(message.proposer, receipt_id.clone())
// {
// leftover_messages.push(message);
// continue;
// };
// TODO: Validate that the message matches our sign_queue
match signature_manager.get_or_generate(
participants,
*receipt_id,
message.proposer,
message.presignature_id,
message.request.clone(),
message.epsilon,
message.delta,
&mut presignature_manager,
)? {
Some(protocol) => protocol.message(message.from, message.data),
None => {
// Store the message until we are ready to process it
leftover_messages.push(message)
}
Err(err) => {
tracing::warn!(
?receipt_id,
?err,
"Unexpected error encounted while generating signature"
);
queue.clear();
continue;
}
}
if !leftover_messages.is_empty() {
tracing::warn!(
msg_count = leftover_messages.len(),
"unable to process messages, storing for future"
);
queue.extend(leftover_messages);
};

while let Some(message) = queue.pop_front() {
protocol.message(message.from, message.data);
}
}
triple_manager.clear_failed_triples();
triple_manager.clear_taken();
presignature_manager.clear_taken();
triple_manager.garbage_collect();
presignature_manager.garbage_collect();
signature_manager.garbage_collect();
Ok(())
}
}
Expand Down
Loading

0 comments on commit 522abfb

Please sign in to comment.