diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index a6979bcf5..7ea923d1d 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -36,8 +36,7 @@ impl Cmd { let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; let heartbeats = - HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation) - .await?; + HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation); let speedtest_averages = SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?; let reward_shares = diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 7608d060b..0e0d003da 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -12,14 +12,14 @@ use file_store::{ file_sink::FileSinkClient, heartbeat::CbrsHeartbeatIngestReport, wifi_heartbeat::WifiHeartbeatIngestReport, }; -use futures::stream::{Stream, StreamExt, TryStreamExt}; +use futures::stream::{Stream, StreamExt}; use h3o::{CellIndex, LatLng}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile as proto; use retainer::Cache; use rust_decimal::Decimal; use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type}; -use std::{collections::HashMap, ops::Range, pin::pin, time}; +use std::{ops::Range, pin::pin, time}; use uuid::Uuid; /// Minimum number of heartbeats required to give a reward to the hotspot. @@ -262,28 +262,14 @@ impl From for Heartbeat { } } -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct HeartbeatRow { - pub hotspot_key: PublicKeyBinary, - // cell hb only - pub cbsd_id: Option, - pub cell_type: CellType, - // wifi hb only - pub location_validation_timestamp: Option>, - pub distance_to_asserted: Option, - pub coverage_object: Uuid, - pub latest_timestamp: DateTime, -} - -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, sqlx::FromRow)] pub struct HeartbeatReward { pub hotspot_key: PublicKeyBinary, - pub cell_type: CellType, // cell hb only pub cbsd_id: Option, - pub location_trust_score_multiplier: Decimal, + pub cell_type: CellType, + pub location_trust_multiplier: Decimal, pub coverage_object: Uuid, - pub latest_timestamp: DateTime, } impl HeartbeatReward { @@ -306,72 +292,20 @@ impl HeartbeatReward { } pub fn reward_weight(&self) -> Decimal { - self.location_trust_score_multiplier + self.location_trust_multiplier } - pub async fn validated<'a>( + pub fn validated<'a>( exec: impl sqlx::PgExecutor<'a> + Copy + 'a, epoch: &'a Range>, max_distance_to_asserted: u32, - ) -> anyhow::Result + 'a> { - let heartbeat_rows = - sqlx::query_as::<_, HeartbeatRow>(include_str!("valid_heartbeats.sql")) - .bind(epoch.start) - .bind(epoch.end) - .bind(MINIMUM_HEARTBEAT_COUNT) - .fetch(exec) - .try_fold( - HashMap::<(PublicKeyBinary, Option), Vec>::new(), - |mut map, row| async move { - map.entry((row.hotspot_key.clone(), row.cbsd_id.clone())) - .or_default() - .push(row); - - Ok(map) - }, - ) - .await?; - - Ok( - futures::stream::iter(heartbeat_rows).map(move |((hotspot_key, cbsd_id), rows)| { - let first = rows.first().unwrap(); - let average_location_trust_score = rows - .iter() - .map(|row| { - row.cell_type.location_weight( - row.location_validation_timestamp, - row.distance_to_asserted, - max_distance_to_asserted, - ) - }) - .sum::() - / Decimal::new(rows.len() as i64, 0); - - HeartbeatReward { - hotspot_key, - cell_type: first.cell_type, - cbsd_id, - location_trust_score_multiplier: average_location_trust_score, - coverage_object: first.coverage_object, - latest_timestamp: first.latest_timestamp, - } - }), - ) - } - - pub fn from_heartbeat_row(value: HeartbeatRow, max_distance_to_asserted: u32) -> Self { - Self { - hotspot_key: value.hotspot_key, - cell_type: value.cell_type, - cbsd_id: value.cbsd_id, - location_trust_score_multiplier: value.cell_type.location_weight( - value.location_validation_timestamp, - value.distance_to_asserted, - max_distance_to_asserted, - ), - coverage_object: value.coverage_object, - latest_timestamp: value.latest_timestamp, - } + ) -> impl Stream> + 'a { + sqlx::query_as::<_, HeartbeatReward>(include_str!("valid_radios.sql")) + .bind(epoch.start) + .bind(epoch.end) + .bind(MINIMUM_HEARTBEAT_COUNT) + .bind(max_distance_to_asserted as i32) + .fetch(exec) } } diff --git a/mobile_verifier/src/heartbeats/valid_heartbeats.sql b/mobile_verifier/src/heartbeats/valid_heartbeats.sql deleted file mode 100644 index 59d67bab7..000000000 --- a/mobile_verifier/src/heartbeats/valid_heartbeats.sql +++ /dev/null @@ -1,110 +0,0 @@ -WITH cbrs_coverage_objs AS ( - SELECT - t1.cbsd_id, - t1.coverage_object, - t1.latest_timestamp - FROM - cbrs_heartbeats t1 - WHERE - t1.latest_timestamp = ( - SELECT - MAX(t2.latest_timestamp) - FROM - cbrs_heartbeats t2 - WHERE - t2.cbsd_id = t1.cbsd_id - AND truncated_timestamp >= $1 - AND truncated_timestamp < $2) -), -wifi_coverage_objs AS ( - SELECT - t1.hotspot_key, - t1.coverage_object, - t1.latest_timestamp - FROM - wifi_heartbeats t1 - WHERE - t1.latest_timestamp = ( - SELECT - MAX(t2.latest_timestamp) - FROM - wifi_heartbeats t2 - WHERE - t2.hotspot_key = t1.hotspot_key - AND truncated_timestamp >= $1 - AND truncated_timestamp < $2) -), -latest_hotspots AS ( - SELECT - t1.cbsd_id, - t1.hotspot_key, - t1.latest_timestamp - FROM - cbrs_heartbeats t1 - WHERE - t1.latest_timestamp = ( - SELECT - MAX(t2.latest_timestamp) - FROM - cbrs_heartbeats t2 - WHERE - t2.cbsd_id = t1.cbsd_id - AND truncated_timestamp >= $1 - AND truncated_timestamp < $2)) -SELECT - latest_hotspots.hotspot_key, - cbrs_heartbeats.cbsd_id, - cell_type, - cbrs_coverage_objs.coverage_object, - cbrs_coverage_objs.latest_timestamp, - NULL AS location_validation_timestamp, - NULL AS distance_to_asserted -FROM - cbrs_heartbeats - INNER JOIN latest_hotspots ON cbrs_heartbeats.cbsd_id = latest_hotspots.cbsd_id - INNER JOIN cbrs_coverage_objs ON cbrs_heartbeats.cbsd_id = cbrs_coverage_objs.cbsd_id -WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 -GROUP BY - cbrs_heartbeats.cbsd_id, - latest_hotspots.hotspot_key, - cell_type, - cbrs_coverage_objs.coverage_object, - cbrs_coverage_objs.latest_timestamp -HAVING - count(*) >= $3 -UNION ALL -SELECT - wifi_grouped.hotspot_key, - NULL AS cbsd_id, - cell_type, - wifi_coverage_objs.coverage_object, - wifi_coverage_objs.latest_timestamp, - b.location_validation_timestamp, - b.distance_to_asserted -FROM ( - SELECT - hotspot_key, - cell_type - FROM - wifi_heartbeats - WHERE - truncated_timestamp >= $1 - AND truncated_timestamp < $2 - GROUP BY - hotspot_key, - cell_type - HAVING - count(*) >= $3) AS wifi_grouped - INNER JOIN ( - SELECT - hotspot_key, - location_validation_timestamp, - distance_to_asserted - FROM - wifi_heartbeats - WHERE - wifi_heartbeats.truncated_timestamp >= $1 - AND wifi_heartbeats.truncated_timestamp < $2) AS b ON b.hotspot_key = wifi_grouped.hotspot_key - INNER JOIN wifi_coverage_objs ON wifi_grouped.hotspot_key = wifi_coverage_objs.hotspot_key diff --git a/mobile_verifier/src/heartbeats/valid_radios.sql b/mobile_verifier/src/heartbeats/valid_radios.sql new file mode 100644 index 000000000..4527554d1 --- /dev/null +++ b/mobile_verifier/src/heartbeats/valid_radios.sql @@ -0,0 +1,101 @@ +WITH latest_cbrs_hotspot AS ( + SELECT DISTINCT ON (cbsd_id) + cbsd_id, + hotspot_key + FROM + cbrs_heartbeats + WHERE + truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ORDER BY + cbsd_id, + latest_timestamp DESC +), +heartbeats AS ( + SELECT + lch.hotspot_key, + ch.cbsd_id, + ch.cell_type, + CASE WHEN count(*) >= $3 THEN + 1.0 + ELSE + 0.0 + END AS heartbeat_multiplier, + 1.0 AS location_trust_multiplier + FROM + cbrs_heartbeats ch + INNER JOIN latest_cbrs_hotspot lch ON ch.cbsd_id = lch.cbsd_id + WHERE + ch.truncated_timestamp >= $1 + AND ch.truncated_timestamp < $2 + GROUP BY + ch.cbsd_id, + lch.hotspot_key, + ch.cell_type + UNION + SELECT + hotspot_key, + NULL AS cbsd_id, + cell_type, + CASE WHEN count(*) >= $3 THEN + 1.0 + ELSE + 0.0 + END AS heartbeat_multiplier, + avg( + CASE WHEN location_validation_timestamp IS NULL THEN + 0.25 + WHEN distance_to_asserted > $4 THEN + 0.25 + ELSE + 1.0 + END) AS location_trust_multiplier +FROM + wifi_heartbeats + WHERE + truncated_timestamp >= $1 + AND truncated_timestamp < $2 + GROUP BY + hotspot_key, + cell_type +), +latest_uuids AS (( SELECT DISTINCT ON (hotspot_key, + cbsd_id) + hotspot_key, + cbsd_id, + coverage_object + FROM + cbrs_heartbeats ch + WHERE + truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ORDER BY + hotspot_key, + cbsd_id, + truncated_timestamp DESC) + UNION ( SELECT DISTINCT ON (hotspot_key) + hotspot_key, + NULL AS cbsd_id, + coverage_object + FROM + wifi_heartbeats wh + WHERE + truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ORDER BY + hotspot_key, + truncated_timestamp DESC)) +SELECT + hb.hotspot_key, + hb.cbsd_id, + hb.cell_type, + hb.location_trust_multiplier, + u.coverage_object +FROM + heartbeats hb + INNER JOIN latest_uuids u ON hb.hotspot_key = u.hotspot_key + AND (hb.cbsd_id = u.cbsd_id + OR (hb.cbsd_id IS NULL + AND u.cbsd_id IS NULL)) +WHERE + hb.heartbeat_multiplier = 1.0 diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index b07eef9c1..09f74510a 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -406,14 +406,14 @@ pub struct CoveragePoints { impl CoveragePoints { pub async fn aggregate_points( hex_streams: &impl CoveredHexStream, - heartbeats: impl Stream, + heartbeats: impl Stream>, speedtests: &SpeedtestAverages, period_end: DateTime, ) -> Result { let mut heartbeats = std::pin::pin!(heartbeats); let mut covered_hexes = CoveredHexes::default(); let mut coverage_points = HashMap::new(); - while let Some(heartbeat) = heartbeats.next().await { + while let Some(heartbeat) = heartbeats.next().await.transpose()? { let speedtest_multiplier = speedtests .get_average(&heartbeat.hotspot_key) .as_ref() @@ -435,7 +435,7 @@ impl CoveragePoints { .insert( opt_cbsd_id, RadioPoints::new( - heartbeat.location_trust_score_multiplier, + heartbeat.location_trust_multiplier, heartbeat.coverage_object, seniority.seniority_ts, ), @@ -595,7 +595,7 @@ mod test { coverage::{CoveredHexStream, HexCoverage, Seniority}, data_session, data_session::HotspotDataSession, - heartbeats::{HeartbeatReward, HeartbeatRow, KeyType, OwnedKeyType}, + heartbeats::{HeartbeatReward, KeyType, OwnedKeyType}, reward_shares, speedtests::Speedtest, speedtests_average::SpeedtestAverage, @@ -1005,146 +1005,118 @@ mod test { let now = Utc::now(); let timestamp = now - Duration::minutes(20); - let max_asserted_distance_deviation: u32 = 300; // setup heartbeats - let heartbeat_keys = vec![ - HeartbeatRow { + let heartbeat_rewards = vec![ + HeartbeatReward { cbsd_id: Some(c2.clone()), hotspot_key: gw2.clone(), coverage_object: cov_obj_2, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c2).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c4.clone()), hotspot_key: gw3.clone(), coverage_object: cov_obj_4, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c4).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c5.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_5, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c5).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c6.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_6, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c6).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c7.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_7, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c7).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c8.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_8, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c8).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c9.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_9, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c9).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c10.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_10, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c10).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c11.clone()), hotspot_key: gw4.clone(), coverage_object: cov_obj_11, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c11).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c12.clone()), hotspot_key: gw5.clone(), coverage_object: cov_obj_12, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c12).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c13.clone()), hotspot_key: gw6.clone(), coverage_object: cov_obj_13, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c13).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c14.clone()), hotspot_key: gw7.clone(), coverage_object: cov_obj_14, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c14).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: None, hotspot_key: gw9.clone(), cell_type: CellType::NovaGenericWifiIndoor, coverage_object: cov_obj_15, - latest_timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: Some(timestamp), - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: None, hotspot_key: gw10.clone(), cell_type: CellType::NovaGenericWifiIndoor, coverage_object: cov_obj_16, - latest_timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(0.25), }, - HeartbeatRow { + HeartbeatReward { cbsd_id: None, hotspot_key: gw11.clone(), cell_type: CellType::NovaGenericWifiIndoor, coverage_object: cov_obj_17, - latest_timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: Some(timestamp), - distance_to_asserted: Some(10000), + location_trust_multiplier: dec!(0.25), }, - ]; + ] + .into_iter() + .map(Ok) + .collect::>>(); // Setup hex coverages let mut hex_coverage = HashMap::new(); @@ -1209,11 +1181,6 @@ mod test { simple_hex_coverage(&gw11, 0x8c2681a306607ff), ); - let heartbeat_rewards: Vec = heartbeat_keys - .into_iter() - .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) - .collect(); - // setup speedtests let last_speedtest = timestamp - Duration::hours(12); let gw1_speedtests = vec![ @@ -1396,7 +1363,6 @@ mod test { let now = Utc::now(); let timestamp = now - Duration::minutes(20); - let max_asserted_distance_deviation: u32 = 300; let g1_cov_obj = Uuid::new_v4(); let g2_cov_obj = Uuid::new_v4(); @@ -1405,33 +1371,27 @@ mod test { let c2 = "P27-SCE4255W".to_string(); // sercom indoor // setup heartbeats - let heartbeat_keys = vec![ + let heartbeat_rewards = vec![ // add wifi indoor HB - HeartbeatRow { + HeartbeatReward { cbsd_id: None, hotspot_key: gw1.clone(), cell_type: CellType::NovaGenericWifiIndoor, coverage_object: g1_cov_obj, - latest_timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: Some(timestamp), - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, // add sercomm indoor HB - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c2.clone()), hotspot_key: gw2.clone(), cell_type: CellType::from_cbsd_id(&c2).unwrap(), - latest_timestamp: DateTime::::MIN_UTC, coverage_object: g2_cov_obj, - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - ]; - - let heartbeat_rewards: Vec = heartbeat_keys - .into_iter() - .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) - .collect(); + ] + .into_iter() + .map(Ok) + .collect::>>(); // setup speedtests let last_speedtest = timestamp - Duration::hours(12); @@ -1528,7 +1488,6 @@ mod test { let now = Utc::now(); let timestamp = now - Duration::minutes(20); - let max_asserted_distance_deviation: u32 = 300; // init cells and cell_types let c2 = "P27-SCE4255W".to_string(); // sercom indoor @@ -1537,35 +1496,29 @@ mod test { let g2_cov_obj = Uuid::new_v4(); // setup heartbeats - let heartbeat_keys = vec![ + let heartbeat_rewards = vec![ // add wifi indoor HB // with distance to asserted > than max allowed // this results in reward scale dropping to 0.25 - HeartbeatRow { + HeartbeatReward { cbsd_id: None, hotspot_key: gw1.clone(), cell_type: CellType::NovaGenericWifiIndoor, coverage_object: g1_cov_obj, - latest_timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: Some(timestamp), - distance_to_asserted: Some(1000), + location_trust_multiplier: dec!(0.25), }, // add sercomm indoor HB - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c2.clone()), hotspot_key: gw2.clone(), coverage_object: g2_cov_obj, - latest_timestamp: DateTime::::MIN_UTC, cell_type: CellType::from_cbsd_id(&c2).unwrap(), - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - ]; - - let heartbeat_rewards: Vec = heartbeat_keys - .into_iter() - .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) - .collect(); + ] + .into_iter() + .map(Ok) + .collect::>>(); // setup speedtests let last_speedtest = timestamp - Duration::hours(12); @@ -1664,7 +1617,6 @@ mod test { let now = Utc::now(); let timestamp = now - Duration::minutes(20); - let max_asserted_distance_deviation: u32 = 300; let g1_cov_obj = Uuid::new_v4(); let g2_cov_obj = Uuid::new_v4(); @@ -1673,33 +1625,27 @@ mod test { let c2 = "P27-SCE4255W".to_string(); // sercom indoor // setup heartbeats - let heartbeat_keys = vec![ + let heartbeat_rewards = vec![ // add wifi indoor HB - HeartbeatRow { + HeartbeatReward { cbsd_id: None, hotspot_key: gw1.clone(), cell_type: CellType::NovaGenericWifiOutdoor, coverage_object: g1_cov_obj, - latest_timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: Some(timestamp), - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, // add sercomm indoor HB - HeartbeatRow { + HeartbeatReward { cbsd_id: Some(c2.clone()), hotspot_key: gw2.clone(), cell_type: CellType::from_cbsd_id(&c2).unwrap(), - latest_timestamp: DateTime::::MIN_UTC, coverage_object: g2_cov_obj, - location_validation_timestamp: None, - distance_to_asserted: Some(1), + location_trust_multiplier: dec!(1.0), }, - ]; - - let heartbeat_rewards: Vec = heartbeat_keys - .into_iter() - .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) - .collect(); + ] + .into_iter() + .map(Ok) + .collect::>>(); // setup speedtests let last_speedtest = timestamp - Duration::hours(12); diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 72686c4b4..0b695e38d 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -243,8 +243,7 @@ where transfer_reward_sum: Decimal, ) -> anyhow::Result<()> { let heartbeats = - HeartbeatReward::validated(&self.pool, reward_period, self.max_distance_to_asserted) - .await?; + HeartbeatReward::validated(&self.pool, reward_period, self.max_distance_to_asserted); let speedtest_averages = SpeedtestAverages::aggregate_epoch_averages(reward_period.end, &self.pool).await?; let coverage_points = CoveragePoints::aggregate_points( diff --git a/mobile_verifier/tests/heartbeats.rs b/mobile_verifier/tests/heartbeats.rs index 716d100a3..4f9d41ecd 100644 --- a/mobile_verifier/tests/heartbeats.rs +++ b/mobile_verifier/tests/heartbeats.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::HeartbeatValidity; use mobile_verifier::cell_type::CellType; @@ -132,7 +132,6 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let latest_timestamp: DateTime = "2023-08-25 23:00:00.000000000 UTC".parse()?; let max_asserted_distance_deviation: u32 = 300; let heartbeat_reward: Vec<_> = HeartbeatReward::validated( @@ -140,9 +139,8 @@ VALUES &(start_period..end_period), max_asserted_distance_deviation, ) - .await? - .collect() - .await; + .try_collect() + .await?; assert_eq!( heartbeat_reward, @@ -150,8 +148,7 @@ VALUES hotspot_key: hotspot_2, cell_type, cbsd_id: Some(cbsd_id), - location_trust_score_multiplier: Decimal::ONE, - latest_timestamp, + location_trust_multiplier: Decimal::ONE, coverage_object, }] ); @@ -196,16 +193,14 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let latest_timestamp: DateTime = "2023-08-25 11:00:00.000000000 UTC".parse()?; let max_asserted_distance_deviation: u32 = 300; let heartbeat_reward: Vec<_> = HeartbeatReward::validated( &pool, &(start_period..end_period), max_asserted_distance_deviation, ) - .await? - .collect() - .await; + .try_collect() + .await?; assert_eq!( heartbeat_reward, @@ -213,8 +208,7 @@ VALUES hotspot_key: hotspot_2, cell_type, cbsd_id: Some(cbsd_id), - location_trust_score_multiplier: Decimal::ONE, - latest_timestamp, + location_trust_multiplier: Decimal::ONE, coverage_object, }] ); @@ -261,9 +255,8 @@ VALUES &(start_period..end_period), max_asserted_distance_deviation, ) - .await? - .collect() - .await; + .try_collect() + .await?; assert!(heartbeat_reward.is_empty()); @@ -303,16 +296,14 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let latest_timestamp: DateTime = "2023-08-25 11:00:00.000000000 UTC".parse()?; let max_asserted_distance_deviation: u32 = 300; let heartbeat_reward: Vec<_> = HeartbeatReward::validated( &pool, &(start_period..end_period), max_asserted_distance_deviation, ) - .await? - .collect() - .await; + .try_collect() + .await?; assert_eq!( heartbeat_reward, @@ -320,8 +311,7 @@ VALUES hotspot_key: hotspot, cell_type: CellType::NovaGenericWifiIndoor, cbsd_id: None, - location_trust_score_multiplier: dec!(1.0), - latest_timestamp, + location_trust_multiplier: dec!(1.0), coverage_object: latest_coverage_object, }] ); @@ -362,16 +352,14 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let latest_timestamp: DateTime = "2023-08-25 11:00:00.000000000 UTC".parse()?; let max_asserted_distance_deviation: u32 = 300; let heartbeat_reward: Vec<_> = HeartbeatReward::validated( &pool, &(start_period..end_period), max_asserted_distance_deviation, ) - .await? - .collect() - .await; + .try_collect() + .await?; assert_eq!( heartbeat_reward, @@ -379,8 +367,7 @@ VALUES hotspot_key: hotspot, cell_type: CellType::NovaGenericWifiIndoor, cbsd_id: None, - location_trust_score_multiplier: dec!(0.75), - latest_timestamp, + location_trust_multiplier: dec!(0.75), coverage_object: latest_coverage_object, }] ); diff --git a/mobile_verifier/tests/modeled_coverage.rs b/mobile_verifier/tests/modeled_coverage.rs index d65f64749..eca4d9aaa 100644 --- a/mobile_verifier/tests/modeled_coverage.rs +++ b/mobile_verifier/tests/modeled_coverage.rs @@ -466,7 +466,7 @@ async fn scenario_one(pool: PgPool) -> anyhow::Result<()> { let speedtest_avgs = SpeedtestAverages { averages }; let reward_period = start..end; - let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000).await?; + let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000); let coverage_points = CoveragePoints::aggregate_points(&pool, heartbeats, &speedtest_avgs, end).await?; @@ -559,7 +559,7 @@ async fn scenario_two(pool: PgPool) -> anyhow::Result<()> { let speedtest_avgs = SpeedtestAverages { averages }; let reward_period = start..end; - let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000).await?; + let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000); let coverage_points = CoveragePoints::aggregate_points(&pool, heartbeats, &speedtest_avgs, end).await?; @@ -793,7 +793,7 @@ async fn scenario_three(pool: PgPool) -> anyhow::Result<()> { let speedtest_avgs = SpeedtestAverages { averages }; let reward_period = start..end; - let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000).await?; + let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000); let coverage_points = CoveragePoints::aggregate_points(&pool, heartbeats, &speedtest_avgs, end).await?; @@ -858,7 +858,7 @@ async fn scenario_four(pool: PgPool) -> anyhow::Result<()> { let speedtest_avgs = SpeedtestAverages { averages }; let reward_period = start..end; - let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000).await?; + let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000); let coverage_points = CoveragePoints::aggregate_points(&pool, heartbeats, &speedtest_avgs, end).await?; @@ -950,7 +950,7 @@ async fn scenario_five(pool: PgPool) -> anyhow::Result<()> { let speedtest_avgs = SpeedtestAverages { averages }; let reward_period = start..end; - let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000).await?; + let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000); let coverage_points = CoveragePoints::aggregate_points(&pool, heartbeats, &speedtest_avgs, end).await?; @@ -1190,7 +1190,7 @@ async fn scenario_six(pool: PgPool) -> anyhow::Result<()> { let speedtest_avgs = SpeedtestAverages { averages }; let reward_period = start..end; - let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000).await?; + let heartbeats = HeartbeatReward::validated(&pool, &reward_period, 1000); let coverage_points = CoveragePoints::aggregate_points(&pool, heartbeats, &speedtest_avgs, end).await?;