From 922bb1251e70cb81a1b5ba7278669f33657c4ca6 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Mon, 13 Jan 2025 19:54:18 -0800 Subject: [PATCH] feat: add MessageChannel for async protocol message receiving and sending (#93) * Added MessageChannel for receiving messages from endpoints * Replace MessageQueue with MessageChannel * Made NodeClient for requesting node requests * No longer need to find me() * Cleanup encryption * Cleanup encryption again by files * Rename http_client to node_client * Rename to inbox and outbox * Added back queue size metric * Made triple directly use channel * Parallelize sending messages * Made presignatures/signatures use channel directly * clippy * fix reshare test --- chain-signatures/node/src/cli.rs | 51 +- chain-signatures/node/src/contract_updater.rs | 4 +- chain-signatures/node/src/http_client.rs | 301 ---------- chain-signatures/node/src/lib.rs | 2 +- chain-signatures/node/src/mesh/connection.rs | 77 +-- chain-signatures/node/src/mesh/mod.rs | 45 +- chain-signatures/node/src/node_client.rs | 117 ++++ .../node/src/protocol/consensus.rs | 49 +- .../node/src/protocol/contract/primitives.rs | 11 +- .../node/src/protocol/cryptography.rs | 245 +++----- chain-signatures/node/src/protocol/message.rs | 542 +++++++++++++++--- chain-signatures/node/src/protocol/mod.rs | 95 +-- .../node/src/protocol/presignature.rs | 94 +-- .../node/src/protocol/signature.rs | 130 +++-- chain-signatures/node/src/protocol/state.rs | 7 +- chain-signatures/node/src/protocol/triple.rs | 121 ++-- chain-signatures/node/src/types.rs | 2 + chain-signatures/node/src/web/error.rs | 4 +- chain-signatures/node/src/web/mod.rs | 6 +- integration-tests/src/lib.rs | 12 +- 20 files changed, 959 insertions(+), 956 deletions(-) delete mode 100644 chain-signatures/node/src/http_client.rs create mode 100644 chain-signatures/node/src/node_client.rs diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 4e93780b..54c366a9 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -1,9 +1,11 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig}; use crate::gcp::GcpService; use crate::mesh::Mesh; +use crate::node_client::{self, NodeClient}; +use crate::protocol::message::MessageChannel; use crate::protocol::{MpcSignProtocol, SignQueue}; use crate::storage::app_data_storage; -use crate::{http_client, indexer, mesh, storage, web}; +use crate::{indexer, mesh, storage, web}; use clap::Parser; use deadpool_redis::Runtime; use local_ip_address::local_ip; @@ -11,7 +13,7 @@ use near_account_id::AccountId; use near_crypto::{InMemorySigner, SecretKey}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::RwLock; use tracing_stackdriver::layer as stackdriver_layer; use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; use url::Url; @@ -68,7 +70,7 @@ pub enum Cli { #[clap(flatten)] mesh_options: mesh::Options, #[clap(flatten)] - message_options: http_client::Options, + message_options: node_client::Options, }, } @@ -237,11 +239,10 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { Url::parse(&format!("http://{my_ip}:{web_port}")).unwrap() }); - let (sender, receiver) = mpsc::channel(16384); - tracing::info!(%my_address, "address detected"); + let client = NodeClient::new(&message_options); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); - let (mesh, mesh_state) = Mesh::init(mesh_options); + let (mesh, mesh_state) = Mesh::init(&client, mesh_options); let config = Arc::new(RwLock::new(Config::new(LocalConfig { over: override_config.unwrap_or_else(Default::default), network: NetworkConfig { @@ -250,24 +251,26 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { }, }))); let contract_state = Arc::new(RwLock::new(None)); - let (protocol, protocol_state) = MpcSignProtocol::init( - my_address, - mpc_contract_id.clone(), - account_id, - rpc_client.clone(), - signer, - receiver, - sign_rx, - key_storage, - triple_storage, - presignature_storage, - message_options, - ); let contract_updater = - crate::contract_updater::ContractUpdater::init(rpc_client, mpc_contract_id); + crate::contract_updater::ContractUpdater::init(&rpc_client, &mpc_contract_id); rt.block_on(async { + let (sender, channel) = + MessageChannel::spawn(client, &account_id, &config, &mesh_state).await; + let (protocol, protocol_state) = MpcSignProtocol::init( + my_address, + mpc_contract_id, + account_id, + rpc_client, + signer, + channel, + sign_rx, + key_storage, + triple_storage, + presignature_storage, + ); + tracing::info!("protocol initialized"); let contract_handle = tokio::spawn({ let contract_state = Arc::clone(&contract_state); @@ -278,12 +281,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let contract_state = Arc::clone(&contract_state); async move { mesh.run(contract_state).await } }); - let protocol_handle = tokio::spawn({ - let contract_state = Arc::clone(&contract_state); - let config = Arc::clone(&config); - let mesh_state = Arc::clone(&mesh_state); - async move { protocol.run(contract_state, config, mesh_state).await } - }); + let protocol_handle = + tokio::spawn(protocol.run(contract_state, config, mesh_state)); tracing::info!("protocol thread spawned"); let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?; let web_handle = tokio::spawn(async move { diff --git a/chain-signatures/node/src/contract_updater.rs b/chain-signatures/node/src/contract_updater.rs index 27d5f80a..f7defb34 100644 --- a/chain-signatures/node/src/contract_updater.rs +++ b/chain-signatures/node/src/contract_updater.rs @@ -12,9 +12,9 @@ pub struct ContractUpdater { } impl ContractUpdater { - pub fn init(rpc_client: near_fetch::Client, mpc_contract_id: AccountId) -> Self { + pub fn init(rpc_client: &near_fetch::Client, mpc_contract_id: &AccountId) -> Self { Self { - rpc_client, + rpc_client: rpc_client.clone(), mpc_contract_id: mpc_contract_id.clone(), } } diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs deleted file mode 100644 index 44bf82b7..00000000 --- a/chain-signatures/node/src/http_client.rs +++ /dev/null @@ -1,301 +0,0 @@ -use crate::protocol::contract::primitives::Participants; -use crate::protocol::message::SignedMessage; -use crate::protocol::MpcMessage; -use cait_sith::protocol::Participant; -use mpc_contract::config::ProtocolConfig; -use mpc_keys::hpke::Ciphered; -use reqwest::{Client, IntoUrl}; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::str::Utf8Error; -use std::time::{Duration, Instant}; -use tokio_retry::strategy::{jitter, ExponentialBackoff}; -use tokio_retry::Retry; - -#[derive(Debug, Clone, clap::Parser)] -#[group(id = "message_options")] -pub struct Options { - #[clap(long, env("MPC_MESSAGE_TIMEOUT"), default_value = "1000")] - pub timeout: u64, -} - -impl Options { - pub fn into_str_args(self) -> Vec { - vec!["--timeout".to_string(), self.timeout.to_string()] - } -} - -#[derive(Debug, thiserror::Error)] -pub enum SendError { - #[error("http request was unsuccessful: {0}")] - Unsuccessful(String), - #[error("serialization unsuccessful: {0}")] - DataConversionError(serde_json::Error), - #[error("http client error: {0}")] - ReqwestClientError(#[from] reqwest::Error), - #[error("http response could not be parsed: {0}")] - ReqwestBodyError(reqwest::Error), - #[error("http response body is not valid utf-8: {0}")] - MalformedResponse(Utf8Error), - #[error("encryption error: {0}")] - EncryptionError(String), - #[error("http request timeout: {0}")] - Timeout(String), - #[error("participant is not alive: {0}")] - ParticipantNotAlive(String), -} - -pub async fn send_encrypted( - from: Participant, - client: &Client, - url: U, - message: Vec, - request_timeout: Duration, -) -> Result<(), SendError> { - let _span = tracing::info_span!("message_request"); - let mut url = url.into_url()?; - url.set_path("msg"); - tracing::debug!(?from, to = %url, "making http request: sending encrypted message"); - let action = || async { - let response = tokio::time::timeout( - request_timeout, - client - .post(url.clone()) - .header("content-type", "application/json") - .json(&message) - .send(), - ) - .await - .map_err(|_| SendError::Timeout(format!("send encrypted from {from:?} to {url}")))? - .map_err(SendError::ReqwestClientError)?; - - let status = response.status(); - let response_bytes = response - .bytes() - .await - .map_err(SendError::ReqwestBodyError)?; - let response_str = - std::str::from_utf8(&response_bytes).map_err(SendError::MalformedResponse)?; - if status.is_success() { - Ok(()) - } else { - tracing::warn!( - "failed to send a message to {} with code {}: {}", - url, - status, - response_str - ); - Err(SendError::Unsuccessful(response_str.into())) - } - }; - - let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3); - Retry::spawn(retry_strategy, action).await -} - -// TODO: add in retry logic either in struct or at call site. -// TODO: add check for participant list to see if the messages to be sent are still valid. -pub struct MessageQueue { - deque: VecDeque<(Participant, MpcMessage, Instant)>, - seen_counts: HashSet, - message_options: Options, -} - -impl MessageQueue { - pub fn new(options: Options) -> Self { - Self { - deque: VecDeque::default(), - seen_counts: HashSet::default(), - message_options: options, - } - } - - pub fn len(&self) -> usize { - self.deque.len() - } - - pub fn is_empty(&self) -> bool { - self.deque.is_empty() - } - - pub fn push(&mut self, node: Participant, msg: MpcMessage) { - self.deque.push_back((node, msg, Instant::now())); - } - - pub fn extend(&mut self, other: impl IntoIterator) { - self.deque - .extend(other.into_iter().map(|(i, msg)| (i, msg, Instant::now()))); - } - - pub async fn send_encrypted( - &mut self, - from: Participant, - sign_sk: &near_crypto::SecretKey, - client: &Client, - active: &Participants, - cfg: &ProtocolConfig, - ) -> Vec { - let mut failed = VecDeque::new(); - let mut errors = Vec::new(); - let mut participant_counter = HashMap::new(); - - let outer = Instant::now(); - let uncompacted = self.deque.len(); - let mut encrypted = HashMap::new(); - while let Some((id, msg, instant)) = self.deque.pop_front() { - if instant.elapsed() > timeout(&msg, cfg) { - errors.push(SendError::Timeout(format!( - "{} message has timed out for node={id:?}", - msg.typename(), - ))); - continue; - } - - let Some(info) = active.get(&id) else { - let counter = participant_counter.entry(id).or_insert(0); - *counter += 1; - failed.push_back((id, msg, instant)); - continue; - }; - - let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) { - Ok(encrypted) => encrypted, - Err(err) => { - errors.push(SendError::EncryptionError(err.to_string())); - continue; - } - }; - let encrypted = encrypted.entry(info.id).or_insert_with(Vec::new); - encrypted.push((encrypted_msg, (id, msg, instant))); - } - - let mut compacted = 0; - for (id, encrypted) in encrypted { - for partition in partition_ciphered_256kb(encrypted) { - let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); - // guaranteed to unwrap due to our previous loop check: - let info = active.get(&Participant::from(id)).unwrap(); - let account_id = &info.account_id; - let number_of_messages = encrypted_partition.len() as f64; - - let start = Instant::now(); - crate::metrics::NUM_SEND_ENCRYPTED_TOTAL - .with_label_values(&[account_id.as_str()]) - .inc_by(number_of_messages); - if let Err(err) = send_encrypted( - from, - client, - &info.url, - encrypted_partition, - Duration::from_millis(self.message_options.timeout), - ) - .await - { - crate::metrics::NUM_SEND_ENCRYPTED_FAILURE - .with_label_values(&[account_id.as_str()]) - .inc_by(number_of_messages); - crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY - .with_label_values(&[account_id.as_str()]) - .observe(start.elapsed().as_millis() as f64); - - // since we failed, put back all the messages related to this - failed.extend(msgs); - errors.push(err); - } else { - compacted += msgs.len(); - crate::metrics::SEND_ENCRYPTED_LATENCY - .with_label_values(&[account_id.as_str()]) - .observe(start.elapsed().as_millis() as f64); - } - } - } - - if uncompacted > 0 { - tracing::debug!( - uncompacted, - compacted, - "{from:?} sent messages in {:?};", - outer.elapsed() - ); - } - // only add the participant count if it hasn't been seen before. - let counts = format!("{participant_counter:?}"); - if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) { - errors.push(SendError::ParticipantNotAlive(format!( - "participants not responding: {counts:?}", - ))); - } - - // Add back the failed attempts for next time. - self.deque = failed; - if !errors.is_empty() { - tracing::warn!("got errors when sending encrypted messages: {errors:?}"); - } - errors - } -} - -/// Encrypted message with a reference to the old message. Only the ciphered portion of this -/// type will be sent over the wire, while the original message is kept just in case things -/// go wrong somewhere and the message needs to be requeued to be sent later. -type EncryptedMessage = (Ciphered, (Participant, MpcMessage, Instant)); - -fn partition_ciphered_256kb(encrypted: Vec) -> Vec> { - let mut result = Vec::new(); - let mut current_partition = Vec::new(); - let mut current_size: usize = 0; - - for ciphered in encrypted { - let bytesize = ciphered.0.text.len(); - if current_size + bytesize > 256 * 1024 { - // If adding this byte vector exceeds 256kb, start a new partition - result.push(current_partition); - current_partition = Vec::new(); - current_size = 0; - } - current_partition.push(ciphered); - current_size += bytesize; - } - - if !current_partition.is_empty() { - // Add the last partition - result.push(current_partition); - } - - result -} - -fn timeout(msg: &MpcMessage, cfg: &ProtocolConfig) -> Duration { - match msg { - MpcMessage::Generating(_) => Duration::from_millis(cfg.message_timeout), - MpcMessage::Resharing(_) => Duration::from_millis(cfg.message_timeout), - MpcMessage::Triple(_) => Duration::from_millis(cfg.triple.generation_timeout), - MpcMessage::Presignature(_) => Duration::from_millis(cfg.presignature.generation_timeout), - MpcMessage::Signature(_) => Duration::from_millis(cfg.signature.generation_timeout), - } -} - -#[cfg(test)] -mod tests { - use crate::protocol::message::GeneratingMessage; - use crate::protocol::MpcMessage; - - #[test] - fn test_sending_encrypted_message() { - let associated_data = b""; - let (sk, pk) = mpc_keys::hpke::generate(); - let starting_message = MpcMessage::Generating(GeneratingMessage { - from: cait_sith::protocol::Participant::from(0), - data: vec![], - }); - - let message = serde_json::to_vec(&starting_message).unwrap(); - let message = pk.encrypt(&message, associated_data).unwrap(); - - let message = serde_json::to_vec(&message).unwrap(); - let cipher = serde_json::from_slice(&message).unwrap(); - let message = sk.decrypt(&cipher, associated_data).unwrap(); - let message: MpcMessage = serde_json::from_slice(&message).unwrap(); - - assert_eq!(starting_message, message); - } -} diff --git a/chain-signatures/node/src/lib.rs b/chain-signatures/node/src/lib.rs index 2d8afca7..5b401c2e 100644 --- a/chain-signatures/node/src/lib.rs +++ b/chain-signatures/node/src/lib.rs @@ -2,11 +2,11 @@ pub mod cli; pub mod config; pub mod contract_updater; pub mod gcp; -pub mod http_client; pub mod indexer; pub mod kdf; pub mod mesh; pub mod metrics; +pub mod node_client; pub mod protocol; pub mod rpc_client; pub mod storage; diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index a889db53..2e652954 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -3,22 +3,19 @@ use std::time::{Duration, Instant}; use cait_sith::protocol::Participant; use tokio::sync::RwLock; -use url::Url; +use crate::node_client::NodeClient; use crate::protocol::contract::primitives::Participants; -use crate::protocol::ParticipantInfo; use crate::protocol::ProtocolState; use crate::web::StateView; -use mpc_keys::hpke::Ciphered; use std::sync::Arc; use tokio::task::JoinSet; // TODO: this is a basic connection pool and does not do most of the work yet. This is // mostly here just to facilitate offline node handling for now. // TODO/NOTE: we can use libp2p to facilitate most the of low level TCP connection work. -#[derive(Default)] pub struct Pool { - http: reqwest::Client, + client: NodeClient, connections: RwLock, potential_connections: RwLock, status: RwLock>, @@ -27,37 +24,19 @@ pub struct Pool { current_active: RwLock>, // Potentially active participants that we can use to establish a connection in the next epoch. potential_active: RwLock>, - fetch_participant_timeout: Duration, refresh_active_timeout: Duration, } -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -pub enum FetchParticipantError { - #[error("request timed out")] - Timeout, - #[error("Response cannot be converted to JSON")] - JsonConversion, - #[error("Invalid URL")] - InvalidUrl, - #[error("Network error: {0}")] - NetworkError(String), -} - impl Pool { - pub fn new(fetch_participant_timeout: Duration, refresh_active_timeout: Duration) -> Self { - tracing::info!( - ?fetch_participant_timeout, - ?refresh_active_timeout, - "creating a new pool" - ); + pub fn new(client: &NodeClient, refresh_active_timeout: Duration) -> Self { + tracing::info!(?refresh_active_timeout, "creating a new pool"); Self { - http: reqwest::Client::new(), + client: client.clone(), connections: RwLock::new(Participants::default()), potential_connections: RwLock::new(Participants::default()), status: RwLock::new(HashMap::default()), current_active: RwLock::new(Option::default()), potential_active: RwLock::new(Option::default()), - fetch_participant_timeout, refresh_active_timeout, } } @@ -82,8 +61,8 @@ impl Pool { let pool = Arc::clone(&self); join_set.spawn(async move { - match pool.fetch_participant_state(&info).await { - Ok(state) => match pool.send_empty_msg(&participant, &info).await { + match pool.client.state(&info.url).await { + Ok(state) => match pool.client.msg_empty(&info.url).await { Ok(()) => Ok((participant, state, info)), Err(e) => { tracing::warn!( @@ -151,8 +130,8 @@ impl Pool { let pool = Arc::clone(&self); // Clone Arc for use inside tasks join_set.spawn(async move { - match pool.fetch_participant_state(&info).await { - Ok(state) => match pool.send_empty_msg(&participant, &info).await { + match pool.client.state(&info.url).await { + Ok(state) => match pool.client.msg_empty(&info.url).await { Ok(()) => Ok((participant, state, info)), Err(e) => { tracing::warn!( @@ -263,42 +242,4 @@ impl Pool { } stable } - - async fn fetch_participant_state( - &self, - participant_info: &ParticipantInfo, - ) -> Result { - let Ok(Ok(url)) = Url::parse(&participant_info.url).map(|url| url.join("/state")) else { - return Err(FetchParticipantError::InvalidUrl); - }; - match tokio::time::timeout( - self.fetch_participant_timeout, - self.http.get(url.clone()).send(), - ) - .await - { - Ok(Ok(resp)) => match resp.json::().await { - Ok(state) => Ok(state), - Err(_) => Err(FetchParticipantError::JsonConversion), - }, - Ok(Err(e)) => Err(FetchParticipantError::NetworkError(e.to_string())), - Err(_) => Err(FetchParticipantError::Timeout), - } - } - - async fn send_empty_msg( - &self, - participant: &Participant, - participant_info: &ParticipantInfo, - ) -> Result<(), crate::http_client::SendError> { - let empty_msg: Vec = Vec::new(); - crate::http_client::send_encrypted( - *participant, - &self.http, - participant_info.url.clone(), - empty_msg, - self.fetch_participant_timeout, - ) - .await - } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 364740a2..d70170cb 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use crate::node_client::NodeClient; use crate::protocol::contract::primitives::Participants; use crate::protocol::ProtocolState; use std::sync::Arc; @@ -10,12 +11,6 @@ pub mod connection; #[derive(Debug, Clone, clap::Parser)] #[group(id = "mesh_options")] pub struct Options { - #[clap( - long, - env("MPC_MESH_FETCH_PARTICIPANT_TIMEOUT"), - default_value = "1000" - )] - pub fetch_participant_timeout: u64, #[clap(long, env("MPC_MESH_REFRESH_ACTIVE_TIMEOUT"), default_value = "1000")] pub refresh_active_timeout: u64, } @@ -23,8 +18,6 @@ pub struct Options { impl Options { pub fn into_str_args(self) -> Vec { vec![ - "--fetch-participant-timeout".to_string(), - self.fetch_participant_timeout.to_string(), "--refresh-active-timeout".to_string(), self.refresh_active_timeout.to_string(), ] @@ -33,14 +26,24 @@ impl Options { #[derive(Clone, Default)] pub struct MeshState { - pub active_participants: Participants, + /// Participants that are active in the network; as in they respond when pinged. + pub active: Participants, /// Potential participants that are active including participants belonging to the next epoch. - pub active_potential_participants: Participants, + pub active_potential: Participants, - pub potential_participants: Participants, + /// Full list of potential participants that have yet to join the network. + pub potential: Participants, - pub stable_participants: Participants, + /// Participants that are stable in the network; as in they have met certain criterias such + /// as indexing the latest blocks. + pub stable: Participants, +} + +impl MeshState { + pub fn active_with_potential(&self) -> Participants { + self.active.and(&self.active_potential) + } } pub struct Mesh { @@ -50,11 +53,11 @@ pub struct Mesh { } impl Mesh { - pub fn init(options: Options) -> (Self, Arc>) { + pub fn init(client: &NodeClient, options: Options) -> (Self, Arc>) { let state = Arc::new(RwLock::new(MeshState::default())); let mesh = Self { connections: Arc::new(connection::Pool::new( - Duration::from_millis(options.fetch_participant_timeout), + client, Duration::from_millis(options.refresh_active_timeout), )), state: state.clone(), @@ -63,14 +66,14 @@ impl Mesh { } async fn ping(&mut self) { - let mut mesh_state = self.state.write().await; - *mesh_state = MeshState { - active_participants: Arc::clone(&self.connections).ping().await, - active_potential_participants: Arc::clone(&self.connections).ping_potential().await, - potential_participants: self.connections.potential_participants().await, - stable_participants: self.connections.stable_participants().await, + let state = MeshState { + active: Arc::clone(&self.connections).ping().await, + active_potential: Arc::clone(&self.connections).ping_potential().await, + potential: self.connections.potential_participants().await, + stable: self.connections.stable_participants().await, }; - drop(mesh_state); + + *self.state.write().await = state; } pub async fn run( diff --git a/chain-signatures/node/src/node_client.rs b/chain-signatures/node/src/node_client.rs new file mode 100644 index 00000000..05fbc5da --- /dev/null +++ b/chain-signatures/node/src/node_client.rs @@ -0,0 +1,117 @@ +use crate::web::StateView; +use mpc_keys::hpke::Ciphered; +use reqwest::IntoUrl; +use std::str::Utf8Error; +use std::time::Duration; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tokio_retry::Retry; +use url::Url; + +#[derive(Debug, Clone, clap::Parser)] +#[group(id = "message_options")] +pub struct Options { + /// Default timeout used for all outbound requests to other nodes. + #[clap(long, env("MPC_NODE_TIMEOUT"), default_value = "1000")] + pub timeout: u64, + + /// Timeout used for fetching the state of a node. + #[clap(long, env("MPC_NODE_STATE_TIMEOUT"), default_value = "1000")] + pub state_timeout: u64, +} + +impl Options { + pub fn into_str_args(self) -> Vec { + vec![ + "--timeout".to_string(), + self.timeout.to_string(), + "--state-timeout".to_string(), + self.state_timeout.to_string(), + ] + } +} + +#[derive(Debug, thiserror::Error)] +pub enum SendError { + #[error("http request was unsuccessful: {0}")] + Unsuccessful(String), + #[error("serialization unsuccessful: {0}")] + DataConversionError(serde_json::Error), + #[error("http client error: {0}")] + ReqwestClientError(#[from] reqwest::Error), + #[error("http response could not be parsed: {0}")] + ReqwestBodyError(reqwest::Error), + #[error("http response body is not valid utf-8: {0}")] + MalformedResponse(Utf8Error), + #[error("encryption error: {0}")] + EncryptionError(String), + #[error("http request timeout: {0}")] + Timeout(String), + #[error("participant is not alive: {0}")] + ParticipantNotAlive(String), + #[error("cannot convert into json: {0}")] + Conversion(#[from] serde_json::Error), +} + +#[derive(Debug, Clone)] +pub struct NodeClient { + http: reqwest::Client, + options: Options, +} + +impl NodeClient { + pub fn new(options: &Options) -> Self { + Self { + http: reqwest::Client::builder() + .timeout(Duration::from_millis(options.timeout)) + .build() + .unwrap(), + options: options.clone(), + } + } + + async fn post_msg(&self, url: &Url, msg: &[Ciphered]) -> Result<(), SendError> { + let resp = self + .http + .post(url.clone()) + .header("content-type", "application/json") + .json(&msg) + .send() + .await?; + + let status = resp.status(); + if status.is_success() { + Ok(()) + } else { + let bytes = resp.bytes().await.map_err(SendError::ReqwestBodyError)?; + let resp = std::str::from_utf8(&bytes).map_err(SendError::MalformedResponse)?; + tracing::warn!("failed to send a message to {url} with code {status}: {resp}"); + Err(SendError::Unsuccessful(resp.into())) + } + } + + pub async fn msg(&self, base: impl IntoUrl, msg: &[Ciphered]) -> Result<(), SendError> { + let mut url = base.into_url()?; + url.set_path("msg"); + + let strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3); + Retry::spawn(strategy, || self.post_msg(&url, msg)).await + } + + pub async fn msg_empty(&self, base: impl IntoUrl) -> Result<(), SendError> { + self.msg(base, &[]).await + } + + pub async fn state(&self, base: impl IntoUrl) -> Result { + let mut url = base.into_url()?; + url.set_path("state"); + + let resp = self + .http + .get(url) + .timeout(Duration::from_millis(self.options.state_timeout)) + .send() + .await?; + + Ok(resp.json::().await?) + } +} diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 16c05d38..d75fb0cb 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -5,19 +5,18 @@ use super::state::{ }; use crate::config::Config; use crate::gcp::error::SecretStorageError; -use crate::http_client::MessageQueue; use crate::protocol::contract::primitives::Participants; use crate::protocol::presignature::PresignatureManager; use crate::protocol::signature::SignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; use crate::protocol::SignRequest; +use crate::rpc_client; use crate::storage::presignature_storage::PresignatureStorage; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::TripleStorage; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use crate::util::AffinePointExt; -use crate::{http_client, rpc_client}; use std::cmp::Ordering; use std::sync::Arc; @@ -33,7 +32,6 @@ use near_crypto::InMemorySigner; pub trait ConsensusCtx { fn my_account_id(&self) -> &AccountId; - fn http_client(&self) -> &reqwest::Client; fn rpc_client(&self) -> &near_fetch::Client; fn signer(&self) -> &InMemorySigner; fn mpc_contract_id(&self) -> &AccountId; @@ -42,7 +40,6 @@ pub trait ConsensusCtx { fn secret_storage(&self) -> &SecretNodeStorageBox; fn triple_storage(&self) -> &TripleStorage; fn presignature_storage(&self) -> &PresignatureStorage; - fn message_options(&self) -> http_client::Options; } #[derive(thiserror::Error, Debug)] @@ -127,7 +124,7 @@ impl ConsensusProtocol for StartedState { "started: contract state is running and we are already a participant" ); let triple_manager = TripleManager::new( - me, + *me, contract_state.threshold, epoch, ctx.my_account_id(), @@ -136,7 +133,7 @@ impl ConsensusProtocol for StartedState { let presignature_manager = Arc::new(RwLock::new(PresignatureManager::new( - me, + *me, contract_state.threshold, epoch, ctx.my_account_id(), @@ -145,7 +142,7 @@ impl ConsensusProtocol for StartedState { let signature_manager = Arc::new(RwLock::new(SignatureManager::new( - me, + *me, ctx.my_account_id(), contract_state.threshold, public_key, @@ -162,9 +159,6 @@ impl ConsensusProtocol for StartedState { triple_manager, presignature_manager, signature_manager, - messages: Arc::new(RwLock::new(MessageQueue::new( - ctx.message_options().clone(), - ))), })) } None => Ok(NodeState::Joining(JoiningState { @@ -210,17 +204,15 @@ impl ConsensusProtocol for StartedState { "started(initializing): starting key generation as a part of the participant set" ); let protocol = KeygenProtocol::new( - &participants.keys().cloned().collect::>(), - me, + &participants.keys_vec(), + *me, contract_state.threshold, )?; Ok(NodeState::Generating(GeneratingState { + me: *me, participants, threshold: contract_state.threshold, protocol, - messages: Arc::new(RwLock::new(MessageQueue::new( - ctx.message_options().clone(), - ))), })) } None => { @@ -349,10 +341,13 @@ impl ConsensusProtocol for WaitingForConsensusState { return Err(ConsensusError::MismatchedPublicKey); } - let me = contract_state + let Some(me) = contract_state .participants .find_participant(ctx.my_account_id()) - .unwrap(); + else { + tracing::error!("waiting(running, unexpected): we do not belong to the participant set -- cannot progress!"); + return Ok(NodeState::WaitingForConsensus(self)); + }; // Clear triples from storage before starting the new epoch. This is necessary if the node has accumulated // triples from previous epochs. If it was not able to clear the previous triples, we'll leave them as-is @@ -371,7 +366,7 @@ impl ConsensusProtocol for WaitingForConsensusState { } let triple_manager = TripleManager::new( - me, + *me, self.threshold, self.epoch, ctx.my_account_id(), @@ -379,7 +374,7 @@ impl ConsensusProtocol for WaitingForConsensusState { ); let presignature_manager = Arc::new(RwLock::new(PresignatureManager::new( - me, + *me, self.threshold, self.epoch, ctx.my_account_id(), @@ -387,7 +382,7 @@ impl ConsensusProtocol for WaitingForConsensusState { ))); let signature_manager = Arc::new(RwLock::new(SignatureManager::new( - me, + *me, ctx.my_account_id(), self.threshold, self.public_key, @@ -404,7 +399,6 @@ impl ConsensusProtocol for WaitingForConsensusState { triple_manager, presignature_manager, signature_manager, - messages: self.messages, })) } }, @@ -739,17 +733,20 @@ async fn start_resharing( let me = contract_state .new_participants .find_participant(ctx.my_account_id()) - .unwrap(); - let protocol = ReshareProtocol::new(private_share, me, &contract_state)?; + .or_else(|| { + contract_state + .old_participants + .find_participant(ctx.my_account_id()) + }) + .expect("unexpected: cannot find us in the participant set while starting resharing"); + let protocol = ReshareProtocol::new(private_share, *me, &contract_state)?; Ok(NodeState::Resharing(ResharingState { + me: *me, old_epoch: contract_state.old_epoch, old_participants: contract_state.old_participants, new_participants: contract_state.new_participants, threshold: contract_state.threshold, public_key: contract_state.public_key, protocol, - messages: Arc::new(RwLock::new(MessageQueue::new( - ctx.message_options().clone(), - ))), })) } diff --git a/chain-signatures/node/src/protocol/contract/primitives.rs b/chain-signatures/node/src/protocol/contract/primitives.rs index 149722a4..2d67598c 100644 --- a/chain-signatures/node/src/protocol/contract/primitives.rs +++ b/chain-signatures/node/src/protocol/contract/primitives.rs @@ -138,17 +138,18 @@ impl Participants { self.participants.iter() } - pub fn find_participant(&self, account_id: &AccountId) -> Option { + pub fn find(&self, account_id: &AccountId) -> Option<(&Participant, &ParticipantInfo)> { self.participants .iter() .find(|(_, participant_info)| participant_info.account_id == *account_id) - .map(|(participant, _)| *participant) + } + + pub fn find_participant(&self, account_id: &AccountId) -> Option<&Participant> { + self.find(account_id).map(|(participant, _)| participant) } pub fn find_participant_info(&self, account_id: &AccountId) -> Option<&ParticipantInfo> { - self.participants - .values() - .find(|participant_info| participant_info.account_id == *account_id) + self.find(account_id).map(|(_, info)| info) } pub fn contains_account_id(&self, account_id: &AccountId) -> bool { diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index b9dfb2b3..6947d03e 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -1,15 +1,15 @@ use std::sync::PoisonError; +use super::message::MessageChannel; use super::signature::SignatureManager; use super::state::{GeneratingState, NodeState, ResharingState, RunningState}; use crate::config::Config; use crate::gcp::error::SecretStorageError; -use crate::http_client::SendError; +use crate::node_client::SendError; use crate::protocol::message::{GeneratingMessage, ResharingMessage}; use crate::protocol::presignature::PresignatureManager; use crate::protocol::state::{PersistentNodeData, WaitingForConsensusState}; use crate::protocol::MeshState; -use crate::protocol::MpcMessage; use crate::storage::secret_storage::SecretNodeStorageBox; use async_trait::async_trait; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; @@ -17,15 +17,13 @@ use k256::elliptic_curve::group::GroupEncoding; use near_account_id::AccountId; use near_crypto::InMemorySigner; -#[async_trait::async_trait] pub trait CryptographicCtx { - async fn me(&self) -> Participant; - fn http_client(&self) -> &reqwest::Client; fn rpc_client(&self) -> &near_fetch::Client; fn signer(&self) -> &InMemorySigner; fn mpc_contract_id(&self) -> &AccountId; fn secret_storage(&mut self) -> &mut SecretNodeStorageBox; fn my_account_id(&self) -> &AccountId; + fn channel(&self) -> &MessageChannel; } #[derive(thiserror::Error, Debug)] @@ -74,10 +72,10 @@ impl CryptographicProtocol for GeneratingState { async fn progress( mut self, mut ctx: C, - cfg: Config, + _cfg: Config, mesh_state: MeshState, ) -> Result { - tracing::info!(active = ?mesh_state.active_participants.keys_vec(), "generating: progressing key generation"); + tracing::info!(active = ?mesh_state.active.keys_vec(), "generating: progressing key generation"); let mut protocol = self.protocol.write().await; loop { let action = match protocol.poke() { @@ -94,53 +92,40 @@ impl CryptographicProtocol for GeneratingState { Action::Wait => { drop(protocol); tracing::debug!("generating: waiting"); - let failures = self - .messages - .write() - .await - .send_encrypted( - ctx.me().await, - &cfg.local.network.sign_sk, - ctx.http_client(), - &mesh_state.active_participants, - &cfg.protocol, - ) - .await; - if !failures.is_empty() { - tracing::warn!( - active = ?mesh_state.active_participants.keys_vec(), - "generating(wait): failed to send encrypted message; {failures:?}" - ); - } - return Ok(NodeState::Generating(self)); } Action::SendMany(data) => { tracing::debug!("generating: sending a message to many participants"); - let mut messages = self.messages.write().await; - for (p, info) in mesh_state.active_participants.iter() { - if p == &ctx.me().await { + for p in mesh_state.active.keys() { + if p == &self.me { // Skip yourself, cait-sith never sends messages to oneself continue; } - messages.push( - Participant::from(info.id), - MpcMessage::Generating(GeneratingMessage { - from: ctx.me().await, - data: data.clone(), - }), - ); + + ctx.channel() + .send( + self.me, + *p, + GeneratingMessage { + from: self.me, + data: data.clone(), + }, + ) + .await; } } Action::SendPrivate(to, data) => { tracing::debug!("generating: sending a private message to {to:?}"); - self.messages.write().await.push( - to, - MpcMessage::Generating(GeneratingMessage { - from: ctx.me().await, - data, - }), - ); + ctx.channel() + .send( + self.me, + to, + GeneratingMessage { + from: self.me, + data, + }, + ) + .await; } Action::Return(r) => { tracing::info!( @@ -154,32 +139,12 @@ impl CryptographicProtocol for GeneratingState { public_key: r.public_key, }) .await?; - // Send any leftover messages - let failures = self - .messages - .write() - .await - .send_encrypted( - ctx.me().await, - &cfg.local.network.sign_sk, - ctx.http_client(), - &mesh_state.active_participants, - &cfg.protocol, - ) - .await; - if !failures.is_empty() { - tracing::warn!( - active = ?mesh_state.active_participants.keys_vec(), - "generating(return): failed to send encrypted message; {failures:?}" - ); - } return Ok(NodeState::WaitingForConsensus(WaitingForConsensusState { epoch: 0, participants: self.participants, threshold: self.threshold, private_share: r.private_share, public_key: r.public_key, - messages: self.messages, })); } } @@ -191,29 +156,10 @@ impl CryptographicProtocol for GeneratingState { impl CryptographicProtocol for WaitingForConsensusState { async fn progress( mut self, - ctx: C, - cfg: Config, - mesh_state: MeshState, + _ctx: C, + _cfg: Config, + _mesh_state: MeshState, ) -> Result { - let failures = self - .messages - .write() - .await - .send_encrypted( - ctx.me().await, - &cfg.local.network.sign_sk, - ctx.http_client(), - &mesh_state.active_participants, - &cfg.protocol, - ) - .await; - if !failures.is_empty() { - tracing::warn!( - active = ?mesh_state.active_participants.keys_vec(), - "waitingForConsensus: failed to send encrypted message; {failures:?}" - ); - } - // Wait for ConsensusProtocol step to advance state Ok(NodeState::WaitingForConsensus(self)) } @@ -224,15 +170,13 @@ impl CryptographicProtocol for ResharingState { async fn progress( mut self, mut ctx: C, - cfg: Config, + _cfg: Config, mesh_state: MeshState, ) -> Result { // TODO: we are not using active potential participants here, but we should in the future. // Currently resharing protocol does not timeout and restart with new set of participants. // So if it picks up a participant that is not active, it will never be able to send a message to it. - let active = mesh_state - .active_participants - .and(&mesh_state.potential_participants); + let active = mesh_state.active.and(&mesh_state.potential); tracing::info!(active = ?active.keys().collect::>(), "progressing key reshare"); let mut protocol = self.protocol.write().await; loop { @@ -247,65 +191,48 @@ impl CryptographicProtocol for ResharingState { return Err(err)?; } }; - tracing::debug!("got action ok"); match action { Action::Wait => { drop(protocol); tracing::debug!("resharing: waiting"); - let failures = self - .messages - .write() - .await - .send_encrypted( - ctx.me().await, - &cfg.local.network.sign_sk, - ctx.http_client(), - &active, - &cfg.protocol, - ) - .await; - if !failures.is_empty() { - tracing::warn!( - active = ?active.keys_vec(), - new = ?self.new_participants, - old = ?self.old_participants, - "resharing(wait): failed to send encrypted message; {failures:?}", - ); - } - return Ok(NodeState::Resharing(self)); } Action::SendMany(data) => { tracing::debug!("resharing: sending a message to all participants"); - let me = ctx.me().await; - let mut messages = self.messages.write().await; - for (p, _info) in self.new_participants.iter() { - if p == &me { + for p in self.new_participants.keys() { + if p == &self.me { // Skip yourself, cait-sith never sends messages to oneself continue; } - - messages.push( - *p, - MpcMessage::Resharing(ResharingMessage { - epoch: self.old_epoch, - from: me, - data: data.clone(), - }), - ) + ctx.channel() + .send( + self.me, + *p, + ResharingMessage { + epoch: self.old_epoch, + from: self.me, + data: data.clone(), + }, + ) + .await; } } Action::SendPrivate(to, data) => { tracing::debug!("resharing: sending a private message to {to:?}"); match self.new_participants.get(&to) { - Some(_) => self.messages.write().await.push( - to, - MpcMessage::Resharing(ResharingMessage { - epoch: self.old_epoch, - from: ctx.me().await, - data, - }), - ), + Some(_) => { + ctx.channel() + .send( + self.me, + to, + ResharingMessage { + epoch: self.old_epoch, + from: self.me, + data, + }, + ) + .await; + } None => return Err(CryptographicError::UnknownParticipant(to)), } } @@ -319,35 +246,12 @@ impl CryptographicProtocol for ResharingState { }) .await?; - // Send any leftover messages. - let failures = self - .messages - .write() - .await - .send_encrypted( - ctx.me().await, - &cfg.local.network.sign_sk, - ctx.http_client(), - &active, - &cfg.protocol, - ) - .await; - if !failures.is_empty() { - tracing::warn!( - active = ?active.keys_vec(), - new = ?self.new_participants, - old = ?self.old_participants, - "resharing(return): failed to send encrypted message; {failures:?}", - ); - } - return Ok(NodeState::WaitingForConsensus(WaitingForConsensusState { epoch: self.old_epoch + 1, participants: self.new_participants, threshold: self.threshold, private_share, public_key: self.public_key, - messages: self.messages, })); } } @@ -363,7 +267,7 @@ impl CryptographicProtocol for RunningState { cfg: Config, mesh_state: MeshState, ) -> Result { - let active = mesh_state.active_participants; + let active = mesh_state.active; if active.len() < self.threshold { tracing::warn!( active = ?active.keys_vec(), @@ -375,11 +279,12 @@ impl CryptographicProtocol for RunningState { let triple_task = self.triple_manager .clone() - .execute(&active, &cfg.protocol, self.messages.clone()); + .execute(&active, &cfg.protocol, ctx.channel()); - let presig_task = PresignatureManager::execute(&self, &active, &cfg.protocol); + let presig_task = + PresignatureManager::execute(&self, &active, &cfg.protocol, ctx.channel()); - let stable = mesh_state.stable_participants; + let stable = mesh_state.stable; tracing::debug!(?stable, "stable participants"); let sig_task = SignatureManager::execute(&self, &stable, &cfg.protocol, &ctx); @@ -390,28 +295,6 @@ impl CryptographicProtocol for RunningState { } } - let mut messages = self.messages.write().await; - crate::metrics::MESSAGE_QUEUE_SIZE - .with_label_values(&[ctx.my_account_id().as_str()]) - .set(messages.len() as i64); - - let failures = messages - .send_encrypted( - ctx.me().await, - &cfg.local.network.sign_sk, - ctx.http_client(), - &active, - &cfg.protocol, - ) - .await; - if !failures.is_empty() { - tracing::warn!( - active = ?active.keys_vec(), - "running: failed to send encrypted message; {failures:?}" - ); - } - drop(messages); - Ok(NodeState::Running(self)) } } diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index 6452eb76..10e3d876 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -1,29 +1,33 @@ +use super::contract::primitives::Participants; use super::cryptography::CryptographicError; use super::presignature::{GenerationError, PresignatureId}; use super::signature::SignRequestIdentifier; use super::state::{GeneratingState, NodeState, ResharingState, RunningState}; use super::triple::TripleId; use crate::gcp::error::SecretStorageError; -use crate::http_client::SendError; use crate::indexer::ContractSignRequest; +use crate::node_client::{NodeClient, SendError}; use crate::protocol::Config; use crate::protocol::MeshState; +use crate::types::Epoch; use crate::util; use async_trait::async_trait; use cait_sith::protocol::{InitializationError, MessageData, Participant, ProtocolError}; use k256::Scalar; +use mpc_contract::config::ProtocolConfig; use mpc_keys::hpke::{self, Ciphered}; +use near_account_id::AccountId; use near_crypto::Signature; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; -use tokio::sync::RwLock; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::{mpsc, RwLock}; -#[async_trait::async_trait] -pub trait MessageCtx { - async fn me(&self) -> Participant; -} +pub const MAX_MESSAGE_INCOMING: usize = 1024 * 1024; +pub const MAX_MESSAGE_OUTGOING: usize = 1024 * 1024; #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct GeneratingMessage { @@ -31,35 +35,59 @@ pub struct GeneratingMessage { pub data: MessageData, } +impl From for Message { + fn from(msg: GeneratingMessage) -> Self { + Message::Generating(msg) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct ResharingMessage { - pub epoch: u64, + pub epoch: Epoch, pub from: Participant, pub data: MessageData, } +impl From for Message { + fn from(msg: ResharingMessage) -> Self { + Message::Resharing(msg) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct TripleMessage { pub id: u64, - pub epoch: u64, + pub epoch: Epoch, pub from: Participant, pub data: MessageData, // UNIX timestamp as seconds since the epoch pub timestamp: u64, } +impl From for Message { + fn from(msg: TripleMessage) -> Self { + Message::Triple(msg) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct PresignatureMessage { pub id: u64, pub triple0: TripleId, pub triple1: TripleId, - pub epoch: u64, + pub epoch: Epoch, pub from: Participant, pub data: MessageData, // UNIX timestamp as seconds since the epoch pub timestamp: u64, } +impl From for Message { + fn from(msg: PresignatureMessage) -> Self { + Message::Presignature(msg) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct SignatureMessage { pub request_id: [u8; 32], @@ -75,8 +103,14 @@ pub struct SignatureMessage { pub timestamp: u64, } +impl From for Message { + fn from(msg: SignatureMessage) -> Self { + Message::Signature(msg) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] -pub enum MpcMessage { +pub enum Message { Generating(GeneratingMessage), Resharing(ResharingMessage), Triple(TripleMessage), @@ -84,52 +118,52 @@ pub enum MpcMessage { Signature(SignatureMessage), } -impl MpcMessage { +impl Message { pub const fn typename(&self) -> &'static str { match self { - MpcMessage::Generating(_) => "Generating", - MpcMessage::Resharing(_) => "Resharing", - MpcMessage::Triple(_) => "Triple", - MpcMessage::Presignature(_) => "Presignature", - MpcMessage::Signature(_) => "Signature", + Message::Generating(_) => "Generating", + Message::Resharing(_) => "Resharing", + Message::Triple(_) => "Triple", + Message::Presignature(_) => "Presignature", + Message::Signature(_) => "Signature", } } } #[derive(Default)] -pub struct MpcMessageQueue { +pub struct MessageInbox { generating: VecDeque, - resharing_bins: HashMap>, - triple_bins: HashMap>>, - presignature_bins: HashMap>>, - signature_bins: HashMap>>, + resharing: HashMap>, + triple: HashMap>>, + presignature: HashMap>>, + signature: HashMap>>, } -impl MpcMessageQueue { - pub fn push(&mut self, message: MpcMessage) { +impl MessageInbox { + pub fn push(&mut self, message: Message) { match message { - MpcMessage::Generating(message) => self.generating.push_back(message), - MpcMessage::Resharing(message) => self - .resharing_bins + Message::Generating(message) => self.generating.push_back(message), + Message::Resharing(message) => self + .resharing .entry(message.epoch) .or_default() .push_back(message), - MpcMessage::Triple(message) => self - .triple_bins + Message::Triple(message) => self + .triple .entry(message.epoch) .or_default() .entry(message.id) .or_default() .push_back(message), - MpcMessage::Presignature(message) => self - .presignature_bins + Message::Presignature(message) => self + .presignature .entry(message.epoch) .or_default() .entry(message.id) .or_default() .push_back(message), - MpcMessage::Signature(message) => self - .signature_bins + Message::Signature(message) => self + .signature .entry(message.epoch) .or_default() .entry(SignRequestIdentifier::new( @@ -141,10 +175,123 @@ impl MpcMessageQueue { .push_back(message), } } + + pub fn extend(&mut self, incoming: &mut mpsc::Receiver) -> usize { + let mut count = 0; + loop { + let msg = match incoming.try_recv() { + Ok(msg) => { + count += 1; + msg + } + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Disconnected) => { + tracing::error!( + "inbox: communication disconnected, no more messages will be received" + ); + break; + } + }; + self.push(msg); + } + count + } +} + +struct MessageExecutor { + incoming: mpsc::Receiver, + outgoing: mpsc::Receiver, + inbox: Arc>, + outbox: MessageOutbox, + + config: Arc>, + mesh_state: Arc>, +} + +impl MessageExecutor { + pub async fn execute(mut self) { + let mut interval = tokio::time::interval(Duration::from_millis(100)); + loop { + interval.tick().await; + self.inbox.write().await.extend(&mut self.incoming); + + let (sign_sk, protocol) = { + let config = self.config.read().await; + ( + config.local.network.sign_sk.clone(), + config.protocol.clone(), + ) + }; + let active = { + let mesh_state = self.mesh_state.read().await; + mesh_state.active_with_potential() + }; + self.outbox.extend(&mut self.outgoing); + self.outbox.expire(&protocol); + let encrypted = self.outbox.encrypt(&sign_sk, &active); + self.outbox.send(&active, encrypted).await; + } + } +} + +#[derive(Clone)] +pub struct MessageChannel { + outgoing: mpsc::Sender, + inbox: Arc>, + _task: Arc>, +} + +impl MessageChannel { + pub async fn spawn( + client: NodeClient, + id: &AccountId, + config: &Arc>, + mesh_state: &Arc>, + ) -> (mpsc::Sender, Self) { + let (incoming_tx, incoming_rx) = mpsc::channel(MAX_MESSAGE_INCOMING); + let (outgoing_tx, outgoing_rx) = mpsc::channel(MAX_MESSAGE_OUTGOING); + + let inbox = Arc::new(RwLock::new(MessageInbox::default())); + let processor = MessageExecutor { + incoming: incoming_rx, + outgoing: outgoing_rx, + inbox: inbox.clone(), + config: config.clone(), + mesh_state: mesh_state.clone(), + outbox: MessageOutbox::new(client, id), + }; + + ( + incoming_tx, + Self { + inbox, + outgoing: outgoing_tx, + _task: Arc::new(tokio::spawn(processor.execute())), + }, + ) + } + + /// Grab the inbox for all the messages we received from the network. + pub fn inbox(&self) -> &Arc> { + &self.inbox + } + + /// Send a message to the participants in the network. + pub async fn send(&self, from: Participant, to: Participant, message: impl Into) { + if let Err(err) = self + .outgoing + .send((from, to, message.into(), Instant::now())) + .await + { + tracing::error!(?err, "failed to send message to participants"); + } + } } #[derive(thiserror::Error, Debug)] -pub enum MessageHandleError { +pub enum MessageRecvError { #[error("cait-sith initialization error: {0}")] CaitSithInitializationError(#[from] InitializationError), #[error("cait-sith protocol error: {0}")] @@ -167,7 +314,7 @@ pub enum MessageHandleError { SecretStorageError(#[from] SecretStorageError), } -impl From for MessageHandleError { +impl From for MessageRecvError { fn from(value: CryptographicError) -> Self { match value { CryptographicError::CaitSithInitializationError(e) => { @@ -187,27 +334,26 @@ impl From for MessageHandleError { } #[async_trait] -pub trait MessageHandler { - async fn handle( +pub trait MessageReceiver { + async fn recv( &mut self, - ctx: C, - queue: &mut MpcMessageQueue, + channel: &MessageChannel, cfg: Config, mesh_state: MeshState, - ) -> Result<(), MessageHandleError>; + ) -> Result<(), MessageRecvError>; } #[async_trait] -impl MessageHandler for GeneratingState { - async fn handle( +impl MessageReceiver for GeneratingState { + async fn recv( &mut self, - _ctx: C, - queue: &mut MpcMessageQueue, + channel: &MessageChannel, _cfg: Config, _mesh_state: MeshState, - ) -> Result<(), MessageHandleError> { + ) -> Result<(), MessageRecvError> { + let mut inbox = channel.inbox().write().await; let mut protocol = self.protocol.write().await; - while let Some(msg) = queue.generating.pop_front() { + while let Some(msg) = inbox.generating.pop_front() { tracing::debug!("handling new generating message"); protocol.message(msg.from, msg.data); } @@ -216,16 +362,16 @@ impl MessageHandler for GeneratingState { } #[async_trait] -impl MessageHandler for ResharingState { - async fn handle( +impl MessageReceiver for ResharingState { + async fn recv( &mut self, - _ctx: C, - queue: &mut MpcMessageQueue, + channel: &MessageChannel, _cfg: Config, _mesh_state: MeshState, - ) -> Result<(), MessageHandleError> { - tracing::debug!("handling {} resharing messages", queue.resharing_bins.len()); - let q = queue.resharing_bins.entry(self.old_epoch).or_default(); + ) -> Result<(), MessageRecvError> { + let mut inbox = channel.inbox().write().await; + tracing::debug!("handling {} resharing messages", inbox.resharing.len()); + let q = inbox.resharing.entry(self.old_epoch).or_default(); let mut protocol = self.protocol.write().await; while let Some(msg) = q.pop_front() { protocol.message(msg.from, msg.data); @@ -235,20 +381,20 @@ impl MessageHandler for ResharingState { } #[async_trait] -impl MessageHandler for RunningState { - async fn handle( +impl MessageReceiver for RunningState { + async fn recv( &mut self, - _ctx: C, - queue: &mut MpcMessageQueue, + channel: &MessageChannel, cfg: Config, mesh_state: MeshState, - ) -> Result<(), MessageHandleError> { + ) -> Result<(), MessageRecvError> { let protocol_cfg = &cfg.protocol; - let participants = &mesh_state.active_participants; + let participants = &mesh_state.active; + let mut inbox = channel.inbox().write().await; // remove the triple_id that has already failed or taken from the triple_bins // and refresh the timestamp of failed and taken - let triple_messages = queue.triple_bins.remove(&self.epoch).unwrap_or_default(); + let triple_messages = inbox.triple.remove(&self.epoch).unwrap_or_default(); for (id, mut queue) in triple_messages { if queue.is_empty() || queue.iter().any(|msg| { @@ -289,7 +435,7 @@ impl MessageHandler for RunningState { } let mut presignature_manager = self.presignature_manager.write().await; - let presignature_messages = queue.presignature_bins.entry(self.epoch).or_default(); + let presignature_messages = inbox.presignature.entry(self.epoch).or_default(); presignature_messages.retain(|id, queue| { // Skip message if it already timed out if queue.is_empty() @@ -380,7 +526,7 @@ impl MessageHandler for RunningState { } let mut signature_manager = self.signature_manager.write().await; - let signature_messages = queue.signature_bins.entry(self.epoch).or_default(); + let signature_messages = inbox.signature.entry(self.epoch).or_default(); signature_messages.retain(|sign_request_identifier, queue| { // Skip message if it already timed out if queue.is_empty() @@ -498,18 +644,17 @@ impl MessageHandler for RunningState { } #[async_trait] -impl MessageHandler for NodeState { - async fn handle( +impl MessageReceiver for NodeState { + async fn recv( &mut self, - ctx: C, - queue: &mut MpcMessageQueue, + channel: &MessageChannel, cfg: Config, mesh_state: MeshState, - ) -> Result<(), MessageHandleError> { + ) -> Result<(), MessageRecvError> { match self { - NodeState::Generating(state) => state.handle(ctx, queue, cfg, mesh_state).await, - NodeState::Resharing(state) => state.handle(ctx, queue, cfg, mesh_state).await, - NodeState::Running(state) => state.handle(ctx, queue, cfg, mesh_state).await, + NodeState::Generating(state) => state.recv(channel, cfg, mesh_state).await, + NodeState::Resharing(state) => state.recv(channel, cfg, mesh_state).await, + NodeState::Running(state) => state.recv(channel, cfg, mesh_state).await, _ => { tracing::debug!("skipping message processing"); Ok(()) @@ -592,3 +737,260 @@ where Ok(serde_json::from_slice(&msg)?) } } + +type SendMessage = (Participant, Participant, Message, Instant); + +/// Encrypted message with a reference to the old message. Only the ciphered portion of this +/// type will be sent over the wire, while the original message is kept just in case things +/// go wrong somewhere and the message needs to be requeued to be sent later. +type EncryptedWithOriginal = (Ciphered, SendMessage); + +/// Message outbox is the set of messages that are pending to be sent to other nodes. +/// These messages will be signed and encrypted before being sent out. +pub struct MessageOutbox { + client: NodeClient, + messages: VecDeque, + account_id: AccountId, +} + +impl MessageOutbox { + pub fn new(client: NodeClient, id: &AccountId) -> Self { + Self { + client, + messages: VecDeque::default(), + account_id: id.clone(), + } + } + + pub fn extend(&mut self, outgoing: &mut mpsc::Receiver) { + loop { + let (from, to, msg, instant) = match outgoing.try_recv() { + Ok(msg) => msg, + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Disconnected) => { + tracing::error!( + "outbox: communication disconnected, no more messages will be received" + ); + break; + } + }; + self.messages.push_back((from, to, msg, instant)); + } + crate::metrics::MESSAGE_QUEUE_SIZE + .with_label_values(&[self.account_id.as_str()]) + .set(self.messages.len() as i64); + } + + /// Expire messages that have been in the outbox for too long. + pub fn expire(&mut self, cfg: &ProtocolConfig) { + // timeout errors are very common for a message expiring, so map them to counts here: + let mut timeouts = HashMap::::new(); + self.messages.retain(|(_, to, msg, instant)| { + if instant.elapsed() > timeout(msg, cfg) { + let counter = timeouts + .entry(format!( + "timeout message={} for node={to:?}", + msg.typename(), + )) + .or_insert(0); + *counter += 1; + false + } else { + true + } + }); + + if !timeouts.is_empty() { + tracing::warn!(?timeouts, "messages expired"); + } + } + + /// Encrypt all the messages in the outbox and return a map of participant to encrypted messages. + pub fn encrypt( + &mut self, + sign_sk: &near_crypto::SecretKey, + active: &Participants, + ) -> HashMap> { + // failed for when a participant is not active, so keep this message for next round. + let mut retry = VecDeque::new(); + let mut errors = Vec::new(); + let mut not_active = HashSet::new(); + + let mut encrypted = HashMap::new(); + while let Some((from, to, msg, instant)) = self.messages.pop_front() { + let Some(info) = active.get(&to) else { + not_active.insert(to); + retry.push_back((from, to, msg, instant)); + continue; + }; + + let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) { + Ok(encrypted) => encrypted, + Err(err) => { + errors.push(SendError::EncryptionError(err.to_string())); + continue; + } + }; + let encrypted = encrypted.entry(to).or_insert_with(Vec::new); + encrypted.push((encrypted_msg, (from, to, msg, instant))); + } + + if !errors.is_empty() { + tracing::warn!(?errors, "outbox: encrypting messages failed on some"); + } + + if !not_active.is_empty() { + tracing::warn!( + ?not_active, + "some participants are not active even though mesh says they are" + ); + } + + // Add back the failed attempts for next time. + self.messages.extend(retry); + encrypted + } + + /// Compact together all the requests up to 256kb per request, and then send them out. + pub async fn send( + &mut self, + active: &Participants, + encrypted: HashMap>, + ) { + let start = Instant::now(); + let mut send_tasks = Vec::new(); + for (id, encrypted) in encrypted { + for partition in partition_ciphered_256kb(encrypted) { + let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip(); + // guaranteed to unwrap due to our previous loop check: + let info = active.get(&id).unwrap(); + let account_id = info.account_id.clone(); + let url = info.url.clone(); + + crate::metrics::NUM_SEND_ENCRYPTED_TOTAL + .with_label_values(&[account_id.as_str()]) + .inc_by(msgs.len() as f64); + + let client = self.client.clone(); + send_tasks.push(tokio::spawn(async move { + let start = Instant::now(); + if let Err(err) = client.msg(url, &encrypted_partition).await { + crate::metrics::NUM_SEND_ENCRYPTED_FAILURE + .with_label_values(&[account_id.as_str()]) + .inc_by(msgs.len() as f64); + crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY + .with_label_values(&[account_id.as_str()]) + .observe(start.elapsed().as_millis() as f64); + Err((msgs, err)) + } else { + crate::metrics::SEND_ENCRYPTED_LATENCY + .with_label_values(&[account_id.as_str()]) + .observe(start.elapsed().as_millis() as f64); + Ok(msgs.len()) + } + })); + } + } + + let mut errors = Vec::new(); + let mut retry = VecDeque::new(); + let mut uncompacted = 0; + let mut compacted = 0; + for task in send_tasks { + match task.await { + Ok(Ok(msgs_len)) => { + uncompacted += msgs_len; + compacted += 1; + } + Ok(Err((msgs, err))) => { + // since we failed, put back all the messages related to this + retry.extend(msgs); + errors.push(err); + } + Err(err) => { + tracing::warn!(?err, "outbox: task failed to send message"); + } + } + } + + if uncompacted > 0 { + tracing::debug!( + uncompacted, + compacted, + "sent messages in {:?}", + start.elapsed() + ); + } + + if !errors.is_empty() { + tracing::warn!(?errors, "outbox: failed sending encrypted messages"); + } + + // Add back the failed attempts for next time. + self.messages.extend(retry); + } +} + +fn partition_ciphered_256kb( + encrypted: Vec, +) -> Vec> { + let mut result = Vec::new(); + let mut current_partition = Vec::new(); + let mut current_size: usize = 0; + + for ciphered in encrypted { + let bytesize = ciphered.0.text.len(); + if current_size + bytesize > 256 * 1024 { + // If adding this byte vector exceeds 256kb, start a new partition + result.push(current_partition); + current_partition = Vec::new(); + current_size = 0; + } + current_partition.push(ciphered); + current_size += bytesize; + } + + if !current_partition.is_empty() { + // Add the last partition + result.push(current_partition); + } + + result +} + +fn timeout(msg: &Message, cfg: &ProtocolConfig) -> Duration { + match msg { + Message::Generating(_) => Duration::from_millis(cfg.message_timeout), + Message::Resharing(_) => Duration::from_millis(cfg.message_timeout), + Message::Triple(_) => Duration::from_millis(cfg.triple.generation_timeout), + Message::Presignature(_) => Duration::from_millis(cfg.presignature.generation_timeout), + Message::Signature(_) => Duration::from_millis(cfg.signature.generation_timeout), + } +} + +#[cfg(test)] +mod tests { + use crate::protocol::message::{GeneratingMessage, Message}; + + #[test] + fn test_sending_encrypted_message() { + let associated_data = b""; + let (sk, pk) = mpc_keys::hpke::generate(); + let starting_message = Message::Generating(GeneratingMessage { + from: cait_sith::protocol::Participant::from(0), + data: vec![], + }); + + let message = serde_json::to_vec(&starting_message).unwrap(); + let message = pk.encrypt(&message, associated_data).unwrap(); + + let message = serde_json::to_vec(&message).unwrap(); + let cipher = serde_json::from_slice(&message).unwrap(); + let message = sk.decrypt(&cipher, associated_data).unwrap(); + let message: Message = serde_json::from_slice(&message).unwrap(); + + assert_eq!(starting_message, message); + } +} diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 4be7f956..f6c0bb21 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -12,33 +12,29 @@ pub use consensus::ConsensusError; pub use contract::primitives::ParticipantInfo; pub use contract::ProtocolState; pub use cryptography::CryptographicError; -pub use message::MpcMessage; -pub use signature::SignQueue; -pub use signature::SignRequest; +pub use message::{Message, MessageChannel}; +pub use signature::{SignQueue, SignRequest}; pub use state::NodeState; pub use sysinfo::{Components, CpuRefreshKind, Disks, RefreshKind, System}; use self::consensus::ConsensusCtx; use self::cryptography::CryptographicCtx; -use self::message::MessageCtx; use crate::config::Config; -use crate::http_client; use crate::mesh::MeshState; use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; -use crate::protocol::message::{MessageHandler, MpcMessageQueue}; +use crate::protocol::message::MessageReceiver as _; use crate::storage::presignature_storage::PresignatureStorage; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::TripleStorage; -use cait_sith::protocol::Participant; use near_account_id::AccountId; use near_crypto::InMemorySigner; use reqwest::IntoUrl; use std::path::Path; use std::time::Instant; use std::{sync::Arc, time::Duration}; -use tokio::sync::mpsc::{self, error::TryRecvError}; +use tokio::sync::mpsc; use tokio::sync::RwLock; use url::Url; @@ -48,12 +44,10 @@ struct Ctx { mpc_contract_id: AccountId, signer: InMemorySigner, rpc_client: near_fetch::Client, - http_client: reqwest::Client, sign_rx: Arc>>, secret_storage: SecretNodeStorageBox, triple_storage: TripleStorage, presignature_storage: PresignatureStorage, - message_options: http_client::Options, } impl ConsensusCtx for &mut MpcSignProtocol { @@ -61,10 +55,6 @@ impl ConsensusCtx for &mut MpcSignProtocol { &self.ctx.account_id } - fn http_client(&self) -> &reqwest::Client { - &self.ctx.http_client - } - fn rpc_client(&self) -> &near_fetch::Client { &self.ctx.rpc_client } @@ -96,22 +86,9 @@ impl ConsensusCtx for &mut MpcSignProtocol { fn presignature_storage(&self) -> &PresignatureStorage { &self.ctx.presignature_storage } - - fn message_options(&self) -> http_client::Options { - self.ctx.message_options.clone() - } } -#[async_trait::async_trait] impl CryptographicCtx for &mut MpcSignProtocol { - async fn me(&self) -> Participant { - get_my_participant(self).await - } - - fn http_client(&self) -> &reqwest::Client { - &self.ctx.http_client - } - fn rpc_client(&self) -> &near_fetch::Client { &self.ctx.rpc_client } @@ -131,18 +108,15 @@ impl CryptographicCtx for &mut MpcSignProtocol { fn secret_storage(&mut self) -> &mut SecretNodeStorageBox { &mut self.ctx.secret_storage } -} -#[async_trait::async_trait] -impl MessageCtx for &MpcSignProtocol { - async fn me(&self) -> Participant { - get_my_participant(self).await + fn channel(&self) -> &MessageChannel { + &self.channel } } pub struct MpcSignProtocol { ctx: Ctx, - receiver: mpsc::Receiver, + channel: MessageChannel, state: Arc>, } @@ -154,12 +128,11 @@ impl MpcSignProtocol { account_id: AccountId, rpc_client: near_fetch::Client, signer: InMemorySigner, - receiver: mpsc::Receiver, + channel: MessageChannel, sign_rx: mpsc::Receiver, secret_storage: SecretNodeStorageBox, triple_storage: TripleStorage, presignature_storage: PresignatureStorage, - message_options: http_client::Options, ) -> (Self, Arc>) { let my_address = my_address.into_url().unwrap(); let rpc_url = rpc_client.rpc_addr(); @@ -178,17 +151,15 @@ impl MpcSignProtocol { account_id, mpc_contract_id, rpc_client, - http_client: reqwest::Client::new(), sign_rx: Arc::new(RwLock::new(sign_rx)), signer, secret_storage, triple_storage, presignature_storage, - message_options, }; let protocol = MpcSignProtocol { ctx, - receiver, + channel, state: state.clone(), }; (protocol, state) @@ -200,15 +171,16 @@ impl MpcSignProtocol { config: Arc>, mesh_state: Arc>, ) -> anyhow::Result<()> { - let my_account_id = self.ctx.account_id.to_string(); + let my_account_id = self.ctx.account_id.as_str(); let _span = tracing::info_span!("running", my_account_id); + let my_account_id = self.ctx.account_id.clone(); + crate::metrics::NODE_RUNNING .with_label_values(&[my_account_id.as_str()]) .set(1); crate::metrics::NODE_VERSION .with_label_values(&[my_account_id.as_str()]) .set(node_version()); - let mut queue = MpcMessageQueue::default(); let mut last_hardware_pull = Instant::now(); loop { @@ -216,30 +188,13 @@ impl MpcSignProtocol { tracing::debug!("trying to advance chain signatures protocol"); // Hardware metric refresh if last_hardware_pull.elapsed() > Duration::from_secs(5) { - update_system_metrics(&my_account_id); + update_system_metrics(my_account_id.as_str()); last_hardware_pull = Instant::now(); } crate::metrics::PROTOCOL_ITER_CNT .with_label_values(&[my_account_id.as_str()]) .inc(); - loop { - let msg_result = self.receiver.try_recv(); - match msg_result { - Ok(msg) => { - tracing::debug!("received a new message"); - queue.push(msg); - } - Err(TryRecvError::Empty) => { - tracing::debug!("no new messages received"); - break; - } - Err(TryRecvError::Disconnected) => { - tracing::warn!("communication was disconnected, no more messages will be received, spinning down"); - return Ok(()); - } - } - } let contract_state = { let state = contract_state.read().await; @@ -278,8 +233,8 @@ impl MpcSignProtocol { .with_label_values(&[my_account_id.as_str()]) .observe(crypto_time.elapsed().as_secs_f64()); - let consensus_time = Instant::now(); if let Some(contract_state) = contract_state { + let consensus_time = Instant::now(); let from_state = format!("{state}"); state = match state.advance(&mut self, contract_state, cfg.clone()).await { Ok(state) => { @@ -292,14 +247,14 @@ impl MpcSignProtocol { continue; } }; + crate::metrics::PROTOCOL_LATENCY_ITER_CONSENSUS + .with_label_values(&[my_account_id.as_str()]) + .observe(consensus_time.elapsed().as_secs_f64()); } - crate::metrics::PROTOCOL_LATENCY_ITER_CONSENSUS - .with_label_values(&[my_account_id.as_str()]) - .observe(consensus_time.elapsed().as_secs_f64()); let message_time = Instant::now(); - if let Err(err) = state.handle(&self, &mut queue, cfg, mesh_state).await { - tracing::warn!("protocol unable to handle messages: {err:?}"); + if let Err(err) = state.recv(&self.channel, cfg, mesh_state).await { + tracing::warn!("protocol unable to receive messages: {err:?}"); } crate::metrics::PROTOCOL_LATENCY_ITER_MESSAGE .with_label_values(&[my_account_id.as_str()]) @@ -328,18 +283,6 @@ impl MpcSignProtocol { } } -async fn get_my_participant(protocol: &MpcSignProtocol) -> Participant { - let my_near_acc_id = &protocol.ctx.account_id; - let state = protocol.state.read().await; - let participant_info = state - .find_participant_info(my_near_acc_id) - .unwrap_or_else(|| { - tracing::error!("could not find participant info for {my_near_acc_id}"); - panic!("could not find participant info for {my_near_acc_id}"); - }); - participant_info.id.into() -} - /// our release versions take the form of "1.0.0-rc.2" fn node_version() -> i64 { let version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(); diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 6e077707..861fc46d 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -1,4 +1,4 @@ -use super::message::PresignatureMessage; +use super::message::{MessageChannel, PresignatureMessage}; use super::state::RunningState; use super::triple::{Triple, TripleId, TripleManager}; use crate::protocol::contract::primitives::Participants; @@ -554,15 +554,13 @@ impl PresignatureManager { } } - /// Pokes all of the ongoing generation protocols and returns a vector of - /// messages to be sent to the respective participant. - /// - /// An empty vector means we cannot progress until we receive a new message. - pub async fn poke(&mut self) -> Vec<(Participant, PresignatureMessage)> { - let mut messages = Vec::new(); + /// Poke all ongoing presignature generation protocols to completion. + pub async fn poke(&mut self, channel: MessageChannel) { let mut errors = Vec::new(); let mut presignatures = Vec::new(); - self.generators.retain(|id, generator| { + + let mut remove = Vec::new(); + for (id, generator) in self.generators.iter_mut() { loop { let action = match generator.poke() { Ok(action) => action, @@ -573,43 +571,55 @@ impl PresignatureManager { self.gc.insert(*id, Instant::now()); self.introduced.remove(id); errors.push(e); - break false; + remove.push(*id); + break; } }; match action { Action::Wait => { tracing::debug!("presignature: waiting"); // Retain protocol until we are finished - return true; + break; } Action::SendMany(data) => { - for p in generator.participants.iter() { - messages.push(( - *p, + for to in generator.participants.iter() { + if *to == self.me { + continue; + } + channel + .send( + self.me, + *to, + PresignatureMessage { + id: *id, + triple0: generator.triple0, + triple1: generator.triple1, + epoch: self.epoch, + from: self.me, + data: data.clone(), + timestamp: Utc::now().timestamp() as u64, + }, + ) + .await; + } + } + Action::SendPrivate(to, data) => { + channel + .send( + self.me, + to, PresignatureMessage { id: *id, triple0: generator.triple0, triple1: generator.triple1, epoch: self.epoch, from: self.me, - data: data.clone(), - timestamp: Utc::now().timestamp() as u64 + data, + timestamp: Utc::now().timestamp() as u64, }, - )) - } + ) + .await; } - Action::SendPrivate(p, data) => messages.push(( - p, - PresignatureMessage { - id: *id, - triple0: generator.triple0, - triple1: generator.triple1, - epoch: self.epoch, - from: self.me, - data, - timestamp: Utc::now().timestamp() as u64 - }, - )), Action::Return(output) => { tracing::info!( id, @@ -638,11 +648,16 @@ impl PresignatureManager { .with_label_values(&[self.my_account_id.as_str()]) .inc(); // Do not retain the protocol - return false; + remove.push(*id); + break; } } } - }); + } + + for id in remove { + self.generators.remove(&id); + } for (presignature, mine) in presignatures { self.insert(presignature, mine, false).await; @@ -651,14 +666,13 @@ impl PresignatureManager { if !errors.is_empty() { tracing::warn!(?errors, "failed to generate some presignatures"); } - - messages } pub fn execute( state: &RunningState, active: &Participants, protocol_cfg: &ProtocolConfig, + channel: &MessageChannel, ) -> tokio::task::JoinHandle<()> { let triple_manager = state.triple_manager.clone(); let presignature_manager = state.presignature_manager.clone(); @@ -666,7 +680,7 @@ impl PresignatureManager { let protocol_cfg = protocol_cfg.clone(); let pk = state.public_key; let sk_share = state.private_share; - let messages = state.messages.clone(); + let channel = channel.clone(); tokio::task::spawn(async move { let mut presignature_manager = presignature_manager.write().await; @@ -676,17 +690,7 @@ impl PresignatureManager { { tracing::warn!(?err, "running: failed to stockpile presignatures"); } - - { - let mut messages = messages.write().await; - messages.extend( - presignature_manager - .poke() - .await - .into_iter() - .map(|(p, msg)| (p, super::MpcMessage::Presignature(msg))), - ); - } + presignature_manager.poke(channel).await; crate::metrics::NUM_PRESIGNATURES_MINE .with_label_values(&[presignature_manager.my_account_id.as_str()]) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index b8b87d9b..f0d58c22 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1,9 +1,9 @@ use super::contract::primitives::Participants; -use super::message::SignatureMessage; use super::presignature::{GenerationError, Presignature, PresignatureId, PresignatureManager}; use super::state::RunningState; use crate::indexer::ContractSignRequest; use crate::kdf::{derive_delta, into_eth_sig}; +use crate::protocol::message::{MessageChannel, SignatureMessage}; use crate::types::SignatureProtocol; use crate::util::AffinePointExt; use near_primitives::hash::CryptoHash; @@ -492,19 +492,17 @@ impl SignatureManager { } } - /// Pokes all of the ongoing generation protocols and returns a vector of - /// messages to be sent to the respective participant. - /// - /// An empty vector means we cannot progress until we receive a new message. - pub fn poke(&mut self) -> Vec<(Participant, SignatureMessage)> { - let mut messages = Vec::new(); - self.generators.retain(|sign_request_identifier, generator| { + /// Pokes all of the ongoing generation protocols to completion + pub async fn poke(&mut self, channel: MessageChannel) { + let mut remove = Vec::new(); + for (sign_request_id, generator) in self.generators.iter_mut() { loop { let action = match generator.poke() { Ok(action) => action, Err(err) => { if generator.proposer == self.me { - if generator.sign_request_timestamp.elapsed() < generator.timeout_total { + if generator.sign_request_timestamp.elapsed() < generator.timeout_total + { tracing::warn!(?err, "signature failed to be produced; pushing request back into failed queue"); crate::metrics::SIGNATURE_GENERATOR_FAILURES .with_label_values(&[self.my_account_id.as_str()]) @@ -512,42 +510,73 @@ impl SignatureManager { // only retry the signature generation if it was initially proposed by us. We do not // want any nodes to be proposing the same signature multiple times. self.failed.push_back(( - sign_request_identifier.clone(), + sign_request_id.clone(), GenerationRequest { proposer: generator.proposer, request: generator.request.clone(), epsilon: generator.epsilon, request_id: generator.request_id, entropy: generator.entropy, - sign_request_timestamp: generator.sign_request_timestamp + sign_request_timestamp: generator.sign_request_timestamp, }, )); } else { - self.completed.insert(sign_request_identifier.clone(), Instant::now()); + self.completed + .insert(sign_request_id.clone(), Instant::now()); crate::metrics::SIGNATURE_GENERATOR_FAILURES .with_label_values(&[self.my_account_id.as_str()]) .inc(); crate::metrics::SIGNATURE_FAILURES .with_label_values(&[self.my_account_id.as_str()]) .inc(); - tracing::warn!(?err, "signature failed to be produced; trashing request"); + tracing::warn!( + ?err, + "signature failed to be produced; trashing request" + ); } } - break false; + remove.push(sign_request_id.clone()); + break; } }; match action { Action::Wait => { tracing::debug!("signature: waiting"); // Retain protocol until we are finished - return true; + break; } Action::SendMany(data) => { - for p in generator.participants.iter() { - messages.push(( - *p, + for to in generator.participants.iter() { + if *to == self.me { + continue; + } + channel + .send( + self.me, + *to, + SignatureMessage { + request_id: sign_request_id.request_id, + proposer: generator.proposer, + presignature_id: generator.presignature_id, + request: generator.request.clone(), + epsilon: generator.epsilon, + entropy: generator.entropy, + epoch: self.epoch, + from: self.me, + data: data.clone(), + timestamp: Utc::now().timestamp() as u64, + }, + ) + .await; + } + } + Action::SendPrivate(to, data) => { + channel + .send( + self.me, + to, SignatureMessage { - request_id: sign_request_identifier.request_id, + request_id: sign_request_id.request_id, proposer: generator.proposer, presignature_id: generator.presignature_id, request: generator.request.clone(), @@ -555,52 +584,47 @@ impl SignatureManager { entropy: generator.entropy, epoch: self.epoch, from: self.me, - data: data.clone(), - timestamp: Utc::now().timestamp() as u64 + data, + timestamp: Utc::now().timestamp() as u64, }, - )) - } + ) + .await } - Action::SendPrivate(p, data) => messages.push(( - p, - SignatureMessage { - request_id: sign_request_identifier.request_id, - proposer: generator.proposer, - presignature_id: generator.presignature_id, - request: generator.request.clone(), - epsilon: generator.epsilon, - entropy: generator.entropy, - epoch: self.epoch, - from: self.me, - data, - timestamp: Utc::now().timestamp() as u64 - }, - )), Action::Return(output) => { tracing::info!( - sign_request_identifier =?sign_request_identifier.clone(), + ?sign_request_id, me = ?self.me, presignature_id = generator.presignature_id, big_r = ?output.big_r.to_base58(), s = ?output.s, "completed signature generation" ); - self.completed.insert(sign_request_identifier.clone(), Instant::now()); + self.completed + .insert(sign_request_id.clone(), Instant::now()); let request = SignatureRequest { - epsilon: SerializableScalar {scalar: generator.epsilon}, + epsilon: SerializableScalar { + scalar: generator.epsilon, + }, payload_hash: generator.request.payload.into(), }; if generator.proposer == self.me { - self.signatures - .push(ToPublish::new(sign_request_identifier.request_id, request, generator.sign_request_timestamp, output)); + self.signatures.push(ToPublish::new( + sign_request_id.request_id, + request, + generator.sign_request_timestamp, + output, + )); } // Do not retain the protocol - return false; + remove.push(sign_request_id.clone()); } } } - }); - messages + } + + for sign_request_id in remove { + self.generators.remove(&sign_request_id); + } } pub async fn handle_requests( @@ -819,12 +843,12 @@ impl SignatureManager { ) -> tokio::task::JoinHandle<()> { let presignature_manager = state.presignature_manager.clone(); let signature_manager = state.signature_manager.clone(); - let messages = state.messages.clone(); let stable = stable.clone(); let protocol_cfg = protocol_cfg.clone(); let rpc_client = ctx.rpc_client().clone(); let signer = ctx.signer().clone(); let mpc_contract_id = ctx.mpc_contract_id().clone(); + let channel = ctx.channel().clone(); // NOTE: signatures should only use stable and not active participants. The difference here is that // stable participants utilizes more than the online status of a node, such as whether or not their @@ -838,17 +862,7 @@ impl SignatureManager { .handle_requests(&stable, &mut presignature_manager, &protocol_cfg) .await; drop(presignature_manager); - - { - let mut messages = messages.write().await; - messages.extend( - signature_manager - .poke() - .into_iter() - .map(|(p, msg)| (p, crate::protocol::MpcMessage::Signature(msg))), - ); - } - + signature_manager.poke(channel).await; signature_manager .publish(&rpc_client, &signer, &mpc_contract_id) .await; diff --git a/chain-signatures/node/src/protocol/state.rs b/chain-signatures/node/src/protocol/state.rs index 31a4bb6e..e0d14146 100644 --- a/chain-signatures/node/src/protocol/state.rs +++ b/chain-signatures/node/src/protocol/state.rs @@ -3,7 +3,6 @@ use super::cryptography::CryptographicError; use super::presignature::PresignatureManager; use super::signature::SignatureManager; use super::triple::TripleManager; -use crate::http_client::MessageQueue; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use cait_sith::protocol::Participant; @@ -38,10 +37,10 @@ pub struct StartedState { #[derive(Clone)] pub struct GeneratingState { + pub me: Participant, pub participants: Participants, pub threshold: usize, pub protocol: KeygenProtocol, - pub messages: Arc>, } impl GeneratingState { @@ -60,7 +59,6 @@ pub struct WaitingForConsensusState { pub threshold: usize, pub private_share: SecretKeyShare, pub public_key: PublicKey, - pub messages: Arc>, } impl fmt::Debug for WaitingForConsensusState { @@ -93,7 +91,6 @@ pub struct RunningState { pub triple_manager: TripleManager, pub presignature_manager: Arc>, pub signature_manager: Arc>, - pub messages: Arc>, } impl RunningState { @@ -107,13 +104,13 @@ impl RunningState { #[derive(Clone)] pub struct ResharingState { + pub me: Participant, pub old_epoch: u64, pub old_participants: Participants, pub new_participants: Participants, pub threshold: usize, pub public_key: PublicKey, pub protocol: ReshareProtocol, - pub messages: Arc>, } impl ResharingState { diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 2acc7817..f9bcd9c2 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -1,8 +1,7 @@ use super::contract::primitives::Participants; use super::cryptography::CryptographicError; -use super::message::TripleMessage; +use super::message::{MessageChannel, TripleMessage}; use super::presignature::GenerationError; -use crate::protocol::MpcMessage; use crate::storage::triple_storage::TripleStorage; use crate::types::TripleProtocol; use crate::util::AffinePointExt; @@ -30,10 +29,7 @@ use near_account_id::AccountId; /// messages. pub type TripleId = u64; -type GeneratorOutcome = ( - TripleId, - Result<(Vec<(Participant, TripleMessage)>, Option<(Triple, bool)>), ProtocolError>, -); +type GeneratorOutcome = (TripleId, Result, ProtocolError>); // TODO: why do we have Clone here? Triples can not be reused. /// A completed triple. @@ -91,11 +87,12 @@ impl TripleGenerator { me: Participant, my_account_id: &AccountId, epoch: u64, + channel: MessageChannel, ) -> JoinHandle { tokio::task::spawn({ let mut generator = self.clone(); let my_account_id = my_account_id.clone(); - async move { generator.execute(me, &my_account_id, epoch).await } + async move { generator.execute(me, &my_account_id, epoch, channel).await } }) } @@ -121,8 +118,8 @@ impl TripleGenerator { me: Participant, my_account_id: &AccountId, epoch: u64, + channel: MessageChannel, ) -> GeneratorOutcome { - let mut messages = Vec::new(); loop { let action = match self.poke().await { Ok(action) => action, @@ -151,40 +148,56 @@ impl TripleGenerator { Action::Wait => { tracing::debug!("triple: waiting"); // Retain protocol until we are finished - break (self.id, Ok((messages, None))); + break (self.id, Ok(None)); } Action::SendMany(data) => { - for p in &self.participants { - messages.push(( - *p, + for to in &self.participants { + if *to == me { + continue; + } + + channel + .send( + me, + *to, + TripleMessage { + id: self.id, + epoch, + from: me, + data: data.clone(), + timestamp: Utc::now().timestamp() as u64, + }, + ) + .await; + } + } + Action::SendPrivate(to, data) => { + channel + .send( + me, + to, TripleMessage { id: self.id, epoch, from: me, - data: data.clone(), + data, timestamp: Utc::now().timestamp() as u64, }, - )) - } + ) + .await } - Action::SendPrivate(p, data) => messages.push(( - p, - TripleMessage { - id: self.id, - epoch, - from: me, - data, - timestamp: Utc::now().timestamp() as u64, - }, - )), Action::Return(output) => { - // elapsed = ?generator.timestamp.unwrap().elapsed(), + let elapsed = { + let timestamp = self.timestamp.read().await; + timestamp.map(|t| t.elapsed()).unwrap_or_default() + }; tracing::info!( id = self.id, ?me, big_a = ?output.1.big_a.to_base58(), big_b = ?output.1.big_b.to_base58(), big_c = ?output.1.big_c.to_base58(), + ?elapsed, "completed triple generation" ); @@ -231,7 +244,7 @@ impl TripleGenerator { .inc(); } - break (self.id, Ok((messages, Some((triple, triple_is_mine))))); + break (self.id, Ok(Some((triple, triple_is_mine)))); } } } @@ -337,11 +350,8 @@ impl TripleTasks { my_account_id: &AccountId, epoch: u64, cfg: &ProtocolConfig, - ) -> ( - Vec<(Triple, bool)>, - Vec<(Participant, TripleMessage)>, - HashMap, - ) { + channel: MessageChannel, + ) -> (Vec<(Triple, bool)>, HashMap) { // Add more protocols to the ongoing pool if there is space. let to_generate_len = cfg.max_concurrent_generation as usize - self.ongoing.len(); if !self.queued.is_empty() && to_generate_len > 0 { @@ -349,8 +359,10 @@ impl TripleTasks { if let Some(id) = self.queued.pop_front() { self.ongoing.insert(id); let generator = self.generators.get(&id).unwrap(); - self.ongoing_tasks - .push_back((id, generator.spawn_execution(me, my_account_id, epoch))); + self.ongoing_tasks.push_back(( + id, + generator.spawn_execution(me, my_account_id, epoch, channel.clone()), + )); } } } @@ -363,13 +375,14 @@ impl TripleTasks { .any(|(running_id, _)| running_id == id) { let generator = self.generators.get(id).unwrap(); - self.ongoing_tasks - .push_back((*id, generator.spawn_execution(me, my_account_id, epoch))); + self.ongoing_tasks.push_back(( + *id, + generator.spawn_execution(me, my_account_id, epoch, channel.clone()), + )); } } let mut triples = Vec::new(); - let mut messages = Vec::new(); let mut errors = HashMap::new(); let mut interval = tokio::time::interval(Duration::from_millis(5)); @@ -400,12 +413,10 @@ impl TripleTasks { } }; match outcome { - Ok((mut msgs, triple)) => { - if let Some((triple, mine)) = triple { - self.remove(id); - triples.push((triple, mine)); - } - messages.append(&mut msgs); + Ok(None) => {} + Ok(Some((triple, mine))) => { + self.remove(id); + triples.push((triple, mine)); } Err(e) => { tracing::info!(id, ?e, "triple completed with error"); @@ -415,7 +426,7 @@ impl TripleTasks { } } - (triples, messages, errors) + (triples, errors) } } @@ -736,11 +747,11 @@ impl TripleManager { /// messages to be sent to the respective participant. /// /// An empty vector means we cannot progress until we receive a new message. - pub async fn poke(&self, cfg: &ProtocolConfig) -> Vec<(Participant, TripleMessage)> { - let (triples, messages, errors) = { + pub async fn poke(&self, cfg: &ProtocolConfig, channel: MessageChannel) { + let (triples, errors) = { let mut tasks = self.tasks.write().await; tasks - .poke(self.me, &self.my_account_id, self.epoch, cfg) + .poke(self.me, &self.my_account_id, self.epoch, cfg, channel) .await }; @@ -755,33 +766,23 @@ impl TripleManager { for (triple, mine) in triples { self.insert(triple, mine, false).await; } - - messages } pub fn execute( self, active: &Participants, protocol_cfg: &ProtocolConfig, - messages: Arc>, + channel: &MessageChannel, ) -> JoinHandle<()> { let active = active.clone(); let protocol_cfg = protocol_cfg.clone(); + let channel = channel.clone(); tokio::task::spawn(async move { if let Err(err) = self.stockpile(&active, &protocol_cfg).await { tracing::warn!(?err, "running: failed to stockpile triples"); } - - { - let mut messages = messages.write().await; - messages.extend( - self.poke(&protocol_cfg) - .await - .into_iter() - .map(|(p, msg)| (p, MpcMessage::Triple(msg))), - ); - } + self.poke(&protocol_cfg, channel).await; crate::metrics::NUM_TRIPLES_MINE .with_label_values(&[self.my_account_id.as_str()]) diff --git a/chain-signatures/node/src/types.rs b/chain-signatures/node/src/types.rs index 3a8b4ef2..14d82909 100644 --- a/chain-signatures/node/src/types.rs +++ b/chain-signatures/node/src/types.rs @@ -16,6 +16,8 @@ pub type TripleProtocol = pub type PresignatureProtocol = Box> + Send + Sync>; pub type SignatureProtocol = Box> + Send + Sync>; +pub type Epoch = u64; + #[derive(Clone)] pub struct KeygenProtocol { me: Participant, diff --git a/chain-signatures/node/src/web/error.rs b/chain-signatures/node/src/web/error.rs index 71527289..a74e36c3 100644 --- a/chain-signatures/node/src/web/error.rs +++ b/chain-signatures/node/src/web/error.rs @@ -2,7 +2,7 @@ use axum::extract::rejection::JsonRejection; use reqwest::StatusCode; use tokio::sync::mpsc::error::SendError; -use crate::protocol::{ConsensusError, CryptographicError, MpcMessage}; +use crate::protocol::{ConsensusError, CryptographicError, Message}; pub type Result = std::result::Result; @@ -17,7 +17,7 @@ pub enum Error { #[error(transparent)] Cryptography(#[from] CryptographicError), #[error(transparent)] - Message(#[from] SendError), + Message(#[from] SendError), #[error(transparent)] Rpc(#[from] near_fetch::Error), } diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index d624ffc8..cec9ae26 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -3,7 +3,7 @@ mod error; use self::error::Error; use crate::indexer::Indexer; use crate::protocol::message::SignedMessage; -use crate::protocol::{MpcMessage, NodeState}; +use crate::protocol::{Message, NodeState}; use crate::web::error::Result; use anyhow::Context; use axum::http::StatusCode; @@ -19,7 +19,7 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::sync::{mpsc::Sender, RwLock}; struct AxumState { - sender: Sender, + sender: Sender, protocol_state: Arc>, cipher_sk: hpke::SecretKey, indexer: Indexer, @@ -27,7 +27,7 @@ struct AxumState { pub async fn run( port: u16, - sender: Sender, + sender: Sender, cipher_sk: hpke::SecretKey, protocol_state: Arc>, indexer: Indexer, diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs index e4c1e786..e1042f2a 100644 --- a/integration-tests/src/lib.rs +++ b/integration-tests/src/lib.rs @@ -17,10 +17,8 @@ use futures::StreamExt; use mpc_contract::config::{PresignatureConfig, ProtocolConfig, TripleConfig}; use mpc_contract::primitives::CandidateInfo; use mpc_node::gcp::GcpService; -use mpc_node::http_client; -use mpc_node::mesh; -use mpc_node::storage; use mpc_node::storage::triple_storage::TripleStorage; +use mpc_node::{mesh, node_client, storage}; use near_crypto::KeyFile; use near_workspaces::network::{Sandbox, ValidatorKey}; use near_workspaces::types::{KeyType, SecretKey}; @@ -203,7 +201,7 @@ pub struct Context { pub redis: crate::containers::Redis, pub storage_options: storage::Options, pub mesh_options: mesh::Options, - pub message_options: http_client::Options, + pub message_options: node_client::Options, } pub async fn setup(docker_client: &DockerClient) -> anyhow::Result { @@ -239,11 +237,13 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result { }; let mesh_options = mpc_node::mesh::Options { - fetch_participant_timeout: 1000, refresh_active_timeout: 1000, }; - let message_options = http_client::Options { timeout: 1000 }; + let message_options = node_client::Options { + timeout: 1000, + state_timeout: 1000, + }; Ok(Context { docker_client: docker_client.clone(),