Skip to content

Commit

Permalink
Add promotion funds workspace
Browse files Browse the repository at this point in the history
This workspace is all about dealing with Service Provider Promotion Fund
allocation.

HIP-114
https://github.com/helium/HIP/blob/main/0114-incentive-escrow-fund-for-subscriber-referrals.md

Service Provider Promotions are stored in CarrierV0 on Solana.
To keep the mobile-verifier from talking to a chain, this service will
periodically check Solana and compare Service Providers allocations to
what is stored in S3.

If the values have changed, a new file will be output to a bucket for
the mobile-verifier rewarder to read from.

NOTE: Allocation Values are stored in Bps (Basis Points)
https://www.investopedia.com/terms/b/basispoint.asp

** Commands

*** ./promotion_fund write-solana

Fetch Allocation values from Solana and write them to S3.
This command _always_ writes an S3 file.

*** ./promotion_fund print-s3

Using the lookback time in the provided settings file, show the
Allocation values this service would start up with.

*** ./promotion_fund server

Start a server that reads from S3, then checks with Solana periodically
for updated Allocatino values. Writing new files when needed.
  • Loading branch information
michaeldjeffrey committed Sep 19, 2024
1 parent 8525c3c commit 1b0fe2c
Show file tree
Hide file tree
Showing 11 changed files with 666 additions and 0 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ members = [
"mobile_verifier",
"poc_entropy",
"price",
"promotion_fund",
"reward_index",
"reward_scheduler",
"solana",
Expand Down
30 changes: 30 additions & 0 deletions promotion_fund/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "promotion_fund"
version = "0.1.0"
description = "Service Provider promotion fund tracking for the Helium Network"
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
config = { workspace = true }
futures = { workspace = true }
helium-proto = { workspace = true }
humantime-serde = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
triggered = { workspace = true }

custom-tracing = { path = "../custom_tracing" }
file-store = { path = "../file_store" }
poc-metrics = { path = "../metrics" }
solana = { path = "../solana" }
task-manager = { path = "../task_manager" }
18 changes: 18 additions & 0 deletions promotion_fund/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
* Promotion Fund Server

## S3 Inputs

| File Type | Pattern | |
| :---- | :---- | :---- |
| ServiceProviderPromotionFundV1 | service_provider_promotion_fund.\* | [Proto](https://github.com/helium/proto/blob/map/subscriber-referral/src/service_provider.proto#L9) |

## S3 Outpus

| File Type | Pattern | |
| :---- | :---- | :---- |
| ServiceProviderPromotionFundV1 | service_provider_promotion_fund.\* | [Proto](https://github.com/helium/proto/blob/map/subscriber-referral/src/service_provider.proto#L9) |


## Server

The server loads the latest Service Provider Promotion Funds from S3, and every `Settings.solana_check_interval` Promotion Allocation for each Service Provider in the [proto enum](https://github.com/helium/proto/blob/376765fe006051d6dcccf709def58e7ed291b845/src/service_provider.proto#L5). If the Basis Points returned are different from what is stored in S3, a new report is be report.
52 changes: 52 additions & 0 deletions promotion_fund/pkg/settings-template.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# RUST_LOG compatible settings string
#
log = "promotion_fund=info"

# Destination of file to be watched for dynamically updating log level.
# Write a RUST_LOG compatible string to see new logs.
#
# custom_tracing = "./tracing.cfg"

# Temporary storage for Service Provider Promotion Funds before uploading to S3
#
file_sink_cache = "/tmp/oracles/promotion-fund"

# How often to check with Solana for updates to Service Provider Funds
#
solana_check_interval = "6 hours"

[solana]
# Solana RPC. This may contain a secret
#
rpc_url = "https://api.devnet.solana.com"

# Public key for the DNT Mint (Mobile mint)
#
dnt_mint = "mb1eu7TzEc71KxDpsmsKoucSSuuoGLv1drys1oP2jh6"

[file_store_output]
# Output bucket name for Service Provider Promotion Funds
#
bucket = "service-provider-promotions"

# Region for bucket. Defaults to below
#
# region = "us-west-2"

# Optional URL for AWS api endpoint. Inferred from aws config settings or aws
# IAM context by default
#
# endpoint = "https://aws-s3-bucket.aws.com"

# Access Key when using S3 locally
#
# access_key_id = ""

# Secret Key when using S3 locally
#
# secret_access_key = ""

[metrics]
# Prometheus endpoint
#
# endpoint = "127.0.0.1:19001"
168 changes: 168 additions & 0 deletions promotion_fund/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::{collections::HashMap, time::Duration};

use anyhow::{Context, Result};
use chrono::Utc;
use file_store::{
file_info_poller::{FileInfoPollerParser, ProstFileInfoPollerParser},
file_sink::FileSinkClient,
FileStore, FileType,
};
use futures::TryFutureExt;
use helium_proto::{IntoEnumIterator, ServiceProvider, ServiceProviderPromotionFundV1};
use solana::carrier::SolanaRpc;
use task_manager::ManagedTask;
use tokio::time::{self, Interval};

use crate::{compare_s3_and_solana_values, settings::Settings, Action, S3Value, SolanaValue};

const PROMOTION_FUND_LAST_SOLANA_FETCH_TIME: &str = "promotion_fund_last_solana_fetch_time";

pub struct Daemon {
s3_current: S3Value,
solana_client: SolanaRpc,
file_sink: FileSinkClient<ServiceProviderPromotionFundV1>,
solana_check_interval: Interval,
}

impl ManagedTask for Daemon {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));

Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl Daemon {
pub fn new(
s3_current: S3Value,
solana_client: SolanaRpc,
file_sink: FileSinkClient<ServiceProviderPromotionFundV1>,
solana_check_interval: Option<Interval>,
) -> Self {
Self {
s3_current,
solana_client,
file_sink,
solana_check_interval: solana_check_interval.unwrap_or(time::interval(Duration::MAX)),
}
}

pub async fn from_settings(
settings: &Settings,
file_sink: FileSinkClient<ServiceProviderPromotionFundV1>,
) -> anyhow::Result<Self> {
let s3_current = fetch_s3_bps(&settings.file_store_output).await?;
let solana_client = SolanaRpc::new(&settings.solana).context("making solana client")?;
let check_timer = tokio::time::interval(settings.solana_check_interval);

Ok(Self::new(
s3_current,
solana_client,
file_sink,
Some(check_timer),
))
}

pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
loop {
tokio::select! {
_ = shutdown.clone() => break,
_ = self.solana_check_interval.tick() => self.handle_tick().await?
}
}

Ok(())
}

pub async fn handle_tick(&mut self) -> Result<()> {
let solana_current = match fetch_solana_bps(&self.solana_client).await {
Ok(solana_current) => {
metrics::gauge!(PROMOTION_FUND_LAST_SOLANA_FETCH_TIME)
.set(Utc::now().timestamp() as f64);
solana_current
}
Err(err) => {
tracing::error!(?err, "failed to get bps from solana");
return Ok(());
}
};

let action = compare_s3_and_solana_values(&self.s3_current, &solana_current);
match action {
Action::Noop => tracing::info!("nothing to do"),
Action::Write => {
tracing::info!(items = solana_current.len(), "writing new file");
self.store_solana_values(&solana_current);
write_protos(&self.file_sink, solana_current).await?;
}
}

Ok(())
}

fn store_solana_values(&mut self, promo_funds: &[ServiceProviderPromotionFundV1]) {
self.s3_current.clear();

for promo_fund_v1 in promo_funds {
self.s3_current
.insert(promo_fund_v1.service_provider, promo_fund_v1.bps);
}
}
}

pub async fn fetch_s3_bps(settings: &file_store::Settings) -> anyhow::Result<S3Value> {
let file_store = FileStore::from_settings(settings).await?;
let mut results = HashMap::new();

let all = file_store
.list_all(FileType::ServiceProviderPromotionFund.to_str(), None, None)
.await?;

if let Some(last) = all.last() {
let byte_stream = file_store.get_raw(&last.key).await?;
let data: Vec<ServiceProviderPromotionFundV1> =
ProstFileInfoPollerParser.parse(byte_stream).await?;
for sp_promo_fund in data {
results.insert(sp_promo_fund.service_provider, sp_promo_fund.bps);
}
}

Ok(results)
}

pub async fn fetch_solana_bps(client: &SolanaRpc) -> anyhow::Result<SolanaValue> {
let mut results = Vec::new();
for service_provider in ServiceProvider::iter() {
let bps = client
.fetch_incentive_escrow_fund_bps(&service_provider.to_string())
.await
.with_context(|| format!("fetching solana bps for {service_provider:?}"))?;

let proto = ServiceProviderPromotionFundV1 {
timestamp: Utc::now().timestamp_millis() as u64,
service_provider: service_provider.into(),
bps: bps as u32,
};
results.push(proto);
}

Ok(results)
}

pub async fn write_protos(
file_sink: &FileSinkClient<ServiceProviderPromotionFundV1>,
promo_funds: Vec<ServiceProviderPromotionFundV1>,
) -> anyhow::Result<()> {
for proto in promo_funds {
file_sink.write(proto, []).await?.await??;
}
file_sink.commit().await?.await??;
Ok(())
}
Loading

0 comments on commit 1b0fe2c

Please sign in to comment.