Skip to content

Commit

Permalink
Update main query for mobile rewards to simplify query and location t… (
Browse files Browse the repository at this point in the history
#694)

* Update main query for mobile rewards to simplify query and location trust score calculation

* Remove commented out code and fix query to remove empty string weirdness

* fix sql file formatting

* Remove old valid_heartbeats query

* Fmt valid_radios.sql

* Fix formatting and unneeded sql alias

---------

Co-authored-by: Matthew Plant <[email protected]>
  • Loading branch information
bbalser and maplant authored Jan 9, 2024
1 parent dccc965 commit ca8715d
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 348 deletions.
3 changes: 1 addition & 2 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ impl Cmd {
let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?;

let heartbeats =
HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation)
.await?;
HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation);
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?;
let reward_shares =
Expand Down
94 changes: 14 additions & 80 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use file_store::{
file_sink::FileSinkClient, heartbeat::CbrsHeartbeatIngestReport,
wifi_heartbeat::WifiHeartbeatIngestReport,
};
use futures::stream::{Stream, StreamExt, TryStreamExt};
use futures::stream::{Stream, StreamExt};
use h3o::{CellIndex, LatLng};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile as proto;
use retainer::Cache;
use rust_decimal::Decimal;
use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type};
use std::{collections::HashMap, ops::Range, pin::pin, time};
use std::{ops::Range, pin::pin, time};
use uuid::Uuid;

/// Minimum number of heartbeats required to give a reward to the hotspot.
Expand Down Expand Up @@ -262,28 +262,14 @@ impl From<WifiHeartbeatIngestReport> for Heartbeat {
}
}

#[derive(Debug, Clone, sqlx::FromRow)]
pub struct HeartbeatRow {
pub hotspot_key: PublicKeyBinary,
// cell hb only
pub cbsd_id: Option<String>,
pub cell_type: CellType,
// wifi hb only
pub location_validation_timestamp: Option<DateTime<Utc>>,
pub distance_to_asserted: Option<i64>,
pub coverage_object: Uuid,
pub latest_timestamp: DateTime<Utc>,
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct HeartbeatReward {
pub hotspot_key: PublicKeyBinary,
pub cell_type: CellType,
// cell hb only
pub cbsd_id: Option<String>,
pub location_trust_score_multiplier: Decimal,
pub cell_type: CellType,
pub location_trust_multiplier: Decimal,
pub coverage_object: Uuid,
pub latest_timestamp: DateTime<Utc>,
}

impl HeartbeatReward {
Expand All @@ -306,72 +292,20 @@ impl HeartbeatReward {
}

pub fn reward_weight(&self) -> Decimal {
self.location_trust_score_multiplier
self.location_trust_multiplier
}

pub async fn validated<'a>(
pub fn validated<'a>(
exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
epoch: &'a Range<DateTime<Utc>>,
max_distance_to_asserted: u32,
) -> anyhow::Result<impl Stream<Item = HeartbeatReward> + 'a> {
let heartbeat_rows =
sqlx::query_as::<_, HeartbeatRow>(include_str!("valid_heartbeats.sql"))
.bind(epoch.start)
.bind(epoch.end)
.bind(MINIMUM_HEARTBEAT_COUNT)
.fetch(exec)
.try_fold(
HashMap::<(PublicKeyBinary, Option<String>), Vec<HeartbeatRow>>::new(),
|mut map, row| async move {
map.entry((row.hotspot_key.clone(), row.cbsd_id.clone()))
.or_default()
.push(row);

Ok(map)
},
)
.await?;

Ok(
futures::stream::iter(heartbeat_rows).map(move |((hotspot_key, cbsd_id), rows)| {
let first = rows.first().unwrap();
let average_location_trust_score = rows
.iter()
.map(|row| {
row.cell_type.location_weight(
row.location_validation_timestamp,
row.distance_to_asserted,
max_distance_to_asserted,
)
})
.sum::<Decimal>()
/ Decimal::new(rows.len() as i64, 0);

HeartbeatReward {
hotspot_key,
cell_type: first.cell_type,
cbsd_id,
location_trust_score_multiplier: average_location_trust_score,
coverage_object: first.coverage_object,
latest_timestamp: first.latest_timestamp,
}
}),
)
}

