diff --git a/Cargo.lock b/Cargo.lock index a6e59f875..178e3ac5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1627,7 +1627,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -9851,7 +9851,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", "twox-hash", "xorf", diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index 3fd948667..f55e923ad 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -5,9 +5,10 @@ use boost_manager::{ }; use clap::Parser; use file_store::{ - file_info_poller::LookbackBehavior, file_sink, file_source, file_upload, - reward_manifest::RewardManifest, FileStore, FileType, + file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest, + traits::FileSinkWriteExt, FileStore, FileType, }; +use helium_proto::BoostedHexUpdateV1; use mobile_config::client::hex_boosting_client::HexBoostingClient; use solana::start_boost::SolanaRpc; use std::{ @@ -99,14 +100,12 @@ impl Server { .await?; // setup the writer for our updated hexes - let (updated_hexes_sink, updated_hexes_sink_server) = file_sink::FileSinkBuilder::new( - FileType::BoostedHexUpdate, + let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_boosted_hex_update"), + Some(Duration::from_secs(5 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(5 * 60)) - .create() .await?; // The server to monitor rewards and activate any newly seen boosted hexes diff --git a/boost_manager/src/watcher.rs b/boost_manager/src/watcher.rs index d800fc078..c669cb1f2 100644 --- a/boost_manager/src/watcher.rs +++ b/boost_manager/src/watcher.rs @@ -19,7 +19,7 @@ const LAST_PROCESSED_TIMESTAMP_KEY: &str = "last_processed_hex_boosting_info"; pub struct Watcher { pub pool: Pool, pub hex_boosting_client: A, - pub 
file_sink: FileSinkClient, + pub file_sink: FileSinkClient, } impl ManagedTask for Watcher @@ -45,7 +45,7 @@ where { pub async fn new( pool: Pool, - file_sink: FileSinkClient, + file_sink: FileSinkClient, hex_boosting_client: A, ) -> Result { Ok(Self { diff --git a/boost_manager/tests/integrations/common/mod.rs b/boost_manager/tests/integrations/common/mod.rs index 5eded5d29..73d262261 100644 --- a/boost_manager/tests/integrations/common/mod.rs +++ b/boost_manager/tests/integrations/common/mod.rs @@ -40,7 +40,7 @@ impl HexBoostingInfoResolver for MockHexBoostingClient { } pub struct MockFileSinkReceiver { - pub receiver: tokio::sync::mpsc::Receiver, + pub receiver: tokio::sync::mpsc::Receiver>, } impl MockFileSinkReceiver { @@ -48,7 +48,7 @@ impl MockFileSinkReceiver { match timeout(seconds(2), self.receiver.recv()).await { Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { let _ = on_write_tx.send(Ok(())); - Some(msg) + Some(msg.encode_to_vec()) } Ok(None) => None, Err(e) => panic!("timeout while waiting for message1 {:?}", e), @@ -81,12 +81,12 @@ impl MockFileSinkReceiver { } } -pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { +pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(20); ( FileSinkClient { sender: tx, - metric: "metric", + metric: "metric".into(), }, MockFileSinkReceiver { receiver: rx }, ) diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 7c486d99c..100154d6c 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -297,7 +297,7 @@ impl fmt::Display for FileType { } impl FileType { - pub fn to_str(&self) -> &'static str { + pub const fn to_str(&self) -> &'static str { match self { Self::InvalidatedRadioThresholdReq => INVALIDATED_RADIO_THRESHOLD_REQ, Self::InvalidatedRadioThresholdIngestReport => { diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index f102e155a..81548daa1 100644 --- 
a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -1,4 +1,4 @@ -use crate::{file_upload::FileUpload, Error, Result}; +use crate::{file_upload::FileUpload, traits::MsgBytes, Error, Result}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; use chrono::{DateTime, Utc}; @@ -45,16 +45,16 @@ fn transport_sink(transport: &mut Transport) -> &mut Sink { } #[derive(Debug)] -pub enum Message { - Data(oneshot::Sender, Vec), +pub enum Message { + Data(oneshot::Sender, T), Commit(oneshot::Sender>), Rollback(oneshot::Sender>), } -pub type MessageSender = mpsc::Sender; -pub type MessageReceiver = mpsc::Receiver; +pub type MessageSender = mpsc::Sender>; +pub type MessageReceiver = mpsc::Receiver>; -fn message_channel(size: usize) -> (MessageSender, MessageReceiver) { +fn message_channel(size: usize) -> (MessageSender, MessageReceiver) { mpsc::channel(size) } @@ -66,7 +66,7 @@ pub struct FileSinkBuilder { roll_time: Duration, file_upload: FileUpload, auto_commit: bool, - metric: &'static str, + metric: String, } impl FileSinkBuilder { @@ -74,7 +74,7 @@ impl FileSinkBuilder { prefix: impl ToString, target_path: &Path, file_upload: FileUpload, - metric: &'static str, + metric: impl Into, ) -> Self { Self { prefix: prefix.to_string(), @@ -84,7 +84,7 @@ impl FileSinkBuilder { roll_time: Duration::from_secs(DEFAULT_SINK_ROLL_SECS), file_upload, auto_commit: true, - metric, + metric: metric.into(), } } @@ -120,7 +120,10 @@ impl FileSinkBuilder { } } - pub async fn create(self) -> Result<(FileSinkClient, FileSink)> { + pub async fn create(self) -> Result<(FileSinkClient, FileSink)> + where + T: MsgBytes, + { let (tx, rx) = message_channel(50); let client = FileSinkClient { @@ -128,7 +131,7 @@ impl FileSinkBuilder { metric: self.metric, }; - metrics::counter!(client.metric, vec![OK_LABEL]); + metrics::counter!(client.metric.clone(), vec![OK_LABEL]); let mut sink = FileSink { target_path: self.target_path, @@ -148,33 +151,35 @@ impl FileSinkBuilder { } 
#[derive(Debug, Clone)] -pub struct FileSinkClient { - pub sender: MessageSender, - pub metric: &'static str, +pub struct FileSinkClient { + pub sender: MessageSender, + pub metric: String, } const OK_LABEL: Label = Label::from_static_parts("status", "ok"); const ERROR_LABEL: Label = Label::from_static_parts("status", "error"); const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); -impl FileSinkClient { - pub fn new(sender: MessageSender, metric: &'static str) -> Self { - Self { sender, metric } +impl FileSinkClient { + pub fn new(sender: MessageSender, metric: impl Into) -> Self { + Self { + sender, + metric: metric.into(), + } } - pub async fn write( + pub async fn write( &self, item: T, labels: impl IntoIterator, ) -> Result> { let (on_write_tx, on_write_rx) = oneshot::channel(); - let bytes = item.encode_to_vec(); let labels = labels.into_iter().map(Label::from); tokio::select! { - result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result { + result = self.sender.send_timeout(Message::Data(on_write_tx, item), SEND_TIMEOUT) => match result { Ok(_) => { metrics::counter!( - self.metric, + self.metric.clone(), labels .chain(std::iter::once(OK_LABEL)) .collect::>() @@ -184,7 +189,7 @@ impl FileSinkClient { } Err(SendTimeoutError::Closed(_)) => { metrics::counter!( - self.metric, + self.metric.clone(), labels .chain(std::iter::once(ERROR_LABEL)) .collect::>() @@ -203,7 +208,7 @@ impl FileSinkClient { /// Writes all messages to the file sink, return the last oneshot pub async fn write_all( &self, - items: impl IntoIterator, + items: impl IntoIterator, ) -> Result>> { let mut last_oneshot = None; for item in items { @@ -244,14 +249,14 @@ impl FileSinkClient { } #[derive(Debug)] -pub struct FileSink { +pub struct FileSink { target_path: PathBuf, tmp_path: PathBuf, prefix: String, max_size: usize, roll_time: Duration, - messages: MessageReceiver, + messages: MessageReceiver, file_upload: FileUpload, 
staged_files: Vec, auto_commit: bool, @@ -273,7 +278,7 @@ impl ActiveSink { } } -impl ManagedTask for FileSink { +impl ManagedTask for FileSink { fn start_task( self: Box, shutdown: triggered::Listener, @@ -288,7 +293,7 @@ impl ManagedTask for FileSink { } } -impl FileSink { +impl FileSink { async fn init(&mut self) -> Result { fs::create_dir_all(&self.target_path).await?; fs::create_dir_all(&self.tmp_path).await?; @@ -350,8 +355,8 @@ impl FileSink { _ = shutdown.clone() => break, _ = rollover_timer.tick() => self.maybe_roll().await?, msg = self.messages.recv() => match msg { - Some(Message::Data(on_write_tx, bytes)) => { - let res = match self.write(Bytes::from(bytes)).await { + Some(Message::Data(on_write_tx, item)) => { + let res = match self.write(item.as_bytes()).await { Ok(_) => Ok(()), Err(err) => { tracing::error!("failed to store {}: {err:?}", &self.prefix); diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs new file mode 100644 index 000000000..fcd539cc7 --- /dev/null +++ b/file_store/src/traits/file_sink_write.rs @@ -0,0 +1,249 @@ +use std::{path::Path, time::Duration}; + +use crate::{ + file_sink::{FileSinkClient, DEFAULT_SINK_ROLL_SECS}, + file_upload::FileUpload, + traits::msg_bytes::MsgBytes, + FileSink, FileSinkBuilder, FileType, Result, +}; +use helium_proto::{ + self as proto, + services::{packet_verifier, poc_lora, poc_mobile}, + Message, +}; + +pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS); + +#[async_trait::async_trait] +pub trait FileSinkWriteExt +where + Self: Sized + MsgBytes + Send, +{ + const FILE_PREFIX: &'static str; + const METRIC_SUFFIX: &'static str; + + // The `auto_commit` option and `roll_time` option are incompatible with + // each other. It doesn't make sense to roll a file every so often _and_ + // commit it every time something is written. If a roll_time is provided, + // `auto_commit` is set to false. 
+ async fn file_sink( + target_path: &Path, + file_upload: FileUpload, + roll_time: Option, + metric_prefix: &str, + ) -> Result<(FileSinkClient, FileSink)> { + let builder = FileSinkBuilder::new( + Self::FILE_PREFIX.to_string(), + target_path, + file_upload, + format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX), + ); + + let builder = if let Some(duration) = roll_time { + builder.auto_commit(false).roll_time(duration) + } else { + builder.auto_commit(true) + }; + + let file_sink = builder.create().await?; + Ok(file_sink) + } +} + +macro_rules! impl_file_sink { + ($msg_type:ty, $file_prefix:expr, $metric_suffix:expr) => { + #[async_trait::async_trait] + impl FileSinkWriteExt for $msg_type { + const FILE_PREFIX: &'static str = $file_prefix; + const METRIC_SUFFIX: &'static str = $metric_suffix; + } + + impl MsgBytes for $msg_type { + fn as_bytes(&self) -> bytes::Bytes { + bytes::Bytes::from(self.encode_to_vec()) + } + } + }; +} + +impl_file_sink!( + packet_verifier::InvalidPacket, + FileType::InvalidPacket.to_str(), + "invalid_packets" +); +impl_file_sink!( + packet_verifier::ValidDataTransferSession, + FileType::ValidDataTransferSession.to_str(), + "valid_data_transfer_session" +); +impl_file_sink!( + packet_verifier::ValidPacket, + FileType::IotValidPacket.to_str(), + "valid_packets" +); +impl_file_sink!( + poc_lora::IotRewardShare, + FileType::IotRewardShare.to_str(), + "gateway_reward_shares" +); +impl_file_sink!( + poc_lora::LoraBeaconIngestReportV1, + FileType::IotBeaconIngestReport.to_str(), + "beacon_report" +); +impl_file_sink!( + poc_lora::LoraInvalidBeaconReportV1, + FileType::IotInvalidBeaconReport.to_str(), + "invalid_beacon" +); +impl_file_sink!( + poc_lora::LoraInvalidWitnessReportV1, + FileType::IotInvalidWitnessReport.to_str(), + "invalid_witness_report" +); +impl_file_sink!(poc_lora::LoraPocV1, FileType::IotPoc.to_str(), "valid_poc"); +impl_file_sink!( + poc_lora::LoraWitnessIngestReportV1, + FileType::IotWitnessIngestReport.to_str(), + 
"witness_report" +); +impl_file_sink!( + poc_lora::NonRewardablePacket, + FileType::NonRewardablePacket.to_str(), + "non_rewardable_packet" +); +impl_file_sink!( + poc_mobile::CellHeartbeatIngestReportV1, + FileType::CbrsHeartbeatIngestReport.to_str(), + "heartbeat_report" +); +impl_file_sink!( + poc_mobile::CoverageObjectIngestReportV1, + FileType::CoverageObjectIngestReport.to_str(), + "coverage_object_report" +); +impl_file_sink!( + poc_mobile::CoverageObjectV1, + FileType::CoverageObject.to_str(), + "coverage_object" +); +impl_file_sink!( + poc_mobile::DataTransferSessionIngestReportV1, + FileType::DataTransferSessionIngestReport.to_str(), + "mobile_data_transfer_session_report" +); +impl_file_sink!( + poc_mobile::Heartbeat, + FileType::ValidatedHeartbeat.to_str(), + "heartbeat" +); +impl_file_sink!( + poc_mobile::InvalidDataTransferIngestReportV1, + FileType::InvalidDataTransferSessionIngestReport.to_str(), + "invalid_data_transfer_session" +); +impl_file_sink!( + poc_mobile::InvalidatedRadioThresholdIngestReportV1, + FileType::InvalidatedRadioThresholdIngestReport.to_str(), + "invalidated_radio_threshold_ingest_report" +); +impl_file_sink!( + poc_mobile::MobileRewardShare, + FileType::MobileRewardShare.to_str(), + "radio_reward_share" +); +impl_file_sink!( + poc_mobile::OracleBoostingReportV1, + FileType::OracleBoostingReport.to_str(), + "oracle_boosting_report" +); +impl_file_sink!( + poc_mobile::RadioThresholdIngestReportV1, + FileType::RadioThresholdIngestReport.to_str(), + "radio_threshold_ingest_report" +); +impl_file_sink!( + poc_mobile::SeniorityUpdate, + FileType::SeniorityUpdate.to_str(), + "seniority_update" +); +impl_file_sink!( + poc_mobile::ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + FileType::SPBoostedRewardsBannedRadioIngestReport.to_str(), + "service_provider_boosted_rewards_banned_radio" +); +impl_file_sink!( + poc_mobile::SpeedtestAvg, + FileType::SpeedtestAvg.to_str(), + "speedtest_average" +); +impl_file_sink!( + 
poc_mobile::SpeedtestIngestReportV1, + FileType::CellSpeedtestIngestReport.to_str(), + "speedtest_report" +); +impl_file_sink!( + poc_mobile::SubscriberLocationIngestReportV1, + FileType::SubscriberLocationIngestReport.to_str(), + "subscriber_location_report" +); +impl_file_sink!( + poc_mobile::SubscriberVerifiedMappingEventIngestReportV1, + FileType::SubscriberVerifiedMappingEventIngestReport.to_str(), + "subscriber_verified_mapping_event_ingest_report" +); +impl_file_sink!( + poc_mobile::VerifiedInvalidatedRadioThresholdIngestReportV1, + FileType::VerifiedInvalidatedRadioThresholdIngestReport.to_str(), + "verified_invalidated_radio_threshold" +); +impl_file_sink!( + poc_mobile::VerifiedRadioThresholdIngestReportV1, + FileType::VerifiedRadioThresholdIngestReport.to_str(), + "verified_radio_threshold" +); +impl_file_sink!( + poc_mobile::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1, + FileType::VerifiedSPBoostedRewardsBannedRadioIngestReport.to_str(), + "verified_sp_boosted_rewards_ban" +); +impl_file_sink!( + poc_mobile::VerifiedSpeedtest, + FileType::VerifiedSpeedtest.to_str(), + "verified_speedtest" +); +impl_file_sink!( + poc_mobile::VerifiedSubscriberLocationIngestReportV1, + FileType::VerifiedSubscriberLocationIngestReport.to_str(), + "verified_subscriber_location" +); +impl_file_sink!( + poc_mobile::VerifiedSubscriberVerifiedMappingEventIngestReportV1, + FileType::VerifiedSubscriberVerifiedMappingEventIngestReport.to_str(), + "verified_subscriber_verified_mapping_event_ingest_report" +); +impl_file_sink!( + poc_mobile::WifiHeartbeatIngestReportV1, + FileType::WifiHeartbeatIngestReport.to_str(), + "wifi_heartbeat_report" +); +impl_file_sink!( + proto::BoostedHexUpdateV1, + FileType::BoostedHexUpdate.to_str(), + "boosted_hex_update" +); +impl_file_sink!( + proto::EntropyReportV1, + FileType::EntropyReport.to_str(), + "report_submission" +); +impl_file_sink!( + proto::PriceReportV1, + FileType::PriceReport.to_str(), + "report_submission" +); 
+impl_file_sink!( + proto::RewardManifest, + FileType::RewardManifest.to_str(), + "reward_manifest" +); diff --git a/file_store/src/traits/mod.rs b/file_store/src/traits/mod.rs index 7f4ddc806..ae37821c8 100644 --- a/file_store/src/traits/mod.rs +++ b/file_store/src/traits/mod.rs @@ -1,8 +1,12 @@ +mod file_sink_write; +mod msg_bytes; mod msg_decode; mod msg_timestamp; mod msg_verify; mod report_id; +pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME}; +pub use msg_bytes::MsgBytes; pub use msg_decode::MsgDecode; pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode}; pub use msg_verify::MsgVerify; diff --git a/file_store/src/traits/msg_bytes.rs b/file_store/src/traits/msg_bytes.rs new file mode 100644 index 000000000..e3f2eab80 --- /dev/null +++ b/file_store/src/traits/msg_bytes.rs @@ -0,0 +1,27 @@ +pub trait MsgBytes { + fn as_bytes(&self) -> bytes::Bytes; +} + +// As prost::Message is implemented for basically all types, implementing +// MsgBytes for anything that implements prost::Message makes it so you +// cannot use a FileSink for anything that is _not_ a protobuf. So we +// provide utility implementations for Vec and String, and require all +// protos to be implemented directly, following the pattern of verifying and +// signing messages. 
+impl MsgBytes for Vec { + fn as_bytes(&self) -> bytes::Bytes { + bytes::Bytes::from(self.clone()) + } +} + +impl MsgBytes for String { + fn as_bytes(&self) -> bytes::Bytes { + bytes::Bytes::from(self.clone()) + } +} + +impl MsgBytes for bytes::Bytes { + fn as_bytes(&self) -> bytes::Bytes { + self.clone() + } +} diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index dbaf78397..f80034d69 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -2,10 +2,9 @@ use crate::Settings; use anyhow::{Error, Result}; use chrono::Utc; use file_store::{ - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_upload, - traits::MsgVerify, - FileType, + traits::{FileSinkWriteExt, MsgVerify}, }; use futures::{ future::{LocalBoxFuture, TryFutureExt}, @@ -33,8 +32,8 @@ type Nonce = [u8; 32]; #[derive(Debug)] struct StreamState { - beacon_report_sink: FileSinkClient, - witness_report_sink: FileSinkClient, + beacon_report_sink: FileSinkClient, + witness_report_sink: FileSinkClient, required_network: Network, pub_key_bytes: Option>, session_key: Option, @@ -170,8 +169,8 @@ impl StreamState { #[derive(Clone, Debug)] pub struct GrpcServer { - pub beacon_report_sink: FileSinkClient, - pub witness_report_sink: FileSinkClient, + pub beacon_report_sink: FileSinkClient, + pub witness_report_sink: FileSinkClient, pub required_network: Network, pub address: SocketAddr, pub session_key_offer_timeout: std::time::Duration, @@ -239,7 +238,7 @@ where } async fn handle_beacon_report( - file_sink: &FileSinkClient, + file_sink: &FileSinkClient, timestamp: u64, report: LoraBeaconReportReqV1, signing_key: Option<&PublicKey>, @@ -264,7 +263,7 @@ async fn handle_beacon_report( } async fn handle_witness_report( - file_sink: &FileSinkClient, + file_sink: &FileSinkClient, timestamp: u64, report: LoraWitnessReportReqV1, session_key: Option<&PublicKey>, @@ -363,25 +362,21 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let store_base_path = 
Path::new(&settings.cache); // iot beacon reports - let (beacon_report_sink, beacon_report_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotBeaconIngestReport, + let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_beacon_report"), + Some(Duration::from_secs(5 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(5 * 60)) - .create() .await?; // iot witness reports - let (witness_report_sink, witness_report_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotWitnessIngestReport, + let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_witness_report"), + Some(Duration::from_secs(5 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(5 * 60)) - .create() .await?; let grpc_server = GrpcServer { diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 57bc19c21..636bd156d 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -2,10 +2,9 @@ use crate::Settings; use anyhow::{bail, Error, Result}; use chrono::Utc; use file_store::{ - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_upload, - traits::MsgVerify, - FileType, + traits::{FileSinkWriteExt, MsgVerify}, }; use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; @@ -35,16 +34,18 @@ pub type GrpcResult = std::result::Result, Status>; pub type VerifyResult = std::result::Result; pub struct GrpcServer { - heartbeat_report_sink: FileSinkClient, - wifi_heartbeat_report_sink: FileSinkClient, - speedtest_report_sink: FileSinkClient, - data_transfer_session_sink: FileSinkClient, - subscriber_location_report_sink: FileSinkClient, - radio_threshold_report_sink: FileSinkClient, - invalidated_radio_threshold_report_sink: FileSinkClient, - coverage_object_report_sink: 
FileSinkClient, - sp_boosted_rewards_ban_sink: FileSinkClient, - subscriber_mapping_event_sink: FileSinkClient, + heartbeat_report_sink: FileSinkClient, + wifi_heartbeat_report_sink: FileSinkClient, + speedtest_report_sink: FileSinkClient, + data_transfer_session_sink: FileSinkClient, + subscriber_location_report_sink: FileSinkClient, + radio_threshold_report_sink: FileSinkClient, + invalidated_radio_threshold_report_sink: + FileSinkClient, + coverage_object_report_sink: FileSinkClient, + sp_boosted_rewards_ban_sink: + FileSinkClient, + subscriber_mapping_event_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -70,16 +71,20 @@ fn make_span(_request: &http::request::Request) -> impl GrpcServer { #[allow(clippy::too_many_arguments)] pub fn new( - heartbeat_report_sink: FileSinkClient, - wifi_heartbeat_report_sink: FileSinkClient, - speedtest_report_sink: FileSinkClient, - data_transfer_session_sink: FileSinkClient, - subscriber_location_report_sink: FileSinkClient, - radio_threshold_report_sink: FileSinkClient, - invalidated_radio_threshold_report_sink: FileSinkClient, - coverage_object_report_sink: FileSinkClient, - sp_boosted_rewards_ban_sink: FileSinkClient, - subscriber_mapping_event_sink: FileSinkClient, + heartbeat_report_sink: FileSinkClient, + wifi_heartbeat_report_sink: FileSinkClient, + speedtest_report_sink: FileSinkClient, + data_transfer_session_sink: FileSinkClient, + subscriber_location_report_sink: FileSinkClient, + radio_threshold_report_sink: FileSinkClient, + invalidated_radio_threshold_report_sink: FileSinkClient< + InvalidatedRadioThresholdIngestReportV1, + >, + coverage_object_report_sink: FileSinkClient, + sp_boosted_rewards_ban_sink: FileSinkClient< + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + >, + subscriber_mapping_event_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -441,125 +446,94 @@ pub async fn grpc_server(settings: 
&Settings) -> Result<()> { let store_base_path = Path::new(&settings.cache); - let (heartbeat_report_sink, heartbeat_report_sink_server) = file_sink::FileSinkBuilder::new( - FileType::CbrsHeartbeatIngestReport, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_heartbeat_report"), - ) - .roll_time(settings.roll_time) - .create() - .await?; + let (heartbeat_report_sink, heartbeat_report_sink_server) = + CellHeartbeatIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; let (wifi_heartbeat_report_sink, wifi_heartbeat_report_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::WifiHeartbeatIngestReport, + WifiHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_wifi_heartbeat_report"), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; // speedtests - let (speedtest_report_sink, speedtest_report_sink_server) = file_sink::FileSinkBuilder::new( - FileType::CellSpeedtestIngestReport, + let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_speedtest_report"), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let (data_transfer_session_sink, data_transfer_session_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::DataTransferSessionIngestReport, + DataTransferSessionIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_mobile_data_transfer_session_report" - ), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let (subscriber_location_report_sink, subscriber_location_report_sink_server) = - file_sink::FileSinkBuilder::new( - 
FileType::SubscriberLocationIngestReport, + SubscriberLocationIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_subscriber_location_report"), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let (radio_threshold_report_sink, radio_threshold_report_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::RadioThresholdIngestReport, + RadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_radio_threshold_ingest_report"), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let (invalidated_radio_threshold_report_sink, invalidated_radio_threshold_report_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::InvalidatedRadioThresholdIngestReport, + InvalidatedRadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_invalidated_radio_threshold_ingest_report" - ), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let (coverage_object_report_sink, coverage_object_report_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::CoverageObjectIngestReport, + CoverageObjectIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_coverage_object_report"), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let (sp_boosted_rewards_ban_sink, sp_boosted_rewards_ban_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::SPBoostedRewardsBannedRadioIngestReport, + ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_service_provider_boosted_rewards_banned_radio" - ), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) 
- .roll_time(settings.roll_time) - .create() .await?; let (subscriber_mapping_event_sink, subscriber_mapping_event_server) = - file_sink::FileSinkBuilder::new( - FileType::SubscriberVerifiedMappingEventIngestReport, + SubscriberVerifiedMappingEventIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_subscriber_verified_mapping_event_ingest_report" - ), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let Some(api_token) = settings diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index a42167357..52512eba1 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -3,8 +3,8 @@ use backon::{ExponentialBuilder, Retryable}; use file_store::file_sink::FileSinkClient; use helium_crypto::{KeyTag, Keypair, Network, Sign}; use helium_proto::services::poc_mobile::{ - Client as PocMobileClient, SubscriberVerifiedMappingEventReqV1, - SubscriberVerifiedMappingEventResV1, + Client as PocMobileClient, SubscriberVerifiedMappingEventIngestReportV1, + SubscriberVerifiedMappingEventReqV1, SubscriberVerifiedMappingEventResV1, }; use ingest::server_mobile::GrpcServer; use prost::Message; @@ -21,9 +21,6 @@ use triggered::Trigger; pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let key_pair = generate_keypair(); - let (file_sink_tx, file_sink_rx) = tokio::sync::mpsc::channel(10); - let file_sink = FileSinkClient::new(file_sink_tx, "test_file_sync"); - let socket_addr = { let tcp_listener = TcpListener::bind("127.0.0.1:0").await?; tcp_listener.local_addr()? 
@@ -37,18 +34,29 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (trigger, listener) = triggered::trigger(); + let (cbrs_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10); + let (wifi_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10); + let (speedtest_tx, _rx) = tokio::sync::mpsc::channel(10); + let (data_transfer_tx, _rx) = tokio::sync::mpsc::channel(10); + let (subscriber_location_tx, _rx) = tokio::sync::mpsc::channel(10); + let (radio_threshold_tx, _rx) = tokio::sync::mpsc::channel(10); + let (invalidated_threshold_tx, _rx) = tokio::sync::mpsc::channel(10); + let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10); + let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10); + let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); + tokio::spawn(async move { let grpc_server = GrpcServer::new( - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), - file_sink.clone(), + FileSinkClient::new(cbrs_heartbeat_tx, "noop"), + FileSinkClient::new(wifi_heartbeat_tx, "noop"), + FileSinkClient::new(speedtest_tx, "noop"), + FileSinkClient::new(data_transfer_tx, "noop"), + FileSinkClient::new(subscriber_location_tx, "noop"), + FileSinkClient::new(radio_threshold_tx, "noop"), + FileSinkClient::new(invalidated_threshold_tx, "noop"), + FileSinkClient::new(coverage_obj_tx, "noop"), + FileSinkClient::new(sp_boosted_tx, "noop"), + FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), Network::MainNet, socket_addr, api_token, @@ -57,7 +65,13 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { grpc_server.run(listener).await }); - let client = TestClient::new(socket_addr, key_pair, token.to_string(), file_sink_rx).await; + let client = TestClient::new( + socket_addr, + key_pair, + token.to_string(), + subscriber_mapping_rx, + ) + .await; Ok((client, 
trigger)) } @@ -66,7 +80,8 @@ pub struct TestClient { client: PocMobileClient, key_pair: Arc, authorization: MetadataValue, - file_sink_rx: Receiver, + file_sink_rx: + Receiver>, } impl TestClient { @@ -74,7 +89,9 @@ impl TestClient { socket_addr: SocketAddr, key_pair: Keypair, api_token: String, - file_sink_rx: Receiver, + file_sink_rx: Receiver< + file_store::file_sink::Message, + >, ) -> TestClient { let client = (|| PocMobileClient::connect(format!("http://{socket_addr}"))) .retry(&ExponentialBuilder::default()) @@ -89,7 +106,7 @@ impl TestClient { } } - pub async fn recv(mut self) -> anyhow::Result> { + pub async fn recv(mut self) -> anyhow::Result { match timeout(Duration::from_secs(2), self.file_sink_rx.recv()).await { Ok(Some(msg)) => match msg { file_store::file_sink::Message::Commit(_) => bail!("got Commit"), diff --git a/ingest/tests/iot_ingest.rs b/ingest/tests/iot_ingest.rs index 94d19cf87..8aae5fc18 100644 --- a/ingest/tests/iot_ingest.rs +++ b/ingest/tests/iot_ingest.rs @@ -405,12 +405,12 @@ async fn stream_stops_on_session_timeout() { .await; } -struct MockFileSinkReceiver { - receiver: tokio::sync::mpsc::Receiver, +struct MockFileSinkReceiver { + receiver: tokio::sync::mpsc::Receiver>, } -impl MockFileSinkReceiver { - async fn receive(&mut self) -> SinkMessage { +impl MockFileSinkReceiver { + async fn receive(&mut self) -> SinkMessage { match timeout(seconds(2), self.receiver.recv()).await { Ok(Some(msg)) => msg, Ok(None) => panic!("server closed connection while waiting for message"), @@ -423,25 +423,28 @@ impl MockFileSinkReceiver { panic!("receiver should have been empty") }; } +} +impl MockFileSinkReceiver { async fn receive_beacon(&mut self) -> LoraBeaconIngestReportV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => LoraBeaconIngestReportV1::decode(bytes.as_slice()) - .expect("decode beacon ingest report"), + SinkMessage::Data(_, msg) => msg, _ => panic!("invalid beacon message"), } } +} +impl MockFileSinkReceiver { async fn 
receive_witness(&mut self) -> LoraWitnessIngestReportV1 { match self.receive().await { - SinkMessage::Data(_, bytes) => LoraWitnessIngestReportV1::decode(bytes.as_slice()) - .expect("decode witness ingest report"), + SinkMessage::Data(_, msg) => msg, _ => panic!("invalid witness message"), } } } -fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { +fn create_file_sink( +) -> (FileSinkClient, MockFileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(5); ( FileSinkClient::new(tx, "metric"), @@ -573,8 +576,8 @@ impl TestClient { fn create_test_server( socket_addr: SocketAddr, - beacon_file_sink: FileSinkClient, - witness_file_sink: FileSinkClient, + beacon_file_sink: FileSinkClient, + witness_file_sink: FileSinkClient, offer_timeout: Option, timeout: Option, ) -> GrpcServer { diff --git a/ingest/tests/mobile_ingest.rs b/ingest/tests/mobile_ingest.rs index 1f0f8da05..477e2ede2 100644 --- a/ingest/tests/mobile_ingest.rs +++ b/ingest/tests/mobile_ingest.rs @@ -1,6 +1,3 @@ -use helium_proto::services::poc_mobile::SubscriberVerifiedMappingEventIngestReportV1; -use prost::Message; - mod common; #[tokio::test] @@ -19,10 +16,7 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { let timestamp: String = res.unwrap().id; match client.recv().await { - Ok(data) => { - let report = SubscriberVerifiedMappingEventIngestReportV1::decode(data.as_slice()) - .expect("unable to decode into SubscriberVerifiedMappingEventIngestReportV1"); - + Ok(report) => { assert_eq!(timestamp, report.received_timestamp.to_string()); match report.report { diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 0d05e3cec..77b1697dd 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -8,13 +8,14 @@ use crate::{ use anyhow::{bail, Result}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::FileSinkBuilder, file_sink::FileSinkClient, file_source, 
file_upload, iot_packet::PacketRouterPacketReport, + traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, FileStore, FileType, }; use futures_util::TryFutureExt; +use helium_proto::services::packet_verifier::{InvalidPacket, ValidPacket}; use iot_config::client::{org_client::Orgs, OrgClient}; use solana::burn::SolanaRpc; use sqlx::{Pool, Postgres}; @@ -28,8 +29,8 @@ struct Daemon { pool: Pool, verifier: Verifier>>, SharedCachedOrgClient>, report_files: Receiver>, - valid_packets: FileSinkClient, - invalid_packets: FileSinkClient, + valid_packets: FileSinkClient, + invalid_packets: FileSinkClient, minimum_allowed_balance: u64, } @@ -137,24 +138,20 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); // Verified packets: - let (valid_packets, valid_packets_server) = FileSinkBuilder::new( - FileType::IotValidPacket, + let (valid_packets, valid_packets_server) = ValidPacket::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_valid_packets"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; - let (invalid_packets, invalid_packets_server) = FileSinkBuilder::new( - FileType::InvalidPacket, + let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_invalid_packets"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let org_client = Arc::new(Mutex::new(CachedOrgClient::new(OrgClient::from_settings( diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 4f632d23a..766a02a23 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -1,7 +1,9 @@ use crate::pending::AddPendingBurn; use async_trait::async_trait; use file_store::{ - file_sink::FileSinkClient, iot_packet::PacketRouterPacketReport, traits::MsgTimestamp, + file_sink::FileSinkClient, + 
iot_packet::PacketRouterPacketReport, + traits::{MsgBytes, MsgTimestamp}, }; use futures::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; @@ -371,7 +373,7 @@ pub trait PacketWriter { } #[async_trait] -impl PacketWriter for &'_ FileSinkClient { +impl PacketWriter for &'_ FileSinkClient { type Error = file_store::Error; async fn write(&mut self, packet: T) -> Result<(), Self::Error> { diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index bf8bc07fd..5afeb8866 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -2,8 +2,19 @@ use crate::entropy_loader::EntropyLoader; use anyhow::Result; use clap::Parser; use file_store::{ - entropy_report::EntropyReport, file_info_poller::LookbackBehavior, file_sink, file_source, - file_upload, iot_packet::IotValidPacket, FileStore, FileType, + entropy_report::EntropyReport, + file_info_poller::LookbackBehavior, + file_source, file_upload, + iot_packet::IotValidPacket, + traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + FileStore, FileType, +}; +use helium_proto::{ + services::poc_lora::{ + IotRewardShare, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, LoraPocV1, + NonRewardablePacket, + }, + RewardManifest, }; use iot_config::client::Client as IotConfigClient; use iot_verifier::{ @@ -110,27 +121,22 @@ impl Server { // * // Gateway reward shares sink - let (rewards_sink, gateway_rewards_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotRewardShare, + let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_gateway_reward_shares"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; // Reward manifest - let (reward_manifests_sink, reward_manifests_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::RewardManifest, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_iot_reward_manifest"), - ) - 
.auto_commit(false) - .create() - .await?; + let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink( + store_base_path, + file_upload.clone(), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), + ) + .await?; let rewarder = Rewarder { pool: pool.clone(), @@ -168,14 +174,12 @@ impl Server { // * let (non_rewardable_packet_sink, non_rewardable_packet_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::NonRewardablePacket, + NonRewardablePacket::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_non_rewardable_packet"), + Some(Duration::from_secs(5 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(5 * 60)) - .create() .await?; let packet_store = FileStore::from_settings(&settings.packet_ingest).await?; @@ -202,27 +206,22 @@ impl Server { // * // setup the purger requirements // * - let (purger_invalid_beacon_sink, purger_invalid_beacon_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidBeaconReport, + LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let (purger_invalid_witness_sink, purger_invalid_witness_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidWitnessReport, + LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let purger = purger::Purger::new( @@ -241,35 +240,29 @@ impl Server { // * let (runner_invalid_beacon_sink, runner_invalid_beacon_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidBeaconReport, + LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon_report"), + 
Some(Duration::from_secs(5 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(5 * 60)) - .create() .await?; let (runner_invalid_witness_sink, runner_invalid_witness_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidWitnessReport, + LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), + Some(Duration::from_secs(5 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(5 * 60)) - .create() .await?; - let (runner_poc_sink, runner_poc_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotPoc, + let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_valid_poc"), + Some(Duration::from_secs(2 * 60)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(2 * 60)) - .create() .await?; let runner = runner::Runner::from_settings( diff --git a/iot_verifier/src/packet_loader.rs b/iot_verifier/src/packet_loader.rs index afdc1ba46..e41573edc 100644 --- a/iot_verifier/src/packet_loader.rs +++ b/iot_verifier/src/packet_loader.rs @@ -16,7 +16,7 @@ pub struct PacketLoader { pub cache: String, gateway_cache: GatewayCache, file_receiver: Receiver>, - file_sink: file_sink::FileSinkClient, + file_sink: file_sink::FileSinkClient, } #[derive(thiserror::Error, Debug)] @@ -42,7 +42,7 @@ impl PacketLoader { pool: PgPool, gateway_cache: GatewayCache, file_receiver: Receiver>, - file_sink: file_sink::FileSinkClient, + file_sink: file_sink::FileSinkClient, ) -> Self { tracing::info!("from_settings packet loader"); let cache = settings.cache.clone(); diff --git a/iot_verifier/src/purger.rs b/iot_verifier/src/purger.rs index c9a01d7db..4c83c87e6 100644 --- a/iot_verifier/src/purger.rs +++ b/iot_verifier/src/purger.rs @@ -32,8 +32,8 @@ pub struct Purger { pub beacon_stale_period: Duration, pub witness_stale_period: Duration, pub entropy_stale_period: 
Duration, - pub invalid_beacon_sink: FileSinkClient, - pub invalid_witness_sink: FileSinkClient, + pub invalid_beacon_sink: FileSinkClient, + pub invalid_witness_sink: FileSinkClient, } #[derive(thiserror::Error, Debug)] @@ -56,8 +56,8 @@ impl Purger { witness_stale_period: Duration, entropy_stale_period: Duration, pool: PgPool, - invalid_beacon_sink: FileSinkClient, - invalid_witness_sink: FileSinkClient, + invalid_beacon_sink: FileSinkClient, + invalid_witness_sink: FileSinkClient, ) -> Result { Ok(Self { pool, diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index c0129a071..1f9bd1527 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -28,8 +28,8 @@ const REWARDS_NOT_CURRENT_DELAY_PERIOD: Duration = Duration::from_secs(5 * 60); pub struct Rewarder { pub pool: Pool, - pub rewards_sink: file_sink::FileSinkClient, - pub reward_manifests_sink: file_sink::FileSinkClient, + pub rewards_sink: file_sink::FileSinkClient, + pub reward_manifests_sink: file_sink::FileSinkClient, pub reward_period_hours: Duration, pub reward_offset: Duration, pub price_tracker: PriceTracker, @@ -53,8 +53,8 @@ impl ManagedTask for Rewarder { impl Rewarder { pub async fn new( pool: PgPool, - rewards_sink: file_sink::FileSinkClient, - reward_manifests_sink: file_sink::FileSinkClient, + rewards_sink: file_sink::FileSinkClient, + reward_manifests_sink: file_sink::FileSinkClient, reward_period_hours: Duration, reward_offset: Duration, price_tracker: PriceTracker, @@ -230,7 +230,7 @@ impl Rewarder { } pub async fn reward_poc_and_dc( pool: &Pool, - rewards_sink: &file_sink::FileSinkClient, + rewards_sink: &file_sink::FileSinkClient, reward_period: &Range>, iot_price: Decimal, ) -> anyhow::Result { @@ -284,7 +284,7 @@ pub async fn reward_poc_and_dc( } pub async fn reward_operational( - rewards_sink: &file_sink::FileSinkClient, + rewards_sink: &file_sink::FileSinkClient, reward_period: &Range>, ) -> anyhow::Result<()> { let total_operational_rewards 
= @@ -329,7 +329,7 @@ pub async fn reward_operational( } pub async fn reward_oracles( - rewards_sink: &file_sink::FileSinkClient, + rewards_sink: &file_sink::FileSinkClient, reward_period: &Range>, ) -> anyhow::Result<()> { // atm 100% of oracle rewards are assigned to 'unallocated' @@ -352,7 +352,7 @@ pub async fn reward_oracles( } async fn write_unallocated_reward( - rewards_sink: &file_sink::FileSinkClient, + rewards_sink: &file_sink::FileSinkClient, unallocated_type: UnallocatedRewardType, unallocated_amount: u64, reward_period: &'_ Range>, diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 58e0be180..5b1092c8d 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -59,9 +59,9 @@ pub struct Runner { pub deny_list: DenyList, pub gateway_cache: GatewayCache, pub region_cache: RegionCache, - pub invalid_beacon_sink: FileSinkClient, - pub invalid_witness_sink: FileSinkClient, - pub poc_sink: FileSinkClient, + pub invalid_beacon_sink: FileSinkClient, + pub invalid_witness_sink: FileSinkClient, + pub poc_sink: FileSinkClient, pub hex_density_map: HexDensityMap, pub witness_updater: WitnessUpdater, } @@ -104,9 +104,9 @@ where gateways: G, pool: PgPool, gateway_cache: GatewayCache, - invalid_beacon_sink: FileSinkClient, - invalid_witness_sink: FileSinkClient, - poc_sink: FileSinkClient, + invalid_beacon_sink: FileSinkClient, + invalid_witness_sink: FileSinkClient, + poc_sink: FileSinkClient, hex_density_map: HexDensityMap, witness_updater: WitnessUpdater, ) -> anyhow::Result { diff --git a/iot_verifier/tests/integrations/common/mod.rs b/iot_verifier/tests/integrations/common/mod.rs index 0dea4c251..cf4a63cb9 100644 --- a/iot_verifier/tests/integrations/common/mod.rs +++ b/iot_verifier/tests/integrations/common/mod.rs @@ -31,7 +31,8 @@ use sqlx::{PgPool, Postgres, Transaction}; use std::{self, ops::DerefMut, str::FromStr}; use tokio::{sync::mpsc::error::TryRecvError, sync::Mutex, time::timeout}; -pub fn create_file_sink() -> 
(FileSinkClient, MockFileSinkReceiver) { +pub fn create_file_sink( +) -> (FileSinkClient, MockFileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(10); ( @@ -40,12 +41,12 @@ pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { ) } -pub struct MockFileSinkReceiver { - pub receiver: tokio::sync::mpsc::Receiver, +pub struct MockFileSinkReceiver { + pub receiver: tokio::sync::mpsc::Receiver>, } -impl MockFileSinkReceiver { - pub async fn receive(&mut self) -> Option> { +impl MockFileSinkReceiver { + pub async fn receive(&mut self) -> Option { match timeout(seconds(2), self.receiver.recv()).await { Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { let _ = on_write_tx.send(Ok(())); @@ -65,37 +66,38 @@ impl MockFileSinkReceiver { panic!("receiver should have been empty") }; } +} +impl MockFileSinkReceiver { pub async fn receive_valid_poc(&mut self) -> LoraPocV1 { match self.receive().await { - Some(bytes) => { - LoraPocV1::decode(bytes.as_slice()).expect("failed to decode expected valid poc") - } + Some(msg) => msg, None => panic!("failed to receive valid poc"), } } - +} +impl MockFileSinkReceiver { pub async fn receive_invalid_beacon(&mut self) -> LoraInvalidBeaconReportV1 { match self.receive().await { - Some(bytes) => LoraInvalidBeaconReportV1::decode(bytes.as_slice()) - .expect("failed to decode expected invalid beacon report"), + Some(msg) => msg, None => panic!("failed to receive invalid beacon"), } } +} +impl MockFileSinkReceiver { pub async fn receive_invalid_witness(&mut self) -> LoraInvalidWitnessReportV1 { match self.receive().await { - Some(bytes) => LoraInvalidWitnessReportV1::decode(bytes.as_slice()) - .expect("failed to decode expected invalid witness report"), + Some(msg) => msg, None => panic!("failed to receive invalid witness"), } } +} +impl MockFileSinkReceiver { pub async fn receive_gateway_reward(&mut self) -> GatewayReward { match self.receive().await { - Some(bytes) => { - let iot_reward = 
IotRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected gateway reward"); + Some(iot_reward) => { println!("iot_reward: {:?}", iot_reward); match iot_reward.reward { Some(IotReward::GatewayReward(r)) => r, @@ -108,9 +110,7 @@ impl MockFileSinkReceiver { pub async fn receive_operational_reward(&mut self) -> OperationalReward { match self.receive().await { - Some(bytes) => { - let iot_reward = IotRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected operational reward"); + Some(iot_reward) => { println!("iot_reward: {:?}", iot_reward); match iot_reward.reward { Some(IotReward::OperationalReward(r)) => r, @@ -123,9 +123,7 @@ impl MockFileSinkReceiver { pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { match self.receive().await { - Some(bytes) => { - let iot_reward = IotRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected unallocated reward"); + Some(iot_reward) => { println!("iot_reward: {:?}", iot_reward); match iot_reward.reward { Some(IotReward::UnallocatedReward(r)) => r, diff --git a/iot_verifier/tests/integrations/rewarder_operations.rs b/iot_verifier/tests/integrations/rewarder_operations.rs index f8d71f588..7bf9f51ca 100644 --- a/iot_verifier/tests/integrations/rewarder_operations.rs +++ b/iot_verifier/tests/integrations/rewarder_operations.rs @@ -1,6 +1,6 @@ use crate::common::{self, MockFileSinkReceiver}; use chrono::{Duration as ChronoDuration, Utc}; -use helium_proto::services::poc_lora::OperationalReward; +use helium_proto::services::poc_lora::{IotRewardShare, OperationalReward}; use iot_verifier::{reward_share, rewarder}; use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; use rust_decimal_macros::dec; @@ -34,7 +34,7 @@ async fn test_operations() -> anyhow::Result<()> { } async fn receive_expected_rewards( - iot_rewards: &mut MockFileSinkReceiver, + iot_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result { // expect one operational reward msg 
let reward = iot_rewards.receive_operational_reward().await; diff --git a/iot_verifier/tests/integrations/rewarder_oracles.rs b/iot_verifier/tests/integrations/rewarder_oracles.rs index 37c9aabbd..72254a6a9 100644 --- a/iot_verifier/tests/integrations/rewarder_oracles.rs +++ b/iot_verifier/tests/integrations/rewarder_oracles.rs @@ -1,6 +1,6 @@ use crate::common::{self, MockFileSinkReceiver}; use chrono::{Duration as ChronoDuration, Utc}; -use helium_proto::services::poc_lora::UnallocatedReward; +use helium_proto::services::poc_lora::{IotRewardShare, UnallocatedReward}; use iot_verifier::{reward_share, rewarder}; use rust_decimal::{prelude::ToPrimitive, Decimal, RoundingStrategy}; use rust_decimal_macros::dec; @@ -35,7 +35,7 @@ async fn test_oracles(_pool: PgPool) -> anyhow::Result<()> { } async fn receive_expected_rewards( - iot_rewards: &mut MockFileSinkReceiver, + iot_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result { // expect one unallocated reward // as oracle rewards are currently 100% unallocated diff --git a/iot_verifier/tests/integrations/rewarder_poc_dc.rs b/iot_verifier/tests/integrations/rewarder_poc_dc.rs index 0ed213b58..db822df65 100644 --- a/iot_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/iot_verifier/tests/integrations/rewarder_poc_dc.rs @@ -1,7 +1,9 @@ use crate::common::{self, MockFileSinkReceiver}; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_lora::{GatewayReward, UnallocatedReward, UnallocatedRewardType}; +use helium_proto::services::poc_lora::{ + GatewayReward, IotRewardShare, UnallocatedReward, UnallocatedRewardType, +}; use iot_verifier::{ poc_report::ReportType, reward_share::{self, GatewayDCShare, GatewayPocShare}, @@ -113,7 +115,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { } async fn receive_expected_rewards( - iot_rewards: &mut MockFileSinkReceiver, + iot_rewards: &mut MockFileSinkReceiver, ) -> 
anyhow::Result<(Vec, UnallocatedReward)> { // get the filestore outputs from rewards run // we will have 3 gateway rewards and one unallocated reward diff --git a/iot_verifier/tests/integrations/runner_tests.rs b/iot_verifier/tests/integrations/runner_tests.rs index 3c8fd7fec..c8897f71c 100644 --- a/iot_verifier/tests/integrations/runner_tests.rs +++ b/iot_verifier/tests/integrations/runner_tests.rs @@ -5,8 +5,8 @@ use denylist::DenyList; use futures_util::{stream, StreamExt as FuturesStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_lora::{ - InvalidParticipantSide, InvalidReason, LoraBeaconReportReqV1, LoraWitnessReportReqV1, - VerificationStatus, + InvalidParticipantSide, InvalidReason, LoraBeaconReportReqV1, LoraInvalidBeaconReportV1, + LoraInvalidWitnessReportV1, LoraPocV1, LoraWitnessReportReqV1, VerificationStatus, }; use helium_proto::Region as ProtoRegion; use iot_config::{ @@ -60,9 +60,9 @@ impl Gateways for MockIotConfigClient { struct TestContext { runner: Runner, - valid_pocs: MockFileSinkReceiver, - invalid_beacons: MockFileSinkReceiver, - invalid_witnesses: MockFileSinkReceiver, + valid_pocs: MockFileSinkReceiver, + invalid_beacons: MockFileSinkReceiver, + invalid_witnesses: MockFileSinkReceiver, entropy_ts: DateTime, } diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index b0da9b8e6..e00570b61 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -21,7 +21,7 @@ pub async fn accumulate_sessions( gateway_info_resolver: &impl GatewayInfoResolver, authorization_verifier: &impl AuthorizationVerifier, conn: &mut Transaction<'_, Postgres>, - invalid_data_session_report_sink: &FileSinkClient, + invalid_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, reports: impl Stream, ) -> anyhow::Result<()> { @@ -131,7 +131,7 @@ async fn is_duplicate( } async fn write_invalid_report( - invalid_data_session_report_sink: 
&FileSinkClient, + invalid_data_session_report_sink: &FileSinkClient, reason: DataTransferIngestReportStatus, report: DataTransferSessionIngestReport, ) -> Result<(), file_store::Error> { diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 6cf3c7f41..f3cb54617 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -31,12 +31,12 @@ impl PayerTotals { } pub struct Burner { - valid_sessions: FileSinkClient, + valid_sessions: FileSinkClient, solana: S, } impl Burner { - pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self { + pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self { Self { valid_sessions, solana, diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 767132e1b..af94716e9 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -6,9 +6,13 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, mobile_session::DataTransferSessionIngestReport, - FileSinkBuilder, FileStore, FileType, + traits::FileSinkWriteExt, + FileStore, FileType, }; +use helium_proto::services::{ + packet_verifier::ValidDataTransferSession, poc_mobile::InvalidDataTransferIngestReportV1, +}; use mobile_config::client::{ authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, AuthorizationClient, GatewayClient, @@ -29,7 +33,7 @@ pub struct Daemon { min_burn_period: Duration, gateway_info_resolver: GIR, authorization_verifier: AV, - invalid_data_session_report_sink: FileSinkClient, + invalid_data_session_report_sink: FileSinkClient, } impl Daemon { @@ -40,7 +44,7 @@ impl Daemon { burner: Burner, gateway_info_resolver: GIR, authorization_verifier: AV, - invalid_data_session_report_sink: FileSinkClient, + invalid_data_session_report_sink: FileSinkClient, ) -> Self { Self { pool, @@ -137,25 +141,22 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); - 
let (valid_sessions, valid_sessions_server) = FileSinkBuilder::new( - FileType::ValidDataTransferSession, + let (valid_sessions, valid_sessions_server) = ValidDataTransferSession::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_valid_data_transfer_session"), + None, + env!("CARGO_PKG_NAME"), ) - .auto_commit(true) - .create() .await?; - let (invalid_sessions, invalid_sessions_server) = FileSinkBuilder::new( - FileType::InvalidDataTransferSessionIngestReport, - store_base_path, - file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_invalid_data_transfer_session"), - ) - .auto_commit(false) - .create() - .await?; + let (invalid_sessions, invalid_sessions_server) = + InvalidDataTransferIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + None, + env!("CARGO_PKG_NAME"), + ) + .await?; let burner = Burner::new(valid_sessions, solana); diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index 770888228..32819fd66 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -7,13 +7,13 @@ use std::{ use chrono::{DateTime, Utc}; use file_store::{ - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_upload::FileUpload, - traits::{TimestampDecode, TimestampEncode}, - FileStore, FileType, + traits::{FileSinkWriteExt, TimestampDecode, TimestampEncode}, + FileStore, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; -use helium_proto::services::poc_mobile as proto; +use helium_proto::services::poc_mobile::{self as proto, OracleBoostingReportV1}; use hextree::disktree::DiskTreeMap; use lazy_static::lazy_static; use regex::Regex; @@ -244,7 +244,14 @@ where } } -impl DataSetDownloaderDaemon { +impl + DataSetDownloaderDaemon< + Footfall, + Landtype, + Urbanization, + FileSinkClient, + > +{ pub async fn create_managed_task( pool: PgPool, settings: &Settings, @@ 
-252,15 +259,12 @@ impl DataSetDownloaderDaemon { new_coverage_object_notification: NewCoverageObjectNotification, ) -> anyhow::Result { let (oracle_boosting_reports, oracle_boosting_reports_server) = - file_sink::FileSinkBuilder::new( - FileType::OracleBoostingReport, + OracleBoostingReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(true) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; let urbanization = Urbanization::new(None); @@ -518,7 +522,7 @@ pub trait DataSetProcessor: Send + Sync + 'static { } #[async_trait::async_trait] -impl DataSetProcessor for FileSinkClient { +impl DataSetProcessor for FileSinkClient { async fn set_all_oracle_boosting_assignments( &self, pool: &PgPool, @@ -725,7 +729,10 @@ impl AssignedCoverageObjects { Ok(Self { coverage_objs }) } - async fn write(&self, boosting_reports: &FileSinkClient) -> file_store::Result { + async fn write( + &self, + boosting_reports: &FileSinkClient, + ) -> file_store::Result { let timestamp = Utc::now().encode_timestamp(); for (uuid, hexes) in self.coverage_objs.iter() { let assignments: Vec<_> = hexes diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 7e1768d21..0ce51d47c 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -15,7 +15,8 @@ use crate::{ telemetry, Settings, }; use anyhow::Result; -use file_store::{file_sink, file_upload, FileStore, FileType}; +use file_store::{file_upload, traits::FileSinkWriteExt, FileStore}; +use helium_proto::services::poc_mobile::{Heartbeat, SeniorityUpdate, SpeedtestAvg}; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, CarrierServiceClient, GatewayClient, @@ -48,38 +49,29 @@ impl Cmd { let carrier_client = 
CarrierServiceClient::from_settings(&settings.config_client)?; let hex_boosting_client = HexBoostingClient::from_settings(&settings.config_client)?; - let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new( - FileType::ValidatedHeartbeat, + let (valid_heartbeats, valid_heartbeats_server) = Heartbeat::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_heartbeat"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; // Seniority updates - let (seniority_updates, seniority_updates_server) = file_sink::FileSinkBuilder::new( - FileType::SeniorityUpdate, + let (seniority_updates, seniority_updates_server) = SeniorityUpdate::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_seniority_update"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; - let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new( - FileType::SpeedtestAvg, + let (speedtests_avg, speedtests_avg_server) = SpeedtestAvg::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; let usa_region_paths = settings.usa_region_paths()?; diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 055cd3149..d10aebe7a 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -7,10 +7,10 @@ use chrono::{DateTime, Utc}; use file_store::{ coverage::{self, CoverageObjectIngestReport}, file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_source, file_upload::FileUpload, - 
traits::TimestampEncode, + traits::{FileSinkWriteExt, TimestampEncode}, FileStore, FileType, }; use futures::{ @@ -73,7 +73,7 @@ pub struct CoverageDaemon { pool: Pool, auth_client: AuthorizationClient, coverage_objs: Receiver>, - coverage_obj_sink: FileSinkClient, + coverage_obj_sink: FileSinkClient, new_coverage_object_notifier: NewCoverageObjectNotifier, } @@ -86,15 +86,12 @@ impl CoverageDaemon { auth_client: AuthorizationClient, new_coverage_object_notifier: NewCoverageObjectNotifier, ) -> anyhow::Result { - let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new( - FileType::CoverageObject, + let (valid_coverage_objs, valid_coverage_objs_server) = proto::CoverageObjectV1::file_sink( settings.store_base_path(), file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_coverage_object"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; let (coverage_objs, coverage_objs_server) = @@ -126,7 +123,7 @@ impl CoverageDaemon { pool: PgPool, auth_client: AuthorizationClient, coverage_objs: Receiver>, - coverage_obj_sink: FileSinkClient, + coverage_obj_sink: FileSinkClient, new_coverage_object_notifier: NewCoverageObjectNotifier, ) -> Self { Self { @@ -272,7 +269,10 @@ impl CoverageObject { } } - pub async fn write(&self, coverage_objects: &FileSinkClient) -> anyhow::Result<()> { + pub async fn write( + &self, + coverage_objects: &FileSinkClient, + ) -> anyhow::Result<()> { coverage_objects .write( proto::CoverageObjectV1 { diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 2d7e8f0a1..e02010c81 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -15,6 +15,7 @@ use file_store::{ FileStore, FileType, }; use futures::{stream::StreamExt, TryFutureExt}; +use helium_proto::services::poc_mobile as proto; use retainer::Cache; use sqlx::{Pool, 
Postgres}; use std::{ @@ -29,8 +30,8 @@ pub struct CbrsHeartbeatDaemon { gateway_info_resolver: GIR, heartbeats: Receiver>, max_distance_to_coverage: u32, - heartbeat_sink: FileSinkClient, - seniority_sink: FileSinkClient, + heartbeat_sink: FileSinkClient, + seniority_sink: FileSinkClient, geofence: GFV, } @@ -45,8 +46,8 @@ where settings: &Settings, file_store: FileStore, gateway_resolver: GIR, - valid_heartbeats: FileSinkClient, - seniority_updates: FileSinkClient, + valid_heartbeats: FileSinkClient, + seniority_updates: FileSinkClient, geofence: GFV, ) -> anyhow::Result { // CBRS Heartbeats @@ -82,8 +83,8 @@ where gateway_info_resolver: GIR, heartbeats: Receiver>, max_distance_to_coverage: u32, - heartbeat_sink: FileSinkClient, - seniority_sink: FileSinkClient, + heartbeat_sink: FileSinkClient, + seniority_sink: FileSinkClient, geofence: GFV, ) -> Self { Self { diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 13417ba42..c3399f58d 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -607,7 +607,7 @@ impl ValidatedHeartbeat { }) } - pub async fn write(&self, heartbeats: &FileSinkClient) -> file_store::Result { + pub async fn write(&self, heartbeats: &FileSinkClient) -> file_store::Result { heartbeats .write( proto::Heartbeat { @@ -720,8 +720,8 @@ pub(crate) async fn process_validated_heartbeats( validated_heartbeats: impl Stream>, heartbeat_cache: &Cache<(String, DateTime), ()>, coverage_claim_time_cache: &CoverageClaimTimeCache, - heartbeat_sink: &FileSinkClient, - seniority_sink: &FileSinkClient, + heartbeat_sink: &FileSinkClient, + seniority_sink: &FileSinkClient, transaction: &mut Transaction<'_, Postgres>, ) -> anyhow::Result<()> { let mut validated_heartbeats = pin!(validated_heartbeats); diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index f40da6e24..9d2ed0d5f 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ 
b/mobile_verifier/src/heartbeats/wifi.rs @@ -14,6 +14,7 @@ use file_store::{ FileStore, FileType, }; use futures::{stream::StreamExt, TryFutureExt}; +use helium_proto::services::poc_mobile as proto; use retainer::Cache; use sqlx::{Pool, Postgres}; use std::{ @@ -28,8 +29,8 @@ pub struct WifiHeartbeatDaemon { gateway_info_resolver: GIR, heartbeats: Receiver>, max_distance_to_coverage: u32, - heartbeat_sink: FileSinkClient, - seniority_sink: FileSinkClient, + heartbeat_sink: FileSinkClient, + seniority_sink: FileSinkClient, geofence: GFV, } @@ -44,8 +45,8 @@ where settings: &Settings, file_store: FileStore, gateway_resolver: GIR, - valid_heartbeats: FileSinkClient, - seniority_updates: FileSinkClient, + valid_heartbeats: FileSinkClient, + seniority_updates: FileSinkClient, geofence: GFV, ) -> anyhow::Result { // Wifi Heartbeats @@ -80,8 +81,8 @@ where gateway_info_resolver: GIR, heartbeats: Receiver>, max_distance_to_coverage: u32, - heartbeat_sink: FileSinkClient, - seniority_sink: FileSinkClient, + heartbeat_sink: FileSinkClient, + seniority_sink: FileSinkClient, geofence: GFV, ) -> Self { Self { diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index fb058ced0..c10453bff 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_source, file_upload::FileUpload, mobile_radio_invalidated_threshold::{ @@ -11,6 +11,7 @@ use file_store::{ mobile_radio_threshold::{ RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport, }, + traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -35,8 +36,8 @@ pub struct RadioThresholdIngestor { pool: PgPool, reports_receiver: Receiver>, invalid_reports_receiver: Receiver>, - 
verified_report_sink: FileSinkClient, - verified_invalid_report_sink: FileSinkClient, + verified_report_sink: FileSinkClient, + verified_invalid_report_sink: FileSinkClient, authorization_verifier: AV, } @@ -69,28 +70,21 @@ where authorization_verifier: AV, ) -> anyhow::Result { let (verified_radio_threshold, verified_radio_threshold_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedRadioThresholdIngestReport, + VerifiedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_verified_radio_threshold"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let (verified_invalidated_radio_threshold, verified_invalidated_radio_threshold_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedInvalidatedRadioThresholdIngestReport, + VerifiedInvalidatedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_verified_invalidated_radio_threshold" - ), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let (radio_threshold_ingest, radio_threshold_ingest_server) = @@ -134,8 +128,10 @@ where pool: sqlx::Pool, reports_receiver: Receiver>, invalid_reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, - verified_invalid_report_sink: FileSinkClient, + verified_report_sink: FileSinkClient, + verified_invalid_report_sink: FileSinkClient< + VerifiedInvalidatedRadioThresholdIngestReportV1, + >, authorization_verifier: AV, ) -> Self { Self { diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 7f22868a7..8d5f81426 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -15,18 +15,17 @@ use anyhow::bail; use chrono::{DateTime, TimeZone, Utc}; use db_store::meta; use file_store::{ - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, 
file_upload::FileUpload, - traits::TimestampEncode, - FileType, + traits::{FileSinkWriteExt, TimestampEncode, DEFAULT_ROLL_TIME}, }; use futures_util::TryFutureExt; use helium_proto::{ reward_manifest::RewardData::MobileRewardData, - services::{ - poc_mobile as proto, poc_mobile::mobile_reward_share::Reward as ProtoReward, - poc_mobile::UnallocatedReward, poc_mobile::UnallocatedRewardType, + services::poc_mobile::{ + self as proto, mobile_reward_share::Reward as ProtoReward, MobileRewardShare, + UnallocatedReward, UnallocatedRewardType, }, MobileRewardData as ManifestMobileRewardData, RewardManifest, }; @@ -58,10 +57,10 @@ pub struct Rewarder { hex_service_client: B, reward_period_duration: Duration, reward_offset: Duration, - pub mobile_rewards: FileSinkClient, - reward_manifests: FileSinkClient, + pub mobile_rewards: FileSinkClient, + reward_manifests: FileSinkClient, price_tracker: PriceTracker, - speedtest_averages: FileSinkClient, + speedtest_averages: FileSinkClient, } impl Rewarder @@ -75,28 +74,24 @@ where file_upload: FileUpload, carrier_service_verifier: A, hex_boosting_info_resolver: B, - speedtests_avg: FileSinkClient, + speedtests_avg: FileSinkClient, ) -> anyhow::Result { let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; - let (mobile_rewards, mobile_rewards_server) = file_sink::FileSinkBuilder::new( - FileType::MobileRewardShare, + let (mobile_rewards, mobile_rewards_server) = MobileRewardShare::file_sink( settings.store_base_path(), file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_radio_reward_shares"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; - let (reward_manifests, reward_manifests_server) = file_sink::FileSinkBuilder::new( - FileType::RewardManifest, + let (reward_manifests, reward_manifests_server) = RewardManifest::file_sink( settings.store_base_path(), file_upload, - concat!(env!("CARGO_PKG_NAME"), "_reward_manifest"), + 
Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let rewarder = Rewarder::new( @@ -126,10 +121,10 @@ where hex_service_client: B, reward_period_duration: Duration, reward_offset: Duration, - mobile_rewards: FileSinkClient, - reward_manifests: FileSinkClient, + mobile_rewards: FileSinkClient, + reward_manifests: FileSinkClient, price_tracker: PriceTracker, - speedtest_averages: FileSinkClient, + speedtest_averages: FileSinkClient, ) -> Self { Self { pool, @@ -354,8 +349,8 @@ where pub async fn reward_poc_and_dc( pool: &Pool, hex_service_client: &impl HexBoostingInfoResolver, - mobile_rewards: &FileSinkClient, - speedtest_avg_sink: &FileSinkClient, + mobile_rewards: &FileSinkClient, + speedtest_avg_sink: &FileSinkClient, reward_period: &Range>, mobile_bone_price: Decimal, ) -> anyhow::Result { @@ -415,8 +410,8 @@ pub async fn reward_poc_and_dc( async fn reward_poc( pool: &Pool, hex_service_client: &impl HexBoostingInfoResolver, - mobile_rewards: &FileSinkClient, - speedtest_avg_sink: &FileSinkClient, + mobile_rewards: &FileSinkClient, + speedtest_avg_sink: &FileSinkClient, reward_period: &Range>, reward_shares: DataTransferAndPocAllocatedRewardBuckets, ) -> anyhow::Result<(Decimal, CalculatedPocRewardShares)> { @@ -479,7 +474,7 @@ async fn reward_poc( } pub async fn reward_dc( - mobile_rewards: &FileSinkClient, + mobile_rewards: &FileSinkClient, reward_period: &Range>, transfer_rewards: TransferRewards, reward_shares: &DataTransferAndPocAllocatedRewardBuckets, @@ -504,7 +499,7 @@ pub async fn reward_dc( pub async fn reward_mappers( pool: &Pool, - mobile_rewards: &FileSinkClient, + mobile_rewards: &FileSinkClient, reward_period: &Range>, ) -> anyhow::Result<()> { // Mapper rewards currently include rewards for discovery mapping only. 
@@ -554,7 +549,7 @@ pub async fn reward_mappers( } pub async fn reward_oracles( - mobile_rewards: &FileSinkClient, + mobile_rewards: &FileSinkClient, reward_period: &Range>, ) -> anyhow::Result<()> { // atm 100% of oracle rewards are assigned to 'unallocated' @@ -579,7 +574,7 @@ pub async fn reward_oracles( pub async fn reward_service_providers( pool: &Pool, carrier_client: &impl CarrierServiceVerifier, - mobile_rewards: &FileSinkClient, + mobile_rewards: &FileSinkClient, reward_period: &Range>, mobile_bone_price: Decimal, ) -> anyhow::Result<()> { @@ -617,7 +612,7 @@ pub async fn reward_service_providers( } async fn write_unallocated_reward( - mobile_rewards: &FileSinkClient, + mobile_rewards: &FileSinkClient, unallocated_type: UnallocatedRewardType, unallocated_amount: u64, reward_period: &'_ Range>, diff --git a/mobile_verifier/src/seniority.rs b/mobile_verifier/src/seniority.rs index 8b4a07c59..b17ba3ab6 100644 --- a/mobile_verifier/src/seniority.rs +++ b/mobile_verifier/src/seniority.rs @@ -134,7 +134,10 @@ impl<'a> SeniorityUpdate<'a> { impl SeniorityUpdate<'_> { #[allow(deprecated)] - pub async fn write(&self, seniorities: &FileSinkClient) -> anyhow::Result<()> { + pub async fn write( + &self, + seniorities: &FileSinkClient, + ) -> anyhow::Result<()> { if let SeniorityUpdateAction::Insert { new_seniority, update_reason, diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index e3ea08cb9..54433e117 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -5,8 +5,9 @@ use file_store::{ file_info_poller::{ FileInfoPollerConfigBuilder, FileInfoStream, LookbackBehavior, ProstFileInfoPollerParser, }, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_upload::FileUpload, + traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, FileStore, FileType, }; use futures::{prelude::future::LocalBoxFuture, StreamExt, TryFutureExt, TryStreamExt}; @@ 
-17,7 +18,8 @@ use helium_proto::services::{ service_provider_boosted_rewards_banned_radio_req_v1::{ KeyType as ProtoKeyType, SpBoostedRewardsBannedRadioReason, }, - SeniorityUpdateReason, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + SeniorityUpdate as SeniorityUpdateProto, SeniorityUpdateReason, + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioVerificationStatus, VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1, }, @@ -117,8 +119,8 @@ pub struct ServiceProviderBoostedRewardsBanIngestor { pool: PgPool, authorization_verifier: AV, receiver: Receiver>, - verified_sink: FileSinkClient, - seniority_update_sink: FileSinkClient, + verified_sink: FileSinkClient, + seniority_update_sink: FileSinkClient, } impl ManagedTask for ServiceProviderBoostedRewardsBanIngestor @@ -150,17 +152,16 @@ where file_store: FileStore, authorization_verifier: AV, settings: &Settings, - seniority_update_sink: FileSinkClient, + seniority_update_sink: FileSinkClient, ) -> anyhow::Result { - let (verified_sink, verified_sink_server) = file_sink::FileSinkBuilder::new( - FileType::VerifiedSPBoostedRewardsBannedRadioIngestReport, - settings.store_base_path(), - file_upload, - concat!(env!("CARGO_PKG_NAME"), "_verified_sp_boosted_rewards_ban"), - ) - .auto_commit(false) - .create() - .await?; + let (verified_sink, verified_sink_server) = + VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( + settings.store_base_path(), + file_upload, + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), + ) + .await?; let (receiver, ingest_server) = FileInfoPollerConfigBuilder::< ServiceProviderBoostedRewardsBannedRadioIngestReportV1, @@ -421,7 +422,9 @@ mod tests { use chrono::Duration; use file_store::file_sink::Message; use helium_crypto::{KeyTag, Keypair, PublicKey}; - use helium_proto::services::poc_mobile::ServiceProviderBoostedRewardsBannedRadioReqV1; + use helium_proto::services::poc_mobile::{ + SeniorityUpdate 
as SeniorityUpdateProto, ServiceProviderBoostedRewardsBannedRadioReqV1, + }; use rand::rngs::OsRng; use tokio::sync::mpsc; @@ -448,8 +451,9 @@ mod tests { struct TestSetup { ingestor: ServiceProviderBoostedRewardsBanIngestor, - _verified_receiver: Receiver, - _seniority_receiver: Receiver, + _verified_receiver: + Receiver>, + _seniority_receiver: Receiver>, } impl TestSetup { diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 9809b3a0b..97edf8e82 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -5,10 +5,11 @@ use crate::{ use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_source, file_upload::FileUpload, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, + traits::FileSinkWriteExt, FileStore, FileType, }; use futures::{ @@ -17,7 +18,7 @@ use futures::{ }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ - SpeedtestIngestReportV1, SpeedtestVerificationResult, + SpeedtestAvg as SpeedtestAvgProto, SpeedtestIngestReportV1, SpeedtestVerificationResult, VerifiedSpeedtest as VerifiedSpeedtestProto, }; use mobile_config::client::gateway_client::GatewayInfoResolver; @@ -57,8 +58,8 @@ pub struct SpeedtestDaemon { pool: sqlx::Pool, gateway_info_resolver: GIR, speedtests: Receiver>, - speedtest_avg_file_sink: FileSinkClient, - verified_speedtest_file_sink: FileSinkClient, + speedtest_avg_file_sink: FileSinkClient, + verified_speedtest_file_sink: FileSinkClient, } impl SpeedtestDaemon @@ -70,18 +71,15 @@ where settings: &Settings, file_upload: FileUpload, file_store: FileStore, - speedtests_avg: FileSinkClient, + speedtests_avg: FileSinkClient, gateway_resolver: GIR, ) -> anyhow::Result { - let (speedtests_validity, speedtests_validity_server) = file_sink::FileSinkBuilder::new( - FileType::VerifiedSpeedtest, + let (speedtests_validity, 
speedtests_validity_server) = VerifiedSpeedtestProto::file_sink( settings.store_base_path(), file_upload, - concat!(env!("CARGO_PKG_NAME"), "_verified_speedtest"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; let (speedtests, speedtests_server) = @@ -112,8 +110,8 @@ where pool: sqlx::Pool, gateway_info_resolver: GIR, speedtests: Receiver>, - speedtest_avg_file_sink: FileSinkClient, - verified_speedtest_file_sink: FileSinkClient, + speedtest_avg_file_sink: FileSinkClient, + verified_speedtest_file_sink: FileSinkClient, ) -> Self { Self { pool, diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs index 115bf0537..bd54f708b 100644 --- a/mobile_verifier/src/speedtests_average.rs +++ b/mobile_verifier/src/speedtests_average.rs @@ -89,7 +89,10 @@ impl From> for SpeedtestAverage { } impl SpeedtestAverage { - pub async fn write(&self, filesink: &FileSinkClient) -> file_store::Result { + pub async fn write( + &self, + filesink: &FileSinkClient, + ) -> file_store::Result { filesink .write( proto::SpeedtestAvg { @@ -205,7 +208,10 @@ pub struct SpeedtestAverages { } impl SpeedtestAverages { - pub async fn write_all(&self, sink: &FileSinkClient) -> anyhow::Result<()> { + pub async fn write_all( + &self, + sink: &FileSinkClient, + ) -> anyhow::Result<()> { for speedtest in self.averages.values() { speedtest.write(sink).await?; } diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 930c20983..69a09a369 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -1,13 +1,14 @@ use chrono::{DateTime, Duration, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_source, file_upload::FileUpload, mobile_subscriber::{ 
SubscriberLocationIngestReport, SubscriberLocationReq, VerifiedSubscriberLocationIngestReport, }, + traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -36,7 +37,7 @@ pub struct SubscriberLocationIngestor { authorization_verifier: AV, entity_verifier: EV, reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, + verified_report_sink: FileSinkClient, } impl SubscriberLocationIngestor @@ -53,14 +54,12 @@ where entity_verifier: EV, ) -> anyhow::Result { let (verified_subscriber_location, verified_subscriber_location_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedSubscriberLocationIngestReport, + VerifiedSubscriberLocationIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"), + Some(DEFAULT_ROLL_TIME), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .create() .await?; let (subscriber_location_ingest, subscriber_location_ingest_server) = @@ -92,7 +91,7 @@ where authorization_verifier: AV, entity_verifier: EV, reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, + verified_report_sink: FileSinkClient, ) -> Self { Self { pool, diff --git a/mobile_verifier/src/subscriber_verified_mapping_event.rs b/mobile_verifier/src/subscriber_verified_mapping_event.rs index 5f67ff28b..99ed76015 100644 --- a/mobile_verifier/src/subscriber_verified_mapping_event.rs +++ b/mobile_verifier/src/subscriber_verified_mapping_event.rs @@ -2,11 +2,12 @@ use crate::Settings; use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_source, file_upload::FileUpload, subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, + traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, 
verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport, FileStore, FileType, }; @@ -32,7 +33,7 @@ pub struct SubscriberVerifiedMappingEventDaemon { authorization_verifier: AV, entity_verifier: EV, reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, + verified_report_sink: FileSinkClient, } impl SubscriberVerifiedMappingEventDaemon @@ -45,7 +46,7 @@ where authorization_verifier: AV, entity_verifier: EV, reports_receiver: Receiver>, - verified_report_sink: FileSinkClient, + verified_report_sink: FileSinkClient, ) -> Self { Self { pool, @@ -73,18 +74,14 @@ where .create() .await?; - let (verified_report_sink, verified_report_sink_server) = file_sink::FileSinkBuilder::new( - FileType::VerifiedSubscriberVerifiedMappingEventIngestReport, - settings.store_base_path(), - file_upload.clone(), - concat!( + let (verified_report_sink, verified_report_sink_server) = + VerifiedSubscriberVerifiedMappingEventIngestReportV1::file_sink( + settings.store_base_path(), + file_upload.clone(), + Some(DEFAULT_ROLL_TIME), env!("CARGO_PKG_NAME"), - "_verified_subscriber_verified_mapping_event_ingest_report" - ), - ) - .auto_commit(false) - .create() - .await?; + ) + .await?; let task = Self::new( pool, diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 5ba8ab557..178bda55d 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -5,17 +5,13 @@ use file_store::{ }; use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; -use helium_proto::{ - services::{ - mobile_config::NetworkKeyRole, - poc_mobile::{ - mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward, - MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward, - RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, - UnallocatedReward, - }, +use 
helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{ + mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward, + MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward, + RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward, }, - Message, }; use hex_assignments::{Assignment, HexAssignment, HexBoostData}; use mobile_config::{ @@ -62,12 +58,12 @@ impl HexBoostingInfoResolver for MockHexBoostingClient { } } -pub struct MockFileSinkReceiver { - pub receiver: tokio::sync::mpsc::Receiver, +pub struct MockFileSinkReceiver { + pub receiver: tokio::sync::mpsc::Receiver>, } -impl MockFileSinkReceiver { - pub async fn receive(&mut self, caller: &str) -> Option> { +impl MockFileSinkReceiver { + pub async fn receive(&mut self, caller: &str) -> Option { match timeout(seconds(2), self.receiver.recv()).await { Ok(Some(SinkMessage::Data(on_write_tx, msg))) => { let _ = on_write_tx.send(Ok(())); @@ -82,36 +78,28 @@ impl MockFileSinkReceiver { } } - pub async fn get_all(&mut self) -> Vec> { - let mut buf = Vec::new(); - while let Ok(SinkMessage::Data(on_write_tx, msg)) = self.receiver.try_recv() { - let _ = on_write_tx.send(Ok(())); - buf.push(msg); - } - buf - } - pub fn assert_no_messages(&mut self) { let Err(TryRecvError::Empty) = self.receiver.try_recv() else { panic!("receiver should have been empty") }; } +} +impl MockFileSinkReceiver { pub async fn get_all_speedtest_avgs(&mut self) -> Vec { - self.get_all() - .await - .into_iter() - .map(|bytes| { - SpeedtestAvg::decode(bytes.as_slice()).expect("Not a valid speedtest average") - }) - .collect() + let mut messages = vec![]; + while let Ok(SinkMessage::Data(on_write_tx, msg)) = self.receiver.try_recv() { + let _ = on_write_tx.send(Ok(())); + messages.push(msg); + } + messages } +} +impl MockFileSinkReceiver { pub async fn receive_radio_reward_v1(&mut self) -> RadioReward { match self.receive("receive_radio_reward_v1").await 
{ - Some(bytes) => { - let mobile_reward = MobileRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected radio reward"); + Some(mobile_reward) => { println!("mobile_reward: {:?}", mobile_reward); match mobile_reward.reward { Some(MobileReward::RadioReward(r)) => r, @@ -127,30 +115,24 @@ impl MockFileSinkReceiver { // and the comparison. let radio_reward_v1 = self.receive_radio_reward_v1().await; match self.receive("receive_radio_reward").await { - Some(bytes) => { - let mobile_reward = MobileRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected radio reward v2"); - match mobile_reward.reward { - Some(MobileReward::RadioRewardV2(reward)) => { - assert_eq!( - reward.total_poc_reward(), - radio_reward_v1.poc_reward, - "mismatch in poc rewards between v1 and v2" - ); - reward - } - _ => panic!("failed to get radio reward"), + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::RadioRewardV2(reward)) => { + assert_eq!( + reward.total_poc_reward(), + radio_reward_v1.poc_reward, + "mismatch in poc rewards between v1 and v2" + ); + reward } - } + _ => panic!("failed to get radio reward"), + }, None => panic!("failed to receive radio reward"), } } pub async fn receive_gateway_reward(&mut self) -> GatewayReward { match self.receive("receive_gateway_reward").await { - Some(bytes) => { - let mobile_reward = MobileRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected gateway reward"); + Some(mobile_reward) => { println!("mobile_reward: {:?}", mobile_reward); match mobile_reward.reward { Some(MobileReward::GatewayReward(r)) => r, @@ -163,9 +145,7 @@ impl MockFileSinkReceiver { pub async fn receive_service_provider_reward(&mut self) -> ServiceProviderReward { match self.receive("receive_service_provider_reward").await { - Some(bytes) => { - let mobile_reward = MobileRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected service provider reward"); + Some(mobile_reward) => { 
println!("mobile_reward: {:?}", mobile_reward); match mobile_reward.reward { Some(MobileReward::ServiceProviderReward(r)) => r, @@ -178,9 +158,7 @@ impl MockFileSinkReceiver { pub async fn receive_subscriber_reward(&mut self) -> SubscriberReward { match self.receive("receive_subscriber_reward").await { - Some(bytes) => { - let mobile_reward = MobileRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected subscriber reward"); + Some(mobile_reward) => { println!("mobile_reward: {:?}", mobile_reward); match mobile_reward.reward { Some(MobileReward::SubscriberReward(r)) => r, @@ -193,9 +171,7 @@ impl MockFileSinkReceiver { pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { match self.receive("receive_unallocated_reward").await { - Some(bytes) => { - let mobile_reward = MobileRewardShare::decode(bytes.as_slice()) - .expect("failed to decode expected unallocated reward"); + Some(mobile_reward) => { println!("mobile_reward: {:?}", mobile_reward); match mobile_reward.reward { Some(MobileReward::UnallocatedReward(r)) => r, @@ -207,12 +183,12 @@ impl MockFileSinkReceiver { } } -pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { +pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) { let (tx, rx) = tokio::sync::mpsc::channel(20); ( FileSinkClient { sender: tx, - metric: "metric", + metric: "metric".into(), }, MockFileSinkReceiver { receiver: rx }, ) diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index 314a22a10..ae11f4363 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -9,8 +9,8 @@ use helium_crypto::PublicKeyBinary; use helium_proto::services::{ poc_lora::UnallocatedRewardType, poc_mobile::{ - CoverageObjectValidity, HeartbeatValidity, LocationSource, RadioRewardV2, - SeniorityUpdateReason, SignalLevel, UnallocatedReward, + CoverageObjectValidity, 
HeartbeatValidity, LocationSource, MobileRewardShare, + RadioRewardV2, SeniorityUpdateReason, SignalLevel, UnallocatedReward, }, }; use hextree::Cell; @@ -1292,7 +1292,7 @@ fn rounded(num: Decimal) -> u64 { } async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, + mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result<(Vec, UnallocatedReward)> { receive_expected_rewards_maybe_unallocated(mobile_rewards, ExpectUnallocated::Yes).await } @@ -1303,7 +1303,7 @@ enum ExpectUnallocated { } async fn receive_expected_rewards_maybe_unallocated( - mobile_rewards: &mut MockFileSinkReceiver, + mobile_rewards: &mut MockFileSinkReceiver, expect_unallocated: ExpectUnallocated, ) -> anyhow::Result<(Vec, UnallocatedReward)> { // get the filestore outputs from rewards run diff --git a/mobile_verifier/tests/integrations/rewarder_mappers.rs b/mobile_verifier/tests/integrations/rewarder_mappers.rs index fe98830fb..fc0e7e486 100644 --- a/mobile_verifier/tests/integrations/rewarder_mappers.rs +++ b/mobile_verifier/tests/integrations/rewarder_mappers.rs @@ -3,7 +3,9 @@ use chrono::{DateTime, Duration as ChronoDuration, Utc}; use file_store::mobile_subscriber::{SubscriberLocationIngestReport, SubscriberLocationReq}; use helium_crypto::PublicKeyBinary; use helium_proto::{ - services::poc_mobile::{SubscriberReward, UnallocatedReward, UnallocatedRewardType}, + services::poc_mobile::{ + MobileRewardShare, SubscriberReward, UnallocatedReward, UnallocatedRewardType, + }, Message, }; use mobile_verifier::{reward_shares, rewarder, subscriber_location}; @@ -95,7 +97,7 @@ async fn test_mapper_rewards(pool: PgPool) -> anyhow::Result<()> { } async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, + mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result<(Vec, UnallocatedReward)> { // get the filestore outputs from rewards run // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios diff --git 
a/mobile_verifier/tests/integrations/rewarder_oracles.rs b/mobile_verifier/tests/integrations/rewarder_oracles.rs index 566bc340d..2830afe2e 100644 --- a/mobile_verifier/tests/integrations/rewarder_oracles.rs +++ b/mobile_verifier/tests/integrations/rewarder_oracles.rs @@ -1,6 +1,8 @@ use crate::common::{self, MockFileSinkReceiver}; use chrono::{Duration as ChronoDuration, Utc}; -use helium_proto::services::poc_mobile::{UnallocatedReward, UnallocatedRewardType}; +use helium_proto::services::poc_mobile::{ + MobileRewardShare, UnallocatedReward, UnallocatedRewardType, +}; use mobile_verifier::{reward_shares, rewarder}; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -43,7 +45,7 @@ async fn test_oracle_rewards(_pool: PgPool) -> anyhow::Result<()> { } async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, + mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result { // expect one unallocated reward // as oracle rewards are currently 100% unallocated diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index b9889cb2a..e26af88b8 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -6,8 +6,8 @@ use file_store::{ }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ - CoverageObjectValidity, GatewayReward, HeartbeatValidity, LocationSource, RadioRewardV2, - SeniorityUpdateReason, SignalLevel, + CoverageObjectValidity, GatewayReward, HeartbeatValidity, LocationSource, MobileRewardShare, + RadioRewardV2, SeniorityUpdateReason, SignalLevel, }; use mobile_verifier::{ cell_type::CellType, @@ -124,7 +124,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { } async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, + mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result<(Vec, Vec)> { // get the filestore 
outputs from rewards run diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index 165a9e52f..9aa63f3dc 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -4,7 +4,9 @@ use std::string::ToString; use async_trait::async_trait; use chrono::{DateTime, Duration as ChronoDuration, Utc}; use helium_proto::{ - services::poc_mobile::{ServiceProviderReward, UnallocatedReward, UnallocatedRewardType}, + services::poc_mobile::{ + MobileRewardShare, ServiceProviderReward, UnallocatedReward, UnallocatedRewardType, + }, ServiceProvider, }; use rust_decimal::prelude::*; @@ -143,7 +145,7 @@ async fn test_service_provider_rewards_invalid_sp(pool: PgPool) -> anyhow::Resul } async fn receive_expected_rewards( - mobile_rewards: &mut MockFileSinkReceiver, + mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result<(ServiceProviderReward, UnallocatedReward)> { // get the filestore outputs from rewards run // we will have 3 radio rewards, 1 wifi radio and 2 cbrs radios diff --git a/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs b/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs index 14674d911..f002b192d 100644 --- a/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs +++ b/mobile_verifier/tests/integrations/subscriber_verified_mapping_event.rs @@ -6,12 +6,10 @@ use file_store::{ FileInfo, }; use helium_crypto::{KeyTag, Keypair, PublicKeyBinary}; -use helium_proto::services::poc_mobile::VerifiedSubscriberVerifiedMappingEventIngestReportV1; use mobile_verifier::subscriber_verified_mapping_event::{ aggregate_verified_mapping_events, SubscriberVerifiedMappingEventDaemon, VerifiedSubscriberVerifiedMappingEventShare, VerifiedSubscriberVerifiedMappingEventShares, }; -use prost::Message; use rand::rngs::OsRng; use sqlx::{PgPool, Pool, Postgres, Row}; use 
std::{collections::HashMap, ops::Range}; @@ -86,10 +84,7 @@ async fn main_test(pool: PgPool) -> anyhow::Result<()> { Ok(Some(msg)) => match msg { file_store::file_sink::Message::Commit(_) => panic!("got Commit"), file_store::file_sink::Message::Rollback(_) => panic!("got Rollback"), - file_store::file_sink::Message::Data(_, data) => { - let proto_verified_report = VerifiedSubscriberVerifiedMappingEventIngestReportV1::decode(data.as_slice()) - .expect("unable to decode into VerifiedSubscriberVerifiedMappingEventIngestReportV1"); - + file_store::file_sink::Message::Data(_, proto_verified_report) => { let rcv_report: SubscriberVerifiedMappingEventIngestReport = proto_verified_report.report.unwrap().try_into()?; diff --git a/poc_entropy/src/entropy_generator.rs b/poc_entropy/src/entropy_generator.rs index dd78a5048..51898def7 100644 --- a/poc_entropy/src/entropy_generator.rs +++ b/poc_entropy/src/entropy_generator.rs @@ -117,7 +117,7 @@ impl EntropyGenerator { pub async fn run( &mut self, - file_sink: file_sink::FileSinkClient, + file_sink: file_sink::FileSinkClient, shutdown: &triggered::Listener, ) -> anyhow::Result<()> { tracing::info!("started entropy generator"); @@ -147,7 +147,7 @@ impl EntropyGenerator { async fn handle_entropy_tick( &mut self, - file_sink: &file_sink::FileSinkClient, + file_sink: &file_sink::FileSinkClient, ) -> anyhow::Result<()> { let source_data = match Self::get_entropy(&self.client).await { Ok(data) => data, diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index ddfea4944..e13253438 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -1,7 +1,8 @@ use anyhow::{Error, Result}; use clap::Parser; -use file_store::{file_sink, file_upload, FileType}; +use file_store::{file_upload, traits::FileSinkWriteExt}; use futures_util::TryFutureExt; +use helium_proto::EntropyReportV1; use poc_entropy::{entropy_generator::EntropyGenerator, server::ApiServer, Settings}; use std::{net::SocketAddr, path, time::Duration}; use 
tokio::{self, signal}; @@ -70,14 +71,12 @@ impl Server { let (file_upload, file_upload_server) = file_upload::FileUpload::from_settings_tm(&settings.output).await?; - let (entropy_sink, entropy_sink_server) = file_sink::FileSinkBuilder::new( - FileType::EntropyReport, + let (entropy_sink, entropy_sink_server) = EntropyReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_report_submission"), + Some(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)) - .create() .await?; // server diff --git a/price/src/main.rs b/price/src/main.rs index bc0747488..4695a9005 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -1,6 +1,7 @@ use anyhow::Result; use clap::Parser; -use file_store::{file_sink, file_upload, FileType}; +use file_store::{file_upload, traits::FileSinkWriteExt}; +use helium_proto::PriceReportV1; use price::{cli::check, PriceGenerator, Settings}; use std::{ path::{self, PathBuf}, @@ -81,14 +82,12 @@ impl Server { let store_base_path = path::Path::new(&settings.cache); - let (price_sink, price_sink_server) = file_sink::FileSinkBuilder::new( - FileType::PriceReport, + let (price_sink, price_sink_server) = PriceReportV1::file_sink( store_base_path, file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_report_submission"), + Some(Duration::from_secs(PRICE_SINK_ROLL_SECS)), + env!("CARGO_PKG_NAME"), ) - .roll_time(Duration::from_secs(PRICE_SINK_ROLL_SECS)) - .create() .await?; let mut task_manager = TaskManager::new(); diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs index abd887fd4..db129a341 100644 --- a/price/src/price_generator.rs +++ b/price/src/price_generator.rs @@ -37,7 +37,7 @@ pub struct PriceGenerator { default_price: Option, stale_price_duration: Duration, latest_price_file: PathBuf, - file_sink: file_sink::FileSinkClient, + file_sink: file_sink::FileSinkClient, } impl AsRef for PriceGenerator { @@ -99,7 
+99,7 @@ impl PriceGenerator { settings: &Settings, token: Token, default_price: Option, - file_sink: file_sink::FileSinkClient, + file_sink: file_sink::FileSinkClient, ) -> Result { let client = RpcClient::new(settings.source.clone()); Ok(Self {