From 317e7315062a4403156499967fe0579e31488085 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Thu, 28 Mar 2024 12:13:52 -0400 Subject: [PATCH] Revert PR 774 so it can continue to be tested (#777) * Revert "Fix SQL (#776)" This reverts commit 40e6541be892384d4a78bbbe7872c4db5018eb0e. * Revert "Lookup previous WiFi locations in case of None location validation timestamp (#774)" This reverts commit fab10f691ae637bcaddc0a8c36c5e180a94e2a62. --- .../migrations/30_save_lat_and_lon.sql | 3 - mobile_verifier/src/heartbeats/cbrs.rs | 8 +- mobile_verifier/src/heartbeats/mod.rs | 159 +----------------- mobile_verifier/src/heartbeats/wifi.rs | 7 +- mobile_verifier/tests/boosting_oracles.rs | 6 +- mobile_verifier/tests/modeled_coverage.rs | 13 +- 6 files changed, 14 insertions(+), 182 deletions(-) delete mode 100644 mobile_verifier/migrations/30_save_lat_and_lon.sql diff --git a/mobile_verifier/migrations/30_save_lat_and_lon.sql b/mobile_verifier/migrations/30_save_lat_and_lon.sql deleted file mode 100644 index 9073f8eb2..000000000 --- a/mobile_verifier/migrations/30_save_lat_and_lon.sql +++ /dev/null @@ -1,3 +0,0 @@ -ALTER TABLE wifi_heartbeats -ADD COLUMN lat DOUBLE PRECISION NOT NULL DEFAULT 0.0, -ADD COLUMN lon DOUBLE PRECISION NOT NULL DEFAULT 0.0; diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 5ca03455d..023cddf32 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -2,7 +2,6 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, - heartbeats::LocationCache, GatewayResolver, }; @@ -75,8 +74,6 @@ where let coverage_claim_time_cache = CoverageClaimTimeCache::new(); let coverage_object_cache = CoverageObjectCache::new(&self.pool); - // Unused: - let location_cache = LocationCache::new(&self.pool); loop { #[rustfmt::skip] @@ -93,7 +90,6 @@ where &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, - &location_cache, ).await?; metrics::histogram!("cbrs_heartbeat_processing_time", start.elapsed()); } @@ -109,7 +105,6 @@ where heartbeat_cache: &Arc), ()>>, coverage_claim_time_cache: &CoverageClaimTimeCache, coverage_object_cache: &CoverageObjectCache, - location_cache: &LocationCache, ) -> anyhow::Result<()> { tracing::info!("Processing CBRS heartbeat file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; @@ -127,10 +122,9 @@ where }); process_validated_heartbeats( ValidatedHeartbeat::validate_heartbeats( - heartbeats, &self.gateway_info_resolver, + heartbeats, coverage_object_cache, - location_cache, self.max_distance_to_asserted, self.max_distance_to_coverage, &epoch, diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 7e8ac3443..c8221137b 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -20,8 +20,8 @@ use helium_proto::services::poc_mobile as proto; use retainer::Cache; use rust_decimal::{prelude::ToPrimitive, Decimal}; use rust_decimal_macros::dec; -use sqlx::{postgres::PgTypeInfo, Decode, Encode, PgPool, Postgres, Transaction, Type}; -use std::{ops::Range, pin::pin, sync::Arc, time}; +use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type}; +use std::{ops::Range, pin::pin, time}; use uuid::Uuid; /// Minimum number of heartbeats required to give a reward to the hotspot. @@ -379,12 +379,10 @@ impl ValidatedHeartbeat { } /// Validate a heartbeat in the given epoch. - #[allow(clippy::too_many_arguments)] pub async fn validate( - mut heartbeat: Heartbeat, + heartbeat: Heartbeat, gateway_info_resolver: &impl GatewayResolver, coverage_object_cache: &CoverageObjectCache, - last_location_cache: &LocationCache, max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &Range>, @@ -472,7 +470,7 @@ impl ValidatedHeartbeat { )); } - let Ok(mut hb_latlng) = heartbeat.centered_latlng() else { + let Ok(hb_latlng) = heartbeat.centered_latlng() else { return Ok(Self::new( heartbeat, cell_type, @@ -518,39 +516,8 @@ impl ValidatedHeartbeat { } GatewayResolution::AssertedLocation(location) if heartbeat.hb_type == HbType::Wifi => { let asserted_latlng: LatLng = CellIndex::try_from(location)?.into(); - let is_valid = match heartbeat.location_validation_timestamp { - None => { - if let Some(last_location) = last_location_cache - .fetch_last_location(&heartbeat.hotspot_key) - .await? - { - heartbeat.lat = last_location.lat; - heartbeat.lon = last_location.lon; - heartbeat.location_validation_timestamp = - Some(last_location.location_validation_timestamp); - // Can't panic, previous lat and lon must be valid. - hb_latlng = heartbeat.centered_latlng().unwrap(); - true - } else { - false - } - } - Some(location_validation_timestamp) => { - last_location_cache - .set_last_location( - &heartbeat.hotspot_key, - LastLocation::new( - location_validation_timestamp, - heartbeat.lat, - heartbeat.lon, - ), - ) - .await?; - true - } - }; let distance_to_asserted = asserted_latlng.distance_m(hb_latlng).round() as i64; - let location_trust_score_multiplier = if is_valid + let location_trust_score_multiplier = if heartbeat.location_validation_timestamp.is_some() // The heartbeat location to asserted location must be less than the max_distance_to_asserted value: && distance_to_asserted <= max_distance_to_asserted as i64 // The heartbeat location to every associated coverage hex must be less than max_distance_to_coverage: @@ -580,12 +547,10 @@ impl ValidatedHeartbeat { } } - #[allow(clippy::too_many_arguments)] pub fn validate_heartbeats<'a>( - heartbeats: impl Stream + 'a, gateway_info_resolver: &'a impl GatewayResolver, + heartbeats: impl Stream + 'a, coverage_object_cache: &'a CoverageObjectCache, - last_location_cache: &'a LocationCache, max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &'a Range>, @@ -596,7 +561,6 @@ impl ValidatedHeartbeat { heartbeat, gateway_info_resolver, coverage_object_cache, - last_location_cache, max_distance_to_asserted, max_distance_to_coverage, epoch, @@ -690,8 +654,8 @@ impl ValidatedHeartbeat { let truncated_timestamp = self.truncated_timestamp()?; sqlx::query( r#" - INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, distance_to_asserted, lat, lon) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, distance_to_asserted) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (hotspot_key, truncated_timestamp) DO UPDATE SET latest_timestamp = EXCLUDED.latest_timestamp, coverage_object = EXCLUDED.coverage_object @@ -704,8 +668,6 @@ impl ValidatedHeartbeat { .bind(self.heartbeat.coverage_object) .bind(self.location_trust_score_multiplier) .bind(self.distance_to_asserted) - .bind(self.heartbeat.lat) - .bind(self.heartbeat.lon) .execute(&mut *exec) .await?; Ok(()) @@ -779,111 +741,6 @@ pub async fn clear_heartbeats( Ok(()) } -/// A cache for previous valid (or invalid) WiFi heartbeat locations -#[derive(Clone)] -pub struct LocationCache { - pool: PgPool, - locations: Arc>>, -} - -impl LocationCache { - pub fn new(pool: &PgPool) -> Self { - let locations = Arc::new(Cache::new()); - let locations_clone = locations.clone(); - tokio::spawn(async move { - locations_clone - .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24)) - .await - }); - Self { - pool: pool.clone(), - locations, - } - } - - async fn fetch_from_db_and_set( - &self, - hotspot: &PublicKeyBinary, - ) -> anyhow::Result> { - let last_location: Option = sqlx::query_as( - r#" - SELECT location_validation_timestamp, lat, lon - FROM wifi_heartbeats - WHERE location_validation_timestamp IS NOT NULL - AND location_validation_timestamp >= $1 - ORDER BY location_validation_timestamp DESC - LIMIT 1 - "#, - ) - .bind(Utc::now() - Duration::hours(12)) - .fetch_optional(&self.pool) - .await?; - self.locations - .insert( - hotspot.clone(), - last_location, - last_location - .map(|x| x.duration_to_expiration()) - .unwrap_or_else(|| Duration::days(365)) - .to_std()?, - ) - .await; - Ok(last_location) - } - - pub async fn fetch_last_location( - &self, - hotspot: &PublicKeyBinary, - ) -> anyhow::Result> { - Ok( - if let Some(last_location) = self.locations.get(hotspot).await { - *last_location - } else { - self.fetch_from_db_and_set(hotspot).await? - }, - ) - } - - pub async fn set_last_location( - &self, - hotspot: &PublicKeyBinary, - last_location: LastLocation, - ) -> anyhow::Result<()> { - let duration_to_expiration = last_location.duration_to_expiration(); - self.locations - .insert( - hotspot.clone(), - Some(last_location), - duration_to_expiration.to_std()?, - ) - .await; - Ok(()) - } -} - -#[derive(sqlx::FromRow, Copy, Clone)] -pub struct LastLocation { - location_validation_timestamp: DateTime, - lat: f64, - lon: f64, -} - -impl LastLocation { - fn new(location_validation_timestamp: DateTime, lat: f64, lon: f64) -> Self { - Self { - location_validation_timestamp, - lat, - lon, - } - } - - /// Calculates the duration from now in which last_valid_timestamp is 12 hours old - fn duration_to_expiration(&self) -> Duration { - ((self.location_validation_timestamp + Duration::hours(12)) - Utc::now()) - .max(Duration::zero()) - } -} - pub struct SeniorityUpdate<'a> { heartbeat: &'a ValidatedHeartbeat, action: SeniorityUpdateAction, diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 65749755c..8cfdca952 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -2,7 +2,6 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, - heartbeats::LocationCache, GatewayResolver, }; use chrono::{DateTime, Duration, Utc}; @@ -74,7 +73,6 @@ where let coverage_claim_time_cache = CoverageClaimTimeCache::new(); let coverage_object_cache = CoverageObjectCache::new(&self.pool); - let location_cache = LocationCache::new(&self.pool); loop { #[rustfmt::skip] @@ -91,7 +89,6 @@ where &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, - &location_cache ).await?; metrics::histogram!("wifi_heartbeat_processing_time", start.elapsed()); } @@ -107,7 +104,6 @@ where heartbeat_cache: &Cache<(String, DateTime), ()>, coverage_claim_time_cache: &CoverageClaimTimeCache, coverage_object_cache: &CoverageObjectCache, - location_cache: &LocationCache, ) -> anyhow::Result<()> { tracing::info!("Processing WIFI heartbeat file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; @@ -119,10 +115,9 @@ where .map(Heartbeat::from); process_validated_heartbeats( ValidatedHeartbeat::validate_heartbeats( - heartbeats, &self.gateway_info_resolver, + heartbeats, coverage_object_cache, - location_cache, self.max_distance_to_asserted, self.max_distance_to_coverage, &epoch, diff --git a/mobile_verifier/tests/boosting_oracles.rs b/mobile_verifier/tests/boosting_oracles.rs index c42fd38eb..5687b53cb 100644 --- a/mobile_verifier/tests/boosting_oracles.rs +++ b/mobile_verifier/tests/boosting_oracles.rs @@ -17,7 +17,7 @@ use mobile_verifier::{ CoverageObjectCache, Seniority, UnassignedHex, }, geofence::GeofenceValidator, - heartbeats::{Heartbeat, HeartbeatReward, LocationCache, SeniorityUpdate, ValidatedHeartbeat}, + heartbeats::{Heartbeat, HeartbeatReward, SeniorityUpdate, ValidatedHeartbeat}, radio_threshold::VerifiedRadioThresholds, reward_shares::CoveragePoints, speedtests::Speedtest, @@ -371,14 +371,12 @@ async fn test_footfall_and_urbanization(pool: PgPool) -> anyhow::Result<()> { let coverage_objects = CoverageObjectCache::new(&pool); let coverage_claim_time_cache = CoverageClaimTimeCache::new(); - let location_cache = LocationCache::new(&pool); let epoch = start..end; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( - stream::iter(heartbeats.map(Heartbeat::from)), &AllOwnersValid, + stream::iter(heartbeats.map(Heartbeat::from)), &coverage_objects, - &location_cache, 2000, 2000, &epoch, diff --git a/mobile_verifier/tests/modeled_coverage.rs b/mobile_verifier/tests/modeled_coverage.rs index 0a40b0f22..84cbe6563 100644 --- a/mobile_verifier/tests/modeled_coverage.rs +++ b/mobile_verifier/tests/modeled_coverage.rs @@ -20,9 +20,7 @@ use mobile_verifier::{ CoverageObjectCache, Seniority, UnassignedHex, }, geofence::GeofenceValidator, - heartbeats::{ - Heartbeat, HeartbeatReward, KeyType, LocationCache, SeniorityUpdate, ValidatedHeartbeat, - }, + heartbeats::{Heartbeat, HeartbeatReward, KeyType, SeniorityUpdate, ValidatedHeartbeat}, radio_threshold::VerifiedRadioThresholds, reward_shares::CoveragePoints, speedtests::Speedtest, @@ -400,7 +398,6 @@ async fn process_input( ) -> anyhow::Result<()> { let coverage_objects = CoverageObjectCache::new(pool); let coverage_claim_time_cache = CoverageClaimTimeCache::new(); - let location_cache = LocationCache::new(pool); let mut transaction = pool.begin().await?; let mut coverage_objs = pin!(CoverageObject::validate_coverage_objects( @@ -422,10 +419,9 @@ async fn process_input( let mut transaction = pool.begin().await?; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( - stream::iter(heartbeats.map(Heartbeat::from)), &AllOwnersValid, + stream::iter(heartbeats.map(Heartbeat::from)), &coverage_objects, - &location_cache, 2000, 2000, epoch, @@ -1381,13 +1377,11 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow let hb_2: Heartbeat = hb_2.into(); let coverage_object_cache = CoverageObjectCache::new(&pool); - let location_cache = LocationCache::new(&pool); let validated_hb_1 = ValidatedHeartbeat::validate( hb_1, &AllOwnersValid, &coverage_object_cache, - &location_cache, 2000, 2000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), @@ -1402,7 +1396,6 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow hb_2.clone(), &AllOwnersValid, &coverage_object_cache, - &location_cache, 1000000, 2000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), @@ -1417,7 +1410,6 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow hb_2.clone(), &AllOwnersValid, &coverage_object_cache, - &location_cache, 2000, 1000000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), @@ -1432,7 +1424,6 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow hb_2.clone(), &AllOwnersValid, &coverage_object_cache, - &location_cache, 1000000, 1000000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC),