diff --git a/Cargo.lock b/Cargo.lock index a06e8f3ae..71d815641 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3801,7 +3801,6 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#376765fe006051d6dcccf709def58e7ed291b845" dependencies = [ "bytes", "prost", diff --git a/Cargo.toml b/Cargo.toml index a45355a99..08068ea68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,6 @@ sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb # Patching for beacon must point directly to the crate, it will not look in the # repo for sibling crates. # -# [patch.'https://github.com/helium/proto'] -# helium-proto = { path = "../proto" } +[patch.'https://github.com/helium/proto'] +helium-proto = { path = "../proto" } # beacon = { path = "../proto/beacon" } diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 86c2faa4a..8a9f0eb18 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -164,6 +164,7 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "subscriber_verified_mapping_ingest_report"; pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "verified_subscriber_verified_mapping_ingest_report"; +pub const RADIO_LOCATION_ESTIMATES_INGEST_REPORT: &str = "radio_location_estimates_ingest_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -220,6 +221,7 @@ pub enum FileType { VerifiedSPBoostedRewardsBannedRadioIngestReport, SubscriberVerifiedMappingEventIngestReport, VerifiedSubscriberVerifiedMappingEventIngestReport, + RadioLocationEstimatesIngestReport, } impl fmt::Display for FileType { @@ -291,6 +293,7 @@ impl fmt::Display for FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } + Self::RadioLocationEstimatesIngestReport => RADIO_LOCATION_ESTIMATES_INGEST_REPORT, }; f.write_str(s) } @@ -365,6 +368,7 @@ impl FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } + Self::RadioLocationEstimatesIngestReport => RADIO_LOCATION_ESTIMATES_INGEST_REPORT, } } } @@ -439,6 +443,7 @@ impl FromStr for FileType { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT => { Self::VerifiedSubscriberVerifiedMappingEventIngestReport } + RADIO_LOCATION_ESTIMATES_INGEST_REPORT => Self::RadioLocationEstimatesIngestReport, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 477c0dea9..5abfee422 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -20,6 +20,7 @@ pub mod mobile_radio_threshold; pub mod mobile_session; pub mod mobile_subscriber; pub mod mobile_transfer; +pub mod radio_location_estimates; pub mod reward_manifest; mod settings; pub mod speedtest; diff --git a/file_store/src/radio_location_estimates.rs b/file_store/src/radio_location_estimates.rs new file mode 100644 index 000000000..219a52ae0 --- /dev/null +++ b/file_store/src/radio_location_estimates.rs @@ -0,0 +1,108 @@ +use crate::{ + traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{ + RadioLocationEstimateV1, RadioLocationEstimatesReqV1, RleEventV1, +}; +use rust_decimal::Decimal; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationEstimates { + pub radio_id: String, + pub estimates: Vec, + pub timestamp: DateTime, + pub signer: PublicKeyBinary, +} + +impl MsgDecode for RadioLocationEstimates { + type Msg = RadioLocationEstimatesReqV1; +} + +impl MsgTimestamp>> for RadioLocationEstimatesReqV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + +impl MsgTimestamp for RadioLocationEstimates { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp() + } +} + +impl TryFrom for RadioLocationEstimates { + type Error = Error; + fn try_from(req: RadioLocationEstimatesReqV1) -> Result { + let timestamp = req.timestamp()?; + Ok(Self { + radio_id: req.radio_id, + estimates: req + .estimates + .into_iter() + .map(|e| e.try_into().unwrap()) + .collect(), + timestamp, + signer: req.signer.into(), + }) + } +} + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationEstimate { + pub radius: Decimal, + pub confidence: Decimal, + pub events: Vec, +} + +impl TryFrom for RadioLocationEstimate { + type Error = Error; + fn try_from(estimate: RadioLocationEstimateV1) -> Result { + Ok(Self { + radius: to_rust_decimal(estimate.radius.unwrap()), + confidence: to_rust_decimal(estimate.confidence.unwrap()), + events: estimate + .events + .into_iter() + .map(|e| e.try_into().unwrap()) + .collect(), + }) + } +} + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationEstimateEvent { + pub id: String, + pub timestamp: DateTime, +} + +impl MsgTimestamp>> for RleEventV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + +impl MsgTimestamp for RadioLocationEstimateEvent { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp() + } +} + +impl TryFrom for RadioLocationEstimateEvent { + type Error = Error; + fn try_from(event: RleEventV1) -> Result { + let timestamp = event.timestamp()?; + Ok(Self { + id: event.id, + timestamp, + }) + } +} + +fn to_rust_decimal(x: helium_proto::Decimal) -> rust_decimal::Decimal { + let str = x.value.as_str(); + rust_decimal::Decimal::from_str_exact(str).unwrap() +} diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index baf598fc6..6baa7d997 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -268,3 +268,8 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); +impl_file_sink!( + poc_mobile::RadioLocationEstimatesIngestReportV1, + FileType::RadioLocationEstimatesIngestReport.to_str(), + "radio_location_estimates_ingest_report" +); diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 47bb6cb40..513427d1f 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -95,6 +95,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); +impl_msg_verify!(poc_mobile::RadioLocationEstimatesReqV1, signature); #[cfg(test)] mod test { diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index c1e91ded1..64184f2d8 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -14,8 +14,10 @@ use helium_proto::services::poc_mobile::{ CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1, - InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, - RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + InvalidatedRadioThresholdReportRespV1, RadioLocationEstimatesIngestReportV1, + RadioLocationEstimatesReqV1, RadioLocationEstimatesRespV1, RadioThresholdIngestReportV1, + RadioThresholdReportReqV1, RadioThresholdReportRespV1, + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, SubscriberLocationReqV1, SubscriberLocationRespV1, @@ -46,6 +48,7 @@ pub struct GrpcServer { sp_boosted_rewards_ban_sink: FileSinkClient, subscriber_mapping_event_sink: FileSinkClient, + radio_location_estimate_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -85,6 +88,7 @@ impl GrpcServer { ServiceProviderBoostedRewardsBannedRadioIngestReportV1, >, subscriber_mapping_event_sink: FileSinkClient, + radio_location_estimate_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -100,6 +104,7 @@ impl GrpcServer { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + radio_location_estimate_sink, required_network, address, api_token, @@ -437,6 +442,30 @@ impl poc_mobile::PocMobile for GrpcServer { let id = timestamp.to_string(); Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id })) } + + async fn submit_radio_location_estimates( + &self, + request: Request, + ) -> GrpcResult { + let timestamp: u64 = Utc::now().timestamp_millis() as u64; + let req: RadioLocationEstimatesReqV1 = request.into_inner(); + + custom_tracing::record_b58("pub_key", &req.signer); + + let report = self + .verify_public_key(req.signer.as_ref()) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, req)) + .map(|(_, req)| RadioLocationEstimatesIngestReportV1 { + received_timestamp: timestamp, + report: Some(req), + })?; + + _ = self.radio_location_estimate_sink.write(report, []).await; + + let id = timestamp.to_string(); + Ok(Response::new(RadioLocationEstimatesRespV1 { id })) + } } pub async fn grpc_server(settings: &Settings) -> Result<()> { @@ -546,6 +575,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; + let (radio_location_estimates_sink, radio_location_estimates_server) = + RadioLocationEstimatesIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; + let Some(api_token) = settings .token .as_ref() @@ -565,6 +604,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + radio_location_estimates_sink, settings.network, settings.listen_addr, api_token, @@ -588,6 +628,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(coverage_object_report_sink_server) .add_task(sp_boosted_rewards_ban_sink_server) .add_task(subscriber_mapping_event_server) + .add_task(radio_location_estimates_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 52512eba1..c39b86766 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -44,6 +44,8 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10); let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10); let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); + let (radio_location_estimates_tx, _radio_location_estimates_rx) = + tokio::sync::mpsc::channel(10); tokio::spawn(async move { let grpc_server = GrpcServer::new( @@ -57,6 +59,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(coverage_obj_tx, "noop"), FileSinkClient::new(sp_boosted_tx, "noop"), FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), + FileSinkClient::new(radio_location_estimates_tx, "noop"), Network::MainNet, socket_addr, api_token,