Skip to content

Commit

Permalink
Merge branch 'develop' into serhii/release-1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
volovyks committed Jan 29, 2025
2 parents 64643a1 + b42e0d3 commit 9f27ff6
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 68 deletions.
29 changes: 10 additions & 19 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::mesh::Mesh;
use crate::node_client::{self, NodeClient};
use crate::protocol::message::MessageChannel;
use crate::protocol::{MpcSignProtocol, SignQueue};
use crate::rpc::{EthClient, NearClient, RpcExecutor};
use crate::rpc::{NearClient, RpcExecutor};
use crate::storage::app_data_storage;
use crate::{indexer, indexer_eth, logs, mesh, storage, web};
use clap::Parser;
Expand Down Expand Up @@ -37,9 +37,6 @@ pub enum Cli {
/// This node's account ed25519 secret key
#[arg(long, env("MPC_ACCOUNT_SK"))]
account_sk: SecretKey,
/// The ethereum account secret key used to sign eth respond txn.
#[arg(long, env("MPC_ETH_ACCOUNT_SK"))]
eth_account_sk: String,
/// The web port for this server
#[arg(long, env("MPC_WEB_PORT"))]
web_port: u16,
Expand All @@ -53,12 +50,12 @@ pub enum Cli {
/// The secret key used to sign messages to be sent between nodes.
#[arg(long, env("MPC_SIGN_SK"))]
sign_sk: Option<SecretKey>,
/// Ethereum Indexer options
#[clap(flatten)]
eth: indexer_eth::EthArgs,
/// NEAR Lake Indexer options
#[clap(flatten)]
indexer_options: indexer::Options,
/// Ethereum Indexer options
#[clap(flatten)]
indexer_eth_options: indexer_eth::Options,
/// Local address that other peers can use to message this node.
#[arg(long, env("MPC_LOCAL_ADDRESS"))]
my_address: Option<Url>,
Expand Down Expand Up @@ -89,13 +86,12 @@ impl Cli {
account_id,
mpc_contract_id,
account_sk,
eth_account_sk,
web_port,
cipher_pk,
cipher_sk,
sign_sk,
eth,
indexer_options,
indexer_eth_options,
my_address,
debug_id,
storage_options,
Expand All @@ -114,8 +110,6 @@ impl Cli {
account_id.to_string(),
"--account-sk".to_string(),
account_sk.to_string(),
"--eth-account-sk".to_string(),
eth_account_sk.to_string(),
"--web-port".to_string(),
web_port.to_string(),
"--cipher-pk".to_string(),
Expand Down Expand Up @@ -145,8 +139,8 @@ impl Cli {
args.extend(["--client-header-referer".to_string(), client_header_referer]);
}

args.extend(eth.into_str_args());
args.extend(indexer_options.into_str_args());
args.extend(indexer_eth_options.into_str_args());
args.extend(storage_options.into_str_args());
args.extend(mesh_options.into_str_args());
args.extend(message_options.into_str_args());
Expand All @@ -164,12 +158,11 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
mpc_contract_id,
account_id,
account_sk,
eth_account_sk,
cipher_pk,
cipher_sk,
sign_sk,
eth,
indexer_options,
indexer_eth_options,
my_address,
debug_id,
storage_options,
Expand Down Expand Up @@ -238,10 +231,9 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?,
sign_sk,
};
let eth_client = EthClient::new(&indexer_eth_options, &eth_account_sk);
let near_client =
NearClient::new(&near_rpc, &my_address, &network, &mpc_contract_id, signer);
let (rpc_channel, rpc) = RpcExecutor::new(&near_client, &eth_client);
let (rpc_channel, rpc) = RpcExecutor::new(&near_client, &eth);
let config = Arc::new(RwLock::new(Config::new(LocalConfig {
over: override_config.unwrap_or_else(Default::default),
network,
Expand All @@ -252,7 +244,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
?account_id,
?my_address,
near_rpc_url = ?near_client.rpc_addr(),
eth_rpc_url = ?indexer_eth_options.eth_rpc_http_url,
eth_rpc_url = ?eth.eth_rpc_http_url,
"starting node",
);
rt.block_on(async {
Expand Down Expand Up @@ -280,8 +272,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
tokio::spawn(protocol.run(contract_state, config, mesh_state));
tracing::info!("protocol thread spawned");
let web_handle = tokio::spawn(web::run(web_port, sender, state, indexer));
let eth_indexer_handle =
tokio::spawn(indexer_eth::run(indexer_eth_options, sign_tx, account_id));
let eth_indexer_handle = tokio::spawn(indexer_eth::run(eth, sign_tx, account_id));
tracing::info!("protocol http server spawned");

rpc_handle.await?;
Expand Down
62 changes: 40 additions & 22 deletions chain-signatures/node/src/indexer_eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,45 @@ use web3::types::{FilterBuilder, Log, H160, H256, U256};
/// Configures Ethereum indexer.
#[derive(Debug, Clone, clap::Parser)]
#[group(id = "indexer_eth_options")]
pub struct Options {
pub struct EthArgs {
/// The ethereum account secret key used to sign eth respond txn.
#[arg(long, env("MPC_ETH_ACCOUNT_SK"))]
pub eth_account_sk: Option<String>,
/// Ethereum WebSocket RPC URL
#[clap(long, env("MPC_INDEXER_ETH_RPC_WS_URL"))]
pub eth_rpc_ws_url: String,

#[clap(long, env("MPC_ETH_RPC_WS_URL"), requires = "eth_account_sk")]
pub eth_rpc_ws_url: Option<String>,
/// Ethereum HTTP RPC URL
#[clap(long, env("MPC_INDEXER_ETH_RPC_HTTP_URL"))]
pub eth_rpc_http_url: String,

#[clap(long, env("MPC_ETH_RPC_HTTP_URL"), requires = "eth_account_sk")]
pub eth_rpc_http_url: Option<String>,
/// The contract address to watch without the `0x` prefix
#[clap(long, env("MPC_INDEXER_ETH_CONTRACT_ADDRESS"))]
pub eth_contract_address: String,
#[clap(long, env("MPC_ETH_CONTRACT_ADDRESS"), requires = "eth_account_sk")]
pub eth_contract_address: Option<String>,
}

impl Options {
impl EthArgs {
pub fn into_str_args(self) -> Vec<String> {
let mut args = Vec::new();
args.extend([
"--eth-rpc-ws-url".to_string(),
self.eth_rpc_ws_url,
"--eth-rpc-http-url".to_string(),
self.eth_rpc_http_url,
"--eth-contract-address".to_string(),
self.eth_contract_address,
]);
let mut args = Vec::with_capacity(10);
if let Some(eth_account_sk) = self.eth_account_sk {
args.extend(["--eth-account-sk".to_string(), eth_account_sk]);
}
if let Some(eth_rpc_ws_url) = self.eth_rpc_ws_url {
args.extend(["--eth-rpc-ws-url".to_string(), eth_rpc_ws_url]);
}
if let Some(eth_rpc_http_url) = self.eth_rpc_http_url {
args.extend(["--eth-rpc-http-url".to_string(), eth_rpc_http_url]);
}
if let Some(eth_contract_address) = self.eth_contract_address {
args.extend(["--eth-contract-address".to_string(), eth_contract_address]);
}
args
}

pub fn is_none(&self) -> bool {
self.eth_account_sk.is_none()
|| self.eth_rpc_ws_url.is_none()
|| self.eth_rpc_http_url.is_none()
|| self.eth_contract_address.is_none()
}
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -148,12 +160,17 @@ fn parse_event(log: &Log) -> anyhow::Result<SignatureRequestedEvent> {
}

pub async fn run(
options: Options,
args: EthArgs,
sign_tx: mpsc::Sender<SignRequest>,
node_near_account_id: AccountId,
) -> anyhow::Result<()> {
let contract_address = H160::from_str(&options.eth_contract_address)?;
if args.is_none() {
tracing::warn!("ethereum indexer is disabled");
return Ok(());
}

tracing::info!("running ethereum indexer");
let contract_address = H160::from_str(&args.eth_contract_address.unwrap())?;
let signature_requested_topic = H256::from_slice(&web3::signing::keccak256(
b"SignatureRequested(bytes32,address,uint256,uint256,string)",
));
Expand All @@ -163,8 +180,9 @@ pub async fn run(
.topics(Some(vec![signature_requested_topic]), None, None, None)
.build();

let rpc_ws = args.eth_rpc_ws_url.unwrap();
loop {
match web3::transports::WebSocket::new(&options.eth_rpc_ws_url).await {
match web3::transports::WebSocket::new(&rpc_ws).await {
Ok(ws) => {
let web3_ws = web3::Web3::new(ws);
match web3_ws.eth_subscribe().subscribe_logs(filter.clone()).await {
Expand Down
31 changes: 22 additions & 9 deletions chain-signatures/node/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::config::{Config, ContractConfig, NetworkConfig};
use crate::indexer_eth::EthArgs;
use crate::protocol::signature::ToPublish;
use crate::protocol::{Chain, ProtocolState};
use crate::util::AffinePointExt as _;
Expand Down Expand Up @@ -63,12 +64,13 @@ impl RpcChannel {

pub struct RpcExecutor {
near: NearClient,
eth: EthClient,
eth: Option<EthClient>,
action_rx: mpsc::Receiver<RpcAction>,
}

impl RpcExecutor {
pub fn new(near: &NearClient, eth: &EthClient) -> (RpcChannel, Self) {
pub fn new(near: &NearClient, eth: &EthArgs) -> (RpcChannel, Self) {
let eth = EthClient::new(eth);
let (tx, rx) = mpsc::channel(MAX_CONCURRENT_RPC_REQUESTS);
(
RpcChannel { tx },
Expand Down Expand Up @@ -115,7 +117,13 @@ impl RpcExecutor {
fn client(&self, chain: &Chain) -> ChainClient {
match chain {
Chain::NEAR => ChainClient::Near(self.near.clone()),
Chain::Ethereum => ChainClient::Ethereum(self.eth.clone()),
Chain::Ethereum => {
if let Some(eth) = &self.eth {
ChainClient::Ethereum(eth.clone())
} else {
ChainClient::Err("no eth client available for node")
}
}
}
}
}
Expand Down Expand Up @@ -277,10 +285,10 @@ pub struct EthClient {
}

impl EthClient {
pub fn new(options: &crate::indexer_eth::Options, account_sk: &str) -> Self {
let transport = web3::transports::Http::new(&options.eth_rpc_http_url).unwrap();
pub fn new(args: &crate::indexer_eth::EthArgs) -> Option<Self> {
let transport = web3::transports::Http::new(args.eth_rpc_http_url.as_ref()?).unwrap();
let client = web3::Web3::new(transport);
let address = web3::types::H160::from_str(&options.eth_contract_address).unwrap();
let address = web3::types::H160::from_str(args.eth_contract_address.as_ref()?).unwrap();

let contract_json: serde_json::Value = serde_json::from_slice(include_bytes!(
"../../contract-eth/artifacts/contracts/ChainSignatures.sol/ChainSignatures.json"
Expand All @@ -292,17 +300,18 @@ impl EthClient {
contract_json["abi"].to_string().as_bytes(),
)
.unwrap();
Self {
Some(Self {
client,
contract,
account_sk: web3::signing::SecretKey::from_str(account_sk)
account_sk: web3::signing::SecretKey::from_str(args.eth_account_sk.as_ref()?)
.expect("failed to parse eth account sk, should not begin with 0x"),
}
})
}
}

/// Client related to a specific chain
pub enum ChainClient {
Err(&'static str),
Near(NearClient),
Ethereum(EthClient),
}
Expand Down Expand Up @@ -366,6 +375,10 @@ async fn execute_publish(client: ChainClient, mut action: PublishAction) {
ChainClient::Ethereum(eth) => {
try_publish_eth(eth, &action.to_publish, &action.timestamp, &signature).await
}
ChainClient::Err(msg) => {
tracing::warn!(msg, "no client for chain");
Ok(())
}
};
if publish.is_ok() {
break;
Expand Down
12 changes: 6 additions & 6 deletions integration-tests/src/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,22 @@ impl Node {
running_threshold: 120,
behind_threshold: 120,
};
let indexer_eth_options = mpc_node::indexer_eth::Options {
eth_rpc_ws_url: config.cfg.eth_rpc_ws_url.clone(),
eth_rpc_http_url: config.cfg.eth_rpc_http_url.clone(),
eth_contract_address: config.cfg.eth_contract_address.clone(),
let eth_args = mpc_node::indexer_eth::EthArgs {
eth_account_sk: Some(config.cfg.eth_account_sk.clone()),
eth_rpc_ws_url: Some(config.cfg.eth_rpc_ws_url.clone()),
eth_rpc_http_url: Some(config.cfg.eth_rpc_http_url.clone()),
eth_contract_address: Some(config.cfg.eth_contract_address.clone()),
};
let args = mpc_node::cli::Cli::Start {
near_rpc: config.near_rpc.clone(),
mpc_contract_id: ctx.mpc_contract.id().clone(),
account_id: config.account.id().clone(),
account_sk: config.account.secret_key().to_string().parse()?,
eth_account_sk: config.cfg.eth_account_sk.clone(),
web_port: Self::CONTAINER_PORT,
cipher_pk: hex::encode(config.cipher_pk.to_bytes()),
cipher_sk: hex::encode(config.cipher_sk.to_bytes()),
indexer_options: indexer_options.clone(),
indexer_eth_options,
eth: eth_args,
my_address: None,
debug_id: Some(node_id),
storage_options: ctx.storage_options.clone(),
Expand Down
25 changes: 13 additions & 12 deletions integration-tests/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ impl Node {
running_threshold: 120,
behind_threshold: 120,
};
let indexer_eth_options = mpc_node::indexer_eth::Options {
eth_rpc_ws_url: cfg.eth_rpc_ws_url.clone(),
eth_rpc_http_url: cfg.eth_rpc_http_url.clone(),
eth_contract_address: cfg.eth_contract_address.clone(),
let eth = mpc_node::indexer_eth::EthArgs {
eth_account_sk: Some(cfg.eth_account_sk.clone()),
eth_rpc_ws_url: Some(cfg.eth_rpc_ws_url.clone()),
eth_rpc_http_url: Some(cfg.eth_rpc_http_url.clone()),
eth_contract_address: Some(cfg.eth_contract_address.clone()),
};
let near_rpc = ctx.lake_indexer.rpc_host_address.clone();
let mpc_contract_id = ctx.mpc_contract.id().clone();
Expand All @@ -82,13 +83,12 @@ impl Node {
mpc_contract_id: mpc_contract_id.clone(),
account_id: account_id.clone(),
account_sk: account_sk.to_string().parse()?,
eth_account_sk: cfg.eth_account_sk.clone(),
web_port,
cipher_pk: hex::encode(cipher_pk.to_bytes()),
cipher_sk: hex::encode(cipher_sk.to_bytes()),
sign_sk: Some(sign_sk.clone()),
eth,
indexer_options,
indexer_eth_options,
my_address: None,
debug_id: Some(node_id),
storage_options: ctx.storage_options.clone(),
Expand Down Expand Up @@ -178,23 +178,24 @@ impl Node {
running_threshold: 120,
behind_threshold: 120,
};
let indexer_eth_options = mpc_node::indexer_eth::Options {
eth_rpc_ws_url: config.cfg.eth_rpc_ws_url.clone(),
eth_rpc_http_url: config.cfg.eth_rpc_http_url.clone(),
eth_contract_address: config.cfg.eth_contract_address.clone(),

let eth = mpc_node::indexer_eth::EthArgs {
eth_account_sk: Some(config.cfg.eth_account_sk.clone()),
eth_rpc_ws_url: Some(config.cfg.eth_rpc_ws_url.clone()),
eth_rpc_http_url: Some(config.cfg.eth_rpc_http_url.clone()),
eth_contract_address: Some(config.cfg.eth_contract_address.clone()),
};
let cli = mpc_node::cli::Cli::Start {
near_rpc: config.near_rpc.clone(),
mpc_contract_id: ctx.mpc_contract.id().clone(),
account_id: config.account.id().clone(),
account_sk: config.account.secret_key().to_string().parse()?,
eth_account_sk: config.cfg.eth_account_sk.clone(),
web_port,
cipher_pk: hex::encode(config.cipher_pk.to_bytes()),
cipher_sk: hex::encode(config.cipher_sk.to_bytes()),
sign_sk: Some(config.sign_sk.clone()),
eth,
indexer_options,
indexer_eth_options,
my_address: None,
debug_id: Some(node_id),
storage_options: ctx.storage_options.clone(),
Expand Down

0 comments on commit 9f27ff6

Please sign in to comment.