From 35a0ecf9437d7fd466194b881ee75a12866f93ba Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Thu, 30 Nov 2023 14:22:03 +0000 Subject: [PATCH 01/10] add support for unallocated rewards output --- Cargo.lock | 4 +- Cargo.toml | 4 +- iot_verifier/src/reward_share.rs | 202 +++++++++++++++++++------------ iot_verifier/src/rewarder.rs | 132 +++++++++++++++++--- 4 files changed, 242 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 657b6ed44..fbe24a461 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#ed455afe59b5700fe9e1cbcffe0122133a0bd306" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#e7763c24b5e4d0767d889950ab743795115b0e72" dependencies = [ "base64 0.21.0", "byteorder", @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#ed455afe59b5700fe9e1cbcffe0122133a0bd306" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#e7763c24b5e4d0767d889950ab743795115b0e72" dependencies = [ "bytes", "prost", diff --git a/Cargo.toml b/Cargo.toml index acc156d5a..858803922 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,14 +60,14 @@ sqlx = {version = "0", features = [ "runtime-tokio-rustls" ]} helium-crypto = {version = "0.8.1", features=["sqlx-postgres", "multisig"]} -helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} +helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/iot-unallocated-rewards", features = ["services"]} hextree = "*" solana-client = "1.14" solana-sdk = "1.14" solana-program = "1.11" spl-token = "3.5.0" reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "andymck/iot-unallocated-rewards" } humantime = "2" metrics = "0" metrics-exporter-prometheus = "0" diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 9eeb7a041..0614aacbf 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -38,7 +38,7 @@ fn get_tokens_by_duration(tokens: Decimal, duration: Duration) -> Decimal { .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) } -fn get_scheduled_poc_tokens( +pub fn get_scheduled_poc_tokens( duration: Duration, dc_transfer_remainder: Decimal, ) -> (Decimal, Decimal) { @@ -52,21 +52,18 @@ fn get_scheduled_poc_tokens( ) } -fn get_scheduled_dc_tokens(duration: Duration) -> Decimal { +pub fn get_scheduled_dc_tokens(duration: Duration) -> Decimal { get_tokens_by_duration( *REWARDS_PER_DAY * *DATA_TRANSFER_REWARDS_PER_DAY_PERCENT, duration, ) } -fn get_scheduled_ops_fund_tokens(duration: Duration) -> u64 { +pub fn get_scheduled_ops_fund_tokens(duration: Duration) -> Decimal { get_tokens_by_duration( *REWARDS_PER_DAY * *OPERATIONS_REWARDS_PER_DAY_PERCENT, duration, ) - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0) } #[derive(sqlx::FromRow)] @@ -216,6 +213,10 @@ impl RewardShares { #[derive(Default)] pub struct GatewayShares { pub shares: HashMap, + pub beacon_rewards_per_share: Decimal, + pub witness_rewards_per_share: Decimal, + pub dc_transfer_rewards_per_share: Decimal, + pub total_rewards_for_poc_and_dc: Decimal, } impl GatewayShares { @@ -300,14 +301,15 @@ impl GatewayShares { Ok(()) } - pub fn into_iot_reward_shares( - self, + pub fn calculate_rewards_per_share_and_total_usage( + &mut self, reward_period: &'_ Range>, iot_price: Decimal, - ) -> impl Iterator + '_ { + ) { // the total number of shares for beacons, witnesses and data transfer // dc shares here is the sum of all spent data transfer DC this epoch let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares(); + // the total number of iot rewards for dc transfer this epoch let total_dc_transfer_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); @@ -329,14 +331,13 @@ impl GatewayShares { reward_period.end - reward_period.start, dc_transfer_rewards_unused, ); - // work out the rewards per share for beacons, witnesses and dc transfer let beacon_rewards_per_share = rewards_per_share(total_beacon_rewards, total_beacon_shares); let witness_rewards_per_share = rewards_per_share(total_witness_rewards, total_witness_shares); let dc_transfer_rewards_per_share = rewards_per_share(total_dc_transfer_rewards_capped, total_dc_shares); - // compute the awards per hotspot + tracing::info!( %total_dc_shares, %total_dc_transfer_rewards_used, @@ -344,51 +345,54 @@ impl GatewayShares { %dc_transfer_rewards_per_share, "data transfer rewards" ); + self.beacon_rewards_per_share = beacon_rewards_per_share; + self.witness_rewards_per_share = witness_rewards_per_share; + self.dc_transfer_rewards_per_share = dc_transfer_rewards_per_share; + self.total_rewards_for_poc_and_dc = + total_beacon_rewards + total_witness_rewards + total_dc_transfer_rewards_capped; + } + + pub fn into_iot_reward_shares( + self, + reward_period: &'_ Range>, + ) -> impl Iterator + '_ { self.shares .into_iter() - .map(move |(hotspot_key, reward_shares)| proto::GatewayReward { - hotspot_key: hotspot_key.into(), - beacon_amount: compute_rewards( - beacon_rewards_per_share, - reward_shares.beacon_shares, - ), - witness_amount: compute_rewards( - witness_rewards_per_share, - reward_shares.witness_shares, - ), - dc_transfer_amount: compute_rewards( - dc_transfer_rewards_per_share, - reward_shares.dc_shares, - ), + .map(move |(hotspot_key, reward_shares)| { + let beacon_amount = + compute_rewards(self.beacon_rewards_per_share, reward_shares.beacon_shares); + let witness_amount = + compute_rewards(self.witness_rewards_per_share, reward_shares.witness_shares); + let dc_transfer_amount = + compute_rewards(self.dc_transfer_rewards_per_share, reward_shares.dc_shares); + proto::GatewayReward { + hotspot_key: hotspot_key.into(), + beacon_amount, + witness_amount, + dc_transfer_amount, + } }) .filter(|reward_share| { reward_share.beacon_amount > 0 || reward_share.witness_amount > 0 || reward_share.dc_transfer_amount > 0 }) - .map(|gateway_reward| proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::GatewayReward(gateway_reward)), + .map(|gateway_reward| { + let total_gateway_reward = gateway_reward.dc_transfer_amount + + gateway_reward.beacon_amount + + gateway_reward.witness_amount; + ( + total_gateway_reward, + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::GatewayReward(gateway_reward)), + }, + ) }) } } -pub mod operational_rewards { - use super::*; - - pub fn compute(reward_period: &Range>) -> proto::IotRewardShare { - let op_fund_reward = proto::OperationalReward { - amount: get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start), - }; - proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::OperationalReward(op_fund_reward)), - } - } -} - /// returns the equiv iot bones value for a specified dc amount pub fn dc_to_iot_bones(dc_amount: Decimal, iot_price: Decimal) -> Decimal { // iot prices are supplied in 10^6 *per iot token* @@ -473,7 +477,10 @@ mod test { println!("total_tokens_for_period: {total_tokens_for_period}"); let operation_tokens_for_period = get_scheduled_ops_fund_tokens(epoch_duration); - assert_eq!(258_993_624_772, operation_tokens_for_period); + assert_eq!( + dec!(258_993_624_772.313296903460838), + operation_tokens_for_period + ); } #[test] @@ -552,14 +559,25 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; + let mut gw_shares = GatewayShares { + shares, + ..Default::default() + }; let mut rewards: HashMap = HashMap::new(); - let gw_reward_shares: Vec = gw_shares - .into_iot_reward_shares(&reward_period, iot_price) - .collect(); - for reward in gw_reward_shares { + gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); + let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; + let mut allocated_gateway_rewards = 0_u64; + for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { - rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); + let gateway_reward_total = gateway_reward.beacon_amount + + gateway_reward.witness_amount + + gateway_reward.dc_transfer_amount; + rewards.insert( + gateway_reward.hotspot_key.clone().into(), + gateway_reward, + ); + assert_eq!(reward_amount, gateway_reward_total); + allocated_gateway_rewards += reward_amount; } } @@ -649,11 +667,13 @@ mod test { let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); - let poc_diff = exp_sum_poc_tokens.to_i64().unwrap() - sum_poc_amounts as i64; - // the sum of rewards distributed should not exceed the epoch amount - // but due to rounding whilst going to u64 in compute_rewards, - // is permitted to be a few bones less - assert_eq!(poc_diff, 5); + + // confirm the unallocated poc reward/dc amounts + // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount + // due to going from decimal to u64 + let unallocated_poc_reward_amount = + total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 6); } #[test] @@ -733,14 +753,25 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; + let mut gw_shares = GatewayShares { + shares, + ..Default::default() + }; let mut rewards: HashMap = HashMap::new(); - let gw_reward_shares: Vec = gw_shares - .into_iot_reward_shares(&reward_period, iot_price) - .collect(); - for reward in gw_reward_shares { + gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); + let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; + let mut allocated_gateway_rewards = 0_u64; + for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { - rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); + let gateway_reward_total = gateway_reward.beacon_amount + + gateway_reward.witness_amount + + gateway_reward.dc_transfer_amount; + rewards.insert( + gateway_reward.hotspot_key.clone().into(), + gateway_reward, + ); + assert_eq!(reward_amount, gateway_reward_total); + allocated_gateway_rewards += reward_amount; } } @@ -822,11 +853,13 @@ mod test { let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); - let poc_diff = exp_sum_poc_tokens.to_i64().unwrap() - sum_poc_amounts as i64; - // the sum of rewards distributed should not exceed the epoch amount - // but due to rounding whilst going to u64 in compute_rewards, - // is permitted to be a few bones less - assert_eq!(poc_diff, 6); + + // confirm the unallocated poc reward/dc amounts + // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount + // due to going from decimal to u64 + let unallocated_poc_reward_amount = + total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 8); } #[test] @@ -898,14 +931,25 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let gw_shares = GatewayShares { shares }; + let mut gw_shares = GatewayShares { + shares, + ..Default::default() + }; let mut rewards: HashMap = HashMap::new(); - let gw_reward_shares: Vec = gw_shares - .into_iot_reward_shares(&reward_period, iot_price) - .collect(); - for reward in gw_reward_shares { + gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); + let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; + let mut allocated_gateway_rewards = 0_u64; + for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { - rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); + let gateway_reward_total = gateway_reward.beacon_amount + + gateway_reward.witness_amount + + gateway_reward.dc_transfer_amount; + rewards.insert( + gateway_reward.hotspot_key.clone().into(), + gateway_reward, + ); + assert_eq!(reward_amount, gateway_reward_total); + allocated_gateway_rewards += reward_amount; } } @@ -985,11 +1029,13 @@ mod test { let exp_sum_poc_tokens = exp_total_beacon_tokens + exp_total_witness_tokens; println!("max poc rewards: {exp_sum_poc_tokens}"); println!("total actual poc rewards distributed: {sum_poc_amounts}"); - let poc_diff = exp_sum_poc_tokens.to_u64().unwrap() - sum_poc_amounts; - // the sum of rewards distributed should not exceed the epoch amount - // but due to rounding whilst going to u64 in compute_rewards, - // is permitted to be a few bones less - assert_eq!(poc_diff, 7); + + // confirm the unallocated poc reward/dc amounts + // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount + // due to going from decimal to u64 + let unallocated_poc_reward_amount = + total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 7); } #[test] diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index fcd1002ff..0f86cf08a 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,11 +1,14 @@ use crate::{ - reward_share::{operational_rewards, GatewayShares}, + reward_share::{self, GatewayShares}, telemetry, }; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; use futures::future::LocalBoxFuture; +use helium_proto::services::poc_lora as proto; +use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward; +use helium_proto::services::poc_lora::{UnallocatedReward, UnallocatedRewardType}; use helium_proto::RewardManifest; use price::PriceTracker; use reward_scheduler::Scheduler; @@ -107,26 +110,16 @@ impl Rewarder { scheduler: &Scheduler, iot_price: Decimal, ) -> anyhow::Result<()> { - let gateway_reward_shares = - GatewayShares::aggregate(&self.pool, &scheduler.reward_period).await?; + let reward_period = &scheduler.reward_period; - for reward_share in - gateway_reward_shares.into_iot_reward_shares(&scheduler.reward_period, iot_price) - { - self.rewards_sink - .write(reward_share, []) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; - } - - self.rewards_sink - .write(operational_rewards::compute(&scheduler.reward_period), []) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; + // process rewards for poc and dc + self.reward_poc_and_dc(reward_period, iot_price).await?; + // process rewards for the operational fund + self.reward_operational(reward_period).await?; + // commit the filesink let written_files = self.rewards_sink.commit().await?.await??; + // purge db let mut transaction = self.pool.begin().await?; // Clear gateway shares table period to end of reward period GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.reward_period.end).await?; @@ -161,6 +154,109 @@ impl Rewarder { Ok(()) } + async fn reward_poc_and_dc( + &self, + reward_period: &Range>, + iot_price: Decimal, + ) -> anyhow::Result<()> { + // aggregate the poc and dc data per gateway + let mut gateway_reward_shares = GatewayShares::aggregate(&self.pool, reward_period).await?; + + // work out rewards per share and sum up total usage for poc and dc + gateway_reward_shares.calculate_rewards_per_share_and_total_usage(reward_period, iot_price); + let total_gateway_reward_allocation = gateway_reward_shares.total_rewards_for_poc_and_dc; + + let mut allocated_gateway_rewards = 0_u64; + for (gateway_reward_amount, reward_share) in + gateway_reward_shares.into_iot_reward_shares(reward_period) + { + self.rewards_sink + .write(reward_share, []) + .await? + // Await the returned oneshot to ensure we wrote the file + .await??; + allocated_gateway_rewards += gateway_reward_amount; + } + // write out any unallocated poc reward + let unallocated_poc_reward_amount = (total_gateway_reward_allocation + - Decimal::from(allocated_gateway_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + self.write_unallocated_reward( + UnallocatedRewardType::Poc, + unallocated_poc_reward_amount, + reward_period, + ) + .await?; + Ok(()) + } + + async fn reward_operational(&self, reward_period: &Range>) -> anyhow::Result<()> { + let total_operational_rewards = + reward_share::get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start); + let allocated_operational_rewards = total_operational_rewards + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + let op_fund_reward = proto::OperationalReward { + amount: allocated_operational_rewards, + }; + self.rewards_sink + .write( + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::OperationalReward(op_fund_reward)), + }, + [], + ) + .await? + // Await the returned oneshot to ensure we wrote the file + .await??; + // write out any unallocated mapping rewards + // which for the operational fund can only relate to rounding issue + // in practice this should always be zero as there can be a max of + // one bone lost due to rounding when going from decimal to u64 + // but we run it anyway and if it is indeed zero nothing gets + // written out anyway + let unallocated_operation_reward_amount = (total_operational_rewards + - Decimal::from(allocated_operational_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + self.write_unallocated_reward( + UnallocatedRewardType::Oracle, + unallocated_operation_reward_amount, + reward_period, + ) + .await?; + Ok(()) + } + + async fn write_unallocated_reward( + &self, + unallocated_type: UnallocatedRewardType, + unallocated_amount: u64, + reward_period: &'_ Range>, + ) -> anyhow::Result<()> { + if unallocated_amount > 0 { + let unallocated_reward = proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::UnallocatedReward(UnallocatedReward { + reward_type: unallocated_type as i32, + amount: unallocated_amount, + })), + }; + self.rewards_sink + .write(unallocated_reward, []) + .await? + .await??; + }; + Ok(()) + } + async fn data_current_check( &self, reward_period: &Range>, From 5f63dca885a51e5eb43ef09e289aa58882205e5d Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Tue, 12 Dec 2023 13:03:21 +0000 Subject: [PATCH 02/10] add iot verifier rewarder integration tests --- Cargo.lock | 1 + file_store/src/file_sink.rs | 11 +- iot_verifier/Cargo.toml | 1 + iot_verifier/src/rewarder.rs | 212 +++++++++++----------- iot_verifier/tests/common/mod.rs | 88 +++++++-- iot_verifier/tests/rewarder_operations.rs | 29 +++ iot_verifier/tests/rewarder_poc_dc.rs | 193 ++++++++++++++++++++ iot_verifier/tests/runner_tests.rs | 2 +- 8 files changed, 413 insertions(+), 124 deletions(-) create mode 100644 iot_verifier/tests/rewarder_operations.rs create mode 100644 iot_verifier/tests/rewarder_poc_dc.rs diff --git a/Cargo.lock b/Cargo.lock index fbe24a461..3448cf4a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3550,6 +3550,7 @@ dependencies = [ "sqlx", "task-manager", "thiserror", + "time", "tokio", "tokio-stream", "tokio-util", diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 1dc6eea54..f0656d732 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -169,6 +169,13 @@ pub struct FileSinkClient { metric: &'static str, } +// #[async_trait] +// pub trait FileSinkClientTrait { +// type Error; +// async fn new<'a>(sender: MessageSender, metric: &'static str) +// -> Self; +// } + const OK_LABEL: Label = Label::from_static_parts("status", "ok"); const ERROR_LABEL: Label = Label::from_static_parts("status", "error"); const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); @@ -195,7 +202,7 @@ impl FileSinkClient { .chain(std::iter::once(OK_LABEL)) .collect::>() ); - tracing::debug!("file_sink write succeeded for {:?}", self.metric); + tracing::info!("file_sink write succeeded for {:?}", self.metric); Ok(on_write_rx) } Err(SendTimeoutError::Closed(_)) => { @@ -536,7 +543,7 @@ impl FileSink { } } -fn file_name(path_buf: &Path) -> Result { +pub fn file_name(path_buf: &Path) -> Result { path_buf .file_name() .map(|os_str| os_str.to_string_lossy().to_string()) diff --git a/iot_verifier/Cargo.toml b/iot_verifier/Cargo.toml index 6bbf783c8..d5af9347a 100644 --- a/iot_verifier/Cargo.toml +++ b/iot_verifier/Cargo.toml @@ -55,3 +55,4 @@ price = { path = "../price" } tokio-util = { workspace = true } tokio-stream = { workspace = true } task-manager = { path = "../task_manager" } +time = "0.3.17" diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 0f86cf08a..bac3a6ecf 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -113,9 +113,9 @@ impl Rewarder { let reward_period = &scheduler.reward_period; // process rewards for poc and dc - self.reward_poc_and_dc(reward_period, iot_price).await?; + reward_poc_and_dc(&self.pool, &self.rewards_sink, reward_period, iot_price).await?; // process rewards for the operational fund - self.reward_operational(reward_period).await?; + reward_operational(&self.rewards_sink, reward_period).await?; // commit the filesink let written_files = self.rewards_sink.commit().await?.await??; @@ -154,109 +154,6 @@ impl Rewarder { Ok(()) } - async fn reward_poc_and_dc( - &self, - reward_period: &Range>, - iot_price: Decimal, - ) -> anyhow::Result<()> { - // aggregate the poc and dc data per gateway - let mut gateway_reward_shares = GatewayShares::aggregate(&self.pool, reward_period).await?; - - // work out rewards per share and sum up total usage for poc and dc - gateway_reward_shares.calculate_rewards_per_share_and_total_usage(reward_period, iot_price); - let total_gateway_reward_allocation = gateway_reward_shares.total_rewards_for_poc_and_dc; - - let mut allocated_gateway_rewards = 0_u64; - for (gateway_reward_amount, reward_share) in - gateway_reward_shares.into_iot_reward_shares(reward_period) - { - self.rewards_sink - .write(reward_share, []) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; - allocated_gateway_rewards += gateway_reward_amount; - } - // write out any unallocated poc reward - let unallocated_poc_reward_amount = (total_gateway_reward_allocation - - Decimal::from(allocated_gateway_rewards)) - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0); - self.write_unallocated_reward( - UnallocatedRewardType::Poc, - unallocated_poc_reward_amount, - reward_period, - ) - .await?; - Ok(()) - } - - async fn reward_operational(&self, reward_period: &Range>) -> anyhow::Result<()> { - let total_operational_rewards = - reward_share::get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start); - let allocated_operational_rewards = total_operational_rewards - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0); - let op_fund_reward = proto::OperationalReward { - amount: allocated_operational_rewards, - }; - self.rewards_sink - .write( - proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::OperationalReward(op_fund_reward)), - }, - [], - ) - .await? - // Await the returned oneshot to ensure we wrote the file - .await??; - // write out any unallocated mapping rewards - // which for the operational fund can only relate to rounding issue - // in practice this should always be zero as there can be a max of - // one bone lost due to rounding when going from decimal to u64 - // but we run it anyway and if it is indeed zero nothing gets - // written out anyway - let unallocated_operation_reward_amount = (total_operational_rewards - - Decimal::from(allocated_operational_rewards)) - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0); - self.write_unallocated_reward( - UnallocatedRewardType::Oracle, - unallocated_operation_reward_amount, - reward_period, - ) - .await?; - Ok(()) - } - - async fn write_unallocated_reward( - &self, - unallocated_type: UnallocatedRewardType, - unallocated_amount: u64, - reward_period: &'_ Range>, - ) -> anyhow::Result<()> { - if unallocated_amount > 0 { - let unallocated_reward = proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::UnallocatedReward(UnallocatedReward { - reward_type: unallocated_type as i32, - amount: unallocated_amount, - })), - }; - self.rewards_sink - .write(unallocated_reward, []) - .await? - .await??; - }; - Ok(()) - } - async fn data_current_check( &self, reward_period: &Range>, @@ -302,6 +199,111 @@ impl Rewarder { } } +pub async fn reward_poc_and_dc( + pool: &Pool, + rewards_sink: &file_sink::FileSinkClient, + reward_period: &Range>, + iot_price: Decimal, +) -> anyhow::Result<()> { + // aggregate the poc and dc data per gateway + let mut gateway_reward_shares = GatewayShares::aggregate(pool, reward_period).await?; + + // work out rewards per share and sum up total usage for poc and dc + gateway_reward_shares.calculate_rewards_per_share_and_total_usage(reward_period, iot_price); + let total_gateway_reward_allocation = gateway_reward_shares.total_rewards_for_poc_and_dc; + + let mut allocated_gateway_rewards = 0_u64; + for (gateway_reward_amount, reward_share) in + gateway_reward_shares.into_iot_reward_shares(reward_period) + { + rewards_sink + .write(reward_share, []) + .await? + // Await the returned oneshot to ensure we wrote the file + .await??; + allocated_gateway_rewards += gateway_reward_amount; + } + // write out any unallocated poc reward + let unallocated_poc_reward_amount = (total_gateway_reward_allocation + - Decimal::from(allocated_gateway_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + write_unallocated_reward( + rewards_sink, + UnallocatedRewardType::Poc, + unallocated_poc_reward_amount, + reward_period, + ) + .await?; + Ok(()) +} + +pub async fn reward_operational( + rewards_sink: &file_sink::FileSinkClient, + reward_period: &Range>, +) -> anyhow::Result<()> { + let total_operational_rewards = + reward_share::get_scheduled_ops_fund_tokens(reward_period.end - reward_period.start); + let allocated_operational_rewards = total_operational_rewards + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + let op_fund_reward = proto::OperationalReward { + amount: allocated_operational_rewards, + }; + rewards_sink + .write( + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::OperationalReward(op_fund_reward)), + }, + [], + ) + .await? + .await??; + // write out any unallocated mapping rewards + // which for the operational fund can only relate to rounding issue + // in practice this should always be zero as there can be a max of + // one bone lost due to rounding when going from decimal to u64 + // but we run it anyway and if it is indeed zero nothing gets + // written out anyway + let unallocated_operation_reward_amount = (total_operational_rewards + - Decimal::from(allocated_operational_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + write_unallocated_reward( + rewards_sink, + UnallocatedRewardType::Oracle, + unallocated_operation_reward_amount, + reward_period, + ) + .await?; + Ok(()) +} + +async fn write_unallocated_reward( + rewards_sink: &file_sink::FileSinkClient, + unallocated_type: UnallocatedRewardType, + unallocated_amount: u64, + reward_period: &'_ Range>, +) -> anyhow::Result<()> { + if unallocated_amount > 0 { + let unallocated_reward = proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::UnallocatedReward(UnallocatedReward { + reward_type: unallocated_type as i32, + amount: unallocated_amount, + })), + }; + rewards_sink.write(unallocated_reward, []).await?.await??; + }; + Ok(()) +} + pub async fn fetch_rewarded_timestamp( timestamp_key: &str, db: impl PgExecutor<'_>, diff --git a/iot_verifier/tests/common/mod.rs b/iot_verifier/tests/common/mod.rs index f6e36c36b..1e4558475 100644 --- a/iot_verifier/tests/common/mod.rs +++ b/iot_verifier/tests/common/mod.rs @@ -9,8 +9,9 @@ use file_store::{ use helium_crypto::PublicKeyBinary; use helium_proto::{ services::poc_lora::{ + iot_reward_share::Reward as IotReward, GatewayReward, IotRewardShare, LoraBeaconIngestReportV1, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, LoraPocV1, - LoraWitnessIngestReportV1, + LoraWitnessIngestReportV1, OperationalReward, UnallocatedReward, }, DataRate, Region as ProtoRegion, }; @@ -29,7 +30,8 @@ use std::{self, ops::DerefMut, str::FromStr}; use tokio::{sync::mpsc::error::TryRecvError, sync::Mutex, time::timeout}; pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { - let (tx, rx) = tokio::sync::mpsc::channel(5); + let (tx, rx) = tokio::sync::mpsc::channel(10); + ( FileSinkClient::new(tx, "metric"), MockFileSinkReceiver { receiver: rx }, @@ -42,15 +44,22 @@ pub struct MockFileSinkReceiver { #[allow(dead_code)] impl MockFileSinkReceiver { - pub async fn receive(&mut self) -> SinkMessage { + pub async fn receive(&mut self) -> Option> { match timeout(seconds(2), self.receiver.recv()).await { - Ok(Some(msg)) => msg, - Ok(None) => panic!("server closed connection while waiting for message"), - Err(_) => panic!("timeout while waiting for message"), + Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { + let _ = on_write_tx.send(Ok(())); + Some(msg) + } + Ok(None) => None, + Err(e) => panic!("timeout while waiting for message1 {:?}", e), + Ok(Some(unexpected_msg)) => { + println!("ignoring unexpected msg {:?}", unexpected_msg); + None + } } } - pub fn assert_no_messages(mut self) { + pub fn assert_no_messages(&mut self) { let Err(TryRecvError::Empty) = self.receiver.try_recv() else { panic!("receiver should have been empty") }; @@ -58,26 +67,71 @@ impl MockFileSinkReceiver { pub async fn receive_valid_poc(&mut self) -> LoraPocV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => { - LoraPocV1::decode(bytes.as_slice()).expect("decode beacon report") + Some(bytes) => { + LoraPocV1::decode(bytes.as_slice()).expect("failed to decode expected valid poc") } - _ => panic!("invalid beacon message"), + None => panic!("failed to receive valid poc"), } } pub async fn receive_invalid_beacon(&mut self) -> LoraInvalidBeaconReportV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => LoraInvalidBeaconReportV1::decode(bytes.as_slice()) - .expect("decode invalid beacon report"), - _ => panic!("invalid beacon message"), + Some(bytes) => LoraInvalidBeaconReportV1::decode(bytes.as_slice()) + .expect("failed to decode expected invalid beacon report"), + None => panic!("failed to receive invalid beacon"), } } pub async fn receive_invalid_witness(&mut self) -> LoraInvalidWitnessReportV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => LoraInvalidWitnessReportV1::decode(bytes.as_slice()) - .expect("decode invalid witness report"), - _ => panic!("invalid witness message"), + Some(bytes) => LoraInvalidWitnessReportV1::decode(bytes.as_slice()) + .expect("failed to decode expected invalid witness report"), + None => panic!("failed to receive invalid witness"), + } + } + + pub async fn receive_gateway_reward(&mut self) -> GatewayReward { + match self.receive().await { + Some(bytes) => { + let iot_reward = IotRewardShare::decode(bytes.as_slice()) + .expect("failed to decode expected gateway reward"); + println!("iot_reward: {:?}", iot_reward); + match iot_reward.reward { + Some(IotReward::GatewayReward(r)) => r, + _ => panic!("failed to get gateway reward"), + } + } + None => panic!("failed to receive gateway reward"), + } + } + + pub async fn receive_operational_reward(&mut self) -> OperationalReward { + match self.receive().await { + Some(bytes) => { + let iot_reward = IotRewardShare::decode(bytes.as_slice()) + .expect("failed to decode expected operational reward"); + println!("iot_reward: {:?}", iot_reward); + match iot_reward.reward { + Some(IotReward::OperationalReward(r)) => r, + _ => panic!("failed to get operational reward"), + } + } + None => panic!("failed to receive operational reward"), + } + } + + pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { + match self.receive().await { + Some(bytes) => { + let iot_reward = IotRewardShare::decode(bytes.as_slice()) + .expect("failed to decode expected unallocated reward"); + println!("iot_reward: {:?}", iot_reward); + match iot_reward.reward { + Some(IotReward::UnallocatedReward(r)) => r, + _ => panic!("failed to get unallocated reward"), + } + } + None => panic!("failed to receive unallocated reward"), } } } @@ -86,6 +140,7 @@ fn seconds(s: u64) -> std::time::Duration { std::time::Duration::from_secs(s) } +#[allow(dead_code)] pub fn create_valid_beacon_report( pubkey: &str, received_timestamp: DateTime, @@ -353,6 +408,7 @@ pub const POC_DATA: [u8; 51] = [ 203, 122, 146, 49, 241, 156, 148, 74, 246, 68, 17, 8, 212, 48, 6, 152, 58, 221, 158, 186, 101, 37, 59, 135, 126, 18, 72, 244, 65, 174, ]; +#[allow(dead_code)] pub const ENTROPY_TIMESTAMP: i64 = 1677163710000; const EU868_PARAMS: &[u8] = &[ diff --git a/iot_verifier/tests/rewarder_operations.rs b/iot_verifier/tests/rewarder_operations.rs new file mode 100644 index 000000000..bccc85586 --- /dev/null +++ b/iot_verifier/tests/rewarder_operations.rs @@ -0,0 +1,29 @@ +mod common; +use chrono::{Duration as ChronoDuration, Utc}; +use iot_verifier::{reward_share, rewarder}; +use rust_decimal::prelude::ToPrimitive; +use sqlx::PgPool; + +#[sqlx::test] +async fn test_operations(_pool: PgPool) -> anyhow::Result<()> { + let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + tokio::select!( + _ = rewarder::reward_operational(&iot_rewards_client, &epoch) => { println!("point 1")}, + ops_reward = iot_rewards.receive_operational_reward() => + { + println!("ops reward {:?}", ops_reward); + // confirm the total rewards allocated matches expectations + let expected_total = reward_share::get_scheduled_ops_fund_tokens(epoch.end - epoch.start) + .to_u64() + .unwrap(); + assert_eq!(ops_reward.amount, 6215846994535); + assert_eq!(ops_reward.amount, expected_total); + + // should be no further msgs + iot_rewards.assert_no_messages(); + }, + ); + Ok(()) +} diff --git a/iot_verifier/tests/rewarder_poc_dc.rs b/iot_verifier/tests/rewarder_poc_dc.rs new file mode 100644 index 000000000..43ecbb30e --- /dev/null +++ b/iot_verifier/tests/rewarder_poc_dc.rs @@ -0,0 +1,193 @@ +mod common; +use crate::common::MockFileSinkReceiver; +use chrono::{DateTime, Duration as ChronoDuration, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_lora::{GatewayReward, UnallocatedReward, UnallocatedRewardType}; +use iot_verifier::{ + poc_report::ReportType, + reward_share::{self, GatewayDCShare, GatewayPocShare}, + rewarder, +}; +use prost::Message; +use rust_decimal::prelude::ToPrimitive; +use rust_decimal_macros::dec; +use sqlx::{PgPool, Postgres, Transaction}; +use std::{self, str::FromStr}; + +const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; +const HOTSPOT_2: &str = "11uJHS2YaEWJqgqC7yza9uvSmpv5FWoMQXiP8WbxBGgNUmifUJf"; +const HOTSPOT_3: &str = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp"; +const HOTSPOT_4: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; + +#[sqlx::test] +async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { + let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + + // seed all the things + let mut txn = pool.clone().begin().await?; + seed_pocs(epoch.start, &mut txn).await?; + seed_dc(epoch.start, &mut txn).await?; + txn.commit().await?; + + // run rewards for poc and dc + tokio::select!( + _ = rewarder::reward_poc_and_dc(&pool, &iot_rewards_client, &epoch, dec!(0.0001)) => {}, + Ok((gateway_rewards, unallocated_poc_reward)) = receive_expected_rewards(&mut iot_rewards) => { + + // assert the gateway rewards + assert_eq!( + gateway_rewards[0].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_1).unwrap().as_ref() + ); + assert_eq!(gateway_rewards[0].beacon_amount, 1775956284153); + assert_eq!(gateway_rewards[0].witness_amount, 0); + assert_eq!(gateway_rewards[0].dc_transfer_amount, 14799635701275); + + assert_eq!( + gateway_rewards[1].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_2).unwrap().as_ref() + ); + assert_eq!(gateway_rewards[1].beacon_amount, 0); + assert_eq!(gateway_rewards[1].witness_amount, 8524590163934); + assert_eq!(gateway_rewards[1].dc_transfer_amount, 29599271402550); + // hotspot 2 should have double the dc rewards of hotspot 1 + assert_eq!( + gateway_rewards[1].dc_transfer_amount, + gateway_rewards[0].dc_transfer_amount * 2 + ); + + assert_eq!( + gateway_rewards[2].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_3).unwrap().as_ref() + ); + // hotspot 2 has double reward scale of hotspot 1 and thus double the beacon amount + assert_eq!(gateway_rewards[2].beacon_amount, 3551912568306); + assert_eq!( + gateway_rewards[2].beacon_amount, + gateway_rewards[0].beacon_amount * 2 + ); + assert_eq!(gateway_rewards[2].witness_amount, 0); + assert_eq!(gateway_rewards[2].dc_transfer_amount, 0); + + assert_eq!( + gateway_rewards[3].hotspot_key, + PublicKeyBinary::from_str(HOTSPOT_4).unwrap().as_ref() + ); + assert_eq!(gateway_rewards[3].beacon_amount, 0); + assert_eq!(gateway_rewards[3].witness_amount, 12786885245901); + assert_eq!(gateway_rewards[3].dc_transfer_amount, 0); + + // assert our unallocated reward + assert_eq!( + UnallocatedRewardType::Poc as i32, + unallocated_poc_reward.reward_type + ); + assert_eq!(1, unallocated_poc_reward.amount); + + // confirm the total rewards allocated matches expectations + let poc_sum: u64 = gateway_rewards + .iter() + .map(|r| r.beacon_amount + r.witness_amount) + .sum(); + let dc_sum: u64 = gateway_rewards.iter().map(|r| r.dc_transfer_amount).sum(); + let unallocated_sum: u64 = unallocated_poc_reward.amount; + + let expected_dc = reward_share::get_scheduled_dc_tokens(epoch.end - epoch.start); + let (expected_beacon_sum, expected_witness_sum) = + reward_share::get_scheduled_poc_tokens(epoch.end - epoch.start, expected_dc); + let expected_total = + expected_beacon_sum.to_u64().unwrap() + expected_witness_sum.to_u64().unwrap(); + assert_eq!(expected_total, poc_sum + dc_sum + unallocated_sum); + } + ); + Ok(()) +} + +async fn receive_expected_rewards( + iot_rewards: &mut MockFileSinkReceiver, +) -> anyhow::Result<(Vec, UnallocatedReward)> { + // get the filestore outputs from rewards run + // we will have 3 gateway rewards and one unallocated reward + let gateway_reward1 = iot_rewards.receive_gateway_reward().await; + let gateway_reward2 = iot_rewards.receive_gateway_reward().await; + let gateway_reward3 = iot_rewards.receive_gateway_reward().await; + let gateway_reward4 = iot_rewards.receive_gateway_reward().await; + let unallocated_poc_reward = iot_rewards.receive_unallocated_reward().await; + // should be no further msgs + iot_rewards.assert_no_messages(); + + // ordering is not guaranteed, so stick the rewards into a vec and sort + let mut gateway_rewards = vec![ + gateway_reward1, + gateway_reward2, + gateway_reward3, + gateway_reward4, + ]; + gateway_rewards.sort_by(|a, b| b.hotspot_key.cmp(&a.hotspot_key)); + Ok((gateway_rewards, unallocated_poc_reward)) +} +async fn seed_pocs(ts: DateTime, txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { + let poc_beacon_1 = GatewayPocShare { + hotspot_key: HOTSPOT_1.to_string().parse().unwrap(), + reward_type: ReportType::Beacon, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(1.0), + poc_id: "poc_id_1".to_string().encode_to_vec(), + }; + + let poc_witness_1 = GatewayPocShare { + hotspot_key: HOTSPOT_2.to_string().parse().unwrap(), + reward_type: ReportType::Witness, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(1.0), + poc_id: "poc_id_1".to_string().encode_to_vec(), + }; + + let poc_beacon_2 = GatewayPocShare { + hotspot_key: HOTSPOT_3.to_string().parse().unwrap(), + reward_type: ReportType::Beacon, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(2.0), + poc_id: "poc_id_2".to_string().encode_to_vec(), + }; + + let poc_witness_2 = GatewayPocShare { + hotspot_key: HOTSPOT_4.to_string().parse().unwrap(), + reward_type: ReportType::Witness, + reward_timestamp: ts + ChronoDuration::hours(1), + hex_scale: dec!(1.0), + reward_unit: dec!(1.5), + poc_id: "poc_id_2".to_string().encode_to_vec(), + }; + poc_beacon_1.save(txn).await?; + poc_witness_1.save(txn).await?; + + poc_beacon_2.save(txn).await?; + poc_witness_2.save(txn).await?; + Ok(()) +} + +async fn seed_dc(ts: DateTime, txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { + let dc_share_1 = GatewayDCShare { + hotspot_key: HOTSPOT_1.to_string().parse().unwrap(), + reward_timestamp: ts + ChronoDuration::hours(1), + num_dcs: dec!(1000), + id: "dc_id_1".to_string().encode_to_vec(), + }; + + let dc_share_2 = GatewayDCShare { + hotspot_key: HOTSPOT_2.to_string().parse().unwrap(), + reward_timestamp: ts + ChronoDuration::hours(1), + num_dcs: dec!(2000), + id: "dc_id_2".to_string().encode_to_vec(), + }; + + dc_share_1.save(txn).await?; + dc_share_2.save(txn).await?; + Ok(()) +} diff --git a/iot_verifier/tests/runner_tests.rs b/iot_verifier/tests/runner_tests.rs index 0c7334548..56a945d13 100644 --- a/iot_verifier/tests/runner_tests.rs +++ b/iot_verifier/tests/runner_tests.rs @@ -362,7 +362,7 @@ async fn invalid_beacon_gateway_not_found_no_witnesses(pool: PgPool) -> anyhow:: #[sqlx::test] async fn invalid_beacon_bad_payload(pool: PgPool) -> anyhow::Result<()> { - let ctx = TestContext::setup(pool.clone()).await?; + let mut ctx = TestContext::setup(pool.clone()).await?; // // test with an invalid beacon, no witnesses // the beacon will have an invalid payload, resulting in an error From d426b88704e66553967194c03fdde3e05a551959 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 15 Dec 2023 10:14:03 +0000 Subject: [PATCH 03/10] indexer support --- Cargo.lock | 4 ++-- iot_verifier/src/reward_share.rs | 15 +++------------ reward_index/src/indexer.rs | 8 ++++++++ reward_index/src/settings.rs | 1 + 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3448cf4a6..e373593e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#e7763c24b5e4d0767d889950ab743795115b0e72" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#bfb7ac55ad0d2bcb06c7d9100aecb9feeb280ad9" dependencies = [ "base64 0.21.0", "byteorder", @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#e7763c24b5e4d0767d889950ab743795115b0e72" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#bfb7ac55ad0d2bcb06c7d9100aecb9feeb280ad9" dependencies = [ "bytes", "prost", diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 0614aacbf..f043710da 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -572,10 +572,7 @@ mod test { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount + gateway_reward.dc_transfer_amount; - rewards.insert( - gateway_reward.hotspot_key.clone().into(), - gateway_reward, - ); + rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); assert_eq!(reward_amount, gateway_reward_total); allocated_gateway_rewards += reward_amount; } @@ -766,10 +763,7 @@ mod test { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount + gateway_reward.dc_transfer_amount; - rewards.insert( - gateway_reward.hotspot_key.clone().into(), - gateway_reward, - ); + rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); assert_eq!(reward_amount, gateway_reward_total); allocated_gateway_rewards += reward_amount; } @@ -944,10 +938,7 @@ mod test { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount + gateway_reward.dc_transfer_amount; - rewards.insert( - gateway_reward.hotspot_key.clone().into(), - gateway_reward, - ); + rewards.insert(gateway_reward.hotspot_key.clone().into(), gateway_reward); assert_eq!(reward_amount, gateway_reward_total); allocated_gateway_rewards += reward_amount; } diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index b75904dd7..97bd13091 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -35,6 +35,7 @@ pub enum RewardType { MobileSubscriber, MobileServiceProvider, MobileUnallocated, + IotUnallocated, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -198,6 +199,13 @@ impl Indexer { }, r.amount, )), + Some(IotReward::UnallocatedReward(r)) => Ok(( + RewardKey { + key: self.unallocated_reward_key.clone(), + reward_type: RewardType::IotUnallocated, + }, + r.amount, + )), _ => bail!("got an invalid iot reward share"), } } diff --git a/reward_index/src/settings.rs b/reward_index/src/settings.rs index 790f1375b..541f1f553 100644 --- a/reward_index/src/settings.rs +++ b/reward_index/src/settings.rs @@ -39,6 +39,7 @@ pub struct Settings { pub unallocated_reward_entity_key: Option, #[serde(default = "default_start_after")] pub start_after: u64, + pub unallocated_reward_entity_key: Option, } pub fn default_start_after() -> u64 { From a1ab4860335abf38d93f66efbaeb5e7dea20ed35 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Thu, 4 Jan 2024 12:09:07 +0000 Subject: [PATCH 04/10] assert expected reward percentages, tidy up --- file_store/src/file_sink.rs | 9 +-------- iot_verifier/src/reward_share.rs | 4 ++-- iot_verifier/src/rewarder.rs | 3 ++- iot_verifier/tests/rewarder_operations.rs | 8 +++++++- iot_verifier/tests/rewarder_poc_dc.rs | 8 +++++++- 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index f0656d732..9a474a0b8 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -169,13 +169,6 @@ pub struct FileSinkClient { metric: &'static str, } -// #[async_trait] -// pub trait FileSinkClientTrait { -// type Error; -// async fn new<'a>(sender: MessageSender, metric: &'static str) -// -> Self; -// } - const OK_LABEL: Label = Label::from_static_parts("status", "ok"); const ERROR_LABEL: Label = Label::from_static_parts("status", "error"); const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); @@ -202,7 +195,7 @@ impl FileSinkClient { .chain(std::iter::once(OK_LABEL)) .collect::>() ); - tracing::info!("file_sink write succeeded for {:?}", self.metric); + tracing::debug!("file_sink write succeeded for {:?}", self.metric); Ok(on_write_rx) } Err(SendTimeoutError::Closed(_)) => { diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index f043710da..0d40298e4 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -18,7 +18,7 @@ const DEFAULT_PREC: u32 = 15; lazy_static! { // TODO: year 1 emissions allocate 30% of total to PoC with 6% to beacons and 24% to witnesses but subsequent years back // total PoC percentage off 1.5% each year; determine how beacons and witnesses will split the subsequent years' allocations - static ref REWARDS_PER_DAY: Decimal = (Decimal::from(32_500_000_000_u64) / Decimal::from(366)) * Decimal::from(1_000_000); // 88_797_814_207_650.273224043715847 + pub static ref REWARDS_PER_DAY: Decimal = (Decimal::from(32_500_000_000_u64) / Decimal::from(366)) * Decimal::from(1_000_000); // 88_797_814_207_650.273224043715847 static ref BEACON_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.06); static ref WITNESS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.24); // Data transfer is allocated 50% of daily rewards @@ -32,7 +32,7 @@ lazy_static! { static ref DC_USD_PRICE: Decimal = dec!(0.00001); } -fn get_tokens_by_duration(tokens: Decimal, duration: Duration) -> Decimal { +pub fn get_tokens_by_duration(tokens: Decimal, duration: Duration) -> Decimal { ((tokens / Decimal::from(Duration::hours(24).num_seconds())) * Decimal::from(duration.num_seconds())) .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index bac3a6ecf..09f115a49 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -122,7 +122,8 @@ impl Rewarder { // purge db let mut transaction = self.pool.begin().await?; // Clear gateway shares table period to end of reward period - GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.reward_period.end).await?; + GatewayShares::clear_rewarded_shares(&mut transaction, scheduler.reward_period.start) + .await?; save_rewarded_timestamp( "last_rewarded_end_time", &scheduler.reward_period.end, diff --git a/iot_verifier/tests/rewarder_operations.rs b/iot_verifier/tests/rewarder_operations.rs index bccc85586..4a09e310c 100644 --- a/iot_verifier/tests/rewarder_operations.rs +++ b/iot_verifier/tests/rewarder_operations.rs @@ -1,7 +1,8 @@ mod common; use chrono::{Duration as ChronoDuration, Utc}; use iot_verifier::{reward_share, rewarder}; -use rust_decimal::prelude::ToPrimitive; +use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; +use rust_decimal_macros::dec; use sqlx::PgPool; #[sqlx::test] @@ -21,6 +22,11 @@ async fn test_operations(_pool: PgPool) -> anyhow::Result<()> { assert_eq!(ops_reward.amount, 6215846994535); assert_eq!(ops_reward.amount, expected_total); + // confirm the ops percentage amount matches expectations + let daily_total = *reward_share::REWARDS_PER_DAY; + let ops_percent = (Decimal::from(ops_reward.amount) / daily_total).round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(ops_percent, dec!(0.07)); + // should be no further msgs iot_rewards.assert_no_messages(); }, diff --git a/iot_verifier/tests/rewarder_poc_dc.rs b/iot_verifier/tests/rewarder_poc_dc.rs index 43ecbb30e..6634ab57a 100644 --- a/iot_verifier/tests/rewarder_poc_dc.rs +++ b/iot_verifier/tests/rewarder_poc_dc.rs @@ -9,7 +9,7 @@ use iot_verifier::{ rewarder, }; use prost::Message; -use rust_decimal::prelude::ToPrimitive; +use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; use std::{self, str::FromStr}; @@ -100,6 +100,12 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { let expected_total = expected_beacon_sum.to_u64().unwrap() + expected_witness_sum.to_u64().unwrap(); assert_eq!(expected_total, poc_sum + dc_sum + unallocated_sum); + + // confirm the poc & dc percentage amount matches expectations + let daily_total = *reward_share::REWARDS_PER_DAY; + let poc_dc_percent = (Decimal::from(poc_sum + dc_sum + unallocated_sum) / daily_total).round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(poc_dc_percent, dec!(0.8)); + } ); Ok(()) From dee99e9db26822422036783932c520d40d8d134f Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 5 Jan 2024 10:40:16 +0000 Subject: [PATCH 05/10] add oracle rewards output, misc fixes --- Cargo.lock | 4 +-- iot_verifier/src/reward_share.rs | 9 ++++++ iot_verifier/src/rewarder.rs | 30 +++++++++++++++++-- iot_verifier/tests/rewarder_operations.rs | 2 +- iot_verifier/tests/rewarder_oracles.rs | 36 +++++++++++++++++++++++ iot_verifier/tests/rewarder_poc_dc.rs | 12 ++++---- 6 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 iot_verifier/tests/rewarder_oracles.rs diff --git a/Cargo.lock b/Cargo.lock index e373593e3..4d3dda024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#bfb7ac55ad0d2bcb06c7d9100aecb9feeb280ad9" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#60e6f52a982da8ba00a209c2a1a34c42f9bcf730" dependencies = [ "base64 0.21.0", "byteorder", @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#bfb7ac55ad0d2bcb06c7d9100aecb9feeb280ad9" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#60e6f52a982da8ba00a209c2a1a34c42f9bcf730" dependencies = [ "bytes", "prost", diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 0d40298e4..35b8d9cff 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -25,6 +25,8 @@ lazy_static! { static ref DATA_TRANSFER_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.50); // Operations fund is allocated 7% of daily rewards static ref OPERATIONS_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.07); + // Oracles fund is allocated 7% of daily rewards + static ref ORACLES_REWARDS_PER_DAY_PERCENT: Decimal = dec!(0.07); // dc remainer distributed at ration of 4:1 in favour of witnesses // ie WITNESS_REWARDS_PER_DAY_PERCENT:BEACON_REWARDS_PER_DAY_PERCENT static ref WITNESS_DC_REMAINER_PERCENT: Decimal = dec!(0.80); @@ -66,6 +68,13 @@ pub fn get_scheduled_ops_fund_tokens(duration: Duration) -> Decimal { ) } +pub fn get_scheduled_oracle_tokens(duration: Duration) -> Decimal { + get_tokens_by_duration( + *REWARDS_PER_DAY * *ORACLES_REWARDS_PER_DAY_PERCENT, + duration, + ) +} + #[derive(sqlx::FromRow)] pub struct GatewayPocShare { pub hotspot_key: PublicKeyBinary, diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 09f115a49..3d0e95927 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -116,6 +116,9 @@ impl Rewarder { reward_poc_and_dc(&self.pool, &self.rewards_sink, reward_period, iot_price).await?; // process rewards for the operational fund reward_operational(&self.rewards_sink, reward_period).await?; + // process rewards for the oracle + reward_oracles(&self.rewards_sink, reward_period).await?; + // commit the filesink let written_files = self.rewards_sink.commit().await?.await??; @@ -264,7 +267,7 @@ pub async fn reward_operational( ) .await? .await??; - // write out any unallocated mapping rewards + // write out any unallocated operation rewards // which for the operational fund can only relate to rounding issue // in practice this should always be zero as there can be a max of // one bone lost due to rounding when going from decimal to u64 @@ -277,7 +280,7 @@ pub async fn reward_operational( .unwrap_or(0); write_unallocated_reward( rewards_sink, - UnallocatedRewardType::Oracle, + UnallocatedRewardType::Operation, unallocated_operation_reward_amount, reward_period, ) @@ -285,6 +288,29 @@ pub async fn reward_operational( Ok(()) } +pub async fn reward_oracles( + rewards_sink: &file_sink::FileSinkClient, + reward_period: &Range>, +) -> anyhow::Result<()> { + // atm 100% of oracle rewards are assigned to 'unallocated' + let total_oracle_rewards = + reward_share::get_scheduled_oracle_tokens(reward_period.end - reward_period.start); + let allocated_oracle_rewards = 0_u64; + let unallocated_oracle_reward_amount = (total_oracle_rewards + - Decimal::from(allocated_oracle_rewards)) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0); + write_unallocated_reward( + rewards_sink, + UnallocatedRewardType::Oracle, + unallocated_oracle_reward_amount, + reward_period, + ) + .await?; + Ok(()) +} + async fn write_unallocated_reward( rewards_sink: &file_sink::FileSinkClient, unallocated_type: UnallocatedRewardType, diff --git a/iot_verifier/tests/rewarder_operations.rs b/iot_verifier/tests/rewarder_operations.rs index 4a09e310c..5acc2a82d 100644 --- a/iot_verifier/tests/rewarder_operations.rs +++ b/iot_verifier/tests/rewarder_operations.rs @@ -19,7 +19,7 @@ async fn test_operations(_pool: PgPool) -> anyhow::Result<()> { let expected_total = reward_share::get_scheduled_ops_fund_tokens(epoch.end - epoch.start) .to_u64() .unwrap(); - assert_eq!(ops_reward.amount, 6215846994535); + assert_eq!(ops_reward.amount, 6_215_846_994_535); assert_eq!(ops_reward.amount, expected_total); // confirm the ops percentage amount matches expectations diff --git a/iot_verifier/tests/rewarder_oracles.rs b/iot_verifier/tests/rewarder_oracles.rs new file mode 100644 index 000000000..e9822ee67 --- /dev/null +++ b/iot_verifier/tests/rewarder_oracles.rs @@ -0,0 +1,36 @@ +mod common; +use chrono::{Duration as ChronoDuration, Utc}; +use iot_verifier::{reward_share, rewarder}; +use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; +use rust_decimal_macros::dec; +use sqlx::PgPool; + +#[sqlx::test] +async fn test_oracles(_pool: PgPool) -> anyhow::Result<()> { + let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + tokio::select!( + _ = rewarder::reward_oracles(&iot_rewards_client, &epoch) => {}, + // oracles rewards are 100% unallocated atm + unallocated_oracle_reward = iot_rewards.receive_unallocated_reward() => + { + println!("unallocated oracles reward {:?}", unallocated_oracle_reward); + // confirm the total rewards matches expectations + let expected_total = reward_share::get_scheduled_oracle_tokens(epoch.end - epoch.start) + .to_u64() + .unwrap(); + assert_eq!(unallocated_oracle_reward.amount, 6_215_846_994_535); + assert_eq!(unallocated_oracle_reward.amount, expected_total); + + // confirm the ops percentage amount matches expectations + let daily_total = *reward_share::REWARDS_PER_DAY; + let oracle_percent = (Decimal::from(unallocated_oracle_reward.amount) / daily_total).round_dp_with_strategy(2, RoundingStrategy::MidpointNearestEven); + assert_eq!(oracle_percent, dec!(0.07)); + + // should be no further msgs + iot_rewards.assert_no_messages(); + }, + ); + Ok(()) +} diff --git a/iot_verifier/tests/rewarder_poc_dc.rs b/iot_verifier/tests/rewarder_poc_dc.rs index 6634ab57a..bfc450e8e 100644 --- a/iot_verifier/tests/rewarder_poc_dc.rs +++ b/iot_verifier/tests/rewarder_poc_dc.rs @@ -41,17 +41,17 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { gateway_rewards[0].hotspot_key, PublicKeyBinary::from_str(HOTSPOT_1).unwrap().as_ref() ); - assert_eq!(gateway_rewards[0].beacon_amount, 1775956284153); + assert_eq!(gateway_rewards[0].beacon_amount, 1_775_956_284_153); assert_eq!(gateway_rewards[0].witness_amount, 0); - assert_eq!(gateway_rewards[0].dc_transfer_amount, 14799635701275); + assert_eq!(gateway_rewards[0].dc_transfer_amount, 14_799_635_701_275); assert_eq!( gateway_rewards[1].hotspot_key, PublicKeyBinary::from_str(HOTSPOT_2).unwrap().as_ref() ); assert_eq!(gateway_rewards[1].beacon_amount, 0); - assert_eq!(gateway_rewards[1].witness_amount, 8524590163934); - assert_eq!(gateway_rewards[1].dc_transfer_amount, 29599271402550); + assert_eq!(gateway_rewards[1].witness_amount, 8_524_590_163_934); + assert_eq!(gateway_rewards[1].dc_transfer_amount, 29_599_271_402_550); // hotspot 2 should have double the dc rewards of hotspot 1 assert_eq!( gateway_rewards[1].dc_transfer_amount, @@ -63,7 +63,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { PublicKeyBinary::from_str(HOTSPOT_3).unwrap().as_ref() ); // hotspot 2 has double reward scale of hotspot 1 and thus double the beacon amount - assert_eq!(gateway_rewards[2].beacon_amount, 3551912568306); + assert_eq!(gateway_rewards[2].beacon_amount, 3_551_912_568_306); assert_eq!( gateway_rewards[2].beacon_amount, gateway_rewards[0].beacon_amount * 2 @@ -76,7 +76,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { PublicKeyBinary::from_str(HOTSPOT_4).unwrap().as_ref() ); assert_eq!(gateway_rewards[3].beacon_amount, 0); - assert_eq!(gateway_rewards[3].witness_amount, 12786885245901); + assert_eq!(gateway_rewards[3].witness_amount, 12_786_885_245_901); assert_eq!(gateway_rewards[3].dc_transfer_amount, 0); // assert our unallocated reward From 463967a955b3209a60385525b7469b1e677f5cf8 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 5 Jan 2024 13:27:41 +0000 Subject: [PATCH 06/10] tidy up and fix rebase artifacts --- Cargo.lock | 4 ++-- reward_index/src/settings.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d3dda024..34b6085ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#60e6f52a982da8ba00a209c2a1a34c42f9bcf730" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#d9506a5c453aab28dd728097fdec0e553bfd00ed" dependencies = [ "base64 0.21.0", "byteorder", @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#60e6f52a982da8ba00a209c2a1a34c42f9bcf730" +source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#d9506a5c453aab28dd728097fdec0e553bfd00ed" dependencies = [ "bytes", "prost", diff --git a/reward_index/src/settings.rs b/reward_index/src/settings.rs index 541f1f553..790f1375b 100644 --- a/reward_index/src/settings.rs +++ b/reward_index/src/settings.rs @@ -39,7 +39,6 @@ pub struct Settings { pub unallocated_reward_entity_key: Option, #[serde(default = "default_start_after")] pub start_after: u64, - pub unallocated_reward_entity_key: Option, } pub fn default_start_after() -> u64 { From a7c54408507155dace083ae4f7a0ad982d8b57b0 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 5 Jan 2024 14:12:44 +0000 Subject: [PATCH 07/10] add missing migration checkin & renum --- ...r_reward_type.sql => 10_add_service_provider_reward_type.sql} | 0 reward_index/migrations/9_add_iot_unallocated_reward_type.sql | 1 + 2 files changed, 1 insertion(+) rename reward_index/migrations/{7_add_service_provider_reward_type.sql => 10_add_service_provider_reward_type.sql} (100%) create mode 100644 reward_index/migrations/9_add_iot_unallocated_reward_type.sql diff --git a/reward_index/migrations/7_add_service_provider_reward_type.sql b/reward_index/migrations/10_add_service_provider_reward_type.sql similarity index 100% rename from reward_index/migrations/7_add_service_provider_reward_type.sql rename to reward_index/migrations/10_add_service_provider_reward_type.sql diff --git a/reward_index/migrations/9_add_iot_unallocated_reward_type.sql b/reward_index/migrations/9_add_iot_unallocated_reward_type.sql new file mode 100644 index 000000000..755146802 --- /dev/null +++ b/reward_index/migrations/9_add_iot_unallocated_reward_type.sql @@ -0,0 +1 @@ +ALTER TYPE reward_type ADD VALUE 'iot_unallocated'; From 6749669f49fb4dc41ad3c08e37a03e041628c3d5 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 5 Jan 2024 14:39:08 +0000 Subject: [PATCH 08/10] remove unused dep ref --- Cargo.lock | 1 - iot_verifier/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34b6085ff..b431e571e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3550,7 +3550,6 @@ dependencies = [ "sqlx", "task-manager", "thiserror", - "time", "tokio", "tokio-stream", "tokio-util", diff --git a/iot_verifier/Cargo.toml b/iot_verifier/Cargo.toml index d5af9347a..6bbf783c8 100644 --- a/iot_verifier/Cargo.toml +++ b/iot_verifier/Cargo.toml @@ -55,4 +55,3 @@ price = { path = "../price" } tokio-util = { workspace = true } tokio-stream = { workspace = true } task-manager = { path = "../task_manager" } -time = "0.3.17" From ca34170d63bb491a0f86d6650e3587eedca1d194 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Sun, 7 Jan 2024 20:02:37 +0000 Subject: [PATCH 09/10] bump proto --- Cargo.lock | 8 ++++---- Cargo.toml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b431e571e..94967cd0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#d9506a5c453aab28dd728097fdec0e553bfd00ed" +source = "git+https://github.com/helium/proto?branch=master#a7c24d010911a9de4ce0c16eaef6b6eb2d306975" dependencies = [ "base64 0.21.0", "byteorder", @@ -1207,7 +1207,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", ] @@ -3047,7 +3047,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=andymck/iot-unallocated-rewards#d9506a5c453aab28dd728097fdec0e553bfd00ed" +source = "git+https://github.com/helium/proto?branch=master#a7c24d010911a9de4ce0c16eaef6b6eb2d306975" dependencies = [ "bytes", "prost", @@ -8329,7 +8329,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/Cargo.toml b/Cargo.toml index 858803922..acc156d5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,14 +60,14 @@ sqlx = {version = "0", features = [ "runtime-tokio-rustls" ]} helium-crypto = {version = "0.8.1", features=["sqlx-postgres", "multisig"]} -helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/iot-unallocated-rewards", features = ["services"]} +helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} hextree = "*" solana-client = "1.14" solana-sdk = "1.14" solana-program = "1.11" spl-token = "3.5.0" reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} -beacon = { git = "https://github.com/helium/proto", branch = "andymck/iot-unallocated-rewards" } +beacon = { git = "https://github.com/helium/proto", branch = "master" } humantime = "2" metrics = "0" metrics-exporter-prometheus = "0" From 79925fad2114b6d680dea79045519aa3a9d64518 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Sun, 7 Jan 2024 19:49:51 +0000 Subject: [PATCH 10/10] refactor and address review comments --- iot_verifier/src/reward_share.rs | 320 +++++++++++++--------- iot_verifier/src/rewarder.rs | 30 +- iot_verifier/tests/rewarder_operations.rs | 5 +- 3 files changed, 206 insertions(+), 149 deletions(-) diff --git a/iot_verifier/src/reward_share.rs b/iot_verifier/src/reward_share.rs index 35b8d9cff..5186fa990 100644 --- a/iot_verifier/src/reward_share.rs +++ b/iot_verifier/src/reward_share.rs @@ -219,25 +219,16 @@ impl RewardShares { } } +pub type GatewayRewardShares = HashMap; + #[derive(Default)] pub struct GatewayShares { - pub shares: HashMap, - pub beacon_rewards_per_share: Decimal, - pub witness_rewards_per_share: Decimal, - pub dc_transfer_rewards_per_share: Decimal, - pub total_rewards_for_poc_and_dc: Decimal, + pub shares: GatewayRewardShares, } impl GatewayShares { - pub async fn aggregate( - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result { - let mut shares = Self::default(); - // get all the shares, poc and dc - shares.aggregate_poc_shares(db, reward_period).await?; - shares.aggregate_dc_shares(db, reward_period).await?; - Ok(shares) + pub fn new(shares: GatewayRewardShares) -> anyhow::Result { + Ok(Self { shares }) } pub async fn clear_rewarded_shares( @@ -257,64 +248,54 @@ impl GatewayShares { .map(|_| ()) } - pub fn total_shares(&self) -> (Decimal, Decimal, Decimal) { - self.shares.iter().fold( - (Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), - |(beacon_sum, witness_sum, dc_sum), (_, reward_shares)| { + pub fn into_iot_reward_shares( + self, + reward_period: &'_ Range>, + beacon_rewards_per_share: Decimal, + witness_rewards_per_share: Decimal, + dc_transfer_rewards_per_share: Decimal, + ) -> impl Iterator + '_ { + self.shares + .into_iter() + .map(move |(hotspot_key, reward_shares)| { + let beacon_amount = + compute_rewards(beacon_rewards_per_share, reward_shares.beacon_shares); + let witness_amount = + compute_rewards(witness_rewards_per_share, reward_shares.witness_shares); + let dc_transfer_amount = + compute_rewards(dc_transfer_rewards_per_share, reward_shares.dc_shares); + proto::GatewayReward { + hotspot_key: hotspot_key.into(), + beacon_amount, + witness_amount, + dc_transfer_amount, + } + }) + .filter(|reward_share| { + reward_share.beacon_amount > 0 + || reward_share.witness_amount > 0 + || reward_share.dc_transfer_amount > 0 + }) + .map(|gateway_reward| { + let total_gateway_reward = gateway_reward.dc_transfer_amount + + gateway_reward.beacon_amount + + gateway_reward.witness_amount; ( - beacon_sum + reward_shares.beacon_shares, - witness_sum + reward_shares.witness_shares, - dc_sum + reward_shares.dc_shares, + total_gateway_reward, + proto::IotRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(ProtoReward::GatewayReward(gateway_reward)), + }, ) - }, - ) - } - - async fn aggregate_poc_shares( - &mut self, - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result<(), sqlx::Error> { - let mut rows = sqlx::query_as::<_, GatewayPocShare>( - "select * from gateway_shares where reward_timestamp > $1 and reward_timestamp <= $2", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch(db); - while let Some(gateway_share) = rows.try_next().await? { - self.shares - .entry(gateway_share.hotspot_key.clone()) - .or_default() - .add_poc_reward(&gateway_share) - } - Ok(()) - } - - async fn aggregate_dc_shares( - &mut self, - db: impl sqlx::PgExecutor<'_> + Copy, - reward_period: &Range>, - ) -> Result<(), sqlx::Error> { - let mut rows = sqlx::query_as::<_, GatewayDCShare>( - "select hotspot_key, reward_timestamp, num_dcs::numeric, id from gateway_dc_shares where reward_timestamp > $1 and reward_timestamp <= $2", - ) - .bind(reward_period.start) - .bind(reward_period.end) - .fetch(db); - while let Some(gateway_share) = rows.try_next().await? { - self.shares - .entry(gateway_share.hotspot_key.clone()) - .or_default() - .add_dc_reward(&gateway_share) - } - Ok(()) + }) } - pub fn calculate_rewards_per_share_and_total_usage( - &mut self, + pub async fn calculate_rewards_per_share( + &self, reward_period: &'_ Range>, iot_price: Decimal, - ) { + ) -> anyhow::Result<(Decimal, Decimal, Decimal)> { // the total number of shares for beacons, witnesses and data transfer // dc shares here is the sum of all spent data transfer DC this epoch let (total_beacon_shares, total_witness_shares, total_dc_shares) = self.total_shares(); @@ -354,51 +335,24 @@ impl GatewayShares { %dc_transfer_rewards_per_share, "data transfer rewards" ); - self.beacon_rewards_per_share = beacon_rewards_per_share; - self.witness_rewards_per_share = witness_rewards_per_share; - self.dc_transfer_rewards_per_share = dc_transfer_rewards_per_share; - self.total_rewards_for_poc_and_dc = - total_beacon_rewards + total_witness_rewards + total_dc_transfer_rewards_capped; + Ok(( + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + )) } - pub fn into_iot_reward_shares( - self, - reward_period: &'_ Range>, - ) -> impl Iterator + '_ { - self.shares - .into_iter() - .map(move |(hotspot_key, reward_shares)| { - let beacon_amount = - compute_rewards(self.beacon_rewards_per_share, reward_shares.beacon_shares); - let witness_amount = - compute_rewards(self.witness_rewards_per_share, reward_shares.witness_shares); - let dc_transfer_amount = - compute_rewards(self.dc_transfer_rewards_per_share, reward_shares.dc_shares); - proto::GatewayReward { - hotspot_key: hotspot_key.into(), - beacon_amount, - witness_amount, - dc_transfer_amount, - } - }) - .filter(|reward_share| { - reward_share.beacon_amount > 0 - || reward_share.witness_amount > 0 - || reward_share.dc_transfer_amount > 0 - }) - .map(|gateway_reward| { - let total_gateway_reward = gateway_reward.dc_transfer_amount - + gateway_reward.beacon_amount - + gateway_reward.witness_amount; + pub fn total_shares(&self) -> (Decimal, Decimal, Decimal) { + self.shares.iter().fold( + (Decimal::ZERO, Decimal::ZERO, Decimal::ZERO), + |(beacon_sum, witness_sum, dc_sum), (_, reward_shares)| { ( - total_gateway_reward, - proto::IotRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::GatewayReward(gateway_reward)), - }, + beacon_sum + reward_shares.beacon_shares, + witness_sum + reward_shares.witness_shares, + dc_sum + reward_shares.dc_shares, ) - }) + }, + ) } } @@ -459,9 +413,62 @@ fn compute_rewards(rewards_per_share: Decimal, shares: Decimal) -> u64 { .unwrap_or(0) } +pub async fn aggregate_reward_shares( + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result { + let mut shares = GatewayRewardShares::default(); + aggregate_poc_shares(&mut shares, db, reward_period).await?; + aggregate_dc_shares(&mut shares, db, reward_period).await?; + Ok(shares) +} + +async fn aggregate_poc_shares( + // &mut self, + shares: &mut GatewayRewardShares, + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result<(), sqlx::Error> { + let mut rows = sqlx::query_as::<_, GatewayPocShare>( + "select * from gateway_shares where reward_timestamp > $1 and reward_timestamp <= $2", + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db); + while let Some(gateway_share) = rows.try_next().await? { + shares + .entry(gateway_share.hotspot_key.clone()) + .or_default() + .add_poc_reward(&gateway_share) + } + Ok(()) +} + +async fn aggregate_dc_shares( + // &mut self, + shares: &mut GatewayRewardShares, + db: impl sqlx::PgExecutor<'_> + Copy, + reward_period: &Range>, +) -> Result<(), sqlx::Error> { + let mut rows = sqlx::query_as::<_, GatewayDCShare>( + "select hotspot_key, reward_timestamp, num_dcs::numeric, id from gateway_dc_shares where reward_timestamp > $1 and reward_timestamp <= $2", + ) + .bind(reward_period.start) + .bind(reward_period.end) + .fetch(db); + while let Some(gateway_share) = rows.try_next().await? { + shares + .entry(gateway_share.hotspot_key.clone()) + .or_default() + .add_dc_reward(&gateway_share) + } + Ok(()) +} + #[cfg(test)] mod test { use super::*; + use crate::reward_share; fn reward_shares_in_dec( beacon_shares: Decimal, @@ -492,12 +499,12 @@ mod test { ); } - #[test] + #[tokio::test] // test reward distribution where there is a fixed dc spend per gateway // with the total dc spend across all gateways being significantly lower than the // total epoch dc rewards amount // this results in a significant redistribution of dc rewards to POC - fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { + async fn test_reward_share_calculation_fixed_dc_spend_with_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -568,15 +575,30 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let mut gw_shares = GatewayShares { - shares, - ..Default::default() - }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = reward_share::get_scheduled_poc_tokens( + reward_period.end - reward_period.start, + dec!(0.0), + ); + let total_dc_rewards = + reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); - let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount @@ -678,13 +700,13 @@ mod test { // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount // due to going from decimal to u64 let unallocated_poc_reward_amount = - total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 6); } - #[test] + #[tokio::test] // test reward distribution where there is zero transfer of dc rewards to poc - fn test_reward_share_calculation_without_data_transfer_distribution() { + async fn test_reward_share_calculation_without_data_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -759,15 +781,27 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let mut gw_shares = GatewayShares { - shares, - ..Default::default() - }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); - let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount @@ -861,13 +895,13 @@ mod test { // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount // due to going from decimal to u64 let unallocated_poc_reward_amount = - total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 8); } - #[test] + #[tokio::test] // test reward distribution where there is transfer of dc rewards to poc - fn test_reward_share_calculation_with_data_transfer_distribution() { + async fn test_reward_share_calculation_with_data_transfer_distribution() { let iot_price = dec!(359); let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -934,15 +968,27 @@ mod test { reward_shares_in_dec(dec!(150), dec!(350), gw6_dc_spend), ); // 0.0150, 0.0350 - let mut gw_shares = GatewayShares { - shares, - ..Default::default() - }; + let gw_shares = GatewayShares::new(shares).unwrap(); + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gw_shares + .calculate_rewards_per_share(&reward_period, iot_price) + .await + .unwrap(); + + let (total_beacon_rewards, total_witness_rewards) = + get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; + let mut rewards: HashMap = HashMap::new(); - gw_shares.calculate_rewards_per_share_and_total_usage(&reward_period, iot_price); - let total_rewards_for_poc_and_dc = gw_shares.total_rewards_for_poc_and_dc; let mut allocated_gateway_rewards = 0_u64; - for (reward_amount, reward) in gw_shares.into_iot_reward_shares(&reward_period) { + for (reward_amount, reward) in gw_shares.into_iot_reward_shares( + &reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { if let Some(ProtoReward::GatewayReward(gateway_reward)) = reward.reward { let gateway_reward_total = gateway_reward.beacon_amount + gateway_reward.witness_amount @@ -1034,7 +1080,7 @@ mod test { // we can loose up to 1 bone per gateway for each of beacon_amount, witness_amount and dc_amount // due to going from decimal to u64 let unallocated_poc_reward_amount = - total_rewards_for_poc_and_dc - Decimal::from(allocated_gateway_rewards); + total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards); assert_eq!(unallocated_poc_reward_amount.to_u64().unwrap(), 7); } diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 3d0e95927..1db4be290 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -13,6 +13,7 @@ use helium_proto::RewardManifest; use price::PriceTracker; use reward_scheduler::Scheduler; use rust_decimal::prelude::*; +use rust_decimal_macros::dec; use sqlx::{PgExecutor, PgPool, Pool, Postgres}; use std::ops::Range; use task_manager::ManagedTask; @@ -209,17 +210,28 @@ pub async fn reward_poc_and_dc( reward_period: &Range>, iot_price: Decimal, ) -> anyhow::Result<()> { - // aggregate the poc and dc data per gateway - let mut gateway_reward_shares = GatewayShares::aggregate(pool, reward_period).await?; + let reward_shares = reward_share::aggregate_reward_shares(pool, reward_period).await?; + let gateway_shares = GatewayShares::new(reward_shares)?; + let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) = + gateway_shares + .calculate_rewards_per_share(reward_period, iot_price) + .await?; - // work out rewards per share and sum up total usage for poc and dc - gateway_reward_shares.calculate_rewards_per_share_and_total_usage(reward_period, iot_price); - let total_gateway_reward_allocation = gateway_reward_shares.total_rewards_for_poc_and_dc; + // get the total poc and dc rewards for the period + let (total_beacon_rewards, total_witness_rewards) = + reward_share::get_scheduled_poc_tokens(reward_period.end - reward_period.start, dec!(0.0)); + let total_dc_rewards = + reward_share::get_scheduled_dc_tokens(reward_period.end - reward_period.start); + let total_poc_dc_reward_allocation = + total_beacon_rewards + total_witness_rewards + total_dc_rewards; let mut allocated_gateway_rewards = 0_u64; - for (gateway_reward_amount, reward_share) in - gateway_reward_shares.into_iot_reward_shares(reward_period) - { + for (gateway_reward_amount, reward_share) in gateway_shares.into_iot_reward_shares( + reward_period, + beacon_rewards_per_share, + witness_rewards_per_share, + dc_transfer_rewards_per_share, + ) { rewards_sink .write(reward_share, []) .await? @@ -228,7 +240,7 @@ pub async fn reward_poc_and_dc( allocated_gateway_rewards += gateway_reward_amount; } // write out any unallocated poc reward - let unallocated_poc_reward_amount = (total_gateway_reward_allocation + let unallocated_poc_reward_amount = (total_poc_dc_reward_allocation - Decimal::from(allocated_gateway_rewards)) .round_dp_with_strategy(0, RoundingStrategy::ToZero) .to_u64() diff --git a/iot_verifier/tests/rewarder_operations.rs b/iot_verifier/tests/rewarder_operations.rs index 5acc2a82d..041158def 100644 --- a/iot_verifier/tests/rewarder_operations.rs +++ b/iot_verifier/tests/rewarder_operations.rs @@ -3,10 +3,9 @@ use chrono::{Duration as ChronoDuration, Utc}; use iot_verifier::{reward_share, rewarder}; use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; use rust_decimal_macros::dec; -use sqlx::PgPool; -#[sqlx::test] -async fn test_operations(_pool: PgPool) -> anyhow::Result<()> { +#[tokio::test] +async fn test_operations() -> anyhow::Result<()> { let (iot_rewards_client, mut iot_rewards) = common::create_file_sink(); let now = Utc::now(); let epoch = (now - ChronoDuration::hours(24))..now;