Skip to content

Commit

Permalink
Merge pull request #31285 from petrosagg/test-reclock-consolidation
Browse files Browse the repository at this point in the history
timely-util: reclock: test binding consolidation
  • Loading branch information
petrosagg authored Feb 4, 2025
2 parents 46b28b8 + d5f5433 commit a9d5a81
Showing 1 changed file with 127 additions and 6 deletions.
133 changes: 127 additions & 6 deletions src/timely-util/src/reclock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,14 +545,18 @@ impl<D, T: Timestamp, R> FromIterator<(D, T, R)> for ChainBatch<D, T, R> {

#[cfg(test)]
mod test {
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{Receiver, TryRecvError};

use differential_dataflow::consolidation;
use differential_dataflow::input::{Input, InputSession};
use serde::{Deserialize, Serialize};
use timely::communication::allocator::Thread;
use timely::dataflow::operators::capture::{Event, Extract};
use timely::dataflow::operators::unordered_input::UnorderedHandle;
use timely::dataflow::operators::{ActivateCapability, Capture, UnorderedInput};
use timely::progress::timestamp::Refines;
use timely::progress::PathSummary;
use timely::worker::Worker;

use crate::capture::PusherCapture;
Expand All @@ -562,8 +566,8 @@ mod test {

type FromTime = Partitioned<u64, u64>;
type IntoTime = u64;
type BindingHandle = InputSession<IntoTime, FromTime, i64>;
type DataHandle<D> = (
type BindingHandle<FromTime> = InputSession<IntoTime, FromTime, i64>;
type DataHandle<D, FromTime> = (
UnorderedHandle<FromTime, (D, FromTime, i64)>,
ActivateCapability<FromTime>,
);
Expand All @@ -576,10 +580,16 @@ mod test {
/// * A `BindingHandle` that allows the test to manipulate the remap bindings
/// * A `DataHandle` that allows the test to submit the data to be reclocked
/// * A `ReclockedStream` that allows observing the result of the reclocking process
fn harness<D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
where
FromTime: Timestamp + Refines<()>,
D: ExchangeData,
F: FnOnce(&mut Worker<Thread>, BindingHandle, DataHandle<D>, ReclockedStream<D>) -> R
F: FnOnce(
&mut Worker<Thread>,
BindingHandle<FromTime>,
DataHandle<D, FromTime>,
ReclockedStream<D>,
) -> R
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -620,7 +630,7 @@ mod test {
#[mz_ore::test]
fn basic_reclocking() {
let as_of = Antichain::from_elem(IntoTime::minimum());
harness(
harness::<FromTime, _, _, _>(
as_of,
|worker, bindings, (mut data, data_cap), reclocked| {
// Reclock everything at the minimum IntoTime
Expand Down Expand Up @@ -719,7 +729,7 @@ mod test {
#[mz_ore::test]
fn test_reclock_frontier() {
let as_of = Antichain::from_elem(IntoTime::minimum());
harness::<(), _, _>(
harness::<_, (), _, _>(
as_of,
|worker, mut bindings, (_data, data_cap), reclocked| {
// Initialize the bindings such that the minimum IntoTime contains the minimum FromTime
Expand Down Expand Up @@ -1007,4 +1017,115 @@ mod test {
let b = ChainBatch::from_iter([('a', 0, -1), ('a', 1, 1)]);
assert_eq!(a.merge_with(b), ChainBatch::from_iter([('a', 1, 1)]));
}

#[mz_ore::test]
fn test_binding_consolidation() {
use std::sync::atomic::Ordering;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
struct Time(u64);

// A counter of the number of active Time instances
static INSTANCES: AtomicUsize = AtomicUsize::new(0);

impl Time {
fn new(time: u64) -> Self {
INSTANCES.fetch_add(1, Ordering::Relaxed);
Self(time)
}
}

impl Clone for Time {
fn clone(&self) -> Self {
INSTANCES.fetch_add(1, Ordering::Relaxed);
Self(self.0)
}
}

impl Drop for Time {
fn drop(&mut self) {
INSTANCES.fetch_sub(1, Ordering::Relaxed);
}
}

impl Timestamp for Time {
type Summary = ();

fn minimum() -> Self {
Time::new(0)
}
}

impl PathSummary<Time> for () {
fn results_in(&self, src: &Time) -> Option<Time> {
Some(src.clone())
}

fn followed_by(&self, _other: &()) -> Option<Self> {
Some(())
}
}

impl Refines<()> for Time {
fn to_inner(_: ()) -> Self {
Self::minimum()
}
fn to_outer(self) -> () {}
fn summarize(_path: ()) {}
}

impl PartialOrder for Time {
fn less_equal(&self, other: &Self) -> bool {
self.0.less_equal(&other.0)
}
}

let as_of = 1000;

// Test that supplying a single big batch of unconsolidated bindings gets
// consolidated after a single worker step.
harness::<Time, u64, _, _>(
Antichain::from_elem(as_of),
move |worker, mut bindings, _, _| {
step(worker);
let instances_before = INSTANCES.load(Ordering::Relaxed);
for ts in 0..as_of {
if ts > 0 {
bindings.update_at(Time::new(ts - 1), ts, -1);
}
bindings.update_at(Time::new(ts), ts, 1);
}
bindings.advance_to(as_of);
bindings.flush();
step(worker);
let instances_after = INSTANCES.load(Ordering::Relaxed);
// The extra instances live in a ChangeBatch which considers compaction when more
// than 32 elements are inside.
assert!(instances_after - instances_before < 32);
},
);

// Test that a slow feed of uncompacted bindings over multiple steps never leads to an
// excessive number of bindings held in memory.
harness::<Time, u64, _, _>(
Antichain::from_elem(as_of),
move |worker, mut bindings, _, _| {
step(worker);
let instances_before = INSTANCES.load(Ordering::Relaxed);
for ts in 0..as_of {
if ts > 0 {
bindings.update_at(Time::new(ts - 1), ts, -1);
}
bindings.update_at(Time::new(ts), ts, 1);
bindings.advance_to(ts + 1);
bindings.flush();
step(worker);
let instances_now = INSTANCES.load(Ordering::Relaxed);
// The extra instances live in a ChangeBatch which considers compaction when
// more than 32 elements are inside.
assert!(instances_now - instances_before < 32);
}
},
);
}
}

0 comments on commit a9d5a81

Please sign in to comment.