diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml
index 92f92a5cc..601143fa9 100644
--- a/file_store/Cargo.toml
+++ b/file_store/Cargo.toml
@@ -43,7 +43,7 @@ rust_decimal = {workspace = true}
 rust_decimal_macros = {workspace = true}
 base64 = {workspace = true}
 beacon = {workspace = true}
-sqlx = {workspace = true}
+sqlx = {workspace = true, optional = true}
 async-trait = {workspace = true}
 derive_builder = "0"
 retainer = {workspace = true}
@@ -56,4 +56,6 @@ hex-literal = "0"
 tempfile = "3"
 
 [features]
+default = ["sqlx-postgres"]
 local = ["aws-types"]
+sqlx-postgres = ["sqlx"]
diff --git a/file_store/src/error.rs b/file_store/src/error.rs
index eba8986d9..1205a1a16 100644
--- a/file_store/src/error.rs
+++ b/file_store/src/error.rs
@@ -24,8 +24,6 @@ pub enum Error {
     Channel,
     #[error("no manifest")]
     NoManifest,
-    #[error("db error")]
-    DbError(#[from] sqlx::Error),
     #[error("tokio join error")]
     JoinError(#[from] tokio::task::JoinError),
     #[error("send timeout")]
@@ -34,6 +32,9 @@
     Shutdown,
     #[error("error building file info poller")]
     FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError),
+    #[cfg(feature = "sqlx-postgres")]
+    #[error("db error")]
+    DbError(#[from] sqlx::Error),
 }
 
 #[derive(Error, Debug)]
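Note on the two hunks above: `sqlx` becomes an optional dependency, and the `DbError` variant only exists when the (default-on) `sqlx-postgres` feature is enabled, so a `default-features = false` build drops the Postgres surface entirely. A minimal sketch of what the `#[from]` conversion buys, as it might appear inside `file_store` with the feature on; the helper name and query are illustrative, not part of this diff:

```rust
// Hypothetical helper: only the error conversion is the point. With
// `sqlx-postgres` enabled, `#[from] sqlx::Error` lets `?` lift database
// failures straight into file_store's Error::DbError.
#[cfg(feature = "sqlx-postgres")]
async fn count_processed(pool: &sqlx::Pool<sqlx::Postgres>) -> crate::Result<i64> {
    let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM files_processed")
        .fetch_one(pool)
        .await?; // sqlx::Error -> Error::DbError via #[from]
    Ok(count)
}
```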
diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs
index dfdd17429..c129047db 100644
--- a/file_store/src/file_info_poller.rs
+++ b/file_store/src/file_info_poller.rs
@@ -1,7 +1,8 @@
 use crate::{traits::MsgDecode, Error, FileInfo, FileStore, Result};
-use chrono::{DateTime, Duration, TimeZone, Utc};
+use chrono::{DateTime, Duration, Utc};
 use derive_builder::Builder;
-use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt, TryFutureExt};
+use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
+use futures_util::TryFutureExt;
 use retainer::Cache;
 use std::marker::PhantomData;
 use task_manager::ManagedTask;
@@ -15,8 +16,27 @@ const CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(3 * 60 * 60);
 
 type MemoryFileCache = Cache<String, bool>;
 
+#[async_trait::async_trait]
+pub trait FileInfoPollerState: Send + Sync + 'static {
+    async fn latest_timestamp(
+        &self,
+        process_name: &str,
+        file_type: &str,
+    ) -> Result<Option<DateTime<Utc>>>;
+
+    async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool>;
+
+    async fn clean(&self, process_name: &str, file_type: &str) -> Result;
+}
+
+#[async_trait::async_trait]
+pub trait FileInfoPollerStateRecorder {
+    async fn record(self, process_name: &str, file_info: &FileInfo) -> Result;
+}
+
 pub struct FileInfoStream<T> {
     pub file_info: FileInfo,
+    process_name: String,
     stream: BoxStream<'static, T>,
 }
 
@@ -24,15 +44,19 @@ impl<T> FileInfoStream<T>
 where
     T: Send,
 {
-    pub fn new(file_info: FileInfo, stream: BoxStream<'static, T>) -> Self {
-        Self { file_info, stream }
+    pub fn new(process_name: String, file_info: FileInfo, stream: BoxStream<'static, T>) -> Self {
+        Self {
+            file_info,
+            process_name,
+            stream,
+        }
     }
 
     pub async fn into_stream(
         self,
-        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
+        recorder: impl FileInfoPollerStateRecorder,
    ) -> Result<BoxStream<'static, T>> {
-        db::insert(transaction, self.file_info).await?;
+        recorder.record(&self.process_name, &self.file_info).await?;
         Ok(self.stream)
     }
 }
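The `into_stream` change above is source-compatible for existing sqlx consumers: `&mut sqlx::Transaction<'_, Postgres>` gets a `FileInfoPollerStateRecorder` impl later in this diff, so the processed-file row is still written inside the caller's transaction. A sketch of the consumer side, with hypothetical names (`file_info_stream` off the poller's channel, `pool` a `sqlx::Pool<sqlx::Postgres>`):

```rust
use futures::StreamExt;

// Begin a transaction; the recorder writes the files_processed row (keyed by
// process_name) atomically with whatever the loop below persists.
let mut txn = pool.begin().await?;
let mut stream = file_info_stream.into_stream(&mut txn).await?;
while let Some(item) = stream.next().await {
    // handle the decoded item ...
}
txn.commit().await?;
```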
@@ -45,10 +69,10 @@ pub enum LookbackBehavior {
 
 #[derive(Debug, Clone, Builder)]
 #[builder(pattern = "owned")]
-pub struct FileInfoPollerConfig<T> {
+pub struct FileInfoPollerConfig<T, S> {
     #[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")]
     poll_duration: Duration,
-    db: sqlx::Pool<sqlx::Postgres>,
+    state: S,
     store: FileStore,
     prefix: String,
     lookback: LookbackBehavior,
@@ -56,30 +80,34 @@ pub struct FileInfoPollerConfig<T> {
     offset: Duration,
     #[builder(default = "20")]
     queue_size: usize,
+    #[builder(default = r#""default".to_string()"#)]
+    process_name: String,
     #[builder(setter(skip))]
     p: PhantomData<T>,
 }
 
 #[derive(Debug, Clone)]
-pub struct FileInfoPollerServer<T> {
-    config: FileInfoPollerConfig<T>,
+pub struct FileInfoPollerServer<T, S> {
+    config: FileInfoPollerConfig<T, S>,
     sender: Sender<FileInfoStream<T>>,
 }
 
-impl<T> FileInfoPollerConfigBuilder<T>
+type FileInfoStreamReceiver<T> = Receiver<FileInfoStream<T>>;
+impl<T, S> FileInfoPollerConfigBuilder<T, S>
 where
     T: Clone,
 {
-    pub fn create(self) -> Result<(Receiver<FileInfoStream<T>>, FileInfoPollerServer<T>)> {
+    pub fn create(self) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S>)> {
         let config = self.build()?;
         let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
         Ok((receiver, FileInfoPollerServer { config, sender }))
     }
 }
 
-impl<T> ManagedTask for FileInfoPollerServer<T>
+impl<T, S> ManagedTask for FileInfoPollerServer<T, S>
 where
     T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
+    S: FileInfoPollerState,
 {
     fn start_task(
         self: Box<Self>,
@@ -95,9 +123,10 @@ where
     }
 }
 
-impl<T> FileInfoPollerServer<T>
+impl<T, S> FileInfoPollerServer<T, S>
 where
     T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
+    S: FileInfoPollerState,
 {
     pub async fn start(
         self,
@@ -117,9 +146,18 @@ where
         let cache = create_cache();
         let mut poll_trigger = tokio::time::interval(self.poll_duration());
         let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);
-
-        let mut latest_ts = db::latest_ts(&self.config.db, &self.config.prefix).await?;
-        tracing::info!(r#type = self.config.prefix, "starting FileInfoPoller",);
+        let process_name = self.config.process_name.clone();
+
+        let mut latest_ts = self
+            .config
+            .state
+            .latest_timestamp(&self.config.process_name, &self.config.prefix)
+            .await?;
+        tracing::info!(
+            r#type = self.config.prefix,
+            %process_name,
+            "starting FileInfoPoller",
+        );
 
         loop {
             let after = self.after(latest_ts);
@@ -128,19 +166,19 @@ where
             tokio::select! {
                 biased;
                 _ = shutdown.clone() => {
-                    tracing::info!(r#type = self.config.prefix, "stopping FileInfoPoller");
+                    tracing::info!(r#type = self.config.prefix, %process_name, "stopping FileInfoPoller");
                     break;
                 }
                 _ = cleanup_trigger.tick() => self.clean(&cache).await?,
                 _ = poll_trigger.tick() => {
                     let files = self.config.store.list_all(&self.config.prefix, after, before).await?;
                     for file in files {
-                        if !is_already_processed(&self.config.db, &cache, &file).await? {
-                            if send_stream(&self.sender, &self.config.store, file.clone()).await? {
+                        if !is_already_processed(&self.config.state, &cache, &process_name, &file).await? {
+                            if send_stream(&self.sender, &self.config.store, process_name.clone(), file.clone()).await? {
                                 latest_ts = Some(file.timestamp);
                                 cache_file(&cache, &file).await;
                             } else {
-                                tracing::info!("FileInfoPoller: channel full");
+                                tracing::info!(r#type = self.config.prefix, %process_name, "FileInfoPoller: channel full");
                                 break;
                             }
                         }
@@ -164,7 +202,10 @@ where
 
     async fn clean(&self, cache: &MemoryFileCache) -> Result {
         cache.purge(4, 0.25).await;
-        db::clean(&self.config.db, &self.config.prefix).await?;
+        self.config
+            .state
+            .clean(&self.config.process_name, &self.config.prefix)
+            .await?;
         Ok(())
     }
 
@@ -179,6 +220,7 @@ where
 async fn send_stream<T>(
     sender: &Sender<FileInfoStream<T>>,
     store: &FileStore,
+    process_name: String,
     file: FileInfo,
 ) -> Result<bool>
 where
@@ -210,11 +252,7 @@ where
         })
         .boxed();
 
-    let incoming_data_stream = FileInfoStream {
-        file_info: file,
-        stream,
-    };
-
+    let incoming_data_stream = FileInfoStream::new(process_name, file, stream);
     match sender.try_send(incoming_data_stream) {
         Ok(_) => Ok(true),
         Err(TrySendError::Full(_)) => Ok(false),
@@ -227,14 +265,15 @@ fn create_cache() -> MemoryFileCache {
 }
 
 async fn is_already_processed(
-    db: impl sqlx::PgExecutor<'_>,
+    state: &impl FileInfoPollerState,
     cache: &MemoryFileCache,
+    process_name: &str,
     file_info: &FileInfo,
 ) -> Result<bool> {
     if cache.get(&file_info.key).await.is_some() {
         Ok(true)
     } else {
-        db::exists(db, file_info).await
+        state.exists(process_name, file_info).await
     }
 }
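One behavioral cleanup hides in the final hunk below: `latest_timestamp` no longer binds an epoch sentinel and compares against it. `MAX(file_timestamp)` over zero rows is SQL `NULL`, and sqlx decodes a nullable scalar as `None` when the requested type is an `Option`, so the old COALESCE dance reduces to a direct fetch. A standalone illustration of the pattern (assumes sqlx's `postgres` and `chrono` features):

```rust
use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};

// An empty table yields one row containing NULL; decoding it as
// Option<DateTime<Utc>> maps NULL to None with no sentinel default.
async fn latest(pool: &Pool<Postgres>) -> Result<Option<DateTime<Utc>>, sqlx::Error> {
    sqlx::query_scalar("SELECT MAX(file_timestamp) FROM files_processed")
        .fetch_one(pool)
        .await
}
```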
@@ -242,76 +281,76 @@ async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
     cache.insert(file_info.key.clone(), true, CACHE_TTL).await;
 }
 
-mod db {
-    use super::*;
+#[cfg(feature = "sqlx-postgres")]
+#[async_trait::async_trait]
+impl FileInfoPollerStateRecorder for &mut sqlx::Transaction<'_, sqlx::Postgres> {
+    async fn record(self, process_name: &str, file_info: &FileInfo) -> Result {
+        sqlx::query(
+            r#"
+                INSERT INTO files_processed(process_name, file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4, $5)
+            "#)
+            .bind(process_name)
+            .bind(&file_info.key)
+            .bind(&file_info.prefix)
+            .bind(file_info.timestamp)
+            .bind(Utc::now())
+            .execute(self)
+            .await
+            .map(|_| ())
+            .map_err(Error::from)
+    }
+}
 
-    pub async fn latest_ts(
-        db: impl sqlx::PgExecutor<'_>,
+#[cfg(feature = "sqlx-postgres")]
+#[async_trait::async_trait]
+impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
+    async fn latest_timestamp(
+        &self,
+        process_name: &str,
         file_type: &str,
     ) -> Result<Option<DateTime<Utc>>> {
-        let default = Utc.timestamp_opt(0, 0).single().unwrap();
-
-        let result = sqlx::query_scalar::<_, DateTime<Utc>>(
+        sqlx::query_scalar::<_, Option<DateTime<Utc>>>(
             r#"
-            SELECT COALESCE(MAX(file_timestamp), $1) FROM files_processed where file_type = $2
-            "#,
-        )
-        .bind(default)
-        .bind(file_type)
-        .fetch_one(db)
-        .await?;
-
-        if result == default {
-            Ok(None)
-        } else {
-            Ok(Some(result))
-        }
+                SELECT MAX(file_timestamp) FROM files_processed where process_name = $1 and file_type = $2
+            "#,
+        )
+        .bind(process_name)
+        .bind(file_type)
+        .fetch_one(self)
+        .await
+        .map_err(Error::from)
     }
 
-    pub async fn exists(db: impl sqlx::PgExecutor<'_>, file_info: &FileInfo) -> Result<bool> {
-        Ok(sqlx::query_scalar::<_, bool>(
+    async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool> {
+        sqlx::query_scalar::<_, bool>(
             r#"
-            SELECT EXISTS(SELECT 1 from files_processed where file_name = $1)
-            "#,
-        )
-        .bind(file_info.key.clone())
-        .fetch_one(db)
-        .await?)
+                SELECT EXISTS(SELECT 1 from files_processed where process_name = $1 and file_name = $2)
+            "#,
+        )
+        .bind(process_name)
+        .bind(&file_info.key)
+        .fetch_one(self)
+        .await
+        .map_err(Error::from)
     }
 
-    pub async fn insert(
-        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
-        file_info: FileInfo,
-    ) -> Result {
-        sqlx::query(r#"
-            INSERT INTO files_processed(file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4)
-            "#)
-        .bind(file_info.key)
-        .bind(&file_info.prefix)
-        .bind(file_info.timestamp)
-        .bind(Utc::now())
-        .execute(tx)
-        .await?;
-
-        Ok(())
-    }
-
-    pub async fn clean(db: impl sqlx::PgExecutor<'_>, file_type: &str) -> Result {
+    async fn clean(&self, process_name: &str, file_type: &str) -> Result {
         sqlx::query(
             r#"
-            DELETE FROM files_processed where file_name in (
-                SELECT file_name
-                FROM files_processed
-                WHERE file_type = $1
-                ORDER BY file_timestamp DESC
-                OFFSET 100
-            )
-            "#,
+                DELETE FROM files_processed where file_name in (
+                    SELECT file_name
+                    FROM files_processed
+                    WHERE process_name = $1 and file_type = $2
+                    ORDER BY file_timestamp DESC
+                    OFFSET 100
+                )
+            "#,
        )
+        .bind(process_name)
         .bind(file_type)
-        .execute(db)
-        .await?;
-
-        Ok(())
+        .execute(self)
+        .await
+        .map(|_| ())
+        .map_err(Error::from)
     }
 }
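With the hunks above, the poller's persistence sits entirely behind `FileInfoPollerState` and `FileInfoPollerStateRecorder`; the sqlx impls are just one feature-gated backend. A hypothetical in-memory implementation (not part of this PR) shows the full surface a non-Postgres consumer would provide, e.g. for tests or `default-features = false` builds:

```rust
use std::{collections::HashSet, sync::Mutex};

use chrono::{DateTime, Utc};
use file_store::{file_info_poller::FileInfoPollerState, FileInfo, Result};

// Volatile state: it forgets everything on restart, so pair it with a
// bounded lookback (e.g. LookbackBehavior::Max) to avoid replaying history.
#[derive(Default)]
pub struct MemoryState {
    seen: Mutex<HashSet<String>>,
    latest: Mutex<Option<DateTime<Utc>>>,
}

#[async_trait::async_trait]
impl FileInfoPollerState for MemoryState {
    async fn latest_timestamp(
        &self,
        _process_name: &str,
        _file_type: &str,
    ) -> Result<Option<DateTime<Utc>>> {
        Ok(*self.latest.lock().unwrap())
    }

    async fn exists(&self, _process_name: &str, file_info: &FileInfo) -> Result<bool> {
        Ok(self.seen.lock().unwrap().contains(&file_info.key))
    }

    async fn clean(&self, _process_name: &str, _file_type: &str) -> Result {
        Ok(()) // nothing to prune in memory
    }
}
```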
diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs
index 59d7c7279..1777ace58 100644
--- a/file_store/src/file_source.rs
+++ b/file_store/src/file_source.rs
@@ -8,11 +8,11 @@ use std::path::{Path, PathBuf};
 use tokio::{fs::File, io::BufReader};
 use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead};
 
-pub fn continuous_source<T>() -> FileInfoPollerConfigBuilder<T>
+pub fn continuous_source<T, S>() -> FileInfoPollerConfigBuilder<T, S>
 where
     T: Clone,
 {
-    FileInfoPollerConfigBuilder::<T>::default()
+    FileInfoPollerConfigBuilder::<T, S>::default()
 }
 
 pub fn source<I, P>(paths: I) -> BytesMutStream
diff --git a/iot_packet_verifier/migrations/6_files_processed_process_name.sql b/iot_packet_verifier/migrations/6_files_processed_process_name.sql
new file mode 100644
index 000000000..8d0c4a9d2
--- /dev/null
+++ b/iot_packet_verifier/migrations/6_files_processed_process_name.sql
@@ -0,0 +1 @@
+alter table files_processed add column process_name text not null default 'default';
diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs
index 75d4bc46a..60c11d7f8 100644
--- a/iot_packet_verifier/src/daemon.rs
+++ b/iot_packet_verifier/src/daemon.rs
@@ -164,8 +164,8 @@ impl Cmd {
         let file_store = FileStore::from_settings(&settings.ingest).await?;
 
         let (report_files, report_files_server) =
-            file_source::continuous_source::<PacketRouterPacketReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<PacketRouterPacketReport, _>()
+                .state(pool.clone())
                 .store(file_store)
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::IotPacketReport.to_string())
diff --git a/iot_verifier/Cargo.toml b/iot_verifier/Cargo.toml
index 07a50725b..6bbf783c8 100644
--- a/iot_verifier/Cargo.toml
+++ b/iot_verifier/Cargo.toml
@@ -36,7 +36,7 @@ h3o = {workspace = true, features = ["geo"]}
 xorf = {workspace = true}
 lazy_static = {workspace = true}
 once_cell = {workspace = true}
-file-store = { path = "../file_store" }
+file-store = { path = "../file_store"}
 metrics = {workspace = true}
 retainer = {workspace = true}
 blake3 = {workspace = true}
diff --git a/iot_verifier/migrations/13_files_processed_process_name.sql b/iot_verifier/migrations/13_files_processed_process_name.sql
new file mode 100644
index 000000000..8d0c4a9d2
--- /dev/null
+++ b/iot_verifier/migrations/13_files_processed_process_name.sql
@@ -0,0 +1 @@
+alter table files_processed add column process_name text not null default 'default';
diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs
index 2ddffdd04..ba25e8cb3 100644
--- a/iot_verifier/src/main.rs
+++ b/iot_verifier/src/main.rs
@@ -151,8 +151,8 @@ impl Server {
         let entropy_store = FileStore::from_settings(&settings.entropy).await?;
         let entropy_interval = settings.entropy_interval();
         let (entropy_loader_receiver, entropy_loader_server) =
-            file_source::continuous_source::<EntropyReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<EntropyReport, _>()
+                .state(pool.clone())
                 .store(entropy_store)
                 .prefix(FileType::EntropyReport.to_string())
                 .lookback(LookbackBehavior::Max(max_lookback_age))
@@ -183,8 +183,8 @@ impl Server {
         let packet_store = FileStore::from_settings(&settings.packet_ingest).await?;
         let packet_interval = settings.packet_interval();
         let (pk_loader_receiver, pk_loader_server) =
-            file_source::continuous_source::<IotValidPacket>()
-                .db(pool.clone())
+            file_source::continuous_source::<IotValidPacket, _>()
+                .state(pool.clone())
                 .store(packet_store.clone())
                 .prefix(FileType::IotValidPacket.to_string())
                 .lookback(LookbackBehavior::Max(max_lookback_age))
diff --git a/mobile_packet_verifier/migrations/6_files_processed_process_name.sql b/mobile_packet_verifier/migrations/6_files_processed_process_name.sql
new file mode 100644
index 000000000..8d0c4a9d2
--- /dev/null
+++ b/mobile_packet_verifier/migrations/6_files_processed_process_name.sql
@@ -0,0 +1 @@
+alter table files_processed add column process_name text not null default 'default';
diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs
index 9541f2872..cf09dfc0f 100644
--- a/mobile_packet_verifier/src/daemon.rs
+++ b/mobile_packet_verifier/src/daemon.rs
@@ -153,8 +153,8 @@ impl Cmd {
         let file_store = FileStore::from_settings(&settings.ingest).await?;
 
         let (reports, reports_server) =
-            file_source::continuous_source::<DataTransferSessionIngestReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<DataTransferSessionIngestReport, _>()
+                .state(pool.clone())
                 .store(file_store)
                 .lookback(LookbackBehavior::StartAfter(
                     Utc.timestamp_millis_opt(0).unwrap(),
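All of the service changes in this diff leave `process_name` at its builder default of `"default"`, which matches the value the migrations backfill, so existing deployments resume exactly where they left off. The field earns its keep once two pollers in one binary consume the same prefix; a hedged sketch reusing iot_verifier's names (`pool`, `packet_store`, `max_lookback_age`), with the process names invented for illustration:

```rust
// Two independently tracked pollers over the same file type: each records
// its progress in files_processed under its own process_name.
let (primary_rx, primary_server) = file_source::continuous_source::<IotValidPacket, _>()
    .state(pool.clone())
    .store(packet_store.clone())
    .prefix(FileType::IotValidPacket.to_string())
    .lookback(LookbackBehavior::Max(max_lookback_age))
    .process_name("primary".to_string())
    .create()?;

let (audit_rx, audit_server) = file_source::continuous_source::<IotValidPacket, _>()
    .state(pool.clone())
    .store(packet_store)
    .prefix(FileType::IotValidPacket.to_string())
    .lookback(LookbackBehavior::Max(max_lookback_age))
    .process_name("audit".to_string())
    .create()?;
```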
diff --git a/mobile_verifier/migrations/23_files_processed_process_name.sql b/mobile_verifier/migrations/23_files_processed_process_name.sql
new file mode 100644
index 000000000..8d0c4a9d2
--- /dev/null
+++ b/mobile_verifier/migrations/23_files_processed_process_name.sql
@@ -0,0 +1 @@
+alter table files_processed add column process_name text not null default 'default';
diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs
index 9596a8c27..2a3c5d7fd 100644
--- a/mobile_verifier/src/cli/server.rs
+++ b/mobile_verifier/src/cli/server.rs
@@ -60,8 +60,8 @@ impl Cmd {
 
         // CBRS Heartbeats
         let (cbrs_heartbeats, cbrs_heartbeats_server) =
-            file_source::continuous_source::<CbrsHeartbeatIngestReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<CbrsHeartbeatIngestReport, _>()
+                .state(pool.clone())
                 .store(report_ingest.clone())
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::CbrsHeartbeatIngestReport.to_string())
@@ -73,8 +73,8 @@
 
         // Wifi Heartbeats
         let (wifi_heartbeats, wifi_heartbeats_server) =
-            file_source::continuous_source::<WifiHeartbeatIngestReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<WifiHeartbeatIngestReport, _>()
+                .state(pool.clone())
                 .store(report_ingest.clone())
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::WifiHeartbeatIngestReport.to_string())
@@ -126,8 +126,8 @@
 
         // Speedtests
         let (speedtests, speedtests_server) =
-            file_source::continuous_source::<CellSpeedtestIngestReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<CellSpeedtestIngestReport, _>()
+                .state(pool.clone())
                 .store(report_ingest.clone())
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::CellSpeedtestIngestReport.to_string())
@@ -166,8 +166,8 @@
 
         // Coverage objects
         let (coverage_objs, coverage_objs_server) =
-            file_source::continuous_source::<CoverageObjectIngestReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<CoverageObjectIngestReport, _>()
+                .state(pool.clone())
                 .store(report_ingest.clone())
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::CoverageObjectIngestReport.to_string())
@@ -228,8 +228,8 @@
 
         // subscriber location
         let (subscriber_location_ingest, subscriber_location_ingest_server) =
-            file_source::continuous_source::<SubscriberLocationIngestReport>()
-                .db(pool.clone())
+            file_source::continuous_source::<SubscriberLocationIngestReport, _>()
+                .state(pool.clone())
                 .store(report_ingest.clone())
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::SubscriberLocationIngestReport.to_string())
@@ -259,8 +259,8 @@
 
         // data transfers
         let (data_session_ingest, data_session_ingest_server) =
-            file_source::continuous_source::<ValidDataTransferSession>()
-                .db(pool.clone())
+            file_source::continuous_source::<ValidDataTransferSession, _>()
+                .state(pool.clone())
                 .store(data_transfer_ingest.clone())
                 .lookback(LookbackBehavior::StartAfter(settings.start_after()))
                 .prefix(FileType::ValidDataTransferSession.to_string())
diff --git a/reward_index/migrations/7_files_processed_process_name.sql b/reward_index/migrations/7_files_processed_process_name.sql
new file mode 100644
index 000000000..8d0c4a9d2
--- /dev/null
+++ b/reward_index/migrations/7_files_processed_process_name.sql
@@ -0,0 +1 @@
+alter table files_processed add column process_name text not null default 'default';
diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs
index 0d646c9aa..54b3d83e9 100644
--- a/reward_index/src/main.rs
+++ b/reward_index/src/main.rs
@@ -77,8 +77,8 @@ impl Server {
 
         let file_store = FileStore::from_settings(&settings.verifier).await?;
 
-        let (receiver, server) = file_source::continuous_source::<RewardManifest>()
-            .db(pool.clone())
+        let (receiver, server) = file_source::continuous_source::<RewardManifest, _>()
+            .state(pool.clone())
             .store(file_store)
             .prefix(FileType::RewardManifest.to_string())
             .lookback(LookbackBehavior::StartAfter(