Skip to content

Commit

Permalink
make tests compatible with filesink oneshot
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Dec 14, 2023
1 parent 02d6523 commit fcf1d2a
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 201 deletions.
31 changes: 20 additions & 11 deletions mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,11 @@ async fn reward_poc(
let mut allocated_poc_rewards = 0_u64;
for (poc_reward_amount, mobile_reward_share) in mobile_reward_shares {
allocated_poc_rewards += poc_reward_amount;
mobile_rewards.write(mobile_reward_share, []).await?;
// Await the returned one shot to ensure that we wrote the file
// .await??;
mobile_rewards
.write(mobile_reward_share, [])
.await?
// Await the returned one shot to ensure that we wrote the file
.await??;
}
// write out any unallocated poc reward
let unallocated_poc_reward_amount = (total_poc_rewards
Expand Down Expand Up @@ -316,9 +318,11 @@ pub async fn reward_dc(
let total_dc_rewards = transfer_rewards.total();
for (dc_reward_amount, mobile_reward_share) in transfer_rewards.into_rewards(reward_period) {
allocated_dc_rewards += dc_reward_amount;
mobile_rewards.write(mobile_reward_share, []).await?;
// Await the returned one shot to ensure that we wrote the file
// .await??;
mobile_rewards
.write(mobile_reward_share, [])
.await?
// Await the returned one shot to ensure that we wrote the file
.await??;
}
// write out any unallocated dc reward
let unallocated_dc_reward_amount = (total_dc_rewards - Decimal::from(allocated_dc_rewards))
Expand Down Expand Up @@ -358,9 +362,11 @@ pub async fn reward_mappers(
mapping_shares.into_subscriber_rewards(reward_period, rewards_per_share)
{
allocated_mapping_rewards += reward_amount;
mobile_rewards.write(mapping_share.clone(), []).await?;
// Await the returned one shot to ensure that we wrote the file
// .await??;
mobile_rewards
.write(mapping_share.clone(), [])
.await?
// Await the returned one shot to ensure that we wrote the file
.await??;
}

// write out any unallocated mapping rewards
Expand Down Expand Up @@ -424,7 +430,7 @@ pub async fn reward_service_providers(
sp_shares.into_service_provider_rewards(reward_period, rewards_per_share)
{
allocated_sp_rewards += amount;
mobile_rewards.write(sp_share.clone(), []).await?;
mobile_rewards.write(sp_share.clone(), []).await?.await??;
}
// write out any unallocated service provider reward
let unallocated_sp_reward_amount = (total_sp_rewards - Decimal::from(allocated_sp_rewards))
Expand Down Expand Up @@ -456,7 +462,10 @@ async fn write_unallocated_reward(
amount: unallocated_amount,
})),
};
mobile_rewards.write(unallocated_reward, []).await?;
mobile_rewards
.write(unallocated_reward, [])
.await?
.await??;
};
Ok(())
}
Expand Down
47 changes: 27 additions & 20 deletions mobile_verifier/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,92 +23,99 @@ pub struct MockFileSinkReceiver {

#[allow(dead_code)]
impl MockFileSinkReceiver {
pub async fn receive(&mut self) -> SinkMessage {
pub async fn receive(&mut self) -> Option<Vec<u8>> {
match timeout(seconds(2), self.receiver.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => panic!("server closed connection while waiting for message"),
Err(_) => panic!("timeout while waiting for message"),
Ok(Some(SinkMessage::Data(on_write_tx, msg))) => {
let _ = on_write_tx.send(Ok(()));
Some(msg)
}
Ok(None) => None,
Err(e) => panic!("timeout while waiting for message1 {:?}", e),
Ok(Some(unexpected_msg)) => {
println!("ignoring unexpected msg {:?}", unexpected_msg);
None
}
}
}

pub fn assert_no_messages(mut self) {
pub fn assert_no_messages(&mut self) {
let Err(TryRecvError::Empty) = self.receiver.try_recv() else {
panic!("receiver should have been empty")
};
}

pub async fn receive_radio_reward(&mut self) -> RadioReward {
match self.receive().await {
SinkMessage::Data(_, bytes) => {
Some(bytes) => {
let mobile_reward = MobileRewardShare::decode(bytes.as_slice())
.expect("decode mobile reward share report");
.expect("failed to decode expected radio reward");
println!("mobile_reward: {:?}", mobile_reward);
match mobile_reward.reward {
Some(MobileReward::RadioReward(r)) => r,
_ => panic!("failed to get radio reward"),
}
}
_ => panic!("invalid mobile reward share"),
None => panic!("failed to receive radio reward"),
}
}

pub async fn receive_gateway_reward(&mut self) -> GatewayReward {
match self.receive().await {
SinkMessage::Data(_, bytes) => {
Some(bytes) => {
let mobile_reward = MobileRewardShare::decode(bytes.as_slice())
.expect("decode mobile reward share report");
.expect("failed to decode expected gateway reward");
println!("mobile_reward: {:?}", mobile_reward);
match mobile_reward.reward {
Some(MobileReward::GatewayReward(r)) => r,
_ => panic!("failed to get gateway reward"),
}
}
_ => panic!("invalid mobile reward share"),
None => panic!("failed to receive gateway reward"),
}
}

pub async fn receive_service_provider_reward(&mut self) -> ServiceProviderReward {
match self.receive().await {
SinkMessage::Data(_, bytes) => {
Some(bytes) => {
let mobile_reward = MobileRewardShare::decode(bytes.as_slice())
.expect("decode mobile reward share report");
.expect("failed to decode expected service provider reward");
println!("mobile_reward: {:?}", mobile_reward);
match mobile_reward.reward {
Some(MobileReward::ServiceProviderReward(r)) => r,
_ => panic!("failed to get service provider reward"),
}
}
_ => panic!("invalid mobile reward share"),
None => panic!("failed to receive service provider reward"),
}
}

pub async fn receive_subscriber_reward(&mut self) -> SubscriberReward {
match self.receive().await {
SinkMessage::Data(_, bytes) => {
Some(bytes) => {
let mobile_reward = MobileRewardShare::decode(bytes.as_slice())
.expect("decode mobile reward share report");
.expect("failed to decode expected subscriber reward");
println!("mobile_reward: {:?}", mobile_reward);
match mobile_reward.reward {
Some(MobileReward::SubscriberReward(r)) => r,
_ => panic!("failed to get subscriber reward"),
}
}
_ => panic!("invalid mobile reward share"),
None => panic!("failed to receive subscriber reward"),
}
}

pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward {
match self.receive().await {
SinkMessage::Data(_, bytes) => {
Some(bytes) => {
let mobile_reward = MobileRewardShare::decode(bytes.as_slice())
.expect("decode mobile reward share report");
.expect("failed to decode expected unallocated reward");
println!("mobile_reward: {:?}", mobile_reward);
match mobile_reward.reward {
Some(MobileReward::UnallocatedReward(r)) => r,
_ => panic!("failed to get unallocated reward"),
}
}
_ => panic!("invalid mobile reward share"),
None => panic!("failed to receive unallocated reward"),
}
}
}
Expand Down
106 changes: 62 additions & 44 deletions mobile_verifier/tests/rewarder_mappers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
mod common;
use crate::common::MockFileSinkReceiver;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use file_store::mobile_subscriber::{SubscriberLocationIngestReport, SubscriberLocationReq};
use helium_crypto::PublicKeyBinary;
use helium_proto::{services::poc_mobile::UnallocatedRewardType, Message};
use helium_proto::{
services::poc_mobile::{SubscriberReward, UnallocatedReward, UnallocatedRewardType},
Message,
};
use mobile_verifier::{reward_shares, rewarder, subscriber_location};
use rust_decimal::prelude::*;
use sqlx::{PgPool, Postgres, Transaction};
Expand All @@ -15,7 +19,7 @@ const SUBSCRIBER_3: &str = "subscriber3";
const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6";

#[sqlx::test]
async fn test_mappers(pool: PgPool) -> anyhow::Result<()> {
async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> {
let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink();
let now = Utc::now();
let epoch = (now - ChronoDuration::hours(24))..now;
Expand All @@ -25,58 +29,72 @@ async fn test_mappers(pool: PgPool) -> anyhow::Result<()> {
seed_mapping_data(epoch.end, &mut txn).await?;
txn.commit().await.expect("db txn failed");

// run rewards for mappers
rewarder::reward_mappers(&pool.clone(), &mobile_rewards_client, &epoch).await?;
let subscriber_reward1 = mobile_rewards.receive_subscriber_reward().await;
let subscriber_reward2 = mobile_rewards.receive_subscriber_reward().await;
let subscriber_reward3 = mobile_rewards.receive_subscriber_reward().await;
tokio::select!(
// run rewards for mappers
_ = rewarder::reward_mappers(&pool, &mobile_rewards_client, &epoch) => {},
Ok((subscriber_rewards, unallocated_reward)) = receive_expected_rewards(&mut mobile_rewards) => {
// assert the mapper rewards
// all 3 subscribers will have an equal share,
// requirement is 1 event per epoch
// subscriber 1 has two events, other two subscribers one event
assert_eq!(
SUBSCRIBER_1.to_string().encode_to_vec(),
subscriber_rewards[0].subscriber_id
);
assert_eq!(5464480874316, subscriber_rewards[0].discovery_location_amount);

// assert the mapper rewards
// all 3 subscribers will have an equal share,
// requirement is 1 event per epoch
// subscriber 1 has two events, other two subscribers one event
assert_eq!(
SUBSCRIBER_1.to_string().encode_to_vec(),
subscriber_reward1.subscriber_id
);
assert_eq!(5464480874316, subscriber_reward1.discovery_location_amount);
assert_eq!(
SUBSCRIBER_2.to_string().encode_to_vec(),
subscriber_rewards[1].subscriber_id
);
assert_eq!(5464480874316, subscriber_rewards[2].discovery_location_amount);

assert_eq!(
SUBSCRIBER_2.to_string().encode_to_vec(),
subscriber_reward2.subscriber_id
);
assert_eq!(5464480874316, subscriber_reward2.discovery_location_amount);
assert_eq!(
SUBSCRIBER_3.to_string().encode_to_vec(),
subscriber_rewards[2].subscriber_id
);
assert_eq!(5464480874316, subscriber_rewards[2].discovery_location_amount);

// confirm our unallocated amount
assert_eq!(
UnallocatedRewardType::Mapper as i32,
unallocated_reward.reward_type
);
assert_eq!(2, unallocated_reward.amount);

assert_eq!(
SUBSCRIBER_3.to_string().encode_to_vec(),
subscriber_reward3.subscriber_id
// confirm the total rewards allocated matches expectations
let expected_sum = reward_shares::get_scheduled_tokens_for_mappers(epoch.end - epoch.start)
.to_u64()
.unwrap();
assert_eq!(
expected_sum,
subscriber_rewards[0].discovery_location_amount
+ subscriber_rewards[1].discovery_location_amount
+ subscriber_rewards[2].discovery_location_amount
+ unallocated_reward.amount
);
}
);
assert_eq!(5464480874316, subscriber_reward2.discovery_location_amount);
Ok(())
}

// confirm our unallocated amount
async fn receive_expected_rewards(
mobile_rewards: &mut MockFileSinkReceiver,
) -> anyhow::Result<(Vec<SubscriberReward>, UnallocatedReward)> {
// get the filestore outputs from rewards run
// we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios
let subscriber_reward1 = mobile_rewards.receive_subscriber_reward().await;
let subscriber_reward2 = mobile_rewards.receive_subscriber_reward().await;
let subscriber_reward3 = mobile_rewards.receive_subscriber_reward().await;
let subscriber_rewards = vec![subscriber_reward1, subscriber_reward2, subscriber_reward3];

// expect one unallocated reward
let unallocated_reward = mobile_rewards.receive_unallocated_reward().await;
assert_eq!(
UnallocatedRewardType::Mapper as i32,
unallocated_reward.reward_type
);
assert_eq!(2, unallocated_reward.amount);

// should be no further msgs
mobile_rewards.assert_no_messages();

// confirm the total rewards allocated matches expectations
let expected_sum = reward_shares::get_scheduled_tokens_for_mappers(epoch.end - epoch.start)
.to_u64()
.unwrap();
assert_eq!(
expected_sum,
subscriber_reward1.discovery_location_amount
+ subscriber_reward2.discovery_location_amount
+ subscriber_reward3.discovery_location_amount
+ unallocated_reward.amount
);

Ok(())
Ok((subscriber_rewards, unallocated_reward))
}

async fn seed_mapping_data(
Expand Down
46 changes: 28 additions & 18 deletions mobile_verifier/tests/rewarder_oracles.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod common;
use crate::common::MockFileSinkReceiver;
use chrono::{Duration as ChronoDuration, Utc};
use helium_proto::services::poc_mobile::UnallocatedRewardType;
use helium_proto::services::poc_mobile::{UnallocatedReward, UnallocatedRewardType};
use mobile_verifier::{reward_shares, rewarder};
use rust_decimal::prelude::*;
use sqlx::PgPool;
Expand All @@ -11,27 +12,36 @@ async fn test_oracle_rewards(_pool: PgPool) -> anyhow::Result<()> {
let now = Utc::now();
let epoch = (now - ChronoDuration::hours(24))..now;

// run rewards for oracles
rewarder::reward_oracles(&mobile_rewards_client, &epoch).await?;
tokio::select!(
// run rewards for oracles
_ = rewarder::reward_oracles(&mobile_rewards_client, &epoch) => {},
Ok(unallocated_reward) = receive_expected_rewards(&mut mobile_rewards) => {
assert_eq!(
UnallocatedRewardType::Oracle as i32,
unallocated_reward.reward_type
);
// confirm our unallocated amount
assert_eq!(65_573_770_491_803, unallocated_reward.amount);

// oracle rewards are assigned to unallocated atm
let unallocated_reward = mobile_rewards.receive_unallocated_reward().await;

assert_eq!(
UnallocatedRewardType::Oracle as i32,
unallocated_reward.reward_type
// confirm the total rewards allocated matches expectations
let expected_sum = reward_shares::get_scheduled_tokens_for_oracles(epoch.end - epoch.start)
.to_u64()
.unwrap();
assert_eq!(expected_sum, unallocated_reward.amount);
}
);
// confirm our unallocated amount
assert_eq!(65_573_770_491_803, unallocated_reward.amount);
Ok(())
}

async fn receive_expected_rewards(
mobile_rewards: &mut MockFileSinkReceiver,
) -> anyhow::Result<UnallocatedReward> {
// expect one unallocated reward
// as oracle rewards are currently 100% unallocated
let unallocated_reward = mobile_rewards.receive_unallocated_reward().await;

// should be no further msgs
mobile_rewards.assert_no_messages();

// confirm the total rewards allocated matches expectations
let expected_sum = reward_shares::get_scheduled_tokens_for_oracles(epoch.end - epoch.start)
.to_u64()
.unwrap();
assert_eq!(expected_sum, unallocated_reward.amount);

Ok(())
Ok(unallocated_reward)
}
Loading

0 comments on commit fcf1d2a

Please sign in to comment.