Skip to content

Commit

Permalink
wip(cat-gateway): add purge methods to cassandra session
Browse files Browse the repository at this point in the history
  • Loading branch information
saibatizoku committed Nov 12, 2024
1 parent c952dc8 commit fd3e1e9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
11 changes: 5 additions & 6 deletions catalyst-gateway/bin/src/db/index/queries/purge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,9 @@ impl PreparedQueries {
///
/// Returns an iterator that iterates over all the result pages that the query
/// returns.
pub(crate) async fn execute_iter<P>(
&self, session: Arc<Session>, select_query: PreparedSelectQuery, params: P,
) -> anyhow::Result<RowIterator>
where P: SerializeRow {
pub(crate) async fn execute_iter(
&self, session: Arc<Session>, select_query: PreparedSelectQuery,
) -> anyhow::Result<RowIterator> {
let prepared_stmt = match select_query {
PreparedSelectQuery::TxoAda => &self.get_txo_purge_queries,
PreparedSelectQuery::TxoAsset => &self.get_txo_asset_purge_queries,
Expand All @@ -254,11 +253,11 @@ impl PreparedQueries {
},
};

super::session_execute_iter(session, prepared_stmt, params).await
super::session_execute_iter(session, prepared_stmt, NO_PARAMS).await
}

/// Execute a purge query with the given parameters.
pub(crate) async fn execute<T: SerializeRow + Debug>(
pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
&self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedDeleteQuery,
values: Vec<T>,
) -> FallibleQueryResults {
Expand Down
43 changes: 42 additions & 1 deletion catalyst-gateway/bin/src/db/index/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use tracing::{error, info};

use super::{
queries::{
purge, FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
purge::{self, PreparedDeleteQuery},
FallibleQueryResults, PreparedQueries, PreparedSelectQuery, PreparedQuery,
PreparedUpsertQuery,
},
schema::create_schema,
Expand Down Expand Up @@ -145,6 +146,46 @@ impl CassandraSession {
queries.execute_upsert(session, query, value).await
}

/// Execute a purge query with the given parameters.
///
/// Values should be a Vec of values which implement `SerializeRow` and they MUST be
/// the same, and must match the query being executed.
///
/// This will divide the batch into optimal sized chunks and execute them until all
/// values have been executed or the first error is encountered.
///
/// NOTE: This is currently only used to purge volatile data.
pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
&self, query: PreparedDeleteQuery, values: Vec<T>,
) -> FallibleQueryResults {
// Only execute purge queries on the volatile session
let persistent = false;
let Some(volatile_db) = Self::get(persistent) else {
// This should never happen
anyhow::bail!("Volatile DB Session not found");
};
let cfg = self.cfg.clone();
let queries = self.purge_queries.clone();
let session = volatile_db.session.clone();

queries.execute_batch(session, cfg, query, values).await
}

/// Execute a select query to gather primary keys for purging.
pub(crate) async fn purge_execute_iter(
&self, query: purge::PreparedSelectQuery
) -> anyhow::Result<RowIterator> {
// Only execute purge queries on the volatile session
let persistent = false;
let Some(volatile_db) = Self::get(persistent) else {
// This should never happen
anyhow::bail!("Volatile DB Session not found");
};
let queries = self.purge_queries.clone();

queries.execute_iter(volatile_db.session.clone(), query).await
}

/// Get underlying Raw Cassandra Session.
pub(crate) fn get_raw_session(&self) -> Arc<Session> {
self.session.clone()
Expand Down

0 comments on commit fd3e1e9

Please sign in to comment.