Skip to content

Commit

Permalink
Fix speedtest average calculation while processing speedtest file and…
Browse files Browse the repository at this point in the history
… output speedtest average used in reward calculation (#725)

* Change speedtest average to only include past 48 hours when calculating average

* remove unneeded function

* Output speedtest averages during reward calculation
  • Loading branch information
bbalser authored Feb 2, 2024
1 parent 5d61419 commit 6791644
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 129 deletions.
3 changes: 2 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl Cmd {
pool.clone(),
gateway_client,
speedtests,
speedtests_avg,
speedtests_avg.clone(),
speedtests_validity,
);

Expand Down Expand Up @@ -217,6 +217,7 @@ impl Cmd {
mobile_rewards,
reward_manifests,
price_tracker,
speedtests_avg,
);

// subscriber location
Expand Down
32 changes: 16 additions & 16 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1262,16 +1262,16 @@ mod test {
acceptable_speedtest(gw11.clone(), timestamp),
];

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw3_average = SpeedtestAverage::from(&gw3_speedtests);
let gw4_average = SpeedtestAverage::from(&gw4_speedtests);
let gw5_average = SpeedtestAverage::from(&gw5_speedtests);
let gw6_average = SpeedtestAverage::from(&gw6_speedtests);
let gw7_average = SpeedtestAverage::from(&gw7_speedtests);
let gw9_average = SpeedtestAverage::from(&gw9_speedtests);
let gw10_average = SpeedtestAverage::from(&gw10_speedtests);
let gw11_average = SpeedtestAverage::from(&gw11_speedtests);
let gw1_average = SpeedtestAverage::from(gw1_speedtests);
let gw2_average = SpeedtestAverage::from(gw2_speedtests);
let gw3_average = SpeedtestAverage::from(gw3_speedtests);
let gw4_average = SpeedtestAverage::from(gw4_speedtests);
let gw5_average = SpeedtestAverage::from(gw5_speedtests);
let gw6_average = SpeedtestAverage::from(gw6_speedtests);
let gw7_average = SpeedtestAverage::from(gw7_speedtests);
let gw9_average = SpeedtestAverage::from(gw9_speedtests);
let gw10_average = SpeedtestAverage::from(gw10_speedtests);
let gw11_average = SpeedtestAverage::from(gw11_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
Expand Down Expand Up @@ -1442,8 +1442,8 @@ mod test {
acceptable_speedtest(gw2.clone(), timestamp),
];

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw1_average = SpeedtestAverage::from(gw1_speedtests);
let gw2_average = SpeedtestAverage::from(gw2_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
Expand Down Expand Up @@ -1569,8 +1569,8 @@ mod test {
acceptable_speedtest(gw2.clone(), timestamp),
];

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw1_average = SpeedtestAverage::from(gw1_speedtests);
let gw2_average = SpeedtestAverage::from(gw2_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
Expand Down Expand Up @@ -1696,8 +1696,8 @@ mod test {
acceptable_speedtest(gw2.clone(), timestamp),
];

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw1_average = SpeedtestAverage::from(gw1_speedtests);
let gw2_average = SpeedtestAverage::from(gw2_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
Expand Down
19 changes: 18 additions & 1 deletion mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct Rewarder<A> {
pub mobile_rewards: FileSinkClient,
reward_manifests: FileSinkClient,
price_tracker: PriceTracker,
speedtest_averages: FileSinkClient,
}

impl<A> Rewarder<A>
Expand All @@ -52,6 +53,7 @@ where
mobile_rewards: FileSinkClient,
reward_manifests: FileSinkClient,
price_tracker: PriceTracker,
speedtest_averages: FileSinkClient,
) -> Self {
Self {
pool,
Expand All @@ -61,6 +63,7 @@ where
mobile_rewards,
reward_manifests,
price_tracker,
speedtest_averages,
}
}

Expand Down Expand Up @@ -179,6 +182,7 @@ where
reward_poc_and_dc(
&self.pool,
&self.mobile_rewards,
&self.speedtest_averages,
reward_period,
mobile_bone_price,
)
Expand All @@ -200,6 +204,7 @@ where
// process rewards for oracles
reward_oracles(&self.mobile_rewards, reward_period).await?;

self.speedtest_averages.commit().await?;
let written_files = self.mobile_rewards.commit().await?.await??;

let mut transaction = self.pool.begin().await?;
Expand Down Expand Up @@ -254,6 +259,7 @@ where
pub async fn reward_poc_and_dc(
pool: &Pool<Postgres>,
mobile_rewards: &FileSinkClient,
speedtest_avg_sink: &FileSinkClient,
reward_period: &Range<DateTime<Utc>>,
mobile_bone_price: Decimal,
) -> anyhow::Result<()> {
Expand All @@ -271,7 +277,14 @@ pub async fn reward_poc_and_dc(
};
telemetry::data_transfer_rewards_scale(scale);

reward_poc(pool, mobile_rewards, reward_period, transfer_rewards_sum).await?;
reward_poc(
pool,
mobile_rewards,
speedtest_avg_sink,
reward_period,
transfer_rewards_sum,
)
.await?;

reward_dc(mobile_rewards, reward_period, transfer_rewards).await?;

Expand All @@ -281,6 +294,7 @@ pub async fn reward_poc_and_dc(
async fn reward_poc(
pool: &Pool<Postgres>,
mobile_rewards: &FileSinkClient,
speedtest_avg_sink: &FileSinkClient,
reward_period: &Range<DateTime<Utc>>,
transfer_reward_sum: Decimal,
) -> anyhow::Result<()> {
Expand All @@ -291,6 +305,9 @@ async fn reward_poc(
let heartbeats = HeartbeatReward::validated(pool, reward_period);
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(reward_period.end, pool).await?;

speedtest_averages.write_all(speedtest_avg_sink).await?;

let coverage_points =
CoveragePoints::aggregate_points(pool, heartbeats, &speedtest_averages, reward_period.end)
.await?;
Expand Down
22 changes: 16 additions & 6 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
Ok(())
}

async fn process_file(
pub async fn process_file(
&self,
file: FileInfoStream<CellSpeedtestIngestReport>,
) -> anyhow::Result<()> {
Expand All @@ -105,13 +105,12 @@ where
save_speedtest(&speedtest_report.report, &mut transaction).await?;
let latest_speedtests = get_latest_speedtests_for_pubkey(
&speedtest_report.report.pubkey,
speedtest_report.report.timestamp,
&mut transaction,
)
.await?;
let average = SpeedtestAverage::from(&latest_speedtests);
average
.write(&self.speedtest_avg_file_sink, latest_speedtests)
.await?;
let average = SpeedtestAverage::from(latest_speedtests);
average.write(&self.speedtest_avg_file_sink).await?;
}
// write out paper trail of speedtest validity
self.write_verified_speedtest(speedtest_report, result)
Expand Down Expand Up @@ -200,12 +199,23 @@ pub async fn save_speedtest(

pub async fn get_latest_speedtests_for_pubkey(
pubkey: &PublicKeyBinary,
timestamp: DateTime<Utc>,
exec: &mut Transaction<'_, Postgres>,
) -> Result<Vec<Speedtest>, sqlx::Error> {
let speedtests = sqlx::query_as::<_, Speedtest>(
"SELECT * FROM speedtests where pubkey = $1 order by timestamp desc limit $2",
r#"
SELECT *
FROM speedtests
WHERE pubkey = $1
AND timestamp >= $2
AND timestamp <= $3
ORDER BY timestamp DESC
LIMIT $4
"#,
)
.bind(pubkey)
.bind(timestamp - Duration::hours(SPEEDTEST_LAPSE))
.bind(timestamp)
.bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64)
.fetch_all(exec)
.await?;
Expand Down
Loading

0 comments on commit 6791644

Please sign in to comment.