Skip to content

Commit

Permalink
mobile config gateway decoding fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Dec 16, 2023
1 parent fea3166 commit 5ad87c8
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 33 deletions.
1 change: 1 addition & 0 deletions mobile_config.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ COPY Cargo.toml Cargo.lock ./
COPY db_store ./db_store/
COPY file_store ./file_store/
COPY metrics ./metrics/
COPY task_manager ./task_manager/
COPY mobile_config/Cargo.toml ./mobile_config/Cargo.toml

# Enable sparse registry to avoid crates indexing infinite loop
Expand Down
8 changes: 6 additions & 2 deletions mobile_config/src/client/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ impl GatewayInfoResolver for GatewayClient {
Ok(info_res) => {
let response = info_res.into_inner();
response.verify(&self.config_pubkey)?;
response.info.map(gateway_info::GatewayInfo::from)
response
.info
.map(gateway_info::GatewayInfo::try_from)
.transpose()?
}
Err(status) if status.code() == tonic::Code::NotFound => None,
Err(status) => Err(status)?,
Expand Down Expand Up @@ -111,7 +114,8 @@ impl GatewayInfoResolver for GatewayClient {
}
})
.flat_map(|res| stream::iter(res.gateways))
.map(gateway_info::GatewayInfo::from)
.map(gateway_info::GatewayInfo::try_from)
.filter_map(|gateway| async move { gateway.ok() })
.boxed();