pub fn from_heartbeat_row(value: HeartbeatRow, max_distance_to_asserted: u32) -> Self {
Self {
hotspot_key: value.hotspot_key,
cell_type: value.cell_type,
cbsd_id: value.cbsd_id,
location_trust_score_multiplier: value.cell_type.location_weight(
value.location_validation_timestamp,
value.distance_to_asserted,
max_distance_to_asserted,
),
coverage_object: value.coverage_object,
latest_timestamp: value.latest_timestamp,
}
) -> impl Stream<Item = Result<HeartbeatReward, sqlx::Error>> + 'a {
sqlx::query_as::<_, HeartbeatReward>(include_str!("valid_radios.sql"))
.bind(epoch.start)
.bind(epoch.end)
.bind(MINIMUM_HEARTBEAT_COUNT)
.bind(max_distance_to_asserted as i32)
.fetch(exec)
}
}

Expand Down
110 changes: 0 additions & 110 deletions mobile_verifier/src/heartbeats/valid_heartbeats.sql

This file was deleted.

101 changes: 101 additions & 0 deletions mobile_verifier/src/heartbeats/valid_radios.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
WITH latest_cbrs_hotspot AS (
SELECT DISTINCT ON (cbsd_id)
cbsd_id,
hotspot_key
FROM
cbrs_heartbeats
WHERE
truncated_timestamp >= $1
AND truncated_timestamp < $2
ORDER BY
cbsd_id,
latest_timestamp DESC
),
heartbeats AS (
SELECT
lch.hotspot_key,
ch.cbsd_id,
ch.cell_type,
CASE WHEN count(*) >= $3 THEN
1.0
ELSE
0.0
END AS heartbeat_multiplier,
1.0 AS location_trust_multiplier
FROM
cbrs_heartbeats ch
INNER JOIN latest_cbrs_hotspot lch ON ch.cbsd_id = lch.cbsd_id
WHERE
ch.truncated_timestamp >= $1
AND ch.truncated_timestamp < $2
GROUP BY
ch.cbsd_id,
lch.hotspot_key,
ch.cell_type
UNION
SELECT
hotspot_key,
NULL AS cbsd_id,
cell_type,
CASE WHEN count(*) >= $3 THEN
1.0
ELSE
0.0
END AS heartbeat_multiplier,
avg(
CASE WHEN location_validation_timestamp IS NULL THEN
0.25
WHEN distance_to_asserted > $4 THEN
0.25
ELSE
1.0
END) AS location_trust_multiplier
FROM
wifi_heartbeats
WHERE
truncated_timestamp >= $1
AND truncated_timestamp < $2
GROUP BY
hotspot_key,
cell_type
),
latest_uuids AS (( SELECT DISTINCT ON (hotspot_key,
cbsd_id)
hotspot_key,
cbsd_id,
coverage_object
FROM
cbrs_heartbeats ch
WHERE
truncated_timestamp >= $1
AND truncated_timestamp < $2
ORDER BY
hotspot_key,
cbsd_id,
truncated_timestamp DESC)
UNION ( SELECT DISTINCT ON (hotspot_key)
hotspot_key,
NULL AS cbsd_id,
coverage_object
FROM
wifi_heartbeats wh
WHERE
truncated_timestamp >= $1
AND truncated_timestamp < $2
ORDER BY
hotspot_key,
truncated_timestamp DESC))
SELECT
hb.hotspot_key,
hb.cbsd_id,
hb.cell_type,
hb.location_trust_multiplier,
u.coverage_object
FROM
heartbeats hb
INNER JOIN latest_uuids u ON hb.hotspot_key = u.hotspot_key
AND (hb.cbsd_id = u.cbsd_id
OR (hb.cbsd_id IS NULL
AND u.cbsd_id IS NULL))
WHERE
hb.heartbeat_multiplier = 1.0
Loading

0 comments on commit ca8715d

Please sign in to comment.