Skip to content

Commit

Permalink
Update to latest Timely
Browse files Browse the repository at this point in the history
Point Materialize at the latest Timely. We need to incorporate some changes
around reachability logging, which is now typed, and event iterators, which
now return `Cow`-wrapped data.

Some of the complexity stems from the fact that event links are
single-writer, so we need separate event links for each reachability log
variant.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jan 28, 2025
1 parent 64d71af commit 936c650
Show file tree
Hide file tree
Showing 41 changed files with 203 additions and 163 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/adapter-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" }
mz-repr = { path = "../repr" }
mz-storage-types = { path = "../storage-types" }
serde = "1.0.152"
timely = "0.16.0"
timely = "0.17.0"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[package.metadata.cargo-udeps.ignore]
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
dec = "0.4.8"
derivative = "2.2.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
Expand Down Expand Up @@ -78,7 +78,7 @@ serde_plain = "1.0.1"
sha2 = "0.10.6"
smallvec = { version = "1.10.0", features = ["union"] }
static_assertions = "1.1"
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0", features = ["rt", "time"] }
tokio-postgres = { version = "0.7.8" }
tracing = "0.1.37"
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
clap = { version = "4.5.23", features = ["derive"] }
derivative = "2.2.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
futures = "0.3.25"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -60,7 +60,7 @@ serde_plain = "1.0.1"
static_assertions = "1.1"
sha2 = "0.10.6"
thiserror = "1.0.37"
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0" }
tracing = "0.1.37"
uuid = "1.2.2"
Expand Down
19 changes: 13 additions & 6 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5567,15 +5567,22 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER: LazyLock<BuiltinView> =
oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER_OID,
column_defs: None,
sql: "SELECT
address,
addr2.id,
reachability.worker_id,
port,
worker_id,
update_type,
time,
pg_catalog.count(*) as count
FROM
mz_introspection.mz_dataflow_operator_reachability_raw
GROUP BY address, port, worker_id, update_type, time",
mz_introspection.mz_dataflow_operator_reachability_raw reachability,
mz_introspection.mz_dataflow_addresses_per_worker addr1,
mz_introspection.mz_dataflow_addresses_per_worker addr2
WHERE
addr2.address = addr1.address || reachability.source
AND addr1.id = reachability.id
AND addr1.worker_id = reachability.worker_id
AND addr2.worker_id = reachability.worker_id
GROUP BY addr2.id, reachability.worker_id, port, update_type, time",
access: vec![PUBLIC_SELECT],
});

Expand All @@ -5587,13 +5594,13 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY: LazyLock<BuiltinView> =
column_defs: None,
sql: "
SELECT
address,
id,
port,
update_type,
time,
pg_catalog.sum(count) as count
FROM mz_introspection.mz_dataflow_operator_reachability_per_worker
GROUP BY address, port, update_type, time",
GROUP BY id, port, update_type, time",
access: vec![PUBLIC_SELECT],
});

