storage: use exchange-based sequencing to broadcast internal commands #31147

Merged

teskje merged 2 commits into MaterializeInc:main from the storage-exchange-sequencer branch on Jan 28, 2025

Conversation

teskje
Contributor

@teskje teskje commented Jan 22, 2025

The Timely Sequencer doesn't behave well when different workers instantiate their parts at points in time that are sufficiently spread apart. It delays commands by that creation time difference and is prone to spinning the CPU at 100%.

To work around these issues, this PR replaces the Sequencer used in storage for broadcasting internal commands with a dataflow that sequences by sending all commands through a single worker.

The first commit introduces a simple version of the sequencer dataflow that assumes deterministic ordering of data sent through Timely channels. The second commit removes that assumption by explicitly keeping track of the command order and waiting for outstanding commands.
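
As an illustration only (a minimal sketch, not the PR's actual code), the exchange-and-broadcast idea can be expressed as a small Timely program: every worker routes its commands to worker 0, which then broadcasts them back so that all workers observe a single sequence. The `String` commands, the `u64` timestamp, and the dataflow shape are placeholder assumptions, and real channels may still reorder data, which is exactly what the second commit addresses.

```rust
use timely::dataflow::operators::{Broadcast, Exchange, Input, Inspect};
use timely::dataflow::InputHandle;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let index = worker.index();
        let mut input: InputHandle<u64, String> = InputHandle::new();

        // Hypothetical stand-in for the internal command stream: commands from
        // every worker are funneled through worker 0 (the sequencer) and then
        // broadcast back, so each worker sees them in one shared order.
        worker.dataflow::<u64, _, _>(|scope| {
            scope
                .input_from(&mut input)
                .exchange(|_cmd| 0) // route every command to worker 0
                .broadcast()        // worker 0 re-sends the sequence to all workers
                .inspect(move |cmd| println!("worker {index} sequenced: {cmd}"));
        });

        input.send(format!("command from worker {index}"));
        input.advance_to(1);
    })
    .unwrap();
}
```

Because every command passes through the same worker, that worker's send order defines the sequence the other workers see.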

Motivation

  • This PR fixes a recognized bug.

Fixes https://github.com/MaterializeInc/database-issues/issues/8893

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

The Timely `Sequencer` doesn't behave well when different workers
instantiate their parts at points in time that are sufficiently spread
apart. It delays commands by that creation time difference and is prone
to spinning the CPU at 100%.

To work around these issues, this commit replaces the `Sequencer` used
in storage for broadcasting internal commands with a dataflow that
sequences by sending all commands through a single worker.
@teskje teskje force-pushed the storage-exchange-sequencer branch 2 times, most recently from b53175b to 234a272 Compare January 23, 2025 16:12
The previous command sequencer dataflow relied on the assumption that
updates passing through Timely channels are not reordered. That isn't an
assumption we should make, so this commit adds code to explicitly track
and restore the correct ordering by assigning indexes to transmitted
commands and then making sure that no indexes are skipped downstream.
@teskje teskje force-pushed the storage-exchange-sequencer branch from 234a272 to 508f73d Compare January 23, 2025 17:25
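
As a hedged illustration of the reordering idea in the commit message above (hypothetical names, not the PR's code): each producer tags its commands with consecutive indexes, and the receiving side buffers anything that arrives early until all smaller indexes have shown up. Per the review discussion below, this kind of bookkeeping is needed wherever reordering can occur, e.g. in the sequencer and in the sink.

```rust
use std::collections::BTreeMap;

/// Hypothetical reorder buffer: releases commands strictly in index order,
/// holding back any command whose predecessors have not yet arrived.
struct ReorderBuffer<C> {
    next_index: u64,
    pending: BTreeMap<u64, C>,
}

impl<C> ReorderBuffer<C> {
    fn new() -> Self {
        Self { next_index: 0, pending: BTreeMap::new() }
    }

    /// Accept a possibly out-of-order `(index, command)` pair and return the
    /// commands that have become deliverable, in index order.
    fn insert(&mut self, index: u64, command: C) -> Vec<C> {
        self.pending.insert(index, command);
        let mut ready = Vec::new();
        while let Some(command) = self.pending.remove(&self.next_index) {
            ready.push(command);
            self.next_index += 1;
        }
        ready
    }
}
```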
@teskje teskje marked this pull request as ready for review January 27, 2025 10:59
@teskje teskje requested a review from a team as a code owner January 27, 2025 10:59
Member

@antiguru antiguru left a comment


Looks fine, left a comment about an alternative way to implement the command distribution.

let (output_tx, output_rx) = mpsc::channel();
let activator = Rc::new(RefCell::new(None));

timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
Member


I think this is fine, but it essentially uses the index as a capability. I think there's an alternative design that uses Timely's capabilities, but I'm leaving it to you which one you'd like to implement here. I think the order from the source to the sequencer doesn't matter, as long as it's not reordered -- you could also think of this as a batch of data and a capability (worker, index). After the sequencer, it's simpler as there is only a single operator instance that retains a capability, so it could just use a counter as its capability. The data then would be vectors of updates per time, i.e., Stream<_, Vec<Command>>.

Contributor Author


I tried using timestamps first, instead of putting the index into the data, and it turned out more complicated. Maybe I missed something, though.

  • I want to use a Timestamp type of (worker, index), so (usize, u64). Tuples already implement Timestamp, but they don't implement Refines<()>, so I can't use them directly. Instead I have to implement a new timestamp type, which requires a bunch of boilerplate.
  • If I do the above, I think I'd still have to do the put-commands-into-a-map-and-wait-for-the-next-index thing in both the sequencer and the sink, because there is no guarantee that an operator sees updates in its inputs ordered by time (right?). So in the end the code would look pretty much the same, just with a bunch more timestamp impl and capability management code.

I think the order from the source to the sequencer doesn't matter, as long as it's not reordered

The "as long as it's not reordered" is important though! Correct me if I'm wrong, but I think this could happen:

  • Source sends command C1 at (0, 0).
  • Source sends command C2 at (0, 1).
  • Sequencer receives command C2.
  • Sequencer receives command C1.

Or is there any guarantee that updates with different times are received in time order?

Contributor


I want to use a Timestamp type of (worker, index), ...

Ah, I'm not sure you do! At least, (worker, index) is a totally ordered type, where worker trumps index no matter what, and roughly everyone should wait for a worker to finish out all of its indexes before moving on to the next worker. I think you might want Product<worker, index>, which I think should implement Refines<()>. Does that sound right? (I'm trying to back out the intent from my partial understanding.)

Contributor Author


True, though the thing I was considering would not involve waiting for frontiers or even looking at frontiers at all, but just using the timestamp to transport the command index (instead of putting it into the data as this PR does). If you don't look at frontiers, the partial order of times doesn't matter. It's entirely possible that working with frontiers would lead to a simpler implementation, but I don't know what that could look like!

I think you might want Product<worker, index>

I'm not so sure! The only thing we need is that the commands coming from one worker retain their order in the output; between workers there is no defined order. So we'd need a (worker, index) where instances are only comparable if worker is equal. But that becomes awkward because Timestamp assumes the existence of a minimum that you can downgrade to all other timestamps. So I guess an (Option<worker>, index) where (None, 0) is that minimum?
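
For what it's worth, the comparison described here could look roughly like this sketch (hypothetical SeqTime type, not part of the PR; a usable Timely timestamp would additionally need the Timestamp, PathSummary, and Refines boilerplate mentioned earlier):

```rust
use timely::order::PartialOrder;

/// Hypothetical timestamp: the None "lane" acts as the minimum, times from
/// the same worker compare by index, and times from different workers are
/// incomparable.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SeqTime {
    worker: Option<usize>,
    index: u64,
}

impl PartialOrder for SeqTime {
    fn less_equal(&self, other: &Self) -> bool {
        match (self.worker, other.worker) {
            // Same lane (including the minimum lane): compare by index.
            (a, b) if a == b => self.index <= other.index,
            // The minimum lane precedes every worker's lane.
            (None, Some(_)) => true,
            // Different workers: incomparable.
            _ => false,
        }
    }
}
```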

@teskje
Contributor Author

teskje commented Jan 28, 2025

I'm going ahead and merging this since it improves the status quo. Happy to iterate if we can come up with a better/simpler way!

@teskje teskje merged commit 8f331b7 into MaterializeInc:main Jan 28, 2025
236 of 238 checks passed
@teskje teskje deleted the storage-exchange-sequencer branch January 28, 2025 09:06