Skip to content

Commit

Permalink
Merge pull request #716 from helium/andymck/hex-boosting-support
Browse files Browse the repository at this point in the history
hex boosting support
  • Loading branch information
andymck authored Feb 12, 2024
2 parents c2bb156 + 308b248 commit d079cb1
Show file tree
Hide file tree
Showing 60 changed files with 4,940 additions and 672 deletions.
637 changes: 502 additions & 135 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ debug = true

[workspace]
members = [
"boost_manager",
"db_store",
"denylist",
"file_store",
Expand Down
52 changes: 52 additions & 0 deletions boost_manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[package]
name = "boost-manager"
version = "0.1.0"
description = "Hex boosting manager"
edition.workspace = true
authors.workspace = true
license.workspace = true


[dependencies]
anyhow = {workspace = true}
anchor-lang = "0.28"
anchor-spl = "0.28"
axum = {version = "0", features = ["tracing"]}
bs58 = {workspace = true}
config = {workspace = true}
clap = {workspace = true}
thiserror = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
sqlx = {workspace = true}
base64 = {workspace = true}
sha2 = {workspace = true}
lazy_static = {workspace = true}
triggered = {workspace = true}
futures = {workspace = true}
futures-util = {workspace = true}
prost = {workspace = true}
once_cell = {workspace = true}
mobile-config = {path = "../mobile_config"}
file-store = {path = "../file_store"}
db-store = { path = "../db_store" }
poc-metrics = {path = "../metrics"}
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
metrics = {workspace = true }
metrics-exporter-prometheus = { workspace = true }
helium-proto = { workspace = true }
helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]}
rust_decimal = {workspace = true}
rust_decimal_macros = {workspace = true}
tonic = {workspace = true}
rand = {workspace = true}
async-trait = {workspace = true}
task-manager = { path = "../task_manager" }
http = {workspace = true}
http-serde = {workspace = true}
solana = {path = "../solana"}
solana-sdk = {workspace = true}
10 changes: 10 additions & 0 deletions boost_manager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Boost Manager

### S3 Inputs

