From ee390054fedabdaf71e6e4007b2b798c57443536 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 23 Oct 2024 08:37:18 -0400
Subject: [PATCH 1/7] Refactor mobile-packet-verifier to have pending_burns
 module

---
 mobile_packet_verifier/src/accumulate.rs    |  27 +---
 mobile_packet_verifier/src/burner.rs        | 103 ++--------------
 mobile_packet_verifier/src/lib.rs           |   1 +
 mobile_packet_verifier/src/pending_burns.rs | 129 ++++++++++++++++++++
 4 files changed, 145 insertions(+), 115 deletions(-)
 create mode 100644 mobile_packet_verifier/src/pending_burns.rs

diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs
index 88947194a..214363e62 100644
--- a/mobile_packet_verifier/src/accumulate.rs
+++ b/mobile_packet_verifier/src/accumulate.rs
@@ -9,11 +9,11 @@ use helium_proto::services::poc_mobile::{
 };
 use sqlx::{Postgres, Transaction};
 
-use crate::{event_ids, MobileConfigResolverExt};
+use crate::{event_ids, pending_burns, MobileConfigResolverExt};
 
 pub async fn accumulate_sessions(
     mobile_config: &impl MobileConfigResolverExt,
-    conn: &mut Transaction<'_, Postgres>,
+    txn: &mut Transaction<'_, Postgres>,
     verified_data_session_report_sink: &FileSinkClient,
     curr_file_ts: DateTime<Utc>,
     reports: impl Stream<Item = DataTransferSessionIngestReport>,
 ) -> anyhow::Result<()> {
     tokio::pin!(reports);
 
     while let Some(report) = reports.next().await {
-        let report_validity = verify_report(conn, mobile_config, &report).await?;
+        let report_validity = verify_report(txn, mobile_config, &report).await?;
         write_verified_report(
             verified_data_session_report_sink,
             report_validity,
@@ -37,26 +37,7 @@ pub async fn accumulate_sessions(
             continue;
         }
 
-        let event = report.report.data_transfer_usage;
-        sqlx::query(
-            r#"
-            INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
-            VALUES ($1, $2, $3, $4, $5, $6, $6)
-            ON CONFLICT (pub_key, payer) DO UPDATE SET
-            uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
-            downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
-            rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
-            last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
-            "#
-        )
-        .bind(event.pub_key)
-        .bind(event.payer)
-        .bind(event.upload_bytes as i64)
-        .bind(event.download_bytes as i64)
-        .bind(report.report.rewardable_bytes as i64)
-        .bind(curr_file_ts)
-        .execute(&mut *conn)
-        .await?;
+        pending_burns::save(&mut *txn, &report.report, curr_file_ts).await?;
     }
 
     Ok(())
diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs
index f3cb54617..78c424d87 100644
--- a/mobile_packet_verifier/src/burner.rs
+++ b/mobile_packet_verifier/src/burner.rs
@@ -1,34 +1,10 @@
-use chrono::{DateTime, Utc};
-use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};
+use file_store::file_sink::FileSinkClient;
 use helium_crypto::PublicKeyBinary;
 use helium_proto::services::packet_verifier::ValidDataTransferSession;
 use solana::burn::SolanaNetwork;
-use sqlx::{FromRow, Pool, Postgres};
-use std::collections::HashMap;
+use sqlx::{Pool, Postgres};
 
-#[derive(FromRow)]
-pub struct DataTransferSession {
-    pub_key: PublicKeyBinary,
-    payer: PublicKeyBinary,
-    uploaded_bytes: i64,
-    downloaded_bytes: i64,
-    rewardable_bytes: i64,
-    first_timestamp: DateTime<Utc>,
-    last_timestamp: DateTime<Utc>,
-}
-
-#[derive(Default)]
-pub struct PayerTotals {
-    total_dcs: u64,
-    sessions: Vec<DataTransferSession>,
-}
-
-impl PayerTotals {
-    fn push_sess(&mut self, sess: DataTransferSession) {
-        self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64);
-        self.sessions.push(sess);
-    }
-}
+use crate::pending_burns;
 
 pub struct Burner<S> {
     valid_sessions: FileSinkClient,
@@ -44,49 +20,17 @@ impl<S> Burner<S> {
     }
 }
 
-#[derive(thiserror::Error, Debug)]
-pub enum BurnError<E> {
-    #[error("file store error: {0}")]
-    FileStoreError(#[from] file_store::Error),
-    #[error("sql error: {0}")]
-    SqlError(#[from] sqlx::Error),
-    #[error("solana error: {0}")]
-    SolanaError(E),
-}
-
 impl<S> Burner<S>
 where
     S: SolanaNetwork,
 {
-    pub async fn burn(&self, pool: &Pool<Postgres>) -> Result<(), BurnError<S::Error>> {
-        // Fetch all of the sessions
-        let sessions: Vec<DataTransferSession> =
-            sqlx::query_as("SELECT * FROM data_transfer_sessions")
-                .fetch_all(pool)
-                .await?;
-
-        // Fetch all of the sessions and group by the payer
-        let mut payer_totals = HashMap::<PublicKeyBinary, PayerTotals>::new();
-        for session in sessions.into_iter() {
-            payer_totals
-                .entry(session.payer.clone())
-                .or_default()
-                .push_sess(session);
-        }
+    pub async fn burn(&self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
+        for payer_pending_burn in pending_burns::get_all_payer_burns(pool).await? {
+            let payer = payer_pending_burn.payer;
+            let total_dcs = payer_pending_burn.total_dcs;
+            let sessions = payer_pending_burn.sessions;
 
-        for (
-            payer,
-            PayerTotals {
-                total_dcs,
-                sessions,
-            },
-        ) in payer_totals.into_iter()
-        {
-            let payer_balance = self
-                .solana
-                .payer_balance(&payer)
-                .await
-                .map_err(BurnError::SolanaError)?;
+            let payer_balance = self.solana.payer_balance(&payer).await?;
 
             if payer_balance < total_dcs {
                 tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
@@ -107,28 +51,11 @@ where
                 .increment(total_dcs);
 
             // Delete from the data transfer session and write out to S3
-            sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
-                .bind(&payer)
-                .execute(pool)
-                .await?;
+            pending_burns::delete_for_payer(pool, &payer).await?;
 
             for session in sessions {
-                let num_dcs = bytes_to_dc(session.rewardable_bytes as u64);
                 self.valid_sessions
-                    .write(
-                        ValidDataTransferSession {
-                            pub_key: session.pub_key.into(),
-                            payer: session.payer.into(),
-                            upload_bytes: session.uploaded_bytes as u64,
-                            download_bytes: session.downloaded_bytes as u64,
-                            rewardable_bytes: session.rewardable_bytes as u64,
-                            num_dcs,
-                            first_timestamp: session.first_timestamp.encode_timestamp_millis(),
-                            last_timestamp: session.last_timestamp.encode_timestamp_millis(),
-                        },
-                        &[],
-                    )
+                    .write(ValidDataTransferSession::from(session), &[])
                     .await?;
             }
         }
@@ -146,11 +73,3 @@ where
         Ok(())
     }
 }
-
-const BYTES_PER_DC: u64 = 20_000;
-
-fn bytes_to_dc(bytes: u64) -> u64 {
-    let bytes = bytes.max(BYTES_PER_DC);
-    // Integer div/ceil from: https://stackoverflow.com/a/2745086
-    (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
-}
diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs
index 9d80a855e..9ddb71634 100644
--- a/mobile_packet_verifier/src/lib.rs
+++ b/mobile_packet_verifier/src/lib.rs
@@ -8,6 +8,7 @@ pub mod accumulate;
 pub mod burner;
 pub mod daemon;
 pub mod event_ids;
+pub mod pending_burns;
 pub mod settings;
 
 pub struct MobileConfigClients {
diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs
new file mode 100644
index 000000000..96d55081d
--- /dev/null
+++ b/mobile_packet_verifier/src/pending_burns.rs
@@ -0,0 +1,129 @@
+use std::collections::HashMap;
+
+use chrono::{DateTime, Utc};
+use file_store::{mobile_session::DataTransferSessionReq, traits::TimestampEncode};
+use helium_crypto::PublicKeyBinary;
+use helium_proto::services::packet_verifier::ValidDataTransferSession;
+use sqlx::{prelude::FromRow, Pool, Postgres, Transaction};
+
+#[derive(FromRow)]
+pub struct DataTransferSession {
+    pub_key: PublicKeyBinary,
+    payer: PublicKeyBinary,
+    uploaded_bytes: i64,
+    downloaded_bytes: i64,
+    rewardable_bytes: i64,
+    first_timestamp: DateTime<Utc>,
+    last_timestamp: DateTime<Utc>,
+}
+
+impl From<DataTransferSession> for ValidDataTransferSession {
+    fn from(session: DataTransferSession) -> Self {
+        ValidDataTransferSession {
+            pub_key: session.pub_key.into(),
+            payer: session.payer.into(),
+            upload_bytes: session.uploaded_bytes as u64,
+            download_bytes: session.downloaded_bytes as u64,
+            rewardable_bytes: session.rewardable_bytes as u64,
+            num_dcs: bytes_to_dc(session.rewardable_bytes as u64),
+            first_timestamp: session.first_timestamp.encode_timestamp_millis(),
+            last_timestamp: session.last_timestamp.encode_timestamp_millis(),
+        }
+    }
+}
+
+pub struct PendingPayerBurn {
+    pub payer: PublicKeyBinary,
+    pub total_dcs: u64,
+    pub sessions: Vec<DataTransferSession>,
+}
+
+pub async fn get_all(conn: &Pool<Postgres>) -> anyhow::Result<Vec<DataTransferSession>> {
+    sqlx::query_as("SELECT * FROM data_transfer_sessions")
+        .fetch_all(conn)
+        .await
+        .map_err(anyhow::Error::from)
+}
+
+pub async fn get_all_payer_burns(conn: &Pool<Postgres>) -> anyhow::Result<Vec<PendingPayerBurn>> {
+    let pending_payer_burns = get_all(conn)
+        .await?
+        .into_iter()
+        .fold(
+            HashMap::<PublicKeyBinary, PendingPayerBurn>::new(),
+            |mut map, session| {
+                let dc_to_burn = bytes_to_dc(session.rewardable_bytes as u64);
+
+                match map.get_mut(&session.payer) {
+                    Some(pending_payer_burn) => {
+                        pending_payer_burn.total_dcs += dc_to_burn;
+                        pending_payer_burn.sessions.push(session);
+                    }
+                    None => {
+                        map.insert(
+                            session.payer.clone(),
+                            PendingPayerBurn {
+                                payer: session.payer.clone(),
+                                total_dcs: dc_to_burn,
+                                sessions: vec![session],
+                            },
+                        );
+                    }
+                }
+
+                map
+            },
+        )
+        .into_values()
+        .collect();
+
+    Ok(pending_payer_burns)
+}
+
+pub async fn save(
+    txn: &mut Transaction<'_, Postgres>,
+    req: &DataTransferSessionReq,
+    last_timestamp: DateTime<Utc>,
+) -> anyhow::Result<()> {
+    sqlx::query(
+        r#"
+        INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
+        VALUES ($1, $2, $3, $4, $5, $6, $6)
+        ON CONFLICT (pub_key, payer) DO UPDATE SET
+        uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
+        downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
+        rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
+        last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
+        "#
+    )
+    .bind(&req.data_transfer_usage.pub_key)
+    .bind(&req.data_transfer_usage.payer)
+    .bind(req.data_transfer_usage.upload_bytes as i64)
+    .bind(req.data_transfer_usage.download_bytes as i64)
+    .bind(req.rewardable_bytes as i64)
+    .bind(last_timestamp)
+    .execute(txn)
+    .await?;
+
+    Ok(())
+}
+
+pub async fn delete_for_payer(
+    conn: &Pool<Postgres>,
+    payer: &PublicKeyBinary,
+) -> anyhow::Result<()> {
+    sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
+        .bind(payer)
+        .execute(conn)
+        .await
+        .map(|_| ())
+        .map_err(anyhow::Error::from)
+}
+
+const BYTES_PER_DC: u64 = 20_000;
+
+fn bytes_to_dc(bytes: u64) -> u64 {
+    let bytes = bytes.max(BYTES_PER_DC);
+    // Integer div/ceil from: https://stackoverflow.com/a/2745086
+    (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
+}
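Note on PATCH 1/7: the bytes_to_dc helper that moves into pending_burns.rs prices a session at a minimum of one DC (20,000 bytes) and otherwise rounds up with integer ceiling division. A quick sanity check of that arithmetic — a hypothetical test, not part of the series, that mirrors the helper shown above:

    #[cfg(test)]
    mod bytes_to_dc_sanity {
        const BYTES_PER_DC: u64 = 20_000;

        // Same shape as the helper in pending_burns.rs: clamp to the
        // 20,000-byte floor, then divide rounding up.
        fn bytes_to_dc(bytes: u64) -> u64 {
            let bytes = bytes.max(BYTES_PER_DC);
            (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
        }

        #[test]
        fn charges_at_least_one_dc_and_rounds_up() {
            assert_eq!(bytes_to_dc(0), 1); // the floor applies even to empty sessions
            assert_eq!(bytes_to_dc(20_000), 1); // exactly one DC
            assert_eq!(bytes_to_dc(20_001), 2); // any remainder rounds up
            assert_eq!(bytes_to_dc(50_000), 3);
        }
    }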
From a41456cd94ac1230c9382ea168cdc01ea02a53a8 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 23 Oct 2024 08:50:53 -0400
Subject: [PATCH 2/7] small refactor around dc_to_burn

---
 mobile_packet_verifier/src/pending_burns.rs | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs
index 96d55081d..7611ba23a 100644
--- a/mobile_packet_verifier/src/pending_burns.rs
+++ b/mobile_packet_verifier/src/pending_burns.rs
@@ -17,15 +17,22 @@ pub struct DataTransferSession {
     last_timestamp: DateTime<Utc>,
 }
 
+impl DataTransferSession {
+    pub fn dc_to_burn(&self) -> u64 {
+        bytes_to_dc(self.rewardable_bytes as u64)
+    }
+}
+
 impl From<DataTransferSession> for ValidDataTransferSession {
     fn from(session: DataTransferSession) -> Self {
+        let num_dcs = session.dc_to_burn();
         ValidDataTransferSession {
             pub_key: session.pub_key.into(),
             payer: session.payer.into(),
             upload_bytes: session.uploaded_bytes as u64,
             download_bytes: session.downloaded_bytes as u64,
             rewardable_bytes: session.rewardable_bytes as u64,
-            num_dcs: bytes_to_dc(session.rewardable_bytes as u64),
+            num_dcs,
             first_timestamp: session.first_timestamp.encode_timestamp_millis(),
             last_timestamp: session.last_timestamp.encode_timestamp_millis(),
         }
@@ -52,7 +59,7 @@ pub async fn get_all_payer_burns(conn: &Pool<Postgres>) -> anyhow::Result<Vec<PendingPayerBurn>> {
         .fold(
             HashMap::<PublicKeyBinary, PendingPayerBurn>::new(),
             |mut map, session| {
-                let dc_to_burn = bytes_to_dc(session.rewardable_bytes as u64);
+                let dc_to_burn = session.dc_to_burn();
 
                 match map.get_mut(&session.payer) {
                     Some(pending_payer_burn) => {

From ecd8915d606daddf0b546211c9e02c214f5c364c Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 23 Oct 2024 10:42:34 -0400
Subject: [PATCH 3/7] add new metric for pending_dc_burns per payer

---
 mobile_packet_verifier/src/accumulate.rs    |  2 +-
 mobile_packet_verifier/src/burner.rs        |  4 +-
 mobile_packet_verifier/src/daemon.rs        |  6 ++-
 mobile_packet_verifier/src/pending_burns.rs | 51 +++++++++++++++++++--
 4 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs
index 214363e62..9de1c6669 100644
--- a/mobile_packet_verifier/src/accumulate.rs
+++ b/mobile_packet_verifier/src/accumulate.rs
@@ -106,7 +106,7 @@ mod tests {
     use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology;
     use sqlx::PgPool;
 
-    use crate::burner::DataTransferSession;
+    use crate::pending_burns::DataTransferSession;
 
     use super::*;
 
diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs
index 78c424d87..e17858033 100644
--- a/mobile_packet_verifier/src/burner.rs
+++ b/mobile_packet_verifier/src/burner.rs
@@ -40,7 +40,7 @@ where
         tracing::info!(%total_dcs, %payer, "Burning DC");
         if self.burn_data_credits(&payer, total_dcs).await.is_err() {
             // We have failed to burn data credits:
-            metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
+            metrics::counter!("burned", "payer" => payer.to_string(), "success" => "alse")
                 .increment(total_dcs);
             continue;
         }
@@ -51,7 +51,7 @@ where
             .increment(total_dcs);
 
         // Delete from the data transfer session and write out to S3
-        pending_burns::delete_for_payer(pool, &payer).await?;
+        pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;
 
         for session in sessions {
             self.valid_sessions
diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs
index 6a7a8853f..3439b1033 100644
--- a/mobile_packet_verifier/src/daemon.rs
+++ b/mobile_packet_verifier/src/daemon.rs
@@ -1,6 +1,6 @@
 use crate::{
-    burner::Burner, event_ids::EventIdPurger, settings::Settings, MobileConfigClients,
-    MobileConfigResolverExt,
+    burner::Burner, event_ids::EventIdPurger, pending_burns, settings::Settings,
+    MobileConfigClients, MobileConfigResolverExt,
 };
 use anyhow::{bail, Result};
 use chrono::{TimeZone, Utc};
@@ -119,6 +119,8 @@ impl Cmd {
         let pool = settings.database.connect("mobile-packet-verifier").await?;
         sqlx::migrate!().run(&pool).await?;
 
+        pending_burns::initialize(&pool).await?;
+
         // Set up the solana network:
         let solana = if settings.enable_solana_integration {
             let Some(ref solana_settings) = settings.solana else {
diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs
index 7611ba23a..f86065ace 100644
--- a/mobile_packet_verifier/src/pending_burns.rs
+++ b/mobile_packet_verifier/src/pending_burns.rs
@@ -4,7 +4,9 @@ use chrono::{DateTime, Utc};
 use file_store::{mobile_session::DataTransferSessionReq, traits::TimestampEncode};
 use helium_crypto::PublicKeyBinary;
 use helium_proto::services::packet_verifier::ValidDataTransferSession;
-use sqlx::{prelude::FromRow, Pool, Postgres, Transaction};
+use sqlx::{prelude::FromRow, Pool, Postgres, Row, Transaction};
+
+const METRIC_NAME: &str = "pending_dc_burn";
 
 #[derive(FromRow)]
 pub struct DataTransferSession {
@@ -26,6 +28,7 @@ impl DataTransferSession {
 impl From<DataTransferSession> for ValidDataTransferSession {
     fn from(session: DataTransferSession) -> Self {
         let num_dcs = session.dc_to_burn();
+
         ValidDataTransferSession {
             pub_key: session.pub_key.into(),
             payer: session.payer.into(),
@@ -45,6 +48,27 @@ pub struct PendingPayerBurn {
     pub sessions: Vec<DataTransferSession>,
 }
 
+pub async fn initialize(conn: &Pool<Postgres>) -> anyhow::Result<()> {
+    let results = sqlx::query(
+        r#"
+        SELECT payer, sum(rewardable_bytes) as total_rewardable_bytes
+        FROM data_transfer_sessions
+        GROUP BY payer
+        "#,
+    )
+    .fetch_all(conn)
+    .await?;
+
+    for row in results {
+        let payer: PublicKeyBinary = row.get("payer");
+        let total_rewardable_bytes: u64 = row.get::<i64, _>("total_rewardable_bytes") as u64;
+
+        set_metric(&payer, bytes_to_dc(total_rewardable_bytes));
+    }
+
+    todo!()
+}
+
 pub async fn get_all(conn: &Pool<Postgres>) -> anyhow::Result<Vec<DataTransferSession>> {
     sqlx::query_as("SELECT * FROM data_transfer_sessions")
         .fetch_all(conn)
@@ -92,6 +116,8 @@ pub async fn save(
     req: &DataTransferSessionReq,
     last_timestamp: DateTime<Utc>,
 ) -> anyhow::Result<()> {
+    let dc_to_burn = bytes_to_dc(req.rewardable_bytes);
+
     sqlx::query(
         r#"
         INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
@@ -112,19 +138,36 @@ pub async fn save(
     .execute(txn)
     .await?;
 
+    increment_metric(&req.data_transfer_usage.payer, dc_to_burn);
+
     Ok(())
 }
 
 pub async fn delete_for_payer(
     conn: &Pool<Postgres>,
     payer: &PublicKeyBinary,
+    burnt_dc: u64,
 ) -> anyhow::Result<()> {
     sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
         .bind(payer)
         .execute(conn)
-        .await
-        .map(|_| ())
-        .map_err(anyhow::Error::from)
+        .await?;
+
+    decrement_metric(payer, burnt_dc);
+
+    Ok(())
+}
+
+fn set_metric(payer: &PublicKeyBinary, value: u64) {
+    metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).set(value as f64);
+}
+
+fn increment_metric(payer: &PublicKeyBinary, value: u64) {
+    metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).increment(value as f64);
+}
+
+fn decrement_metric(payer: &PublicKeyBinary, value: u64) {
+    metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).decrement(value as f64);
+}
 
 const BYTES_PER_DC: u64 = 20_000;
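Note on PATCH 3/7: the pending_dc_burn gauge is maintained in two ways — incrementally (increment_metric on every saved session, decrement_metric after a successful burn) and by re-seeding from Postgres at startup (initialize), because a process-local gauge would read zero after every restart. A minimal sketch of that lifecycle, assuming only the metrics-crate API already used in the diff (the numbers here are made up):

    // Hypothetical walkthrough of the gauge lifecycle for one payer.
    fn gauge_lifecycle(payer: String) {
        // On startup, initialize() seeds the gauge from the DB sum.
        metrics::gauge!("pending_dc_burn", "payer" => payer.clone()).set(140.0);
        // Each saved data-transfer session adds its DC cost.
        metrics::gauge!("pending_dc_burn", "payer" => payer.clone()).increment(7.0);
        // A successful burn removes exactly the DCs that were burned.
        metrics::gauge!("pending_dc_burn", "payer" => payer).decrement(147.0);
    }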
From b229614927f3340a996b01c8e2610fd69bf8c8ca Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 23 Oct 2024 13:23:08 -0400
Subject: [PATCH 4/7] remove todo!

---
 mobile_packet_verifier/src/pending_burns.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs
index f86065ace..4a77ba4f6 100644
--- a/mobile_packet_verifier/src/pending_burns.rs
+++ b/mobile_packet_verifier/src/pending_burns.rs
@@ -66,7 +66,7 @@ pub async fn initialize(conn: &Pool<Postgres>) -> anyhow::Result<()> {
         set_metric(&payer, bytes_to_dc(total_rewardable_bytes));
     }
 
-    todo!()
+    Ok(())
 }
 
 pub async fn get_all(conn: &Pool<Postgres>) -> anyhow::Result<Vec<DataTransferSession>> {

From 4976c4e1f13cd6525b858e3c6f8dc78da8f508a2 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Wed, 23 Oct 2024 13:23:49 -0400
Subject: [PATCH 5/7] Update mobile_packet_verifier/src/burner.rs

Co-authored-by: Michael Jeffrey
---
 mobile_packet_verifier/src/burner.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs
index e17858033..73d792563 100644
--- a/mobile_packet_verifier/src/burner.rs
+++ b/mobile_packet_verifier/src/burner.rs
@@ -40,7 +40,7 @@ where
         tracing::info!(%total_dcs, %payer, "Burning DC");
         if self.burn_data_credits(&payer, total_dcs).await.is_err() {
             // We have failed to burn data credits:
-            metrics::counter!("burned", "payer" => payer.to_string(), "success" => "alse")
+            metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
                 .increment(total_dcs);
             continue;
         }

From 34b759b1a244ce88076683d0b1e796564ebb4284 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Thu, 24 Oct 2024 08:47:29 -0400
Subject: [PATCH 6/7] update run loop in mobile-packet-verifier to have
 predictable ordering

---
 mobile_packet_verifier/src/daemon.rs | 29 +++++++++++++++--------------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs
index 3439b1033..685272b91 100644
--- a/mobile_packet_verifier/src/daemon.rs
+++ b/mobile_packet_verifier/src/daemon.rs
@@ -73,23 +73,13 @@ where
     S: SolanaNetwork,
     MCR: MobileConfigResolverExt,
 {
-    pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
+    pub async fn run(mut self, mut shutdown: triggered::Listener) -> Result<()> {
         // Set the initial burn period to one minute
         let mut burn_time = Instant::now() + Duration::from_secs(60);
         loop {
             tokio::select! {
-                file = self.reports.recv() => {
-                    let Some(file) = file else {
-                        anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
-                    };
-                    tracing::info!("Verifying file: {}", file.file_info);
-                    let ts = file.file_info.timestamp;
-                    let mut transaction = self.pool.begin().await?;
-                    let reports = file.into_stream(&mut transaction).await?;
-                    crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
-                    transaction.commit().await?;
-                    self.verified_data_session_report_sink.commit().await?;
-                },
+                biased;
+                _ = &mut shutdown => return Ok(()),
                 _ = sleep_until(burn_time) => {
                     // It's time to burn
                     match self.burner.burn(&self.pool).await {
@@ -102,7 +92,18 @@ where
                         }
                     }
                 }
-                _ = shutdown.clone() => return Ok(()),
+                file = self.reports.recv() => {
+                    let Some(file) = file else {
+                        anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
+                    };
+                    tracing::info!("Verifying file: {}", file.file_info);
+                    let ts = file.file_info.timestamp;
+                    let mut transaction = self.pool.begin().await?;
+                    let reports = file.into_stream(&mut transaction).await?;
+                    crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
+                    transaction.commit().await?;
+                    self.verified_data_session_report_sink.commit().await?;
+                }
             }
         }
     }

From 80da5a4a03faf9298128cab44ad8e545cf2d6708 Mon Sep 17 00:00:00 2001
From: Brian Balser
Date: Fri, 25 Oct 2024 08:06:08 -0400
Subject: [PATCH 7/7] cast sum to bigint when initializing pending burns
 metric

---
 mobile_packet_verifier/src/pending_burns.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs
index 4a77ba4f6..b6dca6384 100644
--- a/mobile_packet_verifier/src/pending_burns.rs
+++ b/mobile_packet_verifier/src/pending_burns.rs
@@ -51,7 +51,7 @@ pub struct PendingPayerBurn {
 pub async fn initialize(conn: &Pool<Postgres>) -> anyhow::Result<()> {
     let results = sqlx::query(
         r#"
-        SELECT payer, sum(rewardable_bytes) as total_rewardable_bytes
+        SELECT payer, sum(rewardable_bytes)::bigint as total_rewardable_bytes
         FROM data_transfer_sessions
         GROUP BY payer
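Note on PATCH 6/7: tokio::select! normally polls its branches in pseudo-random order, so before this change a ready file could be processed ahead of an overdue burn or even a pending shutdown. The `biased;` keyword makes polling strictly top-to-bottom, which is what gives the run loop its predictable ordering: shutdown first, then the burn timer, then file ingest. A self-contained sketch of that guarantee, assuming only documented tokio behavior (this is not code from the series):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // Both branches are immediately ready; with `biased;` the first
        // listed branch deterministically wins the race.
        let shutdown = tokio::time::sleep(Duration::ZERO);
        tokio::pin!(shutdown);

        tokio::select! {
            biased;
            _ = &mut shutdown => println!("shutdown checked first"),
            _ = tokio::time::sleep(Duration::ZERO) => println!("never reached"),
        }
    }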