Skip to content

Commit

Permalink
Handle refreshed_at NULL case
Browse files Browse the repository at this point in the history
  • Loading branch information
kurotych committed Nov 13, 2024
1 parent 3454d5d commit 58f0b9e
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 26 deletions.
3 changes: 2 additions & 1 deletion mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ pub(crate) mod db {

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!("{GET_METADATA_SQL} where infos.refreshed_at > $1");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL}
where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#);

static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET);

Expand Down
190 changes: 165 additions & 25 deletions mobile_config/tests/gateway_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::Utc;
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;

use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign};
Expand Down Expand Up @@ -83,16 +83,151 @@ async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> {
Ok(())
}

async fn spawn_gateway_service(
pool: PgPool,
admin_pub_key: PublicKey,
) -> (
String,
tokio::task::JoinHandle<std::result::Result<(), helium_proto::services::Error>>,
) {
let server_key = make_keypair();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

// Start the gateway server
let keys = CacheKeys::from_iter([(admin_pub_key.to_owned(), KeyRole::Administrator)]);
let (_key_cache_tx, key_cache) = KeyCache::new(keys);
let gws = GatewayService::new(key_cache, pool, server_key);
let handle = tokio::spawn(
transport::Server::builder()
.add_service(proto::GatewayServer::new(gws))
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)),
);

(format!("http://{addr}"), handle)
}

#[sqlx::test]
async fn gateway_stream_info_refreshed_at(pool: PgPool) {
let admin_key = make_keypair();
let asset1_pubkey = make_keypair().public_key().clone();
let asset1_hex_idx = 631711281837647359_i64;
let asset2_hex_idx = 631711286145955327_i64;
let asset2_pubkey = make_keypair().public_key().clone();
let now = Utc::now();
let now_plus_10 = now + chrono::Duration::seconds(10);

create_db_tables(&pool).await;
add_db_record(
&pool,
"asset1",
asset1_hex_idx,
"\"wifiIndoor\"",
asset1_pubkey.clone().into(),
now,
Some(now),
)
.await;
add_db_record(
&pool,
"asset2",
asset2_hex_idx,
"\"wifiDataOnly\"",
asset2_pubkey.clone().into(),
now_plus_10,
Some(now_plus_10),
)
.await;

let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await;
let mut client = GatewayClient::connect(addr).await.unwrap();

// Regression test
let req = make_gateway_stream_signed_req(&admin_key, &[], 0);
let mut stream = client.info_stream(req).await.unwrap().into_inner();
let resp = stream.next().await.unwrap().unwrap();
assert_eq!(resp.gateways.len(), 2);

// No device types but filter by refreshed_at
let req = make_gateway_stream_signed_req(&admin_key, &[], now_plus_10.timestamp() as u64);
let mut stream = client.info_stream(req).await.unwrap().into_inner();
let resp = stream.next().await.unwrap().unwrap();
assert_eq!(resp.gateways.len(), 1);
assert_eq!(
resp.gateways.first().unwrap().device_type,
Into::<i32>::into(DeviceType::WifiDataOnly)
);

// No refreshed_at but filter by device_type
let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor], 0);
let mut stream = client.info_stream(req).await.unwrap().into_inner();
let resp = stream.next().await.unwrap().unwrap();
assert_eq!(resp.gateways.len(), 1);
assert_eq!(
resp.gateways.first().unwrap().device_type,
Into::<i32>::into(DeviceType::WifiIndoor)
);

// Filter by device_type and refreshed_at
let req = make_gateway_stream_signed_req(
&admin_key,
&[DeviceType::WifiIndoor],
now.timestamp() as u64,
);
let mut stream = client.info_stream(req).await.unwrap().into_inner();
let resp = stream.next().await.unwrap().unwrap();
assert_eq!(resp.gateways.len(), 1);
assert_eq!(
resp.gateways.first().unwrap().device_type,
Into::<i32>::into(DeviceType::WifiIndoor)
);
}

#[sqlx::test]
async fn gateway_stream_info_refreshed_at_is_null(pool: PgPool) {
let admin_key = make_keypair();
let asset1_pubkey = make_keypair().public_key().clone();
let asset1_hex_idx = 631711281837647359_i64;
let now = Utc::now();

create_db_tables(&pool).await;
add_db_record(
&pool,
"asset1",
asset1_hex_idx,
"\"wifiIndoor\"",
asset1_pubkey.clone().into(),
now,
None,
)
.await;

let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await;
let mut client = GatewayClient::connect(addr).await.unwrap();

let req = make_gateway_stream_signed_req(&admin_key, &[], now.timestamp() as u64);
let mut stream = client.info_stream(req).await.unwrap().into_inner();

// Make sure the gateway was returned
let resp = stream.next().await.unwrap().unwrap();
assert_eq!(resp.gateways.len(), 1);

let req = make_gateway_stream_signed_req(&admin_key, &[], (now.timestamp() + 1) as u64);
let mut stream = client.info_stream(req).await.unwrap().into_inner();
// Response is empty
assert!(stream.next().await.is_none());
}

