persist: inline small writes directly in consensus State #19383

Merged · 2 commits · Apr 10, 2024

81 changes: 81 additions & 0 deletions doc/developer/design/20240401_persist_inline_writes.md
@@ -0,0 +1,81 @@
# Persist Inline Writes

- Associated: [#24832](https://github.com/MaterializeInc/materialize/pull/24832)

## The Problem

A non-empty persist write currently always requires a write to S3 and a write to
CockroachDB. S3 writes are much slower than CRDB writes and incur a per-PUT
charge, so in the case of very small writes, this is wasteful of both latency
and cost.

Persist latencies directly impact the user experience of Materialize in a number
of ways. The above waste is particularly egregious in DDL, which may serially
perform a number of small persist writes.

## Success Criteria

Eligible persist writes have latency of `O(crdb write)` and not `O(s3 write)`.

## Out of Scope

- Latency guarantees: to keep persist metadata small, inline writes are an
  optimization; they are not guaranteed.
- Read latencies.

## Solution Proposal

Persist internally has the concept of a _part_, which corresponds 1:1:1 with a
persist _blob_ and an _object_ in S3. Currently, all data in a persist shard is
stored in S3 and then a _hollow_ reference to it is stored in CRDB. This
reference also includes metadata, such as pushdown statistics and the encoded
size.

We instead make this reference a two-variant enum: `Hollow` and `Inline`. The
`Inline` variant stores the same data as would have been written to S3, but as
an encoded protobuf. This protobuf is only decoded in data fetch paths and is
otherwise passed around as opaque bytes to save allocations and CPU cycles.
Pushdown statistics and the encoded size are both unnecessary for inline parts.
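
As a rough sketch (the names and fields here are illustrative, not the exact
`persist-client` types), the reference becomes something like:

```rust
// Illustrative sketch only: field names and types are assumptions, not the
// actual persist-client definitions.
use bytes::Bytes;

/// Placeholder for the pushdown statistics attached to hollow parts.
pub struct PartStats;

pub enum BatchPart {
    /// A reference to an object written to S3, plus metadata that is only
    /// meaningful for blobs: pushdown stats and the encoded size.
    Hollow {
        key: String,
        encoded_size_bytes: usize,
        stats: Option<PartStats>,
    },
    /// The part data itself, carried in consensus state as encoded protobuf
    /// bytes; decoded only on the fetch path and otherwise treated as opaque.
    Inline { updates: Bytes },
}
```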

The persist state stored in CRDB is a control plane concept, so mixing the data
plane into it carries both a performance and a stability risk. We reduce the
number of inline parts over time by making compaction flush them out to S3
(compaction output is never inlined). However, nothing prevents new writes from
arriving faster than compaction can pull them out, so we protect the control
plane with the following two limits, which create a hard upper bound on how
much data can be inline:

- `persist_inline_writes_single_max_bytes`: An (exclusive) maximum size of a
write that persist will inline in metadata.
- `persist_inline_writes_total_max_bytes`: An (inclusive) maximum total size of
  inline writes in metadata. Any attempted write beyond this threshold will
  instead fall through to the S3 path (see the sketch after this list).
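
As a hedged sketch (the function and its arguments are hypothetical; only the
dyncfg names in the comments come from this design), the write path might
combine the two limits like this:

```rust
// Hypothetical helper: decides whether a write of `part_size_bytes` may be
// inlined, given how many inline bytes are already in consensus state.
fn should_inline(
    part_size_bytes: usize,
    inline_bytes_in_state: usize,
    single_max_bytes: usize, // persist_inline_writes_single_max_bytes
    total_max_bytes: usize,  // persist_inline_writes_total_max_bytes
) -> bool {
    // Exclusive per-write limit: a part of exactly `single_max_bytes` still
    // goes to S3.
    let small_enough = part_size_bytes < single_max_bytes;
    // Inclusive total limit: never push the aggregate inline size in state
    // past the cap.
    let fits_in_budget = inline_bytes_in_state + part_size_bytes <= total_max_bytes;
    small_enough && fits_in_budget
}
```

A write that fails either check simply falls through to the existing S3 path,
so correctness never depends on a write being inlined.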

## Alternatives

- In addition to S3, also store _blobs_ in CRDB (or a third technology).

CRDB [self-documents as not a good fit][crdb-large-blob] for workloads
involving large binary blobs: "it's recommended to keep values under 1 MB to
ensure adequate performance". That said, given that inline writes are small
and directly translate to a CRDB write anyway, this would likely be a totally
workable alternative.

However, CRDB is already a dominant cost of persist, and a large part of that
cost is a function of the rate at which SQL values change, regardless of the
size or batching of those writes.

Additionally, putting the writes inline in persist state allows pubsub to push
them around for us, meaning that a reader subscribed to or listening on the
shard doesn't need an additional network fetch when it receives the new seqno.

A third technology would not be worth the additional operational burden.

- Make S3 faster. This is not actionable in the short term.

[crdb-large-blob]: https://www.cockroachlabs.com/docs/stable/bytes#size

## Open Questions

- How do we tune the two thresholds?
- Should every inline write result in a compaction request for the new batch,
  immediately flushing it out to S3?
2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
@@ -84,6 +84,8 @@
"enable_worker_core_affinity": "true",
"persist_batch_delete_enabled": "true",
"persist_fast_path_limit": "1000",
"persist_inline_writes_single_max_bytes": "4096",
"persist_inline_writes_total_max_bytes": "1048576",
"persist_pubsub_client_enabled": "true",
"persist_pubsub_push_diff_enabled": "true",
"persist_sink_minimum_batch_updates": "128",
2 changes: 0 additions & 2 deletions src/buf.yaml
@@ -51,8 +51,6 @@ breaking:
- expr/src/relation.proto
# reason: not yet released
- mysql-util/src/desc.proto
# reason: working around a false positive in buf's breaking change lint
- persist-client/src/internal/state.proto
# reason: does currently not require backward-compatibility
- storage-client/src/client.proto
# reason: does currently not require backward-compatibility
11 changes: 7 additions & 4 deletions src/ore/src/lgbytes.rs
@@ -80,6 +80,12 @@ impl<T: Copy> AsRef<[T]> for MetricsRegion<T> {
}
}

impl From<Arc<MetricsRegion<u8>>> for LgBytes {
fn from(region: Arc<MetricsRegion<u8>>) -> Self {
LgBytes { offset: 0, region }
}
}

impl AsRef<[u8]> for LgBytes {
fn as_ref(&self) -> &[u8] {
// This implementation of [bytes::Buf] chooses to panic instead of
@@ -267,10 +273,7 @@ impl LgBytesOpMetrics {
/// region, falling back to a heap allocation.
pub fn try_mmap<T: AsRef<[u8]>>(&self, buf: T) -> LgBytes {
let region = self.try_mmap_region(buf);
LgBytes {
offset: 0,
region: Arc::new(region),
}
LgBytes::from(Arc::new(region))
}

/// Attempts to copy the given buf into an lgalloc managed file-based mapped
1 change: 1 addition & 0 deletions src/persist-client/build.rs
@@ -66,6 +66,7 @@ fn main() {
// is to re-run if any file in the crate changes; that's still a bit too
// broad, but it's better.
.emit_rerun_if_changed(false)
.extern_path(".mz_persist", "::mz_persist")
.extern_path(".mz_persist_types", "::mz_persist_types")
.extern_path(".mz_proto", "::mz_proto")
.compile_with_config(