Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest Timely #31158

Merged
merged 2 commits on Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/adapter-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" }
mz-repr = { path = "../repr" }
mz-storage-types = { path = "../storage-types" }
serde = "1.0.152"
timely = "0.16.0"
timely = "0.17.0"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[package.metadata.cargo-udeps.ignore]
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
dec = "0.4.8"
derivative = "2.2.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
Expand Down Expand Up @@ -78,7 +78,7 @@ serde_plain = "1.0.1"
sha2 = "0.10.6"
smallvec = { version = "1.10.0", features = ["union"] }
static_assertions = "1.1"
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0", features = ["rt", "time"] }
tokio-postgres = { version = "0.7.8" }
tracing = "0.1.37"
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
clap = { version = "4.5.23", features = ["derive"] }
derivative = "2.2.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
futures = "0.3.25"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -60,7 +60,7 @@ serde_plain = "1.0.1"
static_assertions = "1.1"
sha2 = "0.10.6"
thiserror = "1.0.37"
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0" }
tracing = "0.1.37"
uuid = "1.2.2"
Expand Down
22 changes: 16 additions & 6 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5567,15 +5567,25 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER: LazyLock<BuiltinView> =
oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER_OID,
column_defs: None,
sql: "SELECT
address,
addr2.id,
reachability.worker_id,
port,
worker_id,
update_type,
time,
pg_catalog.count(*) as count
FROM
mz_introspection.mz_dataflow_operator_reachability_raw
GROUP BY address, port, worker_id, update_type, time",
mz_introspection.mz_dataflow_operator_reachability_raw reachability,
mz_introspection.mz_dataflow_addresses_per_worker addr1,
mz_introspection.mz_dataflow_addresses_per_worker addr2
WHERE
CASE
WHEN source = 0 THEN addr2.address = addr1.address
ELSE addr2.address = addr1.address || reachability.source
END
AND addr1.id = reachability.id
AND addr1.worker_id = reachability.worker_id
AND addr2.worker_id = reachability.worker_id
GROUP BY addr2.id, reachability.worker_id, port, update_type, time",
access: vec![PUBLIC_SELECT],
});

Expand All @@ -5587,13 +5597,13 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY: LazyLock<BuiltinView> =
column_defs: None,
sql: "
SELECT
address,
id,
port,
update_type,
time,
pg_catalog.sum(count) as count
FROM mz_introspection.mz_dataflow_operator_reachability_per_worker
GROUP BY address, port, update_type, time",
GROUP BY id, port, update_type, time",
access: vec![PUBLIC_SELECT],
});