#[sqlx::test]
async fn gateway_stream_info_data_types(pool: PgPool) {
let admin_key = make_keypair();
let server_key = make_keypair();
let asset1_pubkey = make_keypair().public_key().clone();
let asset1_hex_idx = 631711281837647359_i64;
let asset2_hex_idx = 631711286145955327_i64;
let asset3_hex_idx = 631711286145006591_i64;
let asset2_pubkey = make_keypair().public_key().clone();
let asset3_pubkey = make_keypair().public_key().clone();
let now = Utc::now();

create_db_tables(&pool).await;
add_db_record(
Expand All @@ -101,6 +236,8 @@ async fn gateway_stream_info_data_types(pool: PgPool) {
asset1_hex_idx,
"\"wifiIndoor\"",
asset1_pubkey.clone().into(),
now,
Some(now),
)
.await;
add_db_record(
Expand All @@ -109,6 +246,8 @@ async fn gateway_stream_info_data_types(pool: PgPool) {
asset2_hex_idx,
"\"wifiDataOnly\"",
asset2_pubkey.clone().into(),
now,
Some(now),
)
.await;
add_db_record(
Expand All @@ -117,27 +256,17 @@ async fn gateway_stream_info_data_types(pool: PgPool) {
asset3_hex_idx,
"\"wifiDataOnly\"",
asset3_pubkey.clone().into(),
now,
Some(now),
)
.await;

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await;

// Start the gateway server
let keys = CacheKeys::from_iter([(admin_key.public_key().to_owned(), KeyRole::Administrator)]);
let (_key_cache_tx, key_cache) = KeyCache::new(keys);
let gws = GatewayService::new(key_cache, pool.clone(), server_key);
let _handle = tokio::spawn(
transport::Server::builder()
.add_service(proto::GatewayServer::new(gws))
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)),
);
let mut client = GatewayClient::connect(format!("http://{addr}"))
.await
.unwrap();
let mut client = GatewayClient::connect(addr).await.unwrap();

// Check wifi indoor
let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor]);
let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiIndoor], 0);
let mut stream = client.info_stream(req).await.unwrap().into_inner();
let res = stream.next().await.unwrap().unwrap();
let gw_info = res.gateways.first().unwrap();
Expand All @@ -154,7 +283,7 @@ async fn gateway_stream_info_data_types(pool: PgPool) {
assert!(stream.next().await.is_none());

// Check wifi data only
let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiDataOnly]);
let req = make_gateway_stream_signed_req(&admin_key, &[DeviceType::WifiDataOnly], 0);
let stream = client.info_stream(req).await.unwrap().into_inner();

let resp = stream
Expand All @@ -175,7 +304,7 @@ async fn gateway_stream_info_data_types(pool: PgPool) {
);

// Check all
let req = make_gateway_stream_signed_req(&admin_key, &[]);
let req = make_gateway_stream_signed_req(&admin_key, &[], 0);
let stream = client.info_stream(req).await.unwrap().into_inner();

let resp = stream
Expand All @@ -192,24 +321,34 @@ async fn add_db_record(
location: i64,
device_type: &str,
key: PublicKeyBinary,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
) {
add_mobile_hotspot_infos(pool, asset, location, device_type).await;
add_mobile_hotspot_infos(pool, asset, location, device_type, created_at, refreshed_at).await;
add_asset_key(pool, asset, key).await;
}

async fn add_mobile_hotspot_infos(pool: &PgPool, asset: &str, location: i64, device_type: &str) {
async fn add_mobile_hotspot_infos(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
) {
sqlx::query(
r#"
INSERT INTO
"mobile_hotspot_infos" ("asset", "location", "device_type", "refreshed_at")
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at")
VALUES
($1, $2, $3::jsonb, $4);
($1, $2, $3::jsonb, $4, $5);
"#,
)
.bind(asset)
.bind(location)
.bind(device_type)
.bind(Utc::now())
.bind(created_at)
.bind(refreshed_at)
.execute(pool)
.await
.unwrap();
Expand Down Expand Up @@ -264,13 +403,14 @@ fn make_keypair() -> Keypair {
fn make_gateway_stream_signed_req(
signer: &Keypair,
device_types: &[DeviceType],
min_refreshed_at: u64,
) -> proto::GatewayInfoStreamReqV1 {
let mut req = GatewayInfoStreamReqV1 {
batch_size: 10000,
signer: signer.public_key().to_vec(),
signature: vec![],
device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(),
min_refreshed_at: 0,
min_refreshed_at,
};

req.signature = signer.sign(&req.encode_to_vec()).unwrap();
Expand Down

0 comments on commit 58f0b9e

Please sign in to comment.