Skip to content

Commit

Permalink
- Rework location cache to have 2 hashmap and not cleanup anymore
Browse files Browse the repository at this point in the history
- Remove location_validation_timestamp from cbrs_heartbeats
- Handle GatewayResolution::GatewayNotAsserted  for cbrs and insert in cache
  • Loading branch information
macpie committed Oct 17, 2024
1 parent eeea8e9 commit 1004ffb
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 93 deletions.
1 change: 0 additions & 1 deletion mobile_verifier/migrations/39_update_cbrs_hearbeats.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
ALTER TABLE cbrs_heartbeats
ADD COLUMN location_validation_timestamp TIMESTAMPTZ,
ADD COLUMN lat DOUBLE PRECISION NOT NULL DEFAULT 0.0,
ADD COLUMN lon DOUBLE PRECISION NOT NULL DEFAULT 0.0;
107 changes: 61 additions & 46 deletions mobile_verifier/src/heartbeats/location_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use file_store::radio_location_estimates::Entity;
use helium_crypto::PublicKeyBinary;
use sqlx::PgPool;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::info;
use tokio::sync::{Mutex, MutexGuard};

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum LocationCacheKey {
Expand All @@ -29,56 +28,57 @@ impl LocationCacheValue {
}
}

type LocationCacheData = HashMap<LocationCacheKey, LocationCacheValue>;

/// A cache WiFi/Cbrs heartbeat locations
#[derive(Clone)]
pub struct LocationCache {
pool: PgPool,
data: Arc<Mutex<HashMap<LocationCacheKey, LocationCacheValue>>>,
wifi: Arc<Mutex<LocationCacheData>>,
cbrs: Arc<Mutex<LocationCacheData>>,
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
let data = Arc::new(Mutex::new(
HashMap::<LocationCacheKey, LocationCacheValue>::new(),
));
let data_clone = data.clone();
tokio::spawn(async move {
loop {
// Sleep 1 hour
let duration = core::time::Duration::from_secs(60 * 60);
tokio::time::sleep(duration).await;

let now = Utc::now();
// Set the 12-hour threshold
let twelve_hours_ago = now - Duration::hours(12);

let mut data = data_clone.lock().await;
let size_before = data.len() as f64;

// Retain only values that are within the last 12 hours
data.retain(|_, v| v.timestamp > twelve_hours_ago);

let size_after = data.len() as f64;
info!("cleaned {}", size_before - size_after);
}
});
let wifi = Arc::new(Mutex::new(HashMap::new()));
let cbrs = Arc::new(Mutex::new(HashMap::new()));
// TODO: We could spawn an hydrate from DB here?
Self {
pool: pool.clone(),
data,
wifi,
cbrs,
}
}

pub async fn get(&self, key: LocationCacheKey) -> anyhow::Result<Option<LocationCacheValue>> {
{
let data = self.data.lock().await;
let data = self.key_to_lock(&key).await;
if let Some(&value) = data.get(&key) {
return Ok(Some(value));
}
}
match key {
LocationCacheKey::WifiPubKey(pub_key_bin) => {
self.fetch_wifi_and_insert(pub_key_bin).await
}
LocationCacheKey::CbrsId(id) => self.fetch_cbrs_and_insert(id).await,
}
}