Expand Down
4 changes: 2 additions & 2 deletions src/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ workspace = true
anyhow = "1.0.66"
async-trait = "0.1.83"
crossbeam-channel = "0.5.8"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
futures = "0.3.25"
mz-cluster-client = { path = "../cluster-client" }
mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] }
mz-persist-client = { path = "../persist-client" }
mz-service = { path = "../service" }
mz-txn-wal = { path = "../txn-wal" }
regex = "1.7.0"
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/compute-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.83"
bytesize = "1.1.0"
crossbeam-channel = "0.5.8"
derivative = "2.2.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
futures = "0.3.25"
http = "1.1.0"
mz-build-info = { path = "../build-info" }
Expand Down Expand Up @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.125"
thiserror = "1.0.37"
timely = "0.16.0"
timely = "0.17.0"
tokio = "1.38.0"
tokio-stream = "0.1.11"
tonic = "0.12.1"
Expand Down
12 changes: 3 additions & 9 deletions src/compute-client/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,16 +448,10 @@ impl LogVariant {
.finish(),

LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
.with_column(
"address",
ScalarType::List {
element_type: Box::new(ScalarType::UInt64),
custom_id: None,
}
.nullable(false),
)
.with_column("port", ScalarType::UInt64.nullable(false))
.with_column("id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("source", ScalarType::UInt64.nullable(false))
.with_column("port", ScalarType::UInt64.nullable(false))
.with_column("update_type", ScalarType::String.nullable(false))
.with_column("time", ScalarType::MzTimestamp.nullable(true))
.finish(),
Expand Down
4 changes: 2 additions & 2 deletions src/compute-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ workspace = true
[dependencies]
columnar = "0.2.2"
columnation = "0.1.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
itertools = "0.12.1"
mz-dyncfg = { path = "../dyncfg" }
mz-expr = { path = "../expr" }
Expand All @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
timely = "0.16.0"
timely = "0.17.0"
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

Expand Down
6 changes: 3 additions & 3 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ bytesize = "1.1.0"
columnar = "0.2.2"
crossbeam-channel = "0.5.8"
dec = { version = "0.4.8", features = ["serde"] }
differential-dataflow = "0.13.3"
differential-dogs3 = "0.1.3"
differential-dataflow = "0.13.4"
differential-dogs3 = "0.1.4"
futures = "0.3.25"
itertools = "0.12.1"
lgalloc = "0.4"
Expand All @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false }
scopeguard = "1.1.0"
serde = { version = "1.0.152", features = ["derive"] }
smallvec = { version = "1.10.0", features = ["serde", "union"] }
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
uuid = { version = "1.7.0", features = ["serde", "v4"] }
Expand Down
12 changes: 8 additions & 4 deletions src/compute/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,22 @@ where
///
/// This is just a bundle-type intended to make passing around its contents in the logging
/// initialization code more convenient.
///
/// The `N` const parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that feed from multiple loggers because the `EventLink` type is not
/// multi-producer safe.
#[derive(Clone)]
struct EventQueue<C> {
link: Rc<EventLink<Timestamp, C>>,
struct EventQueue<C, const N: usize = 1> {
links: [Rc<EventLink<Timestamp, C>>; N],
activator: RcActivator,
}

impl<C> EventQueue<C> {
impl<C, const N: usize> EventQueue<C, N> {
fn new(name: &str) -> Self {
let activator_name = format!("{name}_activator");
let activate_after = 128;
Self {
link: Rc::new(EventLink::new()),
links: [(); N].map(|_| Rc::new(EventLink::new())),
activator: RcActivator::new(activator_name, activate_after),
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,16 @@ pub(super) fn construct<A: Allocate + 'static>(

worker.dataflow_named("Dataflow: compute logging", move |scope| {
let enable_logging = config.enable_logging;
let (logs, token) = Some(event_queue.link).mz_replay::<_, ProvidedBuilder<_>, _>(
let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
scope,
"compute logs",
config.interval,
event_queue.activator,
move |mut session, data| {
move |mut session, mut data| {
// If logging is disabled, we still need to install the indexes, but we can leave them
// empty. We do so by immediately filtering out all log events.
if enable_logging {
session.give_container(&mut data.clone())
session.give_container(data.to_mut())
}
},
);
Expand Down
25 changes: 13 additions & 12 deletions src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use differential_dataflow::logging::{
};
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
use mz_timely_util::containers::{
columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder,
};
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::Filter;
use timely::dataflow::Stream;

use crate::extensions::arrange::MzArrangeCore;
Expand Down Expand Up @@ -57,21 +57,22 @@ pub(super) fn construct<A: Allocate>(
let dataflow_index = worker.next_dataflow_index();

worker.dataflow_named("Dataflow: differential logging", move |scope| {
let (mut logs, token) = Some(event_queue.link)
.mz_replay::<_, CapacityContainerBuilder<_>, _>(
let enable_logging = config.enable_logging;
let (logs, token) = event_queue.links.clone()
.mz_replay::<_, ProvidedBuilder<_>, _>(
scope,
"differential logs",
config.interval,
event_queue.activator,
|mut session, data| session.give_iterator(data.iter()),
move |mut session, mut data|{
// If logging is disabled, we still need to install the indexes, but we can leave them
// empty. We do so by immediately filtering out all log events.
if enable_logging {
session.give_container(data.to_mut())
}
}
);

// If logging is disabled, we still need to install the indexes, but we can leave them
// empty. We do so by immediately filtering out all log events.
if !config.enable_logging {
logs = logs.filter(|_| false);
}

// Build a demux operator that splits the replayed event stream up into the separate
// logging streams.
let mut demux =
Expand Down
Loading

0 comments on commit 936c650

Please sign in to comment.