Skip to content

Commit

Permalink
add new metric for pending_dc_burns per payer
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Oct 23, 2024
1 parent a41456c commit ecd8915
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
2 changes: 1 addition & 1 deletion mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ mod tests {
use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology;
use sqlx::PgPool;

use crate::burner::DataTransferSession;
use crate::pending_burns::DataTransferSession;

use super::*;

Expand Down
4 changes: 2 additions & 2 deletions mobile_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
tracing::info!(%total_dcs, %payer, "Burning DC");
if self.burn_data_credits(&payer, total_dcs).await.is_err() {
// We have failed to burn data credits:
metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
.increment(total_dcs);
continue;
}
Expand All @@ -51,7 +51,7 @@ where
.increment(total_dcs);

// Delete from the data transfer session and write out to S3
pending_burns::delete_for_payer(pool, &payer).await?;
pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;

for session in sessions {
self.valid_sessions
Expand Down
6 changes: 4 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
burner::Burner, event_ids::EventIdPurger, settings::Settings, MobileConfigClients,
MobileConfigResolverExt,
burner::Burner, event_ids::EventIdPurger, pending_burns, settings::Settings,
MobileConfigClients, MobileConfigResolverExt,
};
use anyhow::{bail, Result};
use chrono::{TimeZone, Utc};
Expand Down Expand Up @@ -119,6 +119,8 @@ impl Cmd {
let pool = settings.database.connect("mobile-packet-verifier").await?;
sqlx::migrate!().run(&pool).await?;

pending_burns::initialize(&pool).await?;

// Set up the solana network:
let solana = if settings.enable_solana_integration {
let Some(ref solana_settings) = settings.solana else {
Expand Down
51 changes: 47 additions & 4 deletions mobile_packet_verifier/src/pending_burns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use chrono::{DateTime, Utc};
use file_store::{mobile_session::DataTransferSessionReq, traits::TimestampEncode};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::packet_verifier::ValidDataTransferSession;
use sqlx::{prelude::FromRow, Pool, Postgres, Transaction};
use sqlx::{prelude::FromRow, Pool, Postgres, Row, Transaction};

const METRIC_NAME: &str = "pending_dc_burn";

#[derive(FromRow)]
pub struct DataTransferSession {
Expand All @@ -26,6 +28,7 @@ impl DataTransferSession {
impl From<DataTransferSession> for ValidDataTransferSession {
fn from(session: DataTransferSession) -> Self {
let num_dcs = session.dc_to_burn();

ValidDataTransferSession {
pub_key: session.pub_key.into(),
payer: session.payer.into(),
Expand All @@ -45,6 +48,27 @@ pub struct PendingPayerBurn {
pub sessions: Vec<DataTransferSession>,
}

/// Seeds the per-payer `pending_dc_burn` gauge from the database.
///
/// Intended to run once at startup so the metric reflects burns that were
/// still pending when the process last stopped: sums `rewardable_bytes`
/// per payer, converts the total to DC, and sets the payer's gauge.
///
/// # Errors
///
/// Returns an error if the aggregate query fails.
pub async fn initialize(conn: &Pool<Postgres>) -> anyhow::Result<()> {
    let results = sqlx::query(
        r#"
        SELECT payer, sum(rewardable_bytes) as total_rewardable_bytes
        FROM data_transfer_sessions
        GROUP BY payer
        "#,
    )
    .fetch_all(conn)
    .await?;

    for row in results {
        let payer: PublicKeyBinary = row.get("payer");
        // Postgres returns sum() as a signed 64-bit value; byte counts are
        // never negative, so the cast to u64 is safe in practice.
        let total_rewardable_bytes: u64 = row.get::<i64, _>("total_rewardable_bytes") as u64;

        set_metric(&payer, bytes_to_dc(total_rewardable_bytes));
    }

    // Fix: the block previously ended in `todo!()`, which would panic after
    // successfully seeding every gauge. Initialization is complete here.
    Ok(())
}

pub async fn get_all(conn: &Pool<Postgres>) -> anyhow::Result<Vec<DataTransferSession>> {
sqlx::query_as("SELECT * FROM data_transfer_sessions")
.fetch_all(conn)
Expand Down Expand Up @@ -92,6 +116,8 @@ pub async fn save(
req: &DataTransferSessionReq,
last_timestamp: DateTime<Utc>,
) -> anyhow::Result<()> {
let dc_to_burn = bytes_to_dc(req.rewardable_bytes);

sqlx::query(
r#"
INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
Expand All @@ -112,19 +138,36 @@ pub async fn save(
.execute(txn)
.await?;

increment_metric(&req.data_transfer_usage.payer, dc_to_burn);

Ok(())
}

/// Deletes all pending data-transfer sessions for `payer` and lowers the
/// payer's `pending_dc_burn` gauge by the amount of DC just burnt.
///
/// Called after a successful on-chain burn so the database and the metric
/// stay in step with what has actually been settled.
///
/// # Errors
///
/// Returns an error if the DELETE statement fails; in that case the gauge
/// is left untouched.
pub async fn delete_for_payer(
    conn: &Pool<Postgres>,
    payer: &PublicKeyBinary,
    burnt_dc: u64,
) -> anyhow::Result<()> {
    // Fix: the original chained `.await.map(|_| ()).map_err(..).await?` —
    // a leftover of the pre-metric version fused with the new one — which
    // awaits twice and cannot compile. `?` alone propagates the error.
    sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
        .bind(payer)
        .execute(conn)
        .await?;

    decrement_metric(payer, burnt_dc);

    Ok(())
}

/// Sets the payer's pending-burn gauge to an absolute DC value.
fn set_metric(payer: &PublicKeyBinary, value: u64) {
    let payer_label = payer.to_string();
    let gauge = metrics::gauge!(METRIC_NAME, "payer" => payer_label);
    gauge.set(value as f64);
}

/// Raises the payer's pending-burn gauge by `value` DC.
fn increment_metric(payer: &PublicKeyBinary, value: u64) {
    let payer_label = payer.to_string();
    let gauge = metrics::gauge!(METRIC_NAME, "payer" => payer_label);
    gauge.increment(value as f64);
}

/// Lowers the payer's pending-burn gauge by `value` DC.
fn decrement_metric(payer: &PublicKeyBinary, value: u64) {
    let payer_label = payer.to_string();
    let gauge = metrics::gauge!(METRIC_NAME, "payer" => payer_label);
    gauge.decrement(value as f64);
}

const BYTES_PER_DC: u64 = 20_000;
Expand Down

0 comments on commit ecd8915

Please sign in to comment.