Skip to content

Commit

Permalink
ccl/changefeedccl: add compression options for webhook sink
Browse files Browse the repository at this point in the history
  • Loading branch information
massimo-ua committed Jan 11, 2025
1 parent 44bf6d4 commit 6d280f2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 19 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const (
OptExpirePTSAfter = `gc_protect_expires_after`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptWebhookCompression = `compression`
OptOnError = `on_error`
OptMetricsScope = `metrics_label`
OptUnordered = `unordered`
Expand Down Expand Up @@ -398,7 +399,7 @@ var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaReg
var CloudStorageValidOptions = makeStringSet(OptCompression)

// WebhookValidOptions is options exclusive to webhook sink
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig)
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptWebhookCompression)

// PubsubValidOptions is options exclusive to pubsub sink
var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)
Expand Down Expand Up @@ -925,6 +926,7 @@ type WebhookSinkOptions struct {
JSONConfig SinkSpecificJSONConfig
AuthHeader string
ClientTimeout *time.Duration
Compression string
}

// GetWebhookSinkOptions includes arbitrary json to be interpreted
Expand Down
88 changes: 70 additions & 18 deletions pkg/ccl/changefeedccl/sink_webhook_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
Expand All @@ -29,9 +30,12 @@ import (
)

const (
applicationTypeJSON = `application/json`
applicationTypeCSV = `text/csv`
authorizationHeader = `Authorization`
applicationTypeJSON = `application/json`
applicationTypeCSV = `text/csv`
authorizationHeader = `Authorization`
contentEncodingHeader = `Content-Encoding`
acceptEncodingHeader = `Accept-Encoding`
gzipEncoding = `gzip`
)

func isWebhookSink(u *url.URL) bool {
Expand All @@ -45,12 +49,13 @@ func isWebhookSink(u *url.URL) bool {
}

type webhookSinkClient struct {
ctx context.Context
format changefeedbase.FormatType
url sinkURL
authHeader string
batchCfg sinkBatchConfig
client *httputil.Client
ctx context.Context
format changefeedbase.FormatType
url sinkURL
authHeader string
batchCfg sinkBatchConfig
client *httputil.Client
compression string
}

var _ SinkClient = (*webhookSinkClient)(nil)
Expand All @@ -73,10 +78,11 @@ func makeWebhookSinkClient(
u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`)

sinkClient := &webhookSinkClient{
ctx: ctx,
authHeader: opts.AuthHeader,
format: encodingOpts.Format,
batchCfg: batchCfg,
ctx: ctx,
authHeader: opts.AuthHeader,
format: encodingOpts.Format,
batchCfg: batchCfg,
compression: opts.Compression,
}

var connTimeout time.Duration
Expand Down Expand Up @@ -178,7 +184,22 @@ func makeWebhookClient(
}

func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) {
req, err := http.NewRequestWithContext(sc.ctx, http.MethodPost, sc.url.String(), bytes.NewReader(body))
var reader io.Reader = bytes.NewReader(body)

// Apply compression if configured
if sc.compression == gzipEncoding {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(body); err != nil {
return nil, errors.Wrap(err, "failed to compress payload")
}
if err := gz.Close(); err != nil {
return nil, errors.Wrap(err, "failed to finalize compression")
}
reader = bytes.NewReader(buf.Bytes())
}

req, err := http.NewRequestWithContext(sc.ctx, http.MethodPost, sc.url.String(), reader)
if err != nil {
return nil, err
}
Expand All @@ -189,6 +210,11 @@ func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, erro
req.Header.Set("Content-Type", applicationTypeCSV)
}

if sc.compression == gzipEncoding {
req.Header.Set(contentEncodingHeader, gzipEncoding)
req.Header.Set(acceptEncodingHeader, gzipEncoding)
}

if sc.authHeader != "" {
req.Header.Set(authorizationHeader, sc.authHeader)
}
Expand Down Expand Up @@ -271,6 +297,11 @@ func validateWebhookOpts(
encodingOpts.KeyInValue = true
}

if opts.Compression != "" && opts.Compression != gzipEncoding {
return errors.Errorf(`unsupported compression type %s, supported types: %s`,
opts.Compression, gzipEncoding)
}

return nil
}

Expand Down Expand Up @@ -299,9 +330,10 @@ func (cb *webhookCSVBuffer) Close() (SinkPayload, error) {
}

type webhookJSONBuffer struct {
messages [][]byte
numBytes int
sc *webhookSinkClient
messages [][]byte
numBytes int
compressedBytes int
sc *webhookSinkClient
}

var _ BatchBuffer = (*webhookJSONBuffer)(nil)
Expand Down Expand Up @@ -334,7 +366,27 @@ func (jb *webhookJSONBuffer) Close() (SinkPayload, error) {
buffer.Write(msg)
}
buffer.WriteString(suffix)
return jb.sc.makePayloadForBytes(buffer.Bytes())

sinkPayload, err := jb.sc.makePayloadForBytes(buffer.Bytes())
if err != nil {
return nil, err
}

if jb.sc.compression != "" {
if req, ok := sinkPayload.(*http.Request); ok {
if closer, ok := req.Body.(io.Closer); ok {
defer closer.Close()
}
compressed, err := io.ReadAll(req.Body)
if err != nil {
return nil, errors.Wrap(err, "failed to read compressed payload")
}
jb.compressedBytes = len(compressed)
req.Body = io.NopCloser(bytes.NewReader(compressed))
}
}

return sinkPayload, nil
}

// MakeBatchBuffer implements the SinkClient interface
Expand Down

0 comments on commit 6d280f2

Please sign in to comment.