diff --git a/Cargo.lock b/Cargo.lock index f4d616807..b8c2ad9c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6138,32 +6138,6 @@ dependencies = [ "yansi", ] -[[package]] -name = "promotion-fund" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "chrono", - "clap 4.4.8", - "config", - "custom-tracing", - "file-store", - "futures", - "helium-proto", - "humantime-serde", - "metrics", - "metrics-exporter-prometheus", - "poc-metrics", - "serde", - "solana", - "task-manager", - "tokio", - "tracing", - "tracing-subscriber", - "triggered", -] - [[package]] name = "proptest" version = "1.5.0" diff --git a/Cargo.toml b/Cargo.toml index d891570a3..a45355a99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ members = [ "mobile_verifier", "poc_entropy", "price", - "promotion_fund", "reward_index", "reward_scheduler", "solana", diff --git a/promotion_fund/Cargo.toml b/promotion_fund/Cargo.toml deleted file mode 100644 index d6d3418dd..000000000 --- a/promotion_fund/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -[package] -name = "promotion-fund" -version = "0.1.0" -description = "Service Provider promotion fund tracking for the Helium Network" -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -anyhow = { workspace = true } -async-trait = { workspace = true } -chrono = { workspace = true } -clap = { workspace = true } -config = { workspace = true } -futures = { workspace = true } -helium-proto = { workspace = true } -humantime-serde = { workspace = true } -metrics = { workspace = true } -metrics-exporter-prometheus = { workspace = true } -serde = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -triggered = { workspace = true } - -custom-tracing = { path = "../custom_tracing" } -file-store = { path = "../file_store" } -poc-metrics = { path = "../metrics" } -solana = { path = "../solana" } -task-manager = { path = "../task_manager" } diff --git a/promotion_fund/README.md b/promotion_fund/README.md deleted file mode 100644 index f024562ac..000000000 --- a/promotion_fund/README.md +++ /dev/null @@ -1,18 +0,0 @@ -* Promotion Fund Server - -## S3 Inputs - -| File Type | Pattern | | -| :---- | :---- | :---- | -| ServiceProviderPromotionFundV1 | service_provider_promotion_fund.\* | [Proto](https://github.com/helium/proto/blob/map/subscriber-referral/src/service_provider.proto#L9) | - -## S3 Outpus - -| File Type | Pattern | | -| :---- | :---- | :---- | -| ServiceProviderPromotionFundV1 | service_provider_promotion_fund.\* | [Proto](https://github.com/helium/proto/blob/map/subscriber-referral/src/service_provider.proto#L9) | - - -## Server - -The server loads the latest Service Provider Promotion Funds from S3, and every `Settings.solana_check_interval` Promotion Allocation for each Service Provider in the [proto enum](https://github.com/helium/proto/blob/376765fe006051d6dcccf709def58e7ed291b845/src/service_provider.proto#L5). If the Basis Points returned are different from what is stored in S3, a new report is be report. diff --git a/promotion_fund/pkg/settings-template.toml b/promotion_fund/pkg/settings-template.toml deleted file mode 100644 index 1f7a7c3d8..000000000 --- a/promotion_fund/pkg/settings-template.toml +++ /dev/null @@ -1,57 +0,0 @@ -# RUST_LOG compatible settings string -# -log = "promotion_fund=info" - -# Destination of file to be watched for dynamically updating log level. -# Write a RUST_LOG compatible string to see new logs. -# -# custom_tracing = "./tracing.cfg" - -# Temporary storage for Service Provider Promotion Funds before uploading to S3 -# -file_sink_cache = "/tmp/oracles/promotion-fund" - -# How often to check with Solana for updates to Service Provider Funds -# -solana_check_interval = "6 hours" - -# On startup, how far back do we read from S3 to get the latest Service Provider -# Fund allocation values. -# -# lookback_start_after = 0 - -[solana] -# Solana RPC. This may contain a secret -# -rpc_url = "https://api.devnet.solana.com" - -# Public key for the DNT Mint (Mobile mint) -# -dnt_mint = "mb1eu7TzEc71KxDpsmsKoucSSuuoGLv1drys1oP2jh6" - -[file_store_output] -# Output bucket name for Service Provider Promotion Funds -# -bucket = "service-provider-promotions" - -# Region for bucket. Defaults to below -# -# region = "us-west-2" - -# Optional URL for AWS api endpoint. Inferred from aws config settings or aws -# IAM context by default -# -# endpoint = "https://aws-s3-bucket.aws.com" - -# Access Key when using S3 locally -# -# access_key_id = "" - -# Secret Key when using S3 locally -# -# secret_access_key = "" - -[metrics] -# Prometheus endpoint -# -# endpoint = "127.0.0.1:19001" diff --git a/promotion_fund/src/daemon.rs b/promotion_fund/src/daemon.rs deleted file mode 100644 index fb49bd176..000000000 --- a/promotion_fund/src/daemon.rs +++ /dev/null @@ -1,172 +0,0 @@ -use std::{collections::HashMap, time::Duration}; - -use anyhow::{Context, Result}; -use chrono::Utc; -use file_store::{ - file_info_poller::{FileInfoPollerParser, ProstFileInfoPollerParser}, - file_sink::FileSinkClient, - FileStore, FileType, -}; -use futures::TryFutureExt; -use helium_proto::{IntoEnumIterator, ServiceProvider, ServiceProviderPromotionFundV1}; -use solana::carrier::SolanaRpc; -use task_manager::ManagedTask; -use tokio::time::{self, Interval}; - -use crate::{compare_s3_and_solana_values, settings::Settings, Action, S3Value, SolanaValue}; - -const PROMOTION_FUND_LAST_SOLANA_FETCH_TIME: &str = "promotion_fund_last_solana_fetch_time"; - -pub struct Daemon { - s3_current: S3Value, - solana_client: SolanaRpc, - file_sink: FileSinkClient, - solana_check_interval: Interval, -} - -impl ManagedTask for Daemon { - fn start_task( - self: Box, - shutdown: triggered::Listener, - ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { - let handle = tokio::spawn(self.run(shutdown)); - - Box::pin( - handle - .map_err(anyhow::Error::from) - .and_then(|result| async move { result.map_err(anyhow::Error::from) }), - ) - } -} - -impl Daemon { - pub fn new( - s3_current: S3Value, - solana_client: SolanaRpc, - file_sink: FileSinkClient, - solana_check_interval: Option, - ) -> Self { - Self { - s3_current, - solana_client, - file_sink, - solana_check_interval: solana_check_interval.unwrap_or(time::interval(Duration::MAX)), - } - } - - pub async fn from_settings( - settings: &Settings, - file_sink: FileSinkClient, - ) -> anyhow::Result { - let s3_current = fetch_s3_bps(settings).await?; - let solana_client = SolanaRpc::new(&settings.solana).context("making solana client")?; - let check_timer = tokio::time::interval(settings.solana_check_interval); - - Ok(Self::new( - s3_current, - solana_client, - file_sink, - Some(check_timer), - )) - } - - pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - loop { - tokio::select! { - _ = shutdown.clone() => break, - _ = self.solana_check_interval.tick() => self.handle_tick().await? - } - } - - Ok(()) - } - - pub async fn handle_tick(&mut self) -> Result<()> { - let solana_current = match fetch_solana_bps(&self.solana_client).await { - Ok(solana_current) => { - metrics::gauge!(PROMOTION_FUND_LAST_SOLANA_FETCH_TIME) - .set(Utc::now().timestamp() as f64); - solana_current - } - Err(err) => { - tracing::error!(?err, "failed to get bps from solana"); - return Ok(()); - } - }; - - let action = compare_s3_and_solana_values(&self.s3_current, &solana_current); - match action { - Action::Noop => tracing::info!("nothing to do"), - Action::Write => { - tracing::info!(items = solana_current.len(), "writing new file"); - self.store_solana_values(&solana_current); - write_protos(&self.file_sink, solana_current).await?; - } - } - - Ok(()) - } - - fn store_solana_values(&mut self, promo_funds: &[ServiceProviderPromotionFundV1]) { - self.s3_current.clear(); - - for promo_fund_v1 in promo_funds { - self.s3_current - .insert(promo_fund_v1.service_provider, promo_fund_v1.bps); - } - } -} - -pub async fn fetch_s3_bps(settings: &Settings) -> anyhow::Result { - let file_store = FileStore::from_settings(&settings.file_store_output).await?; - let mut results = HashMap::new(); - - let all = file_store - .list_all( - FileType::ServiceProviderPromotionFund.to_str(), - settings.lookback_start_after, - None, - ) - .await?; - - if let Some(last) = all.last() { - let byte_stream = file_store.get_raw(&last.key).await?; - let data: Vec = - ProstFileInfoPollerParser.parse(byte_stream).await?; - for sp_promo_fund in data { - results.insert(sp_promo_fund.service_provider, sp_promo_fund.bps); - } - } - - Ok(results) -} - -pub async fn fetch_solana_bps(client: &SolanaRpc) -> anyhow::Result { - let mut results = Vec::new(); - for service_provider in ServiceProvider::iter() { - let bps = client - .fetch_incentive_escrow_fund_bps(&service_provider.to_string()) - .await - .with_context(|| format!("fetching solana bps for {service_provider:?}"))?; - - let proto = ServiceProviderPromotionFundV1 { - timestamp: Utc::now().timestamp_millis() as u64, - service_provider: service_provider.into(), - bps: bps as u32, - }; - results.push(proto); - } - - Ok(results) -} - -pub async fn write_protos( - file_sink: &FileSinkClient, - promo_funds: Vec, -) -> anyhow::Result<()> { - for proto in promo_funds { - file_sink.write(proto, []).await?.await??; - } - file_sink.commit().await?.await??; - Ok(()) -} diff --git a/promotion_fund/src/lib.rs b/promotion_fund/src/lib.rs deleted file mode 100644 index 900f97fb9..000000000 --- a/promotion_fund/src/lib.rs +++ /dev/null @@ -1,140 +0,0 @@ -use std::collections::HashMap; - -use file_store::{ - file_sink::FileSinkClient, - file_upload::FileUpload, - traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, - FileSink, -}; -use helium_proto::ServiceProviderPromotionFundV1; -use settings::Settings; - -pub mod daemon; -pub mod settings; - -type ServiceProviderInt = i32; -type BasisPoints = u32; - -type S3Value = HashMap; -type SolanaValue = Vec; - -#[derive(Debug, PartialEq)] -pub enum Action { - Write, - Noop, -} - -fn compare_s3_and_solana_values(s3_current: &S3Value, solana_current: &SolanaValue) -> Action { - for sp_fund in solana_current { - // A Service Provider missing from the S3 file only - // matters if their Solana BPS is >0. - let s3_bps = s3_current.get(&sp_fund.service_provider).unwrap_or(&0); - if s3_bps != &sp_fund.bps { - return Action::Write; - } - } - - Action::Noop -} - -pub async fn make_promotion_fund_file_sink( - settings: &Settings, - upload: FileUpload, -) -> anyhow::Result<( - FileSinkClient, - FileSink, -)> { - let (sink, sink_server) = ServiceProviderPromotionFundV1::file_sink( - &settings.file_sink_cache, - upload, - FileSinkCommitStrategy::Manual, - FileSinkRollTime::Default, - env!("CARGO_PKG_NAME"), - ) - .await?; - Ok((sink, sink_server)) -} - -#[cfg(test)] -mod tests { - use helium_proto::ServiceProvider; - - use super::*; - - #[test] - fn noop_when_nothing_in_s3_or_solana() { - let action = compare_s3_and_solana_values(&HashMap::new(), &vec![]); - assert_eq!(Action::Noop, action); - } - - #[test] - fn noop_when_new_solana_value_is_zero() { - let s3_promos = HashMap::from_iter([]); - let solana_promos = vec![ServiceProviderPromotionFundV1 { - timestamp: 0, - service_provider: ServiceProvider::HeliumMobile as i32, - bps: 0, - }]; - - let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); - assert_eq!(Action::Noop, action); - } - - #[test] - fn noop_when_values_are_the_same() { - let s3_promos = HashMap::from_iter([(ServiceProvider::HeliumMobile as i32, 1)]); - let solana_promos = vec![ServiceProviderPromotionFundV1 { - timestamp: 0, - service_provider: ServiceProvider::HeliumMobile as i32, - bps: 1, - }]; - - let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); - assert_eq!(Action::Noop, action); - } - - #[test] - fn write_when_new_values_from_solana() { - let sp_promos = vec![ServiceProviderPromotionFundV1 { - timestamp: 0, - service_provider: ServiceProvider::HeliumMobile as i32, - bps: 1, - }]; - - let action = compare_s3_and_solana_values(&HashMap::new(), &sp_promos); - assert_eq!(Action::Write, action); - } - - #[test] - fn write_when_solana_differs_from_s3() { - let s3_promos = HashMap::from_iter([(ServiceProvider::HeliumMobile as i32, 1)]); - let solana_promos = vec![ServiceProviderPromotionFundV1 { - timestamp: 0, - service_provider: ServiceProvider::HeliumMobile as i32, - bps: 2, - }]; - - let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); - assert_eq!(Action::Write, action); - } - - #[test] - fn all_items_written_when_one_is_different() { - let s3_promos = HashMap::from_iter([(ServiceProvider::HeliumMobile as i32, 1), (1, 1)]); - let solana_promos = vec![ - ServiceProviderPromotionFundV1 { - timestamp: 0, - service_provider: ServiceProvider::HeliumMobile as i32, - bps: 2, - }, - ServiceProviderPromotionFundV1 { - timestamp: 0, - service_provider: 1, - bps: 1, - }, - ]; - - let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); - assert_eq!(Action::Write, action); - } -} diff --git a/promotion_fund/src/main.rs b/promotion_fund/src/main.rs deleted file mode 100644 index 693d3bdcd..000000000 --- a/promotion_fund/src/main.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::{path::PathBuf, time::Duration}; - -use anyhow::{Context, Result}; -use clap::{Parser, Subcommand}; -use file_store::file_upload::FileUpload; -use helium_proto::ServiceProvider; -use humantime_serde::re::humantime::format_duration; -use promotion_fund::{ - daemon::{fetch_s3_bps, fetch_solana_bps, write_protos, Daemon}, - make_promotion_fund_file_sink, - settings::Settings, -}; -use solana::carrier::SolanaRpc; -use task_manager::TaskManager; - -#[derive(Debug, Parser)] -struct Cli { - #[clap(short, long)] - config: Option, - #[clap(subcommand)] - cmd: Cmd, -} - -#[derive(Debug, Subcommand)] -enum Cmd { - /// Fetch current values from Solana and output a file to S3 - /// - /// A file will be output regardless of how recently another file was - /// written to S3. - WriteSolana, - /// Print the current values from S3 - PrintS3, - /// Check Solana for new values every `solana_check_interval` - /// - /// When the values from Solana do not match the latest values in S3, a new - /// S3 file will be output. - Server, -} - -#[tokio::main] -async fn main() -> Result<()> { - let cli = Cli::parse(); - let settings = Settings::new(cli.config).context("reading settings")?; - custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; - poc_metrics::start_metrics(&settings.metrics)?; - - match cli.cmd { - Cmd::WriteSolana => write_solana(&settings).await?, - Cmd::PrintS3 => print_s3(&settings).await?, - Cmd::Server => run_server(&settings).await?, - }; - - Ok(()) -} - -async fn run_server(settings: &Settings) -> Result<()> { - let (upload, upload_server) = FileUpload::from_settings_tm(&settings.file_store_output).await?; - let (promotion_funds_sink, promotion_fund_server) = - make_promotion_fund_file_sink(settings, upload).await?; - - let state = Daemon::from_settings(settings, promotion_funds_sink).await?; - - tracing::info!( - check_interval = %format_duration(settings.solana_check_interval), - metrics = %settings.metrics.endpoint, - "starting promotion_fund server" - ); - - TaskManager::builder() - .add_task(upload_server) - .add_task(promotion_fund_server) - .add_task(state) - .build() - .start() - .await -} - -async fn print_s3(settings: &Settings) -> Result<()> { - let s3_current = fetch_s3_bps(settings).await?; - if s3_current.is_empty() { - tracing::warn!("nothing read from s3"); - } - for (sp_int, bps) in s3_current.iter() { - let sp = ServiceProvider::try_from(*sp_int); - tracing::info!(?sp, bps); - } - - Ok(()) -} - -async fn write_solana(settings: &Settings) -> Result<()> { - let (trigger, listener) = triggered::trigger(); - let (upload, upload_server) = FileUpload::from_settings_tm(&settings.file_store_output).await?; - let (promotion_funds_sink, promotion_fund_server) = - make_promotion_fund_file_sink(settings, upload).await?; - - let handle = tokio::spawn(async move { - tokio::try_join!( - upload_server.run(listener.clone()), - promotion_fund_server.run(listener) - ) - }); - - let solana = SolanaRpc::new(&settings.solana).context("making solana client")?; - let promo_funds = fetch_solana_bps(&solana).await?; - write_protos(&promotion_funds_sink, promo_funds).await?; - tracing::info!("file written, waiting for upload..."); - - // allow time for the upload to s3 - tokio::time::sleep(Duration::from_secs(5)).await; - - trigger.trigger(); - if let Err(err) = handle.await { - tracing::warn!(?err, "something went wrong"); - return Err(anyhow::Error::from(err)); - } - - Ok(()) -} diff --git a/promotion_fund/src/settings.rs b/promotion_fund/src/settings.rs deleted file mode 100644 index c62a1c19e..000000000 --- a/promotion_fund/src/settings.rs +++ /dev/null @@ -1,70 +0,0 @@ -use std::{ - path::{Path, PathBuf}, - time::Duration, -}; - -use chrono::{DateTime, Utc}; -use config::{Config, Environment, File}; -use humantime_serde::re::humantime; - -#[derive(Debug, serde::Deserialize)] -pub struct Settings { - /// RUST_LOG compatible settings string. - #[serde(default = "default_log")] - pub log: String, - #[serde(default)] - pub custom_tracing: custom_tracing::Settings, - /// Temporary storage before writing to S3 - pub file_sink_cache: PathBuf, - /// How often to check for updates of service provider promotion values from - /// solana. (default: 6 hours) - #[serde(with = "humantime_serde", default = "default_solana_check_interval")] - pub solana_check_interval: Duration, - /// How far back do we go looking for the latest values in s3 on startup. If - /// there become many files, update this value to a window just past the - /// most recent. Value only used on startup. - /// (default: unix epoch) - #[serde(default = "default_lookback_start_after")] - pub lookback_start_after: DateTime, - /// Solana RPC settings - pub solana: solana::carrier::Settings, - /// File Store Bucket Settings - pub file_store_output: file_store::Settings, - /// Metrics Settings - pub metrics: poc_metrics::Settings, -} - -fn default_log() -> String { - "promotion_fund=info".to_string() -} - -fn default_solana_check_interval() -> Duration { - humantime::parse_duration("6 hours").unwrap() -} - -fn default_lookback_start_after() -> DateTime { - DateTime::UNIX_EPOCH -} - -impl Settings { - /// Load Settings from a given path. Settings are loaded from a given - /// optional path and can be overriden with environment variables. - /// - /// Environemnt overrides have the same name as the entries in the settings - /// file in uppercase and prefixed with "PROMO_". For example - /// "PROMO_LOG" will override the log setting. - pub fn new>(path: Option

) -> Result { - let mut builder = Config::builder(); - - if let Some(file) = path { - // Add optional settings file - builder = builder - .add_source(File::with_name(&file.as_ref().to_string_lossy()).required(false)); - } - - builder - .add_source(Environment::with_prefix("PROMO").separator("_")) - .build() - .and_then(|config| config.try_deserialize()) - } -}