| File Type | Pattern | |
| :--- |:-----------------------| :-- |
| RewardManifest | reward_manifest.\* | [Proto](https://github.com/helium/proto/blob/149997d2a74e08679e56c2c892d7e46f2d0d1c46/src/reward_manifest.proto#L5) |
| MobileRewardShare | mobile_reward_share.\* | [Proto](https://github.com/helium/proto/blob/149997d2a74e08679e56c2c892d7e46f2d0d1c46/src/service/poc_lora.proto#L171) |


27 changes: 27 additions & 0 deletions boost_manager/migrations/1_setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This extension gives us `uuid_generate_v1mc()` which generates UUIDs that cluster better than `gen_random_uuid()`
-- while still being difficult to predict and enumerate.
-- Also, while unlikely, `gen_random_uuid()` can in theory produce collisions which can trigger spurious errors on
-- insertion, whereas it's much less likely with `uuid_generate_v1mc()`.
create extension if not exists "uuid-ossp";

create or replace function set_updated_at()
returns trigger as
$$
begin
NEW.updated_at = now();
return NEW;
end;
$$ language plpgsql;

create or replace function trigger_updated_at(tablename regclass)
returns void as
$$
begin
execute format('CREATE TRIGGER set_updated_at
BEFORE UPDATE
ON %s
FOR EACH ROW
WHEN (OLD is distinct from NEW)
EXECUTE FUNCTION set_updated_at();', tablename);
end;
$$ language plpgsql;
4 changes: 4 additions & 0 deletions boost_manager/migrations/2_meta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
create table meta (
key text primary key not null,
value text
);
20 changes: 20 additions & 0 deletions boost_manager/migrations/3_activated_hexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE TYPE onchain_status AS ENUM (
'queued',
'pending',
'success',
'failed',
'cancelled'
);

create table activated_hexes (
location bigint primary key not null,
activation_ts timestamptz not null,
boosted_hex_pubkey text not null,
boost_config_pubkey text not null,
status onchain_status not null,
txn_id text,
retries integer not null default 0,
inserted_at timestamptz default now(),
updated_at timestamptz default now()
);

7 changes: 7 additions & 0 deletions boost_manager/migrations/4_files_processed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE files_processed (
file_name VARCHAR PRIMARY KEY,
file_type VARCHAR NOT NULL,
file_timestamp TIMESTAMPTZ NOT NULL,
processed_at TIMESTAMPTZ NOT NULL,
process_name text not null default 'default'
);
44 changes: 44 additions & 0 deletions boost_manager/pkg/settings-template.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
log = "boost_manager=info,solana=debug"

# Cache location for generated boost manager outputs; Required
cache = "/tmp/oracles/boost-manager"

start_after = 1702602001

enable_solana_integration = true

activation_check_interval = 30

[solana]
# Solana RPC. This may contain a secret
rpc_url = "https://api.devnet.solana.com"
# Path to the keypair used to sign data credit burn solana transactions
start_authority_keypair = ""
# Public key of the hex boost authority
hexboost_authority_pubkey = ""
# Solana cluster to use. "devnet" or "mainnet"
cluster = "devnet"

#
[database]
url = "postgresql://postgres:postgres@localhost:5432/hexboosting"
# Max connections to the database.
max_connections = 10

[verifier]
bucket = "mobile-verified"

[output]
bucket = "mobile-verified"

[mobile_config_client]
url = "http://localhost:6090"
config_pubkey = ""
signing_keypair = ""


[metrics]

# Endpoint for metrics. Default below
#
endpoint = "127.0.0.1:19001"
153 changes: 153 additions & 0 deletions boost_manager/src/activator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use crate::{db, telemetry};
use anyhow::Result;
use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::FileInfoStream, reward_manifest::RewardManifest, FileInfo, FileStore,
};
use futures::{future::LocalBoxFuture, stream, StreamExt, TryFutureExt, TryStreamExt};
use helium_proto::{
services::poc_mobile::{
mobile_reward_share::Reward as MobileReward, BoostedHex as BoostedHexProto,
MobileRewardShare,
},
Message,
};
use mobile_config::{
boosted_hex_info::BoostedHexes,
client::{hex_boosting_client::HexBoostingInfoResolver, ClientError},
};
use poc_metrics::record_duration;
use sqlx::{Pool, Postgres, Transaction};
use std::str::FromStr;
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

pub struct Activator<A> {
pool: Pool<Postgres>,
verifier_store: FileStore,
receiver: Receiver<FileInfoStream<RewardManifest>>,
hex_boosting_client: A,
}

impl<A> ManagedTask for Activator<A>
where
A: HexBoostingInfoResolver<Error = ClientError>,
{
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl<A> Activator<A>
where
A: HexBoostingInfoResolver<Error = ClientError>,
{
pub async fn new(
pool: Pool<Postgres>,
receiver: Receiver<FileInfoStream<RewardManifest>>,
hex_boosting_client: A,
verifier_store: FileStore,
) -> Result<Self> {
Ok(Self {
pool,
receiver,
hex_boosting_client,
verifier_store,
})
}

pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting Activator");
loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
msg = self.receiver.recv() => if let Some(file_info_stream) = msg {
let key = &file_info_stream.file_info.key.clone();
tracing::info!(file = %key, "Received reward manifest file");

let mut txn = self.pool.begin().await?;
let mut stream = file_info_stream.into_stream(&mut txn).await?;

while let Some(reward_manifest) = stream.next().await {
record_duration!(
"reward_index_duration",
self.handle_rewards(&mut txn, reward_manifest).await?
)
}
txn.commit().await?;
tracing::info!(file = %key, "Completed processing reward file");
telemetry::last_reward_processed_time(&self.pool, Utc::now()).await?;
}
}
}
tracing::info!("stopping Activator");
Ok(())
}

async fn handle_rewards(
&mut self,
txn: &mut Transaction<'_, Postgres>,
manifest: RewardManifest,
) -> Result<()> {
// get latest boosted hexes info from mobile config
let boosted_hexes = BoostedHexes::get_all(&self.hex_boosting_client).await?;

// get the rewards file from the manifest
let manifest_time = manifest.end_timestamp;
let reward_files = stream::iter(
manifest
.written_files
.into_iter()
.map(|file_name| FileInfo::from_str(&file_name)),
)
.boxed();

// read in the rewards file
let mut reward_shares = self.verifier_store.source_unordered(5, reward_files);

while let Some(msg) = reward_shares.try_next().await? {
let share = MobileRewardShare::decode(msg)?;
if let Some(MobileReward::RadioReward(r)) = share.reward {
for hex in r.boosted_hexes.into_iter() {
process_boosted_hex(txn, manifest_time, &boosted_hexes, &hex).await?
}
}
}
Ok(())
}
}

pub async fn process_boosted_hex(
txn: &mut Transaction<'_, Postgres>,
manifest_time: DateTime<Utc>,
boosted_hexes: &BoostedHexes,
hex: &BoostedHexProto,
) -> Result<()> {
match boosted_hexes.hexes.get(&hex.location) {
Some(info) => {
if info.start_ts.is_none() {
db::insert_activated_hex(
txn,
hex.location,
&info.boosted_hex_pubkey,
&info.boost_config_pubkey,
manifest_time,
)
.await?;
}
}
None => {
tracing::warn!(hex = %hex.location, "got an invalid boosted hex");
}
}
Ok(())
}
Loading

0 comments on commit d079cb1

Please sign in to comment.