Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ccl/changefeedccl: add compression options for webhook sink #138872

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 130 additions & 14 deletions pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,49 @@
package cdctest

import (
"compress/gzip"
"crypto/tls"
"io"
"net/http"
"net/http/httptest"
"sync"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

var (
	// encoders and decoders pool gzip writers and readers so that they can be
	// reused across requests. Neither pool sets New: getEncoder and getDecoder
	// construct a fresh value themselves when the pool is empty.
	encoders = sync.Pool{}
	decoders = sync.Pool{}

	// getEncoder returns a gzip writer that compresses into dst, reusing a
	// pooled writer when one is available. Hand it back with putEncoder once
	// it is no longer needed.
	getEncoder = func(dst io.Writer) *gzip.Writer {
		if v := encoders.Get(); v != nil {
			enc := v.(*gzip.Writer)
			enc.Reset(dst)
			return enc
		}
		return gzip.NewWriter(dst)
	}

	// putEncoder returns enc to the encoder pool for later reuse.
	putEncoder = func(enc *gzip.Writer) {
		encoders.Put(enc)
	}

	// getDecoder returns a gzip reader that decompresses src, reusing a pooled
	// reader when one is available. It returns a nil reader and a non-nil
	// error if src does not start with a valid gzip header. Hand the reader
	// back with putDecoder once it is no longer needed.
	getDecoder = func(src io.Reader) (*gzip.Reader, error) {
		v := decoders.Get()
		if v == nil {
			return gzip.NewReader(src)
		}
		dec := v.(*gzip.Reader)
		if err := dec.Reset(src); err != nil {
			// Reset has already consumed bytes from src while parsing the
			// header, so building a new reader on the same src would start
			// mid-stream: it would either fail with a misleading error or
			// accept whatever follows the corrupt header. Report the original
			// error instead. The pooled reader is still reusable because the
			// next Reset discards its state, so put it back.
			decoders.Put(dec)
			return nil, err
		}
		return dec, nil
	}

	// putDecoder returns dec to the decoder pool for later reuse.
	putDecoder = func(dec *gzip.Reader) {
		decoders.Put(dec)
	}
)

// MockWebhookSink is the Webhook sink used in tests.
type MockWebhookSink struct {
basicAuth bool
Expand All @@ -23,9 +57,11 @@ type MockWebhookSink struct {
mu struct {
syncutil.Mutex
numCalls int
responseBodies map[int][]byte
statusCodes []int
statusCodesIndex int
rows []string
lastHeaders http.Header
notify chan struct{}
}
}
Expand All @@ -37,6 +73,13 @@ func StartMockWebhookSinkInsecure() (*MockWebhookSink, error) {
return s, nil
}

// LastRequestHeaders reports the headers recorded for the most recently
// handled request. The value is read while holding the sink's mutex.
func (s *MockWebhookSink) LastRequestHeaders() http.Header {
	s.mu.Lock()
	headers := s.mu.lastHeaders
	s.mu.Unlock()
	return headers
}

// StartMockWebhookSink creates and starts a mock webhook sink for tests.
func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error) {
s := makeMockWebhookSink()
Expand All @@ -51,7 +94,7 @@ func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error
}

// StartMockWebhookSinkSecure creates and starts a mock webhook sink server that
// requires clients to provide client certificates for authentication
// requires clients to provide client certificates for authentication.
func StartMockWebhookSinkSecure(certificate *tls.Certificate) (*MockWebhookSink, error) {
s := makeMockWebhookSink()
if certificate == nil {
Expand Down Expand Up @@ -88,6 +131,7 @@ func StartMockWebhookSinkWithBasicAuth(
// makeMockWebhookSink builds a sink whose only configured status code is
// 200 OK, with an empty response-body table and an HTTP test server that has
// been created but not started; requests are routed to requestHandler.
func makeMockWebhookSink() *MockWebhookSink {
	sink := new(MockWebhookSink)
	sink.mu.responseBodies = map[int][]byte{}
	sink.mu.statusCodes = []int{http.StatusOK}
	handler := http.HandlerFunc(sink.requestHandler)
	sink.server = httptest.NewUnstartedServer(handler)
	return sink
}
Expand All @@ -114,6 +158,24 @@ func (s *MockWebhookSink) SetStatusCodes(statusCodes []int) {
s.mu.statusCodesIndex = 0
}

// SetResponse appends statusCode to the list of status codes the sink cycles
// through and associates responseBody with that new entry. Useful for testing
// error handling behavior on the client side.
func (s *MockWebhookSink) SetResponse(statusCode int, responseBody []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Bodies are keyed by the index their status code occupies in the list,
	// which for an appended code is the list's current length.
	s.mu.responseBodies[len(s.mu.statusCodes)] = responseBody
	s.mu.statusCodes = append(s.mu.statusCodes, statusCode)
}

// ClearStatusCodes resets the status codes to an empty list, resets the index,
// and discards any response bodies registered via SetResponse.
//
// Response bodies are keyed by the index of their status code, so leaving them
// behind would attach a stale body to whichever status code later lands on the
// same index.
//
// After this call the sink has no status codes at all; the request handler
// indexes into the list, so callers must install at least one status code
// again before the next request arrives.
func (s *MockWebhookSink) ClearStatusCodes() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.statusCodes = []int{}
	s.mu.statusCodesIndex = 0
	s.mu.responseBodies = make(map[int][]byte)
}

// Close closes the mock Webhook sink.
func (s *MockWebhookSink) Close() {
s.server.Close()
Expand All @@ -131,7 +193,7 @@ func (s *MockWebhookSink) Latest() string {
return latest
}

// Pop deletes and returns the oldest message from MockWebhookSink
// Pop deletes and returns the oldest message from MockWebhookSink.
func (s *MockWebhookSink) Pop() string {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -181,22 +243,76 @@ func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Reques

// publish handles one webhook request: it records the request headers and call
// count, picks the status code (and optional response body) for the current
// index, reads the request body (gunzipping it when the request carries
// Content-Encoding: gzip), appends it to the recorded rows, and writes the
// response.
//
// NOTE(review): this listing appears to interleave removed and added diff
// lines without +/- markers (for example it contains both a deferred and an
// explicit Unlock, two separate reads of the request body, and two
// WriteHeader calls), so it does not read as one coherent function body.
// Confirm every note below against the actual file.
func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) error {
defer hr.Body.Close()
row, err := io.ReadAll(hr.Body)
if err != nil {
return err
}

s.mu.Lock()
defer s.mu.Unlock()
// A copy of the headers is stored, not the request's own map.
s.mu.lastHeaders = hr.Header.Clone()
s.mu.numCalls++
if s.mu.statusCodes[s.mu.statusCodesIndex] >= http.StatusOK && s.mu.statusCodes[s.mu.statusCodesIndex] < http.StatusMultipleChoices {
s.mu.rows = append(s.mu.rows, string(row))
if s.mu.notify != nil {
close(s.mu.notify)
s.mu.notify = nil
// Snapshot the status code and any body registered for the current index,
// then advance the index (wrapping around) before releasing the mutex.
statusCode := s.mu.statusCodes[s.mu.statusCodesIndex]
resBody, hasResBody := s.mu.responseBodies[s.mu.statusCodesIndex]
s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes)
s.mu.Unlock()

// Compression is detected solely from the request's Content-Encoding header.
gzCompression := hr.Header.Get("Content-Encoding") == "gzip"

// NOTE(review): this check lets 300 (http.StatusMultipleChoices) through as a
// success, whereas the range check earlier in this listing uses
// `< http.StatusMultipleChoices` — confirm which bound is intended.
if statusCode < http.StatusOK || statusCode > http.StatusMultipleChoices {
if !hasResBody {
hw.WriteHeader(statusCode)
return nil
}

// NOTE(review): when a response body is configured but the request was not
// gzip-encoded, nothing in this branch returns, so control continues into the
// request-body handling below and the row is recorded — confirm that is
// intended for a non-success status code.
if gzCompression {
hw.Header().Set("Content-Encoding", "gzip")
gw := getEncoder(hw)
defer putEncoder(gw)
hw.WriteHeader(statusCode)
// hasResBody is always true here: the !hasResBody case returned above.
if hasResBody {
if _, err := gw.Write(resBody); err != nil {
return errors.Wrap(err, "failed to write response body")
}
if err := gw.Close(); err != nil {
return errors.Wrap(err, "failed to flush gzip writer")
}
}
return nil
}
}

// Read the request body, decompressing it first when the request was gzipped.
var row []byte
if gzCompression {
gzReader, err := getDecoder(hr.Body)
if err != nil {
return errors.Wrap(err, "failed to create gzip reader")
}
defer putDecoder(gzReader)
row, err = io.ReadAll(gzReader)
if err != nil {
return errors.Wrap(err, "failed to read compressed request body")
}
if err = gzReader.Close(); err != nil {
return errors.Wrap(err, "failed to close gzip reader")
}
} else {
var err error
row, err = io.ReadAll(hr.Body)
if err != nil {
return errors.Wrap(err, "failed to read plain request body")
}
}

// Record the row; if a notify channel is set, close it and clear it
// (presumably to wake a waiter — the waiting side is not visible here).
s.mu.Lock()
s.mu.rows = append(s.mu.rows, string(row))
if s.mu.notify != nil {
close(s.mu.notify)
s.mu.notify = nil
}
s.mu.Unlock()

// The response body, when present, is written uncompressed on this path.
hw.WriteHeader(statusCode)
if hasResBody {
if _, err := hw.Write(resBody); err != nil {
return errors.Wrap(err, "failed to write response body")
}
}

// NOTE(review): the status line was already written and the index already
// advanced above; these two statements look like the removed side of the diff.
hw.WriteHeader(s.mu.statusCodes[s.mu.statusCodesIndex])
s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes)
return nil
}
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5921,8 +5921,8 @@ func TestChangefeedErrors(t *testing.T) {
`webhook-https://fake-host`,
)
sqlDB.ExpectErrWithTimeout(
t, `this sink is incompatible with option compression`,
asg0451 marked this conversation as resolved.
Show resolved Hide resolved
`CREATE CHANGEFEED FOR foo INTO $1 WITH compression='gzip'`,
t, `unknown compression: invalid, valid values are 'gzip' and 'zstd'`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH compression='invalid'`,
`webhook-https://fake-host`,
)
sqlDB.ExpectErrWithTimeout(
Expand Down Expand Up @@ -5965,6 +5965,11 @@ func TestChangefeedErrors(t *testing.T) {
t, `unknown on_error: not_valid, valid values are 'pause' and 'fail'`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error='not_valid'`,
`kafka://nope`)
// Sanity check for options compatibility validation.
sqlDB.ExpectErrWithTimeout(
t, `this sink is incompatible with option compression`,
`CREATE CHANGEFEED FOR foo into $1 WITH compression='gzip'`,
`kafka://nope`)

sqlDB.ExpectErrWithTimeout(
t, `envelope=enriched is incompatible with SELECT statement`,
Expand Down
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaReg
var CloudStorageValidOptions = makeStringSet(OptCompression)

// WebhookValidOptions is options exclusive to webhook sink
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig)
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression)

// PubsubValidOptions is options exclusive to pubsub sink
var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)
Expand Down Expand Up @@ -933,12 +933,17 @@ type WebhookSinkOptions struct {
JSONConfig SinkSpecificJSONConfig
AuthHeader string
ClientTimeout *time.Duration
Compression string
}

// GetWebhookSinkOptions includes arbitrary json to be interpreted
// by the webhook sink.
func (s StatementOptions) GetWebhookSinkOptions() (WebhookSinkOptions, error) {
o := WebhookSinkOptions{JSONConfig: s.getJSONValue(OptWebhookSinkConfig), AuthHeader: s.m[OptWebhookAuthHeader]}
o := WebhookSinkOptions{
JSONConfig: s.getJSONValue(OptWebhookSinkConfig),
AuthHeader: s.m[OptWebhookAuthHeader],
Compression: s.m[OptCompression],
}
timeout, err := s.getDurationValue(OptWebhookClientTimeout)
if err != nil {
return o, err
Expand Down
Loading