Skip to content

Commit

Permalink
Add radio location estimates ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Sep 30, 2024
1 parent 21708cd commit c58903b
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 5 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,6 @@ sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb
# Patching for beacon must point directly to the crate, it will not look in the
# repo for sibling crates.
#
# [patch.'https://github.com/helium/proto']
# helium-proto = { path = "../proto" }
[patch.'https://github.com/helium/proto']
helium-proto = { path = "../proto" }
# beacon = { path = "../proto/beacon" }
5 changes: 5 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"subscriber_verified_mapping_ingest_report";
pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"verified_subscriber_verified_mapping_ingest_report";
pub const RADIO_LOCATION_ESTIMATES_INGEST_REPORT: &str = "radio_location_estimates_ingest_report";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -220,6 +221,7 @@ pub enum FileType {
VerifiedSPBoostedRewardsBannedRadioIngestReport,
SubscriberVerifiedMappingEventIngestReport,
VerifiedSubscriberVerifiedMappingEventIngestReport,
RadioLocationEstimatesIngestReport,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -291,6 +293,7 @@ impl fmt::Display for FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport => {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::RadioLocationEstimatesIngestReport => RADIO_LOCATION_ESTIMATES_INGEST_REPORT,
};
f.write_str(s)
}
Expand Down Expand Up @@ -365,6 +368,7 @@ impl FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport => {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::RadioLocationEstimatesIngestReport => RADIO_LOCATION_ESTIMATES_INGEST_REPORT,
}
}
}
Expand Down Expand Up @@ -439,6 +443,7 @@ impl FromStr for FileType {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT => {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport
}
RADIO_LOCATION_ESTIMATES_INGEST_REPORT => Self::RadioLocationEstimatesIngestReport,
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
1 change: 1 addition & 0 deletions file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod mobile_radio_threshold;
pub mod mobile_session;
pub mod mobile_subscriber;
pub mod mobile_transfer;
pub mod radio_location_estimates;
pub mod reward_manifest;
mod settings;
pub mod speedtest;
Expand Down
108 changes: 108 additions & 0 deletions file_store/src/radio_location_estimates.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::{
traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
RadioLocationEstimateV1, RadioLocationEstimatesReqV1, RleEventV1,
};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioLocationEstimates {
pub radio_id: String,
pub estimates: Vec<RadioLocationEstimate>,
pub timestamp: DateTime<Utc>,
pub signer: PublicKeyBinary,
}

impl MsgDecode for RadioLocationEstimates {
type Msg = RadioLocationEstimatesReqV1;
}

impl MsgTimestamp<Result<DateTime<Utc>>> for RadioLocationEstimatesReqV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for RadioLocationEstimates {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp()
}
}

