Skip to content

Commit

Permalink
Mobile Packet Verifier Add new Metric for pending_dc_burn (#881)
Browse files Browse the repository at this point in the history
* Refactor mobile-packet-verifier to have pending_burns module

* small refactor around dc_to_burn

* add new metric for pending_dc_burns per payer

* remove todo!

* Update mobile_packet_verifier/src/burner.rs

Co-authored-by: Michael Jeffrey <[email protected]>

* update run loop in mobile-packet-verifier to have predictable ordering

* cast sum to bigint when initializing pending burns metric

---------

Co-authored-by: Michael Jeffrey <[email protected]>
  • Loading branch information
bbalser and michaeldjeffrey authored Oct 25, 2024
1 parent 38e3329 commit 6e12d90
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 132 deletions.
29 changes: 5 additions & 24 deletions mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ use helium_proto::services::poc_mobile::{
};
use sqlx::{Postgres, Transaction};

use crate::{event_ids, MobileConfigResolverExt};
use crate::{event_ids, pending_burns, MobileConfigResolverExt};

pub async fn accumulate_sessions(
mobile_config: &impl MobileConfigResolverExt,
conn: &mut Transaction<'_, Postgres>,
txn: &mut Transaction<'_, Postgres>,
verified_data_session_report_sink: &FileSinkClient<VerifiedDataTransferIngestReportV1>,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> anyhow::Result<()> {
tokio::pin!(reports);

while let Some(report) = reports.next().await {
let report_validity = verify_report(conn, mobile_config, &report).await?;
let report_validity = verify_report(txn, mobile_config, &report).await?;
write_verified_report(
verified_data_session_report_sink,
report_validity,
Expand All @@ -37,26 +37,7 @@ pub async fn accumulate_sessions(
continue;
}

let event = report.report.data_transfer_usage;
sqlx::query(
r#"
INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $6)
ON CONFLICT (pub_key, payer) DO UPDATE SET
uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
"#
)
.bind(event.pub_key)
.bind(event.payer)
.bind(event.upload_bytes as i64)
.bind(event.download_bytes as i64)
.bind(report.report.rewardable_bytes as i64)
.bind(curr_file_ts)
.execute(&mut *conn)
.await?;
pending_burns::save(&mut *txn, &report.report, curr_file_ts).await?;
}

Ok(())
Expand Down Expand Up @@ -125,7 +106,7 @@ mod tests {
use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology;
use sqlx::PgPool;

use crate::burner::DataTransferSession;
use crate::pending_burns::DataTransferSession;

use super::*;

Expand Down
103 changes: 11 additions & 92 deletions mobile_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,10 @@
use chrono::{DateTime, Utc};
use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};
use file_store::file_sink::FileSinkClient;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::packet_verifier::ValidDataTransferSession;
use solana::burn::SolanaNetwork;
use sqlx::{FromRow, Pool, Postgres};
use std::collections::HashMap;
use sqlx::{Pool, Postgres};

#[derive(FromRow)]
pub struct DataTransferSession {
pub_key: PublicKeyBinary,
payer: PublicKeyBinary,
uploaded_bytes: i64,
downloaded_bytes: i64,
rewardable_bytes: i64,
first_timestamp: DateTime<Utc>,
last_timestamp: DateTime<Utc>,
}

#[derive(Default)]
pub struct PayerTotals {
total_dcs: u64,
sessions: Vec<DataTransferSession>,
}

impl PayerTotals {
fn push_sess(&mut self, sess: DataTransferSession) {
self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64);
self.sessions.push(sess);
}
}
use crate::pending_burns;

pub struct Burner<S> {
valid_sessions: FileSinkClient<ValidDataTransferSession>,
Expand All @@ -44,49 +20,17 @@ impl<S> Burner<S> {
}
}

#[derive(thiserror::Error, Debug)]
pub enum BurnError<E> {
#[error("file store error: {0}")]
FileStoreError(#[from] file_store::Error),
#[error("sql error: {0}")]
SqlError(#[from] sqlx::Error),
#[error("solana error: {0}")]
SolanaError(E),
}

impl<S> Burner<S>
where
S: SolanaNetwork,
{
pub async fn burn(&self, pool: &Pool<Postgres>) -> Result<(), BurnError<S::Error>> {
// Fetch all of the sessions
let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * FROM data_transfer_sessions")
.fetch_all(pool)
.await?;

// Fetch all of the sessions and group by the payer
let mut payer_totals = HashMap::<PublicKeyBinary, PayerTotals>::new();
for session in sessions.into_iter() {
payer_totals
.entry(session.payer.clone())
.or_default()
.push_sess(session);
}
pub async fn burn(&self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
for payer_pending_burn in pending_burns::get_all_payer_burns(pool).await? {
let payer = payer_pending_burn.payer;
let total_dcs = payer_pending_burn.total_dcs;
let sessions = payer_pending_burn.sessions;

for (
payer,
PayerTotals {
total_dcs,
sessions,
},
) in payer_totals.into_iter()
{
let payer_balance = self
.solana
.payer_balance(&payer)
.await
.map_err(BurnError::SolanaError)?;
let payer_balance = self.solana.payer_balance(&payer).await?;

if payer_balance < total_dcs {
tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
Expand All @@ -107,28 +51,11 @@ where
.increment(total_dcs);

// Delete from the data transfer session and write out to S3

sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
.bind(&payer)
.execute(pool)
.await?;
pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;

for session in sessions {
let num_dcs = bytes_to_dc(session.rewardable_bytes as u64);
self.valid_sessions
.write(
ValidDataTransferSession {
pub_key: session.pub_key.into(),
payer: session.payer.into(),
upload_bytes: session.uploaded_bytes as u64,
download_bytes: session.downloaded_bytes as u64,
rewardable_bytes: session.rewardable_bytes as u64,
num_dcs,
first_timestamp: session.first_timestamp.encode_timestamp_millis(),
last_timestamp: session.last_timestamp.encode_timestamp_millis(),
},
&[],
)
.write(ValidDataTransferSession::from(session), &[])
.await?;
}
}
Expand All @@ -146,11 +73,3 @@ where
Ok(())
}
}

const BYTES_PER_DC: u64 = 20_000;

fn bytes_to_dc(bytes: u64) -> u64 {
let bytes = bytes.max(BYTES_PER_DC);
// Integer div/ceil from: https://stackoverflow.com/a/2745086
(bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
}
35 changes: 19 additions & 16 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
burner::Burner, event_ids::EventIdPurger, settings::Settings, MobileConfigClients,
MobileConfigResolverExt,
burner::Burner, event_ids::EventIdPurger, pending_burns, settings::Settings,
MobileConfigClients, MobileConfigResolverExt,
};
use anyhow::{bail, Result};
use chrono::{TimeZone, Utc};
Expand Down Expand Up @@ -73,23 +73,13 @@ where
S: SolanaNetwork,
MCR: MobileConfigResolverExt,
{
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
pub async fn run(mut self, mut shutdown: triggered::Listener) -> Result<()> {
// Set the initial burn period to one minute
let mut burn_time = Instant::now() + Duration::from_secs(60);
loop {
tokio::select! {
file = self.reports.recv() => {
let Some(file) = file else {
anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
};
tracing::info!("Verifying file: {}", file.file_info);
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
transaction.commit().await?;
self.verified_data_session_report_sink.commit().await?;
},
biased;
_ = &mut shutdown => return Ok(()),
_ = sleep_until(burn_time) => {
// It's time to burn
match self.burner.burn(&self.pool).await {
Expand All @@ -102,7 +92,18 @@ where
}
}
}
_ = shutdown.clone() => return Ok(()),
file = self.reports.recv() => {
let Some(file) = file else {
anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
};
tracing::info!("Verifying file: {}", file.file_info);
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
transaction.commit().await?;
self.verified_data_session_report_sink.commit().await?;
}
}
}
}
Expand All @@ -119,6 +120,8 @@ impl Cmd {
let pool = settings.database.connect("mobile-packet-verifier").await?;
sqlx::migrate!().run(&pool).await?;

pending_burns::initialize(&pool).await?;

// Set up the solana network:
let solana = if settings.enable_solana_integration {
let Some(ref solana_settings) = settings.solana else {
Expand Down
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod accumulate;
pub mod burner;
pub mod daemon;
pub mod event_ids;
pub mod pending_burns;
pub mod settings;

pub struct MobileConfigClients {
Expand Down
Loading

0 comments on commit 6e12d90

Please sign in to comment.