Skip to content

Commit

Permalink
[Mobile Config] Add request/response fields to info_stream (#894)
Browse files Browse the repository at this point in the history
* Add `refreshed_at`, `created_at` to `gateway_info`
* Add `min_refreshed_at` to `gateway_info_req_v1`
  • Loading branch information
kurotych authored Nov 14, 2024
1 parent a1c9f27 commit d903cf7
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 32 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mobile_config/src/client/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl GatewayInfoResolver for GatewayClient {
let mut req = mobile_config::GatewayInfoStreamReqV1 {
batch_size: self.batch_size,
device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(),
min_refreshed_at: 0,
signer: self.signing_key.public_key().into(),
signature: vec![],
};
Expand Down
2 changes: 2 additions & 0 deletions mobile_config/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum ClientError {
LocationParseError(#[from] std::num::ParseIntError),
#[error("unknown service provider {0}")]
UnknownServiceProvider(String),
#[error("Invalid GatewayInfo proto response {0}")]
InvalidGatewayInfoProto(#[from] crate::gateway_info::GatewayInfoProtoParseError),
}

macro_rules! call_with_retry {
Expand Down
60 changes: 55 additions & 5 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, TimeZone, Utc};
use futures::stream::BoxStream;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::mobile_config::{
Expand All @@ -17,6 +18,8 @@ pub struct GatewayInfo {
pub address: PublicKeyBinary,
pub metadata: Option<GatewayMetadata>,
pub device_type: DeviceType,
pub refreshed_at: DateTime<Utc>,
pub created_at: DateTime<Utc>,
}

impl GatewayInfo {
Expand All @@ -25,8 +28,18 @@ impl GatewayInfo {
}
}

#[derive(thiserror::Error, Debug)]
pub enum GatewayInfoProtoParseError {
#[error("Invalid location string: {0}")]
InvalidLocation(#[from] std::num::ParseIntError),
#[error("Invalid created_at: {0}")]
InvalidCreatedAt(u64),
#[error("Invalid refreshed_at: {0}")]
InvalidRefreshedAt(u64),
}

impl TryFrom<GatewayInfoProto> for GatewayInfo {
type Error = std::num::ParseIntError;
type Error = GatewayInfoProtoParseError;

fn try_from(info: GatewayInfoProto) -> Result<Self, Self::Error> {
let metadata = if let Some(ref metadata) = info.metadata {
Expand All @@ -38,10 +51,27 @@ impl TryFrom<GatewayInfoProto> for GatewayInfo {
None
};
let device_type = info.device_type().into();

let created_at = Utc
.timestamp_opt(info.created_at as i64, 0)
.single()
.ok_or(GatewayInfoProtoParseError::InvalidCreatedAt(
info.created_at,
))?;

let refreshed_at = Utc
.timestamp_opt(info.refreshed_at as i64, 0)
.single()
.ok_or(GatewayInfoProtoParseError::InvalidRefreshedAt(
info.refreshed_at,
))?;

Ok(Self {
address: info.address.into(),
metadata,
device_type,
created_at,
refreshed_at,
})
}
}
Expand All @@ -61,6 +91,8 @@ impl TryFrom<GatewayInfo> for GatewayInfoProto {
address: info.address.into(),
metadata,
device_type: info.device_type as i32,
created_at: info.created_at.timestamp() as u64,
refreshed_at: info.created_at.timestamp() as u64,
})
}
}
Expand Down Expand Up @@ -115,22 +147,28 @@ impl std::str::FromStr for DeviceType {

pub(crate) mod db {
use super::{DeviceType, GatewayInfo, GatewayMetadata};
use chrono::{DateTime, Utc};
use futures::stream::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use sqlx::{types::Json, PgExecutor, Row};
use std::str::FromStr;

const GET_METADATA_SQL: &str = r#"
select kta.entity_key, infos.location::bigint, infos.device_type
select kta.entity_key, infos.location::bigint, infos.device_type,
infos.refreshed_at, infos.created_at
from mobile_hotspot_infos infos
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";
const DEVICE_TYPES_WHERE_SNIPPET: &str = " where device_type::text = any($1) ";
const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) ";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
static ref DEVICE_TYPES_METADATA_SQL: String = format!("{GET_METADATA_SQL} {DEVICE_TYPES_WHERE_SNIPPET}");
static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL}
where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#);

static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET);

}

pub async fn get_info(
Expand Down Expand Up @@ -166,13 +204,16 @@ pub(crate) mod db {
pub fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
device_types: &'a [DeviceType],
min_refreshed_at: DateTime<Utc>,
) -> impl Stream<Item = GatewayInfo> + 'a {
match device_types.is_empty() {
true => sqlx::query_as::<_, GatewayInfo>(GET_METADATA_SQL)
true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT)
.bind(min_refreshed_at)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed(),
false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL)
.bind(min_refreshed_at)
.bind(
device_types
.iter()
Expand Down Expand Up @@ -200,13 +241,22 @@ pub(crate) mod db {
.as_ref(),
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
let created_at = row.get::<DateTime<Utc>, &str>("created_at");
// `refreshed_at` can be NULL in the database schema.
// If so, fallback to using `created_at` as the default value of `refreshed_at`.
let refreshed_at = row
.get::<Option<DateTime<Utc>>, &str>("refreshed_at")
.unwrap_or(created_at);

Ok(Self {
address: PublicKeyBinary::from_str(
&bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(),
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?,
metadata,
device_type,
refreshed_at,
created_at,
})
}
}
Expand Down
10 changes: 8 additions & 2 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
key_cache::KeyCache,
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
};
use chrono::Utc;
use chrono::{TimeZone, Utc};
use file_store::traits::{MsgVerify, TimestampEncode};
use futures::{
stream::{Stream, StreamExt, TryStreamExt},
Expand Down Expand Up @@ -163,14 +163,20 @@ impl mobile_config::Gateway for GatewayService {
let (tx, rx) = tokio::sync::mpsc::channel(100);

let device_types: Vec<DeviceType> = request.device_types().map(|v| v.into()).collect();
let min_refreshed_at = Utc
.timestamp_opt(request.min_refreshed_at as i64, 0)
.single()
.ok_or(Status::invalid_argument(
"Invalid min_refreshed_at argument",
))?;

tracing::debug!(
"fetching all gateways' info. Device types: {:?} ",
device_types
);

tokio::spawn(async move {
let stream = gateway_info::db::all_info_stream(&pool, &device_types);
let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at);
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down
Loading

0 comments on commit d903cf7

Please sign in to comment.