Skip to content

Commit

Permalink
add operation to updates
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Oct 24, 2024
1 parent dcff34e commit 00b1282
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 29 deletions.
4 changes: 2 additions & 2 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,15 +722,15 @@ pub async fn process_fully_buffered_changes(
let conn = agent.pool().read().await?;
block_in_place(|| {
if let Err(e) = match_changes_from_db_version(agent.subs_manager(), &conn, db_version) {
error!(%db_version, "could not match changes from db version: {e}");
error!(%db_version, "could not match changes for subs from db version: {e}");
}
});

block_in_place(|| {
if let Err(e) =
match_changes_from_db_version(agent.updates_manager(), &conn, db_version)
{
error!(%db_version, "could not match changes from db version: {e}");
error!(%db_version, "could not match changes for updates from db version: {e}");
}
});
}
Expand Down
43 changes: 39 additions & 4 deletions crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,12 +1151,12 @@ mod tests {

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), vec!["service-id-3".into()],)
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-3".into()],)
);

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), vec!["service-id-4".into()],)
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-4".into()],)
);

let mut res = api_v1_subs(
Expand Down Expand Up @@ -1243,7 +1243,7 @@ mod tests {
assert_eq!(rows_from.recv().await.unwrap().unwrap(), query_evt);

let notify_evt =
QueryEvent::Notify(TableName("tests".into()), vec!["service-id-5".into()]);
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-5".into()]);

assert_eq!(notify_rows.recv().await.unwrap().unwrap(), notify_evt);

Expand Down Expand Up @@ -1316,6 +1316,39 @@ mod tests {
vec!["service-id-5".into(), "service-name-5".into()]
)
);

let (status_code, _) = api_v1_transactions(
Extension(agent.clone()),
axum::Json(vec![Statement::WithParams(
"insert into tests (id, text) values (?,?)".into(),
vec!["service-id-6".into(), "service-name-6".into()],
)]),
)
.await;

assert_eq!(status_code, StatusCode::OK);

let (status_code, _) = api_v1_transactions(
Extension(agent.clone()),
axum::Json(vec![Statement::WithParams(
"delete from tests where id = ?".into(),
vec!["service-id-6".into()],
)]),
)
.await;

assert_eq!(status_code, StatusCode::OK);

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), ChangeType::Update, vec!["service-id-6".into()],)
);

assert_eq!(
notify_rows.recv().await.unwrap().unwrap(),
QueryEvent::Notify(TableName("tests".into()), ChangeType::Delete, vec!["service-id-6".into()],)
);

}

// previous subs have been dropped.
Expand Down Expand Up @@ -1456,6 +1489,8 @@ mod tests {
)
);



Ok(())
}

Expand Down Expand Up @@ -1677,7 +1712,7 @@ mod tests {
let notify_res = timeout(Duration::from_secs(5), notify_rows.recv()).await?;
assert_eq!(
notify_res.unwrap().unwrap(),
QueryEvent::Notify(TableName("buftests".into()), vec![Integer(2)],)
QueryEvent::Notify(TableName("buftests".into()), ChangeType::Update, vec![Integer(2)],)
);

tripwire_tx.send(()).await.ok();
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum TypedQueryEvent<T> {
},
Change(ChangeType, RowId, T, ChangeId),
Error(CompactString),
Notify(TableName, T)
Notify(TableName, ChangeType, T)
}

impl<T> TypedQueryEvent<T> {
Expand All @@ -46,7 +46,7 @@ impl<T> TypedQueryEvent<T> {
TypedQueryEvent::EndOfQuery { change_id, .. } => QueryEventMeta::EndOfQuery(*change_id),
TypedQueryEvent::Change(_, _, _, id) => QueryEventMeta::Change(*id),
TypedQueryEvent::Error(_) => QueryEventMeta::Error,
TypedQueryEvent::Notify(_, _) => QueryEventMeta::Notify,
TypedQueryEvent::Notify(_, _, _) => QueryEventMeta::Notify,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-tpl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl QueryResponseIter {
return Some(Err(Box::new(EvalAltResult::from(e))));
}
// TODO(somtochiama):
QueryEvent::Notify(_, _) => {
QueryEvent::Notify(..) => {
self.done = true;
return None;
}
Expand Down
19 changes: 10 additions & 9 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ impl Manager<MatcherHandle> for SubsManager {
}

fn get_handles(&self) -> BTreeMap<Uuid, MatcherHandle> {
let handles = { self.0.read().handles.clone() };
handles.clone()
self.0.read().handles.clone()
}
}

Expand Down Expand Up @@ -195,6 +194,7 @@ pub struct MatchableChange<'a> {
pub table: &'a TableName,
pub pk: &'a [u8],
pub column: &'a ColumnName,
pub cl: &'a i64,
}

impl<'a> From<&'a Change> for MatchableChange<'a> {
Expand All @@ -203,6 +203,7 @@ impl<'a> From<&'a Change> for MatchableChange<'a> {
table: &value.table,
pk: &value.pk,
column: &value.cid,
cl: &value.cl,
}
}
}
Expand Down Expand Up @@ -266,7 +267,7 @@ struct InnerMatcherHandle {
cached_statements: HashMap<String, MatcherStmt>,
}

pub type MatchCandidates = IndexMap<TableName, IndexSet<Vec<u8>>>;
pub type MatchCandidates = IndexMap<TableName, IndexSet<(Vec<u8>, i64)>>;

