
Commit

Update cache to hydrate on new
Remove rewarder part to put in a different PR
macpie committed Oct 17, 2024
1 parent 1004ffb commit 62bde0f
Showing 8 changed files with 99 additions and 93 deletions.
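
In short: LocationCache::new now hydrates the cache from the database before returning, so the constructor becomes async and fallible. A minimal sketch of the call-site change (both forms appear verbatim in the diffs below):

    // Before this commit: synchronous constructor, the cache starts empty.
    let location_cache = LocationCache::new(&pool);

    // After this commit: the constructor pre-fills the cache from the
    // wifi_heartbeats and cbrs_heartbeats tables, so callers await it and
    // propagate the error with `?`.
    let location_cache = LocationCache::new(&pool).await?;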
2 changes: 1 addition & 1 deletion mobile_verifier/src/cli/server.rs
@@ -105,7 +105,7 @@ impl Cmd {
let (new_coverage_obj_notifier, new_coverage_obj_notification) =
new_coverage_object_notification_channel();

let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

TaskManager::builder()
.add_task(file_upload_server)
89 changes: 82 additions & 7 deletions mobile_verifier/src/heartbeats/location_cache.rs
@@ -1,8 +1,9 @@
use chrono::{DateTime, Duration, Utc};
use file_store::radio_location_estimates::Entity;
use futures::StreamExt;
use helium_crypto::PublicKeyBinary;
use sqlx::PgPool;
use std::{collections::HashMap, sync::Arc};
use sqlx::{PgPool, Row};
use std::{collections::HashMap, str::FromStr, sync::Arc};
use tokio::sync::{Mutex, MutexGuard};

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
@@ -39,15 +40,18 @@ pub struct LocationCache {
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
pub async fn new(pool: &PgPool) -> anyhow::Result<Self> {
let wifi = Arc::new(Mutex::new(HashMap::new()));
let cbrs = Arc::new(Mutex::new(HashMap::new()));
// TODO: We could spawn an hydrate from DB here?
Self {

hydrate_wifi(pool, &wifi).await?;
hydrate_cbrs(pool, &cbrs).await?;

Ok(Self {
pool: pool.clone(),
wifi,
cbrs,
}
})
}

pub async fn get(&self, key: LocationCacheKey) -> anyhow::Result<Option<LocationCacheValue>> {
@@ -165,7 +169,7 @@ impl LocationCache {
FROM cbrs_heartbeats
WHERE latest_timestamp IS NOT NULL
AND latest_timestamp >= $1
AND hotspot_key = $2
AND cbsd_id = $2
ORDER BY latest_timestamp DESC
LIMIT 1
"#,
@@ -187,6 +191,77 @@ impl LocationCache {
}
}

async fn hydrate_wifi(pool: &PgPool, mutex: &Arc<Mutex<LocationCacheData>>) -> anyhow::Result<()> {
let mut rows = sqlx::query(
r#"
SELECT wh.lat, wh.lon, wh.location_validation_timestamp AS timestamp, wh.hotspot_key
FROM wifi_heartbeats wh
JOIN (
SELECT hotspot_key, MAX(location_validation_timestamp) AS max_timestamp
FROM wifi_heartbeats
WHERE location_validation_timestamp IS NOT NULL
GROUP BY hotspot_key
) latest ON wh.hotspot_key = latest.hotspot_key
AND wh.location_validation_timestamp = latest.max_timestamp
"#,
)
.fetch(pool);

while let Some(row_result) = rows.next().await {
let row = row_result?;

let hotspot_key: String = row.get("hotspot_key");
let pub_key_bin = PublicKeyBinary::from_str(&hotspot_key)?;
let key = LocationCacheKey::WifiPubKey(pub_key_bin);

let value = LocationCacheValue {
lat: row.get("lat"),
lon: row.get("lon"),
timestamp: row.get("timestamp"),
};

let mut data = mutex.lock().await;
data.insert(key.clone(), value);
}

Ok(())
}

async fn hydrate_cbrs(pool: &PgPool, mutex: &Arc<Mutex<LocationCacheData>>) -> anyhow::Result<()> {
let mut rows = sqlx::query(
r#"
SELECT ch.lat, ch.lon, ch.latest_timestamp AS timestamp, ch.cbsd_id
FROM cbrs_heartbeats ch
JOIN (
SELECT cbsd_id, MAX(latest_timestamp) AS max_timestamp
FROM cbrs_heartbeats
WHERE latest_timestamp IS NOT NULL
GROUP BY cbsd_id
) latest ON ch.cbsd_id = latest.cbsd_id
AND ch.latest_timestamp = latest.max_timestamp
"#,
)
.fetch(pool);

while let Some(row_result) = rows.next().await {
let row = row_result?;

let id: String = row.get("cbsd_id");
let key = LocationCacheKey::CbrsId(id);

let value = LocationCacheValue {
lat: row.get("lat"),
lon: row.get("lon"),
timestamp: row.get("timestamp"),
};

let mut data = mutex.lock().await;
data.insert(key.clone(), value);
}

Ok(())
}

pub fn key_to_entity(entity: LocationCacheKey) -> Entity {
match entity {
LocationCacheKey::CbrsId(id) => Entity::CbrsId(id),
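
A rough usage sketch for the hydrated cache (hypothetical call site, not part of this commit; `pub_key_bin` is an assumed PublicKeyBinary for a WiFi hotspot that already has a validated location in wifi_heartbeats):

    let cache = LocationCache::new(&pool).await?;
    let key = LocationCacheKey::WifiPubKey(pub_key_bin);
    // After hydration, the latest validated location for this key should
    // already be in the in-memory map.
    if let Some(loc) = cache.get(key).await? {
        println!("cached location: lat={} lon={} at {}", loc.lat, loc.lon, loc.timestamp);
    }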
73 changes: 2 additions & 71 deletions mobile_verifier/src/rewarder.rs
@@ -2,11 +2,7 @@ use self::boosted_hex_eligibility::BoostedHexEligibility;
use crate::{
boosting_oracles::db::check_for_unprocessed_data_sets,
coverage, data_session,
heartbeats::{
self,
location_cache::{self, LocationCache},
HeartbeatReward,
},
heartbeats::{self, location_cache::LocationCache, HeartbeatReward},
radio_location_estimates, radio_threshold,
reward_shares::{
self, CalculatedPocRewardShares, CoverageShares, DataTransferAndPocAllocatedRewardBuckets,
@@ -25,7 +21,6 @@ use file_store::{
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode},
};
use futures_util::TryFutureExt;
use h3o::{LatLng, Resolution};
use helium_proto::{
reward_manifest::RewardData::MobileRewardData,
services::poc_mobile::{
@@ -433,7 +428,7 @@ async fn reward_poc(
speedtest_avg_sink: &FileSinkClient<proto::SpeedtestAvg>,
reward_period: &Range<DateTime<Utc>>,
reward_shares: DataTransferAndPocAllocatedRewardBuckets,
location_cache: &LocationCache,
_location_cache: &LocationCache,
) -> anyhow::Result<(Decimal, CalculatedPocRewardShares)> {
let heartbeats = HeartbeatReward::validated(pool, reward_period);
let speedtest_averages =
@@ -460,30 +455,6 @@ async fn reward_poc(
)
.await?;

{
// TODO: Maybe we should hydrate cache on start to avoid banning too many hotspot
let locations = location_cache.get_all().await;
for (key, value) in locations.iter() {
let entity = location_cache::key_to_entity(key.clone());
// Estimates are ordered by bigger radius first it should allow us to do less calculation
// and find a match faster
let estimates =
radio_location_estimates::get_valid_estimates(pool, &entity, dec!(0.75)).await?;
if estimates.is_empty() {
// TODO we ban that key
todo!()
} else {
match is_within_radius(value.lat, value.lon, estimates) {
Ok(true) => todo!(),
// TODO we ban that key
Ok(false) => todo!(),
// TODO we ban that key
Err(_) => todo!(),
}
}
}
}

let coverage_shares = CoverageShares::new(
pool,
heartbeats,
@@ -530,46 +501,6 @@ async fn reward_poc(
Ok((unallocated_poc_amount, calculated_poc_rewards_per_share))
}

fn is_within_radius(
loc_lat: f64,
loc_lon: f64,
estimates: Vec<(Decimal, Decimal, Decimal)>,
) -> anyhow::Result<bool> {
let resolution = Resolution::Twelve;

let point_a = LatLng::new(loc_lat, loc_lon)
.map_err(|e| anyhow::anyhow!("Invalid LatLng for A: {}", e))?;
let h3_index_a = point_a.to_cell(resolution);

for (radius_meters, lat, lon) in estimates {
let lat_f64 = lat
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert lat_b to f64"))?;
let lon_f64 = lon
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert lon_b to f64"))?;
let radius_meters_f64 = radius_meters
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert radius_meters to f64"))?;

let point_b = LatLng::new(lat_f64, lon_f64)
.map_err(|e| anyhow::anyhow!("Invalid LatLng for B: {}", e))?;
let h3_index_b = point_b.to_cell(resolution);

let grid_distance = h3_index_a
.grid_distance(h3_index_b)
.map_err(|e| anyhow::anyhow!("Failed to calculate grid distance: {}", e))?;

let max_grid_distance = (radius_meters_f64 / 9.0).round() as i32;

if grid_distance <= max_grid_distance {
return Ok(true);
}
}

Ok(false)
}

pub async fn reward_dc(
mobile_rewards: &FileSinkClient<proto::MobileRewardShare>,
reward_period: &Range<DateTime<Utc>>,
2 changes: 1 addition & 1 deletion mobile_verifier/tests/integrations/boosting_oracles.rs
@@ -338,7 +338,7 @@ async fn test_footfall_and_urbanization_and_landtype(pool: PgPool) -> anyhow::Re

let coverage_objects = CoverageObjectCache::new(&pool);
let coverage_claim_time_cache = CoverageClaimTimeCache::new();
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let epoch = start..end;
let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats(
14 changes: 7 additions & 7 deletions mobile_verifier/tests/integrations/hex_boosting.rs
@@ -137,7 +137,7 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> {
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let (_, rewards) = tokio::join!(
// run rewards for poc and dc
@@ -323,7 +323,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu
];

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
@@ -488,7 +488,7 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);

let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let (_, rewards) = tokio::join!(
// run rewards for poc and dc
@@ -665,7 +665,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> {
];

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
@@ -799,7 +799,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow:
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
@@ -980,7 +980,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust(
let total_poc_emissions = reward_shares::get_scheduled_tokens_for_poc(epoch_duration)
.to_u64()
.unwrap();
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
@@ -1187,7 +1187,7 @@ async fn test_poc_with_cbrs_and_multi_coverage_boosted_hexes(pool: PgPool) -> an
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
6 changes: 3 additions & 3 deletions mobile_verifier/tests/integrations/last_location.rs
@@ -39,7 +39,7 @@ async fn heartbeat_uses_last_good_location_when_invalid_location(
let epoch_end = epoch_start + Duration::days(2);

let coverage_objects = CoverageObjectCache::new(&pool);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let mut transaction = pool.begin().await?;
let coverage_object = coverage_object(&hotspot, &mut transaction).await?;
@@ -101,7 +101,7 @@ async fn heartbeat_will_use_last_good_location_from_db(pool: PgPool) -> anyhow::
let epoch_end = epoch_start + Duration::days(2);

let coverage_objects = CoverageObjectCache::new(&pool);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let mut transaction = pool.begin().await?;
let coverage_object = coverage_object(&hotspot, &mut transaction).await?;
@@ -173,7 +173,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours(
let epoch_end = epoch_start + Duration::days(2);

let coverage_objects = CoverageObjectCache::new(&pool);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let mut transaction = pool.begin().await?;
let coverage_object = coverage_object(&hotspot, &mut transaction).await?;
4 changes: 2 additions & 2 deletions mobile_verifier/tests/integrations/modeled_coverage.rs
@@ -378,7 +378,7 @@ async fn process_input(
) -> anyhow::Result<()> {
let coverage_objects = CoverageObjectCache::new(pool);
let coverage_claim_time_cache = CoverageClaimTimeCache::new();
let location_cache = LocationCache::new(pool);
let location_cache = LocationCache::new(pool).await?;

let mut transaction = pool.begin().await?;
let mut coverage_objs = pin!(CoverageObject::validate_coverage_objects(
@@ -1376,7 +1376,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow

let max_covered_distance = 5_000;
let coverage_object_cache = CoverageObjectCache::new(&pool);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;

let mk_heartbeat = |latlng: LatLng| WifiHeartbeatIngestReport {
report: WifiHeartbeat {
2 changes: 1 addition & 1 deletion mobile_verifier/tests/integrations/rewarder_poc_dc.rs
@@ -44,7 +44,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> {
let boosted_hexes = vec![];

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);
let location_cache = LocationCache::new(&pool);
let location_cache = LocationCache::new(&pool).await?;
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
