From 9d0bfeb5cbc0ae0883ed4f96ccc5dfddaee3c073 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 19 Sep 2024 16:48:56 -0700 Subject: [PATCH] Supporting material for promotion_fund workspace - ingest promotion rewards, nothing will be done with them until the processor is added into mobile-verifier. - dump reward files - add sp_allocations dummy field to rewarder output - reward indexer mobile promotion type added --- Cargo.lock | 140 ++++++++++++------ Cargo.toml | 4 +- file_store/src/cli/dump_mobile_rewards.rs | 18 +++ file_store/src/file_info.rs | 15 ++ file_store/src/lib.rs | 1 + file_store/src/promotion_reward.rs | 91 ++++++++++++ file_store/src/traits/file_sink_write.rs | 15 ++ file_store/src/traits/msg_verify.rs | 1 + ingest/src/server_mobile.rs | 42 +++++- ingest/tests/common/mod.rs | 2 + mobile_verifier/src/rewarder.rs | 2 + .../11_add_mobile_promotion_reward_type.sql | 1 + reward_index/src/indexer.rs | 32 +++- 13 files changed, 312 insertions(+), 52 deletions(-) create mode 100644 file_store/src/promotion_reward.rs create mode 100644 reward_index/migrations/11_add_mobile_promotion_reward_type.sql diff --git a/Cargo.lock b/Cargo.lock index 21189d7ec..82682b1dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,11 +1617,11 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#376765fe006051d6dcccf709def58e7ed291b845" +source = "git+https://github.com/helium/proto?branch=map/subscriber-referral#734bd1ef05e50f1a047a1dc28e5d78b24e7deccd" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "prost", "rand 0.8.5", "rand_chacha 0.3.0", @@ -1776,7 +1776,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "http-serde", "humantime-serde", @@ -2117,7 +2117,7 @@ dependencies = [ [[package]] name = "circuit-breaker" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -2618,7 +2618,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "notify", "serde", @@ -2758,8 +2758,8 @@ dependencies = [ [[package]] name = "data-credits" -version = "0.2.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.2.2" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3137,7 +3137,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "fanout" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3200,7 +3200,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hex-literal", "http 0.2.11", "lazy_static", @@ -3215,8 +3215,8 @@ dependencies = [ "serde_json", "sha2 0.10.8", "sqlx", - "strum", - "strum_macros", + "strum 0.24.1", + "strum_macros 0.24.3", "task-manager", "tempfile", "thiserror", @@ -3704,10 +3704,16 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "helium-anchor-gen" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3736,7 +3742,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.1.16", + "getrandom 0.2.10", "k256", "lazy_static", "multihash", @@ -3754,7 +3760,7 @@ dependencies = [ [[package]] name = "helium-entity-manager" version = "0.3.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3763,7 +3769,7 @@ dependencies = [ [[package]] name = "helium-lib" version = "0.0.0" -source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#a4db666b45a531d690e561c225ca23c503a08bd1" +source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#b54819ac4c4bd73be37d25d7d6d48842bbc95ea9" dependencies = [ "anchor-client", "anchor-spl", @@ -3776,8 +3782,9 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", + "hex-literal", "itertools", "jsonrpc_client", "lazy_static", @@ -3794,10 +3801,27 @@ dependencies = [ "spl-account-compression", "spl-associated-token-account 3.0.2", "thiserror", + "tonic", "tracing", "url", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=map/subscriber-referral#734bd1ef05e50f1a047a1dc28e5d78b24e7deccd" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum 0.26.3", + "strum_macros 0.26.4", + "tonic", + "tonic-build", +] + [[package]] name = "helium-proto" version = "0.1.0" @@ -3814,8 +3838,8 @@ dependencies = [ [[package]] name = "helium-sub-daos" -version = "0.1.5" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.1.8" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3853,7 +3877,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -3868,8 +3892,8 @@ checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0" [[package]] name = "hexboosting" -version = "0.0.5" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.1.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -4269,7 +4293,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "humantime-serde", "metrics", @@ -4338,7 +4362,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hextree", "http 0.2.11", "http-serde", @@ -4380,7 +4404,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "http-serde", "humantime-serde", @@ -4422,7 +4446,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http-serde", "humantime-serde", "iot-config", @@ -4679,8 +4703,8 @@ dependencies = [ [[package]] name = "lazy-distributor" -version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.2.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -4689,7 +4713,7 @@ dependencies = [ [[package]] name = "lazy-transactions" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -5010,7 +5034,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hextree", "http 0.2.11", "http-serde", @@ -5050,7 +5074,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "mobile-config", "prost", "rand 0.8.5", @@ -5064,8 +5088,8 @@ dependencies = [ [[package]] name = "mobile-entity-manager" -version = "0.1.2" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.1.3" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -5086,7 +5110,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "http-serde", "humantime-serde", @@ -5130,7 +5154,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hex-assignments", "hextree", "http-serde", @@ -5497,7 +5521,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.58", @@ -5813,7 +5837,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5896,7 +5920,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -5920,7 +5944,7 @@ dependencies = [ [[package]] name = "price-oracle" version = "0.2.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -6012,7 +6036,7 @@ dependencies = [ "custom-tracing", "file-store", "futures", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6043,7 +6067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -6529,7 +6553,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "humantime-serde", "lazy_static", "metrics", @@ -6563,7 +6587,7 @@ dependencies = [ [[package]] name = "rewards-oracle" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -8629,7 +8653,16 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" dependencies = [ - "strum_macros", + "strum_macros 0.24.3", +] + +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", ] [[package]] @@ -8645,6 +8678,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.58", +] + [[package]] name = "subtle" version = "2.4.1" @@ -9176,7 +9222,7 @@ dependencies = [ [[package]] name = "treasury-management" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -9222,7 +9268,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] @@ -9417,8 +9463,8 @@ dependencies = [ [[package]] name = "voter-stake-registry" -version = "0.3.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.3.3" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", diff --git a/Cargo.toml b/Cargo.toml index d891570a3..2b636c8f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,10 +71,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18" diff --git a/file_store/src/cli/dump_mobile_rewards.rs b/file_store/src/cli/dump_mobile_rewards.rs index aadad8792..365fe12dc 100644 --- a/file_store/src/cli/dump_mobile_rewards.rs +++ b/file_store/src/cli/dump_mobile_rewards.rs @@ -3,6 +3,7 @@ use crate::{file_source, Result, Settings}; use futures::stream::StreamExt; use helium_crypto::PublicKey; use helium_proto::services::poc_mobile::mobile_reward_share::Reward::*; +use helium_proto::services::poc_mobile::promotion_reward::Entity; use helium_proto::services::poc_mobile::MobileRewardShare; use prost::Message; use serde_json::json; @@ -23,6 +24,7 @@ impl Cmd { let mut subscriber_reward = vec![]; let mut service_provider_reward = vec![]; let mut unallocated_reward = vec![]; + let mut promotion_reward = vec![]; while let Some(result) = file_stream.next().await { let msg = result?; @@ -60,6 +62,21 @@ impl Cmd { "unallocated_reward_type": reward.reward_type, "amount": reward.amount, })), + PromotionReward(reward) => { + let entity = reward.entity.unwrap(); + match entity { + Entity::SubscriberId(id) => promotion_reward.push(json!({ + "subscriber_id": uuid::Uuid::from_slice(&id).unwrap(), + "service_provider_amount": reward.service_provider_amount, + "matched_amount": reward.matched_amount, + })), + Entity::GatewayKey(key) => promotion_reward.push(json!({ + "gateway_key": PublicKey::try_from(key)?.to_string(), + "service_provider_amount": reward.service_provider_amount, + "matched_amount": reward.matched_amount, + })), + } + } }, None => todo!(), } @@ -71,6 +88,7 @@ impl Cmd { "gateway_reward": gateway_reward, "subscriber_reward": subscriber_reward, "service_provider_reward": service_provider_reward, + "promotion_reward": promotion_reward, "unallocated_reward": unallocated_reward, }))?; diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 100154d6c..8de56c265 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -164,6 +164,9 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "subscriber_verified_mapping_ingest_report"; pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "verified_subscriber_verified_mapping_ingest_report"; +pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report"; +pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward"; +pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -220,6 +223,9 @@ pub enum FileType { VerifiedSPBoostedRewardsBannedRadioIngestReport, SubscriberVerifiedMappingEventIngestReport, VerifiedSubscriberVerifiedMappingEventIngestReport, + PromotionRewardIngestReport, + VerifiedPromotionReward, + ServiceProviderPromotionFund, } impl fmt::Display for FileType { @@ -291,6 +297,9 @@ impl fmt::Display for FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } + Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, + Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, + Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, }; f.write_str(s) } @@ -365,6 +374,9 @@ impl FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } + Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, + Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, + Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, } } } @@ -439,6 +451,9 @@ impl FromStr for FileType { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT => { Self::VerifiedSubscriberVerifiedMappingEventIngestReport } + PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport, + VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward, + SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 477c0dea9..9c5a132cd 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -20,6 +20,7 @@ pub mod mobile_radio_threshold; pub mod mobile_session; pub mod mobile_subscriber; pub mod mobile_transfer; +pub mod promotion_reward; pub mod reward_manifest; mod settings; pub mod speedtest; diff --git a/file_store/src/promotion_reward.rs b/file_store/src/promotion_reward.rs new file mode 100644 index 000000000..c89132355 --- /dev/null +++ b/file_store/src/promotion_reward.rs @@ -0,0 +1,91 @@ +use crate::{ + traits::{MsgDecode, TimestampDecode, TimestampEncode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{ + self as proto, PromotionRewardIngestReportV1, PromotionRewardReqV1, +}; + +#[derive(Debug, Clone, PartialEq, Hash)] +pub enum Entity { + SubscriberId(Vec), + GatewayKey(PublicKeyBinary), +} + +impl From for Entity { + fn from(entity: proto::promotion_reward_req_v1::Entity) -> Self { + match entity { + proto::promotion_reward_req_v1::Entity::SubscriberId(v) => Entity::SubscriberId(v), + proto::promotion_reward_req_v1::Entity::GatewayKey(k) => Entity::GatewayKey(k.into()), + } + } +} + +impl From for proto::promotion_reward_req_v1::Entity { + fn from(entity: Entity) -> Self { + match entity { + Entity::SubscriberId(v) => proto::promotion_reward_req_v1::Entity::SubscriberId(v), + Entity::GatewayKey(k) => proto::promotion_reward_req_v1::Entity::GatewayKey(k.into()), + } + } +} + +impl From for proto::promotion_reward::Entity { + fn from(entity: Entity) -> Self { + match entity { + Entity::SubscriberId(v) => proto::promotion_reward::Entity::SubscriberId(v), + Entity::GatewayKey(k) => proto::promotion_reward::Entity::GatewayKey(k.into()), + } + } +} + +#[derive(Clone)] +pub struct PromotionReward { + pub entity: Entity, + pub shares: u64, + pub timestamp: DateTime, + pub received_timestamp: DateTime, + pub carrier_pub_key: PublicKeyBinary, + pub signature: Vec, +} + +impl MsgDecode for PromotionReward { + type Msg = PromotionRewardIngestReportV1; +} + +impl TryFrom for PromotionReward { + type Error = Error; + + fn try_from(v: PromotionRewardIngestReportV1) -> Result { + let received_timestamp = v.received_timestamp.to_timestamp_millis()?; + let Some(v) = v.report else { + return Err(Error::NotFound("report".to_string())); + }; + Ok(Self { + entity: if let Some(entity) = v.entity { + entity.into() + } else { + return Err(Error::NotFound("entity".to_string())); + }, + shares: v.shares, + timestamp: v.timestamp.to_timestamp()?, + received_timestamp, + carrier_pub_key: v.carrier_pub_key.into(), + signature: v.signature, + }) + } +} + +impl From for PromotionRewardReqV1 { + fn from(v: PromotionReward) -> Self { + Self { + entity: Some(v.entity.into()), + shares: v.shares, + timestamp: v.timestamp.encode_timestamp(), + carrier_pub_key: v.carrier_pub_key.into(), + signature: v.signature, + } + } +} diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index baf598fc6..651ebe080 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -268,3 +268,18 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); +impl_file_sink!( + proto::ServiceProviderPromotionFundV1, + FileType::ServiceProviderPromotionFund.to_str(), + "service_provider_promotion_fund" +); +impl_file_sink!( + poc_mobile::PromotionRewardIngestReportV1, + FileType::PromotionRewardIngestReport.to_str(), + "promotion_reward_ingest_report" +); +impl_file_sink!( + poc_mobile::VerifiedPromotionRewardV1, + FileType::VerifiedPromotionReward.to_str(), + "verified_promotion_reward" +); diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 47bb6cb40..017a1e315 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -95,6 +95,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); +impl_msg_verify!(poc_mobile::PromotionRewardReqV1, signature); #[cfg(test)] mod test { diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index c1e91ded1..71bf2bf07 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -14,7 +14,8 @@ use helium_proto::services::poc_mobile::{ CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1, - InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, + InvalidatedRadioThresholdReportRespV1, PromotionRewardIngestReportV1, PromotionRewardReqV1, + PromotionRewardRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, @@ -46,6 +47,7 @@ pub struct GrpcServer { sp_boosted_rewards_ban_sink: FileSinkClient, subscriber_mapping_event_sink: FileSinkClient, + promotion_reward_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -85,6 +87,7 @@ impl GrpcServer { ServiceProviderBoostedRewardsBannedRadioIngestReportV1, >, subscriber_mapping_event_sink: FileSinkClient, + promotion_reward_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -100,6 +103,7 @@ impl GrpcServer { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + promotion_reward_sink, required_network, address, api_token, @@ -437,6 +441,30 @@ impl poc_mobile::PocMobile for GrpcServer { let id = timestamp.to_string(); Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id })) } + + async fn submit_promotion_reward( + &self, + request: Request, + ) -> GrpcResult { + let received_timestamp: u64 = Utc::now().timestamp_millis() as u64; + let event = request.into_inner(); + + custom_tracing::record_b58("pub_key", &event.carrier_pub_key); + + let report = self + .verify_public_key(event.carrier_pub_key.as_ref()) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, event)) + .map(|(_, event)| PromotionRewardIngestReportV1 { + received_timestamp, + report: Some(event), + })?; + + let _ = self.promotion_reward_sink.write(report, []).await; + + let id = received_timestamp.to_string(); + Ok(Response::new(PromotionRewardRespV1 { id })) + } } pub async fn grpc_server(settings: &Settings) -> Result<()> { @@ -546,6 +574,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; + let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) = + PromotionRewardIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; + let Some(api_token) = settings .token .as_ref() @@ -565,6 +603,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + subscriber_referral_eligibility_sink, settings.network, settings.listen_addr, api_token, @@ -588,6 +627,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(coverage_object_report_sink_server) .add_task(sp_boosted_rewards_ban_sink_server) .add_task(subscriber_mapping_event_server) + .add_task(subscriber_referral_eligibility_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 52512eba1..5584416a0 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -44,6 +44,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10); let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10); let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); + let (promotion_rewards_tx, _rx) = tokio::sync::mpsc::channel(10); tokio::spawn(async move { let grpc_server = GrpcServer::new( @@ -57,6 +58,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(coverage_obj_tx, "noop"), FileSinkClient::new(sp_boosted_tx, "noop"), FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), + FileSinkClient::new(promotion_rewards_tx, "noop"), Network::MainNet, socket_addr, api_token, diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 76cb12313..f755e95cc 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -311,6 +311,8 @@ where boosted_poc_bones_per_reward_share: Some(helium_proto::Decimal { value: poc_dc_shares.boost.to_string(), }), + // TODO: Filled in with the next PR + sp_allocations: vec![] }; self.reward_manifests .write( diff --git a/reward_index/migrations/11_add_mobile_promotion_reward_type.sql b/reward_index/migrations/11_add_mobile_promotion_reward_type.sql new file mode 100644 index 000000000..c728749de --- /dev/null +++ b/reward_index/migrations/11_add_mobile_promotion_reward_type.sql @@ -0,0 +1 @@ +ALTER TYPE reward_type ADD VALUE 'mobile_promotion'; diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index f2823e778..c61c0699a 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -7,8 +7,13 @@ use file_store::{ use futures::{stream, StreamExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::{ - services::poc_lora::{iot_reward_share::Reward as IotReward, IotRewardShare}, - services::poc_mobile::{mobile_reward_share::Reward as MobileReward, MobileRewardShare}, + services::{ + poc_lora::{iot_reward_share::Reward as IotReward, IotRewardShare}, + poc_mobile::{ + mobile_reward_share::Reward as MobileReward, promotion_reward::Entity, + MobileRewardShare, PromotionReward, + }, + }, Message, ServiceProvider, }; use poc_metrics::record_duration; @@ -32,6 +37,7 @@ pub enum RewardType { IotOperational, MobileSubscriber, MobileServiceProvider, + MobilePromotion, MobileUnallocated, IotUnallocated, } @@ -185,6 +191,28 @@ impl Indexer { }, r.amount, ))), + Some(MobileReward::PromotionReward(PromotionReward { + entity: Some(Entity::SubscriberId(subscriber_id)), + service_provider_amount, + matched_amount, + })) => Ok(Some(( + RewardKey { + key: bs58::encode(&subscriber_id).into_string(), + reward_type: RewardType::MobilePromotion, + }, + service_provider_amount + matched_amount, + ))), + Some(MobileReward::PromotionReward(PromotionReward { + entity: Some(Entity::GatewayKey(gateway_key)), + service_provider_amount, + matched_amount, + })) => Ok(Some(( + RewardKey { + key: PublicKeyBinary::from(gateway_key).to_string(), + reward_type: RewardType::MobilePromotion, + }, + service_provider_amount + matched_amount, + ))), _ => bail!("got an invalid reward share"), } }