ccl/changefeedccl: add compression options for webhook sink #138872
Conversation
Thank you for contributing to CockroachDB. Please ensure you have followed the guidelines for creating a PR. Before a member of our team reviews your PR, I have some potential action items for you:
- It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
Force-pushed from 90f7858 to 9ab5e87
Thank you for updating your pull request. My owl senses detect your PR is good for review. Please keep an eye out for any test failures in CI. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
Force-pushed from dc150e3 to 3c59821
Thanks for the contribution @massimo-ua! I have a few nits and a few questions, but this looks very good overall.
pkg/ccl/changefeedccl/compression.go (outdated):

```go
    // since we are using decompression only for reading error response body, we can use default reader
    return pgzip.NewReader(src)
case sinkCompressionZstd:
    // zstd reader does not implement io.Closer interface, so we need to wrap it
```
The zstd reader does implement Close(), which we should call.
It may also be worth caching the readers instead of making a new one every time (see my next comment on doing the same for writers)... but that might be unnecessary for this case.
The zstd reader implements `func (d *Decoder) Close()`, which returns nothing, but io.ReadCloser requires `func (io.Closer) Close() error`, i.e. a Close that returns an error. That's why I had to wrap it.
Yes, but your wrapper does not call the void Close method, which it should.
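For context, this is the shape of adapter being discussed: klauspost/compress's zstd.Decoder exposes a void Close(), so an io.ReadCloser wrapper has to supply the error-returning signature itself while still calling the void Close, as the reviewer points out. A hedged sketch, not the PR's actual code; the type and package names are illustrative:

```go
package sketch // hypothetical package, for illustration only

import (
    "io"

    "github.com/klauspost/compress/zstd"
)

// zstdReadCloser is a hypothetical adapter from *zstd.Decoder to io.ReadCloser.
type zstdReadCloser struct {
    decoder *zstd.Decoder
}

var _ io.ReadCloser = (*zstdReadCloser)(nil)

func (z *zstdReadCloser) Read(p []byte) (int, error) {
    return z.decoder.Read(p)
}

func (z *zstdReadCloser) Close() error {
    // The decoder's Close returns nothing, but it still has to be called
    // so the decoder can release its goroutines and buffers.
    z.decoder.Close()
    return nil
}
```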
@asg0451 Thanks for the review. I'll work through the comments and update you once I'm done.
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
Force-pushed from dace026 to 4866761
Force-pushed from cb4fa0f to 505533e
@asg0451 Please review the recent changes.
@@ -887,6 +887,7 @@ func (f *cloudStorageSinkFile) flushToStorage(

```go
    if err := f.codec.Close(); err != nil {
        return err
    }
    f.codec = nil // Set to nil to prevent double close
```
what's the reasoning behind this? did this issue come up in the tests? maybe it's worthwhile to make the wrappers more robust to double closing if this is an issue
Yes, this came up during the tests. TestCloudStorageSinkFastGzip was panicking with "invalid memory address or nil pointer dereference" while attempting to invoke (*encWrapper).Close. This helped to fix the issue. As far as I understand, the issue is with cloudStorageSinkFile and its codec reference management. It closes the writer but doesn't reset the reference.
Need some guidance here. Not sure we can address that on an encWrapper level
Maybe you could have a `closed bool` flag in the wrappers, set it on Close, and panic/err if it's used while closed. Reset could reset it. This could also prevent incorrect usage (like using it after closing it, which would be bad).
How do you feel about returning nil in case somebody tries to close an already-closed wrapper, but returning an error when Write or Read is called? Returning nil on multiple attempts to Close the same reader is the default behaviour for the gzip Writer and Reader.
Added bool flag to indicate the underlying encoder/decoder is closed
that sounds good
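A minimal sketch of the behaviour settled on above (idempotent Close, an error on use-after-close, Reset re-arming the wrapper), assuming a generic io.WriteCloser underneath. Names are illustrative and this is not the PR's actual code:

```go
package sketch // hypothetical package, for illustration only

import (
    "errors"
    "io"
)

// closableEncoder illustrates the closed-flag semantics discussed above.
type closableEncoder struct {
    enc    io.WriteCloser
    closed bool
}

// Write fails loudly if the wrapper has already been closed.
func (e *closableEncoder) Write(p []byte) (int, error) {
    if e.closed {
        return 0, errors.New("attempt to write on a closed encoder, reset it first")
    }
    return e.enc.Write(p)
}

// Close is idempotent: closing an already-closed wrapper returns nil,
// mirroring the gzip Writer/Reader behaviour mentioned above.
func (e *closableEncoder) Close() error {
    if e.closed {
        return nil
    }
    e.closed = true
    return e.enc.Close()
}

// Reset rebinds the wrapper to a new destination and clears the flag.
func (e *closableEncoder) Reset(enc io.WriteCloser) {
    e.enc = enc
    e.closed = false
}
```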
Force-pushed from 2648d6e to 3889c4f
pkg/ccl/changefeedccl/compression.go (outdated):

```go
// is recycled after closing rather than being discarded.
func (e *encWrapper) Close() error {
    var err error
    if fErr := e.encoder.Flush(); fErr != nil {
```
Why do we need to call Flush here? Did we do it before? Is Close not enough?
You're right. There is no need to call Flush since, according to the docs, Close flushes any unwritten data to the underlying writer. I'll remove it.
Fixed
Force-pushed from 230a0d8 to ed1168d
Force-pushed from ed1168d to 5bbc73a
I noticed race issues during the Bazel extended CI run. I don't believe there is a connection to the changes I've made. The race appears when spanInner.isNoop() and Span.reset() in pkg/util/tracing attempt to read and write a shared resource. Do you think I should ignore this?
Feel free to ignore Bazel extended CI.
Then that's it. There are no more changes left in this PR. Thank you for the review. Let me know if there is anything else.
pkg/ccl/changefeedccl/compression.go (outdated):

```go
    Reset(io.Reader) error
}

// encWrapper wraps an encoder with encoders pool.
```
nit: phrasing. Suggested change:
- `// encWrapper wraps an encoder with encoders pool.`
+ `// encWrapper wraps an encoder and includes a pointer to the pool it's associated with.`
Fixed
pkg/ccl/changefeedccl/compression.go (outdated):

```go
func (e *encWrapper) Write(p []byte) (int, error) {
    if e.closed {
        return 0, errors.New("attempt to write on a closed encoder, reset it first")
```
nit: make this error an errors.AssertionFailedf(..). Ditto for the similar check in decWrapper.Read.
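For reference, errors.AssertionFailedf comes from github.com/cockroachdb/errors and flags internal invariant violations rather than user-facing errors. A hedged sketch of the suggested change to the check shown above; the tail of the method is an assumption, not the PR's actual code:

```go
func (e *encWrapper) Write(p []byte) (int, error) {
    if e.closed {
        // Assumed: an assertion failure here marks an internal misuse of the wrapper.
        return 0, errors.AssertionFailedf("attempt to write on a closed encoder, reset it first")
    }
    return e.encoder.Write(p)
}
```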
Fixed
```go
struct {
    key   string
    value string
}{
```
nit: use the same key/value struct for both
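In other words, something along these lines, with a single shared type reused by both test tables; the type name is illustrative, not the PR's:

```go
// header is a hypothetical shared key/value type reused by both test cases.
type header struct {
    key   string
    value string
}
```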
Fixed
```go
    return errors.Wrap(err, "failed to create gzip reader")
}
defer putDecoder(gzReader)
defer gzReader.Close()
```
nit: check this error
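One hedged way to surface the deferred Close error is a named return plus errors.CombineErrors from github.com/cockroachdb/errors. The helper name and body below are assumptions, not the PR's actual code, and the PR's putDecoder call is omitted for brevity:

```go
package sketch // hypothetical package, for illustration only

import (
    "io"

    "github.com/cockroachdb/errors"
    "github.com/klauspost/pgzip"
)

// readGzipBody is a hypothetical helper showing the deferred-Close pattern.
func readGzipBody(src io.Reader) (_ []byte, retErr error) {
    gzReader, err := pgzip.NewReader(src)
    if err != nil {
        return nil, errors.Wrap(err, "failed to create gzip reader")
    }
    defer func() {
        // Surface the Close error without masking an earlier one.
        retErr = errors.CombineErrors(retErr, gzReader.Close())
    }()
    return io.ReadAll(gzReader)
}
```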
Fixed
Looking really good @massimo-ua. I've asked @andyyang890 to take a look as well since it's a somewhat tricky change.
Force-pushed from 5bbc73a to 4689f9a
Looks pretty good to me! I'll defer to @asg0451 for final approval.
pkg/ccl/changefeedccl/compression.go (outdated):

```go
    ),
    settings.WithPublic)
var (
    useFastGzip = settings.RegisterBoolSetting(
```
nit: don't group this together with the sync pools
Fixed
```go
func (sc *webhookSinkClient) setRequestHeaders(req *http.Request) {
    switch sc.format {
    case changefeedbase.OptFormatJSON:
        req.Header.Set("Content-Type", applicationTypeJSON)
```
nit: maybe add a constant for Content-Type too?
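Roughly along these lines; applicationTypeJSON already appears in the PR, while contentTypeHeader and the MIME value are assumptions for illustration:

```go
const (
    // contentTypeHeader is a hypothetical name for the "Content-Type" header key.
    contentTypeHeader   = "Content-Type"
    applicationTypeJSON = "application/json"
)
```

setRequestHeaders could then call req.Header.Set(contentTypeHeader, applicationTypeJSON).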
Added
Force-pushed from 4689f9a to adcf255
```go
// Close flushes and closes the underlying encoder, resets it, and returns the wrapper to the pool.
// This differs from the inner encoder's Close by adding pool management - the wrapper
// is recycled after closing rather than being discarded.
func (e *encWrapper) Close() error {
```
Reading the cloudstorage code again, I'm wondering if the following can happen:
- we make a new cloudStorageSinkFile with a compression codec; we use it and close it
- the encWrapper is put back in the pool
- it's handed out to somebody else, e.g. a webhook sink, resetting it and its closed flag
- due to some race or something, the cloudStorageSinkFile writes to the codec again, causing data corruption

Even if this can't happen now, is there a way we can prevent it from being possible? E.g. maybe we store the encoders in the pool and not the wrappers, so that a closed encWrapper will always stay closed; maybe we just require that any f.codec.Close() is immediately followed by f.codec = nil; etc.

What do you think?
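For what it's worth, a hedged sketch of the first alternative floated here: pool the bare encoders rather than the wrappers, so a wrapper that has been closed can never be handed back out and revived by another caller. All names are illustrative, this is not the PR's actual code, and it assumes klauspost/pgzip and github.com/cockroachdb/errors:

```go
package sketch // hypothetical package, for illustration only

import (
    "io"
    "sync"

    "github.com/cockroachdb/errors"
    "github.com/klauspost/pgzip"
)

// gzipEncoderPool holds bare *pgzip.Writer values; wrappers are never pooled.
var gzipEncoderPool = sync.Pool{
    New: func() interface{} { return pgzip.NewWriter(nil) },
}

type pooledGzipWriter struct {
    enc    *pgzip.Writer
    closed bool
}

// newPooledGzipWriter takes an encoder from the pool and binds it to dst.
func newPooledGzipWriter(dst io.Writer) *pooledGzipWriter {
    enc := gzipEncoderPool.Get().(*pgzip.Writer)
    enc.Reset(dst)
    return &pooledGzipWriter{enc: enc}
}

func (w *pooledGzipWriter) Write(p []byte) (int, error) {
    if w.closed {
        return 0, errors.AssertionFailedf("write on closed encoder")
    }
    return w.enc.Write(p)
}

// Close flushes the encoder and returns only the encoder to the pool. The
// wrapper itself is discarded, so a stale reference held by a
// cloudStorageSinkFile fails loudly instead of writing into someone else's stream.
func (w *pooledGzipWriter) Close() error {
    if w.closed {
        return nil
    }
    w.closed = true
    err := w.enc.Close()
    gzipEncoderPool.Put(w.enc)
    w.enc = nil
    return err
}
```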
I don't think that storing the encoders instead of the wrappers in the pool will help resolve the issue of possible concurrent access, because the problem is that during the close operation anyone who holds a reference to the encoder (through a reference to the wrapper) can write to it until the boolean flag is set to true. Once the flag is true, an attempt to write returns an error. And it can only happen on close, because when clients get encoders from the pool they receive the reference after Reset is applied, so nobody can access it before that. It looks to me like this could be fixed by adding a mutex so we set the wrapper to the closed state inside a critical section; that in turn forces anyone who wants to write to wait until it is unlocked, so they shouldn't be able to write to the underlying codec while the open->closed transition is happening. Does that make sense to you?
And we wouldn't need to ask clients to do f.codec = nil. The client would get an error on their side, so they can decide how to sync the codec state in case it's okay to have multiple references to the same codec.
I'm not sure I follow - how will mutexes help? Say I have an encWrapper e and call Close, wait a bit, then call Write again, and in the meantime someone else has gotten it back from the pool. That means the closed flag will have been reset, as will the dest writer, and I'll write to their output, causing corruption. Concurrent calls aren't required for this to happen.
Ok, I've double-checked the code and yes, it makes total sense. The encWrapper might be put back in the pool while still referenced by a cloudStorageSinkFile. Another goroutine could get this wrapper from the pool and reset it, and the original cloudStorageSinkFile might still try to use it. It's entirely possible. The case I was trying to describe is another potential issue, where Close is called but concurrent requests might still write to the codec before the closed flag is set to true.
Force-pushed from d78e559 to 0acf5d0
Release note (sql change): Added compression support for changefeed webhook sinks. This reduces network bandwidth and storage usage, improving performance and lowering costs. Users can enable compression by setting the compression=<algorithm> option. Supported algorithms are gzip and zstd.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-39392
Resolved: https://cockroachlabs.atlassian.net/browse/CRDB-42915
Issue: cockroachdb#132279
Force-pushed from 0acf5d0 to 50a2c2c
Added a gzip compression option to the changefeed webhook sink to reduce both network bytes and storage bytes for the customer, leading to cost savings and potentially better performance. This is a minimal configuration to support gzip.

Jira issue: CRDB-42915
Epic: CRDB-39392
Issue: #132279