Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promotion Fund workspace #866

Merged
merged 5 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 116 additions & 44 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ members = [
"mobile_verifier",
"poc_entropy",
"price",
"promotion_fund",
"reward_index",
"reward_scheduler",
"solana",
Expand Down Expand Up @@ -70,10 +71,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch =
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
18 changes: 18 additions & 0 deletions file_store/src/cli/dump_mobile_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{file_source, Result, Settings};
use futures::stream::StreamExt;
use helium_crypto::PublicKey;
use helium_proto::services::poc_mobile::mobile_reward_share::Reward::*;
use helium_proto::services::poc_mobile::promotion_reward::Entity;
use helium_proto::services::poc_mobile::MobileRewardShare;
use prost::Message;
use serde_json::json;
Expand All @@ -23,6 +24,7 @@ impl Cmd {
let mut subscriber_reward = vec![];
let mut service_provider_reward = vec![];
let mut unallocated_reward = vec![];
let mut promotion_reward = vec![];

while let Some(result) = file_stream.next().await {
let msg = result?;
Expand Down Expand Up @@ -60,6 +62,21 @@ impl Cmd {
"unallocated_reward_type": reward.reward_type,
"amount": reward.amount,
})),
PromotionReward(reward) => {
let entity = reward.entity.unwrap();
match entity {
Entity::SubscriberId(id) => promotion_reward.push(json!({
"subscriber_id": uuid::Uuid::from_slice(&id).unwrap(),
"service_provider_amount": reward.service_provider_amount,
"matched_amount": reward.matched_amount,
})),
Entity::GatewayKey(key) => promotion_reward.push(json!({
"gateway_key": PublicKey::try_from(key)?.to_string(),
"service_provider_amount": reward.service_provider_amount,
"matched_amount": reward.matched_amount,
})),
}
}
},
None => todo!(),
}
Expand All @@ -71,6 +88,7 @@ impl Cmd {
"gateway_reward": gateway_reward,
"subscriber_reward": subscriber_reward,
"service_provider_reward": service_provider_reward,
"promotion_reward": promotion_reward,
"unallocated_reward": unallocated_reward,
}))?;

