From da2c3e81412b67970122d544b68bd41778fb4889 Mon Sep 17 00:00:00 2001 From: darricksee Date: Thu, 4 May 2023 11:09:09 -0400 Subject: [PATCH 01/10] Got scale test working with share message --- Cargo.lock | 41 +++++++ Cargo.toml | 1 + test/scale/Cargo.toml | 19 +++ test/scale/src/main.rs | 259 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 320 insertions(+) create mode 100644 test/scale/Cargo.toml create mode 100644 test/scale/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index e02c62faf..7e56675d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,6 +52,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "async-attributes" version = "1.1.2" @@ -402,9 +411,13 @@ version = "2.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" dependencies = [ + "ansi_term", + "atty", "bitflags", + "strsim", "textwrap", "unicode-width", + "vec_map", ] [[package]] @@ -1679,6 +1692,22 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scale" +version = "0.1.0" +dependencies = [ + "async-channel", + "async-std", + "binary_sv2", + "bytes", + "clap", + "codec_sv2", + "network_helpers", + "roles_logic_sv2", + "serde", + "tokio", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -1867,6 +1896,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "subtle" version = "2.4.1" @@ -2166,6 +2201,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index b17f30197..194f1889f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "roles/v2/pool", "roles/v2/test-utils/mining-device", "roles/translator", + "test/scale", "utils/network-helpers", "utils/buffer", "utils/error-handling", diff --git a/test/scale/Cargo.toml b/test/scale/Cargo.toml new file mode 100644 index 000000000..fbefc70e1 --- /dev/null +++ b/test/scale/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "scale" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = "2.33.3" +serde = { version = "1.0.89", features = ["derive", "alloc"], default-features = false, optional = true} +async-channel = "1.5.1" +async-std="1.8.0" +bytes = "1.0.1" +binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" } +codec_sv2 = { path = "../../protocols/v2/codec-sv2", features=["noise_sv2"] } +network_helpers = { version = "0.1", path = "../../utils/network-helpers", features = ["with_tokio"] } +roles_logic_sv2 = { path = "../../protocols/v2/roles-logic-sv2" } +tokio = { version = "1", features = ["full"] } + diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs new file mode 100644 index 000000000..7a02da7e4 --- /dev/null +++ b/test/scale/src/main.rs @@ -0,0 +1,259 @@ +use std::io::Read; +use tokio::{ + task, + net::{TcpListener, TcpStream}}; +use std::thread; + +use async_channel::{bounded, Receiver, SendError, Sender}; + +use clap::{Arg, App}; +use std::time::Duration; +use binary_sv2::{Deserialize, GetSize, Serialize}; +use codec_sv2::{HandshakeRole, Initiator, Responder, + StandardNoiseDecoder, StandardSv2Frame, StandardEitherFrame +}; +use network_helpers::plain_connection_tokio::PlainConnection; + +use roles_logic_sv2::{ + mining_sv2::*, + parsers::{Mining, MiningDeviceMessages}, +}; + +pub type EitherFrame = StandardEitherFrame; + + +pub const AUTHORITY_PUBLIC_K: [u8; 32] = [ + 215, 11, 47, 78, 34, 232, 25, 192, 195, 168, 170, 209, 95, 181, 40, 114, 154, 226, 176, 190, + 90, 169, 238, 89, 191, 183, 97, 63, 194, 119, 11, 31, +]; + +pub const AUTHORITY_PRIVATE_K: [u8; 32] = [ + 204, 93, 167, 220, 169, 204, 172, 35, 9, 84, 174, 208, 171, 89, 25, 53, 196, 209, 161, 148, 4, + 5, 173, 0, 234, 59, 15, 127, 31, 160, 136, 131, +]; + +static HOST: &'static str = "127.0.0.1"; + +const CERT_VALIDITY: Duration = Duration::from_secs(3600); +#[tokio::main] +async fn main() { + let matches = App::new("Example program") + .arg(Arg::with_name("encrypt") + .short("e") + .help("Use encryption")) + .arg(Arg::with_name("hops") + .short("h") + .takes_value(true) + .help("Number of hops")) + .get_matches(); + + let total_messages = 1_000_000; + let encrypt = matches.is_present("encrypt"); + let hops: u16 = matches.value_of("hops").unwrap_or("0").parse().unwrap_or(0); + let mut orig_port: u16 = 19000; + + // create channel to tell final server number of messages + let (tx, rx) = bounded(1); + + if hops > 0 { + orig_port = hop_server(encrypt, hops, tx, total_messages).await; + } else { + println!("Usage: ./program -h -e"); + } + println!("Connecting to localhost:{}", orig_port); + // + setup_driver(orig_port, encrypt, rx, total_messages).await; +} + +async fn setup_driver(server_port: u16, encrypt: bool, rx: Receiver, total_messages: i32) { + let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); + + let (server_receiver, server_sender): (Receiver, Sender) = + PlainConnection::new(server_stream).await; + + // Create timer to see how long this method takes + let start = std::time::Instant::now(); + + send_messages(server_sender, total_messages).await; + + //listen for message on rx + let msg = rx.recv().await.unwrap(); + + let end = std::time::Instant::now(); + + println!("client: {} - Took {}s", msg, (end - start).as_secs()); + +} + +pub type Message = MiningDeviceMessages<'static>; +pub type StdFrame = StandardSv2Frame; + +async fn send_messages(stream: Sender, total_messages: i32) { + let mut number: i32 = 0; + println!("Creating share"); + let share = MiningDeviceMessages::Mining(Mining::SubmitSharesStandard(SubmitSharesStandard { + channel_id: 1, + sequence_number: number as u32, + job_id: 2, + nonce: 3, + ntime: 4, + version: 5, + })); + + while number <= total_messages { + println!("client: sending msg-{}", number); + let frame: StdFrame = share.clone().try_into().unwrap(); + let binary: EitherFrame = frame.into(); + + stream.send(binary).await.unwrap(); + number = number + 1; + } +} + +async fn handle_messages + GetSize + Send + 'static>( + name: String, client: Receiver, server: Option>, + total_messages: i32, tx: Sender) { + + let mut messages_received = 0; + + while messages_received <= total_messages { + println!("{} is waiting...", name); + let mut frame: StdFrame = client.recv().await.unwrap().try_into().unwrap(); + println!("{} got msg {}", name, messages_received); + + let binary: EitherFrame = frame.into(); + + + if server.is_some() { + server.as_ref().unwrap().send(binary).await; + } else { + messages_received = messages_received + 1; + println!("last server: {} got msg {}", name, messages_received); + } + + } + tx.send("got all messages".to_string()).await.unwrap(); +} + +// fn handle_messages_old + GetSize + Send + 'static>(name: String, client: TcpStream, server: Option, total_messages: i32, tx: Sender) { +// let mut reader = std::io::BufReader::new(client); +// +// let responder = Responder::from_authority_kp( +// &AUTHORITY_PUBLIC_K[..], +// &AUTHORITY_PRIVATE_K[..], +// CERT_VALIDITY, +// ).unwrap(); +// +// let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); +// let role = HandshakeRole::Initiator(initiator); +// +// let mut decoder = StandardNoiseDecoder::::new(); +// +// let mut messages_recieved = 0; +// +// loop { +// let mut buffer = decoder.writable(); +// +// let result = reader.read_exact(&mut buffer); +// +// messages_recieved = messages_recieved + 1; +// +// if buffer.len() > 0 { +// if server.is_some() { +// server.as_ref().unwrap().write(buffer).unwrap(); +// } else { +// println!("server: {} got {:?}", name, buffer); +// // This is the last server - so when this gets the last message send the main thread +// //the "got it" message +// +// // parse buffer to i32 +// if messages_recieved == total_messages { +// tx.send("got it".to_string()).unwrap(); +// } +// } +// } else { +// println!("server: {} received empty message", name); +// } +// } +// } + +async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: bool, total_messages: i32, tx: Sender) { + println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port.to_string(), server_port.to_string()); + let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port)).await.unwrap(); + println!("Bound - now waiting for connection..."); + let cli_stream = listener.accept().await.unwrap().0; + let (cli_receiver, cli_sender): (Receiver, Sender) = + network_helpers::plain_connection_tokio::PlainConnection::new(cli_stream).await; + + let mut server = None; + if server_port > 0 { + println!("Proxy {} Connecting to server: {}", name, server_port.to_string()); + let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); + + let (server_receiver, server_sender): (Receiver, Sender) = + PlainConnection::new(server_stream).await; + server = Some(server_sender); + } + + println!("Proxy {} has a client", name); + handle_messages::(name, cli_receiver, server, total_messages, tx).await; + +} + + +async fn hop_server(encrypt: bool, hops: u16, mut tx: Sender, total_messages: i32) -> u16 { + let orig_port : u16 = 19000; + let final_server_port = orig_port + (hops - 1); + let mut listen_port = final_server_port; + let mut server_port: u16 = 0; + + for name in (0..hops).rev() { + let tx_clone = tx.clone(); + let name_clone = name.to_string(); + + //let port_clone = server_port.clone(); + task::spawn(async move { + create_proxy(name_clone, listen_port, server_port, encrypt, total_messages, tx_clone).await; + + }); + + thread::sleep(std::time::Duration::from_secs(1)); + server_port = listen_port; + listen_port = listen_port - 1; + } + return orig_port; +} +// fn create_proxy_old(name: String, listen_port: u16, server_port: u16, encrypt: bool, total_messages: i32, tx: Sender) { +// println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port.to_string(), server_port.to_string()); +// let listener = TcpListener::bind(format!("localhost:{}", listen_port)).await.unwrap(); +// let mut server_stream = None; +// +// if server_port > 0 { +// println!("Proxy {} Connecting to server: {}", name, server_port.to_string()); +// server_stream = TcpStream::connect(format!("localhost:{}", server_port)).await.ok(); +// } +// +// let client_stream = listener.accept().await.unwrap().0; +// println!("Proxy {} has a client", name); +// handle_messages::(name, client_stream, server_stream, total_messages, tx); +// +// } + + +// fn hop_server_old(encrypt: bool, hops: u16, mut tx: Sender, total_messages: i32) -> u16 { +// let orig_port : u16 = 19000; +// let final_server_port = orig_port + (hops - 1); +// let mut listen_port = final_server_port; +// let mut server_port = 0; +// +// for name in (0..hops).rev() { +// let tx_clone = tx.clone(); +// thread::spawn(move || { +// create_proxy(name.to_string(), listen_port, server_port, encrypt, total_messages, tx_clone); +// }); +// thread::sleep(std::time::Duration::from_secs(1)); +// server_port = listen_port; +// listen_port = listen_port - 1; +// } +// return orig_port; +// } From d8c58fcb96fb09afb9f6b96ea3e009957307b653 Mon Sep 17 00:00:00 2001 From: darricksee Date: Thu, 4 May 2023 15:54:57 -0400 Subject: [PATCH 02/10] Scale test works with encryption now --- test/scale/src/main.rs | 138 +++++++++++++---------------------------- 1 file changed, 42 insertions(+), 96 deletions(-) diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index 7a02da7e4..bbfc4be89 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -1,18 +1,20 @@ -use std::io::Read; use tokio::{ task, net::{TcpListener, TcpStream}}; use std::thread; -use async_channel::{bounded, Receiver, SendError, Sender}; +use async_channel::{bounded, Receiver, Sender}; use clap::{Arg, App}; use std::time::Duration; use binary_sv2::{Deserialize, GetSize, Serialize}; use codec_sv2::{HandshakeRole, Initiator, Responder, - StandardNoiseDecoder, StandardSv2Frame, StandardEitherFrame + StandardSv2Frame, StandardEitherFrame }; + +use codec_sv2::Frame; use network_helpers::plain_connection_tokio::PlainConnection; +use network_helpers::noise_connection_tokio::Connection; use roles_logic_sv2::{ mining_sv2::*, @@ -34,7 +36,6 @@ pub const AUTHORITY_PRIVATE_K: [u8; 32] = [ static HOST: &'static str = "127.0.0.1"; -const CERT_VALIDITY: Duration = Duration::from_secs(3600); #[tokio::main] async fn main() { let matches = App::new("Example program") @@ -62,15 +63,21 @@ async fn main() { } println!("Connecting to localhost:{}", orig_port); // - setup_driver(orig_port, encrypt, rx, total_messages).await; + setup_driver(orig_port, encrypt, rx, total_messages, hops).await; } -async fn setup_driver(server_port: u16, encrypt: bool, rx: Receiver, total_messages: i32) { +async fn setup_driver(server_port: u16, encrypt: bool, rx: Receiver, total_messages: i32, + hops: u16) { let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); + let (_server_receiver, server_sender): (Receiver, Sender); - let (server_receiver, server_sender): (Receiver, Sender) = - PlainConnection::new(server_stream).await; + if encrypt { + let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); + (_server_receiver, server_sender) = Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await; + } else { + (_server_receiver, server_sender) = PlainConnection::new(server_stream).await; + } // Create timer to see how long this method takes let start = std::time::Instant::now(); @@ -81,7 +88,8 @@ async fn setup_driver(server_port: u16, encrypt: bool, rx: Receiver, tot let end = std::time::Instant::now(); - println!("client: {} - Took {}s", msg, (end - start).as_secs()); + println!("client: {} - Took {}s hops: {} encryption: {}", msg, (end - start).as_secs(), + hops, encrypt); } @@ -117,15 +125,12 @@ async fn handle_messages + GetSize + Se let mut messages_received = 0; while messages_received <= total_messages { - println!("{} is waiting...", name); - let mut frame: StdFrame = client.recv().await.unwrap().try_into().unwrap(); - println!("{} got msg {}", name, messages_received); + let frame: StdFrame = client.recv().await.unwrap().try_into().unwrap(); let binary: EitherFrame = frame.into(); - if server.is_some() { - server.as_ref().unwrap().send(binary).await; + server.as_ref().unwrap().send(binary).await.unwrap(); } else { messages_received = messages_received + 1; println!("last server: {} got msg {}", name, messages_received); @@ -135,63 +140,38 @@ async fn handle_messages + GetSize + Se tx.send("got all messages".to_string()).await.unwrap(); } -// fn handle_messages_old + GetSize + Send + 'static>(name: String, client: TcpStream, server: Option, total_messages: i32, tx: Sender) { -// let mut reader = std::io::BufReader::new(client); -// -// let responder = Responder::from_authority_kp( -// &AUTHORITY_PUBLIC_K[..], -// &AUTHORITY_PRIVATE_K[..], -// CERT_VALIDITY, -// ).unwrap(); -// -// let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); -// let role = HandshakeRole::Initiator(initiator); -// -// let mut decoder = StandardNoiseDecoder::::new(); -// -// let mut messages_recieved = 0; -// -// loop { -// let mut buffer = decoder.writable(); -// -// let result = reader.read_exact(&mut buffer); -// -// messages_recieved = messages_recieved + 1; -// -// if buffer.len() > 0 { -// if server.is_some() { -// server.as_ref().unwrap().write(buffer).unwrap(); -// } else { -// println!("server: {} got {:?}", name, buffer); -// // This is the last server - so when this gets the last message send the main thread -// //the "got it" message -// -// // parse buffer to i32 -// if messages_recieved == total_messages { -// tx.send("got it".to_string()).unwrap(); -// } -// } -// } else { -// println!("server: {} received empty message", name); -// } -// } -// } - async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: bool, total_messages: i32, tx: Sender) { println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port.to_string(), server_port.to_string()); let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port)).await.unwrap(); println!("Bound - now waiting for connection..."); let cli_stream = listener.accept().await.unwrap().0; - let (cli_receiver, cli_sender): (Receiver, Sender) = - network_helpers::plain_connection_tokio::PlainConnection::new(cli_stream).await; + let (cli_receiver, _cli_sender): (Receiver, Sender); + + if encrypt { + let responder = Responder::from_authority_kp( + &AUTHORITY_PUBLIC_K[..], + &AUTHORITY_PRIVATE_K[..], + Duration::from_secs(3600), + ) + .unwrap(); + (cli_receiver, _cli_sender) = Connection::new(cli_stream, HandshakeRole::Responder(responder)).await; + } else { + (cli_receiver, _cli_sender) = PlainConnection::new(cli_stream).await; + } let mut server = None; if server_port > 0 { println!("Proxy {} Connecting to server: {}", name, server_port.to_string()); let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); + let (_server_receiver, server_sender): (Receiver, Sender); - let (server_receiver, server_sender): (Receiver, Sender) = - PlainConnection::new(server_stream).await; + if encrypt { + let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); + (_server_receiver, server_sender) = Connection::new(server_stream, + HandshakeRole::Initiator(initiator)).await; + } else { + (_server_receiver, server_sender) = PlainConnection::new(server_stream).await; + } server = Some(server_sender); } @@ -201,7 +181,7 @@ async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: } -async fn hop_server(encrypt: bool, hops: u16, mut tx: Sender, total_messages: i32) -> u16 { +async fn hop_server(encrypt: bool, hops: u16, tx: Sender, total_messages: i32) -> u16 { let orig_port : u16 = 19000; let final_server_port = orig_port + (hops - 1); let mut listen_port = final_server_port; @@ -222,38 +202,4 @@ async fn hop_server(encrypt: bool, hops: u16, mut tx: Sender, total_mess listen_port = listen_port - 1; } return orig_port; -} -// fn create_proxy_old(name: String, listen_port: u16, server_port: u16, encrypt: bool, total_messages: i32, tx: Sender) { -// println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port.to_string(), server_port.to_string()); -// let listener = TcpListener::bind(format!("localhost:{}", listen_port)).await.unwrap(); -// let mut server_stream = None; -// -// if server_port > 0 { -// println!("Proxy {} Connecting to server: {}", name, server_port.to_string()); -// server_stream = TcpStream::connect(format!("localhost:{}", server_port)).await.ok(); -// } -// -// let client_stream = listener.accept().await.unwrap().0; -// println!("Proxy {} has a client", name); -// handle_messages::(name, client_stream, server_stream, total_messages, tx); -// -// } - - -// fn hop_server_old(encrypt: bool, hops: u16, mut tx: Sender, total_messages: i32) -> u16 { -// let orig_port : u16 = 19000; -// let final_server_port = orig_port + (hops - 1); -// let mut listen_port = final_server_port; -// let mut server_port = 0; -// -// for name in (0..hops).rev() { -// let tx_clone = tx.clone(); -// thread::spawn(move || { -// create_proxy(name.to_string(), listen_port, server_port, encrypt, total_messages, tx_clone); -// }); -// thread::sleep(std::time::Duration::from_secs(1)); -// server_port = listen_port; -// listen_port = listen_port - 1; -// } -// return orig_port; -// } +} \ No newline at end of file From e6a87af356a68d03ce4332ea078f7baed9c62ac8 Mon Sep 17 00:00:00 2001 From: darricksee Date: Thu, 4 May 2023 15:56:21 -0400 Subject: [PATCH 03/10] Don't need this unused import --- test/scale/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index bbfc4be89..cfca43f88 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -12,7 +12,6 @@ use codec_sv2::{HandshakeRole, Initiator, Responder, StandardSv2Frame, StandardEitherFrame }; -use codec_sv2::Frame; use network_helpers::plain_connection_tokio::PlainConnection; use network_helpers::noise_connection_tokio::Connection; @@ -38,7 +37,7 @@ static HOST: &'static str = "127.0.0.1"; #[tokio::main] async fn main() { - let matches = App::new("Example program") + let matches = App::new("ScaleTest") .arg(Arg::with_name("encrypt") .short("e") .help("Use encryption")) From f4db37114ca5d26cdc4ad9171d5911137344a812 Mon Sep 17 00:00:00 2001 From: darricksee Date: Thu, 4 May 2023 16:05:16 -0400 Subject: [PATCH 04/10] Added a readme --- test/scale/README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 test/scale/README.md diff --git a/test/scale/README.md b/test/scale/README.md new file mode 100644 index 000000000..d0bd62d35 --- /dev/null +++ b/test/scale/README.md @@ -0,0 +1,21 @@ +# Scale Test + +This test simply outputs the time spent sending 1,000,000 SubmitSharesStandard +through the system. When you start the test you specify -h -e (for encryption). +The test spawns "proxies" which simply decrypt/encrypt each +SubmitSharesStandard message coming in (if encryption is on). Then it sends +1,000,000 share messages to the first proxy and then times the whole system to see +how long it takes for the last proxy to receive all 1M messages. It uses the same +network_helpers that the pool, and proxies use so it should be a good approximation +of the work they do. + +The test is run with the following command: + +```cargo run -- -h 4 -e``` +This runs the test with 4 hops and encryption on. + +```cargo run -- -h 4``` +This runs the test with 4 hops and encryption off. + + + From b323e3e3bd4eb2fb14ed6043dc986e37b05afdc1 Mon Sep 17 00:00:00 2001 From: darricksee Date: Thu, 4 May 2023 16:10:05 -0400 Subject: [PATCH 05/10] specified ports --- test/scale/README.md | 2 +- test/scale/src/main.rs | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/test/scale/README.md b/test/scale/README.md index d0bd62d35..4e5b0cd4f 100644 --- a/test/scale/README.md +++ b/test/scale/README.md @@ -2,7 +2,7 @@ This test simply outputs the time spent sending 1,000,000 SubmitSharesStandard through the system. When you start the test you specify -h -e (for encryption). -The test spawns "proxies" which simply decrypt/encrypt each +The test spawns "proxies" (ports 19000->19000+) which simply decrypt/encrypt each SubmitSharesStandard message coming in (if encryption is on). Then it sends 1,000,000 share messages to the first proxy and then times the whole system to see how long it takes for the last proxy to receive all 1M messages. It uses the same diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index cfca43f88..d91087cd9 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -56,12 +56,11 @@ async fn main() { let (tx, rx) = bounded(1); if hops > 0 { - orig_port = hop_server(encrypt, hops, tx, total_messages).await; + orig_port = spawn_proxies(encrypt, hops, tx, total_messages).await; } else { println!("Usage: ./program -h -e"); } println!("Connecting to localhost:{}", orig_port); - // setup_driver(orig_port, encrypt, rx, total_messages, hops).await; } @@ -180,7 +179,7 @@ async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: } -async fn hop_server(encrypt: bool, hops: u16, tx: Sender, total_messages: i32) -> u16 { +async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender, total_messages: i32) -> u16 { let orig_port : u16 = 19000; let final_server_port = orig_port + (hops - 1); let mut listen_port = final_server_port; @@ -190,10 +189,8 @@ async fn hop_server(encrypt: bool, hops: u16, tx: Sender, total_messages let tx_clone = tx.clone(); let name_clone = name.to_string(); - //let port_clone = server_port.clone(); task::spawn(async move { create_proxy(name_clone, listen_port, server_port, encrypt, total_messages, tx_clone).await; - }); thread::sleep(std::time::Duration::from_secs(1)); From cabb9d4b3ee63a439d7d672492d7696a541d4d42 Mon Sep 17 00:00:00 2001 From: darricksee Date: Fri, 5 May 2023 11:23:13 -0400 Subject: [PATCH 06/10] Added release config and removed lint issues --- test/scale/Cargo.toml | 3 +++ test/scale/src/main.rs | 14 +++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/test/scale/Cargo.toml b/test/scale/Cargo.toml index fbefc70e1..b54229e8a 100644 --- a/test/scale/Cargo.toml +++ b/test/scale/Cargo.toml @@ -3,6 +3,9 @@ name = "scale" version = "0.1.0" edition = "2021" +[profile.release] +lto = true + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index d91087cd9..c296fa8eb 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -33,7 +33,7 @@ pub const AUTHORITY_PRIVATE_K: [u8; 32] = [ 5, 173, 0, 234, 59, 15, 127, 31, 160, 136, 131, ]; -static HOST: &'static str = "127.0.0.1"; +static HOST: &str = "127.0.0.1"; #[tokio::main] async fn main() { @@ -112,7 +112,7 @@ async fn send_messages(stream: Sender, total_messages: i32) { let binary: EitherFrame = frame.into(); stream.send(binary).await.unwrap(); - number = number + 1; + number += 1; } } @@ -130,7 +130,7 @@ async fn handle_messages + GetSize + Se if server.is_some() { server.as_ref().unwrap().send(binary).await.unwrap(); } else { - messages_received = messages_received + 1; + messages_received += 1; println!("last server: {} got msg {}", name, messages_received); } @@ -139,7 +139,7 @@ async fn handle_messages + GetSize + Se } async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: bool, total_messages: i32, tx: Sender) { - println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port.to_string(), server_port.to_string()); + println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port, server_port); let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port)).await.unwrap(); println!("Bound - now waiting for connection..."); let cli_stream = listener.accept().await.unwrap().0; @@ -159,7 +159,7 @@ async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: let mut server = None; if server_port > 0 { - println!("Proxy {} Connecting to server: {}", name, server_port.to_string()); + println!("Proxy {} Connecting to server: {}", name, server_port); let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); let (_server_receiver, server_sender): (Receiver, Sender); @@ -195,7 +195,7 @@ async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender, total_messa thread::sleep(std::time::Duration::from_secs(1)); server_port = listen_port; - listen_port = listen_port - 1; + listen_port -= 1; } - return orig_port; + orig_port } \ No newline at end of file From 95e46780610186738bd3dc92e9fd67bc0646ba1a Mon Sep 17 00:00:00 2001 From: darricksee Date: Fri, 5 May 2023 11:25:47 -0400 Subject: [PATCH 07/10] Fmt --- test/scale/src/main.rs | 113 ++++++++++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 40 deletions(-) diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index c296fa8eb..09fb3e426 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -1,19 +1,18 @@ +use std::thread; use tokio::{ + net::{TcpListener, TcpStream}, task, - net::{TcpListener, TcpStream}}; -use std::thread; +}; use async_channel::{bounded, Receiver, Sender}; -use clap::{Arg, App}; -use std::time::Duration; use binary_sv2::{Deserialize, GetSize, Serialize}; -use codec_sv2::{HandshakeRole, Initiator, Responder, - StandardSv2Frame, StandardEitherFrame -}; +use clap::{App, Arg}; +use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardSv2Frame}; +use std::time::Duration; -use network_helpers::plain_connection_tokio::PlainConnection; use network_helpers::noise_connection_tokio::Connection; +use network_helpers::plain_connection_tokio::PlainConnection; use roles_logic_sv2::{ mining_sv2::*, @@ -22,7 +21,6 @@ use roles_logic_sv2::{ pub type EitherFrame = StandardEitherFrame; - pub const AUTHORITY_PUBLIC_K: [u8; 32] = [ 215, 11, 47, 78, 34, 232, 25, 192, 195, 168, 170, 209, 95, 181, 40, 114, 154, 226, 176, 190, 90, 169, 238, 89, 191, 183, 97, 63, 194, 119, 11, 31, @@ -38,13 +36,13 @@ static HOST: &str = "127.0.0.1"; #[tokio::main] async fn main() { let matches = App::new("ScaleTest") - .arg(Arg::with_name("encrypt") - .short("e") - .help("Use encryption")) - .arg(Arg::with_name("hops") - .short("h") - .takes_value(true) - .help("Number of hops")) + .arg(Arg::with_name("encrypt").short("e").help("Use encryption")) + .arg( + Arg::with_name("hops") + .short("h") + .takes_value(true) + .help("Number of hops"), + ) .get_matches(); let total_messages = 1_000_000; @@ -64,15 +62,23 @@ async fn main() { setup_driver(orig_port, encrypt, rx, total_messages, hops).await; } -async fn setup_driver(server_port: u16, encrypt: bool, rx: Receiver, total_messages: i32, - hops: u16) { - let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); +async fn setup_driver( + server_port: u16, + encrypt: bool, + rx: Receiver, + total_messages: i32, + hops: u16, +) { + let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)) + .await + .unwrap(); let (_server_receiver, server_sender): (Receiver, Sender); if encrypt { let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); - (_server_receiver, server_sender) = Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await; + (_server_receiver, server_sender) = + Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await; } else { (_server_receiver, server_sender) = PlainConnection::new(server_stream).await; } @@ -86,9 +92,13 @@ async fn setup_driver(server_port: u16, encrypt: bool, rx: Receiver, tot let end = std::time::Instant::now(); - println!("client: {} - Took {}s hops: {} encryption: {}", msg, (end - start).as_secs(), - hops, encrypt); - + println!( + "client: {} - Took {}s hops: {} encryption: {}", + msg, + (end - start).as_secs(), + hops, + encrypt + ); } pub type Message = MiningDeviceMessages<'static>; @@ -117,9 +127,12 @@ async fn send_messages(stream: Sender, total_messages: i32) { } async fn handle_messages + GetSize + Send + 'static>( - name: String, client: Receiver, server: Option>, - total_messages: i32, tx: Sender) { - + name: String, + client: Receiver, + server: Option>, + total_messages: i32, + tx: Sender, +) { let mut messages_received = 0; while messages_received <= total_messages { @@ -133,14 +146,25 @@ async fn handle_messages + GetSize + Se messages_received += 1; println!("last server: {} got msg {}", name, messages_received); } - } tx.send("got all messages".to_string()).await.unwrap(); } -async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: bool, total_messages: i32, tx: Sender) { - println!("Creating proxy listener {}: {} connecting to: {}", name, listen_port, server_port); - let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port)).await.unwrap(); +async fn create_proxy( + name: String, + listen_port: u16, + server_port: u16, + encrypt: bool, + total_messages: i32, + tx: Sender, +) { + println!( + "Creating proxy listener {}: {} connecting to: {}", + name, listen_port, server_port + ); + let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port)) + .await + .unwrap(); println!("Bound - now waiting for connection..."); let cli_stream = listener.accept().await.unwrap().0; let (cli_receiver, _cli_sender): (Receiver, Sender); @@ -151,8 +175,9 @@ async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: &AUTHORITY_PRIVATE_K[..], Duration::from_secs(3600), ) - .unwrap(); - (cli_receiver, _cli_sender) = Connection::new(cli_stream, HandshakeRole::Responder(responder)).await; + .unwrap(); + (cli_receiver, _cli_sender) = + Connection::new(cli_stream, HandshakeRole::Responder(responder)).await; } else { (cli_receiver, _cli_sender) = PlainConnection::new(cli_stream).await; } @@ -160,13 +185,15 @@ async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: let mut server = None; if server_port > 0 { println!("Proxy {} Connecting to server: {}", name, server_port); - let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)).await.unwrap(); + let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)) + .await + .unwrap(); let (_server_receiver, server_sender): (Receiver, Sender); if encrypt { let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); - (_server_receiver, server_sender) = Connection::new(server_stream, - HandshakeRole::Initiator(initiator)).await; + (_server_receiver, server_sender) = + Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await; } else { (_server_receiver, server_sender) = PlainConnection::new(server_stream).await; } @@ -175,12 +202,10 @@ async fn create_proxy(name: String, listen_port: u16, server_port: u16, encrypt: println!("Proxy {} has a client", name); handle_messages::(name, cli_receiver, server, total_messages, tx).await; - } - async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender, total_messages: i32) -> u16 { - let orig_port : u16 = 19000; + let orig_port: u16 = 19000; let final_server_port = orig_port + (hops - 1); let mut listen_port = final_server_port; let mut server_port: u16 = 0; @@ -190,7 +215,15 @@ async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender, total_messa let name_clone = name.to_string(); task::spawn(async move { - create_proxy(name_clone, listen_port, server_port, encrypt, total_messages, tx_clone).await; + create_proxy( + name_clone, + listen_port, + server_port, + encrypt, + total_messages, + tx_clone, + ) + .await; }); thread::sleep(std::time::Duration::from_secs(1)); @@ -198,4 +231,4 @@ async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender, total_messa listen_port -= 1; } orig_port -} \ No newline at end of file +} From 8c636c107ac3e992b697657308fe9e692c45b64d Mon Sep 17 00:00:00 2001 From: darricksee Date: Fri, 5 May 2023 11:29:22 -0400 Subject: [PATCH 08/10] clippy --- test/scale/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index 09fb3e426..4120b21f8 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -11,8 +11,7 @@ use clap::{App, Arg}; use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardSv2Frame}; use std::time::Duration; -use network_helpers::noise_connection_tokio::Connection; -use network_helpers::plain_connection_tokio::PlainConnection; +use network_helpers::{noise_connection_tokio::Connection, plain_connection_tokio::PlainConnection}; use roles_logic_sv2::{ mining_sv2::*, From 52fa41b49368fae8fabf8b97cb678bfb0e82eb16 Mon Sep 17 00:00:00 2001 From: darricksee Date: Fri, 5 May 2023 11:31:39 -0400 Subject: [PATCH 09/10] yet more fmt --- test/scale/src/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index 4120b21f8..0ce8192ae 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -11,7 +11,9 @@ use clap::{App, Arg}; use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardSv2Frame}; use std::time::Duration; -use network_helpers::{noise_connection_tokio::Connection, plain_connection_tokio::PlainConnection}; +use network_helpers::{ + noise_connection_tokio::Connection, plain_connection_tokio::PlainConnection, +}; use roles_logic_sv2::{ mining_sv2::*, From 916450e812d019a6a351c7aaa01fefb1335910cc Mon Sep 17 00:00:00 2001 From: darricksee Date: Tue, 9 May 2023 09:54:19 -0400 Subject: [PATCH 10/10] Added note about `--release` flag --- test/scale/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/scale/README.md b/test/scale/README.md index 4e5b0cd4f..36404b494 100644 --- a/test/scale/README.md +++ b/test/scale/README.md @@ -10,11 +10,12 @@ network_helpers that the pool, and proxies use so it should be a good approximat of the work they do. The test is run with the following command: +NOTE: running without `--release` dramatically slows down the test. -```cargo run -- -h 4 -e``` +```cargo run --release -- -h 4 -e``` This runs the test with 4 hops and encryption on. -```cargo run -- -h 4``` +```cargo run --release -- -h 4``` This runs the test with 4 hops and encryption off.