diff --git a/Cargo.lock b/Cargo.lock index c052ef265..65935c746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#e4b935efc2d6743d0506198d2208c49540762235" +source = "git+https://github.com/helium/proto?branch=master#1cda75ccc8d8cf8a069ca95cbba6d43593e39869" dependencies = [ "base64 0.21.0", "byteorder", @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#e4b935efc2d6743d0506198d2208c49540762235" +source = "git+https://github.com/helium/proto?branch=master#1cda75ccc8d8cf8a069ca95cbba6d43593e39869" dependencies = [ "bytes", "prost", @@ -4172,11 +4172,13 @@ dependencies = [ "config", "db-store", "file-store", + "flate2", "futures", "futures-util", "h3o", "helium-crypto", "helium-proto", + "hextree", "http-serde", "humantime", "lazy_static", diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index ef6668c4a..be11bd304 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -13,6 +13,7 @@ thiserror = {workspace = true} serde = {workspace = true} serde_json = {workspace = true} h3o = {workspace = true, features = ["geo"]} +hextree = "0" http-serde = {workspace = true} clap = {workspace = true} sqlx = {workspace = true} @@ -24,6 +25,7 @@ sha2 = {workspace = true} lazy_static = {workspace = true} chrono = {workspace = true} triggered = {workspace = true} +flate2 = "1" futures = {workspace = true} futures-util = {workspace = true} prost = {workspace = true} diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 1e15b5c88..f78e93124 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,5 +1,5 @@ use crate::{ - coverage::CoverageDaemon, data_session::DataSessionIngestor, + coverage::CoverageDaemon, data_session::DataSessionIngestor, geofence::Geofence, heartbeats::cbrs::HeartbeatDaemon as CellHeartbeatDaemon, heartbeats::wifi::HeartbeatDaemon as WifiHeartbeatDaemon, rewarder::Rewarder, speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry, @@ -93,6 +93,8 @@ impl Cmd { .create() .await?; + let geofence = Geofence::from_settings(settings)?; + let cbrs_heartbeat_daemon = CellHeartbeatDaemon::new( pool.clone(), gateway_client.clone(), @@ -102,6 +104,7 @@ impl Cmd { settings.max_distance_from_coverage, valid_heartbeats.clone(), seniority_updates.clone(), + geofence.clone(), ); let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new( @@ -113,6 +116,7 @@ impl Cmd { settings.max_distance_from_coverage, valid_heartbeats, seniority_updates, + geofence, ); // Speedtests diff --git a/mobile_verifier/src/geofence.rs b/mobile_verifier/src/geofence.rs new file mode 100644 index 000000000..4e01f6f61 --- /dev/null +++ b/mobile_verifier/src/geofence.rs @@ -0,0 +1,70 @@ +use base64::{engine::general_purpose, Engine as _}; +use h3o::{LatLng, Resolution}; +use hextree::{Cell, HexTreeSet}; +use std::{fs, io::Read, path, sync::Arc}; + +use crate::{heartbeats::Heartbeat, Settings}; + +pub trait GeofenceValidator: Clone + Send + Sync + 'static { + fn in_valid_region(&self, heartbeat: &Heartbeat) -> bool; +} + +#[derive(Clone)] +pub struct Geofence { + regions: Arc, + resolution: Resolution, +} + +impl Geofence { + pub fn from_settings(settings: &Settings) -> anyhow::Result { + let paths = settings.region_paths()?; + tracing::info!(?paths, "geofence_regions"); + Ok(Self { + regions: Arc::new(valid_mapping_regions(paths)?), + resolution: settings.fencing_resolution()?, + }) + } +} + +impl GeofenceValidator for Geofence { + fn in_valid_region(&self, heartbeat: &Heartbeat) -> bool { + let Ok(lat_lon) = LatLng::new(heartbeat.lat, heartbeat.lon) else { + return false; + }; + let Ok(cell) = Cell::try_from(u64::from(lat_lon.to_cell(self.resolution))) else { + return false; + }; + self.regions.contains(cell) + } +} + +pub fn valid_mapping_regions(encoded_files: Vec) -> anyhow::Result { + let mut combined_regions: Vec = Vec::new(); + for file in encoded_files { + let indexes = from_base64_file(file)?; + combined_regions.extend(indexes); + } + let region_set: HexTreeSet = combined_regions.iter().collect(); + Ok(region_set) +} + +fn from_base64_file>(file: P) -> anyhow::Result> { + let mut file = fs::File::open(file.as_ref())?; + let mut encoded_string = String::new(); + file.read_to_string(&mut encoded_string)?; + + let compressed_bytes = general_purpose::STANDARD.decode(&encoded_string)?; + let mut decoder = flate2::read::GzDecoder::new(&compressed_bytes[..]); + + let mut uncompressed_bytes = Vec::new(); + decoder.read_to_end(&mut uncompressed_bytes)?; + + let mut indexes: Vec = Vec::new(); + let mut h3_idx_buf = [0_u8; 8]; + for chunk in uncompressed_bytes.chunks(8) { + h3_idx_buf.as_mut_slice().copy_from_slice(chunk); + let index = u64::from_le_bytes(h3_idx_buf); + indexes.push(Cell::try_from(index)?); + } + Ok(indexes) +} diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index b067182bd..0a9795c7c 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -1,6 +1,7 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, + geofence::GeofenceValidator, GatewayResolver, }; @@ -18,7 +19,7 @@ use std::{ use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; -pub struct HeartbeatDaemon { +pub struct HeartbeatDaemon { pool: sqlx::Pool, gateway_info_resolver: GIR, heartbeats: Receiver>, @@ -27,11 +28,13 @@ pub struct HeartbeatDaemon { max_distance_to_coverage: u32, heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, + geofence: GFV, } -impl HeartbeatDaemon +impl HeartbeatDaemon where GIR: GatewayResolver, + GFV: GeofenceValidator, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -43,6 +46,7 @@ where max_distance_to_coverage: u32, heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, + geofence: GFV, ) -> Self { Self { pool, @@ -53,6 +57,7 @@ where max_distance_to_coverage, heartbeat_sink, seniority_sink, + geofence, } } @@ -123,6 +128,7 @@ where self.max_distance_to_asserted, self.max_distance_to_coverage, &epoch, + &self.geofence, ), heartbeat_cache, coverage_claim_time_cache, @@ -139,9 +145,10 @@ where } } -impl ManagedTask for HeartbeatDaemon +impl ManagedTask for HeartbeatDaemon where GIR: GatewayResolver, + GFV: GeofenceValidator, { fn start_task( self: Box, diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 935708c58..27cd3072d 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -4,6 +4,7 @@ pub mod wifi; use crate::{ cell_type::{CellType, CellTypeLabel}, coverage::{CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta, Seniority}, + geofence::GeofenceValidator, GatewayResolution, GatewayResolver, }; use anyhow::anyhow; @@ -353,6 +354,7 @@ impl ValidatedHeartbeat { max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &Range>, + geofence: &impl GeofenceValidator, ) -> anyhow::Result { let Some(coverage_object) = heartbeat.coverage_object else { return Ok(Self::new( @@ -447,6 +449,17 @@ impl ValidatedHeartbeat { )); }; + if !geofence.in_valid_region(&heartbeat) { + return Ok(Self::new( + heartbeat, + cell_type, + dec!(0), + None, + Some(coverage_object.meta), + proto::HeartbeatValidity::UnsupportedLocation, + )); + } + match gateway_info_resolver .resolve_gateway(&heartbeat.hotspot_key) .await? @@ -509,6 +522,7 @@ impl ValidatedHeartbeat { max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &'a Range>, + geofence: &'a impl GeofenceValidator, ) -> impl Stream> + 'a { heartbeats.then(move |heartbeat| async move { Self::validate( @@ -518,6 +532,7 @@ impl ValidatedHeartbeat { max_distance_to_asserted, max_distance_to_coverage, epoch, + geofence, ) .await }) diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 11cf176cc..9e87dbeb1 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -1,6 +1,7 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, + geofence::GeofenceValidator, GatewayResolver, }; use chrono::{DateTime, Duration, Utc}; @@ -17,7 +18,7 @@ use std::{ use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; -pub struct HeartbeatDaemon { +pub struct HeartbeatDaemon { pool: sqlx::Pool, gateway_info_resolver: GIR, heartbeats: Receiver>, @@ -26,11 +27,13 @@ pub struct HeartbeatDaemon { max_distance_to_coverage: u32, heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, + geofence: GFV, } -impl HeartbeatDaemon +impl HeartbeatDaemon where GIR: GatewayResolver, + GFV: GeofenceValidator, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -42,6 +45,7 @@ where max_distance_to_coverage: u32, heartbeat_sink: FileSinkClient, seniority_sink: FileSinkClient, + geofence: GFV, ) -> Self { Self { pool, @@ -52,6 +56,7 @@ where max_distance_to_coverage, heartbeat_sink, seniority_sink, + geofence, } } @@ -116,6 +121,7 @@ where self.max_distance_to_asserted, self.max_distance_to_coverage, &epoch, + &self.geofence, ), heartbeat_cache, coverage_claim_time_cache, @@ -132,9 +138,10 @@ where } } -impl ManagedTask for HeartbeatDaemon +impl ManagedTask for HeartbeatDaemon where GIR: GatewayResolver, + GFV: GeofenceValidator, { fn start_task( self: Box, diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index ee491cd0f..4af49fc64 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -2,6 +2,7 @@ pub mod cell_type; pub mod cli; pub mod coverage; pub mod data_session; +pub mod geofence; pub mod heartbeats; pub mod reward_shares; pub mod rewarder; diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index 9b005c971..8061c6a43 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -35,6 +35,14 @@ pub struct Settings { /// beyond which its location weight will be reduced #[serde(default = "default_max_asserted_distance_deviation")] pub max_asserted_distance_deviation: u32, + // Geofencing settings + pub geofence_regions: String, + #[serde(default = "default_fencing_resolution")] + pub fencing_resolution: u8, +} + +fn default_fencing_resolution() -> u8 { + 7 } pub fn default_max_distance_from_coverage() -> u32 { @@ -96,4 +104,18 @@ impl Settings { .single() .unwrap() } + + pub fn region_paths(&self) -> anyhow::Result> { + let paths = std::fs::read_dir(&self.geofence_regions)?; + Ok(paths + .into_iter() + .collect::, std::io::Error>>()? + .into_iter() + .map(|path| path.path()) + .collect()) + } + + pub fn fencing_resolution(&self) -> anyhow::Result { + Ok(h3o::Resolution::try_from(self.fencing_resolution)?) + } } diff --git a/mobile_verifier/tests/modeled_coverage.rs b/mobile_verifier/tests/modeled_coverage.rs index ff458c1ae..72f191885 100644 --- a/mobile_verifier/tests/modeled_coverage.rs +++ b/mobile_verifier/tests/modeled_coverage.rs @@ -11,6 +11,7 @@ use helium_proto::services::mobile_config::NetworkKeyRole; use helium_proto::services::poc_mobile::{CoverageObjectValidity, SignalLevel}; use mobile_verifier::{ coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache, Seniority}, + geofence::GeofenceValidator, heartbeats::{Heartbeat, HeartbeatReward, KeyType, SeniorityUpdate, ValidatedHeartbeat}, reward_shares::CoveragePoints, speedtests::Speedtest, @@ -22,6 +23,15 @@ use sqlx::PgPool; use std::{collections::HashMap, ops::Range, pin::pin}; use uuid::Uuid; +#[derive(Clone)] +struct MockGeofence {} + +impl GeofenceValidator for MockGeofence { + fn in_valid_region(&self, _heartbeat: &Heartbeat) -> bool { + true + } +} + #[sqlx::test] #[ignore] async fn test_save_wifi_coverage_object(pool: PgPool) -> anyhow::Result<()> { @@ -388,6 +398,7 @@ async fn process_input( 2000, 2000, epoch, + &MockGeofence {}, )); while let Some(heartbeat) = heartbeats.next().await.transpose()? { let coverage_claim_time = coverage_claim_time_cache @@ -1263,6 +1274,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow 2000, 2000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), + &MockGeofence {}, ) .await .unwrap(); @@ -1276,6 +1288,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow 1000000, 2000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), + &MockGeofence {}, ) .await .unwrap(); @@ -1289,6 +1302,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow 2000, 1000000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), + &MockGeofence {}, ) .await .unwrap(); @@ -1302,6 +1316,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow 1000000, 1000000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), + &MockGeofence {}, ) .await .unwrap();