forked from stratum-mining/stratum
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request stratum-mining#558 from darricksee/scale-test
Scale test
- Loading branch information
Showing
5 changed files
with
321 additions
and
0 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
[package] | ||
name = "scale" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
[profile.release] | ||
lto = true | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
clap = "2.33.3" | ||
serde = { version = "1.0.89", features = ["derive", "alloc"], default-features = false, optional = true} | ||
async-channel = "1.5.1" | ||
async-std="1.8.0" | ||
bytes = "1.0.1" | ||
binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" } | ||
codec_sv2 = { path = "../../protocols/v2/codec-sv2", features=["noise_sv2"] } | ||
network_helpers = { version = "0.1", path = "../../utils/network-helpers", features = ["with_tokio"] } | ||
roles_logic_sv2 = { path = "../../protocols/v2/roles-logic-sv2" } | ||
tokio = { version = "1", features = ["full"] } | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Scale Test | ||
|
||
This test simply outputs the time spent sending 1,000,000 SubmitSharesStandard | ||
through the system. When you start the test you specify -h <num of hops> -e (for encryption). | ||
The test spawns <num of hops> "proxies" (ports 19000->19000+<num of hops>) which simply decrypt/encrypt each | ||
SubmitSharesStandard message coming in (if encryption is on). Then it sends | ||
1,000,000 share messages to the first proxy and then times the whole system to see | ||
how long it takes for the last proxy to receive all 1M messages. It uses the same | ||
network_helpers that the pool, and proxies use so it should be a good approximation | ||
of the work they do. | ||
|
||
The test is run with the following command: | ||
NOTE: running without `--release` dramatically slows down the test. | ||
|
||
```cargo run --release -- -h 4 -e``` | ||
This runs the test with 4 hops and encryption on. | ||
|
||
```cargo run --release -- -h 4``` | ||
This runs the test with 4 hops and encryption off. | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,235 @@ | ||
use std::thread; | ||
use tokio::{ | ||
net::{TcpListener, TcpStream}, | ||
task, | ||
}; | ||
|
||
use async_channel::{bounded, Receiver, Sender}; | ||
|
||
use binary_sv2::{Deserialize, GetSize, Serialize}; | ||
use clap::{App, Arg}; | ||
use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardSv2Frame}; | ||
use std::time::Duration; | ||
|
||
use network_helpers::{ | ||
noise_connection_tokio::Connection, plain_connection_tokio::PlainConnection, | ||
}; | ||
|
||
use roles_logic_sv2::{ | ||
mining_sv2::*, | ||
parsers::{Mining, MiningDeviceMessages}, | ||
}; | ||
|
||
pub type EitherFrame = StandardEitherFrame<Message>; | ||
|
||
pub const AUTHORITY_PUBLIC_K: [u8; 32] = [ | ||
215, 11, 47, 78, 34, 232, 25, 192, 195, 168, 170, 209, 95, 181, 40, 114, 154, 226, 176, 190, | ||
90, 169, 238, 89, 191, 183, 97, 63, 194, 119, 11, 31, | ||
]; | ||
|
||
pub const AUTHORITY_PRIVATE_K: [u8; 32] = [ | ||
204, 93, 167, 220, 169, 204, 172, 35, 9, 84, 174, 208, 171, 89, 25, 53, 196, 209, 161, 148, 4, | ||
5, 173, 0, 234, 59, 15, 127, 31, 160, 136, 131, | ||
]; | ||
|
||
static HOST: &str = "127.0.0.1"; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let matches = App::new("ScaleTest") | ||
.arg(Arg::with_name("encrypt").short("e").help("Use encryption")) | ||
.arg( | ||
Arg::with_name("hops") | ||
.short("h") | ||
.takes_value(true) | ||
.help("Number of hops"), | ||
) | ||
.get_matches(); | ||
|
||
let total_messages = 1_000_000; | ||
let encrypt = matches.is_present("encrypt"); | ||
let hops: u16 = matches.value_of("hops").unwrap_or("0").parse().unwrap_or(0); | ||
let mut orig_port: u16 = 19000; | ||
|
||
// create channel to tell final server number of messages | ||
let (tx, rx) = bounded(1); | ||
|
||
if hops > 0 { | ||
orig_port = spawn_proxies(encrypt, hops, tx, total_messages).await; | ||
} else { | ||
println!("Usage: ./program -h <hops> -e"); | ||
} | ||
println!("Connecting to localhost:{}", orig_port); | ||
setup_driver(orig_port, encrypt, rx, total_messages, hops).await; | ||
} | ||
|
||
async fn setup_driver( | ||
server_port: u16, | ||
encrypt: bool, | ||
rx: Receiver<String>, | ||
total_messages: i32, | ||
hops: u16, | ||
) { | ||
let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)) | ||
.await | ||
.unwrap(); | ||
let (_server_receiver, server_sender): (Receiver<EitherFrame>, Sender<EitherFrame>); | ||
|
||
if encrypt { | ||
let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); | ||
|
||
(_server_receiver, server_sender) = | ||
Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await; | ||
} else { | ||
(_server_receiver, server_sender) = PlainConnection::new(server_stream).await; | ||
} | ||
// Create timer to see how long this method takes | ||
let start = std::time::Instant::now(); | ||
|
||
send_messages(server_sender, total_messages).await; | ||
|
||
//listen for message on rx | ||
let msg = rx.recv().await.unwrap(); | ||
|
||
let end = std::time::Instant::now(); | ||
|
||
println!( | ||
"client: {} - Took {}s hops: {} encryption: {}", | ||
msg, | ||
(end - start).as_secs(), | ||
hops, | ||
encrypt | ||
); | ||
} | ||
|
||
pub type Message = MiningDeviceMessages<'static>; | ||
pub type StdFrame = StandardSv2Frame<Message>; | ||
|
||
async fn send_messages(stream: Sender<EitherFrame>, total_messages: i32) { | ||
let mut number: i32 = 0; | ||
println!("Creating share"); | ||
let share = MiningDeviceMessages::Mining(Mining::SubmitSharesStandard(SubmitSharesStandard { | ||
channel_id: 1, | ||
sequence_number: number as u32, | ||
job_id: 2, | ||
nonce: 3, | ||
ntime: 4, | ||
version: 5, | ||
})); | ||
|
||
while number <= total_messages { | ||
println!("client: sending msg-{}", number); | ||
let frame: StdFrame = share.clone().try_into().unwrap(); | ||
let binary: EitherFrame = frame.into(); | ||
|
||
stream.send(binary).await.unwrap(); | ||
number += 1; | ||
} | ||
} | ||
|
||
async fn handle_messages<Mining: Serialize + Deserialize<'static> + GetSize + Send + 'static>( | ||
name: String, | ||
client: Receiver<EitherFrame>, | ||
server: Option<Sender<EitherFrame>>, | ||
total_messages: i32, | ||
tx: Sender<String>, | ||
) { | ||
let mut messages_received = 0; | ||
|
||
while messages_received <= total_messages { | ||
let frame: StdFrame = client.recv().await.unwrap().try_into().unwrap(); | ||
|
||
let binary: EitherFrame = frame.into(); | ||
|
||
if server.is_some() { | ||
server.as_ref().unwrap().send(binary).await.unwrap(); | ||
} else { | ||
messages_received += 1; | ||
println!("last server: {} got msg {}", name, messages_received); | ||
} | ||
} | ||
tx.send("got all messages".to_string()).await.unwrap(); | ||
} | ||
|
||
async fn create_proxy( | ||
name: String, | ||
listen_port: u16, | ||
server_port: u16, | ||
encrypt: bool, | ||
total_messages: i32, | ||
tx: Sender<String>, | ||
) { | ||
println!( | ||
"Creating proxy listener {}: {} connecting to: {}", | ||
name, listen_port, server_port | ||
); | ||
let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port)) | ||
.await | ||
.unwrap(); | ||
println!("Bound - now waiting for connection..."); | ||
let cli_stream = listener.accept().await.unwrap().0; | ||
let (cli_receiver, _cli_sender): (Receiver<EitherFrame>, Sender<EitherFrame>); | ||
|
||
if encrypt { | ||
let responder = Responder::from_authority_kp( | ||
&AUTHORITY_PUBLIC_K[..], | ||
&AUTHORITY_PRIVATE_K[..], | ||
Duration::from_secs(3600), | ||
) | ||
.unwrap(); | ||
(cli_receiver, _cli_sender) = | ||
Connection::new(cli_stream, HandshakeRole::Responder(responder)).await; | ||
} else { | ||
(cli_receiver, _cli_sender) = PlainConnection::new(cli_stream).await; | ||
} | ||
|
||
let mut server = None; | ||
if server_port > 0 { | ||
println!("Proxy {} Connecting to server: {}", name, server_port); | ||
let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port)) | ||
.await | ||
.unwrap(); | ||
let (_server_receiver, server_sender): (Receiver<EitherFrame>, Sender<EitherFrame>); | ||
|
||
if encrypt { | ||
let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap(); | ||
(_server_receiver, server_sender) = | ||
Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await; | ||
} else { | ||
(_server_receiver, server_sender) = PlainConnection::new(server_stream).await; | ||
} | ||
server = Some(server_sender); | ||
} | ||
|
||
println!("Proxy {} has a client", name); | ||
handle_messages::<Mining>(name, cli_receiver, server, total_messages, tx).await; | ||
} | ||
|
||
async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender<String>, total_messages: i32) -> u16 { | ||
let orig_port: u16 = 19000; | ||
let final_server_port = orig_port + (hops - 1); | ||
let mut listen_port = final_server_port; | ||
let mut server_port: u16 = 0; | ||
|
||
for name in (0..hops).rev() { | ||
let tx_clone = tx.clone(); | ||
let name_clone = name.to_string(); | ||
|
||
task::spawn(async move { | ||
create_proxy( | ||
name_clone, | ||
listen_port, | ||
server_port, | ||
encrypt, | ||
total_messages, | ||
tx_clone, | ||
) | ||
.await; | ||
}); | ||
|
||
thread::sleep(std::time::Duration::from_secs(1)); | ||
server_port = listen_port; | ||
listen_port -= 1; | ||
} | ||
orig_port | ||
} |