Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle entity duplicates in Mobile radio tracker #923

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions mobile_config/src/mobile_radio_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ impl MobileRadio {
}

#[derive(Debug, sqlx::FromRow)]
struct TrackedMobileRadio {
entity_key: EntityKey,
hash: String,
last_changed_at: DateTime<Utc>,
last_checked_at: DateTime<Utc>,
pub struct TrackedMobileRadio {
pub entity_key: EntityKey,
pub hash: String,
pub last_changed_at: DateTime<Utc>,
pub last_checked_at: DateTime<Utc>,
}

impl TrackedMobileRadio {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl MobileRadioTracker {
}
}

async fn track_changes(pool: &Pool<Postgres>, metadata: &Pool<Postgres>) -> anyhow::Result<()> {
pub async fn track_changes(pool: &Pool<Postgres>, metadata: &Pool<Postgres>) -> anyhow::Result<()> {
tracing::info!("looking for changes to radios");
let tracked_radios = get_tracked_radios(pool).await?;
let all_mobile_radios = get_all_mobile_radios(metadata);
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn identify_changes(
.await
}

async fn get_tracked_radios(
pub async fn get_tracked_radios(
pool: &Pool<Postgres>,
) -> anyhow::Result<HashMap<EntityKey, TrackedMobileRadio>> {
sqlx::query_as::<_, TrackedMobileRadio>(
Expand All @@ -208,8 +208,21 @@ async fn get_tracked_radios(
fn get_all_mobile_radios(metadata: &Pool<Postgres>) -> impl Stream<Item = MobileRadio> + '_ {
sqlx::query_as::<_, MobileRadio>(
r#"
WITH unique_entity_keys AS (
SELECT
kta.entity_key as entity_key,
kta.asset as asset,
MAX(mhi.refreshed_at) AS refreshed_at
FROM key_to_assets kta
INNER JOIN mobile_hotspot_infos mhi
ON kta.asset = mhi.asset
WHERE kta.entity_key IS NOT NULL
AND mhi.refreshed_at IS NOT NULL
GROUP BY kta.entity_key, kta.asset
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this query be simplified back into something more like the original with a SELECT DISTINCT ON (entity_key, asset) ... ORDER BY refreshed_at DESC to limit results to the most recent single entry for that given combo of identifiers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the hint. Now it is much better!

https://www.postgresql.org/docs/current/sql-select.html#SQL-DISTINCT


SELECT
kta.entity_key,
uek.entity_key,
mhi.refreshed_at,
mhi.location::bigint,
mhi.is_full_hotspot::int,
Expand All @@ -218,11 +231,9 @@ fn get_all_mobile_radios(metadata: &Pool<Postgres>) -> impl Stream<Item = Mobile
mhi.dc_onboarding_fee_paid::bigint,
mhi.device_type::text,
mhi.deployment_info::text
FROM key_to_assets kta
FROM unique_entity_keys uek
INNER JOIN mobile_hotspot_infos mhi ON
kta.asset = mhi.asset
WHERE kta.entity_key IS NOT NULL
AND mhi.refreshed_at IS NOT NULL
uek.asset = mhi.asset and mhi.refreshed_at = uek.refreshed_at
"#,
)
.fetch(metadata)
Expand Down
133 changes: 133 additions & 0 deletions mobile_config/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use bs58;
use chrono::{DateTime, Duration, Utc};
use helium_crypto::PublicKeyBinary;
use helium_crypto::{KeyTag, Keypair};
use sqlx::PgPool;

pub async fn add_mobile_tracker_record(
pool: &PgPool,
key: PublicKeyBinary,
last_changed_at: DateTime<Utc>,
) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();

sqlx::query(
r#"
INSERT INTO
"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at")
VALUES
($1, $2, $3, $4);
"#,
)
.bind(b58)
.bind("hash")
.bind(last_changed_at)
.bind(last_changed_at + Duration::hours(1))
.execute(pool)
.await
.unwrap();
}

#[allow(clippy::too_many_arguments)]
pub async fn add_db_record(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
key: PublicKeyBinary,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
add_mobile_hotspot_infos(
pool,
asset,
location,
device_type,
created_at,
refreshed_at,
deployment_info,
)
.await;
add_asset_key(pool, asset, key).await;
}

pub async fn add_mobile_hotspot_infos(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
sqlx::query(
r#"
INSERT INTO
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info")
VALUES
($1, $2, $3::jsonb, $4, $5, $6::jsonb);
"#,
)
.bind(asset)
.bind(location)
.bind(device_type)
.bind(created_at)
.bind(refreshed_at)
.bind(deployment_info)
.execute(pool)
.await
.unwrap();
}

pub async fn add_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();
sqlx::query(
r#"
INSERT INTO
"key_to_assets" ("asset", "entity_key")
VALUES ($1, $2);
"#,
)
.bind(asset)
.bind(b58)
.execute(pool)
.await
.unwrap();
}

pub async fn create_db_tables(pool: &PgPool) {
sqlx::query(
r#"
CREATE TABLE mobile_hotspot_infos (
asset character varying(255) NULL,
location numeric NULL,
device_type jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT NOW(),
refreshed_at timestamptz,
deployment_info jsonb,
is_full_hotspot bool NULL,
num_location_asserts integer NULL,
is_active bool NULL,
dc_onboarding_fee_paid numeric NULL
);"#,
)
.execute(pool)
.await
.unwrap();

sqlx::query(
r#"
CREATE TABLE key_to_assets (
asset character varying(255) NULL,
entity_key bytea NULL
);"#,
)
.execute(pool)
.await
.unwrap();
}

pub fn make_keypair() -> Keypair {
Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng)
}
131 changes: 5 additions & 126 deletions mobile_config/tests/gateway_service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::vec;

use chrono::{DateTime, Duration, Utc};
use chrono::{Duration, Utc};
use futures::stream::StreamExt;

use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign};
use helium_crypto::{Keypair, PublicKey, Sign};
use helium_proto::services::mobile_config::{
self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, GatewayClient,
GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV2,
Expand All @@ -18,6 +18,9 @@ use sqlx::PgPool;
use tokio::net::TcpListener;
use tonic::{transport, Code};

pub mod common;
use common::*;

#[sqlx::test]
async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> {
// NOTE(mj): The information we're requesting does not exist in the DB for
Expand Down Expand Up @@ -538,130 +541,6 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) {
}
}

async fn add_mobile_tracker_record(
pool: &PgPool,
key: PublicKeyBinary,
last_changed_at: DateTime<Utc>,
) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();

sqlx::query(
r#"
INSERT INTO
"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at")
VALUES
($1, $2, $3, $4);
"#,
)
.bind(b58)
.bind("hash")
.bind(last_changed_at)
.bind(last_changed_at + Duration::hours(1))
.execute(pool)
.await
.unwrap();
}

#[allow(clippy::too_many_arguments)]
async fn add_db_record(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
key: PublicKeyBinary,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
add_mobile_hotspot_infos(
pool,
asset,
location,
device_type,
created_at,
refreshed_at,
deployment_info,
)
.await;
add_asset_key(pool, asset, key).await;
}

async fn add_mobile_hotspot_infos(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
sqlx::query(
r#"
INSERT INTO
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info")
VALUES
($1, $2, $3::jsonb, $4, $5, $6::jsonb);
"#,
)
.bind(asset)
.bind(location)
.bind(device_type)
.bind(created_at)
.bind(refreshed_at)
.bind(deployment_info)
.execute(pool)
.await
.unwrap();
}

async fn add_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();
sqlx::query(
r#"
INSERT INTO
"key_to_assets" ("asset", "entity_key")
VALUES ($1, $2);
"#,
)
.bind(asset)
.bind(b58)
.execute(pool)
.await
.unwrap();
}

async fn create_db_tables(pool: &PgPool) {
sqlx::query(
r#"
CREATE TABLE mobile_hotspot_infos (
asset character varying(255) NULL,
location numeric NULL,
device_type jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT NOW(),
refreshed_at timestamptz,
deployment_info jsonb
);"#,
)
.execute(pool)
.await
.unwrap();

sqlx::query(
r#"
CREATE TABLE key_to_assets (
asset character varying(255) NULL,
entity_key bytea NULL
);"#,
)
.execute(pool)
.await
.unwrap();
}

fn make_keypair() -> Keypair {
Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng)
}

fn make_gateway_stream_signed_req_v2(
signer: &Keypair,
device_types: &[DeviceType],
Expand Down
Loading