Skip to content

Commit

Permalink
feat: add MessageChannel for async protocol message receiving and sen…
Browse files Browse the repository at this point in the history
…ding (#93)

* Added MessageChannel for receiving messages from endpoints

* Replace MessageQueue with MessageChannel

* Made NodeClient for requesting node requests

* No longer need to find me()

* Cleanup encryption

* Cleanup encryption again by files

* Rename http_client to node_client

* Rename to inbox and outbox

* Added back queue size metric

* Made triple directly use channel

* Parallelize sending messages

* Made presignatures/signatures use channel directly

* clippy

* fix reshare test
  • Loading branch information
ChaoticTempest authored Jan 14, 2025
1 parent 5a628ea commit 922bb12
Show file tree
Hide file tree
Showing 20 changed files with 959 additions and 956 deletions.
51 changes: 25 additions & 26 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig};
use crate::gcp::GcpService;
use crate::mesh::Mesh;
use crate::node_client::{self, NodeClient};
use crate::protocol::message::MessageChannel;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::storage::app_data_storage;
use crate::{http_client, indexer, mesh, storage, web};
use crate::{indexer, mesh, storage, web};
use clap::Parser;
use deadpool_redis::Runtime;
use local_ip_address::local_ip;
use near_account_id::AccountId;
use near_crypto::{InMemorySigner, SecretKey};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::sync::RwLock;
use tracing_stackdriver::layer as stackdriver_layer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
use url::Url;
Expand Down Expand Up @@ -68,7 +70,7 @@ pub enum Cli {
#[clap(flatten)]
mesh_options: mesh::Options,
#[clap(flatten)]
message_options: http_client::Options,
message_options: node_client::Options,
},
}

Expand Down Expand Up @@ -237,11 +239,10 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
Url::parse(&format!("http://{my_ip}:{web_port}")).unwrap()
});

let (sender, receiver) = mpsc::channel(16384);

tracing::info!(%my_address, "address detected");
let client = NodeClient::new(&message_options);
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (mesh, mesh_state) = Mesh::init(mesh_options);
let (mesh, mesh_state) = Mesh::init(&client, mesh_options);
let config = Arc::new(RwLock::new(Config::new(LocalConfig {
over: override_config.unwrap_or_else(Default::default),
network: NetworkConfig {
Expand All @@ -250,24 +251,26 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
},
})));
let contract_state = Arc::new(RwLock::new(None));
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id.clone(),
account_id,
rpc_client.clone(),
signer,
receiver,
sign_rx,
key_storage,
triple_storage,
presignature_storage,
message_options,
);

let contract_updater =
crate::contract_updater::ContractUpdater::init(rpc_client, mpc_contract_id);
crate::contract_updater::ContractUpdater::init(&rpc_client, &mpc_contract_id);

rt.block_on(async {
let (sender, channel) =
MessageChannel::spawn(client, &account_id, &config, &mesh_state).await;
let (protocol, protocol_state) = MpcSignProtocol::init(
my_address,
mpc_contract_id,
account_id,
rpc_client,
signer,
channel,
sign_rx,
key_storage,
triple_storage,
presignature_storage,
);

tracing::info!("protocol initialized");
let contract_handle = tokio::spawn({
let contract_state = Arc::clone(&contract_state);
Expand All @@ -278,12 +281,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
let contract_state = Arc::clone(&contract_state);
async move { mesh.run(contract_state).await }
});
let protocol_handle = tokio::spawn({
let contract_state = Arc::clone(&contract_state);
let config = Arc::clone(&config);
let mesh_state = Arc::clone(&mesh_state);
async move { protocol.run(contract_state, config, mesh_state).await }
});
let protocol_handle =
tokio::spawn(protocol.run(contract_state, config, mesh_state));
tracing::info!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
Expand Down
4 changes: 2 additions & 2 deletions chain-signatures/node/src/contract_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ pub struct ContractUpdater {
}

impl ContractUpdater {
pub fn init(rpc_client: near_fetch::Client, mpc_contract_id: AccountId) -> Self {
pub fn init(rpc_client: &near_fetch::Client, mpc_contract_id: &AccountId) -> Self {
Self {
rpc_client,
rpc_client: rpc_client.clone(),
mpc_contract_id: mpc_contract_id.clone(),
}
}
Expand Down
Loading

0 comments on commit 922bb12

Please sign in to comment.