Ok(res_stream)
Expand Down
2 changes: 2 additions & 0 deletions mobile_config/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum ClientError {
GrpcError(#[from] tonic::Status),
#[error("error verifying response signature {0}")]
VerificationError(#[from] file_store::Error),
#[error("error parsing gateway location {0}")]
LocationParseError(#[from] std::num::ParseIntError),
}

macro_rules! call_with_retry {
Expand Down
38 changes: 24 additions & 14 deletions mobile_config/src/gateway_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ pub struct GatewayInfo {
pub device_type: DeviceType,
}

impl From<GatewayInfoProto> for GatewayInfo {
fn from(info: GatewayInfoProto) -> Self {
impl TryFrom<GatewayInfoProto> for GatewayInfo {
type Error = std::num::ParseIntError;

fn try_from(info: GatewayInfoProto) -> Result<Self, Self::Error> {
let metadata = if let Some(ref metadata) = info.metadata {
u64::from_str_radix(&metadata.location, 16)
.map(|location| GatewayMetadata { location })
.ok()
Some(
u64::from_str_radix(&metadata.location, 16)
.map(|location| GatewayMetadata { location })?,
)
} else {
None
};
let device_type = info.device_type().into();
Self {
Ok(Self {
address: info.address.into(),
metadata,
device_type,
}
})
}
}

Expand All @@ -56,7 +59,7 @@ impl TryFrom<GatewayInfo> for GatewayInfoProto {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub enum DeviceType {
Cbrs,
WifiIndoor,
Expand Down Expand Up @@ -103,7 +106,7 @@ pub(crate) mod db {
from mobile_hotspot_infos infos
join key_to_assets kta on infos.asset = kta.asset
"#;
const BATCH_SQL_WHERE_SNIPPET: &str = "where kta.entity_key = any($1)";
const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) ";

lazy_static::lazy_static! {
static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}");
Expand All @@ -127,12 +130,19 @@ pub(crate) mod db {
pub fn batch_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
addresses: &'a [PublicKeyBinary],
) -> impl Stream<Item = GatewayInfo> + 'a {
sqlx::query_as::<_, GatewayInfo>(&BATCH_METADATA_SQL)
.bind(addresses)
) -> anyhow::Result<impl Stream<Item = GatewayInfo> + 'a> {
let entity_keys = addresses
.iter()
.map(|address| bs58::decode(address.to_string()).into_vec())
.collect::<Result<Vec<_>, bs58::decode::Error>>()?;
Ok(sqlx::query_as::<_, GatewayInfo>(&BATCH_METADATA_SQL)
.bind(entity_keys)
.fetch(db)
.filter_map(|metadata| async move { metadata.ok() })
.boxed()
.filter_map(|metadata| async move {
println!("{metadata:?}");
metadata.ok()
})
.boxed())
}

pub fn all_info_stream<'a>(
Expand Down
2 changes: 1 addition & 1 deletion mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl mobile_config::Gateway for GatewayService {
let (tx, rx) = tokio::sync::mpsc::channel(100);

tokio::spawn(async move {
let stream = gateway_info::db::batch_info_stream(&pool, &addresses);
let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?;
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down
47 changes: 44 additions & 3 deletions mobile_config_cli/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::{cmds::gateway::GatewayInfo, current_timestamp, NetworkKeyRole, Result};
use crate::{
cmds::gateway::{GatewayInfo, GatewayInfoStream},
current_timestamp, NetworkKeyRole, Result,
};

use base64::Engine;
use futures::{stream, StreamExt};
use helium_crypto::{Keypair, PublicKey, Sign, Verify};
use helium_proto::{
services::mobile_config::{
admin_client, authorization_client, entity_client, gateway_client, AdminAddKeyReqV1,
AdminKeyResV1, AdminRemoveKeyReqV1, AuthorizationListReqV1, AuthorizationListResV1,
AuthorizationVerifyReqV1, AuthorizationVerifyResV1, EntityVerifyReqV1, EntityVerifyResV1,
GatewayInfoReqV1, GatewayInfoResV1,
GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamResV1,
},
Message,
};
Expand Down Expand Up @@ -184,7 +188,42 @@ impl GatewayClient {
let info = response
.info
.ok_or_else(|| anyhow::anyhow!("gateway not found"))?;
info.try_into()
GatewayInfo::try_from(info)
}

pub async fn info_batch(
&mut self,
gateways: &[PublicKey],
batch_size: u32,
keypair: &Keypair,
) -> Result<GatewayInfoStream> {
let mut request = GatewayInfoBatchReqV1 {
addresses: gateways.iter().map(|pubkey| pubkey.into()).collect(),
batch_size,
signer: keypair.public_key().into(),
signature: vec![],
};
request.signature = request.sign(keypair)?;
let config_pubkey = self.server_pubkey.clone();
let stream = self
.client
.info_batch(request)
.await?
.into_inner()
.filter_map(|res| async move { res.ok() })
.map(move |res| (res, config_pubkey.clone()))
.filter_map(|(res, pubkey)| async move {
match res.verify(&pubkey) {
Ok(()) => Some(res),
Err(_) => None,
}
})
.flat_map(|res| stream::iter(res.gateways.into_iter()))
.map(GatewayInfo::try_from)
.filter_map(|gateway| async move { gateway.ok() })
.boxed();

Ok(stream)
}
}

Expand Down Expand Up @@ -212,6 +251,7 @@ impl_sign!(AuthorizationVerifyReqV1, signature);
impl_sign!(AuthorizationListReqV1, signature);
impl_sign!(EntityVerifyReqV1, signature);
impl_sign!(GatewayInfoReqV1, signature);
impl_sign!(GatewayInfoBatchReqV1, signature);

pub trait MsgVerify: Message + std::clone::Clone {
fn verify(&self, verifier: &PublicKey) -> Result
Expand Down Expand Up @@ -240,3 +280,4 @@ impl_verify!(AuthorizationVerifyResV1, signature);
impl_verify!(AuthorizationListResV1, signature);
impl_verify!(EntityVerifyResV1, signature);
impl_verify!(GatewayInfoResV1, signature);
impl_verify!(GatewayInfoStreamResV1, signature);
50 changes: 37 additions & 13 deletions mobile_config_cli/src/cmds/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
use super::{GetHotspot, PathBufKeypair};
use super::{GetHotspot, GetHotspotBatch, PathBufKeypair};
use crate::{client, Msg, PrettyJson, Result};
use angry_purple_tiger::AnimalName;
use futures::StreamExt;
use helium_crypto::PublicKey;
use helium_proto::services::mobile_config::{
GatewayInfo as GatewayInfoProto, GatewayMetadata as GatewayMetadataProto,
};
use mobile_config::gateway_info::DeviceType;
use serde::Serialize;
use std::str::FromStr;

pub type GatewayInfoStream = futures::stream::BoxStream<'static, GatewayInfo>;

#[derive(Debug, Serialize)]
pub struct GatewayInfo {
name: String,
pubkey: PublicKey,
metadata: Option<GatewayMetadata>,
device_type: DeviceType,
}

#[derive(Debug, Serialize)]
pub struct GatewayMetadata {
location: String,
lat: f64,
lon: f64,
}

pub async fn info(args: GetHotspot) -> Result<Msg> {
let mut client = client::GatewayClient::new(&args.config_host, &args.config_pubkey).await?;
match client
Expand All @@ -22,24 +41,28 @@ pub async fn info(args: GetHotspot) -> Result<Msg> {
}
}

#[derive(Debug, Serialize)]
pub struct GatewayInfo {
name: String,
pubkey: PublicKey,
metadata: Option<GatewayMetadata>,
}

#[derive(Debug, Serialize)]
pub struct GatewayMetadata {
location: String,
lat: f64,
lon: f64,
pub async fn info_batch(args: GetHotspotBatch) -> Result<Msg> {
let mut client = client::GatewayClient::new(&args.config_host, &args.config_pubkey).await?;
match client
.info_batch(&args.hotspots, args.batch_size, &args.keypair.to_keypair()?)
.await
{
Ok(info_stream) => {
let gateways = info_stream.collect::<Vec<GatewayInfo>>().await;
Msg::ok(gateways.pretty_json()?)
}
Err(err) => Msg::err(format!(
"failed to retrieve {:?} info: {}",
&args.hotspots, err
)),
}
}

impl TryFrom<GatewayInfoProto> for GatewayInfo {
type Error = anyhow::Error;

fn try_from(info: GatewayInfoProto) -> Result<Self, Self::Error> {
let device_type: DeviceType = info.device_type().into();
let pubkey = PublicKey::try_from(info.address)?;
let name: AnimalName = pubkey.clone().into();
let metadata = if let Some(md) = info.metadata {
Expand All @@ -51,6 +74,7 @@ impl TryFrom<GatewayInfoProto> for GatewayInfo {
name: name.to_string(),
pubkey,
metadata,
device_type,
})
}
}
Expand Down
17 changes: 17 additions & 0 deletions mobile_config_cli/src/cmds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ pub struct VerifyRewardableEntity {
pub enum GatewayCommands {
/// Retrieve the on-chain registered info for the hotspot
Info(GetHotspot),
/// Retrieve the on-chain registered info for the batch of hotspots
/// requested by list of Public Key Binaries
InfoBatch(GetHotspotBatch),
}

#[derive(Debug, Args)]
Expand All @@ -149,6 +152,20 @@ pub struct GetHotspot {
pub config_pubkey: String,
}

#[derive(Debug, Args)]
pub struct GetHotspotBatch {
#[arg(long)]
pub hotspots: Vec<PublicKey>,
#[arg(short, long, default_value = "5")]
pub batch_size: u32,
#[arg(from_global)]
pub keypair: PathBuf,
#[arg(from_global)]
pub config_host: String,
#[arg(from_global)]
pub config_pubkey: String,
}

#[derive(Debug, Subcommand)]
pub enum EnvCommands {
/// Make Environment variable to ease use
Expand Down
1 change: 1 addition & 0 deletions mobile_config_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub async fn handle_cli(cli: Cli) -> Result<Msg> {
},
Commands::Gateway { command } => match command {
cmds::GatewayCommands::Info(args) => gateway::info(args).await,
cmds::GatewayCommands::InfoBatch(args) => gateway::info_batch(args).await,
},
}
}

0 comments on commit 5ad87c8

Please sign in to comment.