From 577d554455ba7949880b53d2f9f09d66fba75880 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Wed, 23 Aug 2023 14:58:21 +0100 Subject: [PATCH 1/9] taskman up iot config --- Cargo.lock | 49 ++++++++++++++------ iot_config/Cargo.toml | 3 ++ iot_config/src/gateway_service.rs | 5 --- iot_config/src/main.rs | 75 +++++++++++++++++-------------- iot_config/src/org_service.rs | 5 --- iot_config/src/route_service.rs | 35 +-------------- 6 files changed, 81 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 932ee6aa0..be5e146d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,7 +324,7 @@ checksum = "7556be6f2b0d82376c1ece1fda4dffd728816ac53bee2c285f3f74269ddc4a97" dependencies = [ "anyhow", "clap 4.1.11", - "helium-crypto", + "helium-crypto 0.6.8", "md5", ] @@ -2100,7 +2100,7 @@ dependencies = [ "bytes", "chrono", "config", - "helium-crypto", + "helium-crypto 0.8.0", "reqwest", "serde", "serde_json", @@ -2523,7 +2523,7 @@ dependencies = [ "derive_builder", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "hex-literal", "http", @@ -2938,6 +2938,25 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "helium-crypto" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "289576899272c1b9f6cb1a2d393c5f3c142b62b4343454bd1ada5d0eefd47ce7" +dependencies = [ + "base64 0.21.0", + "bs58 0.4.0", + "ed25519-compact", + "k256", + "lazy_static", + "p256", + "rand_core 0.6.4", + "serde", + "sha2 0.10.6", + "signature", + "thiserror", +] + [[package]] name = "helium-crypto" version = "0.8.0" @@ -3293,7 +3312,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http", "metrics", @@ -3355,7 +3374,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "hextree", "http", @@ -3369,9 +3388,11 @@ dependencies = [ "serde", "serde_json", "sqlx", + "task-manager", "thiserror", "tokio", "tokio-stream", + "tokio-util", "tonic", "tower-http", "tracing", @@ -3392,7 +3413,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http", "http-serde", @@ -3431,7 +3452,7 @@ dependencies = [ "futures", "futures-util", "h3o", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http-serde", "humantime", @@ -3994,7 +4015,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "hextree", "http", @@ -4028,7 +4049,7 @@ dependencies = [ "dialoguer", "futures", "h3o", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "mobile-config", "prost", @@ -4055,7 +4076,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http", "http-serde", @@ -4089,7 +4110,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http-serde", "humantime", @@ -4700,7 +4721,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http", "hyper", @@ -5312,7 +5333,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto", + "helium-crypto 0.8.0", "helium-proto", "http-serde", "lazy_static", @@ -5995,7 +6016,7 @@ dependencies = [ "clap 4.1.11", "data-credits", "futures", - "helium-crypto", + "helium-crypto 0.8.0", "helium-sub-daos", "metrics", "serde", diff --git a/iot_config/Cargo.toml b/iot_config/Cargo.toml index 75871fdf8..c42f05a67 100644 --- a/iot_config/Cargo.toml +++ b/iot_config/Cargo.toml @@ -35,8 +35,11 @@ sqlx = {workspace = true} thiserror = {workspace = true} tokio = {workspace = true} tokio-stream = {workspace = true} +tokio-util = { workspace = true } tonic = {workspace = true} tower-http = {workspace = true} tracing = {workspace = true} tracing-subscriber = {workspace = true} triggered = {workspace = true} +task-manager = { path = "../task_manager" } + diff --git a/iot_config/src/gateway_service.rs b/iot_config/src/gateway_service.rs index 162d01e9f..d4362bd43 100644 --- a/iot_config/src/gateway_service.rs +++ b/iot_config/src/gateway_service.rs @@ -35,7 +35,6 @@ pub struct GatewayService { region_map: RegionMapReader, signing_key: Arc, delegate_cache: watch::Receiver, - shutdown: triggered::Listener, } impl GatewayService { @@ -45,7 +44,6 @@ impl GatewayService { region_map: RegionMapReader, auth_cache: AuthCache, delegate_cache: watch::Receiver, - shutdown: triggered::Listener, ) -> Result { let gateway_cache = Arc::new(Cache::new()); let cache_clone = gateway_cache.clone(); @@ -58,7 +56,6 @@ impl GatewayService { region_map, signing_key: Arc::new(settings.signing_keypair()?), delegate_cache, - shutdown, }) } @@ -278,13 +275,11 @@ impl iot_config::Gateway for GatewayService { let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let region_map = self.region_map.clone(); - let shutdown_listener = self.shutdown.clone(); let (tx, rx) = tokio::sync::mpsc::channel(20); tokio::spawn(async move { tokio::select! { - _ = shutdown_listener => (), _ = stream_all_gateways_info( &pool, tx.clone(), diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index f3a1b6d68..550773ee6 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -1,5 +1,6 @@ use anyhow::{Error, Result}; use clap::Parser; +use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; use helium_proto::services::iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer}; use iot_config::{ @@ -7,8 +8,8 @@ use iot_config::{ org_service::OrgService, region_map::RegionMapReader, route_service::RouteService, settings::Settings, telemetry, }; -use std::{path::PathBuf, time::Duration}; -use tokio::signal; +use std::{net::SocketAddr, path::PathBuf, time::Duration}; +use task_manager::{ManagedTask, TaskManager}; use tonic::transport; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -60,16 +61,6 @@ impl Daemon { poc_metrics::start_metrics(&settings.metrics)?; telemetry::initialize(); - // Configure shutdown trigger - let (shutdown_trigger, shutdown_listener) = triggered::trigger(); - let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?; - tokio::spawn(async move { - tokio::select! { - _ = sigterm.recv() => shutdown_trigger.trigger(), - _ = signal::ctrl_c() => shutdown_trigger.trigger(), - } - }); - // Create database pool let pool = settings.database.connect("iot-config-store").await?; sqlx::migrate!().run(&pool).await?; @@ -89,21 +80,14 @@ impl Daemon { region_map.clone(), auth_cache.clone(), delegate_key_cache, - shutdown_listener.clone(), - )?; - let route_svc = RouteService::new( - settings, - auth_cache.clone(), - pool.clone(), - shutdown_listener.clone(), )?; + let route_svc = RouteService::new(settings, auth_cache.clone(), pool.clone())?; let org_svc = OrgService::new( settings, auth_cache.clone(), pool.clone(), route_svc.clone_update_channel(), delegate_key_updater, - shutdown_listener.clone(), )?; let admin_svc = AdminService::new( settings, @@ -120,19 +104,44 @@ impl Daemon { tracing::debug!("listening on {listen_addr}"); tracing::debug!("signing as {pubkey}"); - transport::Server::builder() - .http2_keepalive_interval(Some(Duration::from_secs(250))) - .http2_keepalive_timeout(Some(Duration::from_secs(60))) - .layer(tower_http::trace::TraceLayer::new_for_grpc()) - .add_service(GatewayServer::new(gateway_svc)) - .add_service(OrgServer::new(org_svc)) - .add_service(RouteServer::new(route_svc)) - .add_service(AdminServer::new(admin_svc)) - .serve_with_shutdown(listen_addr, shutdown_listener) - .map_err(Error::from) - .await?; - - Ok(()) + let grpc_server = GrpcServer { + listen_addr, + gateway_svc, + route_svc, + org_svc, + admin_svc, + }; + + TaskManager::builder().add_task(grpc_server).start().await + } +} + +pub struct GrpcServer { + listen_addr: SocketAddr, + gateway_svc: GatewayService, + route_svc: RouteService, + org_svc: OrgService, + admin_svc: AdminService, +} + +impl ManagedTask for GrpcServer { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(async move { + transport::Server::builder() + .http2_keepalive_interval(Some(Duration::from_secs(250))) + .http2_keepalive_timeout(Some(Duration::from_secs(60))) + .layer(tower_http::trace::TraceLayer::new_for_grpc()) + .add_service(GatewayServer::new(self.gateway_svc)) + .add_service(OrgServer::new(self.org_svc)) + .add_service(RouteServer::new(self.route_svc)) + .add_service(AdminServer::new(self.admin_svc)) + .serve_with_shutdown(self.listen_addr, shutdown) + .map_err(Error::from) + .await + }) } } diff --git a/iot_config/src/org_service.rs b/iot_config/src/org_service.rs index 19e9561cf..326500dd2 100644 --- a/iot_config/src/org_service.rs +++ b/iot_config/src/org_service.rs @@ -26,7 +26,6 @@ pub struct OrgService { route_update_tx: broadcast::Sender, signing_key: Keypair, delegate_updater: watch::Sender, - shutdown: triggered::Listener, } #[derive(Clone, Debug, PartialEq)] @@ -42,7 +41,6 @@ impl OrgService { pool: Pool, route_update_tx: broadcast::Sender, delegate_updater: watch::Sender, - shutdown: triggered::Listener, ) -> Result { Ok(Self { auth_cache, @@ -50,7 +48,6 @@ impl OrgService { route_update_tx, signing_key: settings.signing_keypair()?, delegate_updater, - shutdown, }) } @@ -467,7 +464,6 @@ impl iot_config::Org for OrgService { })?; tokio::select! { - _ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")), result = self.stream_org_routes_enable_disable(request.oui) => result? } } @@ -506,7 +502,6 @@ impl iot_config::Org for OrgService { })?; tokio::select! { - _ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")), result = self.stream_org_routes_enable_disable(request.oui) => result? } } diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index 99baa6d95..ed16de8ec 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -37,7 +37,6 @@ pub struct RouteService { auth_cache: AuthCache, pool: Pool, update_channel: broadcast::Sender, - shutdown: triggered::Listener, signing_key: Arc, } @@ -48,17 +47,11 @@ enum OrgId<'a> { } impl RouteService { - pub fn new( - settings: &Settings, - auth_cache: AuthCache, - pool: Pool, - shutdown: triggered::Listener, - ) -> Result { + pub fn new(settings: &Settings, auth_cache: AuthCache, pool: Pool) -> Result { Ok(Self { auth_cache, pool, update_channel: update_channel(), - shutdown, signing_key: Arc::new(settings.signing_keypair()?), }) } @@ -390,7 +383,6 @@ impl iot_config::Route for RouteService { tracing::info!("client subscribed to route stream"); let pool = self.pool.clone(); - let shutdown_listener = self.shutdown.clone(); let (tx, rx) = tokio::sync::mpsc::channel(20); let signing_key = self.signing_key.clone(); @@ -398,7 +390,6 @@ impl iot_config::Route for RouteService { tokio::spawn(async move { tokio::select! { - _ = shutdown_listener.clone() => return, result = stream_existing_routes(&pool, &signing_key, tx.clone()) .and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone())) .and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone())) @@ -413,13 +404,7 @@ impl iot_config::Route for RouteService { tracing::info!("existing routes sent; streaming updates as available"); telemetry::route_stream_subscribe(); loop { - let shutdown = shutdown_listener.clone(); - tokio::select! { - _ = shutdown => { - telemetry::route_stream_unsubscribe(); - return - } msg = route_updates.recv() => { match msg { Ok(update) => { @@ -459,7 +444,6 @@ impl iot_config::Route for RouteService { let pool = self.pool.clone(); let (tx, rx) = tokio::sync::mpsc::channel(20); - let shutdown_listener = self.shutdown.clone(); tracing::debug!(route_id = request.route_id, "listing eui pairs"); @@ -484,9 +468,6 @@ impl iot_config::Route for RouteService { }; tokio::select! { - _ = shutdown_listener => { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - } _ = async { while let Some(eui) = eui_stream.next().await { let message = match eui { @@ -533,7 +514,6 @@ impl iot_config::Route for RouteService { .await?; tokio::select! { - _ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")), result = incoming_stream .map_ok(|update| match validator.validate_update(&update) { Ok(()) => Ok(update), @@ -620,7 +600,6 @@ impl iot_config::Route for RouteService { let (tx, rx) = tokio::sync::mpsc::channel(20); let pool = self.pool.clone(); - let shutdown_listener = self.shutdown.clone(); tracing::debug!(route_id = request.route_id, "listing devaddr ranges"); @@ -643,9 +622,6 @@ impl iot_config::Route for RouteService { }; tokio::select! { - _ = shutdown_listener => { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - } _ = async { while let Some(devaddr) = devaddrs.next().await { let message = match devaddr { @@ -695,7 +671,6 @@ impl iot_config::Route for RouteService { .await?; tokio::select! { - _ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")), result = incoming_stream .map_ok(|update| match validator.validate_update(&update) { Ok(()) => Ok(update), @@ -788,7 +763,6 @@ impl iot_config::Route for RouteService { let pool = self.pool.clone(); let (tx, rx) = tokio::sync::mpsc::channel(20); - let shutdown_listener = self.shutdown.clone(); tracing::debug!( route_id = request.route_id, @@ -816,9 +790,6 @@ impl iot_config::Route for RouteService { }; tokio::select! { - _ = shutdown_listener => { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - } _ = async { while let Some(skf) = skf_stream.next().await { let message = match skf { @@ -850,7 +821,6 @@ impl iot_config::Route for RouteService { let pool = self.pool.clone(); let (tx, rx) = tokio::sync::mpsc::channel(20); - let shutdown_listener = self.shutdown.clone(); tracing::debug!( route_id = request.route_id, @@ -882,9 +852,6 @@ impl iot_config::Route for RouteService { }; tokio::select! { - _ = shutdown_listener => { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - } _ = async { while let Some(skf) = skf_stream.next().await { let message = match skf { From ccd7e634ee831ef1067faf6025fabad1dcf5b04a Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Wed, 6 Sep 2023 10:04:03 +0100 Subject: [PATCH 2/9] bump animal dep --- Cargo.lock | 51 ++++++++++++++++----------------------------------- 1 file changed, 16 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be5e146d7..c9690a119 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,7 +324,7 @@ checksum = "7556be6f2b0d82376c1ece1fda4dffd728816ac53bee2c285f3f74269ddc4a97" dependencies = [ "anyhow", "clap 4.1.11", - "helium-crypto 0.6.8", + "helium-crypto", "md5", ] @@ -2100,7 +2100,7 @@ dependencies = [ "bytes", "chrono", "config", - "helium-crypto 0.8.0", + "helium-crypto", "reqwest", "serde", "serde_json", @@ -2523,7 +2523,7 @@ dependencies = [ "derive_builder", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "hex-literal", "http", @@ -2938,25 +2938,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "helium-crypto" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "289576899272c1b9f6cb1a2d393c5f3c142b62b4343454bd1ada5d0eefd47ce7" -dependencies = [ - "base64 0.21.0", - "bs58 0.4.0", - "ed25519-compact", - "k256", - "lazy_static", - "p256", - "rand_core 0.6.4", - "serde", - "sha2 0.10.6", - "signature", - "thiserror", -] - [[package]] name = "helium-crypto" version = "0.8.0" @@ -2967,7 +2948,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.2.8", + "getrandom 0.1.16", "k256", "lazy_static", "multihash", @@ -3312,7 +3293,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http", "metrics", @@ -3374,7 +3355,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "hextree", "http", @@ -3413,7 +3394,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http", "http-serde", @@ -3452,7 +3433,7 @@ dependencies = [ "futures", "futures-util", "h3o", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http-serde", "humantime", @@ -4015,7 +3996,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "hextree", "http", @@ -4049,7 +4030,7 @@ dependencies = [ "dialoguer", "futures", "h3o", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "mobile-config", "prost", @@ -4076,7 +4057,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http", "http-serde", @@ -4110,7 +4091,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http-serde", "humantime", @@ -4714,14 +4695,14 @@ dependencies = [ "anyhow", "base64 0.21.0", "blake3", - "bs58 0.4.0", + "bs58 0.3.1", "chrono", "clap 4.1.11", "config", "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http", "hyper", @@ -5333,7 +5314,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-crypto 0.8.0", + "helium-crypto", "helium-proto", "http-serde", "lazy_static", @@ -6016,7 +5997,7 @@ dependencies = [ "clap 4.1.11", "data-credits", "futures", - "helium-crypto 0.8.0", + "helium-crypto", "helium-sub-daos", "metrics", "serde", From 4f6552451b7f1f016f11c7aaf118de79d6e87c8f Mon Sep 17 00:00:00 2001 From: andymck Date: Wed, 6 Sep 2023 15:56:46 +0100 Subject: [PATCH 3/9] Update iot_config/Cargo.toml Co-authored-by: Jeff Grunewald --- iot_config/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/iot_config/Cargo.toml b/iot_config/Cargo.toml index c42f05a67..973627bbb 100644 --- a/iot_config/Cargo.toml +++ b/iot_config/Cargo.toml @@ -42,4 +42,3 @@ tracing = {workspace = true} tracing-subscriber = {workspace = true} triggered = {workspace = true} task-manager = { path = "../task_manager" } - From ec80d34b0873a8a45fb277efa7f5b64b8cb28bf8 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Thu, 14 Sep 2023 14:48:51 +0100 Subject: [PATCH 4/9] address review comments --- iot_config/src/route_service.rs | 36 +++++++++++---------------------- 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index ed16de8ec..6e74e566b 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -389,35 +389,23 @@ impl iot_config::Route for RouteService { let mut route_updates = self.subscribe_to_routes(); tokio::spawn(async move { - tokio::select! { - result = stream_existing_routes(&pool, &signing_key, tx.clone()) - .and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone())) - .and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone())) - .and_then(|_| stream_existing_skfs(&pool, &signing_key, tx.clone())) => { - if let Err(error) = result { - tracing::error!(?error, "Error occurred streaming current routing configuration"); - return; - } - } + if stream_existing_routes(&pool, &signing_key, tx.clone()) + .and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone())) + .and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone())) + .and_then(|_| stream_existing_skfs(&pool, &signing_key, tx.clone())) + .await + .is_err() + { + return; } tracing::info!("existing routes sent; streaming updates as available"); telemetry::route_stream_subscribe(); loop { - tokio::select! { - msg = route_updates.recv() => { - match msg { - Ok(update) => { - if tx.send(Ok(update)).await.is_err() { - tracing::info!("Client disconnected; shutting down stream"); - telemetry::route_stream_unsubscribe(); - return; - } - } - Err(error) => { - tracing::error!(?error, "Error occurred processing route stream update"); - } - } + while let Ok(update) = route_updates.recv().await { + if tx.send(Ok(update)).await.is_err() { + telemetry::route_stream_unsubscribe(); + return; } } } From 92a09cdbed73ed65ce2db9d4cff5923ed04d0b74 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Wed, 4 Oct 2023 11:32:12 -0400 Subject: [PATCH 5/9] handle shutting down streams --- iot_config/src/main.rs | 23 ++- iot_config/src/org_service.rs | 8 +- iot_config/src/route_service.rs | 339 +++++++++++++++----------------- 3 files changed, 182 insertions(+), 188 deletions(-) diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index 550773ee6..83453a579 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -130,7 +130,7 @@ impl ManagedTask for GrpcServer { shutdown: triggered::Listener, ) -> LocalBoxFuture<'static, anyhow::Result<()>> { Box::pin(async move { - transport::Server::builder() + let grpc_server = transport::Server::builder() .http2_keepalive_interval(Some(Duration::from_secs(250))) .http2_keepalive_timeout(Some(Duration::from_secs(60))) .layer(tower_http::trace::TraceLayer::new_for_grpc()) @@ -138,9 +138,24 @@ impl ManagedTask for GrpcServer { .add_service(OrgServer::new(self.org_svc)) .add_service(RouteServer::new(self.route_svc)) .add_service(AdminServer::new(self.admin_svc)) - .serve_with_shutdown(self.listen_addr, shutdown) - .map_err(Error::from) - .await + .serve(self.listen_addr) + .map_err(Error::from); + + tokio::select! { + _ = shutdown => { + tracing::warn!("grpc server shutting down"); + Ok(()) + } + res = grpc_server => { + match res { + Ok(()) => Ok(()), + Err(err) => { + tracing::error!(?err, "grpc server failed with error"); + Err(anyhow::anyhow!("grpc server exiting with error")) + } + } + } + } }) } } diff --git a/iot_config/src/org_service.rs b/iot_config/src/org_service.rs index 326500dd2..efb87091d 100644 --- a/iot_config/src/org_service.rs +++ b/iot_config/src/org_service.rs @@ -463,9 +463,7 @@ impl iot_config::Org for OrgService { Status::internal(format!("org disable failed for: {}", request.oui)) })?; - tokio::select! { - result = self.stream_org_routes_enable_disable(request.oui) => result? - } + self.stream_org_routes_enable_disable(request.oui).await? } let mut resp = OrgDisableResV1 { @@ -501,9 +499,7 @@ impl iot_config::Org for OrgService { Status::internal(format!("org enable failed for: {}", request.oui)) })?; - tokio::select! { - result = self.stream_org_routes_enable_disable(request.oui) => result? - } + self.stream_org_routes_enable_disable(request.oui).await? } let mut resp = OrgEnableResV1 { diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index 6e74e566b..96e88cf49 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -389,24 +389,25 @@ impl iot_config::Route for RouteService { let mut route_updates = self.subscribe_to_routes(); tokio::spawn(async move { - if stream_existing_routes(&pool, &signing_key, tx.clone()) + let broadcast = stream_existing_routes(&pool, &signing_key, tx.clone()) .and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone())) .and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone())) .and_then(|_| stream_existing_skfs(&pool, &signing_key, tx.clone())) - .await - .is_err() - { + .await; + if let Err(error) = broadcast { + tracing::error!( + ?error, + "Error occurred streaming current routing configuration" + ); return; } tracing::info!("existing routes sent; streaming updates as available"); telemetry::route_stream_subscribe(); - loop { - while let Ok(update) = route_updates.recv().await { - if tx.send(Ok(update)).await.is_err() { - telemetry::route_stream_unsubscribe(); - return; - } + while let Ok(update) = route_updates.recv().await { + if tx.send(Ok(update)).await.is_err() { + telemetry::route_stream_unsubscribe(); + return; } } }); @@ -455,18 +456,14 @@ impl iot_config::Route for RouteService { } }; - tokio::select! { - _ = async { - while let Some(eui) = eui_stream.next().await { - let message = match eui { - Ok(eui) => Ok(eui.into()), - Err(bad_eui) => Err(Status::internal(format!("invalid eui: {:?}", bad_eui))), - }; - if tx.send(message).await.is_err() { - break; - } - } - } => (), + while let Some(eui) = eui_stream.next().await { + let message = match eui { + Ok(eui) => Ok(eui.into()), + Err(bad_eui) => Err(Status::internal(format!("invalid eui: {:?}", bad_eui))), + }; + if tx.send(message).await.is_err() { + break; + } } }); @@ -501,64 +498,63 @@ impl iot_config::Route for RouteService { .ok_or_else(|| Status::invalid_argument("no eui pairs provided"))? .await?; - tokio::select! { - result = incoming_stream - .map_ok(|update| match validator.validate_update(&update) { - Ok(()) => Ok(update), - Err(reason) => Err(Status::invalid_argument(format!( - "invalid update request: {reason:?}" - ))), - }) - .try_chunks(UPDATE_BATCH_LIMIT) - .map_err(|err| Status::internal(format!("eui pair updates failed to batch: {err:?}"))) - .and_then(|batch| async move { - batch - .into_iter() - .collect::, Status>>() - }) - .and_then(|batch| async move { + incoming_stream + .map_ok(|update| match validator.validate_update(&update) { + Ok(()) => Ok(update), + Err(reason) => Err(Status::invalid_argument(format!( + "invalid update request: {reason:?}" + ))), + }) + .try_chunks(UPDATE_BATCH_LIMIT) + .map_err(|err| Status::internal(format!("eui pair updates failed to batch: {err:?}"))) + .and_then(|batch| async move { + batch + .into_iter() + .collect::, Status>>() + }) + .and_then(|batch| async move { + batch + .into_iter() + .map( + |update: RouteUpdateEuisReqV1| match (update.action(), update.eui_pair) { + (ActionV1::Add, Some(eui_pair)) => Ok((ActionV1::Add, eui_pair)), + (ActionV1::Remove, Some(eui_pair)) => Ok((ActionV1::Remove, eui_pair)), + _ => Err(Status::invalid_argument("invalid eui pair update request")), + }, + ) + .collect::, Status>>() + }) + .try_for_each(|batch: Vec<(ActionV1, EuiPairV1)>| async move { + let (to_add, to_remove): (Vec<(ActionV1, EuiPairV1)>, Vec<(ActionV1, EuiPairV1)>) = batch .into_iter() - .map( - |update: RouteUpdateEuisReqV1| match (update.action(), update.eui_pair) { - (ActionV1::Add, Some(eui_pair)) => Ok((ActionV1::Add, eui_pair)), - (ActionV1::Remove, Some(eui_pair)) => Ok((ActionV1::Remove, eui_pair)), - _ => Err(Status::invalid_argument("invalid eui pair update request")), - }, - ) - .collect::, Status>>() + .partition(|(action, _update)| action == &ActionV1::Add); + telemetry::count_eui_updates(to_add.len(), to_remove.len()); + tracing::debug!( + adding = to_add.len(), + removing = to_remove.len(), + "updating eui pairs" + ); + let adds_update: Vec = + to_add.into_iter().map(|(_, add)| add.into()).collect(); + let removes_update: Vec = to_remove + .into_iter() + .map(|(_, remove)| remove.into()) + .collect(); + route::update_euis( + &adds_update, + &removes_update, + &self.pool, + self.signing_key.clone(), + self.clone_update_channel(), + ) + .await + .map_err(|err| { + tracing::error!("eui pair update failed: {err:?}"); + Status::internal(format!("eui pair update failed: {err:?}")) }) - .try_for_each(|batch: Vec<(ActionV1, EuiPairV1)>| async move { - let (to_add, to_remove): (Vec<(ActionV1, EuiPairV1)>, Vec<(ActionV1, EuiPairV1)>) = - batch - .into_iter() - .partition(|(action, _update)| action == &ActionV1::Add); - telemetry::count_eui_updates(to_add.len(), to_remove.len()); - tracing::debug!( - adding = to_add.len(), - removing = to_remove.len(), - "updating eui pairs" - ); - let adds_update: Vec = - to_add.into_iter().map(|(_, add)| add.into()).collect(); - let removes_update: Vec = to_remove - .into_iter() - .map(|(_, remove)| remove.into()) - .collect(); - route::update_euis( - &adds_update, - &removes_update, - &self.pool, - self.signing_key.clone(), - self.clone_update_channel(), - ) - .await - .map_err(|err| { - tracing::error!("eui pair update failed: {err:?}"); - Status::internal(format!("eui pair update failed: {err:?}")) - }) - }) => result? - } + }) + .await?; let mut resp = RouteEuisResV1 { timestamp: Utc::now().encode_timestamp(), @@ -609,21 +605,17 @@ impl iot_config::Route for RouteService { } }; - tokio::select! { - _ = async { - while let Some(devaddr) = devaddrs.next().await { - let message = match devaddr { - Ok(devaddr) => Ok(devaddr.into()), - Err(bad_devaddr) => Err(Status::internal(format!( - "invalid devaddr: {:?}", - bad_devaddr - ))), - }; - if tx.send(message).await.is_err() { - break; - } - } - } => (), + while let Some(devaddr) = devaddrs.next().await { + let message = match devaddr { + Ok(devaddr) => Ok(devaddr.into()), + Err(bad_devaddr) => Err(Status::internal(format!( + "invalid devaddr: {:?}", + bad_devaddr + ))), + }; + if tx.send(message).await.is_err() { + break; + } } }); @@ -658,70 +650,69 @@ impl iot_config::Route for RouteService { .ok_or_else(|| Status::invalid_argument("no devaddr range provided"))? .await?; - tokio::select! { - result = incoming_stream - .map_ok(|update| match validator.validate_update(&update) { - Ok(()) => Ok(update), - Err(reason) => Err(Status::invalid_argument(format!( - "invalid update request: {reason:?}" - ))), - }) - .try_chunks(UPDATE_BATCH_LIMIT) + incoming_stream + .map_ok(|update| match validator.validate_update(&update) { + Ok(()) => Ok(update), + Err(reason) => Err(Status::invalid_argument(format!( + "invalid update request: {reason:?}" + ))), + }) + .try_chunks(UPDATE_BATCH_LIMIT) + .map_err(|err| { + Status::internal(format!("devaddr range update failed to batch: {err:?}")) + }) + .and_then(|batch| async move { + batch + .into_iter() + .collect::, Status>>() + }) + .and_then(|batch| async move { + batch + .into_iter() + .map(|update: RouteUpdateDevaddrRangesReqV1| { + match (update.action(), update.devaddr_range) { + (ActionV1::Add, Some(range)) => Ok((ActionV1::Add, range)), + (ActionV1::Remove, Some(range)) => Ok((ActionV1::Remove, range)), + _ => Err(Status::invalid_argument( + "invalid devaddr range update request", + )), + } + }) + .collect::, Status>>() + }) + .try_for_each(|batch: Vec<(ActionV1, DevaddrRangeV1)>| async move { + let (to_add, to_remove): ( + Vec<(ActionV1, DevaddrRangeV1)>, + Vec<(ActionV1, DevaddrRangeV1)>, + ) = batch + .into_iter() + .partition(|(action, _update)| action == &ActionV1::Add); + telemetry::count_devaddr_updates(to_add.len(), to_remove.len()); + tracing::debug!( + adding = to_add.len(), + removing = to_remove.len(), + "updating devaddr ranges" + ); + let adds_update: Vec = + to_add.into_iter().map(|(_, add)| add.into()).collect(); + let removes_update: Vec = to_remove + .into_iter() + .map(|(_, remove)| remove.into()) + .collect(); + route::update_devaddr_ranges( + &adds_update, + &removes_update, + &self.pool, + self.signing_key.clone(), + self.clone_update_channel(), + ) + .await .map_err(|err| { - Status::internal(format!("devaddr range update failed to batch: {err:?}")) - }) - .and_then(|batch| async move { - batch - .into_iter() - .collect::, Status>>() - }) - .and_then(|batch| async move { - batch - .into_iter() - .map(|update: RouteUpdateDevaddrRangesReqV1| { - match (update.action(), update.devaddr_range) { - (ActionV1::Add, Some(range)) => Ok((ActionV1::Add, range)), - (ActionV1::Remove, Some(range)) => Ok((ActionV1::Remove, range)), - _ => Err(Status::invalid_argument( - "invalid devaddr range update request", - )), - } - }) - .collect::, Status>>() + tracing::error!("devaddr range update failed: {err:?}"); + Status::internal("devaddr range update failed") }) - .try_for_each(|batch: Vec<(ActionV1, DevaddrRangeV1)>| async move { - let (to_add, to_remove): ( - Vec<(ActionV1, DevaddrRangeV1)>, - Vec<(ActionV1, DevaddrRangeV1)>, - ) = batch - .into_iter() - .partition(|(action, _update)| action == &ActionV1::Add); - telemetry::count_devaddr_updates(to_add.len(), to_remove.len()); - tracing::debug!( - adding = to_add.len(), - removing = to_remove.len(), - "updating devaddr ranges" - ); - let adds_update: Vec = - to_add.into_iter().map(|(_, add)| add.into()).collect(); - let removes_update: Vec = to_remove - .into_iter() - .map(|(_, remove)| remove.into()) - .collect(); - route::update_devaddr_ranges( - &adds_update, - &removes_update, - &self.pool, - self.signing_key.clone(), - self.clone_update_channel(), - ) - .await - .map_err(|err| { - tracing::error!("devaddr range update failed: {err:?}"); - Status::internal("devaddr range update failed") - }) - }) => result? - } + }) + .await?; let mut resp = RouteDevaddrRangesResV1 { timestamp: Utc::now().encode_timestamp(), @@ -777,18 +768,14 @@ impl iot_config::Route for RouteService { } }; - tokio::select! { - _ = async { - while let Some(skf) = skf_stream.next().await { - let message = match skf { - Ok(skf) => Ok(skf.into()), - Err(bad_skf) => Err(Status::internal(format!("invalid skf: {:?}", bad_skf))), - }; - if tx.send(message).await.is_err() { - break; - } - } - } => (), + while let Some(skf) = skf_stream.next().await { + let message = match skf { + Ok(skf) => Ok(skf.into()), + Err(bad_skf) => Err(Status::internal(format!("invalid skf: {:?}", bad_skf))), + }; + if tx.send(message).await.is_err() { + break; + } } }); @@ -839,18 +826,14 @@ impl iot_config::Route for RouteService { } }; - tokio::select! { - _ = async { - while let Some(skf) = skf_stream.next().await { - let message = match skf { - Ok(skf) => Ok(skf.into()), - Err(bad_skf) => Err(Status::internal(format!("invalid skf: {:?}", bad_skf))), - }; - if tx.send(message).await.is_err() { - break; - } - } - } => (), + while let Some(skf) = skf_stream.next().await { + let message = match skf { + Ok(skf) => Ok(skf.into()), + Err(bad_skf) => Err(Status::internal(format!("invalid skf: {:?}", bad_skf))), + }; + if tx.send(message).await.is_err() { + break; + } } }); From b8bfd05f9d1a0a21ed3ad9102cce46776ab5358d Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Wed, 4 Oct 2023 23:59:20 -0400 Subject: [PATCH 6/9] update and authorized signing key change logging --- iot_config/src/admin_service.rs | 12 ++++-- iot_config/src/org.rs | 69 ++++++++++++++++++++++++--------- iot_config/src/org_service.rs | 62 ++++++++++++++++------------- 3 files changed, 93 insertions(+), 50 deletions(-) diff --git a/iot_config/src/admin_service.rs b/iot_config/src/admin_service.rs index 51b737168..4e4647c3d 100644 --- a/iot_config/src/admin_service.rs +++ b/iot_config/src/admin_service.rs @@ -94,13 +94,16 @@ impl iot_config::Admin for AdminService { admin::insert_key(request.pubkey.clone().into(), key_type, &self.pool) .and_then(|_| async move { if self.auth_updater.send_if_modified(|cache| { - if let std::collections::hash_map::Entry::Vacant(key) = cache.entry(pubkey) { + if let std::collections::hash_map::Entry::Vacant(key) = + cache.entry(pubkey.clone()) + { key.insert(key_type); true } else { false } }) { + tracing::info!(%pubkey, %key_type, "key authorized"); Ok(()) } else { Err(anyhow!("key already registered")) @@ -108,7 +111,7 @@ impl iot_config::Admin for AdminService { }) .map_err(|err| { let pubkey: PublicKeyBinary = request.pubkey.into(); - tracing::error!(pubkey = pubkey.to_string(), "pubkey add failed"); + tracing::error!(%pubkey, "pubkey add failed"); Status::internal(format!("error saving requested key: {pubkey}, {err:?}")) }) .await?; @@ -135,10 +138,11 @@ impl iot_config::Admin for AdminService { admin::remove_key(request.pubkey.clone().into(), &self.pool) .and_then(|deleted| async move { match deleted { - Some((pubkey, _key_type)) => { + Some((pubkey, key_type)) => { self.auth_updater.send_modify(|cache| { cache.remove(&pubkey); }); + tracing::info!(%pubkey, %key_type,"key de-authorized"); Ok(()) } None => Ok(()), @@ -146,7 +150,7 @@ impl iot_config::Admin for AdminService { }) .map_err(|_| { let pubkey: PublicKeyBinary = request.pubkey.into(); - tracing::error!(pubkey = pubkey.to_string(), "pubkey remove failed"); + tracing::error!(%pubkey, "pubkey remove failed"); Status::internal(format!("error removing request key: {pubkey}")) }) .await?; diff --git a/iot_config/src/org.rs b/iot_config/src/org.rs index bad9f386a..9c553dfb9 100644 --- a/iot_config/src/org.rs +++ b/iot_config/src/org.rs @@ -156,6 +156,7 @@ pub async fn update_org( authorizer: UpdateAuthorizer, updates: Vec, db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres>, + delegate_cache: &watch::Sender, ) -> Result { let mut txn = db.begin().await?; @@ -165,37 +166,52 @@ pub async fn update_org( let net_id = get_org_netid(oui, &mut txn).await?; let is_helium_org = is_helium_netid(&net_id); - for update in updates { + for update in updates.iter() { match update.update { - Some(proto::Update::Owner(pubkeybin)) if authorizer == UpdateAuthorizer::Admin => { - update_owner(oui, pubkeybin.into(), &mut txn).await? + Some(proto::Update::Owner(ref pubkeybin)) if authorizer == UpdateAuthorizer::Admin => { + let pubkeybin: PublicKeyBinary = pubkeybin.clone().into(); + update_owner(oui, &pubkeybin, &mut txn).await?; + tracing::info!(oui, pubkey = %pubkeybin, "owner pubkey updated"); } - Some(proto::Update::Payer(pubkeybin)) if authorizer == UpdateAuthorizer::Admin => { - update_payer(oui, pubkeybin.into(), &mut txn).await? + Some(proto::Update::Payer(ref pubkeybin)) if authorizer == UpdateAuthorizer::Admin => { + let pubkeybin: PublicKeyBinary = pubkeybin.clone().into(); + update_payer(oui, &pubkeybin, &mut txn).await?; + tracing::info!(oui, pubkey = %pubkeybin, "payer pubkey updated"); } Some(proto::Update::Devaddrs(addr_count)) if authorizer == UpdateAuthorizer::Admin && is_helium_org => { - add_devaddr_slab(oui, net_id, addr_count, &mut txn).await? + add_devaddr_slab(oui, net_id, addr_count, &mut txn).await?; + tracing::info!(oui, addrs = addr_count, "new devaddr slab assigned"); } - Some(proto::Update::Constraint(constraint_update)) + Some(proto::Update::Constraint(ref constraint_update)) if authorizer == UpdateAuthorizer::Admin && is_helium_org => { match (constraint_update.action(), &constraint_update.constraint) { - (proto::ActionV1::Add, Some(ref constraint)) => add_constraint_update(oui, net_id, constraint.into(), &mut txn).await?, - (proto::ActionV1::Remove, Some(ref constraint)) => remove_constraint_update(oui, net_id, current_org.constraints.as_ref(), constraint.into(), &mut txn).await?, + (proto::ActionV1::Add, Some(ref constraint)) => { + let constraint: DevAddrConstraint = constraint.into(); + add_constraint_update(oui, net_id, constraint.clone(), &mut txn).await?; + tracing::info!(oui, %net_id, ?constraint, "devaddr constraint added"); + } + (proto::ActionV1::Remove, Some(ref constraint)) => { + let constraint: DevAddrConstraint = constraint.into(); + remove_constraint_update(oui, net_id, current_org.constraints.as_ref(), constraint.clone(), &mut txn).await?; + tracing::info!(oui, %net_id, ?constraint, "devaddr constraint removed"); + } _ => return Err(OrgStoreError::InvalidUpdate(format!("invalid action or missing devaddr constraint update: {constraint_update:?}"))) } } - Some(proto::Update::DelegateKey(delegate_key_update)) => { + Some(proto::Update::DelegateKey(ref delegate_key_update)) => { match delegate_key_update.action() { proto::ActionV1::Add => { - add_delegate_key(oui, delegate_key_update.delegate_key.into(), &mut txn) - .await? + let delegate = delegate_key_update.delegate_key.clone().into(); + add_delegate_key(oui, &delegate, &mut txn).await?; + tracing::info!(oui, %delegate, "delegate key authorized"); } proto::ActionV1::Remove => { - remove_delegate_key(oui, delegate_key_update.delegate_key.into(), &mut txn) - .await? + let delegate = delegate_key_update.delegate_key.clone().into(); + remove_delegate_key(oui, &delegate, &mut txn).await?; + tracing::info!(oui, %delegate, "delegate key de-authorized"); } } } @@ -207,6 +223,23 @@ pub async fn update_org( }; } + for update in updates.iter() { + if let Some(proto::Update::DelegateKey(ref delegate_key_update)) = update.update { + match delegate_key_update.action() { + proto::ActionV1::Add => { + delegate_cache.send_if_modified(|cache| { + cache.insert(delegate_key_update.delegate_key.clone().into()) + }); + } + proto::ActionV1::Remove => { + delegate_cache.send_if_modified(|cache| { + cache.remove(&delegate_key_update.delegate_key.clone().into()) + }); + } + } + } + } + let updated_org = get(oui, &mut txn) .await? .ok_or_else(|| OrgStoreError::SaveOrg(format!("{oui}")))?; @@ -231,7 +264,7 @@ pub async fn get_org_netid( async fn update_owner( oui: u64, - owner_pubkey: PublicKeyBinary, + owner_pubkey: &PublicKeyBinary, db: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<(), sqlx::Error> { sqlx::query(" update organizations set owner_pubkey = $1 where oui = $2 ") @@ -244,7 +277,7 @@ async fn update_owner( async fn update_payer( oui: u64, - payer_pubkey: PublicKeyBinary, + payer_pubkey: &PublicKeyBinary, db: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<(), sqlx::Error> { sqlx::query(" update organizations set payer_pubkey = $1 where oui = $2 ") @@ -257,7 +290,7 @@ async fn update_payer( async fn add_delegate_key( oui: u64, - delegate_pubkey: PublicKeyBinary, + delegate_pubkey: &PublicKeyBinary, db: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<(), sqlx::Error> { sqlx::query(" insert into organization_delegate_keys (delegate_pubkey, oui) values ($1, $2) ") @@ -270,7 +303,7 @@ async fn add_delegate_key( async fn remove_delegate_key( oui: u64, - delegate_pubkey: PublicKeyBinary, + delegate_pubkey: &PublicKeyBinary, db: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result<(), sqlx::Error> { sqlx::query(" delete from organization_delegate_keys where delegate_pubkey = $1 and oui = $2 ") diff --git a/iot_config/src/org_service.rs b/iot_config/src/org_service.rs index efb87091d..f286d0747 100644 --- a/iot_config/src/org_service.rs +++ b/iot_config/src/org_service.rs @@ -233,7 +233,7 @@ impl iot_config::Org for OrgService { }) .collect::, Status>>()?; - tracing::debug!("create helium org request: {request:?}"); + tracing::info!(?request, "create helium org"); let net_id = request.net_id(); let requested_addrs = if request.devaddrs >= 8 && request.devaddrs % 2 == 0 { @@ -256,6 +256,7 @@ impl iot_config::Org for OrgService { tracing::error!(?net_id, count = %requested_addrs, reason = ?err, "failed to retrieve available helium devaddrs"); Status::failed_precondition("helium addresses unavailable") })?; + tracing::info!(constraints = ?devaddr_constraints, "devaddr constraints issued"); let helium_netid_field = helium_netids::HeliumNetId::from(net_id).id(); let org = org::create_org( @@ -282,16 +283,14 @@ impl iot_config::Org for OrgService { org.delegate_keys.as_ref().map(|keys| { self.delegate_updater.send_if_modified(|cache| { - keys.iter().fold( - false, - |acc, key| { - if cache.insert(key.clone()) { - true - } else { - acc - } - }, - ) + keys.iter().fold(false, |acc, key| { + if cache.insert(key.clone()) { + tracing::info!(%key, "delegate key authorized"); + true + } else { + acc + } + }) }) }); @@ -338,12 +337,13 @@ impl iot_config::Org for OrgService { }) .collect::, Status>>()?; - tracing::debug!("create roamer org request: {request:?}"); + tracing::info!(?request, "create roamer org"); let net_id = lora_field::net_id(request.net_id); let devaddr_range = net_id .full_range() .map_err(|_| Status::invalid_argument("invalid net_id"))?; + tracing::info!(constraints = ?devaddr_range, "roaming devaddr range"); let org = org::create_org( request.owner.into(), @@ -365,16 +365,14 @@ impl iot_config::Org for OrgService { org.delegate_keys.as_ref().map(|keys| { self.delegate_updater.send_if_modified(|cache| { - keys.iter().fold( - false, - |acc, key| { - if cache.insert(key.clone()) { - true - } else { - acc - } - }, - ) + keys.iter().fold(false, |acc, key| { + if cache.insert(key.clone()) { + tracing::info!(?key, "delegate key authorized"); + true + } else { + acc + } + }) }) }); @@ -407,12 +405,18 @@ impl iot_config::Org for OrgService { .verify_update_request_signature(&signer, &request) .await?; - let org = org::update_org(request.oui, authorizer, request.updates, &self.pool) - .await - .map_err(|err| { - tracing::error!(reason = ?err, "org update failed"); - Status::internal(format!("org update failed: {err:?}")) - })?; + let org = org::update_org( + request.oui, + authorizer, + request.updates, + &self.pool, + &self.delegate_updater, + ) + .await + .map_err(|err| { + tracing::error!(reason = ?err, "org update failed"); + Status::internal(format!("org update failed: {err:?}")) + })?; let net_id = org::get_org_netid(org.oui, &self.pool) .await @@ -462,6 +466,7 @@ impl iot_config::Org for OrgService { ); Status::internal(format!("org disable failed for: {}", request.oui)) })?; + tracing::info!(oui = request.oui, "org locked"); self.stream_org_routes_enable_disable(request.oui).await? } @@ -498,6 +503,7 @@ impl iot_config::Org for OrgService { ); Status::internal(format!("org enable failed for: {}", request.oui)) })?; + tracing::info!(oui = request.oui, "org unlocked"); self.stream_org_routes_enable_disable(request.oui).await? } From b1b419cb986d2f791a9891d0172183a8a9be38d1 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Mon, 9 Oct 2023 15:26:32 -0400 Subject: [PATCH 7/9] update delegate key cache after db txn committed --- iot_config/src/org.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/iot_config/src/org.rs b/iot_config/src/org.rs index 9c553dfb9..5f1fd54b6 100644 --- a/iot_config/src/org.rs +++ b/iot_config/src/org.rs @@ -223,6 +223,12 @@ pub async fn update_org( }; } + let updated_org = get(oui, &mut txn) + .await? + .ok_or_else(|| OrgStoreError::SaveOrg(format!("{oui}")))?; + + txn.commit().await?; + for update in updates.iter() { if let Some(proto::Update::DelegateKey(ref delegate_key_update)) = update.update { match delegate_key_update.action() { @@ -240,12 +246,6 @@ pub async fn update_org( } } - let updated_org = get(oui, &mut txn) - .await? - .ok_or_else(|| OrgStoreError::SaveOrg(format!("{oui}")))?; - - txn.commit().await?; - Ok(updated_org) } From 98beef09f2e1f27f90e71a878bf0c5bf52bf0268 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Mon, 9 Oct 2023 15:41:31 -0400 Subject: [PATCH 8/9] fix the same lint again --- file_store/src/iot_invalid_poc.rs | 2 +- file_store/src/iot_valid_poc.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/file_store/src/iot_invalid_poc.rs b/file_store/src/iot_invalid_poc.rs index aa80e0ec0..2714e0b4d 100644 --- a/file_store/src/iot_invalid_poc.rs +++ b/file_store/src/iot_invalid_poc.rs @@ -98,7 +98,7 @@ impl From for LoraInvalidBeaconReportV1 { location: v .location .map(|l| l.to_string()) - .unwrap_or_else(String::new), + .unwrap_or_default(), gain: v.gain, elevation: v.elevation, invalid_details: v.invalid_details, diff --git a/file_store/src/iot_valid_poc.rs b/file_store/src/iot_valid_poc.rs index b190402bb..a74500225 100644 --- a/file_store/src/iot_valid_poc.rs +++ b/file_store/src/iot_valid_poc.rs @@ -157,7 +157,7 @@ impl From for LoraValidBeaconReportV1 { location: v .location .map(|l| l.to_string()) - .unwrap_or_else(String::new), + .unwrap_or_default(), gain: v.gain, elevation: v.elevation, hex_scale: (v.hex_scale * SCALE_MULTIPLIER).to_u32().unwrap_or(0), @@ -217,7 +217,7 @@ impl From for LoraVerifiedWitnessReportV1 { location: v .location .map(|l| l.to_string()) - .unwrap_or_else(String::new), + .unwrap_or_default(), gain: v.gain, elevation: v.elevation, hex_scale: (v.hex_scale * SCALE_MULTIPLIER).to_u32().unwrap_or(0), From 958725b7ca467f59bdb0f021338565427bcc4dca Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Mon, 9 Oct 2023 16:41:57 -0400 Subject: [PATCH 9/9] fmt --- file_store/src/iot_invalid_poc.rs | 5 +---- file_store/src/iot_valid_poc.rs | 10 ++-------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/file_store/src/iot_invalid_poc.rs b/file_store/src/iot_invalid_poc.rs index 2714e0b4d..d633b9345 100644 --- a/file_store/src/iot_invalid_poc.rs +++ b/file_store/src/iot_invalid_poc.rs @@ -95,10 +95,7 @@ impl From for LoraInvalidBeaconReportV1 { received_timestamp, reason: v.reason as i32, report: Some(report), - location: v - .location - .map(|l| l.to_string()) - .unwrap_or_default(), + location: v.location.map(|l| l.to_string()).unwrap_or_default(), gain: v.gain, elevation: v.elevation, invalid_details: v.invalid_details, diff --git a/file_store/src/iot_valid_poc.rs b/file_store/src/iot_valid_poc.rs index a74500225..e8bccb6fd 100644 --- a/file_store/src/iot_valid_poc.rs +++ b/file_store/src/iot_valid_poc.rs @@ -154,10 +154,7 @@ impl From for LoraValidBeaconReportV1 { Self { received_timestamp, - location: v - .location - .map(|l| l.to_string()) - .unwrap_or_default(), + location: v.location.map(|l| l.to_string()).unwrap_or_default(), gain: v.gain, elevation: v.elevation, hex_scale: (v.hex_scale * SCALE_MULTIPLIER).to_u32().unwrap_or(0), @@ -214,10 +211,7 @@ impl From for LoraVerifiedWitnessReportV1 { received_timestamp, status: v.status.into(), report: Some(report), - location: v - .location - .map(|l| l.to_string()) - .unwrap_or_default(), + location: v.location.map(|l| l.to_string()).unwrap_or_default(), gain: v.gain, elevation: v.elevation, hex_scale: (v.hex_scale * SCALE_MULTIPLIER).to_u32().unwrap_or(0),