pub async fn get_recent(
&self,
key: LocationCacheKey,
when: Duration,
) -> anyhow::Result<Option<LocationCacheValue>> {
{
let data = self.key_to_lock(&key).await;
if let Some(&value) = data.get(&key) {
let now = Utc::now();
let twelve_hours_ago = now - Duration::hours(12);
if value.timestamp > twelve_hours_ago {
return Ok(None);
} else {
let before = now - when;
if value.timestamp > before {
return Ok(Some(value));
} else {
return Ok(None);
}
}
}
Expand All @@ -90,28 +90,41 @@ impl LocationCache {
}
}

pub async fn get_all(&self) -> HashMap<LocationCacheKey, LocationCacheValue> {
let data = self.data.lock().await;
data.clone()
pub async fn get_all(&self) -> LocationCacheData {
let wifi_data = self.wifi.lock().await;
let mut wifi_data_cloned = wifi_data.clone();

let cbrs_data = self.cbrs.lock().await;
let cbrs_data_cloned = cbrs_data.clone();

wifi_data_cloned.extend(cbrs_data_cloned);
wifi_data_cloned
}

pub async fn insert(
&self,
key: LocationCacheKey,
value: LocationCacheValue,
) -> anyhow::Result<()> {
let mut data = self.data.lock().await;
let mut data = self.key_to_lock(&key).await;
data.insert(key, value);
Ok(())
}

/// Only used for testing.
pub async fn remove(&self, key: LocationCacheKey) -> anyhow::Result<()> {
let mut data = self.data.lock().await;
let mut data = self.key_to_lock(&key).await;
data.remove(&key);
Ok(())
}

async fn key_to_lock(&self, key: &LocationCacheKey) -> MutexGuard<'_, LocationCacheData> {
match key {
LocationCacheKey::WifiPubKey(_) => self.wifi.lock().await,
LocationCacheKey::CbrsId(_) => self.cbrs.lock().await,
}
}

async fn fetch_wifi_and_insert(
&self,
pub_key_bin: PublicKeyBinary,
Expand All @@ -134,8 +147,9 @@ impl LocationCache {
match sqlx_return {
None => Ok(None),
Some(value) => {
let mut data = self.data.lock().await;
data.insert(LocationCacheKey::WifiPubKey(pub_key_bin), value);
let key = LocationCacheKey::WifiPubKey(pub_key_bin);
let mut data = self.key_to_lock(&key).await;
data.insert(key, value);
Ok(Some(value))
}
}
Expand All @@ -147,12 +161,12 @@ impl LocationCache {
) -> anyhow::Result<Option<LocationCacheValue>> {
let sqlx_return: Option<LocationCacheValue> = sqlx::query_as(
r#"
SELECT lat, lon, location_validation_timestamp AS timestamp
SELECT lat, lon, latest_timestamp AS timestamp
FROM cbrs_heartbeats
WHERE location_validation_timestamp IS NOT NULL
AND location_validation_timestamp >= $1
WHERE latest_timestamp IS NOT NULL
AND latest_timestamp >= $1
AND hotspot_key = $2
ORDER BY location_validation_timestamp DESC
ORDER BY latest_timestamp DESC
LIMIT 1
"#,
)
Expand All @@ -164,8 +178,9 @@ impl LocationCache {
match sqlx_return {
None => Ok(None),
Some(value) => {
let mut data = self.data.lock().await;
data.insert(LocationCacheKey::CbrsId(cbsd_id), value);
let key = LocationCacheKey::CbrsId(cbsd_id);
let mut data = self.key_to_lock(&key).await;
data.insert(key, value);
Ok(Some(value))
}
}
Expand Down
69 changes: 32 additions & 37 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ impl ValidatedHeartbeat {
proto::HeartbeatValidity::UnsupportedLocation,
));
}

match gateway_info_resolver
.resolve_gateway(&heartbeat.hotspot_key)
.await?
Expand All @@ -495,34 +494,6 @@ impl ValidatedHeartbeat {
Some(coverage_object.meta),
proto::HeartbeatValidity::InvalidDeviceType,
)),
// TODO do we get there when CBRS?
// Should I then update location form here then?
GatewayResolution::GatewayNotFound if heartbeat.hb_type == HbType::Cbrs => {
if let (Some(location_validation_timestamp), Some(cbsd_id)) = (
heartbeat.location_validation_timestamp,
heartbeat.cbsd_id.clone(),
) {
location_cache
.insert(
LocationCacheKey::CbrsId(cbsd_id),
LocationCacheValue::new(
heartbeat.lat,
heartbeat.lon,
location_validation_timestamp,
),
)
.await?;
};

Ok(Self::new(
heartbeat,
cell_type,
dec!(0),
None,
Some(coverage_object.meta),
proto::HeartbeatValidity::GatewayNotFound,
))
}
GatewayResolution::GatewayNotFound => Ok(Self::new(
heartbeat,
cell_type,
Expand All @@ -531,22 +502,47 @@ impl ValidatedHeartbeat {
Some(coverage_object.meta),
proto::HeartbeatValidity::GatewayNotFound,
)),
GatewayResolution::GatewayNotAsserted if heartbeat.hb_type == HbType::Wifi => {
Ok(Self::new(
GatewayResolution::GatewayNotAsserted => match heartbeat.hb_type {
HbType::Wifi => Ok(Self::new(
heartbeat,
cell_type,
dec!(0),
None,
Some(coverage_object.meta),
proto::HeartbeatValidity::GatewayNotAsserted,
))
}
)),
HbType::Cbrs => {
if let Some(cbsd_id) = heartbeat.cbsd_id.clone() {
location_cache
.insert(
LocationCacheKey::CbrsId(cbsd_id),
LocationCacheValue::new(
heartbeat.lat,
heartbeat.lon,
heartbeat.timestamp,
),
)
.await?;
};
Ok(Self::new(
heartbeat,
cell_type,
dec!(1.0),
None,
Some(coverage_object.meta),
proto::HeartbeatValidity::Valid,
))
}
},
GatewayResolution::AssertedLocation(location) if heartbeat.hb_type == HbType::Wifi => {
let asserted_latlng: LatLng = CellIndex::try_from(location)?.into();
let is_valid = match heartbeat.location_validation_timestamp {
None => {
if let Some(last_location) = location_cache
.get(LocationCacheKey::WifiPubKey(heartbeat.hotspot_key.clone()))
.get_recent(
LocationCacheKey::WifiPubKey(heartbeat.hotspot_key.clone()),
Duration::hours(12),
)
.await?
{
heartbeat.lat = last_location.lat;
Expand Down Expand Up @@ -702,8 +698,8 @@ impl ValidatedHeartbeat {
let truncated_timestamp = self.truncated_timestamp()?;
sqlx::query(
r#"
INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, location_validation_timestamp, lat, lon)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, lat, lon)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET
latest_timestamp = EXCLUDED.latest_timestamp,
coverage_object = EXCLUDED.coverage_object
Expand All @@ -716,7 +712,6 @@ impl ValidatedHeartbeat {
.bind(truncated_timestamp)
.bind(self.heartbeat.coverage_object)
.bind(self.location_trust_score_multiplier)
.bind(self.heartbeat.location_validation_timestamp)
.bind(self.heartbeat.lat)
.bind(self.heartbeat.lon)
.execute(&mut *exec)
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,15 @@ async fn reward_poc(
fn is_within_radius(
loc_lat: f64,
loc_lon: f64,
comparators: Vec<(Decimal, Decimal, Decimal)>,
estimates: Vec<(Decimal, Decimal, Decimal)>,
) -> anyhow::Result<bool> {
let resolution = Resolution::Twelve;

let point_a = LatLng::new(loc_lat, loc_lon)
.map_err(|e| anyhow::anyhow!("Invalid LatLng for A: {}", e))?;
let h3_index_a = point_a.to_cell(resolution);

for (radius_meters, lat, lon) in comparators {
for (radius_meters, lat, lon) in estimates {
let lat_f64 = lat
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert lat_b to f64"))?;
Expand Down
8 changes: 1 addition & 7 deletions mobile_verifier/tests/integrations/last_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours(

let validated_heartbeat_1 = ValidatedHeartbeat::validate(
heartbeat(&hotspot, &coverage_object)
.location_validation_timestamp(Utc::now())
.timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1))
.location_validation_timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1))
.build(),
&GatewayClientAllOwnersValid,
&coverage_objects,
Expand Down Expand Up @@ -248,11 +247,6 @@ impl HeartbeatBuilder {
self
}

fn timestamp(mut self, ts: DateTime<Utc>) -> Self {
self.timestamp = Some(ts);
self
}

fn build(self) -> Heartbeat {
let (lat, lon) = self.latlng.unwrap_or_else(|| {
let lat_lng: LatLng = self
Expand Down

0 comments on commit 1004ffb

Please sign in to comment.