Skip to content

Commit

Permalink
Update file_info_poller to track a process name and make sqlx an optional dependency (#693)
Browse files Browse the repository at this point in the history

* Add process_name to file_info_poller

* Add traits for file info poller state and make sqlx optional dependency of file_store

* Remove unused error conversion

* add migrations for new process_name field in files_processed

* Change file-store to have feature sqlx-postgres on by default

* Fix migration version
  • Loading branch information
bbalser authored Jan 2, 2024
1 parent 044b78c commit e77f349
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 115 deletions.
4 changes: 3 additions & 1 deletion file_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ rust_decimal = {workspace = true}
rust_decimal_macros = {workspace = true}
base64 = {workspace = true}
beacon = {workspace = true}
sqlx = {workspace = true}
sqlx = {workspace = true, optional = true}
async-trait = {workspace = true}
derive_builder = "0"
retainer = {workspace = true}
Expand All @@ -56,4 +56,6 @@ hex-literal = "0"
tempfile = "3"

[features]
default = ["sqlx-postgres"]
local = ["aws-types"]
sqlx-postgres = ["sqlx"]
5 changes: 3 additions & 2 deletions file_store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ pub enum Error {
Channel,
#[error("no manifest")]
NoManifest,
#[error("db error")]
DbError(#[from] sqlx::Error),
#[error("tokio join error")]
JoinError(#[from] tokio::task::JoinError),
#[error("send timeout")]
Expand All @@ -34,6 +32,9 @@ pub enum Error {
Shutdown,
#[error("error building file info poller")]
FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError),
#[cfg(feature = "sqlx-postgres")]
#[error("db error")]
DbError(#[from] sqlx::Error),
}

#[derive(Error, Debug)]
Expand Down
213 changes: 126 additions & 87 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{traits::MsgDecode, Error, FileInfo, FileStore, Result};
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono::{DateTime, Duration, Utc};
use derive_builder::Builder;
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt, TryFutureExt};
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
use futures_util::TryFutureExt;
use retainer::Cache;
use std::marker::PhantomData;
use task_manager::ManagedTask;
Expand All @@ -15,24 +16,47 @@ const CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(3 * 60 * 6

type MemoryFileCache = Cache<String, bool>;

/// Persistent record of which files a named poller process has already handled.
///
/// `FileInfoPollerServer` consults this state before entering its poll loop
/// (`latest_timestamp`), for each listed file that is not in the in-memory
/// cache (`exists`), and on every cleanup tick (`clean`).
#[async_trait::async_trait]
pub trait FileInfoPollerState: Send + Sync + 'static {
/// Returns the newest file timestamp recorded for `process_name` and
/// `file_type`, or `None` when there is no such record.
///
/// The poller passes its configured `prefix` as `file_type`.
async fn latest_timestamp(
&self,
process_name: &str,
file_type: &str,
) -> Result<Option<DateTime<Utc>>>;

/// Reports whether `file_info` has already been recorded for `process_name`.
async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool>;

/// Discards records for `process_name` and `file_type` that are no longer needed.
///
/// How many records are retained is up to the implementation.
async fn clean(&self, process_name: &str, file_type: &str) -> Result;
}

/// Sink that marks a file as processed on behalf of a named poller process.
///
/// `FileInfoStream::into_stream` calls `record` before handing the file's
/// stream to the caller.
#[async_trait::async_trait]
pub trait FileInfoPollerStateRecorder {
/// Records `file_info` as processed by `process_name`.
///
/// Takes `self` by value, so the recorder is consumed by the call.
async fn record(self, process_name: &str, file_info: &FileInfo) -> Result;
}

/// A file's metadata bundled with the stream of items read from that file.
///
/// Values of this type are what the poller sends over its channel to consumers.
pub struct FileInfoStream<T> {
/// Metadata of the file the stream was built from.
pub file_info: FileInfo,
// Name of the poller process that produced this value; passed to the
// recorder when the stream is taken.
process_name: String,
// Items of the file; handed out by `into_stream`.
stream: BoxStream<'static, T>,
}

impl<T> FileInfoStream<T>
where
T: Send,
{
pub fn new(file_info: FileInfo, stream: BoxStream<'static, T>) -> Self {
Self { file_info, stream }
pub fn new(process_name: String, file_info: FileInfo, stream: BoxStream<'static, T>) -> Self {
Self {
file_info,
process_name,
stream,
}
}

pub async fn into_stream(
self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
recorder: impl FileInfoPollerStateRecorder,
) -> Result<BoxStream<'static, T>> {
db::insert(transaction, self.file_info).await?;
recorder.record(&self.process_name, &self.file_info).await?;
Ok(self.stream)
}
}
Expand All @@ -45,41 +69,45 @@ pub enum LookbackBehavior {

#[derive(Debug, Clone, Builder)]
#[builder(pattern = "owned")]
pub struct FileInfoPollerConfig<T> {
pub struct FileInfoPollerConfig<T, S> {
#[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")]
poll_duration: Duration,
db: sqlx::Pool<sqlx::Postgres>,
state: S,
store: FileStore,
prefix: String,
lookback: LookbackBehavior,
#[builder(default = "Duration::minutes(10)")]
offset: Duration,
#[builder(default = "20")]
queue_size: usize,
#[builder(default = r#""default".to_string()"#)]
process_name: String,
#[builder(setter(skip))]
p: PhantomData<T>,
}

#[derive(Debug, Clone)]
pub struct FileInfoPollerServer<T> {
config: FileInfoPollerConfig<T>,
pub struct FileInfoPollerServer<T, S> {
config: FileInfoPollerConfig<T, S>,
sender: Sender<FileInfoStream<T>>,
}

impl<T> FileInfoPollerConfigBuilder<T>
type FileInfoStreamReceiver<T> = Receiver<FileInfoStream<T>>;
impl<T, S> FileInfoPollerConfigBuilder<T, S>
where
T: Clone,
{
pub fn create(self) -> Result<(Receiver<FileInfoStream<T>>, FileInfoPollerServer<T>)> {
pub fn create(self) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S>)> {
let config = self.build()?;
let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
Ok((receiver, FileInfoPollerServer { config, sender }))
}
}

impl<T> ManagedTask for FileInfoPollerServer<T>
impl<T, S> ManagedTask for FileInfoPollerServer<T, S>
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
S: FileInfoPollerState,
{
fn start_task(
self: Box<Self>,
Expand All @@ -95,9 +123,10 @@ where
}
}

impl<T> FileInfoPollerServer<T>
impl<T, S> FileInfoPollerServer<T, S>
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
S: FileInfoPollerState,
{
pub async fn start(
self,
Expand All @@ -117,9 +146,18 @@ where
let cache = create_cache();
let mut poll_trigger = tokio::time::interval(self.poll_duration());
let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);

let mut latest_ts = db::latest_ts(&self.config.db, &self.config.prefix).await?;
tracing::info!(r#type = self.config.prefix, "starting FileInfoPoller",);
let process_name = self.config.process_name.clone();

let mut latest_ts = self
.config
.state
.latest_timestamp(&self.config.process_name, &self.config.prefix)
.await?;
tracing::info!(
r#type = self.config.prefix,
%process_name,
"starting FileInfoPoller",
);

loop {
let after = self.after(latest_ts);
Expand All @@ -128,19 +166,19 @@ where
tokio::select! {
biased;
_ = shutdown.clone() => {
tracing::info!(r#type = self.config.prefix, "stopping FileInfoPoller");
tracing::info!(r#type = self.config.prefix, %process_name, "stopping FileInfoPoller");
break;
}
_ = cleanup_trigger.tick() => self.clean(&cache).await?,
_ = poll_trigger.tick() => {
let files = self.config.store.list_all(&self.config.prefix, after, before).await?;
for file in files {
if !is_already_processed(&self.config.db, &cache, &file).await? {
if send_stream(&self.sender, &self.config.store, file.clone()).await? {
if !is_already_processed(&self.config.state, &cache, &process_name, &file).await? {
if send_stream(&self.sender, &self.config.store, process_name.clone(), file.clone()).await? {
latest_ts = Some(file.timestamp);
cache_file(&cache, &file).await;
} else {
tracing::info!("FileInfoPoller: channel full");
tracing::info!(r#type = self.config.prefix, %process_name, "FileInfoPoller: channel full");
break;
}
}
Expand All @@ -164,7 +202,10 @@ where

async fn clean(&self, cache: &MemoryFileCache) -> Result {
cache.purge(4, 0.25).await;
db::clean(&self.config.db, &self.config.prefix).await?;
self.config
.state
.clean(&self.config.process_name, &self.config.prefix)
.await?;
Ok(())
}

Expand All @@ -179,6 +220,7 @@ where
async fn send_stream<T>(
sender: &Sender<FileInfoStream<T>>,
store: &FileStore,
process_name: String,
file: FileInfo,
) -> Result<bool>
where
Expand Down Expand Up @@ -210,11 +252,7 @@ where
})
.boxed();

let incoming_data_stream = FileInfoStream {
file_info: file,
stream,
};

let incoming_data_stream = FileInfoStream::new(process_name, file, stream);
match sender.try_send(incoming_data_stream) {
Ok(_) => Ok(true),
Err(TrySendError::Full(_)) => Ok(false),
Expand All @@ -227,91 +265,92 @@ fn create_cache() -> MemoryFileCache {
}

async fn is_already_processed(
db: impl sqlx::PgExecutor<'_>,
state: &impl FileInfoPollerState,
cache: &MemoryFileCache,
process_name: &str,
file_info: &FileInfo,
) -> Result<bool> {
if cache.get(&file_info.key).await.is_some() {
Ok(true)
} else {
db::exists(db, file_info).await
state.exists(process_name, file_info).await
}
}

/// Remembers `file_info` in the in-memory cache, keyed by the file's key.
///
/// The entry is stored with the value `true` and expires after `CACHE_TTL`.
async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
    let key = file_info.key.clone();
    cache.insert(key, true, CACHE_TTL).await;
}

mod db {
use super::*;
// Postgres-backed recorder; only compiled when the `sqlx-postgres` feature is enabled.
#[cfg(feature = "sqlx-postgres")]
#[async_trait::async_trait]
impl FileInfoPollerStateRecorder for &mut sqlx::Transaction<'_, sqlx::Postgres> {
/// Inserts one row into `files_processed` for `file_info`, tagged with
/// `process_name`, using this transaction as the executor.
///
/// The statement runs inside the borrowed transaction; committing or rolling
/// it back is left to the caller.
///
/// # Errors
///
/// Any `sqlx::Error` from executing the insert is converted into `Error`.
async fn record(self, process_name: &str, file_info: &FileInfo) -> Result {
sqlx::query(
r#"
INSERT INTO files_processed(process_name, file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4, $5)
"#)
.bind(process_name)
// The file's key is stored as `file_name` and its prefix as `file_type`.
.bind(&file_info.key)
.bind(&file_info.prefix)
.bind(file_info.timestamp)
// `processed_at` is the current UTC time at the moment of recording.
.bind(Utc::now())
.execute(self)
.await
// The query result is not needed; only success or failure is reported.
.map(|_| ())
.map_err(Error::from)
}
}

pub async fn latest_ts(
db: impl sqlx::PgExecutor<'_>,
#[cfg(feature = "sqlx-postgres")]
#[async_trait::async_trait]
impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
async fn latest_timestamp(
&self,
process_name: &str,
file_type: &str,
) -> Result<Option<DateTime<Utc>>> {
let default = Utc.timestamp_opt(0, 0).single().unwrap();

let result = sqlx::query_scalar::<_, DateTime<Utc>>(
sqlx::query_scalar::<_, Option<DateTime<Utc>>>(
r#"
SELECT COALESCE(MAX(file_timestamp), $1) FROM files_processed where file_type = $2
"#,
)
.bind(default)
.bind(file_type)
.fetch_one(db)
.await?;

if result == default {
Ok(None)
} else {
Ok(Some(result))
}
SELECT MAX(file_timestamp) FROM files_processed where process_name = $1 and file_type = $2
"#,
)
.bind(process_name)
.bind(file_type)
.fetch_one(self)
.await
.map_err(Error::from)
}

pub async fn exists(db: impl sqlx::PgExecutor<'_>, file_info: &FileInfo) -> Result<bool> {
Ok(sqlx::query_scalar::<_, bool>(
async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool> {
sqlx::query_scalar::<_, bool>(
r#"
SELECT EXISTS(SELECT 1 from files_processed where file_name = $1)
"#,
)
.bind(file_info.key.clone())
.fetch_one(db)
.await?)
SELECT EXISTS(SELECT 1 from files_processed where process_name = $1 and file_name = $2)
"#,
)
.bind(process_name)
.bind(&file_info.key)
.fetch_one(self)
.await
.map_err(Error::from)
}

pub async fn insert(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
file_info: FileInfo,
) -> Result {
sqlx::query(r#"
INSERT INTO files_processed(file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4)
"#)
.bind(file_info.key)
.bind(&file_info.prefix)
.bind(file_info.timestamp)
.bind(Utc::now())
.execute(tx)
.await?;

Ok(())
}

pub async fn clean(db: impl sqlx::PgExecutor<'_>, file_type: &str) -> Result {
async fn clean(&self, process_name: &str, file_type: &str) -> Result {
sqlx::query(
r#"
DELETE FROM files_processed where file_name in (
SELECT file_name
FROM files_processed
WHERE file_type = $1
ORDER BY file_timestamp DESC
OFFSET 100
)
"#,
DELETE FROM files_processed where file_name in (
SELECT file_name
FROM files_processed
WHERE process_name = $1 and file_type = $2
ORDER BY file_timestamp DESC
OFFSET 100
)
"#,
)
.bind(process_name)
.bind(file_type)
.execute(db)
.await?;

Ok(())
.execute(self)
.await
.map(|_| ())
.map_err(Error::from)
}
}
Loading

0 comments on commit e77f349

Please sign in to comment.