feat: make sign queue use less shared state #91

Merged: 59 commits, Jan 8, 2025
Changes from 57 commits
Commits (59)
30f61f2
Cleanup triples interface insert
ChaoticTempest Dec 3, 2024
02e6a8e
Added triple store error
ChaoticTempest Dec 3, 2024
d1f65e1
Cleaned presignature store
ChaoticTempest Dec 3, 2024
adeb91d
Upgrade bollard/testcontainers for better test interface
ChaoticTempest Dec 4, 2024
a4d1258
Added node cluster interface
ChaoticTempest Dec 4, 2024
2b72dc5
Added SignAction interface
ChaoticTempest Dec 4, 2024
e7876ca
Added rogue respond to sign interface
ChaoticTempest Dec 4, 2024
7570a2d
Reformat all test with new interface
ChaoticTempest Dec 5, 2024
5d08a93
No more need for wait_for::running_mpc
ChaoticTempest Dec 5, 2024
d26d5b7
Rename to signable
ChaoticTempest Dec 5, 2024
87f9ee5
Remove unnecessary functions
ChaoticTempest Dec 5, 2024
bf42956
signable rename
ChaoticTempest Dec 5, 2024
b2fee59
Remove unnecssary functions
ChaoticTempest Dec 5, 2024
82d5291
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/chor…
ChaoticTempest Dec 5, 2024
0edb22a
Resolved comments
ChaoticTempest Dec 5, 2024
25117ca
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/chor…
ChaoticTempest Dec 5, 2024
68e9736
Merge branch 'phuong/chore/cleanup-redis' of github.com:sig-net/sig-m…
ChaoticTempest Dec 5, 2024
4d2e292
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/chor…
ChaoticTempest Dec 9, 2024
99a899b
Merge branch 'phuong/chore/cleanup-redis' of github.com:sig-net/sig-m…
ChaoticTempest Dec 9, 2024
33fdbd0
fmt
ChaoticTempest Dec 9, 2024
bd8ac54
Merge branch 'phuong/chore/cleanup-redis' of github.com:sig-net/sig-m…
ChaoticTempest Dec 9, 2024
4cf6240
clippy
ChaoticTempest Dec 9, 2024
2558a0c
Merge branch 'phuong/chore/cleanup-redis' of github.com:sig-net/sig-m…
ChaoticTempest Dec 9, 2024
6fbe3d3
Rename vote/leave
ChaoticTempest Dec 9, 2024
695a3ad
Rename proper NodeConfig and NodeEnvConfig
ChaoticTempest Dec 10, 2024
2cba7d7
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/chor…
ChaoticTempest Dec 10, 2024
bfefd15
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/chor…
ChaoticTempest Dec 10, 2024
41db464
Bump join to 20secs
Dec 12, 2024
ff88bfe
clippy & fmt
ChaoticTempest Dec 12, 2024
20619dd
Internalize triple manager RwLock and tasks to crypto loop
ChaoticTempest Dec 12, 2024
9123d04
Made each generator a task
ChaoticTempest Dec 12, 2024
7e40cd2
Cleaned up triple generation task
ChaoticTempest Dec 13, 2024
063d40d
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/chor…
ChaoticTempest Dec 13, 2024
46da477
Added wait for running/joining/resharing on per node
ChaoticTempest Dec 13, 2024
31e4174
Fix mac build
ChaoticTempest Dec 13, 2024
4cba601
Added candidate check
ChaoticTempest Dec 14, 2024
5b2fa45
Merge branch 'phuong/chore/cleanup-testing-interface' of github.com:/…
ChaoticTempest Dec 14, 2024
4098eb9
clippy and better err messages
ChaoticTempest Dec 16, 2024
887443f
Merge branch 'phuong/chore/cleanup-testing-interface' of github.com:s…
ChaoticTempest Dec 16, 2024
60984c4
Made http_client use participant instead of info for less clone
ChaoticTempest Dec 16, 2024
2a64026
Bump reshare test to 900secs
ChaoticTempest Dec 16, 2024
37451f6
Made presignature and signature have execute functions
ChaoticTempest Dec 16, 2024
c435100
Better errors on vote_{join, leave}
ChaoticTempest Dec 16, 2024
f320346
Wait on finality final for running mpc
ChaoticTempest Dec 16, 2024
f4e9c95
Merge branch 'phuong/chore/cleanup-testing-interface' of github.com:s…
ChaoticTempest Dec 16, 2024
b7f1f67
minor optimization
ChaoticTempest Dec 16, 2024
5d61c14
Added debug impl for SignOutcome
ChaoticTempest Dec 17, 2024
9ccf81c
Fix offline test
ChaoticTempest Dec 17, 2024
93611ab
Merge branch 'phuong/chore/cleanup-testing-interface' of github.com:s…
ChaoticTempest Dec 17, 2024
b066ae6
Updated nightly to run 100 sigs
ChaoticTempest Dec 17, 2024
82be71a
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/feat…
ChaoticTempest Dec 20, 2024
6082db5
Added iter impls for candidates and participants
ChaoticTempest Dec 20, 2024
b2d585a
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/feat…
ChaoticTempest Jan 3, 2025
64e49c4
Clippy
ChaoticTempest Jan 4, 2025
cf825bf
Made sign queue indexer use sign_tx for sending requests
ChaoticTempest Jan 7, 2025
9b2abfe
Made sign_rx be passed around instead of sign queue
ChaoticTempest Jan 7, 2025
ce3b7aa
Made SignatureManager store threshold
ChaoticTempest Jan 7, 2025
d79038b
Merge branch 'develop' of github.com:sig-net/sig-mpc into phuong/feat…
ChaoticTempest Jan 8, 2025
5a70c08
Fix sign_queue::len
ChaoticTempest Jan 8, 2025
72 changes: 67 additions & 5 deletions chain-signatures/contract/src/primitives.rs
@@ -3,7 +3,7 @@ use k256::Scalar;
use near_sdk::borsh::{self, BorshDeserialize, BorshSerialize};
use near_sdk::serde::{Deserialize, Serialize};
use near_sdk::{AccountId, BorshStorageKey, CryptoHash, NearToken, PublicKey};
- use std::collections::{BTreeMap, HashMap, HashSet};
+ use std::collections::{btree_map, BTreeMap, HashMap, HashSet};

pub mod hpke {
pub type PublicKey = [u8; 32];
@@ -125,8 +125,8 @@ impl Default for Participants {
impl From<Candidates> for Participants {
fn from(candidates: Candidates) -> Self {
let mut participants = Participants::new();
- for (account_id, candidate_info) in candidates.iter() {
- participants.insert(account_id.clone(), candidate_info.clone().into());
+ for (account_id, candidate_info) in candidates.into_iter() {
+ participants.insert(account_id, candidate_info.into());
}
participants
}
@@ -162,10 +162,14 @@ impl Participants {
self.participants.get(account_id)
}

- pub fn iter(&self) -> impl Iterator<Item = (&AccountId, &ParticipantInfo)> {
+ pub fn iter(&self) -> btree_map::Iter<'_, AccountId, ParticipantInfo> {
self.participants.iter()
}

+ pub fn iter_mut(&mut self) -> btree_map::IterMut<'_, AccountId, ParticipantInfo> {
+ self.participants.iter_mut()
+ }

pub fn keys(&self) -> impl Iterator<Item = &AccountId> {
self.participants.keys()
}
@@ -179,6 +183,33 @@
}
}

+ impl<'a> IntoIterator for &'a Participants {
+ type Item = (&'a AccountId, &'a ParticipantInfo);
+ type IntoIter = btree_map::Iter<'a, AccountId, ParticipantInfo>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.participants.iter()
+ }
+ }
+
+ impl<'a> IntoIterator for &'a mut Participants {
+ type Item = (&'a AccountId, &'a mut ParticipantInfo);
+ type IntoIter = btree_map::IterMut<'a, AccountId, ParticipantInfo>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.participants.iter_mut()
+ }
+ }
+
+ impl IntoIterator for Participants {
+ type Item = (AccountId, ParticipantInfo);
+ type IntoIter = btree_map::IntoIter<AccountId, ParticipantInfo>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.participants.into_iter()
+ }
+ }

#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug, Clone)]
pub struct Candidates {
pub candidates: BTreeMap<AccountId, CandidateInfo>,
@@ -213,9 +244,40 @@ impl Candidates {
self.candidates.get(account_id)
}

- pub fn iter(&self) -> impl Iterator<Item = (&AccountId, &CandidateInfo)> {
+ pub fn iter(&self) -> btree_map::Iter<'_, AccountId, CandidateInfo> {
self.candidates.iter()
}

+ pub fn iter_mut(&mut self) -> btree_map::IterMut<'_, AccountId, CandidateInfo> {
+ self.candidates.iter_mut()
+ }
}

+ impl<'a> IntoIterator for &'a Candidates {
+ type Item = (&'a AccountId, &'a CandidateInfo);
+ type IntoIter = btree_map::Iter<'a, AccountId, CandidateInfo>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.candidates.iter()
+ }
+ }
+
+ impl<'a> IntoIterator for &'a mut Candidates {
+ type Item = (&'a AccountId, &'a mut CandidateInfo);
+ type IntoIter = btree_map::IterMut<'a, AccountId, CandidateInfo>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.candidates.iter_mut()
+ }
+ }
+
+ impl IntoIterator for Candidates {
+ type Item = (AccountId, CandidateInfo);
+ type IntoIter = btree_map::IntoIter<AccountId, CandidateInfo>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.candidates.into_iter()
+ }
+ }

#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
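The `IntoIterator` impls added above give callers borrowing, mutable, and consuming iteration, which is what lets `From<Candidates> for Participants` drop its per-entry `clone()` calls. A self-contained sketch of the same shape, with stand-in types in place of `AccountId`, `ParticipantInfo`, and the real `Participants`:

```rust
use std::collections::{btree_map, BTreeMap};

// Stand-ins so the sketch compiles on its own.
#[derive(Clone, Debug)]
struct ParticipantInfo;
#[derive(Default)]
struct Participants {
    participants: BTreeMap<String, ParticipantInfo>,
}

impl<'a> IntoIterator for &'a Participants {
    type Item = (&'a String, &'a ParticipantInfo);
    type IntoIter = btree_map::Iter<'a, String, ParticipantInfo>;
    fn into_iter(self) -> Self::IntoIter {
        self.participants.iter()
    }
}

impl IntoIterator for Participants {
    type Item = (String, ParticipantInfo);
    type IntoIter = btree_map::IntoIter<String, ParticipantInfo>;
    fn into_iter(self) -> Self::IntoIter {
        self.participants.into_iter()
    }
}

fn main() {
    let mut set = Participants::default();
    set.participants.insert("alice.near".to_string(), ParticipantInfo);

    // Borrowing iteration: no clones needed just to read.
    for (account_id, info) in &set {
        println!("{account_id}: {info:?}");
    }

    // Consuming iteration: take ownership of each entry, as the
    // `From<Candidates> for Participants` conversion now does.
    for (account_id, info) in set {
        println!("owned {account_id}: {info:?}");
    }
}
```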
6 changes: 3 additions & 3 deletions chain-signatures/node/src/cli.rs
@@ -191,7 +191,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
mesh_options,
message_options,
} => {
- let sign_queue = Arc::new(RwLock::new(SignQueue::new()));
+ let (sign_tx, sign_rx) = SignQueue::channel();
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
@@ -221,7 +221,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
&indexer_options,
&mpc_contract_id,
&account_id,
- &sign_queue,
+ sign_tx,
app_data_storage,
rpc_client.clone(),
)?;
@@ -257,7 +257,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
rpc_client.clone(),
signer,
receiver,
- sign_queue,
+ sign_rx,
key_storage,
triple_storage,
presignature_storage,
34 changes: 20 additions & 14 deletions chain-signatures/node/src/http_client.rs
@@ -1,4 +1,4 @@
- use crate::protocol::contract::primitives::{ParticipantInfo, Participants};
+ use crate::protocol::contract::primitives::Participants;
use crate::protocol::message::SignedMessage;
use crate::protocol::MpcMessage;
use cait_sith::protocol::Participant;
@@ -95,7 +95,7 @@ pub async fn send_encrypted<U: IntoUrl>(
// TODO: add in retry logic either in struct or at call site.
// TODO: add check for participant list to see if the messages to be sent are still valid.
pub struct MessageQueue {
- deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
+ deque: VecDeque<(Participant, MpcMessage, Instant)>,
seen_counts: HashSet<String>,
message_options: Options,
}
@@ -117,16 +117,21 @@ impl MessageQueue {
self.deque.is_empty()
}

- pub fn push(&mut self, info: ParticipantInfo, msg: MpcMessage) {
- self.deque.push_back((info, msg, Instant::now()));
+ pub fn push(&mut self, node: Participant, msg: MpcMessage) {
+ self.deque.push_back((node, msg, Instant::now()));
}

+ pub fn extend(&mut self, other: impl IntoIterator<Item = (Participant, MpcMessage)>) {
+ self.deque
+ .extend(other.into_iter().map(|(i, msg)| (i, msg, Instant::now())));
+ }

pub async fn send_encrypted(
&mut self,
from: Participant,
sign_sk: &near_crypto::SecretKey,
client: &Client,
- participants: &Participants,
+ active: &Participants,
cfg: &ProtocolConfig,
) -> Vec<SendError> {
let mut failed = VecDeque::new();
@@ -136,21 +141,22 @@ impl MessageQueue {
let outer = Instant::now();
let uncompacted = self.deque.len();
let mut encrypted = HashMap::new();
- while let Some((info, msg, instant)) = self.deque.pop_front() {
+ while let Some((id, msg, instant)) = self.deque.pop_front() {
if instant.elapsed() > timeout(&msg, cfg) {
errors.push(SendError::Timeout(format!(
"{} message has timed out: {info:?}",
"{} message has timed out for node={id:?}",
msg.typename(),
)));
continue;
}

- if !participants.contains_key(&Participant::from(info.id)) {
- let counter = participant_counter.entry(info.id).or_insert(0);
+ let Some(info) = active.get(&id) else {
+ let counter = participant_counter.entry(id).or_insert(0);
*counter += 1;
- failed.push_back((info, msg, instant));
+ failed.push_back((id, msg, instant));
continue;
- }
+ };

let encrypted_msg = match SignedMessage::encrypt(&msg, from, sign_sk, &info.cipher_pk) {
Ok(encrypted) => encrypted,
Err(err) => {
Expand All @@ -159,15 +165,15 @@ impl MessageQueue {
}
};
let encrypted = encrypted.entry(info.id).or_insert_with(Vec::new);
- encrypted.push((encrypted_msg, (info, msg, instant)));
+ encrypted.push((encrypted_msg, (id, msg, instant)));
}

let mut compacted = 0;
for (id, encrypted) in encrypted {
for partition in partition_ciphered_256kb(encrypted) {
let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
// guaranteed to unwrap due to our previous loop check:
- let info = participants.get(&Participant::from(id)).unwrap();
+ let info = active.get(&Participant::from(id)).unwrap();
let account_id = &info.account_id;
let number_of_messages = encrypted_partition.len() as f64;

@@ -231,7 +237,7 @@ impl MessageQueue {
/// Encrypted message with a reference to the old message. Only the ciphered portion of this
/// type will be sent over the wire, while the original message is kept just in case things
/// go wrong somewhere and the message needs to be requeued to be sent later.
- type EncryptedMessage = (Ciphered, (ParticipantInfo, MpcMessage, Instant));
+ type EncryptedMessage = (Ciphered, (Participant, MpcMessage, Instant));

fn partition_ciphered_256kb(encrypted: Vec<EncryptedMessage>) -> Vec<Vec<EncryptedMessage>> {
let mut result = Vec::new();
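In the message queue above, entries are now keyed by the lightweight `Participant` id, and the full `ParticipantInfo` is looked up from the current `active` set only at send time, so messages queued for a node that has dropped out are requeued instead of carrying stale info. A rough sketch of that lookup-at-send pattern, with placeholder types rather than the node's real `MpcMessage` and encryption path:

```rust
use std::collections::{BTreeMap, VecDeque};
use std::time::Instant;

// Placeholder types; the real ones come from cait-sith and the node crate.
type Participant = u32;
#[derive(Clone)]
struct ParticipantInfo {
    url: String,
}
struct MpcMessage;

struct MessageQueue {
    deque: VecDeque<(Participant, MpcMessage, Instant)>,
}

impl MessageQueue {
    fn push(&mut self, node: Participant, msg: MpcMessage) {
        // Only the id is stored; no ParticipantInfo clone is held in the queue.
        self.deque.push_back((node, msg, Instant::now()));
    }

    fn send_all(&mut self, active: &BTreeMap<Participant, ParticipantInfo>) {
        let mut requeue = VecDeque::new();
        while let Some((id, msg, queued_at)) = self.deque.pop_front() {
            // Resolve connection info against the *current* active set.
            let Some(info) = active.get(&id) else {
                // Node is not active right now: keep the message for a later attempt.
                requeue.push_back((id, msg, queued_at));
                continue;
            };
            // ... encrypt `msg` for `info` and send it to `info.url` here ...
            let _ = (&info.url, &msg);
        }
        self.deque = requeue;
    }
}
```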
22 changes: 14 additions & 8 deletions chain-signatures/node/src/indexer.rs
@@ -1,4 +1,4 @@
- use crate::protocol::{SignQueue, SignRequest};
+ use crate::protocol::SignRequest;
use crate::storage::app_data_storage::AppDataStorage;
use crypto_shared::{derive_epsilon, ScalarExt};
use k256::Scalar;
@@ -13,7 +13,7 @@ use std::ops::Mul;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
- use tokio::sync::RwLock;
+ use tokio::sync::{mpsc, RwLock};

/// Configures indexer.
#[derive(Debug, Clone, clap::Parser)]
@@ -169,7 +169,7 @@ impl Indexer {
struct Context {
mpc_contract_id: AccountId,
node_account_id: AccountId,
- queue: Arc<RwLock<SignQueue>>,
+ sign_tx: mpsc::Sender<SignRequest>,
indexer: Indexer,
}

@@ -267,14 +267,20 @@ async fn handle_block(

// Add the requests after going through the whole block to avoid partial processing if indexer fails somewhere.
// This way we can revisit the same block if we failed while not having added the requests partially.
- let mut queue = ctx.queue.write().await;
for request in pending_requests {
- queue.add(request);
+ tracing::info!(
+ request_id = ?near_primitives::hash::CryptoHash(request.request_id),
+ payload = hex::encode(request.request.payload.to_bytes()),
+ entropy = hex::encode(request.entropy),
+ "new sign request"
+ );
+ if let Err(err) = ctx.sign_tx.send(request).await {
+ tracing::error!(?err, "failed to send the sign request into sign queue");
+ }
crate::metrics::NUM_SIGN_REQUESTS
.with_label_values(&[ctx.node_account_id.as_str()])
.inc();
}
- drop(queue);

let log_indexing_interval = 1000;
if block.block_height() % log_indexing_interval == 0 {
@@ -292,7 +298,7 @@ pub fn run(
options: &Options,
mpc_contract_id: &AccountId,
node_account_id: &AccountId,
- queue: &Arc<RwLock<SignQueue>>,
+ sign_tx: mpsc::Sender<SignRequest>,
app_data_storage: AppDataStorage,
rpc_client: near_fetch::Client,
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Indexer)> {
@@ -308,7 +314,7 @@ pub fn run(
let context = Context {
mpc_contract_id: mpc_contract_id.clone(),
node_account_id: node_account_id.clone(),
- queue: queue.clone(),
+ sign_tx,
indexer: indexer.clone(),
};

35 changes: 18 additions & 17 deletions chain-signatures/node/src/mesh/connection.rs
@@ -71,18 +71,19 @@ impl Pool {
}
}

- let connections = self.connections.read().await.clone(); // Clone connections for iteration
- let mut join_set = JoinSet::new();
+ let connections = {
+ let conn = self.connections.read().await;
+ conn.clone()
+ };

- // Spawn tasks for each participant
- for (participant, info) in connections.iter() {
- let participant = *participant;
- let info = info.clone();
- let self_clone = Arc::clone(&self); // Clone Arc for use inside tasks
+ let mut join_set = JoinSet::new();
+ for (participant, info) in connections.into_iter() {
+ let pool = Arc::clone(&self);

join_set.spawn(async move {
- match self_clone.fetch_participant_state(&info).await {
- Ok(state) => match self_clone.send_empty_msg(&participant, &info).await {
+ match pool.fetch_participant_state(&info).await {
+ Ok(state) => match pool.send_empty_msg(&participant, &info).await {
Ok(()) => Ok((participant, state, info)),
Err(e) => {
tracing::warn!(
@@ -139,19 +140,19 @@ impl Pool {
}
}

- let connections = self.potential_connections.read().await;
-
- let mut join_set = JoinSet::new();
+ let connections = {
+ let conn = self.potential_connections.read().await;
+ conn.clone()
+ };

- // Spawn tasks for each participant
- for (participant, info) in connections.iter() {
- let participant = *participant;
- let info = info.clone();
- let self_clone = Arc::clone(&self); // Clone Arc for use inside tasks
+ let mut join_set = JoinSet::new();
+ for (participant, info) in connections.into_iter() {
+ let pool = Arc::clone(&self); // Clone Arc for use inside tasks

join_set.spawn(async move {
- match self_clone.fetch_participant_state(&info).await {
- Ok(state) => match self_clone.send_empty_msg(&participant, &info).await {
+ match pool.fetch_participant_state(&info).await {
+ Ok(state) => match pool.send_empty_msg(&participant, &info).await {
Ok(()) => Ok((participant, state, info)),
Err(e) => {
tracing::warn!(
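The connection pool change above takes one clone of the whole connection map inside a scoped read lock, then consumes the map while spawning one `JoinSet` task per participant, each holding an `Arc` to the pool instead of per-iteration clones. A minimal sketch of that pattern under those assumptions (the types and the ping body are placeholders, not the node's real API):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinSet;

// Placeholder types; the real Pool lives in chain-signatures/node/src/mesh/connection.rs.
type Participant = u32;
#[derive(Clone)]
struct ConnectionInfo {
    url: String,
}

struct Pool {
    connections: RwLock<HashMap<Participant, ConnectionInfo>>,
}

impl Pool {
    async fn ping_all(self: Arc<Self>) -> Vec<(Participant, bool)> {
        // Clone the map inside a short scope so the read guard is dropped
        // before any task is spawned or awaited.
        let connections = {
            let conn = self.connections.read().await;
            conn.clone()
        };

        let mut join_set = JoinSet::new();
        for (participant, info) in connections.into_iter() {
            // Each task owns its (participant, info) pair plus an Arc to the pool.
            let pool = Arc::clone(&self);
            join_set.spawn(async move {
                let _ = &pool; // the real code calls pool.fetch_participant_state(&info) here
                (participant, !info.url.is_empty())
            });
        }

        let mut results = Vec::new();
        while let Some(Ok(result)) = join_set.join_next().await {
            results.push(result);
        }
        results
    }
}
```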