Expand Down
4 changes: 2 additions & 2 deletions src/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ workspace = true
anyhow = "1.0.66"
async-trait = "0.1.83"
crossbeam-channel = "0.5.8"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
futures = "0.3.25"
mz-cluster-client = { path = "../cluster-client" }
mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] }
mz-persist-client = { path = "../persist-client" }
mz-service = { path = "../service" }
mz-txn-wal = { path = "../txn-wal" }
regex = "1.7.0"
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/compute-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.83"
bytesize = "1.1.0"
crossbeam-channel = "0.5.8"
derivative = "2.2.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
futures = "0.3.25"
http = "1.1.0"
mz-build-info = { path = "../build-info" }
Expand Down Expand Up @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.125"
thiserror = "1.0.37"
timely = "0.16.0"
timely = "0.17.0"
tokio = "1.38.0"
tokio-stream = "0.1.11"
tonic = "0.12.1"
Expand Down
12 changes: 3 additions & 9 deletions src/compute-client/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,16 +448,10 @@ impl LogVariant {
.finish(),

LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
.with_column(
"address",
ScalarType::List {
element_type: Box::new(ScalarType::UInt64),
custom_id: None,
}
.nullable(false),
)
.with_column("port", ScalarType::UInt64.nullable(false))
.with_column("id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("source", ScalarType::UInt64.nullable(false))
.with_column("port", ScalarType::UInt64.nullable(false))
.with_column("update_type", ScalarType::String.nullable(false))
.with_column("time", ScalarType::MzTimestamp.nullable(true))
.finish(),
Expand Down
4 changes: 2 additions & 2 deletions src/compute-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ workspace = true
[dependencies]
columnar = "0.2.2"
columnation = "0.1.0"
differential-dataflow = "0.13.3"
differential-dataflow = "0.13.4"
itertools = "0.12.1"
mz-dyncfg = { path = "../dyncfg" }
mz-expr = { path = "../expr" }
Expand All @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
timely = "0.16.0"
timely = "0.17.0"
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

Expand Down
6 changes: 3 additions & 3 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ bytesize = "1.1.0"
columnar = "0.2.2"
crossbeam-channel = "0.5.8"
dec = { version = "0.4.8", features = ["serde"] }
differential-dataflow = "0.13.3"
differential-dogs3 = "0.1.3"
differential-dataflow = "0.13.4"
differential-dogs3 = "0.1.4"
futures = "0.3.25"
itertools = "0.12.1"
lgalloc = "0.4"
Expand All @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false }
scopeguard = "1.1.0"
serde = { version = "1.0.152", features = ["derive"] }
smallvec = { version = "1.10.0", features = ["serde", "union"] }
timely = "0.16.0"
timely = "0.17.0"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
uuid = { version = "1.7.0", features = ["serde", "v4"] }
Expand Down
13 changes: 9 additions & 4 deletions src/compute/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,23 @@ where
///
/// This is just a bundle-type intended to make passing around its contents in the logging
/// initialization code more convenient.
///
/// The `N` type parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that feed from multiple loggers because the `EventLink` type is not
/// multi-producer safe (it is a linked-list, and multiple writers would blindly append, replacing
/// existing new data, and cutting off other writers).
#[derive(Clone)]
struct EventQueue<C> {
link: Rc<EventLink<Timestamp, C>>,
struct EventQueue<C, const N: usize = 1> {
links: [Rc<EventLink<Timestamp, C>>; N],
activator: RcActivator,
}

impl<C> EventQueue<C> {
impl<C, const N: usize> EventQueue<C, N> {
fn new(name: &str) -> Self {
let activator_name = format!("{name}_activator");
let activate_after = 128;
Self {
link: Rc::new(EventLink::new()),
links: [(); N].map(|_| Rc::new(EventLink::new())),
activator: RcActivator::new(activator_name, activate_after),
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,16 @@ pub(super) fn construct<A: Allocate + 'static>(

worker.dataflow_named("Dataflow: compute logging", move |scope| {
let enable_logging = config.enable_logging;
let (logs, token) = Some(event_queue.link).mz_replay::<_, ProvidedBuilder<_>, _>(
let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
scope,
"compute logs",
config.interval,
event_queue.activator,
move |mut session, data| {
move |mut session, mut data| {
// If logging is disabled, we still need to install the indexes, but we can leave them
// empty. We do so by immediately filtering all logs events.
if enable_logging {
session.give_container(&mut data.clone())
session.give_container(data.to_mut())
}
},
);
Expand Down
25 changes: 13 additions & 12 deletions src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use differential_dataflow::logging::{
};
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
use mz_timely_util::containers::{
columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder,
};
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::Filter;
use timely::dataflow::Stream;

use crate::extensions::arrange::MzArrangeCore;
Expand Down Expand Up @@ -57,21 +57,22 @@ pub(super) fn construct<A: Allocate>(
let dataflow_index = worker.next_dataflow_index();

worker.dataflow_named("Dataflow: differential logging", move |scope| {
let (mut logs, token) = Some(event_queue.link)
.mz_replay::<_, CapacityContainerBuilder<_>, _>(
let enable_logging = config.enable_logging;
let (logs, token) = event_queue.links
.mz_replay::<_, ProvidedBuilder<_>, _>(
scope,
"differential logs",
config.interval,
event_queue.activator,
|mut session, data| session.give_iterator(data.iter()),
move |mut session, mut data|{
// If logging is disabled, we still need to install the indexes, but we can leave them
// empty. We do so by immediately filtering all logs events.
if enable_logging {
session.give_container(data.to_mut())
}
}
);

// If logging is disabled, we still need to install the indexes, but we can leave them
// empty. We do so by immediately filtering all logs events.
if !config.enable_logging {
logs = logs.filter(|_| false);
}

// Build a demux operator that splits the replayed event stream up into the separate
// logging streams.
let mut demux =
Expand Down
Loading