impl TryFrom<RadioLocationEstimatesReqV1> for RadioLocationEstimates {
type Error = Error;
fn try_from(req: RadioLocationEstimatesReqV1) -> Result<Self> {
let timestamp = req.timestamp()?;
Ok(Self {
radio_id: req.radio_id,
estimates: req
.estimates
.into_iter()
.map(|e| e.try_into().unwrap())
.collect(),
timestamp,
signer: req.signer.into(),
})
}
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioLocationEstimate {
pub radius: Decimal,
pub confidence: Decimal,
pub events: Vec<RadioLocationEstimateEvent>,
}

impl TryFrom<RadioLocationEstimateV1> for RadioLocationEstimate {
type Error = Error;
fn try_from(estimate: RadioLocationEstimateV1) -> Result<Self> {
Ok(Self {
radius: to_rust_decimal(estimate.radius.unwrap()),
confidence: to_rust_decimal(estimate.confidence.unwrap()),
events: estimate
.events
.into_iter()
.map(|e| e.try_into().unwrap())
.collect(),
})
}
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioLocationEstimateEvent {
pub id: String,
pub timestamp: DateTime<Utc>,
}

impl MsgTimestamp<Result<DateTime<Utc>>> for RleEventV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for RadioLocationEstimateEvent {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp()
}
}

impl TryFrom<RleEventV1> for RadioLocationEstimateEvent {
type Error = Error;
fn try_from(event: RleEventV1) -> Result<Self> {
let timestamp = event.timestamp()?;
Ok(Self {
id: event.id,
timestamp,
})
}
}

fn to_rust_decimal(x: helium_proto::Decimal) -> rust_decimal::Decimal {
let str = x.value.as_str();
rust_decimal::Decimal::from_str_exact(str).unwrap()
}
5 changes: 5 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,8 @@ impl_file_sink!(
FileType::RewardManifest.to_str(),
"reward_manifest"
);
impl_file_sink!(
poc_mobile::RadioLocationEstimatesIngestReportV1,
FileType::RadioLocationEstimatesIngestReport.to_str(),
"radio_location_estimates_ingest_report"
);
1 change: 1 addition & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature);
impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
impl_msg_verify!(poc_mobile::RadioLocationEstimatesReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
45 changes: 43 additions & 2 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use helium_proto::services::poc_mobile::{
CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1,
InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
InvalidatedRadioThresholdReportRespV1, RadioLocationEstimatesIngestReportV1,
RadioLocationEstimatesReqV1, RadioLocationEstimatesRespV1, RadioThresholdIngestReportV1,
RadioThresholdReportReqV1, RadioThresholdReportRespV1,
ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1,
SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1,
SubscriberLocationReqV1, SubscriberLocationRespV1,
Expand Down Expand Up @@ -46,6 +48,7 @@ pub struct GrpcServer {
sp_boosted_rewards_ban_sink:
FileSinkClient<ServiceProviderBoostedRewardsBannedRadioIngestReportV1>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
radio_location_estimate_sink: FileSinkClient<RadioLocationEstimatesIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand Down Expand Up @@ -85,6 +88,7 @@ impl GrpcServer {
ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
radio_location_estimate_sink: FileSinkClient<RadioLocationEstimatesIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand All @@ -100,6 +104,7 @@ impl GrpcServer {
coverage_object_report_sink,
sp_boosted_rewards_ban_sink,
subscriber_mapping_event_sink,
radio_location_estimate_sink,
required_network,
address,
api_token,
Expand Down Expand Up @@ -437,6 +442,30 @@ impl poc_mobile::PocMobile for GrpcServer {
let id = timestamp.to_string();
Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id }))
}

async fn submit_radio_location_estimates(
&self,
request: Request<RadioLocationEstimatesReqV1>,
) -> GrpcResult<RadioLocationEstimatesRespV1> {
let timestamp: u64 = Utc::now().timestamp_millis() as u64;
let req: RadioLocationEstimatesReqV1 = request.into_inner();

custom_tracing::record_b58("pub_key", &req.signer);

let report = self
.verify_public_key(req.signer.as_ref())
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, req))
.map(|(_, req)| RadioLocationEstimatesIngestReportV1 {
received_timestamp: timestamp,
report: Some(req),
})?;

_ = self.radio_location_estimate_sink.write(report, []).await;

let id = timestamp.to_string();
Ok(Response::new(RadioLocationEstimatesRespV1 { id }))
}
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
Expand Down Expand Up @@ -546,6 +575,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
)
.await?;

let (radio_location_estimates_sink, radio_location_estimates_server) =
RadioLocationEstimatesIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;

let Some(api_token) = settings
.token
.as_ref()
Expand All @@ -565,6 +604,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
coverage_object_report_sink,
sp_boosted_rewards_ban_sink,
subscriber_mapping_event_sink,
radio_location_estimates_sink,
settings.network,
settings.listen_addr,
api_token,
Expand All @@ -588,6 +628,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(coverage_object_report_sink_server)
.add_task(sp_boosted_rewards_ban_sink_server)
.add_task(subscriber_mapping_event_server)
.add_task(radio_location_estimates_server)
.add_task(grpc_server)
.build()
.start()
Expand Down
3 changes: 3 additions & 0 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10);
let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10);
let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10);
let (radio_location_estimates_tx, _radio_location_estimates_rx) =
tokio::sync::mpsc::channel(10);

tokio::spawn(async move {
let grpc_server = GrpcServer::new(
Expand All @@ -57,6 +59,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
FileSinkClient::new(coverage_obj_tx, "noop"),
FileSinkClient::new(sp_boosted_tx, "noop"),
FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"),
FileSinkClient::new(radio_location_estimates_tx, "noop"),
Network::MainNet,
socket_addr,
api_token,
Expand Down

0 comments on commit c58903b

Please sign in to comment.