diff --git a/Cargo.lock b/Cargo.lock index 657b6ed44..94967cd0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#ed455afe59b5700fe9e1cbcffe0122133a0bd306" +source = "git+https://github.com/helium/proto?branch=master#a7c24d010911a9de4ce0c16eaef6b6eb2d306975" dependencies = [ "base64 0.21.0", "byteorder", @@ -1207,7 +1207,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", ] @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#ed455afe59b5700fe9e1cbcffe0122133a0bd306" +source = "git+https://github.com/helium/proto?branch=master#a7c24d010911a9de4ce0c16eaef6b6eb2d306975" dependencies = [ "bytes", "prost", @@ -8329,7 +8329,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 1dc6eea54..9a474a0b8 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -536,7 +536,7 @@ impl FileSink { } } -fn file_name(path_buf: &Path) -> Result { +pub fn file_name(path_buf: &Path) -> Result { path_buf .file_name() .map(|os_str| os_str.to_string_lossy().to_string()) diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 9eeb7a041..5186fa990 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -18,13 +18,15 @@ const DEFAULT_PREC: u32 = 15; lazy_static! { // TODO: year 1 emissions allocate 30% of total to PoC with 6% to beacons and 24% to witnesses but subsequent years back // total PoC percentage off 1.5% each year; determine how beacons and witnesses will split the subsequent years' allocations - static ref REWARDS_PER_DAY: Decimal = (Decimal::from(32_500_000_000_u64) / Decimal::from(366)) * Decimal::from(1_000_000); // 88_797_814_207_650.273224043715847 + pub static ref REWARDS_PER_DAY: Decimal = (Decimal::from(32_500_000_000_u64) / Decimal::from(366)) * Decimal::from(1_000_000); // 88_797_814_207_650.273224043715847 static ref BEACON_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.06); static ref WITNESS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.24); // Data transfer is allocated 50% of daily rewards static ref DATA_TRANSFER_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.50); // Operations fund is allocated 7% of daily rewards static ref OPERATIONS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.07); + // Oracles fund is allocated 7% of daily rewards + static ref ORACLES_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.07); // dc remainer distributed at ration of 4:1 in favour of witnesses // ie WITNESS_REWARDS_PER_DAY_PERCENT:BEACON_REWARDS_PER_DAY_PERCENT static ref WITNESS_DC_REMAINER_PERCENT: Decimal = dec!(0.80); @@ -32,13 +34,13 @@ lazy_static! { static ref DC_USD_PRICE: Decimal = dec!(0.00001); } -fn get_tokens_by_duration(tokens: Decimal, duration: Duration) -> Decimal { +pub fn get_tokens_by_duration(tokens: Decimal, duration: Duration) -> Decimal { ((tokens / Decimal::from(Duration::hours(24).num_seconds())) * Decimal::from(duration.num_seconds())) .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) } -fn get_scheduled_poc_tokens( +pub fn get_scheduled_poc_tokens( duration: Duration, dc_transfer_remainder: Decimal, ) -> (Decimal, Decimal) { @@ -52,21 +54,25 @@ fn get_scheduled_poc_tokens( ) } -fn get_scheduled_dc_tokens(duration: Duration) -> Decimal { +pub fn get_scheduled_dc_tokens(duration: Duration) -> Decimal { get_tokens_by_duration( *REWARDS_PER_DAY * *DATA_TRANSFER_REWARDS_PER_DAY_PERCENT, duration, ) } -fn get_scheduled_ops_fund_tokens(duration: Duration) -> u64 { +pub fn get_scheduled_ops_fund_tokens(duration: Duration) -> Decimal { get_tokens_by_duration( *REWARDS_PER_DAY * *OPERATIONS_REWARDS_PER_DAY_PERCENT, duration, ) - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0) +} + +pub fn get_scheduled_oracle_tokens(duration: Duration) -> Decimal { + get_tokens_by_duration( + *REWARDS_PER_DAY * *ORACLES_REWARDS_PER_DAY_PERCENT, + duration, + ) } #[derive(sqlx::FromRow)] @@ -213,21 +219,16 @@ impl RewardShares { } } +pub type GatewayRewardShares = HashMap; + #[derive(Default)] pub struct GatewayShares { - pub shares: HashMap, + pub shares: GatewayRewardShares, } impl GatewayShares { - pub async fn aggregate( - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result { - let mut shares = Self::default(); - // get all the shares, poc and dc - shares.aggregate_poc_shares(db, reward_period).await?; - shares.aggregate_dc_shares(db, reward_period).await?; - Ok(shares) + pub fn new(shares: GatewayRewardShares) -> anyhow::Result { + Ok(Self { shares }) } pub async fn clear_rewarded_shares( @@ -247,67 +248,58 @@ impl GatewayShares { .map(|_| ()) } - pub fn total_shares(&self) -> (Decimal, Decimal, Decimal) { - self.shares.iter().fold( - (Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), - |(beacon_sum, witness_sum, dc_sum), (_, reward_shares)| { + pub fn into_iot_reward_shares( + self, + reward_period: &'_ Range>, + beacon_rewards_per_share: Decimal, + witness_rewards_per_share: Decimal, + dc_transfer_rewards_per_share: Decimal, + ) -> impl Iterator + '_ { + self.shares + .into_iter() + .map(move |(hotspot_key, reward_shares)| { + let beacon_amount = + compute_rewards(beacon_rewards_per_share, reward_shares.beacon_shares); + let witness_amount = + compute_rewards(witness_rewards_per_share, reward_shares.witness_shares); + let dc_transfer_amount = + compute_rewards(dc_transfer_rewards_per_share, reward_shares.dc_shares); + proto::GatewayReward { + hotspot_key: hotspot_key.into(), + beacon_amount, + witness_amount, + dc_transfer_amount, + } + }) + .filter(|reward_share| { + reward_share.beacon_amount > 0 + || reward_share.witness_amount > 0 + || reward_share.dc_transfer_amount > 0 + }) + .map(|gateway_reward| { + let total_gateway_reward = gateway_reward.dc_transfer_amount + + gateway_reward.beacon_amount + + gateway_reward.witness_amount; ( - beacon_sum + reward_shares.beacon_shares, - witness_sum + reward_shares.witness_shares, - dc_sum + reward_shares.dc_shares, + total_gateway_reward, + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::GatewayReward(gateway_reward)), + }, ) - }, - ) - } - - async fn aggregate_poc_shares( - &mut self, - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result<(), sqlx::Error> { - let mut rows = sqlx::query_as::<_, GatewayPocShare>( - "select * from gateway_shares where reward_timestamp > $1 and reward_timestamp <= $2", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch(db); - while let Some(gateway_share) = rows.try_next().await? { - self.shares - .entry(gateway_share.hotspot_key.clone()) - .or_default() - .add_poc_reward(&gateway_share) - } - Ok(()) - } - - async fn aggregate_dc_shares( - &mut self, - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result<(), sqlx::Error> { - let mut rows = sqlx::query_as::<_, GatewayDCShare>( - "select hotspot_key, reward_timestamp, num_dcs::numeric, id from gateway_dc_shares where reward_timestamp > $1 and reward_timestamp <= $2", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch(db); - while let Some(gateway_share) = rows.try_next().await? { - self.shares - .entry(gateway_share.hotspot_key.clone()) - .or_default() - .add_dc_reward(&gateway_share) - } - Ok(()) + }) } - pub fn into_iot_reward_shares( - self, + pub async fn calculate_rewards_per_share( + &self, reward_period: &'_ Range>, iot_price: Decimal, - ) -> impl Iterator + '_ { + ) -> anyhow::Result<(Decimal, Decimal, Decimal)> { // the total number of shares for beacons, witnesses and data transfer // dc shares here is the sum of all spent data transfer DC this epoch let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares(); + // the total number of iot rewards for dc transfer this epoch let total_dc_transfer_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); @@ -329,14 +321,13 @@ impl GatewayShares { reward_period.end - reward_period.start, dc_transfer_rewards_unused, ); - // work out the rewards per share for beacons, witnesses and dc transfer let beacon_rewards_per_share = rewards_per_share(total_beacon_rewards, total_beacon_shares); let witness_rewards_per_share = rewards_per_share(total_witness_rewards, total_witness_shares); let dc_transfer_rewards_per_share = rewards_per_share(total_dc_transfer_rewards_capped, total_dc_shares); - // compute the awards per hotspot + tracing::info!( %total_dc_shares, %total_dc_transfer_rewards_used, @@ -344,48 +335,24 @@ impl GatewayShares { %dc_transfer_rewards_per_share, "data transfer rewards" ); - self.shares - .into_iter() - .map(move |(hotspot_key, reward_shares)| proto::GatewayReward { - hotspot_key: hotspot_key.into(), - beacon_amount: compute_rewards( - beacon_rewards_per_share, - reward_shares.beacon_shares, - ), - witness_amount: compute_rewards( - witness_rewards_per_share, - reward_shares.witness_shares, - ), - dc_transfer_amount: compute_rewards( - dc_transfer_rewards_per_share, - reward_shares.dc_shares, - ), - }) - .filter(|reward_share| { - reward_share.beacon_amount > 0 - || reward_share.witness_amount > 0 - || reward_share.dc_transfer_amount > 0 - }) - .map(|gateway_reward| proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::GatewayReward(gateway_reward)), - }) + Ok(( + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + )) } -} - -pub mod operational_rewards { - use super::*; - pub fn compute(reward_period: &Range>) -> proto::IotRewardShare { - let op_fund_reward = proto::OperationalReward { - amount: get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start), - }; - proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::OperationalReward(op_fund_reward)), - } + pub fn total_shares(&self) -> (Decimal, Decimal, Decimal) { + self.shares.iter().fold( + (Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), + |(beacon_sum, witness_sum, dc_sum), (_, reward_shares)| { + ( + beacon_sum + reward_shares.beacon_shares, + witness_sum + reward_shares.witness_shares, + dc_sum + reward_shares.dc_shares, + ) + }, + ) } } @@ -446,9 +413,62 @@ fn compute_rewards(rewards_per_share: Decimal, shares: Decimal) -> u64 { .unwrap_or(0) } +pub async fn aggregate_reward_shares( + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result { + let mut shares = GatewayRewardShares::default(); + aggregate_poc_shares(&mut shares, db, reward_period).await?; + aggregate_dc_shares(&mut shares, db, reward_period).await?; + Ok(shares) +} + +async fn aggregate_poc_shares( + // &mut self, + shares: &mut GatewayRewardShares, + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result<(), sqlx::Error> { + let mut rows = sqlx::query_as::<_, GatewayPocShare>( + "select * from gateway_shares where reward_timestamp > $1 and reward_timestamp <= $2", + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db); + while let Some(gateway_share) = rows.try_next().await? { + shares + .entry(gateway_share.hotspot_key.clone()) + .or_default() + .add_poc_reward(&gateway_share) + } + Ok(()) +} + +async fn aggregate_dc_shares( + // &mut self, + shares: &mut GatewayRewardShares, + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result<(), sqlx::Error> { + let mut rows = sqlx::query_as::<_, GatewayDCShare>( + "select hotspot_key, reward_timestamp, num_dcs::numeric, id from gateway_dc_shares where reward_timestamp > $1 and reward_timestamp <= $2", + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db); + while let Some(gateway_share) = rows.try_next().await? { + shares + .entry(gateway_share.hotspot_key.clone()) + .or_default() + .add_dc_reward(&gateway_share) + } + Ok(()) +} + #[cfg(test)] mod test { use super::*; + use crate::reward_share; fn reward_shares_in_dec( beacon_shares: Decimal, @@ -473,15 +493,18 @@ mod test { println!("total_tokens_for_period: {total_tokens_for_period}"); let operation_tokens_for_period = get_scheduled_ops_fund_tokens(epoch_duration); - assert_eq!(258_993_624_772, operation_tokens_for_period); + assert_eq!( + dec!(258_993_624_772.313296903460838), + operation_tokens_for_period + ); } - #[test] + #[tokio::test] // test reward distribution where there is a fixed dc spend per gateway // with the total dc spend across all gateways being significantly lower than the // total epoch dc rewards amount // this results in a significant redistribution of dc rewards to POC - fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { + async fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -552,14 +575,37 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = reward_share::get_scheduled_poc_tokens( + reward_period.end - reward_period.start, + dec!(0.0), + ); + let total_dc_rewards = + reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - let gw_reward_shares: Vec = gw_shares - .into_iot_reward_shares(&reward_period, iot_price) - .collect(); - for reward in gw_reward_shares { + let mut allocated_gateway_rewards = 0_u64; + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { + let gateway_reward_total = gateway_reward.beacon_amount + + gateway_reward.witness_amount + + gateway_reward.dc_transfer_amount; rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); + assert_eq!(reward_amount, gateway_reward_total); + allocated_gateway_rewards += reward_amount; } } @@ -649,16 +695,18 @@ mod test { let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); - let poc_diff = exp_sum_poc_tokens.to_i64().unwrap() - sum_poc_amounts as i64; - // the sum of rewards distributed should not exceed the epoch amount - // but due to rounding whilst going to u64 in compute_rewards, - // is permitted to be a few bones less - assert_eq!(poc_diff, 5); + + // confirm the unallocated poc reward/dc amounts + // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount + // due to going from decimal to u64 + let unallocated_poc_reward_amount = + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 6); } - #[test] + #[tokio::test] // test reward distribution where there is zero transfer of dc rewards to poc - fn test_reward_share_calculation_without_data_transfer_distribution() { + async fn test_reward_share_calculation_without_data_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -733,14 +781,34 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - let gw_reward_shares: Vec = gw_shares - .into_iot_reward_shares(&reward_period, iot_price) - .collect(); - for reward in gw_reward_shares { + let mut allocated_gateway_rewards = 0_u64; + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { + let gateway_reward_total = gateway_reward.beacon_amount + + gateway_reward.witness_amount + + gateway_reward.dc_transfer_amount; rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); + assert_eq!(reward_amount, gateway_reward_total); + allocated_gateway_rewards += reward_amount; } } @@ -822,16 +890,18 @@ mod test { let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); - let poc_diff = exp_sum_poc_tokens.to_i64().unwrap() - sum_poc_amounts as i64; - // the sum of rewards distributed should not exceed the epoch amount - // but due to rounding whilst going to u64 in compute_rewards, - // is permitted to be a few bones less - assert_eq!(poc_diff, 6); + + // confirm the unallocated poc reward/dc amounts + // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount + // due to going from decimal to u64 + let unallocated_poc_reward_amount = + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 8); } - #[test] + #[tokio::test] // test reward distribution where there is transfer of dc rewards to poc - fn test_reward_share_calculation_with_data_transfer_distribution() { + async fn test_reward_share_calculation_with_data_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -898,14 +968,34 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - let gw_reward_shares: Vec = gw_shares - .into_iot_reward_shares(&reward_period, iot_price) - .collect(); - for reward in gw_reward_shares { + let mut allocated_gateway_rewards = 0_u64; + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { + let gateway_reward_total = gateway_reward.beacon_amount + + gateway_reward.witness_amount + + gateway_reward.dc_transfer_amount; rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); + assert_eq!(reward_amount, gateway_reward_total); + allocated_gateway_rewards += reward_amount; } } @@ -985,11 +1075,13 @@ mod test { let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); - let poc_diff = exp_sum_poc_tokens.to_u64().unwrap() - sum_poc_amounts; - // the sum of rewards distributed should not exceed the epoch amount - // but due to rounding whilst going to u64 in compute_rewards, - // is permitted to be a few bones less - assert_eq!(poc_diff, 7); + + // confirm the unallocated poc reward/dc amounts + // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount + // due to going from decimal to u64 + let unallocated_poc_reward_amount = + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 7); } #[test] diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index fcd1002ff..1db4be290 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,15 +1,19 @@ use crate::{ - reward_share::{operational_rewards, GatewayShares}, + reward_share::{self, GatewayShares}, telemetry, }; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; use futures::future::LocalBoxFuture; +use helium_proto::services::poc_lora as proto; +use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; +use helium_proto::services::poc_lora::{UnallocatedReward, UnallocatedRewardType}; use helium_proto::RewardManifest; use price::PriceTracker; use reward_scheduler::Scheduler; use rust_decimal::prelude::*; +use rust_decimal_macros::dec; use sqlx::{PgExecutor, PgPool, Pool, Postgres}; use std::ops::Range; use task_manager::ManagedTask; @@ -107,29 +111,23 @@ impl Rewarder { scheduler: &Scheduler, iot_price: Decimal, ) -> anyhow::Result<()> { - let gateway_reward_shares = - GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?; - - for reward_share in - gateway_reward_shares.into_iot_reward_shares(&scheduler.reward_period, iot_price) - { - self.rewards_sink - .write(reward_share, []) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; - } + let reward_period = &scheduler.reward_period; - self.rewards_sink - .write(operational_rewards::compute(&scheduler.reward_period), []) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; + // process rewards for poc and dc + reward_poc_and_dc(&self.pool, &self.rewards_sink, reward_period, iot_price).await?; + // process rewards for the operational fund + reward_operational(&self.rewards_sink, reward_period).await?; + // process rewards for the oracle + reward_oracles(&self.rewards_sink, reward_period).await?; + + // commit the filesink let written_files = self.rewards_sink.commit().await?.await??; + // purge db let mut transaction = self.pool.begin().await?; // Clear gateway shares table period to end of reward period - GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.reward_period.end).await?; + GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.reward_period.start) + .await?; save_rewarded_timestamp( "last_rewarded_end_time", &scheduler.reward_period.end, @@ -206,6 +204,145 @@ impl Rewarder { } } +pub async fn reward_poc_and_dc( + pool: &Pool, + rewards_sink: &file_sink::FileSinkClient, + reward_period: &Range>, + iot_price: Decimal, +) -> anyhow::Result<()> { + let reward_shares = reward_share::aggregate_reward_shares(pool, reward_period).await?; + let gateway_shares = GatewayShares::new(reward_shares)?; + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gateway_shares + .calculate_rewards_per_share(reward_period, iot_price) + .await?; + + // get the total poc and dc rewards for the period + let (total_beacon_rewards, total_witness_rewards) = + reward_share::get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = + reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + + let mut allocated_gateway_rewards = 0_u64; + for (gateway_reward_amount, reward_share) in gateway_shares.into_iot_reward_shares( + reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { + rewards_sink + .write(reward_share, []) + .await? + // Await the returned oneshot to ensure we wrote the file + .await??; + allocated_gateway_rewards += gateway_reward_amount; + } + // write out any unallocated poc reward + let unallocated_poc_reward_amount = (total_poc_dc_reward_allocation + - Decimal::from(allocated_gateway_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + write_unallocated_reward( + rewards_sink, + UnallocatedRewardType::Poc, + unallocated_poc_reward_amount, + reward_period, + ) + .await?; + Ok(()) +} + +pub async fn reward_operational( + rewards_sink: &file_sink::FileSinkClient, + reward_period: &Range>, +) -> anyhow::Result<()> { + let total_operational_rewards = + reward_share::get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start); + let allocated_operational_rewards = total_operational_rewards + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + let op_fund_reward = proto::OperationalReward { + amount: allocated_operational_rewards, + }; + rewards_sink + .write( + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::OperationalReward(op_fund_reward)), + }, + [], + ) + .await? + .await??; + // write out any unallocated operation rewards + // which for the operational fund can only relate to rounding issue + // in practice this should always be zero as there can be a max of + // one bone lost due to rounding when going from decimal to u64 + // but we run it anyway and if it is indeed zero nothing gets + // written out anyway + let unallocated_operation_reward_amount = (total_operational_rewards + - Decimal::from(allocated_operational_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + write_unallocated_reward( + rewards_sink, + UnallocatedRewardType::Operation, + unallocated_operation_reward_amount, + reward_period, + ) + .await?; + Ok(()) +} + +pub async fn reward_oracles( + rewards_sink: &file_sink::FileSinkClient, + reward_period: &Range>, +) -> anyhow::Result<()> { + // atm 100% of oracle rewards are assigned to 'unallocated' + let total_oracle_rewards = + reward_share::get_scheduled_oracle_tokens(reward_period.end - reward_period.start); + let allocated_oracle_rewards = 0_u64; + let unallocated_oracle_reward_amount = (total_oracle_rewards + - Decimal::from(allocated_oracle_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + write_unallocated_reward( + rewards_sink, + UnallocatedRewardType::Oracle, + unallocated_oracle_reward_amount, + reward_period, + ) + .await?; + Ok(()) +} + +async fn write_unallocated_reward( + rewards_sink: &file_sink::FileSinkClient, + unallocated_type: UnallocatedRewardType, + unallocated_amount: u64, + reward_period: &'_ Range>, +) -> anyhow::Result<()> { + if unallocated_amount > 0 { + let unallocated_reward = proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::UnallocatedReward(UnallocatedReward { + reward_type: unallocated_type as i32, + amount: unallocated_amount, + })), + }; + rewards_sink.write(unallocated_reward, []).await?.await??; + }; + Ok(()) +} + pub async fn fetch_rewarded_timestamp( timestamp_key: &str, db: impl PgExecutor<'_>, diff --git a/iot_verifier/tests/common/mod.rs b/iot_verifier/tests/common/mod.rs index f6e36c36b..1e4558475 100644 --- a/iot_verifier/tests/common/mod.rs +++ b/iot_verifier/tests/common/mod.rs @@ -9,8 +9,9 @@ use file_store::{ use helium_crypto::PublicKeyBinary; use helium_proto::{ services::poc_lora::{ + iot_reward_share::Reward as IotReward, GatewayReward, IotRewardShare, LoraBeaconIngestReportV1, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, LoraPocV1, - LoraWitnessIngestReportV1, + LoraWitnessIngestReportV1, OperationalReward, UnallocatedReward, }, DataRate, Region as ProtoRegion, }; @@ -29,7 +30,8 @@ use std::{self, ops::DerefMut, str::FromStr}; use tokio::{sync::mpsc::error::TryRecvError, sync::Mutex, time::timeout}; pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { - let (tx, rx) = tokio::sync::mpsc::channel(5); + let (tx, rx) = tokio::sync::mpsc::channel(10); + ( FileSinkClient::new(tx, "metric"), MockFileSinkReceiver { receiver: rx }, @@ -42,15 +44,22 @@ pub struct MockFileSinkReceiver { #[allow(dead_code)] impl MockFileSinkReceiver { - pub async fn receive(&mut self) -> SinkMessage { + pub async fn receive(&mut self) -> Option> { match timeout(seconds(2), self.receiver.recv()).await { - Ok(Some(msg)) => msg, - Ok(None) => panic!("server closed connection while waiting for message"), - Err(_) => panic!("timeout while waiting for message"), + Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { + let _ = on_write_tx.send(Ok(())); + Some(msg) + } + Ok(None) => None, + Err(e) => panic!("timeout while waiting for message1 {:?}", e), + Ok(Some(unexpected_msg)) => { + println!("ignoring unexpected msg {:?}", unexpected_msg); + None + } } } - pub fn assert_no_messages(mut self) { + pub fn assert_no_messages(&mut self) { let Err(TryRecvError::Empty) = self.receiver.try_recv() else { panic!("receiver should have been empty") }; @@ -58,26 +67,71 @@ impl MockFileSinkReceiver { pub async fn receive_valid_poc(&mut self) -> LoraPocV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => { - LoraPocV1::decode(bytes.as_slice()).expect("decode beacon report") + Some(bytes) => { + LoraPocV1::decode(bytes.as_slice()).expect("failed to decode expected valid poc") } - _ => panic!("invalid beacon message"), + None => panic!("failed to receive valid poc"), } } pub async fn receive_invalid_beacon(&mut self) -> LoraInvalidBeaconReportV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => LoraInvalidBeaconReportV1::decode(bytes.as_slice()) - .expect("decode invalid beacon report"), - _ => panic!("invalid beacon message"), + Some(bytes) => LoraInvalidBeaconReportV1::decode(bytes.as_slice()) + .expect("failed to decode expected invalid beacon report"), + None => panic!("failed to receive invalid beacon"), } } pub async fn receive_invalid_witness(&mut self) -> LoraInvalidWitnessReportV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => LoraInvalidWitnessReportV1::decode(bytes.as_slice()) - .expect("decode invalid witness report"), - _ => panic!("invalid witness message"), + Some(bytes) => LoraInvalidWitnessReportV1::decode(bytes.as_slice()) + .expect("failed to decode expected invalid witness report"), + None => panic!("failed to receive invalid witness"), + } + } + + pub async fn receive_gateway_reward(&mut self) -> GatewayReward { + match self.receive().await { + Some(bytes) => { + let iot_reward = IotRewardShare::decode(bytes.as_slice()) + .expect("failed to decode expected gateway reward"); + println!("iot_reward: {:?}", iot_reward); + match iot_reward.reward { + Some(IotReward::GatewayReward(r)) => r, + _ => panic!("failed to get gateway reward"), + } + } + None => panic!("failed to receive gateway reward"), + } + } + + pub async fn receive_operational_reward(&mut self) -> OperationalReward { + match self.receive().await { + Some(bytes) => { + let iot_reward = IotRewardShare::decode(bytes.as_slice()) + .expect("failed to decode expected operational reward"); + println!("iot_reward: {:?}", iot_reward); + match iot_reward.reward { + Some(IotReward::OperationalReward(r)) => r, + _ => panic!("failed to get operational reward"), + } + } + None => panic!("failed to receive operational reward"), + } + } + + pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { + match self.receive().await { + Some(bytes) => { + let iot_reward = IotRewardShare::decode(bytes.as_slice()) + .expect("failed to decode expected unallocated reward"); + println!("iot_reward: {:?}", iot_reward); + match iot_reward.reward { + Some(IotReward::UnallocatedReward(r)) => r, + _ => panic!("failed to get unallocated reward"), + } + } + None => panic!("failed to receive unallocated reward"), } } } @@ -86,6 +140,7 @@ fn seconds(s: u64) -> std::time::Duration { std::time::Duration::from_secs(s) } +#[allow(dead_code)] pub fn create_valid_beacon_report( pubkey: &str, received_timestamp: DateTime, @@ -353,6 +408,7 @@ pub const POC_DATA: [u8; 51] = [ 203, 122, 146, 49, 241, 156, 148, 74, 246, 68, 17, 8, 212, 48, 6, 152, 58, 221, 158, 186, 101, 37, 59, 135, 126, 18, 72, 244, 65, 174, ]; +#[allow(dead_code)] pub const ENTROPY_TIMESTAMP: i64 = 1677163710000; const EU868_PARAMS: &[u8] = &[ diff --git a/iot_verifier/tests/rewarder_operations.rs b/iot_verifier/tests/rewarder_operations.rs new file mode 100644 index 000000000..041158def --- /dev/null +++ b/iot_verifier/tests/rewarder_operations.rs @@ -0,0 +1,34 @@ +mod common; +use chrono::{Duration as ChronoDuration, Utc}; +use iot_verifier::{reward_share, rewarder}; +use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; +use rust_decimal_macros::dec; + +#[tokio::test] +async fn test_operations() -> anyhow::Result<()> { + let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + tokio::select!( + _ = rewarder::reward_operational(&iot_rewards_client, &epoch) => { println!("point 1")}, + ops_reward = iot_rewards.receive_operational_reward() => + { + println!("ops reward {:?}", ops_reward); + // confirm the total rewards allocated matches expectations + let expected_total = reward_share::get_scheduled_ops_fund_tokens(epoch.end - epoch.start) + .to_u64() + .unwrap(); + assert_eq!(ops_reward.amount, 6_215_846_994_535); + assert_eq!(ops_reward.amount, expected_total); + + // confirm the ops percentage amount matches expectations + let daily_total = *reward_share::REWARDS_PER_DAY; + let ops_percent = (Decimal::from(ops_reward.amount) / daily_total).round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(ops_percent, dec!(0.07)); + + // should be no further msgs + iot_rewards.assert_no_messages(); + }, + ); + Ok(()) +} diff --git a/iot_verifier/tests/rewarder_oracles.rs b/iot_verifier/tests/rewarder_oracles.rs new file mode 100644 index 000000000..e9822ee67 --- /dev/null +++ b/iot_verifier/tests/rewarder_oracles.rs @@ -0,0 +1,36 @@ +mod common; +use chrono::{Duration as ChronoDuration, Utc}; +use iot_verifier::{reward_share, rewarder}; +use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; +use rust_decimal_macros::dec; +use sqlx::PgPool; + +#[sqlx::test] +async fn test_oracles(_pool: PgPool) -> anyhow::Result<()> { + let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + tokio::select!( + _ = rewarder::reward_oracles(&iot_rewards_client, &epoch) => {}, + // oracles rewards are 100% unallocated atm + unallocated_oracle_reward = iot_rewards.receive_unallocated_reward() => + { + println!("unallocated oracles reward {:?}", unallocated_oracle_reward); + // confirm the total rewards matches expectations + let expected_total = reward_share::get_scheduled_oracle_tokens(epoch.end - epoch.start) + .to_u64() + .unwrap(); + assert_eq!(unallocated_oracle_reward.amount, 6_215_846_994_535); + assert_eq!(unallocated_oracle_reward.amount, expected_total); + + // confirm the ops percentage amount matches expectations + let daily_total = *reward_share::REWARDS_PER_DAY; + let oracle_percent = (Decimal::from(unallocated_oracle_reward.amount) / daily_total).round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(oracle_percent, dec!(0.07)); + + // should be no further msgs + iot_rewards.assert_no_messages(); + }, + ); + Ok(()) +} diff --git a/iot_verifier/tests/rewarder_poc_dc.rs b/iot_verifier/tests/rewarder_poc_dc.rs new file mode 100644 index 000000000..bfc450e8e --- /dev/null +++ b/iot_verifier/tests/rewarder_poc_dc.rs @@ -0,0 +1,199 @@ +mod common; +use crate::common::MockFileSinkReceiver; +use chrono::{DateTime, Duration as ChronoDuration, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_lora::{GatewayReward, UnallocatedReward, UnallocatedRewardType}; +use iot_verifier::{ + poc_report::ReportType, + reward_share::{self, GatewayDCShare, GatewayPocShare}, + rewarder, +}; +use prost::Message; +use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; +use rust_decimal_macros::dec; +use sqlx::{PgPool, Postgres, Transaction}; +use std::{self, str::FromStr}; + +const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; +const HOTSPOT_2: &str = "11uJHS2YaEWJqgqC7yza9uvSmpv5FWoMQXiP8WbxBGgNUmifUJf"; +const HOTSPOT_3: &str = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp"; +const HOTSPOT_4: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; + +#[sqlx::test] +async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { + let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + + // seed all the things + let mut txn = pool.clone().begin().await?; + seed_pocs(epoch.start, &mut txn).await?; + seed_dc(epoch.start, &mut txn).await?; + txn.commit().await?; + + // run rewards for poc and dc + tokio::select!( + _ = rewarder::reward_poc_and_dc(&pool, &iot_rewards_client, &epoch, dec!(0.0001)) => {}, + Ok((gateway_rewards, unallocated_poc_reward)) = receive_expected_rewards(&mut iot_rewards) => { + + // assert the gateway rewards + assert_eq!( + gateway_rewards[0].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_1).unwrap().as_ref() + ); + assert_eq!(gateway_rewards[0].beacon_amount, 1_775_956_284_153); + assert_eq!(gateway_rewards[0].witness_amount, 0); + assert_eq!(gateway_rewards[0].dc_transfer_amount, 14_799_635_701_275); + + assert_eq!( + gateway_rewards[1].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_2).unwrap().as_ref() + ); + assert_eq!(gateway_rewards[1].beacon_amount, 0); + assert_eq!(gateway_rewards[1].witness_amount, 8_524_590_163_934); + assert_eq!(gateway_rewards[1].dc_transfer_amount, 29_599_271_402_550); + // hotspot 2 should have double the dc rewards of hotspot 1 + assert_eq!( + gateway_rewards[1].dc_transfer_amount, + gateway_rewards[0].dc_transfer_amount * 2 + ); + + assert_eq!( + gateway_rewards[2].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_3).unwrap().as_ref() + ); + // hotspot 2 has double reward scale of hotspot 1 and thus double the beacon amount + assert_eq!(gateway_rewards[2].beacon_amount, 3_551_912_568_306); + assert_eq!( + gateway_rewards[2].beacon_amount, + gateway_rewards[0].beacon_amount * 2 + ); + assert_eq!(gateway_rewards[2].witness_amount, 0); + assert_eq!(gateway_rewards[2].dc_transfer_amount, 0); + + assert_eq!( + gateway_rewards[3].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_4).unwrap().as_ref() + ); + assert_eq!(gateway_rewards[3].beacon_amount, 0); + assert_eq!(gateway_rewards[3].witness_amount, 12_786_885_245_901); + assert_eq!(gateway_rewards[3].dc_transfer_amount, 0); + + // assert our unallocated reward + assert_eq!( + UnallocatedRewardType::Poc as i32, + unallocated_poc_reward.reward_type + ); + assert_eq!(1, unallocated_poc_reward.amount); + + // confirm the total rewards allocated matches expectations + let poc_sum: u64 = gateway_rewards + .iter() + .map(|r| r.beacon_amount + r.witness_amount) + .sum(); + let dc_sum: u64 = gateway_rewards.iter().map(|r| r.dc_transfer_amount).sum(); + let unallocated_sum: u64 = unallocated_poc_reward.amount; + + let expected_dc = reward_share::get_scheduled_dc_tokens(epoch.end - epoch.start); + let (expected_beacon_sum, expected_witness_sum) = + reward_share::get_scheduled_poc_tokens(epoch.end - epoch.start, expected_dc); + let expected_total = + expected_beacon_sum.to_u64().unwrap() + expected_witness_sum.to_u64().unwrap(); + assert_eq!(expected_total, poc_sum + dc_sum + unallocated_sum); + + // confirm the poc & dc percentage amount matches expectations + let daily_total = *reward_share::REWARDS_PER_DAY; + let poc_dc_percent = (Decimal::from(poc_sum + dc_sum + unallocated_sum) / daily_total).round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(poc_dc_percent, dec!(0.8)); + + } + ); + Ok(()) +} + +async fn receive_expected_rewards( + iot_rewards: &mut MockFileSinkReceiver, +) -> anyhow::Result<(Vec, UnallocatedReward)> { + // get the filestore outputs from rewards run + // we will have 3 gateway rewards and one unallocated reward + let gateway_reward1 = iot_rewards.receive_gateway_reward().await; + let gateway_reward2 = iot_rewards.receive_gateway_reward().await; + let gateway_reward3 = iot_rewards.receive_gateway_reward().await; + let gateway_reward4 = iot_rewards.receive_gateway_reward().await; + let unallocated_poc_reward = iot_rewards.receive_unallocated_reward().await; + // should be no further msgs + iot_rewards.assert_no_messages(); + + // ordering is not guaranteed, so stick the rewards into a vec and sort + let mut gateway_rewards = vec![ + gateway_reward1, + gateway_reward2, + gateway_reward3, + gateway_reward4, + ]; + gateway_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); + Ok((gateway_rewards, unallocated_poc_reward)) +} +async fn seed_pocs(ts: DateTime, txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { + let poc_beacon_1 = GatewayPocShare { + hotspot_key: HOTSPOT_1.to_string().parse().unwrap(), + reward_type: ReportType::Beacon, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(1.0), + poc_id: "poc_id_1".to_string().encode_to_vec(), + }; + + let poc_witness_1 = GatewayPocShare { + hotspot_key: HOTSPOT_2.to_string().parse().unwrap(), + reward_type: ReportType::Witness, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(1.0), + poc_id: "poc_id_1".to_string().encode_to_vec(), + }; + + let poc_beacon_2 = GatewayPocShare { + hotspot_key: HOTSPOT_3.to_string().parse().unwrap(), + reward_type: ReportType::Beacon, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(2.0), + poc_id: "poc_id_2".to_string().encode_to_vec(), + }; + + let poc_witness_2 = GatewayPocShare { + hotspot_key: HOTSPOT_4.to_string().parse().unwrap(), + reward_type: ReportType::Witness, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(1.5), + poc_id: "poc_id_2".to_string().encode_to_vec(), + }; + poc_beacon_1.save(txn).await?; + poc_witness_1.save(txn).await?; + + poc_beacon_2.save(txn).await?; + poc_witness_2.save(txn).await?; + Ok(()) +} + +async fn seed_dc(ts: DateTime, txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { + let dc_share_1 = GatewayDCShare { + hotspot_key: HOTSPOT_1.to_string().parse().unwrap(), + reward_timestamp: ts + ChronoDuration::hours(1), + num_dcs: dec!(1000), + id: "dc_id_1".to_string().encode_to_vec(), + }; + + let dc_share_2 = GatewayDCShare { + hotspot_key: HOTSPOT_2.to_string().parse().unwrap(), + reward_timestamp: ts + ChronoDuration::hours(1), + num_dcs: dec!(2000), + id: "dc_id_2".to_string().encode_to_vec(), + }; + + dc_share_1.save(txn).await?; + dc_share_2.save(txn).await?; + Ok(()) +} diff --git a/iot_verifier/tests/runner_tests.rs b/iot_verifier/tests/runner_tests.rs index 0c7334548..56a945d13 100644 --- a/iot_verifier/tests/runner_tests.rs +++ b/iot_verifier/tests/runner_tests.rs @@ -362,7 +362,7 @@ async fn invalid_beacon_gateway_not_found_no_witnesses(pool: PgPool) -> anyhow:: #[sqlx::test] async fn invalid_beacon_bad_payload(pool: PgPool) -> anyhow::Result<()> { - let ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone()).await?; // // test with an invalid beacon, no witnesses // the beacon will have an invalid payload, resulting in an error diff --git a/reward_index/migrations/7_add_service_provider_reward_type.sql b/reward_index/migrations/10_add_service_provider_reward_type.sql similarity index 100% rename from reward_index/migrations/7_add_service_provider_reward_type.sql rename to reward_index/migrations/10_add_service_provider_reward_type.sql diff --git a/reward_index/migrations/9_add_iot_unallocated_reward_type.sql b/reward_index/migrations/9_add_iot_unallocated_reward_type.sql new file mode 100644 index 000000000..755146802 --- /dev/null +++ b/reward_index/migrations/9_add_iot_unallocated_reward_type.sql @@ -0,0 +1 @@ +ALTER TYPE reward_type ADD VALUE 'iot_unallocated'; diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index b75904dd7..97bd13091 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -35,6 +35,7 @@ pub enum RewardType { MobileSubscriber, MobileServiceProvider, MobileUnallocated, + IotUnallocated, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -198,6 +199,13 @@ impl Indexer { }, r.amount, )), + Some(IotReward::UnallocatedReward(r)) => Ok(( + RewardKey { + key: self.unallocated_reward_key.clone(), + reward_type: RewardType::IotUnallocated, + }, + r.amount, + )), _ => bail!("got an invalid iot reward share"), } }