From 79925fad2114b6d680dea79045519aa3a9d64518 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Sun, 7 Jan 2024 19:49:51 +0000 Subject: [PATCH] refactor and address review comments --- iot_verifier/src/reward_share.rs | 320 +++++++++++++--------- iot_verifier/src/rewarder.rs | 30 +- iot_verifier/tests/rewarder_operations.rs | 5 +- 3 files changed, 206 insertions(+), 149 deletions(-) diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 35b8d9cff..5186fa990 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -219,25 +219,16 @@ impl RewardShares { } } +pub type GatewayRewardShares = HashMap; + #[derive(Default)] pub struct GatewayShares { - pub shares: HashMap, - pub beacon_rewards_per_share: Decimal, - pub witness_rewards_per_share: Decimal, - pub dc_transfer_rewards_per_share: Decimal, - pub total_rewards_for_poc_and_dc: Decimal, + pub shares: GatewayRewardShares, } impl GatewayShares { - pub async fn aggregate( - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result { - let mut shares = Self::default(); - // get all the shares, poc and dc - shares.aggregate_poc_shares(db, reward_period).await?; - shares.aggregate_dc_shares(db, reward_period).await?; - Ok(shares) + pub fn new(shares: GatewayRewardShares) -> anyhow::Result { + Ok(Self { shares }) } pub async fn clear_rewarded_shares( @@ -257,64 +248,54 @@ impl GatewayShares { .map(|_| ()) } - pub fn total_shares(&self) -> (Decimal, Decimal, Decimal) { - self.shares.iter().fold( - (Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), - |(beacon_sum, witness_sum, dc_sum), (_, reward_shares)| { + pub fn into_iot_reward_shares( + self, + reward_period: &'_ Range>, + beacon_rewards_per_share: Decimal, + witness_rewards_per_share: Decimal, + dc_transfer_rewards_per_share: Decimal, + ) -> impl Iterator + '_ { + self.shares + .into_iter() + .map(move |(hotspot_key, reward_shares)| { + let beacon_amount = + compute_rewards(beacon_rewards_per_share, reward_shares.beacon_shares); + let witness_amount = + compute_rewards(witness_rewards_per_share, reward_shares.witness_shares); + let dc_transfer_amount = + compute_rewards(dc_transfer_rewards_per_share, reward_shares.dc_shares); + proto::GatewayReward { + hotspot_key: hotspot_key.into(), + beacon_amount, + witness_amount, + dc_transfer_amount, + } + }) + .filter(|reward_share| { + reward_share.beacon_amount > 0 + || reward_share.witness_amount > 0 + || reward_share.dc_transfer_amount > 0 + }) + .map(|gateway_reward| { + let total_gateway_reward = gateway_reward.dc_transfer_amount + + gateway_reward.beacon_amount + + gateway_reward.witness_amount; ( - beacon_sum + reward_shares.beacon_shares, - witness_sum + reward_shares.witness_shares, - dc_sum + reward_shares.dc_shares, + total_gateway_reward, + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::GatewayReward(gateway_reward)), + }, ) - }, - ) - } - - async fn aggregate_poc_shares( - &mut self, - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result<(), sqlx::Error> { - let mut rows = sqlx::query_as::<_, GatewayPocShare>( - "select * from gateway_shares where reward_timestamp > $1 and reward_timestamp <= $2", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch(db); - while let Some(gateway_share) = rows.try_next().await? { - self.shares - .entry(gateway_share.hotspot_key.clone()) - .or_default() - .add_poc_reward(&gateway_share) - } - Ok(()) - } - - async fn aggregate_dc_shares( - &mut self, - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result<(), sqlx::Error> { - let mut rows = sqlx::query_as::<_, GatewayDCShare>( - "select hotspot_key, reward_timestamp, num_dcs::numeric, id from gateway_dc_shares where reward_timestamp > $1 and reward_timestamp <= $2", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch(db); - while let Some(gateway_share) = rows.try_next().await? { - self.shares - .entry(gateway_share.hotspot_key.clone()) - .or_default() - .add_dc_reward(&gateway_share) - } - Ok(()) + }) } - pub fn calculate_rewards_per_share_and_total_usage( - &mut self, + pub async fn calculate_rewards_per_share( + &self, reward_period: &'_ Range>, iot_price: Decimal, - ) { + ) -> anyhow::Result<(Decimal, Decimal, Decimal)> { // the total number of shares for beacons, witnesses and data transfer // dc shares here is the sum of all spent data transfer DC this epoch let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares(); @@ -354,51 +335,24 @@ impl GatewayShares { %dc_transfer_rewards_per_share, "data transfer rewards" ); - self.beacon_rewards_per_share = beacon_rewards_per_share; - self.witness_rewards_per_share = witness_rewards_per_share; - self.dc_transfer_rewards_per_share = dc_transfer_rewards_per_share; - self.total_rewards_for_poc_and_dc = - total_beacon_rewards + total_witness_rewards + total_dc_transfer_rewards_capped; + Ok(( + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + )) } - pub fn into_iot_reward_shares( - self, - reward_period: &'_ Range>, - ) -> impl Iterator + '_ { - self.shares - .into_iter() - .map(move |(hotspot_key, reward_shares)| { - let beacon_amount = - compute_rewards(self.beacon_rewards_per_share, reward_shares.beacon_shares); - let witness_amount = - compute_rewards(self.witness_rewards_per_share, reward_shares.witness_shares); - let dc_transfer_amount = - compute_rewards(self.dc_transfer_rewards_per_share, reward_shares.dc_shares); - proto::GatewayReward { - hotspot_key: hotspot_key.into(), - beacon_amount, - witness_amount, - dc_transfer_amount, - } - }) - .filter(|reward_share| { - reward_share.beacon_amount > 0 - || reward_share.witness_amount > 0 - || reward_share.dc_transfer_amount > 0 - }) - .map(|gateway_reward| { - let total_gateway_reward = gateway_reward.dc_transfer_amount - + gateway_reward.beacon_amount - + gateway_reward.witness_amount; + pub fn total_shares(&self) -> (Decimal, Decimal, Decimal) { + self.shares.iter().fold( + (Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), + |(beacon_sum, witness_sum, dc_sum), (_, reward_shares)| { ( - total_gateway_reward, - proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::GatewayReward(gateway_reward)), - }, + beacon_sum + reward_shares.beacon_shares, + witness_sum + reward_shares.witness_shares, + dc_sum + reward_shares.dc_shares, ) - }) + }, + ) } } @@ -459,9 +413,62 @@ fn compute_rewards(rewards_per_share: Decimal, shares: Decimal) -> u64 { .unwrap_or(0) } +pub async fn aggregate_reward_shares( + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result { + let mut shares = GatewayRewardShares::default(); + aggregate_poc_shares(&mut shares, db, reward_period).await?; + aggregate_dc_shares(&mut shares, db, reward_period).await?; + Ok(shares) +} + +async fn aggregate_poc_shares( + // &mut self, + shares: &mut GatewayRewardShares, + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result<(), sqlx::Error> { + let mut rows = sqlx::query_as::<_, GatewayPocShare>( + "select * from gateway_shares where reward_timestamp > $1 and reward_timestamp <= $2", + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db); + while let Some(gateway_share) = rows.try_next().await? { + shares + .entry(gateway_share.hotspot_key.clone()) + .or_default() + .add_poc_reward(&gateway_share) + } + Ok(()) +} + +async fn aggregate_dc_shares( + // &mut self, + shares: &mut GatewayRewardShares, + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result<(), sqlx::Error> { + let mut rows = sqlx::query_as::<_, GatewayDCShare>( + "select hotspot_key, reward_timestamp, num_dcs::numeric, id from gateway_dc_shares where reward_timestamp > $1 and reward_timestamp <= $2", + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db); + while let Some(gateway_share) = rows.try_next().await? { + shares + .entry(gateway_share.hotspot_key.clone()) + .or_default() + .add_dc_reward(&gateway_share) + } + Ok(()) +} + #[cfg(test)] mod test { use super::*; + use crate::reward_share; fn reward_shares_in_dec( beacon_shares: Decimal, @@ -492,12 +499,12 @@ mod test { ); } - #[test] + #[tokio::test] // test reward distribution where there is a fixed dc spend per gateway // with the total dc spend across all gateways being significantly lower than the // total epoch dc rewards amount // this results in a significant redistribution of dc rewards to POC - fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { + async fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -568,15 +575,30 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let mut gw_shares = GatewayShares { - shares, - ..Default::default() - }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = reward_share::get_scheduled_poc_tokens( + reward_period.end - reward_period.start, + dec!(0.0), + ); + let total_dc_rewards = + reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); - let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount @@ -678,13 +700,13 @@ mod test { // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount // due to going from decimal to u64 let unallocated_poc_reward_amount = - total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 6); } - #[test] + #[tokio::test] // test reward distribution where there is zero transfer of dc rewards to poc - fn test_reward_share_calculation_without_data_transfer_distribution() { + async fn test_reward_share_calculation_without_data_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -759,15 +781,27 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let mut gw_shares = GatewayShares { - shares, - ..Default::default() - }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); - let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount @@ -861,13 +895,13 @@ mod test { // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount // due to going from decimal to u64 let unallocated_poc_reward_amount = - total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 8); } - #[test] + #[tokio::test] // test reward distribution where there is transfer of dc rewards to poc - fn test_reward_share_calculation_with_data_transfer_distribution() { + async fn test_reward_share_calculation_with_data_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -934,15 +968,27 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let mut gw_shares = GatewayShares { - shares, - ..Default::default() - }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); - let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount @@ -1034,7 +1080,7 @@ mod test { // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount // due to going from decimal to u64 let unallocated_poc_reward_amount = - total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 7); } diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 3d0e95927..1db4be290 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -13,6 +13,7 @@ use helium_proto::RewardManifest; use price::PriceTracker; use reward_scheduler::Scheduler; use rust_decimal::prelude::*; +use rust_decimal_macros::dec; use sqlx::{PgExecutor, PgPool, Pool, Postgres}; use std::ops::Range; use task_manager::ManagedTask; @@ -209,17 +210,28 @@ pub async fn reward_poc_and_dc( reward_period: &Range>, iot_price: Decimal, ) -> anyhow::Result<()> { - // aggregate the poc and dc data per gateway - let mut gateway_reward_shares = GatewayShares::aggregate(pool, reward_period).await?; + let reward_shares = reward_share::aggregate_reward_shares(pool, reward_period).await?; + let gateway_shares = GatewayShares::new(reward_shares)?; + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gateway_shares + .calculate_rewards_per_share(reward_period, iot_price) + .await?; - // work out rewards per share and sum up total usage for poc and dc - gateway_reward_shares.calculate_rewards_per_share_and_total_usage(reward_period, iot_price); - let total_gateway_reward_allocation = gateway_reward_shares.total_rewards_for_poc_and_dc; + // get the total poc and dc rewards for the period + let (total_beacon_rewards, total_witness_rewards) = + reward_share::get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = + reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; let mut allocated_gateway_rewards = 0_u64; - for (gateway_reward_amount, reward_share) in - gateway_reward_shares.into_iot_reward_shares(reward_period) - { + for (gateway_reward_amount, reward_share) in gateway_shares.into_iot_reward_shares( + reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { rewards_sink .write(reward_share, []) .await? @@ -228,7 +240,7 @@ pub async fn reward_poc_and_dc( allocated_gateway_rewards += gateway_reward_amount; } // write out any unallocated poc reward - let unallocated_poc_reward_amount = (total_gateway_reward_allocation + let unallocated_poc_reward_amount = (total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards)) .round_dp_with_strategy(0, RoundingStrategy::ToZero) .to_u64() diff --git a/iot_verifier/tests/rewarder_operations.rs b/iot_verifier/tests/rewarder_operations.rs index 5acc2a82d..041158def 100644 --- a/iot_verifier/tests/rewarder_operations.rs +++ b/iot_verifier/tests/rewarder_operations.rs @@ -3,10 +3,9 @@ use chrono::{Duration as ChronoDuration, Utc}; use iot_verifier::{reward_share, rewarder}; use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; use rust_decimal_macros::dec; -use sqlx::PgPool; -#[sqlx::test] -async fn test_operations(_pool: PgPool) -> anyhow::Result<()> { +#[tokio::test] +async fn test_operations() -> anyhow::Result<()> { let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now;