ccl/changefeedccl: add tests
massimo-ua committed Jan 11, 2025
1 parent 6d280f2 commit 90f7858
Showing 5 changed files with 252 additions and 44 deletions.
33 changes: 30 additions & 3 deletions pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go
@@ -6,6 +6,7 @@
 package cdctest
 
 import (
+	"compress/gzip"
 	"crypto/tls"
 	"io"
 	"net/http"
@@ -26,6 +27,7 @@ type MockWebhookSink struct {
 		statusCodes      []int
 		statusCodesIndex int
 		rows             []string
+		lastHeaders      http.Header
 		notify           chan struct{}
 	}
 }
@@ -37,6 +39,13 @@ func StartMockWebhookSinkInsecure() (*MockWebhookSink, error) {
 	return s, nil
 }
 
+// LastRequestHeaders returns the headers from the most recent request.
+func (s *MockWebhookSink) LastRequestHeaders() http.Header {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.mu.lastHeaders
+}
+
 // StartMockWebhookSink creates and starts a mock webhook sink for tests.
 func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error) {
 	s := makeMockWebhookSink()
@@ -181,14 +190,32 @@ func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Request
 
 func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) error {
 	defer hr.Body.Close()
-	row, err := io.ReadAll(hr.Body)
+
+	s.mu.Lock()
+	s.mu.lastHeaders = hr.Header.Clone()
+	s.mu.Unlock()
+
+	reader := hr.Body
+
+	if hr.Header.Get("Content-Encoding") == "gzip" {
+		gzReader, err := gzip.NewReader(reader)
+		if err != nil {
+			return errors.Wrap(err, "failed to create gzip reader")
+		}
+		defer gzReader.Close()
+		reader = gzReader
+	}
+
+	row, err := io.ReadAll(reader)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to read request body")
 	}
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	s.mu.numCalls++
-	if s.mu.statusCodes[s.mu.statusCodesIndex] >= http.StatusOK && s.mu.statusCodes[s.mu.statusCodesIndex] < http.StatusMultipleChoices {
+	if s.mu.statusCodes[s.mu.statusCodesIndex] >= http.StatusOK &&
+		s.mu.statusCodes[s.mu.statusCodesIndex] < http.StatusMultipleChoices {
 		s.mu.rows = append(s.mu.rows, string(row))
 		if s.mu.notify != nil {
 			close(s.mu.notify)
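For reference, the decompression path added to `publish` above needs nothing beyond the standard library. The following standalone sketch is not part of this commit (the server and payload are invented for illustration): it mirrors the handler's gzip branch in an httptest server and posts a compressed body the way a gzip-enabled webhook sink would.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Test server mirroring MockWebhookSink's gzip handling: if the request
	// advertises Content-Encoding: gzip, wrap the body in a gzip.Reader
	// before reading it.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		var reader io.Reader = r.Body
		if r.Header.Get("Content-Encoding") == "gzip" {
			gz, err := gzip.NewReader(reader)
			if err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			defer gz.Close()
			reader = gz
		}
		body, err := io.ReadAll(reader)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		fmt.Printf("server decoded: %s\n", body)
	}))
	defer srv.Close()

	// Compress a payload the way a gzip-enabled webhook sink would.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(`{"payload":[{"key":[1001]}],"length":1}`)); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodPost, srv.URL, &buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
}
```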
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -5883,8 +5883,8 @@ func TestChangefeedErrors(t *testing.T) {
 		`webhook-https://fake-host`,
 	)
 	sqlDB.ExpectErrWithTimeout(
-		t, `this sink is incompatible with option compression`,
-		`CREATE CHANGEFEED FOR foo INTO $1 WITH compression='gzip'`,
+		t, `unknown webhook_compression: invalid, the only valid value is 'gzip'`,
+		`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_compression='invalid'`,
 		`webhook-https://fake-host`,
 	)
 	sqlDB.ExpectErrWithTimeout(
sqlDB.ExpectErrWithTimeout(
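With the rename, webhook changefeeds opt into compression via `webhook_compression` rather than the generic `compression` option, and `'gzip'` is the only accepted value. A hedged sketch of end-to-end usage through a pgwire driver follows; the connection string, table, and sink URL are placeholders, not taken from this commit.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire.
)

func main() {
	// Hypothetical connection string for illustration only.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// webhook_compression replaces the old, sink-incompatible `compression`
	// option; per the test above, 'gzip' is the only accepted value.
	_, err = db.Exec(
		`CREATE CHANGEFEED FOR TABLE foo INTO $1 WITH webhook_compression = 'gzip'`,
		"webhook-https://example.com/endpoint",
	)
	if err != nil {
		log.Fatal(err)
	}
}
```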
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -94,7 +94,7 @@
 	OptExpirePTSAfter       = `gc_protect_expires_after`
 	OptWebhookAuthHeader    = `webhook_auth_header`
 	OptWebhookClientTimeout = `webhook_client_timeout`
-	OptWebhookCompression   = `compression`
+	OptWebhookCompression   = `webhook_compression`
 	OptOnError              = `on_error`
 	OptMetricsScope         = `metrics_label`
 	OptUnordered            = `unordered`
@@ -364,6 +364,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
 	OptWebhookSinkConfig:    jsonOption,
 	OptWebhookAuthHeader:    stringOption,
 	OptWebhookClientTimeout: durationOption,
+	OptWebhookCompression:   enum("gzip"),
 	OptOnError:              enum("pause", "fail"),
 	OptMetricsScope:         stringOption,
 	OptUnordered:            flagOption,
@@ -932,7 +933,11 @@ type WebhookSinkOptions struct {
 // GetWebhookSinkOptions includes arbitrary json to be interpreted
 // by the webhook sink.
 func (s StatementOptions) GetWebhookSinkOptions() (WebhookSinkOptions, error) {
-	o := WebhookSinkOptions{JSONConfig: s.getJSONValue(OptWebhookSinkConfig), AuthHeader: s.m[OptWebhookAuthHeader]}
+	o := WebhookSinkOptions{
+		JSONConfig:  s.getJSONValue(OptWebhookSinkConfig),
+		AuthHeader:  s.m[OptWebhookAuthHeader],
+		Compression: s.m[OptWebhookCompression],
+	}
 	timeout, err := s.getDurationValue(OptWebhookClientTimeout)
 	if err != nil {
 		return o, err
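The sink-side code that actually compresses outgoing requests lives in a file not expanded in this view, so the following is a rough sketch only: the helper name and structure are hypothetical, not the changefeed sink's real implementation. It shows what honoring the new `Compression` field implies — gzip the payload and set the `Content-Encoding: gzip` header that the tests below assert on.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"net/http"
)

// newWebhookRequest is a hypothetical helper: when compression is "gzip" it
// compresses the payload and labels the body with Content-Encoding: gzip,
// which is what the mock sink and tests in this commit check for.
func newWebhookRequest(url, compression string, payload []byte) (*http.Request, error) {
	body := payload
	if compression == "gzip" {
		var buf bytes.Buffer
		zw := gzip.NewWriter(&buf)
		if _, err := zw.Write(payload); err != nil {
			return nil, err
		}
		if err := zw.Close(); err != nil {
			return nil, err
		}
		body = buf.Bytes()
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	if compression == "gzip" {
		req.Header.Set("Content-Encoding", "gzip")
	}
	return req, nil
}

func main() {
	req, err := newWebhookRequest("https://example.com/hook", "gzip", []byte(`{"payload":[],"length":0}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Content-Encoding")) // gzip
}
```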
157 changes: 157 additions & 0 deletions pkg/ccl/changefeedccl/sink_webhook_test.go
@@ -763,3 +763,160 @@ func TestWebhookSinkRetry(t *testing.T) {
 	sinkDest.Close()
 	require.NoError(t, sinkSrc.Close())
 }
+
+func TestWebhookSinkCompression(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	webhookSinkCompressionTestFn := func(parallelism int) {
+		// Create a test certificate
+		cert, certEncoded, err := cdctest.NewCACertBase64Encoded()
+		require.NoError(t, err)
+
+		// Start mock webhook sink
+		sinkDest, err := cdctest.StartMockWebhookSink(cert)
+		require.NoError(t, err)
+
+		// Get sink options with compression enabled
+		opts := getGenericWebhookSinkOptions(struct {
+			key   string
+			value string
+		}{
+			key:   changefeedbase.OptWebhookCompression,
+			value: "gzip",
+		})
+
+		sinkDestHost, err := url.Parse(sinkDest.URL())
+		require.NoError(t, err)
+
+		params := sinkDestHost.Query()
+		params.Set(changefeedbase.SinkParamCACert, certEncoded)
+		sinkDestHost.RawQuery = params.Encode()
+
+		details := jobspb.ChangefeedDetails{
+			SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()),
+			Opts:    opts.AsMap(),
+		}
+
+		sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{})
+		require.NoError(t, err)
+
+		// Test with compression
+		require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{},
+			[]byte("[1001]"),
+			[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"),
+			zeroTS,
+			zeroTS,
+			zeroAlloc))
+		require.NoError(t, sinkSrc.Flush(context.Background()))
+
+		// Verify compression headers are present
+		require.Equal(t, "gzip", sinkDest.LastRequestHeaders().Get("Content-Encoding"))
+
+		// Verify the content can be decompressed and matches expected
+		expected := "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}"
+		require.Equal(t, expected, sinkDest.Latest())
+
+		// Test invalid compression type
+		invalidOpts := getGenericWebhookSinkOptions(struct {
+			key   string
+			value string
+		}{
+			key:   changefeedbase.OptWebhookCompression,
+			value: "invalid",
+		})
+
+		details.Opts = invalidOpts.AsMap()
+		_, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{})
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "unsupported compression type")
+
+		require.NoError(t, sinkSrc.Close())
+		sinkDest.Close()
+	}
+
+	// Run tests with parallelism from 1-4 like other webhook sink tests
+	for i := 1; i <= 4; i++ {
+		webhookSinkCompressionTestFn(i)
+	}
+}
+
+func TestWebhookSinkCompressionWithBatching(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	batchingWithCompressionTestFn := func(parallelism int) {
+		cert, certEncoded, err := cdctest.NewCACertBase64Encoded()
+		require.NoError(t, err)
+		sinkDest, err := cdctest.StartMockWebhookSink(cert)
+		require.NoError(t, err)
+
+		// Configure both compression and batching
+		opts := getGenericWebhookSinkOptions(
+			struct {
+				key   string
+				value string
+			}{
+				key:   changefeedbase.OptWebhookCompression,
+				value: "gzip",
+			},
+			struct {
+				key   string
+				value string
+			}{
+				key:   changefeedbase.OptWebhookSinkConfig,
+				value: `{"Flush":{"Messages": 2, "Frequency": "1h"}}`,
+			},
+		)
+
+		sinkDestHost, err := url.Parse(sinkDest.URL())
+		require.NoError(t, err)
+
+		params := sinkDestHost.Query()
+		params.Set(changefeedbase.SinkParamCACert, certEncoded)
+		sinkDestHost.RawQuery = params.Encode()
+
+		details := jobspb.ChangefeedDetails{
+			SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()),
+			Opts:    opts.AsMap(),
+		}
+
+		mt := timeutil.NewManualTime(timeutil.Now())
+		sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt)
+		require.NoError(t, err)
+
+		// Send first message - should not trigger batch
+		require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{},
+			[]byte("[1001]"),
+			[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"),
+			zeroTS,
+			zeroTS,
+			zeroAlloc))
+		require.Equal(t, "", sinkDest.Latest())
+
+		// Send second message - should trigger batch
+		require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{},
+			[]byte("[1002]"),
+			[]byte("{\"after\":{\"col1\":\"val2\",\"rowid\":1001},\"key\":[1002],\"topic:\":\"foo\"}"),
+			zeroTS,
+			zeroTS,
+			zeroAlloc))
+		require.NoError(t, sinkSrc.Flush(context.Background()))
+
+		// Verify compression headers
+		require.Equal(t, "gzip", sinkDest.LastRequestHeaders().Get("Content-Encoding"))
+
+		// Verify batched content
+		expected := "{\"payload\":[" +
+			"{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}," +
+			"{\"after\":{\"col1\":\"val2\",\"rowid\":1001},\"key\":[1002],\"topic:\":\"foo\"}" +
+			"],\"length\":2}"
+		require.Equal(t, expected, sinkDest.Latest())
+
+		require.NoError(t, sinkSrc.Close())
+		sinkDest.Close()
+	}
+
+	// Run tests with parallelism from 1-4
+	for i := 1; i <= 4; i++ {
+		batchingWithCompressionTestFn(i)
+	}
+}
