Skip to content

Commit

Permalink
Update mobile-verifier to invalidate heartbeats outside of configured… (
Browse files Browse the repository at this point in the history
#721)

* Update mobile-verifier to invalidate heartbeats outside of configured regions

* Add log message

* Change back to master proto
  • Loading branch information
bbalser authored Jan 24, 2024
1 parent fd038aa commit f9d2b07
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 9 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mobile_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ thiserror = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
h3o = {workspace = true, features = ["geo"]}
hextree = "0"
http-serde = {workspace = true}
clap = {workspace = true}
sqlx = {workspace = true}
Expand All @@ -24,6 +25,7 @@ sha2 = {workspace = true}
lazy_static = {workspace = true}
chrono = {workspace = true}
triggered = {workspace = true}
flate2 = "1"
futures = {workspace = true}
futures-util = {workspace = true}
prost = {workspace = true}
Expand Down
6 changes: 5 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
coverage::CoverageDaemon, data_session::DataSessionIngestor,
coverage::CoverageDaemon, data_session::DataSessionIngestor, geofence::Geofence,
heartbeats::cbrs::HeartbeatDaemon as CellHeartbeatDaemon,
heartbeats::wifi::HeartbeatDaemon as WifiHeartbeatDaemon, rewarder::Rewarder,
speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry,
Expand Down Expand Up @@ -93,6 +93,8 @@ impl Cmd {
.create()
.await?;

let geofence = Geofence::from_settings(settings)?;

let cbrs_heartbeat_daemon = CellHeartbeatDaemon::new(
pool.clone(),
gateway_client.clone(),
Expand All @@ -102,6 +104,7 @@ impl Cmd {
settings.max_distance_from_coverage,
valid_heartbeats.clone(),
seniority_updates.clone(),
geofence.clone(),
);

let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new(
Expand All @@ -113,6 +116,7 @@ impl Cmd {
settings.max_distance_from_coverage,
valid_heartbeats,
seniority_updates,
geofence,
);

// Speedtests
Expand Down
70 changes: 70 additions & 0 deletions mobile_verifier/src/geofence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use base64::{engine::general_purpose, Engine as _};
use h3o::{LatLng, Resolution};
use hextree::{Cell, HexTreeSet};
use std::{fs, io::Read, path, sync::Arc};

use crate::{heartbeats::Heartbeat, Settings};

pub trait GeofenceValidator: Clone + Send + Sync + 'static {
fn in_valid_region(&self, heartbeat: &Heartbeat) -> bool;
}

#[derive(Clone)]
pub struct Geofence {
regions: Arc<HexTreeSet>,
resolution: Resolution,
}

impl Geofence {
pub fn from_settings(settings: &Settings) -> anyhow::Result<Self> {
let paths = settings.region_paths()?;
tracing::info!(?paths, "geofence_regions");
Ok(Self {
regions: Arc::new(valid_mapping_regions(paths)?),
resolution: settings.fencing_resolution()?,
})
}
}

impl GeofenceValidator for Geofence {
fn in_valid_region(&self, heartbeat: &Heartbeat) -> bool {
let Ok(lat_lon) = LatLng::new(heartbeat.lat, heartbeat.lon) else {
return false;
};
let Ok(cell) = Cell::try_from(u64::from(lat_lon.to_cell(self.resolution))) else {
return false;
};
self.regions.contains(cell)
}
}

pub fn valid_mapping_regions(encoded_files: Vec<std::path::PathBuf>) -> anyhow::Result<HexTreeSet> {
let mut combined_regions: Vec<Cell> = Vec::new();
for file in encoded_files {
let indexes = from_base64_file(file)?;
combined_regions.extend(indexes);
}
let region_set: HexTreeSet = combined_regions.iter().collect();
Ok(region_set)
}

fn from_base64_file<P: AsRef<path::Path>>(file: P) -> anyhow::Result<Vec<Cell>> {
let mut file = fs::File::open(file.as_ref())?;
let mut encoded_string = String::new();
file.read_to_string(&mut encoded_string)?;

let compressed_bytes = general_purpose::STANDARD.decode(&encoded_string)?;
let mut decoder = flate2::read::GzDecoder::new(&compressed_bytes[..]);

let mut uncompressed_bytes = Vec::new();
decoder.read_to_end(&mut uncompressed_bytes)?;

let mut indexes: Vec<Cell> = Vec::new();
let mut h3_idx_buf = [0_u8; 8];
for chunk in uncompressed_bytes.chunks(8) {
h3_idx_buf.as_mut_slice().copy_from_slice(chunk);
let index = u64::from_le_bytes(h3_idx_buf);
indexes.push(Cell::try_from(index)?);
}
Ok(indexes)
}
13 changes: 10 additions & 3 deletions mobile_verifier/src/heartbeats/cbrs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat};
use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
GatewayResolver,
};

Expand All @@ -18,7 +19,7 @@ use std::{
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

pub struct HeartbeatDaemon<GIR> {
pub struct HeartbeatDaemon<GIR, GFV> {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_info_resolver: GIR,
heartbeats: Receiver<FileInfoStream<CbrsHeartbeatIngestReport>>,
Expand All @@ -27,11 +28,13 @@ pub struct HeartbeatDaemon<GIR> {
max_distance_to_coverage: u32,
heartbeat_sink: FileSinkClient,
seniority_sink: FileSinkClient,
geofence: GFV,
}

impl<GIR> HeartbeatDaemon<GIR>
impl<GIR, GFV> HeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand All @@ -43,6 +46,7 @@ where
max_distance_to_coverage: u32,
heartbeat_sink: FileSinkClient,
seniority_sink: FileSinkClient,
geofence: GFV,
) -> Self {
Self {
pool,
Expand All @@ -53,6 +57,7 @@ where
max_distance_to_coverage,
heartbeat_sink,
seniority_sink,
geofence,
}
}

Expand Down Expand Up @@ -123,6 +128,7 @@ where
self.max_distance_to_asserted,
self.max_distance_to_coverage,
&epoch,
&self.geofence,
),
heartbeat_cache,
coverage_claim_time_cache,
Expand All @@ -139,9 +145,10 @@ where
}
}

