Skip to content

Commit

Permalink
test loadshedding in broadcast queue
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Nov 6, 2024
1 parent ce51743 commit 9977246
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 28 deletions.
38 changes: 20 additions & 18 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ pub fn spawn_incoming_connection_handlers(

// Spawn handler tasks for this connection
spawn_foca_handler(&agent, &tripwire, &conn);
uni::spawn_unipayload_handler(&tripwire, &conn, agent.cluster_id(), agent.tx_changes().clone());
uni::spawn_unipayload_handler(
&tripwire,
&conn,
agent.cluster_id(),
agent.tx_changes().clone(),
);
bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn);
});
}
Expand Down Expand Up @@ -529,12 +534,9 @@ pub async fn handle_emptyset(
mut tripwire: Tripwire,
) {
type EmptyQueue = VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>;
let mut buf: HashMap<ActorId, EmptyQueue> =
HashMap::new();
let mut buf: HashMap<ActorId, EmptyQueue> = HashMap::new();

let mut join_set: JoinSet<
HashMap<ActorId, EmptyQueue>,
> = JoinSet::new();
let mut join_set: JoinSet<HashMap<ActorId, EmptyQueue>> = JoinSet::new();

loop {
tokio::select! {
Expand Down Expand Up @@ -865,18 +867,6 @@ pub async fn handle_changes(
continue;
}

// drop old items when the queue is full.
if queue.len() > max_queue_len {
let change = queue.pop_back();
if let Some(change) = change {
for v in change.0.versions() {
let _ = seen.remove(&(change.0.actor_id, v));
}
}

log_at_pow_10("dropped old change from queue", &mut drop_log_count);
}

if let Some(mut seqs) = change.seqs().cloned() {
let v = *change.versions().start();
if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
Expand Down Expand Up @@ -927,6 +917,18 @@ pub async fn handle_changes(
}
}

// drop old items when the queue is full.
if queue.len() > max_queue_len {
let change = queue.pop_back();
if let Some(change) = change {
for v in change.0.versions() {
let _ = seen.remove(&(change.0.actor_id, v));
}
}

log_at_pow_10("dropped old change from queue", &mut drop_log_count);
}

if let Some(recv_lag) = recv_lag {
let src_str: &'static str = src.into();
histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str)
Expand Down
79 changes: 69 additions & 10 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,22 +614,32 @@ async fn handle_broadcasts(
}
}

// if broadcast queue is over the max, drop the oldest, most sent item
if to_broadcast.len() > max_queue_len {
let max_sent = to_broadcast
.iter()
.enumerate()
.max_by_key(|(_, val)| val.send_count);
if let Some((i, _)) = max_sent {
to_broadcast.remove(i);
log_at_pow_10("dropped broadcast from queue", &mut log_count);
};
if drop_oldest_broadcast(&mut to_broadcast, max_queue_len).is_some() {
log_at_pow_10("dropped old change from queue", &mut log_count);
}
}

info!("broadcasts are done");
}

// Drop the oldest, most sent item
fn drop_oldest_broadcast(
queue: &mut VecDeque<PendingBroadcast>,
max: usize,
) -> Option<PendingBroadcast> {
if queue.len() > max {
let max_sent: Option<(_, _)> = queue
.iter()
.enumerate()
.max_by_key(|(_, val)| val.send_count);
if let Some((i, _)) = max_sent {
return queue.remove(i);
}
}

None
}

fn diff_member_states(
agent: &Agent,
foca: &Foca<Actor, BincodeCodec<DefaultOptions>, StdRng, NoCustomBroadcast>,
Expand Down Expand Up @@ -832,6 +842,55 @@ mod tests {
};
use uuid::Uuid;

#[test]
fn test_behaviour_when_queue_is_full() -> eyre::Result<()> {
let max = 4;
let mut queue = VecDeque::new();

assert!(drop_oldest_broadcast(&mut queue, max).is_none());


queue.push_front(build_broadcast(1, 0));
queue.push_front(build_broadcast(2, 3));
queue.push_front(build_broadcast(3, 1));
queue.push_front(build_broadcast(4, 1));
queue.push_front(build_broadcast(5, 2));
queue.push_front(build_broadcast(6, 1));
queue.push_front(build_broadcast(7, 3));
queue.push_front(build_broadcast(8, 0));

// drop oldest item with highest send count
let dropped = drop_oldest_broadcast(&mut queue, max).unwrap();
assert_eq!(dropped.send_count, 3);
assert_eq!(2_i64.to_be_bytes(), dropped.payload.as_ref());

let dropped = drop_oldest_broadcast(&mut queue, max).unwrap();
assert_eq!(dropped.send_count, 3);
assert_eq!(7_i64.to_be_bytes(), dropped.payload.as_ref());

let dropped = drop_oldest_broadcast(&mut queue, max).unwrap();
assert_eq!(dropped.send_count, 2);
assert_eq!(5_i64.to_be_bytes(), dropped.payload.as_ref());

let dropped = drop_oldest_broadcast(&mut queue, max).unwrap();
assert_eq!(dropped.send_count, 1);
assert_eq!(3_i64.to_be_bytes(), dropped.payload.as_ref());

// queue is still at max now, no item gets dropped
assert!(drop_oldest_broadcast(&mut queue, max).is_none());

Ok(())
}

fn build_broadcast(id: u64, send_count: u8) -> PendingBroadcast {
PendingBroadcast {
payload: Bytes::copy_from_slice(&id.to_be_bytes()),
is_local: false,
send_count,
sent_to: HashSet::new(),
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_broadcast_order() -> eyre::Result<()> {
tracing_subscriber::fmt()
Expand Down

0 comments on commit 9977246

Please sign in to comment.