Expand Down
15 changes: 15 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"subscriber_verified_mapping_ingest_report";
pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"verified_subscriber_verified_mapping_ingest_report";
pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report";
pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward";
pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -220,6 +223,9 @@ pub enum FileType {
VerifiedSPBoostedRewardsBannedRadioIngestReport,
SubscriberVerifiedMappingEventIngestReport,
VerifiedSubscriberVerifiedMappingEventIngestReport,
PromotionRewardIngestReport,
VerifiedPromotionReward,
ServiceProviderPromotionFund,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -291,6 +297,9 @@ impl fmt::Display for FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport => {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
};
f.write_str(s)
}
Expand Down Expand Up @@ -365,6 +374,9 @@ impl FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport => {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
}
}
}
Expand Down Expand Up @@ -439,6 +451,9 @@ impl FromStr for FileType {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT => {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport
}
PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport,
VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward,
SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund,
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
1 change: 1 addition & 0 deletions file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod mobile_radio_threshold;
pub mod mobile_session;
pub mod mobile_subscriber;
pub mod mobile_transfer;
pub mod promotion_reward;
pub mod reward_manifest;
mod settings;
pub mod speedtest;
Expand Down
91 changes: 91 additions & 0 deletions file_store/src/promotion_reward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::{
traits::{MsgDecode, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
self as proto, PromotionRewardIngestReportV1, PromotionRewardReqV1,
};

#[derive(Debug, Clone, PartialEq, Hash)]
pub enum Entity {
SubscriberId(Vec<u8>),
GatewayKey(PublicKeyBinary),
}

impl From<proto::promotion_reward_req_v1::Entity> for Entity {
fn from(entity: proto::promotion_reward_req_v1::Entity) -> Self {
match entity {
proto::promotion_reward_req_v1::Entity::SubscriberId(v) => Entity::SubscriberId(v),
proto::promotion_reward_req_v1::Entity::GatewayKey(k) => Entity::GatewayKey(k.into()),
}
}
}

impl From<Entity> for proto::promotion_reward_req_v1::Entity {
fn from(entity: Entity) -> Self {
match entity {
Entity::SubscriberId(v) => proto::promotion_reward_req_v1::Entity::SubscriberId(v),
Entity::GatewayKey(k) => proto::promotion_reward_req_v1::Entity::GatewayKey(k.into()),
}
}
}

impl From<Entity> for proto::promotion_reward::Entity {
fn from(entity: Entity) -> Self {
match entity {
Entity::SubscriberId(v) => proto::promotion_reward::Entity::SubscriberId(v),
Entity::GatewayKey(k) => proto::promotion_reward::Entity::GatewayKey(k.into()),
}
}
}

#[derive(Clone)]
pub struct PromotionReward {
pub entity: Entity,
pub shares: u64,
pub timestamp: DateTime<Utc>,
pub received_timestamp: DateTime<Utc>,
pub carrier_pub_key: PublicKeyBinary,
pub signature: Vec<u8>,
}

impl MsgDecode for PromotionReward {
type Msg = PromotionRewardIngestReportV1;
}

impl TryFrom<PromotionRewardIngestReportV1> for PromotionReward {
type Error = Error;

fn try_from(v: PromotionRewardIngestReportV1) -> Result<Self> {
let received_timestamp = v.received_timestamp.to_timestamp_millis()?;
let Some(v) = v.report else {
return Err(Error::NotFound("report".to_string()));
};
Ok(Self {
entity: if let Some(entity) = v.entity {
entity.into()
} else {
return Err(Error::NotFound("entity".to_string()));
},
shares: v.shares,
timestamp: v.timestamp.to_timestamp()?,
received_timestamp,
carrier_pub_key: v.carrier_pub_key.into(),
signature: v.signature,
})
}
}

impl From<PromotionReward> for PromotionRewardReqV1 {
fn from(v: PromotionReward) -> Self {
Self {
entity: Some(v.entity.into()),
shares: v.shares,
timestamp: v.timestamp.encode_timestamp(),
carrier_pub_key: v.carrier_pub_key.into(),
signature: v.signature,
}
}
}
15 changes: 15 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,18 @@ impl_file_sink!(
FileType::RewardManifest.to_str(),
"reward_manifest"
);
impl_file_sink!(
proto::ServiceProviderPromotionFundV1,
FileType::ServiceProviderPromotionFund.to_str(),
"service_provider_promotion_fund"
);
impl_file_sink!(
poc_mobile::PromotionRewardIngestReportV1,
FileType::PromotionRewardIngestReport.to_str(),
"promotion_reward_ingest_report"
);
impl_file_sink!(
poc_mobile::VerifiedPromotionRewardV1,
FileType::VerifiedPromotionReward.to_str(),
"verified_promotion_reward"
);
1 change: 1 addition & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature);
impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
impl_msg_verify!(poc_mobile::PromotionRewardReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
42 changes: 41 additions & 1 deletion ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use helium_proto::services::poc_mobile::{
CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1,
InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, PromotionRewardIngestReportV1, PromotionRewardReqV1,
PromotionRewardRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1,
SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1,
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct GrpcServer {
sp_boosted_rewards_ban_sink:
FileSinkClient<ServiceProviderBoostedRewardsBannedRadioIngestReportV1>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
promotion_reward_sink: FileSinkClient<PromotionRewardIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand Down Expand Up @@ -85,6 +87,7 @@ impl GrpcServer {
ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
promotion_reward_sink: FileSinkClient<PromotionRewardIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand All @@ -100,6 +103,7 @@ impl GrpcServer {
coverage_object_report_sink,
sp_boosted_rewards_ban_sink,
subscriber_mapping_event_sink,
promotion_reward_sink,
required_network,
address,
api_token,
Expand Down Expand Up @@ -437,6 +441,30 @@ impl poc_mobile::PocMobile for GrpcServer {
let id = timestamp.to_string();
Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id }))
}

async fn submit_promotion_reward(
&self,
request: Request<PromotionRewardReqV1>,
) -> GrpcResult<PromotionRewardRespV1> {
let received_timestamp: u64 = Utc::now().timestamp_millis() as u64;
let event = request.into_inner();

custom_tracing::record_b58("pub_key", &event.carrier_pub_key);

let report = self
.verify_public_key(event.carrier_pub_key.as_ref())
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))
.map(|(_, event)| PromotionRewardIngestReportV1 {
received_timestamp,
report: Some(event),
})?;

let _ = self.promotion_reward_sink.write(report, []).await;

let id = received_timestamp.to_string();
Ok(Response::new(PromotionRewardRespV1 { id }))
}
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
Expand Down Expand Up @@ -546,6 +574,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
)
.await?;

let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) =
PromotionRewardIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;

let Some(api_token) = settings
.token
.as_ref()
Expand All @@ -565,6 +603,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
coverage_object_report_sink,
sp_boosted_rewards_ban_sink,
subscriber_mapping_event_sink,
subscriber_referral_eligibility_sink,
settings.network,
settings.listen_addr,
api_token,
Expand All @@ -588,6 +627,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(coverage_object_report_sink_server)
.add_task(sp_boosted_rewards_ban_sink_server)
.add_task(subscriber_mapping_event_server)
.add_task(subscriber_referral_eligibility_server)
.add_task(grpc_server)
.build()
.start()
Expand Down
2 changes: 2 additions & 0 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10);
let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10);
let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10);
let (promotion_rewards_tx, _rx) = tokio::sync::mpsc::channel(10);

tokio::spawn(async move {
let grpc_server = GrpcServer::new(
Expand All @@ -57,6 +58,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
FileSinkClient::new(coverage_obj_tx, "noop"),
FileSinkClient::new(sp_boosted_tx, "noop"),
FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"),
FileSinkClient::new(promotion_rewards_tx, "noop"),
Network::MainNet,
socket_addr,
api_token,
Expand Down
2 changes: 2 additions & 0 deletions mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ where
boosted_poc_bones_per_reward_share: Some(helium_proto::Decimal {
value: poc_dc_shares.boost.to_string(),
}),
// TODO: Filled in with the next PR
sp_allocations: vec![],
};
self.reward_manifests
.write(
Expand Down
30 changes: 30 additions & 0 deletions promotion_fund/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "promotion_fund"
version = "0.1.0"
description = "Service Provider promotion fund tracking for the Helium Network"
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
config = { workspace = true }
futures = { workspace = true }
helium-proto = { workspace = true }
humantime-serde = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
triggered = { workspace = true }

custom-tracing = { path = "../custom_tracing" }
file-store = { path = "../file_store" }
poc-metrics = { path = "../metrics" }
solana = { path = "../solana" }
task-manager = { path = "../task_manager" }
Loading