Skip to content

Commit

Permalink
Fueled as_specific_collection
Browse files Browse the repository at this point in the history
Adds fueling to `as_specific_collection` by calling into `flat_map` instead
of `as_collection`. Adds a flag to restore the previous behavior.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Feb 3, 2025
1 parent 8491d33 commit 4711397
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 19 deletions.
8 changes: 8 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ pub const COMPUTE_APPLY_COLUMN_DEMANDS: Config<bool> = Config::new(
"When enabled, passes applys column demands to the RelationDesc used to read out of Persist.",
);

/// Whether to render `as_specific_collection` using a fueled flat-map operator.
///
/// Enabled by default. When set to `false`, restores the previous (unfueled)
/// `as_collection`-based rendering of `as_specific_collection`.
// NOTE(review): the fueled path presumably bounds per-activation work when
// flattening an arrangement — confirm against the `flat_map` implementation.
pub const ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION: Config<bool> = Config::new(
    "enable_compute_render_fueled_as_specific_collection",
    true,
    "When enabled, renders `as_specific_collection` using a fueled flat-map operator.",
);

/// Adds the full set of all compute `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand All @@ -198,4 +205,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
.add(&COMPUTE_APPLY_COLUMN_DEMANDS)
.add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
.add(&ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION)
}
29 changes: 22 additions & 7 deletions src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,12 +1068,17 @@ where
mfp,
Some((key, row)),
self.until.clone(),
&self.flags,
);
CollectionBundle::from_collections(oks, errs)
}
mz_compute_types::plan::GetPlan::Collection(mfp) => {
let (oks, errs) =
collection.as_collection_core(mfp, None, self.until.clone());
let (oks, errs) = collection.as_collection_core(
mfp,
None,
self.until.clone(),
&self.flags,
);
CollectionBundle::from_collections(oks, errs)
}
}
Expand All @@ -1088,8 +1093,12 @@ where
if mfp.is_identity() {
input
} else {
let (oks, errs) =
input.as_collection_core(mfp, input_key_val, self.until.clone());
let (oks, errs) = input.as_collection_core(
mfp,
input_key_val,
self.until.clone(),
&self.flags,
);
CollectionBundle::from_collections(oks, errs)
}
}
Expand Down Expand Up @@ -1131,7 +1140,7 @@ where
}
Negate { input } => {
let input = expect_input(input);
let (oks, errs) = input.as_specific_collection(None);
let (oks, errs) = input.as_specific_collection(None, &self.flags);
CollectionBundle::from_collections(oks.negate(), errs)
}
Threshold {
Expand All @@ -1148,7 +1157,7 @@ where
let mut oks = Vec::new();
let mut errs = Vec::new();
for input in inputs.into_iter() {
let (os, es) = expect_input(input).as_specific_collection(None);
let (os, es) = expect_input(input).as_specific_collection(None, &self.flags);
oks.push(os);
errs.push(es);
}
Expand All @@ -1166,7 +1175,13 @@ where
input_mfp,
} => {
let input = expect_input(input);
input.ensure_collections(keys, input_key, input_mfp, self.until.clone())
input.ensure_collections(
keys,
input_key,
input_mfp,
self.until.clone(),
&self.flags,
)
}
}
}
Expand Down
62 changes: 54 additions & 8 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use differential_dataflow::trace::cursor::IntoOwned;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection, Data};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
use mz_compute_types::plan::{AvailableCollections, LirId};
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_repr::fixed_length::ToDatumIter;
Expand Down Expand Up @@ -97,6 +98,24 @@ where
/// The expiration time for dataflows in this context. The output's frontier should never advance
/// past this frontier, except the empty frontier.
pub dataflow_expiration: Antichain<T>,
/// The flags active in this context.
pub flags: ContextFlags,
}

/// Flags influencing the behavior of a context.
///
/// Captured once per dataflow (via `From<&mz_dyncfg::ConfigSet>`) so that the
/// rendered dataflow's behavior does not change if the dynamic configuration
/// is updated while it is running.
#[derive(Clone, Debug)]
pub struct ContextFlags {
    /// Whether to use the fueled `flat_map` for creating specific collections.
    /// Sourced from `ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`.
    pub enable_fueled_as_specific_collection: bool,
}

impl From<&mz_dyncfg::ConfigSet> for ContextFlags {
    /// Snapshots the rendering-relevant dynamic configuration into a `ContextFlags`.
    fn from(config: &mz_dyncfg::ConfigSet) -> Self {
        // Read each flag once; the resulting struct is a point-in-time snapshot.
        let enable_fueled_as_specific_collection =
            ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config);
        Self {
            enable_fueled_as_specific_collection,
        }
    }
}

impl<S: Scope> Context<S>
Expand Down Expand Up @@ -135,6 +154,8 @@ where
)
};

let flags = ContextFlags::from(&compute_state.worker_config);

