diff --git a/Cargo.lock b/Cargo.lock index 74c492ed7..5489a6377 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,17 +1615,17 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#0452523b68781b85ea2aba2d1c06edabd5898159" +source = "git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes#2310830305f2b35124f12e0d48d5b62511a9ce34" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "prost", "rand 0.8.5", "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -1789,7 +1789,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http 0.2.11", "http-serde", "humantime-serde", @@ -2631,7 +2631,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http 0.2.11", "notify", "serde", @@ -3213,7 +3213,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "hex-literal", "http 0.2.11", "lazy_static", @@ -3795,7 +3795,7 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", "hex-literal", "itertools", @@ -3835,6 +3835,22 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes#2310830305f2b35124f12e0d48d5b62511a9ce34" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum", + "strum_macros", + "tonic", + "tonic-build", +] + [[package]] name = "helium-sub-daos" version = "0.1.8" @@ -3876,7 +3892,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4291,8 +4307,9 @@ dependencies = [ "file-store", "futures", "futures-util", + "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http 0.2.11", "humantime-serde", "metrics", @@ -4300,6 +4317,8 @@ dependencies = [ "poc-metrics", "prost", "rand 0.8.5", + "rust_decimal", + "rust_decimal_macros", "serde", "serde_json", "sha2 0.10.8", @@ -4361,7 +4380,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "hextree", "http 0.2.11", "http-serde", @@ -4403,7 +4422,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http 0.2.11", "http-serde", "humantime-serde", @@ -4445,7 +4464,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http-serde", "humantime-serde", "iot-config", @@ -5033,7 +5052,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "hextree", "http 0.2.11", "http-serde", @@ -5073,7 +5092,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "mobile-config", "prost", "rand 0.8.5", @@ -5109,7 +5128,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http 0.2.11", "http-serde", "humantime-serde", @@ -5153,7 +5172,8 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", + "hex", "hex-assignments", "hextree", "http-serde", @@ -5837,7 +5857,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5920,7 +5940,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6061,7 +6081,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -6559,7 +6579,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/radio_location_esimates_with_hexes)", "humantime-serde", "lazy_static", "metrics", @@ -9986,7 +10006,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", "twox-hash", "xorf", diff --git a/Cargo.toml b/Cargo.toml index 9860df9a0..f80dcffea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,10 +70,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "mj/radio_location_esimates_with_hexes", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "mj/radio_location_esimates_with_hexes" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18" @@ -132,4 +132,3 @@ sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb # # [patch.'https://github.com/helium/proto'] # helium-proto = { path = "../proto" } -# beacon = { path = "../proto/beacon" } diff --git a/file_store/src/error.rs b/file_store/src/error.rs index 3083357cf..0382252f0 100644 --- a/file_store/src/error.rs +++ b/file_store/src/error.rs @@ -41,6 +41,8 @@ pub enum Error { //Not recommended for internal use! #[error("external error")] ExternalError(#[from] Box), + #[error("error parsing decimal")] + IntoDecimal(#[from] rust_decimal::Error), } #[derive(Error, Debug)] diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index d0f824432..a7b18fcde 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -168,6 +168,9 @@ pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report"; pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward"; pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund"; +pub const RADIO_LOCATION_ESTIMATES_INGEST_REPORT: &str = "radio_location_estimates_ingest_report"; +pub const VERIFIED_RADIO_LOCATION_ESTIMATES_REPORT: &str = + "verified_radio_location_estimates_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -228,6 +231,8 @@ pub enum FileType { PromotionRewardIngestReport, VerifiedPromotionReward, ServiceProviderPromotionFund, + RadioLocationEstimatesIngestReport, + VerifiedRadioLocationEstimatesReport, } impl fmt::Display for FileType { @@ -303,6 +308,8 @@ impl fmt::Display for FileType { Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, + Self::RadioLocationEstimatesIngestReport => RADIO_LOCATION_ESTIMATES_INGEST_REPORT, + Self::VerifiedRadioLocationEstimatesReport => VERIFIED_RADIO_LOCATION_ESTIMATES_REPORT, }; f.write_str(s) } @@ -381,6 +388,8 @@ impl FileType { Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, + Self::RadioLocationEstimatesIngestReport => RADIO_LOCATION_ESTIMATES_INGEST_REPORT, + Self::VerifiedRadioLocationEstimatesReport => VERIFIED_RADIO_LOCATION_ESTIMATES_REPORT, } } } @@ -458,6 +467,8 @@ impl FromStr for FileType { PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport, VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward, SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund, + RADIO_LOCATION_ESTIMATES_INGEST_REPORT => Self::RadioLocationEstimatesIngestReport, + VERIFIED_RADIO_LOCATION_ESTIMATES_REPORT => Self::VerifiedRadioLocationEstimatesReport, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 477c0dea9..58160648f 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -20,12 +20,15 @@ pub mod mobile_radio_threshold; pub mod mobile_session; pub mod mobile_subscriber; pub mod mobile_transfer; +pub mod radio_location_estimates; +pub mod radio_location_estimates_ingest_report; pub mod reward_manifest; mod settings; pub mod speedtest; pub mod subscriber_verified_mapping_event; pub mod subscriber_verified_mapping_event_ingest_report; pub mod traits; +pub mod verified_radio_location_estimates; pub mod verified_subscriber_verified_mapping_event_ingest_report; pub mod wifi_heartbeat; diff --git a/file_store/src/radio_location_estimates.rs b/file_store/src/radio_location_estimates.rs new file mode 100644 index 000000000..5287a52d8 --- /dev/null +++ b/file_store/src/radio_location_estimates.rs @@ -0,0 +1,201 @@ +use crate::{ + traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use h3o::CellIndex; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{ + self as proto, RadioLocationCorrelationV1, RadioLocationEstimateV1, RadioLocationEstimatesReqV1, +}; +use rust_decimal::Decimal; +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum Entity { + CbrsId(String), + WifiPubKey(PublicKeyBinary), +} + +impl fmt::Display for Entity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Entity::CbrsId(id) => write!(f, "{}", id), + Entity::WifiPubKey(pub_key) => write!(f, "{}", pub_key), + } + } +} + +impl From for Entity { + fn from(entity: proto::radio_location_estimates_req_v1::Entity) -> Self { + match entity { + proto::radio_location_estimates_req_v1::Entity::CbrsId(v) => Entity::CbrsId(v), + proto::radio_location_estimates_req_v1::Entity::WifiPubKey(k) => { + Entity::WifiPubKey(k.into()) + } + } + } +} + +impl From for proto::radio_location_estimates_req_v1::Entity { + fn from(entity: Entity) -> Self { + match entity { + Entity::CbrsId(v) => proto::radio_location_estimates_req_v1::Entity::CbrsId(v), + Entity::WifiPubKey(k) => { + proto::radio_location_estimates_req_v1::Entity::WifiPubKey(k.into()) + } + } + } +} + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationEstimatesReq { + pub entity: Entity, + pub estimates: Vec, + pub timestamp: DateTime, + pub carrier_key: PublicKeyBinary, +} + +impl MsgDecode for RadioLocationEstimatesReq { + type Msg = RadioLocationEstimatesReqV1; +} + +impl MsgTimestamp>> for RadioLocationEstimatesReqV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + +impl MsgTimestamp for RadioLocationEstimatesReq { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp() + } +} + +impl From for RadioLocationEstimatesReqV1 { + fn from(rle: RadioLocationEstimatesReq) -> Self { + let timestamp = rle.timestamp(); + RadioLocationEstimatesReqV1 { + entity: Some(rle.entity.into()), + estimates: rle.estimates.into_iter().map(|e| e.into()).collect(), + timestamp, + carrier_key: rle.carrier_key.into(), + signature: vec![], + } + } +} + +impl TryFrom for RadioLocationEstimatesReq { + type Error = Error; + fn try_from(req: RadioLocationEstimatesReqV1) -> Result { + let timestamp = req.timestamp()?; + Ok(Self { + entity: if let Some(entity) = req.entity { + entity.into() + } else { + return Err(Error::NotFound("entity".to_string())); + }, + estimates: req + .estimates + .into_iter() + .map(|e| e.try_into().unwrap()) + .collect(), + timestamp, + carrier_key: req.carrier_key.into(), + }) + } +} + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationEstimate { + pub hex: CellIndex, + pub grid_distance: u32, + pub confidence: Decimal, + pub radio_location_correlations: Vec, +} + +impl From for RadioLocationEstimateV1 { + fn from(rle: RadioLocationEstimate) -> Self { + RadioLocationEstimateV1 { + hex: rle.hex.into(), + grid_distance: rle.grid_distance, + confidence: Some(to_proto_decimal(rle.confidence)), + radio_location_correlations: rle + .radio_location_correlations + .into_iter() + .map(|e| e.into()) + .collect(), + } + } +} + +impl TryFrom for RadioLocationEstimate { + type Error = Error; + fn try_from(estimate: RadioLocationEstimateV1) -> Result { + let hex = CellIndex::try_from(estimate.hex) + .map_err(crate::error::DecodeError::InvalidCellIndexError)?; + + Ok(Self { + hex, + grid_distance: estimate.grid_distance, + confidence: to_rust_decimal(estimate.confidence)?, + radio_location_correlations: estimate + .radio_location_correlations + .into_iter() + .flat_map(|rlc| rlc.try_into()) + .collect(), + }) + } +} + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationCorrelation { + pub id: String, + pub timestamp: DateTime, +} + +impl MsgTimestamp>> for RadioLocationCorrelationV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + +impl MsgTimestamp for RadioLocationCorrelation { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp() + } +} + +impl From for RadioLocationCorrelationV1 { + fn from(event: RadioLocationCorrelation) -> Self { + let timestamp = event.timestamp(); + RadioLocationCorrelationV1 { + id: event.id, + timestamp, + } + } +} + +impl TryFrom for RadioLocationCorrelation { + type Error = Error; + fn try_from(event: RadioLocationCorrelationV1) -> Result { + let timestamp = event.timestamp()?; + Ok(Self { + id: event.id, + timestamp, + }) + } +} + +fn to_rust_decimal(x: Option) -> Result { + let x = x.ok_or(Error::NotFound("Decimal".to_string()))?; + let str = x.value.as_str(); + Ok(rust_decimal::Decimal::from_str_exact(str)?) +} + +fn to_proto_decimal(x: rust_decimal::Decimal) -> helium_proto::Decimal { + helium_proto::Decimal { + value: x.to_string(), + } +} diff --git a/file_store/src/radio_location_estimates_ingest_report.rs b/file_store/src/radio_location_estimates_ingest_report.rs new file mode 100644 index 000000000..26d8e32e2 --- /dev/null +++ b/file_store/src/radio_location_estimates_ingest_report.rs @@ -0,0 +1,58 @@ +use crate::{ + radio_location_estimates::RadioLocationEstimatesReq, + traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_proto::services::poc_mobile::{ + RadioLocationEstimatesIngestReportV1, RadioLocationEstimatesReqV1, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct RadioLocationEstimatesIngestReport { + pub received_timestamp: DateTime, + pub report: RadioLocationEstimatesReq, +} + +impl MsgDecode for RadioLocationEstimatesIngestReport { + type Msg = RadioLocationEstimatesIngestReportV1; +} + +impl MsgTimestamp>> for RadioLocationEstimatesIngestReportV1 { + fn timestamp(&self) -> Result> { + self.received_timestamp.to_timestamp() + } +} + +impl MsgTimestamp for RadioLocationEstimatesIngestReport { + fn timestamp(&self) -> u64 { + self.received_timestamp.encode_timestamp() + } +} + +impl From for RadioLocationEstimatesIngestReportV1 { + fn from(v: RadioLocationEstimatesIngestReport) -> Self { + let received_timestamp = v.timestamp(); + let report: RadioLocationEstimatesReqV1 = v.report.into(); + Self { + received_timestamp, + report: Some(report), + } + } +} + +impl TryFrom for RadioLocationEstimatesIngestReport { + type Error = Error; + fn try_from(v: RadioLocationEstimatesIngestReportV1) -> Result { + Ok(Self { + received_timestamp: v.timestamp()?, + report: v + .report + .ok_or_else(|| { + Error::not_found("ingest RadioLocationEstimatesIngestReport report") + })? + .try_into()?, + }) + } +} diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index f8166b024..d4ef856cc 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -273,3 +273,13 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); +impl_file_sink!( + poc_mobile::RadioLocationEstimatesIngestReportV1, + FileType::RadioLocationEstimatesIngestReport.to_str(), + "radio_location_estimates_ingest_report" +); +impl_file_sink!( + poc_mobile::VerifiedRadioLocationEstimatesReportV1, + FileType::VerifiedRadioLocationEstimatesReport.to_str(), + "verified_radio_location_estimates_report" +); diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index f4c05eeba..dc779ffc0 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -97,6 +97,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); +impl_msg_verify!(poc_mobile::RadioLocationEstimatesReqV1, signature); #[cfg(test)] mod test { diff --git a/file_store/src/verified_radio_location_estimates.rs b/file_store/src/verified_radio_location_estimates.rs new file mode 100644 index 000000000..8602a10d4 --- /dev/null +++ b/file_store/src/verified_radio_location_estimates.rs @@ -0,0 +1,63 @@ +use crate::{ + radio_location_estimates_ingest_report::RadioLocationEstimatesIngestReport, + traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_proto::services::poc_mobile::{ + RadioLocationEstimatesIngestReportV1, RadioLocationEstimatesVerificationStatus, + VerifiedRadioLocationEstimatesReportV1, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] +pub struct VerifiedRadioLocationEstimatesReport { + pub report: RadioLocationEstimatesIngestReport, + pub status: RadioLocationEstimatesVerificationStatus, + pub timestamp: DateTime, +} + +impl MsgDecode for VerifiedRadioLocationEstimatesReport { + type Msg = VerifiedRadioLocationEstimatesReportV1; +} + +impl MsgTimestamp>> for VerifiedRadioLocationEstimatesReportV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + +impl MsgTimestamp for VerifiedRadioLocationEstimatesReport { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp() + } +} + +impl From for VerifiedRadioLocationEstimatesReportV1 { + fn from(v: VerifiedRadioLocationEstimatesReport) -> Self { + let timestamp = v.timestamp(); + let report: RadioLocationEstimatesIngestReportV1 = v.report.into(); + Self { + report: Some(report), + status: v.status as i32, + timestamp, + } + } +} + +impl TryFrom for VerifiedRadioLocationEstimatesReport { + type Error = Error; + fn try_from(v: VerifiedRadioLocationEstimatesReportV1) -> Result { + Ok(Self { + report: v + .clone() + .report + .ok_or_else(|| { + Error::not_found("ingest VerifiedRadioLocationEstimatesReport report") + })? + .try_into()?, + status: v.status.try_into()?, + timestamp: v.timestamp()?, + }) + } +} diff --git a/ingest/Cargo.toml b/ingest/Cargo.toml index b8df84ebe..2be4394cd 100644 --- a/ingest/Cargo.toml +++ b/ingest/Cargo.toml @@ -8,36 +8,39 @@ license.workspace = true [dependencies] anyhow = { workspace = true } -config = { workspace = true } -clap = { workspace = true } -thiserror = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } base64 = { workspace = true } bs58 = { workspace = true } -sha2 = { workspace = true } -http = { workspace = true } -tonic = { workspace = true } -triggered = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +custom-tracing = { path = "../custom_tracing", features = ["grpc"] } +file-store = { path = "../file_store" } futures = { workspace = true } futures-util = { workspace = true } +h3o = { workspace = true } +helium-crypto = { workspace = true } +helium-proto = { workspace = true } +http = { workspace = true } +humantime-serde = { workspace = true } +metrics = { workspace = true } +metrics-exporter-prometheus = { workspace = true } +poc-metrics = { path = "../metrics" } prost = { workspace = true } +rand = { workspace = true } +rust_decimal = { workspace = true } +rust_decimal_macros = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } +task-manager = { path = "../task_manager" } +thiserror = { workspace = true } tokio = { workspace = true } -tokio-util = { workspace = true } tokio-stream = { workspace = true } +tokio-util = { workspace = true } +tonic = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -chrono = { workspace = true } -helium-proto = { workspace = true } -helium-crypto = { workspace = true } -file-store = { path = "../file_store" } -poc-metrics = { path = "../metrics" } -metrics = { workspace = true } -metrics-exporter-prometheus = { workspace = true } -task-manager = { path = "../task_manager" } -rand = { workspace = true } -custom-tracing = { path = "../custom_tracing", features = ["grpc"] } -humantime-serde = { workspace = true } +triggered = { workspace = true } [dev-dependencies] backon = "0" diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index c1e91ded1..8b26e1123 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -14,8 +14,10 @@ use helium_proto::services::poc_mobile::{ CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1, - InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, - RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + InvalidatedRadioThresholdReportRespV1, RadioLocationEstimatesIngestReportV1, + RadioLocationEstimatesReqV1, RadioLocationEstimatesRespV1, RadioThresholdIngestReportV1, + RadioThresholdReportReqV1, RadioThresholdReportRespV1, + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, SubscriberLocationReqV1, SubscriberLocationRespV1, @@ -46,6 +48,7 @@ pub struct GrpcServer { sp_boosted_rewards_ban_sink: FileSinkClient, subscriber_mapping_event_sink: FileSinkClient, + radio_location_estimate_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -85,6 +88,7 @@ impl GrpcServer { ServiceProviderBoostedRewardsBannedRadioIngestReportV1, >, subscriber_mapping_event_sink: FileSinkClient, + radio_location_estimate_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -100,6 +104,7 @@ impl GrpcServer { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + radio_location_estimate_sink, required_network, address, api_token, @@ -437,6 +442,30 @@ impl poc_mobile::PocMobile for GrpcServer { let id = timestamp.to_string(); Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id })) } + + async fn submit_radio_location_estimates( + &self, + request: Request, + ) -> GrpcResult { + let timestamp: u64 = Utc::now().timestamp_millis() as u64; + let req: RadioLocationEstimatesReqV1 = request.into_inner(); + + custom_tracing::record_b58("pub_key", &req.carrier_key); + + let report = self + .verify_public_key(req.carrier_key.as_ref()) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, req)) + .map(|(_, req)| RadioLocationEstimatesIngestReportV1 { + received_timestamp: timestamp, + report: Some(req), + })?; + + _ = self.radio_location_estimate_sink.write(report, []).await; + + let id = timestamp.to_string(); + Ok(Response::new(RadioLocationEstimatesRespV1 { id })) + } } pub async fn grpc_server(settings: &Settings) -> Result<()> { @@ -546,6 +575,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; + let (radio_location_estimates_sink, radio_location_estimates_server) = + RadioLocationEstimatesIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; + let Some(api_token) = settings .token .as_ref() @@ -565,6 +604,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + radio_location_estimates_sink, settings.network, settings.listen_addr, api_token, @@ -588,6 +628,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(coverage_object_report_sink_server) .add_task(sp_boosted_rewards_ban_sink_server) .add_task(subscriber_mapping_event_server) + .add_task(radio_location_estimates_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 52512eba1..e7ca88740 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -1,9 +1,11 @@ use anyhow::bail; use backon::{ExponentialBuilder, Retryable}; use file_store::file_sink::FileSinkClient; -use helium_crypto::{KeyTag, Keypair, Network, Sign}; +use helium_crypto::{KeyTag, Keypair, Network, PublicKey, Sign}; use helium_proto::services::poc_mobile::{ - Client as PocMobileClient, SubscriberVerifiedMappingEventIngestReportV1, + radio_location_estimates_req_v1, Client as PocMobileClient, RadioLocationEstimateV1, + RadioLocationEstimatesIngestReportV1, RadioLocationEstimatesReqV1, + RadioLocationEstimatesRespV1, SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1, SubscriberVerifiedMappingEventResV1, }; use ingest::server_mobile::GrpcServer; @@ -44,6 +46,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10); let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10); let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); + let (radio_location_estimates_tx, radio_location_estimates_rx) = tokio::sync::mpsc::channel(10); tokio::spawn(async move { let grpc_server = GrpcServer::new( @@ -57,6 +60,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(coverage_obj_tx, "noop"), FileSinkClient::new(sp_boosted_tx, "noop"), FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), + FileSinkClient::new(radio_location_estimates_tx, "noop"), Network::MainNet, socket_addr, api_token, @@ -70,6 +74,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { key_pair, token.to_string(), subscriber_mapping_rx, + radio_location_estimates_rx, ) .await; @@ -80,8 +85,10 @@ pub struct TestClient { client: PocMobileClient, key_pair: Arc, authorization: MetadataValue, - file_sink_rx: + subscriber_mapping_rx: Receiver>, + radio_location_estimates_rx: + Receiver>, } impl TestClient { @@ -89,9 +96,12 @@ impl TestClient { socket_addr: SocketAddr, key_pair: Keypair, api_token: String, - file_sink_rx: Receiver< + subscriber_mapping_rx: Receiver< file_store::file_sink::Message, >, + radio_location_estimates_rx: Receiver< + file_store::file_sink::Message, + >, ) -> TestClient { let client = (|| PocMobileClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) @@ -102,12 +112,34 @@ impl TestClient { client, key_pair: Arc::new(key_pair), authorization: format!("Bearer {}", api_token).try_into().unwrap(), - file_sink_rx, + subscriber_mapping_rx, + radio_location_estimates_rx, + } + } + + pub async fn recv_subscriber_mapping( + mut self, + ) -> anyhow::Result { + match timeout(Duration::from_secs(2), self.subscriber_mapping_rx.recv()).await { + Ok(Some(msg)) => match msg { + file_store::file_sink::Message::Commit(_) => bail!("got Commit"), + file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"), + file_store::file_sink::Message::Data(_, data) => Ok(data), + }, + Ok(None) => bail!("got none"), + Err(reason) => bail!("got error {reason}"), } } - pub async fn recv(mut self) -> anyhow::Result { - match timeout(Duration::from_secs(2), self.file_sink_rx.recv()).await { + pub async fn recv_radio_location_estimates( + mut self, + ) -> anyhow::Result { + match timeout( + Duration::from_secs(2), + self.radio_location_estimates_rx.recv(), + ) + .await + { Ok(Some(msg)) => match msg { file_store::file_sink::Message::Commit(_) => bail!("got Commit"), file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"), @@ -145,6 +177,33 @@ impl TestClient { Ok(res.into_inner()) } + + pub async fn submit_radio_location_estimates( + &mut self, + pub_key: &PublicKey, + estimates: Vec, + ) -> anyhow::Result { + let mut req = RadioLocationEstimatesReqV1 { + entity: Some(radio_location_estimates_req_v1::Entity::WifiPubKey( + pub_key.into(), + )), + estimates, + timestamp: 0, + carrier_key: self.key_pair.public_key().to_vec(), + signature: vec![], + }; + + req.signature = self.key_pair.sign(&req.encode_to_vec()).expect("sign"); + + let mut request = Request::new(req); + let metadata = request.metadata_mut(); + + metadata.insert("authorization", self.authorization.clone()); + + let res = self.client.submit_radio_location_estimates(request).await?; + + Ok(res.into_inner()) + } } pub fn generate_keypair() -> Keypair { diff --git a/ingest/tests/mobile_ingest.rs b/ingest/tests/mobile_ingest.rs index 477e2ede2..3d055f798 100644 --- a/ingest/tests/mobile_ingest.rs +++ b/ingest/tests/mobile_ingest.rs @@ -1,3 +1,12 @@ +use h3o::LatLng; +use helium_crypto::{KeyTag, Keypair, PublicKey}; +use helium_proto::services::poc_mobile::{ + radio_location_estimates_req_v1::Entity, RadioLocationCorrelationV1, RadioLocationEstimateV1, + RadioLocationEstimatesReqV1, +}; +use rand::rngs::OsRng; +use rust_decimal::prelude::*; + mod common; #[tokio::test] @@ -15,7 +24,7 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { let timestamp: String = res.unwrap().id; - match client.recv().await { + match client.recv_subscriber_mapping().await { Ok(report) => { assert_eq!(timestamp, report.received_timestamp.to_string()); @@ -33,3 +42,67 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { trigger.trigger(); Ok(()) } + +#[tokio::test] +async fn submit_radio_location_estimates() -> anyhow::Result<()> { + let (mut client, trigger) = common::setup_mobile().await?; + + let key_pair = Keypair::generate(KeyTag::default(), &mut OsRng); + let public_key = key_pair.public_key(); + let hex = LatLng::new(41.41208, -122.19288) + .unwrap() + .to_cell(h3o::Resolution::Twelve); + let estimates = vec![RadioLocationEstimateV1 { + hex: u64::from(hex), + grid_distance: 2, + confidence: to_proto_decimal(0.75), + radio_location_correlations: vec![RadioLocationCorrelationV1 { + id: "event_1".to_string(), + timestamp: 0, + }], + }]; + + let res = client + .submit_radio_location_estimates(public_key, estimates.clone()) + .await; + + assert!(res.is_ok()); + + let timestamp: String = res.unwrap().id; + + match client.recv_radio_location_estimates().await { + Ok(report) => { + assert_eq!(timestamp, report.received_timestamp.to_string()); + + match report.report { + None => panic!("No report found"), + Some(req) => { + let req_public_key = wifi_public_key(req.clone())?; + assert_eq!(public_key.to_string(), req_public_key.to_string()); + assert_eq!(estimates, req.estimates); + } + } + } + Err(e) => panic!("got error {e}"), + } + + trigger.trigger(); + Ok(()) +} + +fn to_proto_decimal(x: f64) -> Option { + let d = Decimal::from_f64(x).unwrap(); + Some(helium_proto::Decimal { + value: d.to_string(), + }) +} + +fn wifi_public_key(req: RadioLocationEstimatesReqV1) -> anyhow::Result { + let entity: Entity = req.entity.unwrap(); + let Entity::WifiPubKey(public_key_bytes) = entity.clone() else { + anyhow::bail!("not WifiPubKey") + }; + let public_key = PublicKey::from_bytes(&public_key_bytes)?; + + Ok(public_key) +} diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 84e6c61b3..dad9bf2d2 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -9,57 +9,58 @@ authors.workspace = true [dependencies] anyhow = { workspace = true } async-compression = { version = "0", features = ["tokio", "gzip"] } -config = { workspace = true } -thiserror = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -h3o = { workspace = true, features = ["geo"] } -hextree = { workspace = true } -http-serde = { workspace = true } -clap = { workspace = true } -sqlx = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } +async-trait = { workspace = true } base64 = { workspace = true } -sha2 = { workspace = true } -lazy_static = { workspace = true } chrono = { workspace = true } -triggered = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +coverage-map = { path = "../coverage_map" } +coverage-point-calculator = { path = "../coverage_point_calculator" } +custom-tracing = { path = "../custom_tracing" } +db-store = { path = "../db_store" } +derive_builder = { workspace = true } +file-store = { path = "../file_store" } flate2 = "1" futures = { workspace = true } futures-util = { workspace = true } -prost = { workspace = true } -once_cell = { workspace = true } -helium-proto = { workspace = true } +h3o = { workspace = true, features = ["geo"] } helium-crypto = { workspace = true, features = ["sqlx-postgres"] } +helium-proto = { workspace = true } +hex = "0.4" +hex-assignments = { path = "../hex_assignments" } +hextree = { workspace = true } +http-serde = { workspace = true } humantime = { workspace = true } -rust_decimal = { workspace = true } -rust_decimal_macros = { workspace = true } -tonic = { workspace = true } -tokio-stream = { workspace = true } -tokio-util = { workspace = true } +humantime-serde = { workspace = true } +lazy_static = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } mobile-config = { path = "../mobile_config" } -file-store = { path = "../file_store" } -db-store = { path = "../db_store" } +once_cell = { workspace = true } poc-metrics = { path = "../metrics" } -reward-scheduler = { path = "../reward_scheduler" } price = { path = "../price" } +prost = { workspace = true } rand = { workspace = true } -async-trait = { workspace = true } +regex = "1" retainer = { workspace = true } -uuid = { workspace = true } -task-manager = { path = "../task_manager" } +reward-scheduler = { path = "../reward_scheduler" } +rust_decimal = { workspace = true } +rust_decimal_macros = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true } solana-sdk = { workspace = true } -derive_builder = { workspace = true } -regex = "1" -humantime-serde = { workspace = true } -custom-tracing = { path = "../custom_tracing" } -hex-assignments = { path = "../hex_assignments" } -coverage-point-calculator = { path = "../coverage_point_calculator" } -coverage-map = { path = "../coverage_map" } +sqlx = { workspace = true } +task-manager = { path = "../task_manager" } +thiserror = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } +tokio-util = { workspace = true } +tonic = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +triggered = { workspace = true } +uuid = { workspace = true } [dev-dependencies] backon = "0" diff --git a/mobile_verifier/migrations/38_radio_location_estimates.sql b/mobile_verifier/migrations/38_radio_location_estimates.sql new file mode 100644 index 000000000..9b1ec007e --- /dev/null +++ b/mobile_verifier/migrations/38_radio_location_estimates.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS radio_location_estimates ( + hashed_key TEXT NOT NULL, + radio_type radio_type NOT NULL, + radio_key TEXT NOT NULL, + received_timestamp TIMESTAMPTZ NOT NULL, + hex BIGINT NOT NULL, + grid_distance BIGINT NOT NULL, + confidence DECIMAL NOT NULL, + invalidated_at TIMESTAMPTZ DEFAULT NULL, + inserted_at TIMESTAMPTZ DEFAULT now(), + PRIMARY KEY (hashed_key) +); diff --git a/mobile_verifier/migrations/39_update_cbrs_hearbeats.sql b/mobile_verifier/migrations/39_update_cbrs_hearbeats.sql new file mode 100644 index 000000000..e5f55707f --- /dev/null +++ b/mobile_verifier/migrations/39_update_cbrs_hearbeats.sql @@ -0,0 +1,3 @@ +ALTER TABLE cbrs_heartbeats +ADD COLUMN lat DOUBLE PRECISION NOT NULL DEFAULT 0.0, +ADD COLUMN lon DOUBLE PRECISION NOT NULL DEFAULT 0.0; \ No newline at end of file diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index b7ab815fd..23498a54d 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,11 +1,12 @@ -use std::time::Duration; - use crate::{ boosting_oracles::DataSetDownloaderDaemon, coverage::{new_coverage_object_notification_channel, CoverageDaemon}, data_session::DataSessionIngestor, geofence::Geofence, - heartbeats::{cbrs::CbrsHeartbeatDaemon, wifi::WifiHeartbeatDaemon}, + heartbeats::{ + cbrs::CbrsHeartbeatDaemon, location_cache::LocationCache, wifi::WifiHeartbeatDaemon, + }, + radio_location_estimates::RadioLocationEstimatesDaemon, radio_threshold::RadioThresholdIngestor, rewarder::Rewarder, sp_boosted_rewards_bans::ServiceProviderBoostedRewardsBanIngestor, @@ -25,6 +26,7 @@ use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, CarrierServiceClient, GatewayClient, }; +use std::time::Duration; use task_manager::TaskManager; #[derive(Debug, clap::Args)] @@ -101,6 +103,8 @@ impl Cmd { let (new_coverage_obj_notifier, new_coverage_obj_notification) = new_coverage_object_notification_channel(); + let location_cache = LocationCache::new(&pool).await?; + TaskManager::builder() .add_task(file_upload_server) .add_task(valid_heartbeats_server) @@ -115,6 +119,7 @@ impl Cmd { valid_heartbeats.clone(), seniority_updates.clone(), usa_geofence, + location_cache.clone(), ) .await?, ) @@ -127,6 +132,7 @@ impl Cmd { valid_heartbeats, seniority_updates.clone(), usa_and_mexico_geofence, + location_cache.clone(), ) .await?, ) @@ -198,13 +204,23 @@ impl Cmd { ServiceProviderBoostedRewardsBanIngestor::create_managed_task( pool.clone(), file_upload.clone(), - report_ingest, - auth_client, + report_ingest.clone(), + auth_client.clone(), settings, seniority_updates, ) .await?, ) + .add_task( + RadioLocationEstimatesDaemon::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), + auth_client.clone(), + ) + .await?, + ) .add_task( Rewarder::create_managed_task( pool, @@ -213,6 +229,7 @@ impl Cmd { carrier_client, hex_boosting_client, speedtests_avg, + location_cache, ) .await?, ) diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index e02010c81..143405ae5 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -1,8 +1,9 @@ -use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; +use super::{ + location_cache::LocationCache, process_validated_heartbeats, Heartbeat, ValidatedHeartbeat, +}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, - heartbeats::LocationCache, GatewayResolver, Settings, }; @@ -16,7 +17,6 @@ use file_store::{ }; use futures::{stream::StreamExt, TryFutureExt}; use helium_proto::services::poc_mobile as proto; -use retainer::Cache; use sqlx::{Pool, Postgres}; use std::{ sync::Arc, @@ -33,6 +33,7 @@ pub struct CbrsHeartbeatDaemon { heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, geofence: GFV, + location_cache: LocationCache, } impl CbrsHeartbeatDaemon @@ -49,6 +50,7 @@ where valid_heartbeats: FileSinkClient, seniority_updates: FileSinkClient, geofence: GFV, + location_cache: LocationCache, ) -> anyhow::Result { // CBRS Heartbeats let (cbrs_heartbeats, cbrs_heartbeats_server) = @@ -69,6 +71,7 @@ where valid_heartbeats, seniority_updates, geofence, + location_cache, ); Ok(TaskManager::builder() @@ -86,6 +89,7 @@ where heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, geofence: GFV, + location_cache: LocationCache, ) -> Self { Self { pool, @@ -95,12 +99,13 @@ where heartbeat_sink, seniority_sink, geofence, + location_cache, } } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("Starting CBRS HeartbeatDaemon"); - let heartbeat_cache = Arc::new(Cache::<(String, DateTime), ()>::new()); + let heartbeat_cache = Arc::new(retainer::Cache::<(String, DateTime), ()>::new()); let heartbeat_cache_clone = heartbeat_cache.clone(); tokio::spawn(async move { @@ -111,8 +116,6 @@ where let coverage_claim_time_cache = CoverageClaimTimeCache::new(); let coverage_object_cache = CoverageObjectCache::new(&self.pool); - // Unused: - let location_cache = LocationCache::new(&self.pool); loop { tokio::select! { @@ -128,7 +131,6 @@ where &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, - &location_cache, ).await?; metrics::histogram!("cbrs_heartbeat_processing_time") .record(start.elapsed()); @@ -142,10 +144,9 @@ where async fn process_file( &self, file: FileInfoStream, - heartbeat_cache: &Arc), ()>>, + heartbeat_cache: &Arc), ()>>, coverage_claim_time_cache: &CoverageClaimTimeCache, coverage_object_cache: &CoverageObjectCache, - location_cache: &LocationCache, ) -> anyhow::Result<()> { tracing::info!("Processing CBRS heartbeat file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; @@ -166,7 +167,7 @@ where heartbeats, &self.gateway_info_resolver, coverage_object_cache, - location_cache, + &self.location_cache, self.max_distance_to_coverage, &epoch, &self.geofence, diff --git a/mobile_verifier/src/heartbeats/last_location.rs b/mobile_verifier/src/heartbeats/last_location.rs deleted file mode 100644 index cb6e45c76..000000000 --- a/mobile_verifier/src/heartbeats/last_location.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::sync::Arc; - -use chrono::{DateTime, Duration, Utc}; -use helium_crypto::PublicKeyBinary; -use retainer::Cache; -use sqlx::PgPool; - -#[derive(sqlx::FromRow, Copy, Clone)] -pub struct LastLocation { - pub location_validation_timestamp: DateTime, - pub latest_timestamp: DateTime, - pub lat: f64, - pub lon: f64, -} - -impl LastLocation { - pub fn new( - location_validation_timestamp: DateTime, - latest_timestamp: DateTime, - lat: f64, - lon: f64, - ) -> Self { - Self { - location_validation_timestamp, - latest_timestamp, - lat, - lon, - } - } - - /// Calculates the duration from now in which last_valid_timestamp is 12 hours old - pub fn duration_to_expiration(&self) -> Duration { - ((self.latest_timestamp + Duration::hours(12)) - Utc::now()).max(Duration::zero()) - } -} - -/// A cache for previous valid (or invalid) WiFi heartbeat locations -#[derive(Clone)] -pub struct LocationCache { - pool: PgPool, - locations: Arc>>, -} - -impl LocationCache { - pub fn new(pool: &PgPool) -> Self { - let locations = Arc::new(Cache::new()); - let locations_clone = locations.clone(); - tokio::spawn(async move { - locations_clone - .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24)) - .await - }); - Self { - pool: pool.clone(), - locations, - } - } - - async fn fetch_from_db_and_set( - &self, - hotspot: &PublicKeyBinary, - ) -> anyhow::Result> { - let last_location: Option = sqlx::query_as( - r#" - SELECT location_validation_timestamp, latest_timestamp, lat, lon - FROM wifi_heartbeats - WHERE location_validation_timestamp IS NOT NULL - AND latest_timestamp >= $1 - AND hotspot_key = $2 - ORDER BY latest_timestamp DESC - LIMIT 1 - "#, - ) - .bind(Utc::now() - Duration::hours(12)) - .bind(hotspot) - .fetch_optional(&self.pool) - .await?; - self.locations - .insert( - hotspot.clone(), - last_location, - last_location - .map(|x| x.duration_to_expiration()) - .unwrap_or_else(|| Duration::days(365)) - .to_std()?, - ) - .await; - Ok(last_location) - } - - pub async fn fetch_last_location( - &self, - hotspot: &PublicKeyBinary, - ) -> anyhow::Result> { - Ok( - if let Some(last_location) = self.locations.get(hotspot).await { - *last_location - } else { - self.fetch_from_db_and_set(hotspot).await? - }, - ) - } - - pub async fn set_last_location( - &self, - hotspot: &PublicKeyBinary, - last_location: LastLocation, - ) -> anyhow::Result<()> { - let duration_to_expiration = last_location.duration_to_expiration(); - self.locations - .insert( - hotspot.clone(), - Some(last_location), - duration_to_expiration.to_std()?, - ) - .await; - Ok(()) - } - - /// Only used for testing. - pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) { - self.locations.remove(hotspot).await; - } -} diff --git a/mobile_verifier/src/heartbeats/location_cache.rs b/mobile_verifier/src/heartbeats/location_cache.rs new file mode 100644 index 000000000..9a4bb5122 --- /dev/null +++ b/mobile_verifier/src/heartbeats/location_cache.rs @@ -0,0 +1,270 @@ +use chrono::{DateTime, Duration, Utc}; +use file_store::radio_location_estimates::Entity; +use futures::StreamExt; +use helium_crypto::PublicKeyBinary; +use sqlx::{PgPool, Row}; +use std::{collections::HashMap, str::FromStr, sync::Arc}; +use tokio::sync::{Mutex, MutexGuard}; + +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub enum LocationCacheKey { + CbrsId(String), + WifiPubKey(PublicKeyBinary), +} + +#[derive(sqlx::FromRow, Copy, Clone, Debug)] +pub struct LocationCacheValue { + pub lat: f64, + pub lon: f64, + pub timestamp: DateTime, +} + +impl LocationCacheValue { + pub fn new(lat: f64, lon: f64, timestamp: DateTime) -> Self { + Self { + lat, + lon, + timestamp, + } + } +} + +type LocationCacheData = HashMap; + +/// A cache WiFi/Cbrs heartbeat locations +#[derive(Clone)] +pub struct LocationCache { + pool: PgPool, + wifi: Arc>, + cbrs: Arc>, +} + +impl LocationCache { + pub async fn new(pool: &PgPool) -> anyhow::Result { + let wifi = Arc::new(Mutex::new(HashMap::new())); + let cbrs = Arc::new(Mutex::new(HashMap::new())); + + hydrate_wifi(pool, &wifi).await?; + hydrate_cbrs(pool, &cbrs).await?; + + Ok(Self { + pool: pool.clone(), + wifi, + cbrs, + }) + } + + pub async fn get(&self, key: LocationCacheKey) -> anyhow::Result> { + { + let data = self.key_to_lock(&key).await; + if let Some(&value) = data.get(&key) { + return Ok(Some(value)); + } + } + match key { + LocationCacheKey::WifiPubKey(pub_key_bin) => { + self.fetch_wifi_and_insert(pub_key_bin).await + } + LocationCacheKey::CbrsId(id) => self.fetch_cbrs_and_insert(id).await, + } + } + + pub async fn get_recent( + &self, + key: LocationCacheKey, + when: Duration, + ) -> anyhow::Result> { + { + let data = self.key_to_lock(&key).await; + if let Some(&value) = data.get(&key) { + let now = Utc::now(); + let before = now - when; + if value.timestamp > before { + return Ok(Some(value)); + } else { + return Ok(None); + } + } + } + match key { + LocationCacheKey::WifiPubKey(pub_key_bin) => { + self.fetch_wifi_and_insert(pub_key_bin).await + } + LocationCacheKey::CbrsId(id) => self.fetch_cbrs_and_insert(id).await, + } + } + + pub async fn get_all(&self) -> LocationCacheData { + let wifi_data = self.wifi.lock().await; + let mut wifi_data_cloned = wifi_data.clone(); + + let cbrs_data = self.cbrs.lock().await; + let cbrs_data_cloned = cbrs_data.clone(); + + wifi_data_cloned.extend(cbrs_data_cloned); + wifi_data_cloned + } + + pub async fn insert( + &self, + key: LocationCacheKey, + value: LocationCacheValue, + ) -> anyhow::Result<()> { + let mut data = self.key_to_lock(&key).await; + data.insert(key, value); + Ok(()) + } + + /// Only used for testing. + pub async fn remove(&self, key: LocationCacheKey) -> anyhow::Result<()> { + let mut data = self.key_to_lock(&key).await; + data.remove(&key); + Ok(()) + } + + async fn key_to_lock(&self, key: &LocationCacheKey) -> MutexGuard<'_, LocationCacheData> { + match key { + LocationCacheKey::WifiPubKey(_) => self.wifi.lock().await, + LocationCacheKey::CbrsId(_) => self.cbrs.lock().await, + } + } + + async fn fetch_wifi_and_insert( + &self, + pub_key_bin: PublicKeyBinary, + ) -> anyhow::Result> { + let sqlx_return: Option = sqlx::query_as( + r#" + SELECT lat, lon, location_validation_timestamp AS timestamp + FROM wifi_heartbeats + WHERE location_validation_timestamp IS NOT NULL + AND location_validation_timestamp >= $1 + AND hotspot_key = $2 + ORDER BY location_validation_timestamp DESC + LIMIT 1 + "#, + ) + .bind(Utc::now() - Duration::hours(12)) + .bind(pub_key_bin.clone()) + .fetch_optional(&self.pool) + .await?; + match sqlx_return { + None => Ok(None), + Some(value) => { + let key = LocationCacheKey::WifiPubKey(pub_key_bin); + let mut data = self.key_to_lock(&key).await; + data.insert(key, value); + Ok(Some(value)) + } + } + } + + async fn fetch_cbrs_and_insert( + &self, + cbsd_id: String, + ) -> anyhow::Result> { + let sqlx_return: Option = sqlx::query_as( + r#" + SELECT lat, lon, latest_timestamp AS timestamp + FROM cbrs_heartbeats + WHERE latest_timestamp IS NOT NULL + AND latest_timestamp >= $1 + AND cbsd_id = $2 + ORDER BY latest_timestamp DESC + LIMIT 1 + "#, + ) + .bind(Utc::now() - Duration::hours(12)) + .bind(cbsd_id.clone()) + .fetch_optional(&self.pool) + .await?; + + match sqlx_return { + None => Ok(None), + Some(value) => { + let key = LocationCacheKey::CbrsId(cbsd_id); + let mut data = self.key_to_lock(&key).await; + data.insert(key, value); + Ok(Some(value)) + } + } + } +} + +async fn hydrate_wifi(pool: &PgPool, mutex: &Arc>) -> anyhow::Result<()> { + let mut rows = sqlx::query( + r#" + SELECT wh.lat, wh.lon, wh.location_validation_timestamp AS timestamp, wh.hotspot_key + FROM wifi_heartbeats wh + JOIN ( + SELECT hotspot_key, MAX(location_validation_timestamp) AS max_timestamp + FROM wifi_heartbeats + WHERE location_validation_timestamp IS NOT NULL + GROUP BY hotspot_key + ) latest ON wh.hotspot_key = latest.hotspot_key + AND wh.location_validation_timestamp = latest.max_timestamp + "#, + ) + .fetch(pool); + + while let Some(row_result) = rows.next().await { + let row = row_result?; + + let hotspot_key: String = row.get("hotspot_key"); + let pub_key_bin = PublicKeyBinary::from_str(&hotspot_key)?; + let key = LocationCacheKey::WifiPubKey(pub_key_bin); + + let value = LocationCacheValue { + lat: row.get("lat"), + lon: row.get("lon"), + timestamp: row.get("timestamp"), + }; + + let mut data = mutex.lock().await; + data.insert(key.clone(), value); + } + + Ok(()) +} + +async fn hydrate_cbrs(pool: &PgPool, mutex: &Arc>) -> anyhow::Result<()> { + let mut rows = sqlx::query( + r#" + SELECT ch.lat, ch.lon, ch.latest_timestamp AS timestamp, ch.cbsd_id + FROM cbrs_heartbeats ch + JOIN ( + SELECT cbsd_id, MAX(latest_timestamp) AS max_timestamp + FROM cbrs_heartbeats + WHERE latest_timestamp IS NOT NULL + GROUP BY cbsd_id + ) latest ON ch.cbsd_id = latest.cbsd_id + AND ch.latest_timestamp = latest.max_timestamp + "#, + ) + .fetch(pool); + + while let Some(row_result) = rows.next().await { + let row = row_result?; + + let id: String = row.get("cbsd_id"); + let key = LocationCacheKey::CbrsId(id); + + let value = LocationCacheValue { + lat: row.get("lat"), + lon: row.get("lon"), + timestamp: row.get("timestamp"), + }; + + let mut data = mutex.lock().await; + data.insert(key.clone(), value); + } + + Ok(()) +} + +pub fn key_to_entity(entity: LocationCacheKey) -> Entity { + match entity { + LocationCacheKey::CbrsId(id) => Entity::CbrsId(id), + LocationCacheKey::WifiPubKey(pub_key) => Entity::WifiPubKey(pub_key), + } +} diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index b72b47526..3ba6829e0 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -1,7 +1,8 @@ pub mod cbrs; -pub mod last_location; +pub mod location_cache; pub mod wifi; +use self::location_cache::{LocationCache, LocationCacheKey, LocationCacheValue}; use crate::{ cell_type::{CellType, CellTypeLabel}, coverage::{CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta}, @@ -26,8 +27,6 @@ use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type}; use std::{ops::Range, pin::pin, time}; use uuid::Uuid; -use self::last_location::{LastLocation, LocationCache}; - /// Minimum number of heartbeats required to give a reward to the hotspot. const MINIMUM_HEARTBEAT_COUNT: i64 = 12; @@ -375,7 +374,7 @@ impl ValidatedHeartbeat { mut heartbeat: Heartbeat, gateway_info_resolver: &impl GatewayResolver, coverage_object_cache: &CoverageObjectCache, - last_location_cache: &LocationCache, + location_cache: &LocationCache, max_distance_to_coverage: u32, epoch: &Range>, geofence: &impl GeofenceValidator, @@ -483,7 +482,6 @@ impl ValidatedHeartbeat { proto::HeartbeatValidity::UnsupportedLocation, )); } - match gateway_info_resolver .resolve_gateway(&heartbeat.hotspot_key) .await? @@ -504,28 +502,52 @@ impl ValidatedHeartbeat { Some(coverage_object.meta), proto::HeartbeatValidity::GatewayNotFound, )), - GatewayResolution::GatewayNotAsserted if heartbeat.hb_type == HbType::Wifi => { - Ok(Self::new( + GatewayResolution::GatewayNotAsserted => match heartbeat.hb_type { + HbType::Wifi => Ok(Self::new( heartbeat, cell_type, dec!(0), None, Some(coverage_object.meta), proto::HeartbeatValidity::GatewayNotAsserted, - )) - } + )), + HbType::Cbrs => { + if let Some(cbsd_id) = heartbeat.cbsd_id.clone() { + location_cache + .insert( + LocationCacheKey::CbrsId(cbsd_id), + LocationCacheValue::new( + heartbeat.lat, + heartbeat.lon, + heartbeat.timestamp, + ), + ) + .await?; + }; + Ok(Self::new( + heartbeat, + cell_type, + dec!(1.0), + None, + Some(coverage_object.meta), + proto::HeartbeatValidity::Valid, + )) + } + }, GatewayResolution::AssertedLocation(location) if heartbeat.hb_type == HbType::Wifi => { let asserted_latlng: LatLng = CellIndex::try_from(location)?.into(); let is_valid = match heartbeat.location_validation_timestamp { None => { - if let Some(last_location) = last_location_cache - .fetch_last_location(&heartbeat.hotspot_key) + if let Some(last_location) = location_cache + .get_recent( + LocationCacheKey::WifiPubKey(heartbeat.hotspot_key.clone()), + Duration::hours(12), + ) .await? { heartbeat.lat = last_location.lat; heartbeat.lon = last_location.lon; - heartbeat.location_validation_timestamp = - Some(last_location.location_validation_timestamp); + heartbeat.location_validation_timestamp = Some(last_location.timestamp); // Can't panic, previous lat and lon must be valid. hb_latlng = heartbeat.centered_latlng().unwrap(); true @@ -534,14 +556,13 @@ impl ValidatedHeartbeat { } } Some(location_validation_timestamp) => { - last_location_cache - .set_last_location( - &heartbeat.hotspot_key, - LastLocation::new( - location_validation_timestamp, - heartbeat.timestamp, + location_cache + .insert( + LocationCacheKey::WifiPubKey(heartbeat.hotspot_key.clone()), + LocationCacheValue::new( heartbeat.lat, heartbeat.lon, + location_validation_timestamp, ), ) .await?; @@ -677,8 +698,8 @@ impl ValidatedHeartbeat { let truncated_timestamp = self.truncated_timestamp()?; sqlx::query( r#" - INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, lat, lon) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET latest_timestamp = EXCLUDED.latest_timestamp, coverage_object = EXCLUDED.coverage_object @@ -691,6 +712,8 @@ impl ValidatedHeartbeat { .bind(truncated_timestamp) .bind(self.heartbeat.coverage_object) .bind(self.location_trust_score_multiplier) + .bind(self.heartbeat.lat) + .bind(self.heartbeat.lon) .execute(&mut *exec) .await?; Ok(()) diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 9d2ed0d5f..e6dfd8322 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -1,8 +1,9 @@ -use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; +use super::{ + location_cache::LocationCache, process_validated_heartbeats, Heartbeat, ValidatedHeartbeat, +}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, - heartbeats::LocationCache, GatewayResolver, Settings, }; use chrono::{DateTime, Duration, Utc}; @@ -32,6 +33,7 @@ pub struct WifiHeartbeatDaemon { heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, geofence: GFV, + location_cache: LocationCache, } impl WifiHeartbeatDaemon @@ -48,6 +50,7 @@ where valid_heartbeats: FileSinkClient, seniority_updates: FileSinkClient, geofence: GFV, + location_cache: LocationCache, ) -> anyhow::Result { // Wifi Heartbeats let (wifi_heartbeats, wifi_heartbeats_server) = @@ -67,6 +70,7 @@ where valid_heartbeats, seniority_updates, geofence, + location_cache, ); Ok(TaskManager::builder() @@ -84,6 +88,7 @@ where heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, geofence: GFV, + location_cache: LocationCache, ) -> Self { Self { pool, @@ -93,6 +98,7 @@ where heartbeat_sink, seniority_sink, geofence, + location_cache, } } @@ -109,7 +115,6 @@ where let coverage_claim_time_cache = CoverageClaimTimeCache::new(); let coverage_object_cache = CoverageObjectCache::new(&self.pool); - let location_cache = LocationCache::new(&self.pool); loop { tokio::select! { @@ -124,8 +129,7 @@ where file, &heartbeat_cache, &coverage_claim_time_cache, - &coverage_object_cache, - &location_cache + &coverage_object_cache ).await?; metrics::histogram!("wifi_heartbeat_processing_time") .record(start.elapsed()); @@ -142,7 +146,6 @@ where heartbeat_cache: &Cache<(String, DateTime), ()>, coverage_claim_time_cache: &CoverageClaimTimeCache, coverage_object_cache: &CoverageObjectCache, - location_cache: &LocationCache, ) -> anyhow::Result<()> { tracing::info!("Processing WIFI heartbeat file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; @@ -157,7 +160,7 @@ where heartbeats, &self.gateway_info_resolver, coverage_object_cache, - location_cache, + &self.location_cache, self.max_distance_to_coverage, &epoch, &self.geofence, diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 9fc3757c0..5a7645b43 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -5,6 +5,7 @@ pub mod coverage; pub mod data_session; pub mod geofence; pub mod heartbeats; +pub mod radio_location_estimates; pub mod radio_threshold; pub mod reward_shares; pub mod rewarder; diff --git a/mobile_verifier/src/radio_location_estimates.rs b/mobile_verifier/src/radio_location_estimates.rs new file mode 100644 index 000000000..f00bfb335 --- /dev/null +++ b/mobile_verifier/src/radio_location_estimates.rs @@ -0,0 +1,335 @@ +use crate::{heartbeats::HbType, Settings}; +use chrono::{DateTime, Utc}; +use file_store::{ + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, + file_source, + file_upload::FileUpload, + radio_location_estimates::{Entity, RadioLocationEstimate, RadioLocationEstimatesReq}, + radio_location_estimates_ingest_report::RadioLocationEstimatesIngestReport, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, + verified_radio_location_estimates::VerifiedRadioLocationEstimatesReport, + FileStore, FileType, +}; +use futures::{StreamExt, TryStreamExt}; +use h3o::CellIndex; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{ + RadioLocationEstimatesVerificationStatus, VerifiedRadioLocationEstimatesReportV1, + }, +}; +use mobile_config::client::authorization_client::AuthorizationVerifier; +use sha2::{Digest, Sha256}; +use sqlx::{Pool, Postgres, Transaction}; +use task_manager::{ManagedTask, TaskManager}; +use tokio::sync::mpsc::Receiver; + +pub struct RadioLocationEstimatesDaemon { + pool: Pool, + authorization_verifier: AV, + reports_receiver: Receiver>, + verified_report_sink: FileSinkClient, +} + +impl RadioLocationEstimatesDaemon +where + AV: AuthorizationVerifier + Send + Sync + 'static, +{ + pub fn new( + pool: Pool, + authorization_verifier: AV, + reports_receiver: Receiver>, + verified_report_sink: FileSinkClient, + ) -> Self { + Self { + pool, + authorization_verifier, + reports_receiver, + verified_report_sink, + } + } + + pub async fn create_managed_task( + pool: Pool, + settings: &Settings, + file_upload: FileUpload, + file_store: FileStore, + authorization_verifier: AV, + ) -> anyhow::Result { + let (reports_receiver, reports_receiver_server) = + file_source::continuous_source::() + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .prefix(FileType::RadioLocationEstimatesIngestReport.to_string()) + .create() + .await?; + + let (verified_report_sink, verified_report_sink_server) = + VerifiedRadioLocationEstimatesReportV1::file_sink( + settings.store_base_path(), + file_upload.clone(), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, + env!("CARGO_PKG_NAME"), + ) + .await?; + + let task = Self::new( + pool, + authorization_verifier, + reports_receiver, + verified_report_sink, + ); + + Ok(TaskManager::builder() + .add_task(reports_receiver_server) + .add_task(verified_report_sink_server) + .add_task(task) + .build()) + } + + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("Starting sme deamon"); + loop { + tokio::select! { + biased; + _ = shutdown.clone() => { + tracing::info!("sme deamon shutting down"); + break; + } + Some(file) = self.reports_receiver.recv() => { + self.process_file(file).await?; + } + } + } + Ok(()) + } + + async fn process_file( + &self, + file_info_stream: FileInfoStream, + ) -> anyhow::Result<()> { + tracing::info!( + "Processing Radio Location Estimates file {}", + file_info_stream.file_info.key + ); + + let mut transaction = self.pool.begin().await?; + + file_info_stream + .into_stream(&mut transaction) + .await? + .map(anyhow::Ok) + .try_fold( + transaction, + |mut transaction, report: RadioLocationEstimatesIngestReport| async move { + let verified_report_status = self.verify_report(&report.report).await; + + if verified_report_status == RadioLocationEstimatesVerificationStatus::Valid { + save_to_db(&report, &mut transaction).await?; + + // Once they are saved to DB should we directly write to ban table? + // maybe_ban_radios(&report, &mut transaction).await?; + } + + let verified_report_proto: VerifiedRadioLocationEstimatesReportV1 = + VerifiedRadioLocationEstimatesReport { + report, + status: verified_report_status, + timestamp: Utc::now(), + } + .into(); + + self.verified_report_sink + .write( + verified_report_proto, + &[("report_status", verified_report_status.as_str_name())], + ) + .await?; + + Ok(transaction) + }, + ) + .await? + .commit() + .await?; + + self.verified_report_sink.commit().await?; + + Ok(()) + } + + async fn verify_report( + &self, + req: &RadioLocationEstimatesReq, + ) -> RadioLocationEstimatesVerificationStatus { + if !self.verify_known_carrier_key(&req.carrier_key).await { + return RadioLocationEstimatesVerificationStatus::InvalidKey; + } + + RadioLocationEstimatesVerificationStatus::Valid + } + + async fn verify_known_carrier_key(&self, public_key: &PublicKeyBinary) -> bool { + self.authorization_verifier + .verify_authorized_key(public_key, NetworkKeyRole::MobileCarrier) + .await + .unwrap_or_default() + } +} + +impl ManagedTask for RadioLocationEstimatesDaemon +where + AV: AuthorizationVerifier + Send + Sync + 'static, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + +async fn save_to_db( + report: &RadioLocationEstimatesIngestReport, + exec: &mut Transaction<'_, Postgres>, +) -> Result<(), sqlx::Error> { + let estimates = &report.report.estimates; + let entity = &report.report.entity; + let received_timestamp = report.received_timestamp; + for estimate in estimates { + insert_estimate(entity, received_timestamp, estimate, exec).await?; + } + invalidate_old_estimates(entity, received_timestamp, exec).await?; + + Ok(()) +} + +async fn invalidate_old_estimates( + entity: &Entity, + timestamp: DateTime, + exec: &mut Transaction<'_, Postgres>, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE radio_location_estimates + SET invalidated_at = now() + WHERE radio_key = $1 + AND received_timestamp < $2; + "#, + ) + .bind(entity.to_string()) + .bind(timestamp) + .execute(exec) + .await?; + + Ok(()) +} + +async fn insert_estimate( + entity: &Entity, + received_timestamp: DateTime, + estimate: &RadioLocationEstimate, + exec: &mut Transaction<'_, Postgres>, +) -> Result<(), sqlx::Error> { + let hex = estimate.hex; + let grid_distance = estimate.grid_distance; + + let hashed_key = hash_key(entity, received_timestamp, hex, grid_distance); + + sqlx::query( + r#" + INSERT INTO radio_location_estimates + (hashed_key, radio_type, radio_key, received_timestamp, hex, grid_distance, confidence) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (hashed_key) + DO NOTHING + "#, + ) + .bind(hashed_key) + .bind(entity_to_radio_type(entity)) + .bind(entity.to_string()) + .bind(received_timestamp) + .bind(u64::from(hex) as i64) + .bind(grid_distance as i32) + .bind(estimate.confidence) + .execute(exec) + .await?; + + Ok(()) +} + +pub fn hash_key( + entity: &Entity, + timestamp: DateTime, + hex: CellIndex, + grid_distance: u32, +) -> String { + let key = format!("{entity}{timestamp}{hex}{grid_distance}"); + + let mut hasher = Sha256::new(); + hasher.update(key); + let hashed_key = hasher.finalize(); + hex::encode(hashed_key) +} + +pub async fn clear_invalided( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + timestamp: &DateTime, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + DELETE FROM radio_location_estimates + WHERE invalidated_at IS NOT NULL + AND invalidated_at < $1 + "#, + ) + .bind(timestamp) + .execute(&mut *tx) + .await?; + Ok(()) +} + +// async fn get_valid_estimates( +// pool: &PgPool, +// radio_key: &Entity, +// threshold: Decimal, +// ) -> anyhow::Result> { +// let rows = sqlx::query( +// r#" +// SELECT hex, grid_distance +// FROM radio_location_estimates +// WHERE radio_key = $1 +// AND confidence >= $2 +// AND invalidated_at IS NULL +// ORDER BY radius DESC, confidence DESC +// "#, +// ) +// .bind(radio_key.to_string()) +// .bind(threshold) +// .fetch_all(pool) +// .await?; + +// let results = rows +// .into_iter() +// .map(|row| { +// let hex = CellIndex::from_str(row.get("hex")).unwrap(); +// let grid_distance = row.get::("grid_distance") as u32; + +// (hex, grid_distance) +// }) +// .collect(); + +// Ok(results) +// } + +fn entity_to_radio_type(entity: &Entity) -> HbType { + match entity { + Entity::CbrsId(_) => HbType::Cbrs, + Entity::WifiPubKey(_) => HbType::Wifi, + } +} diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index a433594e7..f6580c55e 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,8 +1,9 @@ +use self::boosted_hex_eligibility::BoostedHexEligibility; use crate::{ boosting_oracles::db::check_for_unprocessed_data_sets, coverage, data_session, - heartbeats::{self, HeartbeatReward}, - radio_threshold, + heartbeats::{self, location_cache::LocationCache, HeartbeatReward}, + radio_location_estimates, radio_threshold, reward_shares::{ self, CalculatedPocRewardShares, CoverageShares, DataTransferAndPocAllocatedRewardBuckets, MapperShares, TransferRewards, @@ -21,7 +22,6 @@ use file_store::{ traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, }; use futures_util::TryFutureExt; - use helium_proto::{ reward_manifest::RewardData::MobileRewardData, services::poc_mobile::{ @@ -47,8 +47,6 @@ use std::{ops::Range, time::Duration}; use task_manager::{ManagedTask, TaskManager}; use tokio::time::sleep; -use self::boosted_hex_eligibility::BoostedHexEligibility; - pub mod boosted_hex_eligibility; const REWARDS_NOT_CURRENT_DELAY_PERIOD: i64 = 5; @@ -63,6 +61,8 @@ pub struct Rewarder { reward_manifests: FileSinkClient, price_tracker: PriceTracker, speedtest_averages: FileSinkClient, + #[allow(dead_code)] + location_cache: LocationCache, } impl Rewarder @@ -77,6 +77,7 @@ where carrier_service_verifier: A, hex_boosting_info_resolver: B, speedtests_avg: FileSinkClient, + location_cache: LocationCache, ) -> anyhow::Result { let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; @@ -108,6 +109,7 @@ where reward_manifests, price_tracker, speedtests_avg, + location_cache, ); Ok(TaskManager::builder() @@ -129,6 +131,7 @@ where reward_manifests: FileSinkClient, price_tracker: PriceTracker, speedtest_averages: FileSinkClient, + location_cache: LocationCache, ) -> Self { Self { pool, @@ -140,6 +143,7 @@ where reward_manifests, price_tracker, speedtest_averages, + location_cache, } } @@ -265,6 +269,7 @@ where &self.hex_service_client, &self.mobile_rewards, &self.speedtest_averages, + &self.location_cache, reward_period, mobile_bone_price, ) @@ -301,6 +306,7 @@ where coverage::clear_coverage_objects(&mut transaction, &reward_period.start).await?; sp_boosted_rewards_bans::clear_bans(&mut transaction, reward_period.start).await?; subscriber_verified_mapping_event::clear(&mut transaction, &reward_period.start).await?; + radio_location_estimates::clear_invalided(&mut transaction, &reward_period.start).await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; let next_reward_period = scheduler.next_reward_period(); @@ -360,6 +366,7 @@ pub async fn reward_poc_and_dc( hex_service_client: &impl HexBoostingInfoResolver, mobile_rewards: &FileSinkClient, speedtest_avg_sink: &FileSinkClient, + location_cache: &LocationCache, reward_period: &Range>, mobile_bone_price: Decimal, ) -> anyhow::Result { @@ -397,6 +404,7 @@ pub async fn reward_poc_and_dc( speedtest_avg_sink, reward_period, reward_shares, + location_cache, ) .await?; @@ -423,6 +431,7 @@ async fn reward_poc( speedtest_avg_sink: &FileSinkClient, reward_period: &Range>, reward_shares: DataTransferAndPocAllocatedRewardBuckets, + _location_cache: &LocationCache, ) -> anyhow::Result<(Decimal, CalculatedPocRewardShares)> { let heartbeats = HeartbeatReward::validated(pool, reward_period); let speedtest_averages = diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index 55f1e58eb..329018dea 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -17,7 +17,7 @@ use mobile_config::boosted_hex_info::BoostedHexes; use mobile_verifier::{ coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache}, geofence::GeofenceValidator, - heartbeats::{last_location::LocationCache, Heartbeat, HeartbeatReward, ValidatedHeartbeat}, + heartbeats::{location_cache::LocationCache, Heartbeat, HeartbeatReward, ValidatedHeartbeat}, reward_shares::CoverageShares, rewarder::boosted_hex_eligibility::BoostedHexEligibility, seniority::{Seniority, SeniorityUpdate}, @@ -338,7 +338,7 @@ async fn test_footfall_and_urbanization_and_landtype(pool: PgPool) -> anyhow::Re let coverage_objects = CoverageObjectCache::new(&pool); let coverage_claim_time_cache = CoverageClaimTimeCache::new(); - let location_cache = LocationCache::new(&pool); + let location_cache = LocationCache::new(&pool).await?; let epoch = start..end; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index ae11f4363..3981969f8 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -18,7 +18,7 @@ use mobile_config::boosted_hex_info::BoostedHexInfo; use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, - heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, + heartbeats::{location_cache::LocationCache, HbType, Heartbeat, ValidatedHeartbeat}, radio_threshold, reward_shares, rewarder, speedtests, }; use rust_decimal::prelude::*; @@ -137,6 +137,8 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { .to_u64() .unwrap(); + let location_cache = LocationCache::new(&pool).await?; + let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -144,6 +146,7 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -320,7 +323,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu ]; let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - + let location_cache = LocationCache::new(&pool).await?; let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -328,6 +331,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -483,6 +487,9 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res .unwrap(); let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); + + let location_cache = LocationCache::new(&pool).await?; + let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -490,6 +497,7 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -657,7 +665,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { ]; let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - + let location_cache = LocationCache::new(&pool).await?; let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -665,6 +673,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -790,6 +799,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: .to_u64() .unwrap(); + let location_cache = LocationCache::new(&pool).await?; let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -797,6 +807,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -969,7 +980,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( let total_poc_emissions = reward_shares::get_scheduled_tokens_for_poc(epoch_duration) .to_u64() .unwrap(); - + let location_cache = LocationCache::new(&pool).await?; let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -977,6 +988,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -1175,6 +1187,7 @@ async fn test_poc_with_cbrs_and_multi_coverage_boosted_hexes(pool: PgPool) -> an .to_u64() .unwrap(); + let location_cache = LocationCache::new(&pool).await?; let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -1182,6 +1195,7 @@ async fn test_poc_with_cbrs_and_multi_coverage_boosted_hexes(pool: PgPool) -> an &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), diff --git a/mobile_verifier/tests/integrations/last_location.rs b/mobile_verifier/tests/integrations/last_location.rs index c04d0678d..dad91c936 100644 --- a/mobile_verifier/tests/integrations/last_location.rs +++ b/mobile_verifier/tests/integrations/last_location.rs @@ -8,7 +8,10 @@ use helium_proto::services::poc_mobile::{self as proto, LocationSource}; use mobile_verifier::{ coverage::{CoverageObject, CoverageObjectCache}, geofence::GeofenceValidator, - heartbeats::{last_location::LocationCache, HbType, Heartbeat, ValidatedHeartbeat}, + heartbeats::{ + location_cache::{LocationCache, LocationCacheKey}, + HbType, Heartbeat, ValidatedHeartbeat, + }, }; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; @@ -36,7 +39,7 @@ async fn heartbeat_uses_last_good_location_when_invalid_location( let epoch_end = epoch_start + Duration::days(2); let coverage_objects = CoverageObjectCache::new(&pool); - let location_cache = LocationCache::new(&pool); + let location_cache = LocationCache::new(&pool).await?; let mut transaction = pool.begin().await?; let coverage_object = coverage_object(&hotspot, &mut transaction).await?; @@ -98,7 +101,7 @@ async fn heartbeat_will_use_last_good_location_from_db(pool: PgPool) -> anyhow:: let epoch_end = epoch_start + Duration::days(2); let coverage_objects = CoverageObjectCache::new(&pool); - let location_cache = LocationCache::new(&pool); + let location_cache = LocationCache::new(&pool).await?; let mut transaction = pool.begin().await?; let coverage_object = coverage_object(&hotspot, &mut transaction).await?; @@ -122,7 +125,10 @@ async fn heartbeat_will_use_last_good_location_from_db(pool: PgPool) -> anyhow:: dec!(1.0) ); - location_cache.delete_last_location(&hotspot).await; + location_cache + .remove(LocationCacheKey::WifiPubKey(hotspot.clone())) + .await?; + transaction = pool.begin().await?; validated_heartbeat_1.clone().save(&mut transaction).await?; transaction.commit().await?; @@ -167,7 +173,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( let epoch_end = epoch_start + Duration::days(2); let coverage_objects = CoverageObjectCache::new(&pool); - let location_cache = LocationCache::new(&pool); + let location_cache = LocationCache::new(&pool).await?; let mut transaction = pool.begin().await?; let coverage_object = coverage_object(&hotspot, &mut transaction).await?; @@ -175,8 +181,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( let validated_heartbeat_1 = ValidatedHeartbeat::validate( heartbeat(&hotspot, &coverage_object) - .location_validation_timestamp(Utc::now()) - .timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1)) + .location_validation_timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1)) .build(), &GatewayClientAllOwnersValid, &coverage_objects, @@ -242,11 +247,6 @@ impl HeartbeatBuilder { self } - fn timestamp(mut self, ts: DateTime) -> Self { - self.timestamp = Some(ts); - self - } - fn build(self) -> Heartbeat { let (lat, lon) = self.latlng.unwrap_or_else(|| { let lat_lng: LatLng = self diff --git a/mobile_verifier/tests/integrations/main.rs b/mobile_verifier/tests/integrations/main.rs index 0980b0065..b346016f6 100644 --- a/mobile_verifier/tests/integrations/main.rs +++ b/mobile_verifier/tests/integrations/main.rs @@ -5,6 +5,7 @@ mod heartbeats; mod hex_boosting; mod last_location; mod modeled_coverage; +mod radio_location_estimates; mod rewarder_mappers; mod rewarder_oracles; mod rewarder_poc_dc; diff --git a/mobile_verifier/tests/integrations/modeled_coverage.rs b/mobile_verifier/tests/integrations/modeled_coverage.rs index a4e7d8224..b22a2449a 100644 --- a/mobile_verifier/tests/integrations/modeled_coverage.rs +++ b/mobile_verifier/tests/integrations/modeled_coverage.rs @@ -19,7 +19,7 @@ use mobile_verifier::{ coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache}, geofence::GeofenceValidator, heartbeats::{ - last_location::LocationCache, Heartbeat, HeartbeatReward, KeyType, ValidatedHeartbeat, + location_cache::LocationCache, Heartbeat, HeartbeatReward, KeyType, ValidatedHeartbeat, }, reward_shares::CoverageShares, rewarder::boosted_hex_eligibility::BoostedHexEligibility, @@ -378,7 +378,7 @@ async fn process_input( ) -> anyhow::Result<()> { let coverage_objects = CoverageObjectCache::new(pool); let coverage_claim_time_cache = CoverageClaimTimeCache::new(); - let location_cache = LocationCache::new(pool); + let location_cache = LocationCache::new(pool).await?; let mut transaction = pool.begin().await?; let mut coverage_objs = pin!(CoverageObject::validate_coverage_objects( @@ -1376,7 +1376,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow let max_covered_distance = 5_000; let coverage_object_cache = CoverageObjectCache::new(&pool); - let location_cache = LocationCache::new(&pool); + let location_cache = LocationCache::new(&pool).await?; let mk_heartbeat = |latlng: LatLng| WifiHeartbeatIngestReport { report: WifiHeartbeat { diff --git a/mobile_verifier/tests/integrations/radio_location_estimates.rs b/mobile_verifier/tests/integrations/radio_location_estimates.rs new file mode 100644 index 000000000..615ddf996 --- /dev/null +++ b/mobile_verifier/tests/integrations/radio_location_estimates.rs @@ -0,0 +1,241 @@ +use crate::common::MockAuthorizationClient; +use chrono::{DateTime, Duration, Utc}; +use file_store::{ + file_info_poller::FileInfoStream, + file_sink::FileSinkClient, + radio_location_estimates::{ + Entity, RadioLocationCorrelation, RadioLocationEstimate, RadioLocationEstimatesReq, + }, + radio_location_estimates_ingest_report::RadioLocationEstimatesIngestReport, + FileInfo, +}; +use h3o::{CellIndex, LatLng}; +use helium_crypto::{KeyTag, Keypair, PublicKeyBinary}; +use mobile_verifier::radio_location_estimates::{ + clear_invalided, hash_key, RadioLocationEstimatesDaemon, +}; +use rand::rngs::OsRng; +use rust_decimal::prelude::FromPrimitive; +use sqlx::{PgPool, Pool, Postgres, Row}; + +#[sqlx::test] +async fn verifier_test(pool: PgPool) -> anyhow::Result<()> { + let task_pool = pool.clone(); + let (reports_tx, reports_rx) = tokio::sync::mpsc::channel(10); + let (sink_tx, _sink_rx) = tokio::sync::mpsc::channel(10); + let (trigger, listener) = triggered::trigger(); + + tokio::spawn(async move { + let deamon = RadioLocationEstimatesDaemon::new( + task_pool, + MockAuthorizationClient::new(), + reports_rx, + FileSinkClient::new(sink_tx, "metric"), + ); + + deamon.run(listener).await.expect("failed to complete task"); + }); + + // Sending reports as if they are coming from ingestor + let (fis, reports) = file_info_stream(); + reports_tx.send(fis).await?; + + let mut retry = 0; + const MAX_RETRIES: u32 = 3; + const RETRY_WAIT: std::time::Duration = std::time::Duration::from_secs(1); + + let mut expected_n = 0; + for report in &reports { + expected_n += report.report.estimates.len(); + } + + while retry <= MAX_RETRIES { + let saved_estimates = select_radio_location_estimates(&pool).await?; + + // Check that we have expected (2) number of estimates saved in DB + // 1 should be invalidated and other should be valid + // We know the order (invalid first becase we order by in select_radio_location_estimates) + if expected_n == saved_estimates.len() { + compare_report_and_estimate(&reports[0], &saved_estimates[0], false); + compare_report_and_estimate(&reports[1], &saved_estimates[1], true); + break; + } else { + retry += 1; + tokio::time::sleep(RETRY_WAIT).await; + } + } + + assert!( + retry <= MAX_RETRIES, + "Exceeded maximum retries: {}", + MAX_RETRIES + ); + + // Now clear invalidated estimates there should be only 1 left in DB + let mut tx = pool.begin().await?; + clear_invalided(&mut tx, &Utc::now()).await?; + tx.commit().await?; + + let leftover_estimates = select_radio_location_estimates(&pool).await?; + assert_eq!(1, leftover_estimates.len()); + // Check that we have the right estimate left over + compare_report_and_estimate(&reports[1], &leftover_estimates[0], true); + + trigger.trigger(); + + Ok(()) +} + +fn file_info_stream() -> ( + FileInfoStream, + Vec, +) { + let file_info = FileInfo { + key: "test_file_info".to_string(), + prefix: "verified_mapping_event".to_string(), + timestamp: Utc::now(), + size: 0, + }; + + let carrier_key_pair = generate_keypair(); + let carrier_public_key_binary: PublicKeyBinary = + carrier_key_pair.public_key().to_owned().into(); + + let hotspot_key_pair = generate_keypair(); + let hotspot_public_key_binary: PublicKeyBinary = + hotspot_key_pair.public_key().to_owned().into(); + + let entity = Entity::WifiPubKey(hotspot_public_key_binary); + + let reports = vec![ + RadioLocationEstimatesIngestReport { + received_timestamp: Utc::now() - Duration::hours(1), + report: RadioLocationEstimatesReq { + entity: entity.clone(), + estimates: vec![RadioLocationEstimate { + hex: LatLng::new(0.1, 0.1) + .unwrap() + .to_cell(h3o::Resolution::Twelve), + grid_distance: 2, + confidence: rust_decimal::Decimal::from_f32(0.1).unwrap(), + radio_location_correlations: vec![RadioLocationCorrelation { + id: "event_1".to_string(), + timestamp: Utc::now() - Duration::hours(1), + }], + }], + timestamp: Utc::now() - Duration::hours(1), + carrier_key: carrier_public_key_binary.clone(), + }, + }, + RadioLocationEstimatesIngestReport { + received_timestamp: Utc::now(), + report: RadioLocationEstimatesReq { + entity: entity.clone(), + estimates: vec![RadioLocationEstimate { + hex: LatLng::new(0.2, 0.2) + .unwrap() + .to_cell(h3o::Resolution::Twelve), + grid_distance: 2, + confidence: rust_decimal::Decimal::from_f32(0.2).unwrap(), + radio_location_correlations: vec![RadioLocationCorrelation { + id: "event_1".to_string(), + timestamp: Utc::now(), + }], + }], + timestamp: Utc::now(), + carrier_key: carrier_public_key_binary.clone(), + }, + }, + ]; + ( + FileInfoStream::new("default".to_string(), file_info, reports.clone()), + reports, + ) +} + +fn generate_keypair() -> Keypair { + Keypair::generate(KeyTag::default(), &mut OsRng) +} + +fn timestamp_match(dt1: DateTime, dt2: DateTime) -> bool { + let difference = dt1.signed_duration_since(dt2); + difference.num_seconds().abs() < 1 +} + +fn compare_report_and_estimate( + report: &RadioLocationEstimatesIngestReport, + estimate: &RadioLocationEstimateDB, + should_be_valid: bool, +) { + assert_eq!( + hash_key( + &report.report.entity, + report.received_timestamp, + report.report.estimates[0].hex, + report.report.estimates[0].grid_distance, + ), + estimate.hashed_key + ); + + assert_eq!(report.report.entity.to_string(), estimate.radio_key); + assert!(timestamp_match( + report.received_timestamp, + estimate.received_timestamp + )); + assert_eq!(report.report.estimates[0].hex, estimate.hex); + assert_eq!( + report.report.estimates[0].grid_distance, + estimate.grid_distance + ); + assert_eq!(report.report.estimates[0].confidence, estimate.confidence); + + if should_be_valid { + assert!(estimate.invalidated_at.is_none()); + } else { + assert!(estimate.invalidated_at.is_some()); + } +} + +#[derive(Debug)] +pub struct RadioLocationEstimateDB { + pub hashed_key: String, + pub radio_key: String, + pub received_timestamp: DateTime, + pub hex: CellIndex, + pub grid_distance: u32, + pub confidence: rust_decimal::Decimal, + pub invalidated_at: Option>, +} + +pub async fn select_radio_location_estimates( + pool: &Pool, +) -> anyhow::Result> { + let rows = sqlx::query( + r#" + SELECT hashed_key, radio_key, hashed_key, received_timestamp, hex, grid_distance, confidence, invalidated_at + FROM radio_location_estimates + ORDER BY received_timestamp ASC + "#, + ) + .fetch_all(pool) + .await?; + + let estimates = rows + .into_iter() + .map(|row| { + let hex = row.get::("hex") as u64; + let hex = CellIndex::try_from(hex).expect("valid Cell Index"); + RadioLocationEstimateDB { + hashed_key: row.get("hashed_key"), + radio_key: row.get("radio_key"), + received_timestamp: row.get("received_timestamp"), + hex, + grid_distance: row.get::("grid_distance") as u32, + confidence: row.get("confidence"), + invalidated_at: row.try_get("invalidated_at").ok(), + } + }) + .collect(); + + Ok(estimates) +} diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index e26af88b8..c889cbd60 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -13,7 +13,7 @@ use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, data_session, - heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, + heartbeats::{location_cache::LocationCache, HbType, Heartbeat, ValidatedHeartbeat}, reward_shares, rewarder, speedtests, }; use rust_decimal::prelude::*; @@ -44,7 +44,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { let boosted_hexes = vec![]; let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - + let location_cache = LocationCache::new(&pool).await?; let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -52,6 +52,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ),