From 69759e66f788b22938d88233e29a041666bc7fb9 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 8 Nov 2024 19:19:28 +0100 Subject: [PATCH] more tests around dropping items from queue --- crates/corro-agent/src/agent/handlers.rs | 100 +++++++++++++++++++++-- 1 file changed, 94 insertions(+), 6 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 330bcb93..ca3eff7f 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -918,11 +918,11 @@ pub async fn handle_changes( } // drop old items when the queue is full. - if queue.len() > max_queue_len { - let change = queue.pop_back(); - if let Some(change) = change { - for v in change.0.versions() { - let _ = seen.remove(&(change.0.actor_id, v)); + if queue.len() >= max_queue_len { + let dropped = queue.pop_front(); + if let Some(dropped) = dropped { + for v in dropped.0.versions() { + let _ = seen.remove(&(dropped.0.actor_id, v)); } } @@ -1082,11 +1082,18 @@ pub async fn handle_sync( #[cfg(test)] mod tests { + use crate::agent::setup; + use crate::api::public::api_v1_db_schema; + use super::*; + use axum::{http::StatusCode, Extension, Json}; + use corro_tests::TEST_SCHEMA; + use corro_types::api::{Change, ColumnName, TableName}; + use corro_types::{base::CrsqlDbVersion, base::Version, config::Config, pubsub::pack_columns}; use rusqlite::Connection; use std::sync::Arc; use tokio::sync::Semaphore; - use tokio::time::timeout; + use tokio::time::{timeout, Duration}; #[test] fn ensure_truncate_works() -> eyre::Result<()> { @@ -1105,6 +1112,87 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_loadshed_handle_changes() -> eyre::Result<()> { + _ = tracing_subscriber::fmt::try_init(); + let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple(); + let dir = tempfile::tempdir()?; + + let mut config = Config::builder() + .db_path(dir.path().join("corrosion.db").display().to_string()) + .gossip_addr("127.0.0.1:0".parse()?) + .api_addr("127.0.0.1:0".parse()?) + .build()?; + config.perf.apply_queue_len = 1; + config.perf.processing_queue_len = 3; + + let (agent, agent_options) = setup(config, tripwire.clone()).await?; + + let (status_code, _res) = + api_v1_db_schema(Extension(agent.clone()), Json(vec![TEST_SCHEMA.to_owned()])).await; + assert_eq!(status_code, StatusCode::OK); + + let other_actor = ActorId(uuid::Uuid::new_v4()); + let bookie = Bookie::new(Default::default()); + tokio::spawn(handle_changes( + agent.clone(), + bookie.clone(), + agent_options.rx_changes, + tripwire, + )); + + { + // hold write connection so that max_concurrency is reached + let _conn = agent.pool().write_normal().await?; + + // queue size is very small - only three changes + // 10-6 are stuck proecessing because we hold the write conn + // next two versions, 3-5, enter the queue + // last version 2-1, displace 4 and 5 from the queue and + // they never get processed + for i in (1i64..=10i64).rev() { + let crsql_row = Change { + table: TableName("tests".into()), + pk: pack_columns(&vec![i.into()])?, + cid: ColumnName("text".into()), + val: "two override".into(), + col_version: 1, + db_version: CrsqlDbVersion(4), + seq: CrsqlSeq(0), + site_id: agent.actor_id().to_bytes(), + cl: 1, + }; + + let change = ( + ChangeV1 { + actor_id: other_actor, + changeset: Changeset::Full { + version: Version(i as u64), + changes: vec![crsql_row.clone()], + seqs: CrsqlSeq(0)..=CrsqlSeq(0), + last_seq: CrsqlSeq(0), + ts: agent.clock().new_timestamp().into(), + }, + }, + ChangeSource::Sync, + ); + + agent.tx_changes().send(change).await?; + } + } + + sleep(Duration::from_secs(2)).await; + + let bookie = bookie.read("read booked").await; + let booked = bookie.get(&other_actor).unwrap().read("test").await; + assert!(booked.contains_all(Version(6)..=Version(10), None)); + assert!(booked.contains_all(Version(1)..=Version(3), None)); + assert!(!booked.contains_version(&Version(5))); + assert!(!booked.contains_version(&Version(4))); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn ensure_vacuum_works() -> eyre::Result<()> { let tmpdir = tempfile::tempdir()?;