#[async_trait]
impl Handle for MatcherHandle {
Expand Down Expand Up @@ -300,7 +301,7 @@ impl Handle for MatcherHandle {
// don't double process the same pk
if candidates
.get(change.table)
.map(|pks| pks.contains(change.pk))
.map(|pks| pks.contains(&(change.pk.to_vec(), change.cl.clone())))
.unwrap_or_default()
{
trace!("already contained key");
Expand All @@ -321,9 +322,9 @@ impl Handle for MatcherHandle {
}

if let Some(v) = candidates.get_mut(change.table) {
v.insert(change.pk.to_vec())
v.insert((change.pk.to_vec(), change.cl.clone()))
} else {
candidates.insert(change.table.clone(), [change.pk.to_vec()].into());
candidates.insert(change.table.clone(), [(change.pk.to_vec(), change.cl.clone())].into());
true
}
}
Expand Down Expand Up @@ -1426,7 +1427,7 @@ impl Matcher {
for (table, pks) in candidates {
let pks = pks
.iter()
.map(|pk| unpack_columns(pk))
.map(|(pk, _)| unpack_columns(pk))
.collect::<Result<Vec<Vec<SqliteValueRef>>, _>>()?;

let tmp_table_name = format!("temp_{table}");
Expand Down Expand Up @@ -1693,7 +1694,7 @@ impl Matcher {
{
let mut changes_prepped = state_conn.prepare_cached(
r#"
SELECT DISTINCT "table", pk
SELECT DISTINCT "table", pk, cl
FROM crsql_changes
WHERE db_version > ?
AND db_version <= ? -- TODO: allow going over?
Expand All @@ -1707,7 +1708,7 @@ impl Matcher {
candidates
.entry(row.get(0)?)
.or_default()
.insert(row.get(1)?);
.insert((row.get(1)?, row.get(2)?));
}
}

Expand Down
27 changes: 17 additions & 10 deletions crates/corro-types/src/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::pubsub::{
};
use crate::schema::Schema;
use async_trait::async_trait;
use corro_api_types::sqlite::ChangeType;
use corro_api_types::{Change, ColumnName, QueryEvent, SqliteValueRef, TableName};
use corro_base_types::CrsqlDbVersion;
use metrics::{counter, histogram};
Expand Down Expand Up @@ -60,8 +61,7 @@ impl Manager<UpdateHandle> for UpdatesManager {
}

fn get_handles(&self) -> BTreeMap<Uuid, UpdateHandle> {
let handles = { self.0.read().handles.clone() };
handles.clone()
self.0.read().handles.clone()
}
}

Expand Down Expand Up @@ -92,17 +92,17 @@ impl Handle for UpdateHandle {
// don't double process the same pk
if candidates
.get(change.table)
.map(|pks| pks.contains(change.pk))
.map(|pks| pks.contains(&(change.pk.to_vec(), change.cl.clone())))
.unwrap_or_default()
{
trace!("already contained key");
return false;
}

if let Some(v) = candidates.get_mut(change.table) {
v.insert(change.pk.to_vec())
v.insert((change.pk.to_vec(), change.cl.clone()))
} else {
candidates.insert(change.table.clone(), [change.pk.to_vec()].into());
candidates.insert(change.table.clone(), [(change.pk.to_vec(), change.cl.clone())].into());
true
}
}
Expand Down Expand Up @@ -252,12 +252,17 @@ fn handle_candidates(
for (table, pks) in candidates {
let pks = pks
.iter()
.map(|pk| unpack_columns(pk))
.collect::<Result<Vec<Vec<SqliteValueRef>>, _>>()?;
.map(|(pk, cl)| unpack_columns(pk).and_then(|x| Ok((x, cl.clone()))))
.collect::<Result<Vec<(Vec<SqliteValueRef>, i64)>, _>>()?;

for pk in pks {
for (pk, cl) in pks {
let mut change_type = ChangeType::Update;
if cl % 2 == 0 {
change_type = ChangeType::Delete
}
if let Err(e) = evt_tx.blocking_send(QueryEvent::Notify(
table.clone(),
change_type,
pk.iter().map(|x| x.to_owned()).collect::<Vec<_>>(),
)) {
debug!("could not send back row to matcher sub sender: {e}");
Expand Down Expand Up @@ -429,7 +434,7 @@ pub fn match_changes_from_db_version<H: Handle + Send + 'static>(
{
let mut prepped = conn.prepare_cached(
r#"
SELECT "table", pk, cid
SELECT "table", pk, cid, cl
FROM crsql_changes
WHERE db_version = ?
ORDER BY seq ASC
Expand All @@ -441,17 +446,19 @@ pub fn match_changes_from_db_version<H: Handle + Send + 'static>(
row.get::<_, TableName>(0)?,
row.get::<_, Vec<u8>>(1)?,
row.get::<_, ColumnName>(2)?,
row.get::<_, i64>(3)?,
))
})?;

for change_res in rows {
let (table, pk, column) = change_res?;
let (table, pk, column, cl) = change_res?;

for (_id, (candidates, handle)) in candidates.iter_mut() {
let change = MatchableChange {
table: &table,
pk: &pk,
column: &column,
cl: &cl,
};
handle.filter_matchable_change(candidates, change);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
Ok(QueryEvent::Change(_, _, _, _)) => {
break;
}
Ok(QueryEvent::Notify(_, _)) => {
Ok(QueryEvent::Notify(..)) => {
break;
}
Ok(QueryEvent::Error(e)) => {
Expand Down

0 comments on commit 00b1282

Please sign in to comment.