From 6791644802e2ed6abfcb48c88a9fcacd5276fad9 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Fri, 2 Feb 2024 11:02:12 -0500
Subject: [PATCH] Fix speedtest average calculation while processing speedtest
 file and output speedtest average used in reward calculation (#725)

* Change speedtest average to only include past 48 hours when calculating average

* remove unneeded function

* Output speedtest averages during reward calculation
---
 mobile_verifier/src/cli/server.rs         |   3 +-
 mobile_verifier/src/reward_shares.rs      |  32 +++---
 mobile_verifier/src/rewarder.rs           |  19 +++-
 mobile_verifier/src/speedtests.rs         |  22 ++--
 mobile_verifier/src/speedtests_average.rs | 118 ++++++--------------
 mobile_verifier/tests/common/mod.rs       |  30 +++++-
 mobile_verifier/tests/modeled_coverage.rs |  36 +++----
 mobile_verifier/tests/rewarder_poc_dc.rs  |   9 +-
 mobile_verifier/tests/speedtests.rs       | 124 ++++++++++++++++++++++
 9 files changed, 264 insertions(+), 129 deletions(-)
 create mode 100644 mobile_verifier/tests/speedtests.rs

diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs
index f78e93124..bd3ba34fa 100644
--- a/mobile_verifier/src/cli/server.rs
+++ b/mobile_verifier/src/cli/server.rs
@@ -155,7 +155,7 @@ impl Cmd {
             pool.clone(),
             gateway_client,
             speedtests,
-            speedtests_avg,
+            speedtests_avg.clone(),
             speedtests_validity,
         );
 
@@ -217,6 +217,7 @@ impl Cmd {
             mobile_rewards,
             reward_manifests,
             price_tracker,
+            speedtests_avg,
         );
 
         // subscriber location
diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs
index 57c336284..f9d53e447 100644
--- a/mobile_verifier/src/reward_shares.rs
+++ b/mobile_verifier/src/reward_shares.rs
@@ -1262,16 +1262,16 @@ mod test {
             acceptable_speedtest(gw11.clone(), timestamp),
         ];
 
-        let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
-        let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
-        let gw3_average = SpeedtestAverage::from(&gw3_speedtests);
-        let gw4_average = SpeedtestAverage::from(&gw4_speedtests);
-        let gw5_average = SpeedtestAverage::from(&gw5_speedtests);
-        let gw6_average = SpeedtestAverage::from(&gw6_speedtests);
-        let gw7_average = SpeedtestAverage::from(&gw7_speedtests);
-        let gw9_average = SpeedtestAverage::from(&gw9_speedtests);
-        let gw10_average = SpeedtestAverage::from(&gw10_speedtests);
-        let gw11_average = SpeedtestAverage::from(&gw11_speedtests);
+        let gw1_average = SpeedtestAverage::from(gw1_speedtests);
+        let gw2_average = SpeedtestAverage::from(gw2_speedtests);
+        let gw3_average = SpeedtestAverage::from(gw3_speedtests);
+        let gw4_average = SpeedtestAverage::from(gw4_speedtests);
+        let gw5_average = SpeedtestAverage::from(gw5_speedtests);
+        let gw6_average = SpeedtestAverage::from(gw6_speedtests);
+        let gw7_average = SpeedtestAverage::from(gw7_speedtests);
+        let gw9_average = SpeedtestAverage::from(gw9_speedtests);
+        let gw10_average = SpeedtestAverage::from(gw10_speedtests);
+        let gw11_average = SpeedtestAverage::from(gw11_speedtests);
         let mut averages = HashMap::new();
         averages.insert(gw1.clone(), gw1_average);
         averages.insert(gw2.clone(), gw2_average);
@@ -1442,8 +1442,8 @@ mod test {
             acceptable_speedtest(gw2.clone(), timestamp),
         ];
 
-        let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
-        let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
+        let gw1_average = SpeedtestAverage::from(gw1_speedtests);
+        let gw2_average = SpeedtestAverage::from(gw2_speedtests);
         let mut averages = HashMap::new();
         averages.insert(gw1.clone(), gw1_average);
         averages.insert(gw2.clone(), gw2_average);
@@ -1569,8 +1569,8 @@ mod test {
             acceptable_speedtest(gw2.clone(), timestamp),
         ];
 
-        let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
-        let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
+        let gw1_average = SpeedtestAverage::from(gw1_speedtests);
+        let gw2_average = SpeedtestAverage::from(gw2_speedtests);
         let mut averages = HashMap::new();
         averages.insert(gw1.clone(), gw1_average);
         averages.insert(gw2.clone(), gw2_average);
@@ -1696,8 +1696,8 @@ mod test {
             acceptable_speedtest(gw2.clone(), timestamp),
         ];
 
-        let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
-        let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
+        let gw1_average = SpeedtestAverage::from(gw1_speedtests);
+        let gw2_average = SpeedtestAverage::from(gw2_speedtests);
         let mut averages = HashMap::new();
         averages.insert(gw1.clone(), gw1_average);
         averages.insert(gw2.clone(), gw2_average);
diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs
index b3e980d29..9b381ed8b 100644
--- a/mobile_verifier/src/rewarder.rs
+++ b/mobile_verifier/src/rewarder.rs
@@ -37,6 +37,7 @@ pub struct Rewarder {
     pub mobile_rewards: FileSinkClient,
     reward_manifests: FileSinkClient,
     price_tracker: PriceTracker,
+    speedtest_averages: FileSinkClient,
 }
 
 impl Rewarder
@@ -52,6 +53,7 @@ where
         mobile_rewards: FileSinkClient,
         reward_manifests: FileSinkClient,
         price_tracker: PriceTracker,
+        speedtest_averages: FileSinkClient,
     ) -> Self {
         Self {
             pool,
@@ -61,6 +63,7 @@ where
             mobile_rewards,
             reward_manifests,
             price_tracker,
+            speedtest_averages,
         }
     }
 
@@ -179,6 +182,7 @@ where
         reward_poc_and_dc(
             &self.pool,
             &self.mobile_rewards,
+            &self.speedtest_averages,
             reward_period,
             mobile_bone_price,
         )
@@ -200,6 +204,7 @@ where
         // process rewards for oracles
         reward_oracles(&self.mobile_rewards, reward_period).await?;
 
+        self.speedtest_averages.commit().await?;
         let written_files = self.mobile_rewards.commit().await?.await??;
 
         let mut transaction = self.pool.begin().await?;
@@ -254,6 +259,7 @@ where
 pub async fn reward_poc_and_dc(
     pool: &Pool<Postgres>,
     mobile_rewards: &FileSinkClient,
+    speedtest_avg_sink: &FileSinkClient,
     reward_period: &Range<DateTime<Utc>>,
     mobile_bone_price: Decimal,
 ) -> anyhow::Result<()> {
@@ -271,7 +277,14 @@ pub async fn reward_poc_and_dc(
     };
     telemetry::data_transfer_rewards_scale(scale);
 
-    reward_poc(pool, mobile_rewards, reward_period, transfer_rewards_sum).await?;
+    reward_poc(
+        pool,
+        mobile_rewards,
+        speedtest_avg_sink,
+        reward_period,
+        transfer_rewards_sum,
+    )
+    .await?;
 
     reward_dc(mobile_rewards, reward_period, transfer_rewards).await?;
 
@@ -281,6 +294,7 @@ pub async fn reward_poc_and_dc(
 async fn reward_poc(
     pool: &Pool<Postgres>,
     mobile_rewards: &FileSinkClient,
+    speedtest_avg_sink: &FileSinkClient,
     reward_period: &Range<DateTime<Utc>>,
     transfer_reward_sum: Decimal,
 ) -> anyhow::Result<()> {
@@ -291,6 +305,9 @@ async fn reward_poc(
     let heartbeats = HeartbeatReward::validated(pool, reward_period);
     let speedtest_averages =
         SpeedtestAverages::aggregate_epoch_averages(reward_period.end, pool).await?;
+
+    speedtest_averages.write_all(speedtest_avg_sink).await?;
+
     let coverage_points =
         CoveragePoints::aggregate_points(pool, heartbeats, &speedtest_averages, reward_period.end)
             .await?;
diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs
index 94af0acc1..f4bdb1355 100644
--- a/mobile_verifier/src/speedtests.rs
+++ b/mobile_verifier/src/speedtests.rs
@@ -92,7 +92,7 @@ where
         Ok(())
     }
 
-    async fn process_file(
+    pub async fn process_file(
         &self,
         file: FileInfoStream<CellSpeedtestIngestReport>,
     ) -> anyhow::Result<()> {
@@ -105,13 +105,12 @@ where
                 save_speedtest(&speedtest_report.report, &mut transaction).await?;
                 let latest_speedtests = get_latest_speedtests_for_pubkey(
                     &speedtest_report.report.pubkey,
+                    speedtest_report.report.timestamp,
                     &mut transaction,
                 )
                 .await?;
-                let average = SpeedtestAverage::from(&latest_speedtests);
-                average
-                    .write(&self.speedtest_avg_file_sink, latest_speedtests)
-                    .await?;
+                let average = SpeedtestAverage::from(latest_speedtests);
+                average.write(&self.speedtest_avg_file_sink).await?;
             }
             // write out paper trail of speedtest validity
             self.write_verified_speedtest(speedtest_report, result)
@@ -200,12 +199,23 @@ pub async fn save_speedtest(
 
 pub async fn get_latest_speedtests_for_pubkey(
     pubkey: &PublicKeyBinary,
+    timestamp: DateTime<Utc>,
     exec: &mut Transaction<'_, Postgres>,
 ) -> Result<Vec<Speedtest>, sqlx::Error> {
     let speedtests = sqlx::query_as::<_, Speedtest>(
-        "SELECT * FROM speedtests where pubkey = $1 order by timestamp desc limit $2",
+        r#"
+        SELECT *
+        FROM speedtests
+        WHERE pubkey = $1
+            AND timestamp >= $2
+            AND timestamp <= $3
+        ORDER BY timestamp DESC
+        LIMIT $4
+        "#,
     )
     .bind(pubkey)
+    .bind(timestamp - Duration::hours(SPEEDTEST_LAPSE))
+    .bind(timestamp)
    .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64)
    .fetch_all(exec)
    .await?;
diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs
index 0ef236ab7..0c37c21dc 100644
--- a/mobile_verifier/src/speedtests_average.rs
+++ b/mobile_verifier/src/speedtests_average.rs
@@ -1,5 +1,5 @@
 use crate::speedtests::{self, Speedtest};
-use chrono::{DateTime, Duration, Utc};
+use chrono::{DateTime, Utc};
 use file_store::{
     file_sink::FileSinkClient,
     traits::{MsgTimestamp, TimestampEncode},
@@ -27,22 +27,18 @@ pub struct SpeedtestAverage {
     pub latency_avg_ms: u32,
     pub validity: proto::SpeedtestAvgValidity,
     pub reward_multiplier: Decimal,
+    pub speedtests: Vec<Speedtest>,
 }
 
-impl<'a, I: ?Sized> From<&'a I> for SpeedtestAverage
-where
-    &'a I: IntoIterator<Item = &'a Speedtest>,
-{
-    fn from(iter: &'a I) -> Self {
+impl From<Vec<Speedtest>> for SpeedtestAverage {
+    fn from(speedtests: Vec<Speedtest>) -> Self {
         let mut id = vec![]; // eww!
         let mut window_size = 0;
         let mut sum_upload = 0;
         let mut sum_download = 0;
         let mut sum_latency = 0;
 
-        for Speedtest { report, .. } in
-            speedtests_without_lapsed(iter.into_iter(), Duration::hours(SPEEDTEST_LAPSE))
-        {
+        for Speedtest { report, .. } in speedtests.iter() {
             id = report.pubkey.as_ref().to_vec(); // eww!
             sum_upload += report.upload_speed;
             sum_download += report.download_speed;
@@ -75,6 +71,7 @@ where
                 latency_avg_ms,
                 validity,
                 reward_multiplier,
+                speedtests,
             }
         } else {
             SpeedtestAverage {
@@ -85,17 +82,14 @@ where
                 latency_avg_ms: sum_latency,
                 validity: proto::SpeedtestAvgValidity::TooFewSamples,
                 reward_multiplier: Decimal::ZERO,
+                speedtests,
             }
         }
     }
 }
 
 impl SpeedtestAverage {
-    pub async fn write(
-        &self,
-        filesink: &FileSinkClient,
-        speedtests: Vec<Speedtest>,
-    ) -> file_store::Result {
+    pub async fn write(&self, filesink: &FileSinkClient) -> file_store::Result {
         filesink
             .write(
                 proto::SpeedtestAvg {
@@ -104,17 +98,16 @@ impl SpeedtestAverage {
                     download_speed_avg_bps: self.download_speed_avg_bps,
                     latency_avg_ms: self.latency_avg_ms,
                     timestamp: Utc::now().encode_timestamp(),
-                    speedtests: speedtests_without_lapsed(
-                        speedtests.iter(),
-                        Duration::hours(SPEEDTEST_LAPSE),
-                    )
-                    .map(|st| proto::Speedtest {
-                        timestamp: st.report.timestamp(),
-                        upload_speed_bps: st.report.upload_speed,
-                        download_speed_bps: st.report.download_speed,
-                        latency_ms: st.report.latency,
-                    })
-                    .collect(),
+                    speedtests: self
+                        .speedtests
+                        .iter()
+                        .map(|st| proto::Speedtest {
+                            timestamp: st.report.timestamp(),
+                            upload_speed_bps: st.report.upload_speed,
+                            download_speed_bps: st.report.download_speed,
+                            latency_ms: st.report.latency,
+                        })
+                        .collect(),
                     validity: self.validity as i32,
                     reward_multiplier: self.reward_multiplier.try_into().unwrap(),
                 },
@@ -204,6 +197,14 @@ pub struct SpeedtestAverages {
 }
 
 impl SpeedtestAverages {
+    pub async fn write_all(&self, sink: &FileSinkClient) -> anyhow::Result<()> {
+        for speedtest in self.averages.values() {
+            speedtest.write(sink).await?;
+        }
+
+        Ok(())
+    }
+
     pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option<SpeedtestAverage> {
         self.averages.get(pub_key).cloned()
     }
@@ -216,7 +217,7 @@ impl SpeedtestAverages {
             .await?
             .into_iter()
             .map(|(pub_key, speedtests)| {
-                let average = SpeedtestAverage::from(&speedtests);
+                let average = SpeedtestAverage::from(speedtests);
                 (pub_key, average)
             })
             .collect();
@@ -261,20 +262,6 @@ pub fn validity(
     proto::SpeedtestAvgValidity::Valid
 }
 
-fn speedtests_without_lapsed<'a>(
-    iterable: impl Iterator<Item = &'a Speedtest>,
-    lapse_cliff: Duration,
-) -> impl Iterator<Item = &'a Speedtest> {
-    let mut last_timestamp = None;
-    iterable.take_while(move |speedtest| match last_timestamp {
-        Some(ts) if ts - speedtest.report.timestamp > lapse_cliff => false,
-        None | Some(_) => {
-            last_timestamp = Some(speedtest.report.timestamp);
-            true
-        }
-    })
-}
-
 const fn mbps(mbps: u64) -> u64 {
     mbps * 125000
 }
@@ -317,11 +304,11 @@ mod test {
     fn check_known_valid() {
         let speedtests = known_speedtests();
         assert_ne!(
-            SpeedtestAverage::from(&speedtests[0..5]).tier(),
+            SpeedtestAverage::from(speedtests[0..5].to_vec()).tier(),
             SpeedtestTier::Acceptable,
         );
         assert_eq!(
-            SpeedtestAverage::from(&speedtests[0..6]).tier(),
+            SpeedtestAverage::from(speedtests[0..6].to_vec()).tier(),
             SpeedtestTier::Acceptable
         );
     }
@@ -330,15 +317,15 @@ mod test {
     fn check_minimum_known_valid() {
         let speedtests = known_speedtests();
         assert_ne!(
-            SpeedtestAverage::from(&speedtests[4..4]).tier(),
+            SpeedtestAverage::from(speedtests[4..4].to_vec()).tier(),
             SpeedtestTier::Acceptable
         );
         assert_eq!(
-            SpeedtestAverage::from(&speedtests[4..=5]).tier(),
+            SpeedtestAverage::from(speedtests[4..=5].to_vec()).tier(),
             SpeedtestTier::Acceptable
         );
         assert_eq!(
-            SpeedtestAverage::from(&speedtests[4..=6]).tier(),
+            SpeedtestAverage::from(speedtests[4..=6].to_vec()).tier(),
             SpeedtestTier::Acceptable
         );
     }
@@ -347,7 +334,7 @@ mod test {
     fn check_minimum_known_invalid() {
         let speedtests = known_speedtests();
         assert_ne!(
-            SpeedtestAverage::from(&speedtests[5..6]).tier(),
+            SpeedtestAverage::from(speedtests[5..6].to_vec()).tier(),
             SpeedtestTier::Acceptable
         );
     }
@@ -421,45 +408,6 @@ mod test {
         ]
     }
 
-    #[test]
-    fn check_speedtest_without_lapsed() {
-        let speedtest_cutoff = Duration::hours(10);
-        let contiguos_speedtests = known_speedtests();
-        let contiguous_speedtests =
-            speedtests_without_lapsed(contiguos_speedtests.iter(), speedtest_cutoff);
-        let pubkey: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"
-            .parse()
-            .expect("failed owner parse");
-        let disjoint_speedtests = vec![
-            default_cellspeedtest(
-                pubkey.clone(),
-                parse_dt("2022-08-02 6:00:00 +0000"),
-                bytes_per_s(20),
-                bytes_per_s(150),
-                70,
-            ),
-            default_cellspeedtest(
-                pubkey.clone(),
-                parse_dt("2022-08-01 18:00:00 +0000"),
-                bytes_per_s(10),
-                bytes_per_s(118),
-                50,
-            ),
-            default_cellspeedtest(
-                pubkey,
-                parse_dt("2022-08-01 12:00:00 +0000"),
-                bytes_per_s(30),
-                bytes_per_s(112),
-                40,
-            ),
-        ];
-        let disjoint_speedtests =
-            speedtests_without_lapsed(disjoint_speedtests.iter(), speedtest_cutoff);
-
-        assert_eq!(contiguous_speedtests.count(), 8);
-        assert_eq!(disjoint_speedtests.count(), 1);
-    }
-
     fn default_cellspeedtest(
         pubkey: PublicKeyBinary,
         timestamp: DateTime<Utc>,
diff --git a/mobile_verifier/tests/common/mod.rs b/mobile_verifier/tests/common/mod.rs
index a81dfa10f..b52f6307a 100644
--- a/mobile_verifier/tests/common/mod.rs
+++ b/mobile_verifier/tests/common/mod.rs
@@ -2,7 +2,7 @@ use file_store::file_sink::{FileSinkClient, Message as SinkMessage};
 use helium_proto::{
     services::poc_mobile::{
         mobile_reward_share::Reward as MobileReward, GatewayReward, MobileRewardShare, RadioReward,
-        ServiceProviderReward, SubscriberReward, UnallocatedReward,
+        ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward,
     },
     Message,
 };
@@ -37,12 +37,40 @@ impl MockFileSinkReceiver {
         }
     }
 
+    pub async fn get_all(&mut self) -> Vec<Vec<u8>> {
+        let mut buf = Vec::new();
+        while let Ok(SinkMessage::Data(on_write_tx, msg)) = self.receiver.try_recv() {
+            let _ = on_write_tx.send(Ok(()));
+            buf.push(msg);
+        }
+        buf
+    }
+
     pub fn assert_no_messages(&mut self) {
         let Err(TryRecvError::Empty) = self.receiver.try_recv() else {
             panic!("receiver should have been empty")
         };
     }
 
+    pub async fn receive_speedtest_avg(&mut self) -> SpeedtestAvg {
+        match self.receive().await {
+            Some(bytes) => {
+                SpeedtestAvg::decode(bytes.as_slice()).expect("Not a valid speedtest average")
+            }
+            None => panic!("failed to receive speedtest average"),
+        }
+    }
+
+    pub async fn get_all_speedtest_avgs(&mut self) -> Vec<SpeedtestAvg> {
+        self.get_all()
+            .await
+            .into_iter()
+            .map(|bytes| {
+                SpeedtestAvg::decode(bytes.as_slice()).expect("Not a valid speedtest average")
+            })
+            .collect()
+    }
+
     pub async fn receive_radio_reward(&mut self) -> RadioReward {
         match self.receive().await {
             Some(bytes) => {
diff --git a/mobile_verifier/tests/modeled_coverage.rs b/mobile_verifier/tests/modeled_coverage.rs
index 72f191885..281439853 100644
--- a/mobile_verifier/tests/modeled_coverage.rs
+++ b/mobile_verifier/tests/modeled_coverage.rs
@@ -468,7 +468,7 @@ async fn scenario_one(pool: PgPool) -> anyhow::Result<()> {
         acceptable_speedtest(owner.clone(), end),
     ];
     let mut averages = HashMap::new();
-    averages.insert(owner.clone(), SpeedtestAverage::from(&owner_speedtests));
+    averages.insert(owner.clone(), SpeedtestAverage::from(owner_speedtests));
     let speedtest_avgs = SpeedtestAverages { averages };
 
     let reward_period = start..end;
@@ -560,8 +560,8 @@ async fn scenario_two(pool: PgPool) -> anyhow::Result<()> {
         acceptable_speedtest(owner_2.clone(), end),
     ];
     let mut averages = HashMap::new();
-    averages.insert(owner_1.clone(), SpeedtestAverage::from(&speedtests_1));
-    averages.insert(owner_2.clone(), SpeedtestAverage::from(&speedtests_2));
+    averages.insert(owner_1.clone(), SpeedtestAverage::from(speedtests_1));
+    averages.insert(owner_2.clone(), SpeedtestAverage::from(speedtests_2));
     let speedtest_avgs = SpeedtestAverages { averages };
 
     let reward_period = start..end;
@@ -790,12 +790,12 @@ async fn scenario_three(pool: PgPool) -> anyhow::Result<()> {
         acceptable_speedtest(owner_6.clone(), end),
     ];
     let mut averages = HashMap::new();
-    averages.insert(owner_1.clone(), SpeedtestAverage::from(&speedtests_1));
-    averages.insert(owner_2.clone(), SpeedtestAverage::from(&speedtests_2));
-    averages.insert(owner_3.clone(), SpeedtestAverage::from(&speedtests_3));
-    averages.insert(owner_4.clone(), SpeedtestAverage::from(&speedtests_4));
-    averages.insert(owner_5.clone(), SpeedtestAverage::from(&speedtests_5));
-    averages.insert(owner_6.clone(), SpeedtestAverage::from(&speedtests_6));
+    averages.insert(owner_1.clone(), SpeedtestAverage::from(speedtests_1));
+    averages.insert(owner_2.clone(), SpeedtestAverage::from(speedtests_2));
+    averages.insert(owner_3.clone(), SpeedtestAverage::from(speedtests_3));
+    averages.insert(owner_4.clone(), SpeedtestAverage::from(speedtests_4));
+    averages.insert(owner_5.clone(), SpeedtestAverage::from(speedtests_5));
+    averages.insert(owner_6.clone(), SpeedtestAverage::from(speedtests_6));
     let speedtest_avgs = SpeedtestAverages { averages };
 
     let reward_period = start..end;
@@ -860,7 +860,7 @@ async fn scenario_four(pool: PgPool) -> anyhow::Result<()> {
         acceptable_speedtest(owner.clone(), end),
     ];
     let mut averages = HashMap::new();
-    averages.insert(owner.clone(), SpeedtestAverage::from(&owner_speedtests));
+    averages.insert(owner.clone(), SpeedtestAverage::from(owner_speedtests));
     let speedtest_avgs = SpeedtestAverages { averages };
 
     let reward_period = start..end;
@@ -951,8 +951,8 @@ async fn scenario_five(pool: PgPool) -> anyhow::Result<()> {
         acceptable_speedtest(owner_2.clone(), end),
     ];
     let mut averages = HashMap::new();
-    averages.insert(owner_1.clone(), SpeedtestAverage::from(&speedtests_1));
-    averages.insert(owner_2.clone(), SpeedtestAverage::from(&speedtests_2));
+    averages.insert(owner_1.clone(), SpeedtestAverage::from(speedtests_1));
+    averages.insert(owner_2.clone(), SpeedtestAverage::from(speedtests_2));
     let speedtest_avgs = SpeedtestAverages { averages };
 
     let reward_period = start..end;
@@ -1187,12 +1187,12 @@ async fn scenario_six(pool: PgPool) -> anyhow::Result<()> {
         acceptable_speedtest(owner_6.clone(), end),
     ];
     let mut averages = HashMap::new();
-    averages.insert(owner_1.clone(), SpeedtestAverage::from(&speedtests_1));
-    averages.insert(owner_2.clone(), SpeedtestAverage::from(&speedtests_2));
-    averages.insert(owner_3.clone(), SpeedtestAverage::from(&speedtests_3));
-    averages.insert(owner_4.clone(), SpeedtestAverage::from(&speedtests_4));
-    averages.insert(owner_5.clone(), SpeedtestAverage::from(&speedtests_5));
-    averages.insert(owner_6.clone(), SpeedtestAverage::from(&speedtests_6));
+    averages.insert(owner_1.clone(), SpeedtestAverage::from(speedtests_1));
+    averages.insert(owner_2.clone(), SpeedtestAverage::from(speedtests_2));
+    averages.insert(owner_3.clone(), SpeedtestAverage::from(speedtests_3));
+    averages.insert(owner_4.clone(), SpeedtestAverage::from(speedtests_4));
+    averages.insert(owner_5.clone(), SpeedtestAverage::from(speedtests_5));
+    averages.insert(owner_6.clone(), SpeedtestAverage::from(speedtests_6));
     let speedtest_avgs = SpeedtestAverages { averages };
 
     let reward_period = start..end;
diff --git a/mobile_verifier/tests/rewarder_poc_dc.rs b/mobile_verifier/tests/rewarder_poc_dc.rs
index bf1c37e84..b26dc19c3 100644
--- a/mobile_verifier/tests/rewarder_poc_dc.rs
+++ b/mobile_verifier/tests/rewarder_poc_dc.rs
@@ -30,6 +30,7 @@ const PAYER_1: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL";
 #[sqlx::test]
 async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> {
     let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink();
+    let (speedtest_avg_client, _speedtest_avg_server) = common::create_file_sink();
 
     let now = Utc::now();
     let epoch = (now - ChronoDuration::hours(24))..now;
@@ -42,7 +43,13 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> {
 
     let (_, rewards) = tokio::join!(
         // run rewards for poc and dc
-        rewarder::reward_poc_and_dc(&pool, &mobile_rewards_client, &epoch, dec!(0.0001)),
+        rewarder::reward_poc_and_dc(
+            &pool,
+            &mobile_rewards_client,
+            &speedtest_avg_client,
+            &epoch,
+            dec!(0.0001)
+        ),
         receive_expected_rewards(&mut mobile_rewards)
     );
     if let Ok((poc_rewards, dc_rewards, unallocated_poc_reward)) = rewards {
diff --git a/mobile_verifier/tests/speedtests.rs b/mobile_verifier/tests/speedtests.rs
new file mode 100644
index 000000000..75e35b264
--- /dev/null
+++ b/mobile_verifier/tests/speedtests.rs
@@ -0,0 +1,124 @@
+mod common;
+use chrono::{DateTime, NaiveDateTime, Utc};
+use file_store::{
+    file_info_poller::FileInfoStream,
+    speedtest::{CellSpeedtest, CellSpeedtestIngestReport},
+    FileInfo,
+};
+use helium_crypto::PublicKeyBinary;
+use helium_proto::services::poc_mobile::SpeedtestAvgValidity;
+use mobile_config::{
+    client::gateway_client::GatewayInfoResolver,
+    gateway_info::{DeviceType, GatewayInfo, GatewayInfoStream},
+};
+use mobile_verifier::speedtests::SpeedtestDaemon;
+use sqlx::{Pool, Postgres};
+
+#[derive(thiserror::Error, Debug)]
+enum MockError {}
+
+#[derive(Clone)]
+struct MockGatewayInfoResolver {}
+
+#[async_trait::async_trait]
+impl GatewayInfoResolver for MockGatewayInfoResolver {
+    type Error = MockError;
+
+    async fn resolve_gateway_info(
+        &self,
+        address: &PublicKeyBinary,
+    ) -> Result<Option<GatewayInfo>, Self::Error> {
+        Ok(Some(GatewayInfo {
+            address: address.clone(),
+            metadata: None,
+            device_type: DeviceType::Cbrs,
+        }))
+    }
+
+    async fn stream_gateways_info(&mut self) -> Result<GatewayInfoStream, Self::Error> {
+        todo!()
+    }
+}
+
+#[sqlx::test]
+async fn speedtests_average_should_only_include_last_48_hours(
+    pool: Pool<Postgres>,
+) -> anyhow::Result<()> {
+    let (_tx, rx) = tokio::sync::mpsc::channel(2);
+    let gateway_info_resolver = MockGatewayInfoResolver {};
+    let (speedtest_avg_client, mut speedtest_avg_receiver) = common::create_file_sink();
+    let (verified_client, _verified_receiver) = common::create_file_sink();
+
+    let daemon = SpeedtestDaemon::new(
+        pool,
+        gateway_info_resolver,
+        rx,
+        speedtest_avg_client,
+        verified_client,
+    );
+
+    let hotspot: PublicKeyBinary =
+        "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?;
+
+    let stream = file_info_stream(vec![
+        speedtest(&hotspot, "2024-01-01 01:00:00", 0, 101, 11),
+        speedtest(&hotspot, "2024-01-02 01:00:00", 0, 99, 9),
+        speedtest(&hotspot, "2024-01-03 01:00:00", 0, 101, 11),
+        speedtest(&hotspot, "2024-01-04 01:00:00", 10, 100, 10),
+        speedtest(&hotspot, "2024-01-05 01:00:00", 10, 100, 10),
+        speedtest(&hotspot, "2024-01-06 01:00:00", 10, 100, 10),
+    ]);
+
+    assert!(daemon.process_file(stream).await.is_ok());
+
+    let avgs = speedtest_avg_receiver.get_all_speedtest_avgs().await;
+
+    assert_eq!(6, avgs.len());
+    assert_eq!(SpeedtestAvgValidity::TooFewSamples, avgs[0].validity());
+    assert_eq!(1.0, avgs[5].reward_multiplier);
+
+    Ok(())
+}
+
+fn file_info_stream(
+    speedtests: Vec<CellSpeedtestIngestReport>,
+) -> FileInfoStream<CellSpeedtestIngestReport> {
+    let file_info = FileInfo {
+        key: "key".to_string(),
+        prefix: "prefix".to_string(),
+        timestamp: Utc::now(),
+        size: 0,
+    };
+
+    FileInfoStream::new("default".to_string(), file_info, speedtests)
+}
+
+fn speedtest(
+    pubkey: &PublicKeyBinary,
+    ts: &str,
+    u: u64,
+    d: u64,
+    l: u32,
+) -> CellSpeedtestIngestReport {
+    CellSpeedtestIngestReport {
+        received_timestamp: Utc::now(),
+        report: CellSpeedtest {
+            pubkey: pubkey.clone(),
+            serial: "".to_string(),
+            timestamp: parse_dt(ts),
+            upload_speed: mbps(u),
+            download_speed: mbps(d),
+            latency: l,
+        },
+    }
+}
+
+fn parse_dt(dt: &str) -> DateTime<Utc> {
+    NaiveDateTime::parse_from_str(dt, "%Y-%m-%d %H:%M:%S")
+        .expect("unable_to_parse")
+        .and_utc()
+}
+
+fn mbps(mbps: u64) -> u64 {
+    mbps * 125000
+}
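
The behavioral core of the patch is the 48-hour window applied before averaging: get_latest_speedtests_for_pubkey now only loads rows whose timestamp falls within SPEEDTEST_LAPSE hours of the report being processed, and SpeedtestAverage::from(Vec<Speedtest>) averages exactly that set. The standalone Rust sketch below restates the windowing rule in isolation; it is illustrative only and not code from this patch, and the names Sample and windowed_average_bps, as well as the hard-coded 48-hour constant, are assumptions made for the example.

use chrono::{DateTime, Duration, Utc};

/// Hypothetical, simplified stand-in for a stored speedtest row.
struct Sample {
    timestamp: DateTime<Utc>,
    download_speed_bps: u64,
}

/// Average download speed over samples taken at most 48 hours before
/// `reference` (the timestamp of the report currently being processed).
fn windowed_average_bps(reference: DateTime<Utc>, samples: &[Sample]) -> Option<u64> {
    let cutoff = reference - Duration::hours(48);
    let recent: Vec<u64> = samples
        .iter()
        .filter(|s| s.timestamp >= cutoff && s.timestamp <= reference)
        .map(|s| s.download_speed_bps)
        .collect();
    if recent.is_empty() {
        None
    } else {
        Some(recent.iter().sum::<u64>() / recent.len() as u64)
    }
}

fn main() {
    let reference = Utc::now();
    let samples = vec![
        // Older than 48 hours: excluded from the average.
        Sample { timestamp: reference - Duration::hours(72), download_speed_bps: 50_000_000 },
        // Inside the window: the only sample that counts.
        Sample { timestamp: reference - Duration::hours(12), download_speed_bps: 100_000_000 },
    ];
    assert_eq!(windowed_average_bps(reference, &samples), Some(100_000_000));
}

Applied to the fixtures in the new integration test (one report per day from 2024-01-01 to 2024-01-06), the window for the 2024-01-06 report reaches back only to 2024-01-04 01:00, so the three older reports no longer contribute to that final average.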