diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 5d6bdebc..4e93780b 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -191,7 +191,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { mesh_options, message_options, } => { - let sign_queue = Arc::new(RwLock::new(SignQueue::new())); + let (sign_tx, sign_rx) = SignQueue::channel(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; @@ -221,7 +221,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { &indexer_options, &mpc_contract_id, &account_id, - &sign_queue, + sign_tx, app_data_storage, rpc_client.clone(), )?; @@ -257,7 +257,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { rpc_client.clone(), signer, receiver, - sign_queue, + sign_rx, key_storage, triple_storage, presignature_storage, diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index bd15a3e7..7fdd5eb3 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -1,4 +1,4 @@ -use crate::protocol::{SignQueue, SignRequest}; +use crate::protocol::SignRequest; use crate::storage::app_data_storage::AppDataStorage; use crypto_shared::{derive_epsilon, ScalarExt}; use k256::Scalar; @@ -13,7 +13,7 @@ use std::ops::Mul; use std::sync::Arc; use std::thread::JoinHandle; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, RwLock}; /// Configures indexer. #[derive(Debug, Clone, clap::Parser)] @@ -169,7 +169,7 @@ impl Indexer { struct Context { mpc_contract_id: AccountId, node_account_id: AccountId, - queue: Arc>, + sign_tx: mpsc::Sender, indexer: Indexer, } @@ -267,14 +267,20 @@ async fn handle_block( // Add the requests after going through the whole block to avoid partial processing if indexer fails somewhere. // This way we can revisit the same block if we failed while not having added the requests partially. - let mut queue = ctx.queue.write().await; for request in pending_requests { - queue.add(request); + tracing::info!( + request_id = ?near_primitives::hash::CryptoHash(request.request_id), + payload = hex::encode(request.request.payload.to_bytes()), + entropy = hex::encode(request.entropy), + "new sign request" + ); + if let Err(err) = ctx.sign_tx.send(request).await { + tracing::error!(?err, "failed to send the sign request into sign queue"); + } crate::metrics::NUM_SIGN_REQUESTS .with_label_values(&[ctx.node_account_id.as_str()]) .inc(); } - drop(queue); let log_indexing_interval = 1000; if block.block_height() % log_indexing_interval == 0 { @@ -292,7 +298,7 @@ pub fn run( options: &Options, mpc_contract_id: &AccountId, node_account_id: &AccountId, - queue: &Arc>, + sign_tx: mpsc::Sender, app_data_storage: AppDataStorage, rpc_client: near_fetch::Client, ) -> anyhow::Result<(JoinHandle>, Indexer)> { @@ -308,7 +314,7 @@ pub fn run( let context = Context { mpc_contract_id: mpc_contract_id.clone(), node_account_id: node_account_id.clone(), - queue: queue.clone(), + sign_tx, indexer: indexer.clone(), }; diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index b9782373..16c05d38 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -3,7 +3,6 @@ use super::state::{ JoiningState, NodeState, PersistentNodeData, RunningState, StartedState, WaitingForConsensusState, }; -use super::SignQueue; use crate::config::Config; use crate::gcp::error::SecretStorageError; use crate::http_client::MessageQueue; @@ -12,6 +11,7 @@ use crate::protocol::presignature::PresignatureManager; use crate::protocol::signature::SignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; +use crate::protocol::SignRequest; use crate::storage::presignature_storage::PresignatureStorage; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::TripleStorage; @@ -25,7 +25,7 @@ use std::sync::Arc; use async_trait::async_trait; use cait_sith::protocol::InitializationError; use serde_json::json; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, RwLock}; use url::Url; use near_account_id::AccountId; @@ -38,7 +38,7 @@ pub trait ConsensusCtx { fn signer(&self) -> &InMemorySigner; fn mpc_contract_id(&self) -> &AccountId; fn my_address(&self) -> &Url; - fn sign_queue(&self) -> Arc>; + fn sign_rx(&self) -> Arc>>; fn secret_storage(&self) -> &SecretNodeStorageBox; fn triple_storage(&self) -> &TripleStorage; fn presignature_storage(&self) -> &PresignatureStorage; @@ -118,7 +118,6 @@ impl ConsensusProtocol for StartedState { } Ordering::Less => Err(ConsensusError::EpochRollback), Ordering::Equal => { - let sign_queue = ctx.sign_queue(); match contract_state .participants .find_participant(ctx.my_account_id()) @@ -147,9 +146,11 @@ impl ConsensusProtocol for StartedState { let signature_manager = Arc::new(RwLock::new(SignatureManager::new( me, + ctx.my_account_id(), + contract_state.threshold, public_key, epoch, - ctx.my_account_id(), + ctx.sign_rx(), ))); Ok(NodeState::Running(RunningState { @@ -158,7 +159,6 @@ impl ConsensusProtocol for StartedState { threshold: contract_state.threshold, private_share, public_key, - sign_queue, triple_manager, presignature_manager, signature_manager, @@ -388,9 +388,11 @@ impl ConsensusProtocol for WaitingForConsensusState { let signature_manager = Arc::new(RwLock::new(SignatureManager::new( me, + ctx.my_account_id(), + self.threshold, self.public_key, self.epoch, - ctx.my_account_id(), + ctx.sign_rx(), ))); Ok(NodeState::Running(RunningState { @@ -399,7 +401,6 @@ impl ConsensusProtocol for WaitingForConsensusState { threshold: self.threshold, private_share: self.private_share, public_key: self.public_key, - sign_queue: ctx.sign_queue(), triple_manager, presignature_manager, signature_manager, diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index 3621b5cf..b9dfb2b3 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -379,10 +379,9 @@ impl CryptographicProtocol for RunningState { let presig_task = PresignatureManager::execute(&self, &active, &cfg.protocol); - let me = ctx.me().await; let stable = mesh_state.stable_participants; tracing::debug!(?stable, "stable participants"); - let sig_task = SignatureManager::execute(&self, &stable, me, &cfg.protocol, &ctx); + let sig_task = SignatureManager::execute(&self, &stable, &cfg.protocol, &ctx); match tokio::try_join!(triple_task, presig_task, sig_task) { Ok(_result) => (), @@ -398,7 +397,7 @@ impl CryptographicProtocol for RunningState { let failures = messages .send_encrypted( - me, + ctx.me().await, &cfg.local.network.sign_sk, ctx.http_client(), &active, diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 2327bf2c..4be7f956 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -49,7 +49,7 @@ struct Ctx { signer: InMemorySigner, rpc_client: near_fetch::Client, http_client: reqwest::Client, - sign_queue: Arc>, + sign_rx: Arc>>, secret_storage: SecretNodeStorageBox, triple_storage: TripleStorage, presignature_storage: PresignatureStorage, @@ -81,8 +81,8 @@ impl ConsensusCtx for &mut MpcSignProtocol { &self.ctx.my_address } - fn sign_queue(&self) -> Arc> { - self.ctx.sign_queue.clone() + fn sign_rx(&self) -> Arc>> { + self.ctx.sign_rx.clone() } fn secret_storage(&self) -> &SecretNodeStorageBox { @@ -155,7 +155,7 @@ impl MpcSignProtocol { rpc_client: near_fetch::Client, signer: InMemorySigner, receiver: mpsc::Receiver, - sign_queue: Arc>, + sign_rx: mpsc::Receiver, secret_storage: SecretNodeStorageBox, triple_storage: TripleStorage, presignature_storage: PresignatureStorage, @@ -179,7 +179,7 @@ impl MpcSignProtocol { mpc_contract_id, rpc_client, http_client: reqwest::Client::new(), - sign_queue, + sign_rx: Arc::new(RwLock::new(sign_rx)), signer, secret_storage, triple_storage, diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index a915a881..99d3938e 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -21,13 +21,19 @@ use rand::seq::{IteratorRandom, SliceRandom}; use rand::SeedableRng; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; use std::time::{Duration, Instant}; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::{mpsc, RwLock}; use near_account_id::AccountId; use near_fetch::signer::SignerExt; pub type ReceiptId = near_primitives::hash::CryptoHash; +/// This is the maximum amount of sign requests that we can accept in the network. +const MAX_SIGN_REQUESTS: usize = 1024; + pub struct SignRequest { pub request_id: [u8; 32], pub request: ContractSignRequest, @@ -36,81 +42,54 @@ pub struct SignRequest { pub time_added: Instant, } -/// Type that preserves the insertion order of requests. -#[derive(Default)] -pub struct ParticipantRequests { - requests: VecDeque, -} - -impl ParticipantRequests { - fn insert(&mut self, request: SignRequest) { - self.requests.push_back(request); - } - - pub fn len(&self) -> usize { - self.requests.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn pop_front(&mut self) -> Option { - self.requests.pop_front() - } -} - -#[derive(Default)] pub struct SignQueue { - unorganized_requests: Vec, - requests: HashMap, + me: Participant, + sign_rx: Arc>>, + requests: HashMap>, } impl SignQueue { - pub fn new() -> Self { - Self::default() + pub fn channel() -> (mpsc::Sender, mpsc::Receiver) { + mpsc::channel(MAX_SIGN_REQUESTS) + } + + pub fn new(me: Participant, sign_rx: Arc>>) -> Self { + Self { + me, + sign_rx, + requests: HashMap::new(), + } } pub fn len(&self) -> usize { - self.unorganized_requests.len() + self.requests.values().map(|v| v.len()).sum() } pub fn is_empty(&self) -> bool { self.len() == 0 } - pub fn add(&mut self, request: SignRequest) { - tracing::info!( - request_id = ?CryptoHash(request.request_id), - payload = hex::encode(request.request.payload.to_bytes()), - entropy = hex::encode(request.entropy), - "new sign request" - ); - self.unorganized_requests.push(request); - } - - pub fn organize( + pub async fn organize( &mut self, threshold: usize, stable: &Participants, - me: Participant, my_account_id: &AccountId, ) { - if stable.len() < threshold { - tracing::warn!( - "Require at least {} stable participants to organize, got {}: {:?}", - threshold, - stable.len(), - stable.keys_vec() - ); - return; - } - for request in self.unorganized_requests.drain(..) { + let mut sign_rx = self.sign_rx.write().await; + while let Ok(request) = { + match sign_rx.try_recv() { + err @ Err(TryRecvError::Disconnected) => { + tracing::error!("sign queue channel disconnected"); + err + } + other => other, + } + } { let mut rng = StdRng::from_seed(request.entropy); let subset = stable.keys().choose_multiple(&mut rng, threshold); let proposer = **subset.choose(&mut rng).unwrap(); - if subset.contains(&&me) { - let is_mine = proposer == me; + if subset.contains(&&self.me) { + let is_mine = proposer == self.me; tracing::info!( request_id = ?CryptoHash(request.request_id), ?is_mine, @@ -119,7 +98,7 @@ impl SignQueue { "saving sign request: node is in the signer subset" ); let proposer_requests = self.requests.entry(proposer).or_default(); - proposer_requests.insert(request); + proposer_requests.push_back(request); if is_mine { crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) @@ -128,7 +107,7 @@ impl SignQueue { } else { tracing::info!( rrequest_id = ?CryptoHash(request.request_id), - ?me, + me = ?self.me, ?subset, ?proposer, "skipping sign request: node is NOT in the signer subset" @@ -137,8 +116,12 @@ impl SignQueue { } } - pub fn my_requests(&mut self, me: Participant) -> &mut ParticipantRequests { - self.requests.entry(me).or_default() + pub fn take_my_requests(&mut self) -> VecDeque { + self.requests.remove(&self.me).unwrap_or_default() + } + + pub fn insert_mine(&mut self, requests: VecDeque) { + self.requests.insert(self.me, requests); } } @@ -245,9 +228,13 @@ pub struct SignatureManager { /// Vec<(receipt_id, msg_hash, timestamp, output)> signatures: Vec, me: Participant, + my_account_id: AccountId, + threshold: usize, public_key: PublicKey, epoch: u64, - my_account_id: AccountId, + + /// Sign queue that maintains all requests coming in from indexer. + sign_queue: SignQueue, } pub const MAX_RETRY: u8 = 10; @@ -279,9 +266,11 @@ impl ToPublish { impl SignatureManager { pub fn new( me: Participant, + my_account_id: &AccountId, + threshold: usize, public_key: PublicKey, epoch: u64, - my_account_id: &AccountId, + sign_rx: Arc>>, ) -> Self { Self { generators: HashMap::new(), @@ -289,9 +278,11 @@ impl SignatureManager { completed: HashMap::new(), signatures: Vec::new(), me, + my_account_id: my_account_id.clone(), + threshold, public_key, epoch, - my_account_id: my_account_id.clone(), + sign_queue: SignQueue::new(me, sign_rx), } } @@ -614,21 +605,31 @@ impl SignatureManager { pub async fn handle_requests( &mut self, - threshold: usize, stable: &Participants, - my_requests: &mut ParticipantRequests, presignature_manager: &mut PresignatureManager, cfg: &ProtocolConfig, ) { - if stable.len() < threshold { + if stable.len() < self.threshold { tracing::warn!( "Require at least {} stable participants to handle_requests, got {}: {:?}", - threshold, + self.threshold, stable.len(), stable.keys_vec() ); return; } + + self.sign_queue + .organize(self.threshold, stable, &self.my_account_id) + .await; + crate::metrics::SIGN_QUEUE_SIZE + .with_label_values(&[self.my_account_id.as_str()]) + .set(self.sign_queue.len() as i64); + let mut my_requests = self.sign_queue.take_my_requests(); + crate::metrics::SIGN_QUEUE_MINE_SIZE + .with_label_values(&[self.my_account_id.as_str()]) + .set(my_requests.len() as i64); + while let Some(mut presignature) = { if self.failed.is_empty() && my_requests.is_empty() { None @@ -637,7 +638,7 @@ impl SignatureManager { } } { let sig_participants = stable.intersection(&[&presignature.participants]); - if sig_participants.len() < threshold { + if sig_participants.len() < self.threshold { tracing::warn!( participants = ?sig_participants.keys_vec(), "intersection of stable participants and presignature participants is less than threshold, trashing presignature" @@ -676,7 +677,7 @@ impl SignatureManager { } let Some(my_request) = my_requests.pop_front() else { - tracing::warn!("Unexpected state, no more requests to handle"); + tracing::warn!("unexpected state, no more requests to handle"); continue; }; @@ -694,6 +695,12 @@ impl SignatureManager { continue; } } + + // We do not have enough presignature stockpile and the taken requests need to be fulfilled, + // so insert it back into the sign queue to be fulfilled in the next iteration. + if !my_requests.is_empty() { + self.sign_queue.insert_mine(my_requests); + } } pub async fn publish( @@ -804,18 +811,14 @@ impl SignatureManager { pub fn execute( state: &RunningState, stable: &Participants, - me: Participant, protocol_cfg: &ProtocolConfig, ctx: &impl super::cryptography::CryptographicCtx, ) -> tokio::task::JoinHandle<()> { - let threshold = state.threshold; - let my_account_id = state.triple_manager.my_account_id.clone(); let presignature_manager = state.presignature_manager.clone(); let signature_manager = state.signature_manager.clone(); let messages = state.messages.clone(); let stable = stable.clone(); let protocol_cfg = protocol_cfg.clone(); - let sign_queue = state.sign_queue.clone(); let rpc_client = ctx.rpc_client().clone(); let signer = ctx.signer().clone(); let mpc_contract_id = ctx.mpc_contract_id().clone(); @@ -826,27 +829,10 @@ impl SignatureManager { // then they are considered unstable and should not be a part of signature generation this round. tokio::task::spawn(tokio::task::unconstrained(async move { - let mut sign_queue = sign_queue.write().await; - crate::metrics::SIGN_QUEUE_SIZE - .with_label_values(&[my_account_id.as_str()]) - .set(sign_queue.len() as i64); - sign_queue.organize(threshold, &stable, me, &my_account_id); - - let my_requests = sign_queue.my_requests(me); - crate::metrics::SIGN_QUEUE_MINE_SIZE - .with_label_values(&[my_account_id.as_str()]) - .set(my_requests.len() as i64); - - let mut presignature_manager = presignature_manager.write().await; let mut signature_manager = signature_manager.write().await; + let mut presignature_manager = presignature_manager.write().await; signature_manager - .handle_requests( - threshold, - &stable, - my_requests, - &mut presignature_manager, - &protocol_cfg, - ) + .handle_requests(&stable, &mut presignature_manager, &protocol_cfg) .await; drop(presignature_manager); diff --git a/chain-signatures/node/src/protocol/state.rs b/chain-signatures/node/src/protocol/state.rs index e1fe1b91..31a4bb6e 100644 --- a/chain-signatures/node/src/protocol/state.rs +++ b/chain-signatures/node/src/protocol/state.rs @@ -3,7 +3,6 @@ use super::cryptography::CryptographicError; use super::presignature::PresignatureManager; use super::signature::SignatureManager; use super::triple::TripleManager; -use super::SignQueue; use crate::http_client::MessageQueue; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; @@ -91,7 +90,6 @@ pub struct RunningState { pub threshold: usize, pub private_share: SecretKeyShare, pub public_key: PublicKey, - pub sign_queue: Arc>, pub triple_manager: TripleManager, pub presignature_manager: Arc>, pub signature_manager: Arc>,