From bcff49518af15f553198a9a468114ec9c947b692 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 19 Dec 2024 14:07:40 +0200 Subject: [PATCH 1/3] Handle entity duplicates in Mobile radio tracker --- mobile_config/src/mobile_radio_tracker.rs | 35 ++++-- mobile_config/tests/common/mod.rs | 133 ++++++++++++++++++++ mobile_config/tests/gateway_service.rs | 127 +------------------ mobile_config/tests/mobile_radio_tracker.rs | 64 ++++++++++ 4 files changed, 223 insertions(+), 136 deletions(-) create mode 100644 mobile_config/tests/common/mod.rs create mode 100644 mobile_config/tests/mobile_radio_tracker.rs diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 0dc03a8c3..dcbfb2c27 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -72,11 +72,11 @@ impl MobileRadio { } #[derive(Debug, sqlx::FromRow)] -struct TrackedMobileRadio { - entity_key: EntityKey, - hash: String, - last_changed_at: DateTime, - last_checked_at: DateTime, +pub struct TrackedMobileRadio { + pub entity_key: EntityKey, + pub hash: String, + pub last_changed_at: DateTime, + pub last_checked_at: DateTime, } impl TrackedMobileRadio { @@ -152,7 +152,7 @@ impl MobileRadioTracker { } } -async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { +pub async fn track_changes(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { tracing::info!("looking for changes to radios"); let tracked_radios = get_tracked_radios(pool).await?; let all_mobile_radios = get_all_mobile_radios(metadata); @@ -183,7 +183,7 @@ async fn identify_changes( .await } -async fn get_tracked_radios( +pub async fn get_tracked_radios( pool: &Pool, ) -> anyhow::Result> { sqlx::query_as::<_, TrackedMobileRadio>( @@ -208,8 +208,21 @@ async fn get_tracked_radios( fn get_all_mobile_radios(metadata: &Pool) -> impl Stream + '_ { sqlx::query_as::<_, MobileRadio>( r#" + WITH unique_entity_keys AS ( + SELECT + kta.entity_key as entity_key, + kta.asset as asset, + MAX(mhi.refreshed_at) AS refreshed_at + FROM key_to_assets kta + INNER JOIN mobile_hotspot_infos mhi + ON kta.asset = mhi.asset + WHERE kta.entity_key IS NOT NULL + AND mhi.refreshed_at IS NOT NULL + GROUP BY kta.entity_key, kta.asset + ) + SELECT - kta.entity_key, + uek.entity_key, mhi.refreshed_at, mhi.location::bigint, mhi.is_full_hotspot::int, @@ -218,11 +231,9 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream, +) { + let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); + + sqlx::query( + r#" + INSERT INTO +"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at") + VALUES +($1, $2, $3, $4); + "#, + ) + .bind(b58) + .bind("hash") + .bind(last_changed_at) + .bind(last_changed_at + Duration::hours(1)) + .execute(pool) + .await + .unwrap(); +} + +#[allow(clippy::too_many_arguments)] +pub async fn add_db_record( + pool: &PgPool, + asset: &str, + location: i64, + device_type: &str, + key: PublicKeyBinary, + created_at: DateTime, + refreshed_at: Option>, + deployment_info: Option<&str>, +) { + add_mobile_hotspot_infos( + pool, + asset, + location, + device_type, + created_at, + refreshed_at, + deployment_info, + ) + .await; + add_asset_key(pool, asset, key).await; +} + +pub async fn add_mobile_hotspot_infos( + pool: &PgPool, + asset: &str, + location: i64, + device_type: &str, + created_at: DateTime, + refreshed_at: Option>, + deployment_info: Option<&str>, +) { + sqlx::query( + r#" + INSERT INTO +"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info") + VALUES +($1, $2, $3::jsonb, $4, $5, $6::jsonb); + "#, + ) + .bind(asset) + .bind(location) + .bind(device_type) + .bind(created_at) + .bind(refreshed_at) + .bind(deployment_info) + .execute(pool) + .await + .unwrap(); +} + +pub async fn add_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) { + let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); + sqlx::query( + r#" + INSERT INTO + "key_to_assets" ("asset", "entity_key") + VALUES ($1, $2); + "#, + ) + .bind(asset) + .bind(b58) + .execute(pool) + .await + .unwrap(); +} + +pub async fn create_db_tables(pool: &PgPool) { + sqlx::query( + r#" + CREATE TABLE mobile_hotspot_infos ( + asset character varying(255) NULL, + location numeric NULL, + device_type jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + refreshed_at timestamptz, + deployment_info jsonb, + is_full_hotspot bool NULL, + num_location_asserts integer NULL, + is_active bool NULL, + dc_onboarding_fee_paid numeric NULL + );"#, + ) + .execute(pool) + .await + .unwrap(); + + sqlx::query( + r#" + CREATE TABLE key_to_assets ( + asset character varying(255) NULL, + entity_key bytea NULL + );"#, + ) + .execute(pool) + .await + .unwrap(); +} + +pub fn make_keypair() -> Keypair { + Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) +} diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 609cb43cb..a8e122964 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -18,6 +18,9 @@ use sqlx::PgPool; use tokio::net::TcpListener; use tonic::{transport, Code}; +mod common; +use common::*; + #[sqlx::test] async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> { // NOTE(mj): The information we're requesting does not exist in the DB for @@ -538,130 +541,6 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) { } } -async fn add_mobile_tracker_record( - pool: &PgPool, - key: PublicKeyBinary, - last_changed_at: DateTime, -) { - let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); - - sqlx::query( - r#" - INSERT INTO -"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at") - VALUES -($1, $2, $3, $4); - "#, - ) - .bind(b58) - .bind("hash") - .bind(last_changed_at) - .bind(last_changed_at + Duration::hours(1)) - .execute(pool) - .await - .unwrap(); -} - -#[allow(clippy::too_many_arguments)] -async fn add_db_record( - pool: &PgPool, - asset: &str, - location: i64, - device_type: &str, - key: PublicKeyBinary, - created_at: DateTime, - refreshed_at: Option>, - deployment_info: Option<&str>, -) { - add_mobile_hotspot_infos( - pool, - asset, - location, - device_type, - created_at, - refreshed_at, - deployment_info, - ) - .await; - add_asset_key(pool, asset, key).await; -} - -async fn add_mobile_hotspot_infos( - pool: &PgPool, - asset: &str, - location: i64, - device_type: &str, - created_at: DateTime, - refreshed_at: Option>, - deployment_info: Option<&str>, -) { - sqlx::query( - r#" - INSERT INTO -"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info") - VALUES -($1, $2, $3::jsonb, $4, $5, $6::jsonb); - "#, - ) - .bind(asset) - .bind(location) - .bind(device_type) - .bind(created_at) - .bind(refreshed_at) - .bind(deployment_info) - .execute(pool) - .await - .unwrap(); -} - -async fn add_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) { - let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); - sqlx::query( - r#" - INSERT INTO - "key_to_assets" ("asset", "entity_key") - VALUES ($1, $2); - "#, - ) - .bind(asset) - .bind(b58) - .execute(pool) - .await - .unwrap(); -} - -async fn create_db_tables(pool: &PgPool) { - sqlx::query( - r#" - CREATE TABLE mobile_hotspot_infos ( - asset character varying(255) NULL, - location numeric NULL, - device_type jsonb NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - refreshed_at timestamptz, - deployment_info jsonb - );"#, - ) - .execute(pool) - .await - .unwrap(); - - sqlx::query( - r#" - CREATE TABLE key_to_assets ( - asset character varying(255) NULL, - entity_key bytea NULL - );"#, - ) - .execute(pool) - .await - .unwrap(); -} - -fn make_keypair() -> Keypair { - Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) -} - fn make_gateway_stream_signed_req_v2( signer: &Keypair, device_types: &[DeviceType], diff --git a/mobile_config/tests/mobile_radio_tracker.rs b/mobile_config/tests/mobile_radio_tracker.rs new file mode 100644 index 000000000..6d93abbd0 --- /dev/null +++ b/mobile_config/tests/mobile_radio_tracker.rs @@ -0,0 +1,64 @@ +use chrono::Utc; +use helium_crypto::PublicKeyBinary; +use mobile_config::mobile_radio_tracker::{get_tracked_radios, track_changes}; +use sqlx::PgPool; + +mod common; +use common::*; + +#[sqlx::test] +async fn mobile_tracker_handle_entity_duplicates(pool: PgPool) { + // In case of duplications mobile tracker must use newer (refreshed_at) + let asset1_pubkey = make_keypair().public_key().clone(); + let asset1_hex_idx = 631711281837647359_i64; + create_db_tables(&pool).await; + let now = Utc::now(); + let now_minus_hour = now - chrono::Duration::hours(1); + let pubkey_binary = PublicKeyBinary::from(asset1_pubkey.clone()); + + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now_minus_hour, + Some(now_minus_hour), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 160, "elevation": 5, "electricalDownTilt": 1, "mechanicalDownTilt": 2}}"#) + ) + .await; + + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + None, + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 160, "elevation": 5, "electricalDownTilt": 1, "mechanicalDownTilt": 2}}"#) + ) + .await; + + add_db_record( + &pool, + "asset1", + asset1_hex_idx, + "\"wifiIndoor\"", + asset1_pubkey.clone().into(), + now, + Some(now), + Some(r#"{"wifiInfoV0": {"antenna": 18, "azimuth": 160, "elevation": 5, "electricalDownTilt": 1, "mechanicalDownTilt": 2}}"#) + ) + .await; + + let b58 = bs58::decode(pubkey_binary.to_string()).into_vec().unwrap(); + track_changes(&pool, &pool).await.unwrap(); + let tracked_radios = get_tracked_radios(&pool).await.unwrap(); + assert_eq!(tracked_radios.len(), 1); + let tracked_radio = tracked_radios.get::>(&b58).unwrap(); + assert_eq!( + tracked_radio.last_changed_at.timestamp_millis(), + now.timestamp_millis() + ); +} From a089f3025a7fb9b3d0442503bbbdfe8e6845c83d Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Thu, 19 Dec 2024 14:19:14 +0200 Subject: [PATCH 2/3] Fix warnings --- mobile_config/tests/gateway_service.rs | 6 +++--- mobile_config/tests/mobile_radio_tracker.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index a8e122964..0aaaa7bec 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -1,9 +1,9 @@ use std::vec; -use chrono::{DateTime, Duration, Utc}; +use chrono::{Duration, Utc}; use futures::stream::StreamExt; -use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign}; +use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::services::mobile_config::{ self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, GatewayClient, GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV2, @@ -18,7 +18,7 @@ use sqlx::PgPool; use tokio::net::TcpListener; use tonic::{transport, Code}; -mod common; +pub mod common; use common::*; #[sqlx::test] diff --git a/mobile_config/tests/mobile_radio_tracker.rs b/mobile_config/tests/mobile_radio_tracker.rs index 6d93abbd0..d0a4783d4 100644 --- a/mobile_config/tests/mobile_radio_tracker.rs +++ b/mobile_config/tests/mobile_radio_tracker.rs @@ -3,7 +3,7 @@ use helium_crypto::PublicKeyBinary; use mobile_config::mobile_radio_tracker::{get_tracked_radios, track_changes}; use sqlx::PgPool; -mod common; +pub mod common; use common::*; #[sqlx::test] From 96084a0b8e9862389b0746818b0aaf01adad8d49 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 3 Jan 2025 12:44:09 +0200 Subject: [PATCH 3/3] Simplify sql query --- mobile_config/src/mobile_radio_tracker.rs | 40 +++++++++-------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index dcbfb2c27..881503bb0 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -208,32 +208,24 @@ pub async fn get_tracked_radios( fn get_all_mobile_radios(metadata: &Pool) -> impl Stream + '_ { sqlx::query_as::<_, MobileRadio>( r#" - WITH unique_entity_keys AS ( - SELECT - kta.entity_key as entity_key, - kta.asset as asset, - MAX(mhi.refreshed_at) AS refreshed_at - FROM key_to_assets kta - INNER JOIN mobile_hotspot_infos mhi - ON kta.asset = mhi.asset - WHERE kta.entity_key IS NOT NULL - AND mhi.refreshed_at IS NOT NULL - GROUP BY kta.entity_key, kta.asset - ) - SELECT - uek.entity_key, - mhi.refreshed_at, - mhi.location::bigint, - mhi.is_full_hotspot::int, - mhi.num_location_asserts, - mhi.is_active::int, - mhi.dc_onboarding_fee_paid::bigint, - mhi.device_type::text, - mhi.deployment_info::text - FROM unique_entity_keys uek + DISTINCT ON (kta.entity_key, mhi.asset) + kta.entity_key, + mhi.asset, + mhi.refreshed_at, + mhi.location::bigint, + mhi.is_full_hotspot::int, + mhi.num_location_asserts, + mhi.is_active::int, + mhi.dc_onboarding_fee_paid::bigint, + mhi.device_type::text, + mhi.deployment_info::text + FROM key_to_assets kta INNER JOIN mobile_hotspot_infos mhi ON - uek.asset = mhi.asset and mhi.refreshed_at = uek.refreshed_at + kta.asset = mhi.asset + WHERE kta.entity_key IS NOT NULL + AND mhi.refreshed_at IS NOT NULL + ORDER BY kta.entity_key, mhi.asset, refreshed_at DESC "#, ) .fetch(metadata)