Skip to content

Commit

Permalink
Fixed cases that did not need to error early
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoticTempest committed Jun 6, 2024
1 parent 9cd8708 commit 5c486d4
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 35 deletions.
24 changes: 16 additions & 8 deletions chain-signatures/node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,10 @@ impl CryptographicProtocol for RunningState {
crate::metrics::MESSAGE_QUEUE_SIZE
.with_label_values(&[my_account_id.as_str()])
.set(messages.len() as i64);
triple_manager.stockpile(active)?;
for (p, msg) in triple_manager.poke().await? {
if let Err(err) = triple_manager.stockpile(active) {
tracing::warn!(?err, "running: failed to stockpile triples");
}
for (p, msg) in triple_manager.poke().await {
let info = self.fetch_participant(&p)?;
messages.push(info.clone(), MpcMessage::Triple(msg));
}
Expand All @@ -384,16 +386,19 @@ impl CryptographicProtocol for RunningState {
.set(triple_manager.ongoing.len() as i64);

let mut presignature_manager = self.presignature_manager.write().await;
presignature_manager
if let Err(err) = presignature_manager
.stockpile(
active,
&self.public_key,
&self.private_share,
&mut triple_manager,
)
.await?;
.await
{
tracing::warn!(?err, "running: failed to stockpile presignatures");
}
drop(triple_manager);
for (p, msg) in presignature_manager.poke()? {
for (p, msg) in presignature_manager.poke() {
let info = self.fetch_participant(&p)?;
messages.push(info.clone(), MpcMessage::Presignature(msg));
}
Expand Down Expand Up @@ -424,22 +429,25 @@ impl CryptographicProtocol for RunningState {
active,
my_requests,
&mut presignature_manager,
)?;
);
drop(sign_queue);
drop(presignature_manager);

for (p, msg) in signature_manager.poke() {
let info = self.fetch_participant(&p)?;
messages.push(info.clone(), MpcMessage::Signature(msg));
}
signature_manager
if let Err(err) = signature_manager
.publish(
ctx.rpc_client(),
ctx.signer(),
ctx.mpc_contract_id(),
&my_account_id,
)
.await?;
.await
{
tracing::warn!(?err, "running: failed to publish signatures");
}
drop(signature_manager);
let failures = messages
.send_encrypted(
Expand Down
27 changes: 20 additions & 7 deletions chain-signatures/node/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,17 @@ impl MessageHandler for RunningState {
});

for (id, queue) in queue.triple_bins.entry(self.epoch).or_default() {
if let Some(protocol) = triple_manager.get_or_generate(*id, participants)? {
let protocol = match triple_manager.get_or_generate(*id, participants) {
Ok(protocol) => protocol,
Err(err) => {
// ignore the message since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(?err, "unable to initialize incoming triple protocol");
continue;
}
};

if let Some(protocol) = protocol {
while let Some(message) = queue.pop_front() {
protocol.message(message.from, message.data);
}
Expand Down Expand Up @@ -281,7 +291,7 @@ impl MessageHandler for RunningState {
{
Ok(protocol) => protocol.message(message.from, message.data),
Err(presignature::GenerationError::AlreadyGenerated) => {
tracing::info!(id, "presignature already generated, nothing left to do")
tracing::debug!(id, "presignature already generated, nothing left to do")
}
Err(presignature::GenerationError::TripleIsGenerating(_)) => {
// Store the message until triple gets generated
Expand All @@ -292,7 +302,14 @@ impl MessageHandler for RunningState {
leftover_messages.push(message)
}
Err(presignature::GenerationError::CaitSithInitializationError(error)) => {
return Err(error.into())
// ignore the message since the generation had bad parameters. Also have the other node who
// initiated the protocol resend the message or have it timeout on their side.
tracing::warn!(
presignature_id = id,
?error,
"unable to initialize incoming presignature protocol"
);
continue;
}
Err(presignature::GenerationError::DatastoreStorageError(_)) => {
// Store the message until we are ready to process it
Expand Down Expand Up @@ -320,10 +337,6 @@ impl MessageHandler for RunningState {
) {
continue;
}
tracing::info!(
presignature_id = message.presignature_id,
"new signature message"
);

// TODO: make consistent with presignature manager AlreadyGenerated.
if signature_manager.has_completed(&message.presignature_id) {
Expand Down
26 changes: 21 additions & 5 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,18 @@ impl PresignatureManager {
private_share: &SecretKeyShare,
) -> Result<(), InitializationError> {
let id = rand::random();
tracing::info!(id, "starting protocol to generate a new presignature");

// Check if the `id` is already in the system. Error out and have the next cycle try again.
if self.generators.contains_key(&id)
|| self.presignatures.contains_key(&id)
|| self.taken.contains_key(&id)
{
return Err(InitializationError::BadParameters(format!(
"id collision: presignature_id={id}"
)));
}

tracing::debug!(id, "starting protocol to generate a new presignature");
let generator = Self::generate_internal(
participants,
self.me,
Expand Down Expand Up @@ -384,16 +395,16 @@ impl PresignatureManager {
/// messages to be sent to the respective participant.
///
/// An empty vector means we cannot progress until we receive a new message.
pub fn poke(&mut self) -> Result<Vec<(Participant, PresignatureMessage)>, ProtocolError> {
pub fn poke(&mut self) -> Vec<(Participant, PresignatureMessage)> {
let mut messages = Vec::new();
let mut result = Ok(());
let mut errors = Vec::new();
self.generators.retain(|id, generator| {
loop {
let action = match generator.poke() {
Ok(action) => action,
Err(e) => {
self.introduced.remove(id);
result = Err(e);
errors.push(e);
break false;
}
};
Expand Down Expand Up @@ -467,6 +478,11 @@ impl PresignatureManager {
}
}
});
result.map(|_| messages)

if !errors.is_empty() {
tracing::warn!(?errors, "faled to generate some presignatures");
}

messages
}
}
19 changes: 12 additions & 7 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ impl SignatureManager {
active: &Participants,
my_requests: &mut HashMap<CryptoHash, SignRequest>,
presignature_manager: &mut PresignatureManager,
) -> Result<(), super::CryptographicError> {
) {
let mut failed_presigs = Vec::new();
while let Some(mut presignature) = {
if self.failed.is_empty() && my_requests.is_empty() {
Expand All @@ -472,18 +472,22 @@ impl SignatureManager {
failed_presigs.push(presignature);
continue;
}
let presig_id = presignature.id;

// NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures.
// TODO: we need to decide how to prioritize certain requests over others such as with gas or time of
// when the request made it into the NEAR network.
// issue: https://github.com/near/mpc-recovery/issues/596
if let Some((receipt_id, failed_req)) = self.failed.pop_front() {
self.retry_failed_generation(
if let Err(err) = self.retry_failed_generation(
receipt_id,
failed_req,
presignature,
&sig_participants,
)?;
) {
tracing::warn!(%receipt_id, presig_id, ?err, "failed to retry signature generation: trashing presignature");
continue;
}

if let Some(another_presignature) = presignature_manager.take_mine() {
presignature = another_presignature;
Expand All @@ -500,24 +504,25 @@ impl SignatureManager {
failed_presigs.push(presignature);
continue;
};
self.generate(
if let Err(err) = self.generate(
&sig_participants,
receipt_id,
presignature,
my_request.request,
my_request.epsilon,
my_request.delta,
my_request.time_added,
)?;
) {
tracing::warn!(%receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature");
continue;
}
}

// add back the failed presignatures that were incompatible to be made into
// signatures due to failures or lack of participants.
for presignature in failed_presigs {
presignature_manager.insert_mine(presignature);
}

Ok(())
}

pub async fn publish<T: SignerExt>(
Expand Down
24 changes: 20 additions & 4 deletions chain-signatures/node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ impl TripleManager {
/// Starts a new Beaver triple generation protocol.
pub fn generate(&mut self, participants: &Participants) -> Result<(), InitializationError> {
let id = rand::random();

// Check if the `id` is already in the system. Error out and have the next cycle try again.
if self.generators.contains_key(&id)
|| self.triples.contains_key(&id)
|| self.taken.contains_key(&id)
{
return Err(InitializationError::BadParameters(format!(
"id collision: triple_id={id}"
)));
}

tracing::debug!(id, "starting protocol to generate a new triple");
let participants: Vec<_> = participants.keys().cloned().collect();
let protocol: TripleProtocol = Box::new(cait_sith::triples::generate_triple::<Secp256k1>(
Expand Down Expand Up @@ -391,7 +402,7 @@ impl TripleManager {
/// messages to be sent to the respective participant.
///
/// An empty vector means we cannot progress until we receive a new message.
pub async fn poke(&mut self) -> Result<Vec<(Participant, TripleMessage)>, ProtocolError> {
pub async fn poke(&mut self) -> Vec<(Participant, TripleMessage)> {
// Add more protocols to the ongoing pool if there is space.
let to_generate_len = self.triple_cfg.max_concurrent_generation - self.ongoing.len();
if !self.queued.is_empty() && to_generate_len > 0 {
Expand All @@ -401,8 +412,8 @@ impl TripleManager {
}

let mut messages = Vec::new();
let mut result = Ok(());
let mut triples_to_insert = Vec::new();
let mut errors = Vec::new();
self.generators.retain(|id, generator| {
if !self.ongoing.contains(id) {
// If the protocol is not ongoing, we should retain it for the next time
Expand All @@ -414,7 +425,7 @@ impl TripleManager {
let action = match generator.poke() {
Ok(action) => action,
Err(e) => {
result = Err(e);
errors.push(e);
self.failed_triples.insert(*id, Instant::now());
self.ongoing.remove(id);
self.introduced.remove(id);
Expand Down Expand Up @@ -522,7 +533,12 @@ impl TripleManager {
}
});
self.insert_triples_to_storage(triples_to_insert).await;
result.map(|_| messages)

if !errors.is_empty() {
tracing::warn!(?errors, "faled to generate some triples");
}

messages
}

async fn insert_triples_to_storage(&mut self, triples_to_insert: Vec<Triple>) {
Expand Down
6 changes: 3 additions & 3 deletions chain-signatures/node/src/storage/secret_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ impl SecretNodeStorage for DiskNodeStorage {
}

async fn load(&self) -> SecretResult<Option<PersistentNodeData>> {
println!("loading PersistentNodeData using DiskNodeStorage");
tracing::info!("loading PersistentNodeData using DiskNodeStorage");
// Open the file asynchronously
let file_res = File::open(self.path.as_os_str()).await;

match file_res {
Ok(mut file) => {
let mut contents = Vec::new();
// Read the contents of the file into the vector
println!("loading PersistentNodeData using DiskNodeStorage: reading");
tracing::debug!("loading PersistentNodeData using DiskNodeStorage: reading");
file.read_to_end(&mut contents).await?;

println!("loading PersistentNodeData using DiskNodeStorage: read done");
tracing::debug!("loading PersistentNodeData using DiskNodeStorage: read done");
// Deserialize the JSON content to a PersistentNodeData object
let data: PersistentNodeData = serde_json::from_slice(&contents)?;

Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl TestTripleManagers {

async fn poke(&mut self, index: usize) -> Result<bool, ProtocolError> {
let mut quiet = true;
let messages = self.managers[index].poke().await?;
let messages = self.managers[index].poke().await;
for (
participant,
ref tm @ TripleMessage {
Expand Down

0 comments on commit 5c486d4

Please sign in to comment.