Self {
scope,
debug_name: dataflow.debug_name.clone(),
Expand All @@ -147,6 +168,7 @@ where
compute_logger,
linear_join_spec: compute_state.linear_join_spec,
dataflow_expiration,
flags,
}
}
}
Expand Down Expand Up @@ -230,6 +252,7 @@ where
linear_join_spec: self.linear_join_spec.clone(),
bindings,
dataflow_expiration: self.dataflow_expiration.clone(),
flags: self.flags.clone(),
}
}
}
Expand Down Expand Up @@ -327,9 +350,12 @@ where
{
/// Presents `self` as a stream of updates.
///
/// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
///
/// This method presents the contents as they are, without further computation.
/// If you have logic that could be applied to each record, consider using the
/// `flat_map` methods which allows this and can reduce the work done.
#[deprecated(note = "Use `flat_map` instead.")]
pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
let mut datums = DatumVec::new();
let logic = move |k: DatumSeq, v: DatumSeq| {
Expand Down Expand Up @@ -562,9 +588,14 @@ where
/// doing any unthinning transformation.
/// Therefore, it should be used when the appropriate transformation
/// was planned as part of a following MFP.
///
/// If `key` is specified, the function converts the arrangement to a collection. It uses either
/// the fueled `flat_map` or `as_collection` method, depending on the flag
/// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
pub fn as_specific_collection(
&self,
key: Option<&[MirScalarExpr]>,
flags: &ContextFlags,
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
// Any operator that uses this method was told to use a particular
// collection during LIR planning, where we should have made
Expand All @@ -576,11 +607,20 @@ where
.collection
.clone()
.expect("The unarranged collection doesn't exist."),
Some(key) => self
.arranged
.get(key)
.unwrap_or_else(|| panic!("The collection arranged by {:?} doesn't exist.", key))
.as_collection(),
Some(key) => {
let arranged = self.arranged.get(key).unwrap_or_else(|| {
panic!("The collection arranged by {:?} doesn't exist.", key)
});
if flags.enable_fueled_as_specific_collection {
let (ok, err) = arranged.flat_map(None, |borrow, t, r| {
Some((SharedRow::pack(borrow.iter()), t, r))
});
(ok.as_collection(), err)
} else {
#[allow(deprecated)]
arranged.as_collection()
}
}
}
}

Expand Down Expand Up @@ -723,6 +763,7 @@ where
mut mfp: MapFilterProject,
key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
until: Antichain<mz_repr::Timestamp>,
flags: &ContextFlags,
) -> (
Collection<S, mz_repr::Row, Diff>,
Collection<S, DataflowError, Diff>,
Expand All @@ -743,7 +784,7 @@ where

if mfp_plan.is_identity() && !has_key_val {
let key = key_val.map(|(k, _v)| k);
return self.as_specific_collection(key.as_deref());
return self.as_specific_collection(key.as_deref(), flags);
}
let (stream, errors) = self.flat_map(key_val, {
let mut datum_vec = DatumVec::new();
Expand Down Expand Up @@ -801,6 +842,7 @@ where
input_key: Option<Vec<MirScalarExpr>>,
input_mfp: MapFilterProject,
until: Antichain<mz_repr::Timestamp>,
flags: &ContextFlags,
) -> Self {
if collections == Default::default() {
return self;
Expand All @@ -820,8 +862,12 @@ where
.iter()
.any(|(key, _, _)| !self.arranged.contains_key(key));
if form_raw_collection && self.collection.is_none() {
self.collection =
Some(self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until));
self.collection = Some(self.as_collection_core(
input_mfp,
input_key.map(|k| (k, None)),
until,
flags,
));
}
for (key, _, thinning) in collections.arranged {
if !self.arranged.contains_key(&key) {
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/render/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ where
) -> CollectionBundle<G> {
let until = self.until.clone();
let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
let (ok_collection, err_collection) = input.as_specific_collection(input_key.as_deref());
let (ok_collection, err_collection) =
input.as_specific_collection(input_key.as_deref(), &self.flags);
let stream = ok_collection.inner;
let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, _| {
Box::new(move |input, ok_output, err_output| {
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ where
// TODO: extract closure from the first stage in the join plan, should it exist.
// TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
let (joined, errs) = inputs[linear_plan.source_relation]
.as_specific_collection(linear_plan.source_key.as_deref());
.as_specific_collection(linear_plan.source_key.as_deref(), &self.flags);
errors.push(errs.enter_region(inner));
let mut joined = joined.enter_region(inner);

Expand Down
7 changes: 6 additions & 1 deletion src/compute/src/render/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ where
let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
let mut mfp = MapFilterProject::new(unthinned_arity);
mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
bundle.as_collection_core(mfp, Some((key.clone(), None)), self.until.clone())
bundle.as_collection_core(
mfp,
Some((key.clone(), None)),
self.until.clone(),
&self.flags,
)
};

// Attach logging of dataflow errors.
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/top_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
input: CollectionBundle<G>,
top_k_plan: TopKPlan,
) -> CollectionBundle<G> {
let (ok_input, err_input) = input.as_specific_collection(None);
let (ok_input, err_input) = input.as_specific_collection(None, &self.flags);

// We create a new region to compartmentalize the topk logic.
let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
Expand Down

0 comments on commit 4711397

Please sign in to comment.