impl<GIR> ManagedTask for HeartbeatDaemon<GIR>
impl<GIR, GFV> ManagedTask for HeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator,
{
fn start_task(
self: Box<Self>,
Expand Down
15 changes: 15 additions & 0 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod wifi;
use crate::{
cell_type::{CellType, CellTypeLabel},
coverage::{CoverageClaimTimeCache, CoverageObjectCache, CoverageObjectMeta, Seniority},
geofence::GeofenceValidator,
GatewayResolution, GatewayResolver,
};
use anyhow::anyhow;
Expand Down Expand Up @@ -353,6 +354,7 @@ impl ValidatedHeartbeat {
max_distance_to_asserted: u32,
max_distance_to_coverage: u32,
epoch: &Range<DateTime<Utc>>,
geofence: &impl GeofenceValidator,
) -> anyhow::Result<Self> {
let Some(coverage_object) = heartbeat.coverage_object else {
return Ok(Self::new(
Expand Down Expand Up @@ -447,6 +449,17 @@ impl ValidatedHeartbeat {
));
};

if !geofence.in_valid_region(&heartbeat) {
return Ok(Self::new(
heartbeat,
cell_type,
dec!(0),
None,
Some(coverage_object.meta),
proto::HeartbeatValidity::UnsupportedLocation,
));
}

match gateway_info_resolver
.resolve_gateway(&heartbeat.hotspot_key)
.await?
Expand Down Expand Up @@ -509,6 +522,7 @@ impl ValidatedHeartbeat {
max_distance_to_asserted: u32,
max_distance_to_coverage: u32,
epoch: &'a Range<DateTime<Utc>>,
geofence: &'a impl GeofenceValidator,
) -> impl Stream<Item = anyhow::Result<Self>> + 'a {
heartbeats.then(move |heartbeat| async move {
Self::validate(
Expand All @@ -518,6 +532,7 @@ impl ValidatedHeartbeat {
max_distance_to_asserted,
max_distance_to_coverage,
epoch,
geofence,
)
.await
})
Expand Down
13 changes: 10 additions & 3 deletions mobile_verifier/src/heartbeats/wifi.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat};
use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
GatewayResolver,
};
use chrono::{DateTime, Duration, Utc};
Expand All @@ -17,7 +18,7 @@ use std::{
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

pub struct HeartbeatDaemon<GIR> {
pub struct HeartbeatDaemon<GIR, GFV> {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_info_resolver: GIR,
heartbeats: Receiver<FileInfoStream<WifiHeartbeatIngestReport>>,
Expand All @@ -26,11 +27,13 @@ pub struct HeartbeatDaemon<GIR> {
max_distance_to_coverage: u32,
heartbeat_sink: FileSinkClient,
seniority_sink: FileSinkClient,
geofence: GFV,
}

impl<GIR> HeartbeatDaemon<GIR>
impl<GIR, GFV> HeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand All @@ -42,6 +45,7 @@ where
max_distance_to_coverage: u32,
heartbeat_sink: FileSinkClient,
seniority_sink: FileSinkClient,
geofence: GFV,
) -> Self {
Self {
pool,
Expand All @@ -52,6 +56,7 @@ where
max_distance_to_coverage,
heartbeat_sink,
seniority_sink,
geofence,
}
}

Expand Down Expand Up @@ -116,6 +121,7 @@ where
self.max_distance_to_asserted,
self.max_distance_to_coverage,
&epoch,
&self.geofence,
),
heartbeat_cache,
coverage_claim_time_cache,
Expand All @@ -132,9 +138,10 @@ where
}
}

impl<GIR> ManagedTask for HeartbeatDaemon<GIR>
impl<GIR, GFV> ManagedTask for HeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator,
{
fn start_task(
self: Box<Self>,
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod cell_type;
pub mod cli;
pub mod coverage;
pub mod data_session;
pub mod geofence;
pub mod heartbeats;
pub mod reward_shares;
pub mod rewarder;
Expand Down
22 changes: 22 additions & 0 deletions mobile_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ pub struct Settings {
/// beyond which its location weight will be reduced
#[serde(default = "default_max_asserted_distance_deviation")]
pub max_asserted_distance_deviation: u32,
// Geofencing settings
pub geofence_regions: String,
#[serde(default = "default_fencing_resolution")]
pub fencing_resolution: u8,
}

fn default_fencing_resolution() -> u8 {
7
}

pub fn default_max_distance_from_coverage() -> u32 {
Expand Down Expand Up @@ -96,4 +104,18 @@ impl Settings {
.single()
.unwrap()
}

pub fn region_paths(&self) -> anyhow::Result<Vec<std::path::PathBuf>> {
let paths = std::fs::read_dir(&self.geofence_regions)?;
Ok(paths
.into_iter()
.collect::<Result<Vec<std::fs::DirEntry>, std::io::Error>>()?
.into_iter()
.map(|path| path.path())
.collect())
}

pub fn fencing_resolution(&self) -> anyhow::Result<h3o::Resolution> {
Ok(h3o::Resolution::try_from(self.fencing_resolution)?)
}
}
Loading

0 comments on commit f9d2b07

Please sign in to comment.