diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index 4145944ce..df8040d71 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -344,8 +344,6 @@ impl poc_lora::PocLora for GrpcServer { } pub async fn grpc_server(settings: &Settings) -> Result<()> { - let grpc_addr = settings.listen_addr()?; - // Initialize uploader let (file_upload, file_upload_server) = file_upload::FileUpload::from_settings_tm(&settings.output).await?; @@ -374,6 +372,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .create() .await?; + let grpc_addr = settings.listen; let grpc_server = GrpcServer { beacon_report_sink, witness_report_sink, diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index a0116eb47..34bfc40f3 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -342,8 +342,6 @@ impl poc_mobile::PocMobile for GrpcServer { } pub async fn grpc_server(settings: &Settings) -> Result<()> { - let grpc_addr = settings.listen_addr()?; - // Initialize uploader let (file_upload, file_upload_server) = file_upload::FileUpload::from_settings_tm(&settings.output).await?; @@ -451,6 +449,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { bail!("expected valid api token in settings"); }; + let grpc_addr = settings.listen; let grpc_server = GrpcServer { heartbeat_report_sink, wifi_heartbeat_report_sink, diff --git a/ingest/src/settings.rs b/ingest/src/settings.rs index 6dc041a4b..266e0e72a 100644 --- a/ingest/src/settings.rs +++ b/ingest/src/settings.rs @@ -1,11 +1,7 @@ use config::{Config, Environment, File}; use helium_crypto::Network; use serde::Deserialize; -use std::{ - net::{AddrParseError, SocketAddr}, - path::Path, - str::FromStr, -}; +use std::{net::SocketAddr, path::Path}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -20,7 +16,7 @@ pub struct Settings { pub mode: Mode, /// Listen address. Required. Default is 0.0.0.0:9081 #[serde(default = "default_listen_addr")] - pub listen: String, + pub listen: SocketAddr, /// Local folder for storing intermediate files pub cache: String, /// Network required in all public keys: mainnet | testnet @@ -49,8 +45,8 @@ pub fn default_session_key_offer_timeout() -> u64 { 5 } -pub fn default_listen_addr() -> String { - "0.0.0.0:9081".to_string() +pub fn default_listen_addr() -> SocketAddr { + "0.0.0.0:9081".parse().unwrap() } pub fn default_log() -> String { @@ -97,10 +93,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn listen_addr(&self) -> Result { - SocketAddr::from_str(&self.listen) - } - pub fn session_key_offer_timeout(&self) -> std::time::Duration { std::time::Duration::from_secs(self.session_key_offer_timeout) } diff --git a/ingest/tests/iot_ingest.rs b/ingest/tests/iot_ingest.rs index 5d8d700a5..94d19cf87 100644 --- a/ingest/tests/iot_ingest.rs +++ b/ingest/tests/iot_ingest.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, str::FromStr}; +use std::net::{SocketAddr, TcpListener}; use backon::{ExponentialBuilder, Retryable}; use file_store::file_sink::{FileSinkClient, Message as SinkMessage}; @@ -12,7 +12,7 @@ use helium_proto::services::poc_lora::{ }; use ingest::server_iot::GrpcServer; use prost::Message; -use rand::{rngs::OsRng, Rng}; +use rand::rngs::OsRng; use task_manager::TaskManager; use tokio::{sync::mpsc::error::TryRecvError, task::LocalSet, time::timeout}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -22,12 +22,12 @@ use tonic::{transport::Channel, Streaming}; async fn initialize_session_and_send_beacon_and_witness() { let (beacon_client, mut beacons) = create_file_sink(); let (witness_client, mut witnesses) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -38,7 +38,7 @@ async fn initialize_session_and_send_beacon_and_witness() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -75,12 +75,12 @@ async fn initialize_session_and_send_beacon_and_witness() { async fn stream_stops_after_incorrectly_signed_init_request() { let (beacon_client, _) = create_file_sink(); let (witness_client, _) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -91,7 +91,7 @@ async fn stream_stops_after_incorrectly_signed_init_request() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -113,12 +113,12 @@ async fn stream_stops_after_incorrectly_signed_init_request() { async fn stream_stops_after_incorrectly_signed_beacon() { let (beacon_client, beacons) = create_file_sink(); let (witness_client, _) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -129,7 +129,7 @@ async fn stream_stops_after_incorrectly_signed_beacon() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -154,12 +154,12 @@ async fn stream_stops_after_incorrectly_signed_beacon() { async fn stream_stops_after_incorrect_beacon_pubkey() { let (beacon_client, beacons) = create_file_sink(); let (witness_client, _) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -170,7 +170,7 @@ async fn stream_stops_after_incorrect_beacon_pubkey() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -198,12 +198,12 @@ async fn stream_stops_after_incorrect_beacon_pubkey() { async fn stream_stops_after_incorrectly_signed_witness() { let (beacon_client, _) = create_file_sink(); let (witness_client, witnesses) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -214,7 +214,7 @@ async fn stream_stops_after_incorrectly_signed_witness() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -239,12 +239,12 @@ async fn stream_stops_after_incorrectly_signed_witness() { async fn stream_stops_after_incorrect_witness_pubkey() { let (beacon_client, _) = create_file_sink(); let (witness_client, witnesses) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -255,7 +255,7 @@ async fn stream_stops_after_incorrect_witness_pubkey() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -283,12 +283,12 @@ async fn stream_stops_after_incorrect_witness_pubkey() { async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() { let (beacon_client, mut beacons) = create_file_sink(); let (witness_client, _) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { - let server = create_test_server(port, beacon_client, witness_client, None, None); + let server = create_test_server(addr, beacon_client, witness_client, None, None); TaskManager::builder() .add_task(server) .build() @@ -299,7 +299,7 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() { let pub_key = generate_keypair(); let session_key = generate_keypair(); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; client @@ -337,13 +337,13 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() { async fn stream_stops_if_init_not_sent_within_timeout() { let (beacon_client, _) = create_file_sink(); let (witness_client, _) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { let server = - create_test_server(port, beacon_client, witness_client, Some(500), None); + create_test_server(addr, beacon_client, witness_client, Some(500), None); TaskManager::builder() .add_task(server) .build() @@ -351,7 +351,7 @@ async fn stream_stops_if_init_not_sent_within_timeout() { .await }); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let _offer = client.receive_offer().await; client.assert_closed().await; @@ -363,13 +363,13 @@ async fn stream_stops_if_init_not_sent_within_timeout() { async fn stream_stops_on_session_timeout() { let (beacon_client, mut beacons) = create_file_sink(); let (witness_client, _) = create_file_sink(); - let port = get_port(); + let addr = get_socket_addr().expect("socket addr"); LocalSet::new() .run_until(async move { tokio::task::spawn_local(async move { let server = - create_test_server(port, beacon_client, witness_client, Some(500), Some(900)); + create_test_server(addr, beacon_client, witness_client, Some(500), Some(900)); TaskManager::builder() .add_task(server) .build() @@ -377,7 +377,7 @@ async fn stream_stops_on_session_timeout() { .await }); - let mut client = connect_and_stream(port).await; + let mut client = connect_and_stream(addr).await; let offer = client.receive_offer().await; let pub_key = generate_keypair(); @@ -449,8 +449,8 @@ fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { ) } -async fn connect_and_stream(port: u64) -> TestClient { - let mut client = (|| PocLoraClient::connect(format!("http://127.0.0.1:{port}"))) +async fn connect_and_stream(socket_addr: SocketAddr) -> TestClient { + let mut client = (|| PocLoraClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) .await .expect("client connect"); @@ -572,7 +572,7 @@ impl TestClient { } fn create_test_server( - port: u64, + socket_addr: SocketAddr, beacon_file_sink: FileSinkClient, witness_file_sink: FileSinkClient, offer_timeout: Option, @@ -584,7 +584,7 @@ fn create_test_server( beacon_report_sink: beacon_file_sink, witness_report_sink: witness_file_sink, required_network: Network::MainNet, - address: SocketAddr::from_str(&format!("127.0.0.1:{port}")).expect("socket address"), + address: socket_addr, session_key_offer_timeout: std::time::Duration::from_millis(offer_timeout), session_key_timeout: std::time::Duration::from_millis(timeout), } @@ -598,6 +598,7 @@ fn seconds(s: u64) -> std::time::Duration { std::time::Duration::from_secs(s) } -fn get_port() -> u64 { - rand::thread_rng().gen_range(6000..10000) +fn get_socket_addr() -> anyhow::Result { + let listener = TcpListener::bind("127.0.0.1:0")?; + Ok(listener.local_addr()?) } diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index 94d5a0966..7cb6efdc5 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -68,8 +68,6 @@ impl Daemon { // Create on-chain metadata pool let metadata_pool = settings.metadata.connect("iot-config-metadata").await?; - let listen_addr = settings.listen_addr()?; - let (auth_updater, auth_cache) = AuthCache::new(settings.admin_pubkey()?, &pool).await?; let (region_updater, region_map) = RegionMapReader::new(&pool).await?; let (delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?; @@ -104,6 +102,7 @@ impl Daemon { region_updater, )?; + let listen_addr = settings.listen; let pubkey = settings .signing_keypair() .map(|keypair| keypair.public_key().to_string())?; diff --git a/iot_config/src/settings.rs b/iot_config/src/settings.rs index 040ce760a..6bea2940c 100644 --- a/iot_config/src/settings.rs +++ b/iot_config/src/settings.rs @@ -1,11 +1,7 @@ use chrono::Duration; use config::{Config, Environment, File}; use serde::Deserialize; -use std::{ - net::{AddrParseError, SocketAddr}, - path::Path, - str::FromStr, -}; +use std::{net::SocketAddr, path::Path, str::FromStr}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -15,7 +11,7 @@ pub struct Settings { pub log: String, /// Listen address. Required. Default is 0.0.0.0:8080 #[serde(default = "default_listen_addr")] - pub listen: String, + pub listen: SocketAddr, /// File from which to load config server signing keypair pub keypair: String, /// B58 encoded public key of the admin keypair @@ -33,8 +29,8 @@ pub fn default_log() -> String { "iot_config=debug".to_string() } -pub fn default_listen_addr() -> String { - "0.0.0.0:8080".to_string() +pub fn default_listen_addr() -> SocketAddr { + "0.0.0.0:8080".parse().unwrap() } pub fn default_deleted_entry_retention() -> u64 { @@ -66,10 +62,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn listen_addr(&self) -> Result { - SocketAddr::from_str(&self.listen) - } - pub fn signing_keypair(&self) -> Result> { let data = std::fs::read(&self.keypair).map_err(helium_crypto::Error::from)?; Ok(helium_crypto::Keypair::try_from(&data[..])?) diff --git a/iot_config/tests/route_service.rs b/iot_config/tests/route_service.rs index 314e06292..648587e9f 100644 --- a/iot_config/tests/route_service.rs +++ b/iot_config/tests/route_service.rs @@ -1,4 +1,7 @@ -use std::{net::SocketAddr, str::FromStr, sync::Arc}; +use std::{ + net::{SocketAddr, TcpListener}, + sync::Arc, +}; use backon::{ExponentialBuilder, Retryable}; use chrono::Utc; @@ -13,7 +16,7 @@ use iot_config::{ OrgService, RouteService, }; use prost::Message; -use rand::{rngs::OsRng, Rng}; +use rand::rngs::OsRng; use sqlx::{Pool, Postgres}; use tokio::task::JoinHandle; use tonic::{ @@ -27,7 +30,7 @@ async fn stream_sends_all_data_when_since_is_0(pool: Pool) { let admin_keypair = generate_keypair(); let client_keypair = generate_keypair(); - let port = get_port(); + let socket_addr = get_socket_addr().expect("socket addr"); let auth_cache = create_auth_cache( admin_keypair.public_key().clone(), @@ -36,10 +39,10 @@ async fn stream_sends_all_data_when_since_is_0(pool: Pool) { ) .await; - let _handle = start_server(port, signing_keypair, auth_cache, pool.clone()).await; - let mut client = connect_client(port).await; + let _handle = start_server(socket_addr, signing_keypair, auth_cache, pool.clone()).await; + let mut client = connect_client(socket_addr).await; - let org = create_org(port, &admin_keypair).await; + let org = create_org(socket_addr, &admin_keypair).await; let route = create_route(&mut client, &org.org.unwrap(), &admin_keypair).await; create_euis( @@ -129,7 +132,7 @@ async fn stream_only_sends_data_modified_since(pool: Pool) { let admin_keypair = generate_keypair(); let client_keypair = generate_keypair(); - let port = get_port(); + let socket_addr = get_socket_addr().expect("socket addr"); let auth_cache = create_auth_cache( admin_keypair.public_key().clone(), @@ -138,10 +141,10 @@ async fn stream_only_sends_data_modified_since(pool: Pool) { ) .await; - let _handle = start_server(port, signing_keypair, auth_cache, pool.clone()).await; - let mut client = connect_client(port).await; + let _handle = start_server(socket_addr, signing_keypair, auth_cache, pool.clone()).await; + let mut client = connect_client(socket_addr).await; - let org_res_v1 = create_org(port, &admin_keypair).await; + let org_res_v1 = create_org(socket_addr, &admin_keypair).await; let proto::OrgResV1 { org: Some(org), .. } = org_res_v1 else { panic!("invalid OrgResV1") @@ -236,7 +239,7 @@ async fn stream_updates_with_deactivate_reactivate(pool: Pool) { let admin_keypair = generate_keypair(); let client_keypair = generate_keypair(); - let port = get_port(); + let socket_addr = get_socket_addr().expect("socket addr"); let auth_cache = create_auth_cache( admin_keypair.public_key().clone(), @@ -245,10 +248,9 @@ async fn stream_updates_with_deactivate_reactivate(pool: Pool) { ) .await; - let _handle = start_server(port, signing_keypair, auth_cache, pool.clone()).await; - let mut client = connect_client(port).await; - - let org_res_v1 = create_org(port, &admin_keypair).await; + let _handle = start_server(socket_addr, signing_keypair, auth_cache, pool.clone()).await; + let mut client = connect_client(socket_addr).await; + let org_res_v1 = create_org(socket_addr, &admin_keypair).await; let proto::OrgResV1 { org: Some(org), .. } = org_res_v1 else { panic!("invalid OrgResV1") @@ -444,15 +446,15 @@ fn route_stream_req_v1(signer: &Keypair, since: u64) -> RouteStreamReqV1 { request } -async fn connect_client(port: u64) -> RouteClient { - (|| RouteClient::connect(format!("http://127.0.0.1:{port}"))) +async fn connect_client(socket_addr: SocketAddr) -> RouteClient { + (|| RouteClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) .await .expect("grpc client") } async fn start_server( - port: u64, + socket_addr: SocketAddr, signing_keypair: Arc, auth_cache: AuthCache, pool: Pool, @@ -477,25 +479,22 @@ async fn start_server( transport::Server::builder() .add_service(proto::OrgServer::new(org_service)) .add_service(proto::RouteServer::new(route_service)) - .serve(socket_addr(port).expect("socket addr")) + .serve(socket_addr) .map_err(anyhow::Error::from), ) } -fn socket_addr(port: u64) -> anyhow::Result { - SocketAddr::from_str(&format!("127.0.0.1:{port}")).map_err(anyhow::Error::from) -} - fn generate_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut OsRng) } -fn get_port() -> u64 { - rand::thread_rng().gen_range(6000..10000) +fn get_socket_addr() -> anyhow::Result { + let listener = TcpListener::bind("127.0.0.1:0")?; + Ok(listener.local_addr()?) } -async fn create_org(port: u64, admin_keypair: &Keypair) -> proto::OrgResV1 { - let mut client = (|| OrgClient::connect(format!("http://127.0.0.1:{port}"))) +async fn create_org(socket_addr: SocketAddr, admin_keypair: &Keypair) -> proto::OrgResV1 { + let mut client = (|| OrgClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) .await .expect("org client"); diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index b427a4817..286b25a51 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -17,8 +17,7 @@ mod error; pub mod settings; pub fn start_metrics(settings: &Settings) -> Result { - let socket: SocketAddr = settings.endpoint.parse()?; - install(socket) + install(settings.endpoint) } fn install(socket_addr: SocketAddr) -> Result { diff --git a/metrics/src/settings.rs b/metrics/src/settings.rs index 9a0841286..55cfb2e19 100644 --- a/metrics/src/settings.rs +++ b/metrics/src/settings.rs @@ -1,12 +1,14 @@ +use std::net::SocketAddr; + use serde::Deserialize; #[derive(Debug, Deserialize, Clone)] pub struct Settings { /// Scrape endpoint for metrics #[serde(default = "default_metrics_endpoint")] - pub endpoint: String, + pub endpoint: SocketAddr, } -pub fn default_metrics_endpoint() -> String { - "127.0.0.1:19000".to_string() +fn default_metrics_endpoint() -> SocketAddr { + "127.0.0.1:19000".parse().unwrap() } diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index bfaf9a9b5..60562beb1 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -71,8 +71,6 @@ impl Daemon { // Create on-chain metadata pool let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?; - let listen_addr = settings.listen_addr()?; - let (key_cache_updater, key_cache) = KeyCache::from_settings(settings, &pool).await?; let admin_svc = @@ -97,6 +95,7 @@ impl Daemon { settings.signing_keypair()?, ); + let listen_addr = settings.listen; let grpc_server = GrpcServer { listen_addr, admin_svc, diff --git a/mobile_config/src/settings.rs b/mobile_config/src/settings.rs index dfba3af10..1e9bcbedc 100644 --- a/mobile_config/src/settings.rs +++ b/mobile_config/src/settings.rs @@ -1,11 +1,7 @@ use anyhow::Context; use config::{Config, Environment, File}; use serde::Deserialize; -use std::{ - net::{AddrParseError, SocketAddr}, - path::Path, - str::FromStr, -}; +use std::{net::SocketAddr, path::Path, str::FromStr}; #[derive(Debug, Deserialize)] pub struct Settings { @@ -15,7 +11,7 @@ pub struct Settings { pub log: String, /// Listen address. Required. Default to 0.0.0.0::8080 #[serde(default = "default_listen_addr")] - pub listen: String, + pub listen: SocketAddr, /// File from which to load config server signing keypair pub signing_keypair: String, /// B58 encoded public key of the default admin keypair @@ -33,8 +29,8 @@ pub fn default_log() -> String { "mobile_config=debug".to_string() } -pub fn default_listen_addr() -> String { - "0.0.0.0:8080".to_string() +pub fn default_listen_addr() -> SocketAddr { + "0.0.0.0:8080".parse().unwrap() } impl Settings { @@ -61,10 +57,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn listen_addr(&self) -> Result { - SocketAddr::from_str(&self.listen) - } - pub fn signing_keypair(&self) -> anyhow::Result { let data = std::fs::read(&self.signing_keypair) .map_err(helium_crypto::Error::from)