Skip to content

Commit

Permalink
ccl/changefeedccl: add compression options for webhook sink
Browse files Browse the repository at this point in the history
Release note (sql change): Added compression support for
changefeed webhook sinks. This reduces network bandwidth and
storage usage, improving performance and lowering costs. Users
can enable compression by setting the compression=<algorithm>
option. Supported algorithms are gzip and zstd.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-39392
Resolved: https://cockroachlabs.atlassian.net/browse/CRDB-42915
Issue: #132279
  • Loading branch information
massimo-ua committed Jan 18, 2025
1 parent 76e9051 commit e8616aa
Show file tree
Hide file tree
Showing 7 changed files with 635 additions and 122 deletions.
137 changes: 123 additions & 14 deletions pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,49 @@
package cdctest

import (
"compress/gzip"
"crypto/tls"
"io"
"net/http"
"net/http/httptest"
"sync"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// Pools of gzip writers and readers shared by the mock sink, together with
// the helpers that borrow from and return to them.
var (
	encoders = sync.Pool{}
	decoders = sync.Pool{}

	// getEncoder returns a gzip writer that compresses into dst. A pooled
	// writer is reused (after being reset onto dst) when one is available;
	// otherwise a new writer is allocated.
	getEncoder = func(dst io.Writer) *gzip.Writer {
		if v := encoders.Get(); v != nil {
			enc := v.(*gzip.Writer)
			enc.Reset(dst)
			return enc
		}
		return gzip.NewWriter(dst)
	}

	// putEncoder hands enc back to the pool so a later getEncoder can reuse it.
	putEncoder = func(enc *gzip.Writer) {
		encoders.Put(enc)
	}

	// getDecoder returns a gzip reader that decompresses src. A pooled reader
	// is reused when one is available; otherwise a new reader is allocated.
	// It returns a nil reader and a non-nil error when src does not start
	// with a valid gzip header.
	getDecoder = func(src io.Reader) (*gzip.Reader, error) {
		v := decoders.Get()
		if v == nil {
			return gzip.NewReader(src)
		}
		dec := v.(*gzip.Reader)
		if err := dec.Reset(src); err != nil {
			// Reset has already consumed header bytes from src, so building a
			// second reader on the same stream would parse from the middle of
			// the data and hide the real failure. Report the error as is and
			// let the pooled reader be garbage collected.
			return nil, err
		}
		return dec, nil
	}

	// putDecoder hands dec back to the pool so a later getDecoder can reuse it.
	putDecoder = func(dec *gzip.Reader) {
		decoders.Put(dec)
	}
)

// MockWebhookSink is the Webhook sink used in tests.
type MockWebhookSink struct {
basicAuth bool
Expand All @@ -23,9 +57,11 @@ type MockWebhookSink struct {
mu struct {
syncutil.Mutex
numCalls int
responseBodies map[int][]byte
statusCodes []int
statusCodesIndex int
rows []string
lastHeaders http.Header
notify chan struct{}
}
}
Expand All @@ -37,6 +73,13 @@ func StartMockWebhookSinkInsecure() (*MockWebhookSink, error) {
return s, nil
}

// LastRequestHeaders reports the headers stored for the latest request the
// sink has handled. The read happens while holding the sink's mutex.
func (s *MockWebhookSink) LastRequestHeaders() http.Header {
	s.mu.Lock()
	headers := s.mu.lastHeaders
	s.mu.Unlock()
	return headers
}

// StartMockWebhookSink creates and starts a mock webhook sink for tests.
func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error) {
s := makeMockWebhookSink()
Expand All @@ -51,7 +94,7 @@ func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error
}

// StartMockWebhookSinkSecure creates and starts a mock webhook sink server that
// requires clients to provide client certificates for authentication
// requires clients to provide client certificates for authentication.
func StartMockWebhookSinkSecure(certificate *tls.Certificate) (*MockWebhookSink, error) {
s := makeMockWebhookSink()
if certificate == nil {
Expand Down Expand Up @@ -88,6 +131,7 @@ func StartMockWebhookSinkWithBasicAuth(
// makeMockWebhookSink builds a sink whose only configured status code is
// 200 OK, with an empty response-body table, backed by an HTTP test server
// that has been created but not yet started.
func makeMockWebhookSink() *MockWebhookSink {
	sink := new(MockWebhookSink)
	sink.mu.responseBodies = map[int][]byte{}
	sink.mu.statusCodes = []int{http.StatusOK}
	handler := http.HandlerFunc(sink.requestHandler)
	sink.server = httptest.NewUnstartedServer(handler)
	return sink
}
Expand All @@ -114,6 +158,24 @@ func (s *MockWebhookSink) SetStatusCodes(statusCodes []int) {
s.mu.statusCodesIndex = 0
}

// SetResponse appends statusCode to the list of status codes the sink cycles
// through and associates responseBody with that new entry. Handy for
// exercising client-side error handling.
func (s *MockWebhookSink) SetResponse(statusCode int, responseBody []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// The new code lands at the current end of the list, so that position is
	// also the key under which its body is stored.
	idx := len(s.mu.statusCodes)
	s.mu.responseBodies[idx] = responseBody
	s.mu.statusCodes = append(s.mu.statusCodes, statusCode)
}

// ClearStatusCodes resets the status codes to an empty list, resets the
// index, and drops every response body registered through SetResponse.
// Response bodies are keyed by their position in the status code list, so
// keeping them after the list is emptied would attach stale bodies to
// whatever status codes are configured next.
func (s *MockWebhookSink) ClearStatusCodes() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.statusCodes = []int{}
	s.mu.statusCodesIndex = 0
	s.mu.responseBodies = make(map[int][]byte)
}

// Close closes the mock Webhook sink.
func (s *MockWebhookSink) Close() {
s.server.Close()
Expand All @@ -131,7 +193,7 @@ func (s *MockWebhookSink) Latest() string {
return latest
}

// Pop deletes and returns the oldest message from MockWebhookSink
// Pop deletes and returns the oldest message from MockWebhookSink.
func (s *MockWebhookSink) Pop() string {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -181,22 +243,69 @@ func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Reques

// publish handles one request sent to the mock sink. The visible statements
// record the request headers and bump the call counter, look up the status
// code and optional response body for the current index, advance that index,
// and store the (optionally gzip-decoded) request body as a row.
// NOTE(review): this listing appears to interleave lines from two revisions
// of the function (its braces do not balance), so the exact statement order
// and locking should be confirmed against the real file.
func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) error {
defer hr.Body.Close()
row, err := io.ReadAll(hr.Body)
if err != nil {
return err
}

s.mu.Lock()
defer s.mu.Unlock()
// Store a copy of the headers rather than the request's own map.
s.mu.lastHeaders = hr.Header.Clone()
s.mu.numCalls++
if s.mu.statusCodes[s.mu.statusCodesIndex] >= http.StatusOK && s.mu.statusCodes[s.mu.statusCodesIndex] < http.StatusMultipleChoices {
s.mu.rows = append(s.mu.rows, string(row))
if s.mu.notify != nil {
close(s.mu.notify)
s.mu.notify = nil
s.mu.Unlock()

// Only an exact "gzip" Content-Encoding value turns on compressed handling.
gzCompression := hr.Header.Get("Content-Encoding") == "gzip"
statusCode := s.mu.statusCodes[s.mu.statusCodesIndex]
resBody, hasResBody := s.mu.responseBodies[s.mu.statusCodesIndex]
// Move to the next configured status code, wrapping around at the end.
s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes)

// NOTE(review): the upper bound here is `>`, so 300 itself does not enter
// this branch, while the range check earlier in this listing uses
// `< http.StatusMultipleChoices` — confirm which bound is intended.
if statusCode < http.StatusOK || statusCode > http.StatusMultipleChoices {
if !hasResBody {
hw.WriteHeader(statusCode)
return nil
}

// NOTE(review): when a response body is configured but the request was not
// gzip-encoded, control leaves this branch without calling
// WriteHeader(statusCode) — verify that this is intended.
if gzCompression {
hw.Header().Set("Content-Encoding", "gzip")
gw := getEncoder(hw)
// Return the writer to the pool when this function exits.
defer putEncoder(gw)
hw.WriteHeader(statusCode)
if hasResBody {
if _, err := gw.Write(resBody); err != nil {
return errors.Wrap(err, "failed to write response body")
}
if err := gw.Close(); err != nil {
return errors.Wrap(err, "failed to flush gzip writer")
}
}
return nil
}
}

hw.WriteHeader(s.mu.statusCodes[s.mu.statusCodesIndex])
s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes)
var row []byte
if gzCompression {
gzReader, err := getDecoder(hr.Body)
if err != nil {
return errors.Wrap(err, "failed to create gzip reader")
}
// Deferred calls run last-in-first-out, so the reader is closed before it
// is handed back to the pool.
defer putDecoder(gzReader)
defer gzReader.Close()
row, err = io.ReadAll(gzReader)
if err != nil {
return errors.Wrap(err, "failed to read compressed request body")
}
} else {
var err error
row, err = io.ReadAll(hr.Body)
if err != nil {
return errors.Wrap(err, "failed to read plain request body")
}
}

s.mu.rows = append(s.mu.rows, string(row))
// Close the notify channel, if one is set, and drop it so it is closed only
// once.
if s.mu.notify != nil {
close(s.mu.notify)
s.mu.notify = nil
}

if _, err := hw.Write(resBody); err != nil {
return errors.Wrap(err, "failed to write response body")
}

return nil
}
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5883,8 +5883,8 @@ func TestChangefeedErrors(t *testing.T) {
`webhook-https://fake-host`,
)
sqlDB.ExpectErrWithTimeout(
t, `this sink is incompatible with option compression`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH compression='gzip'`,
t, `unknown compression: invalid, valid values are 'gzip' and 'zstd'`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH compression='invalid'`,
`webhook-https://fake-host`,
)
sqlDB.ExpectErrWithTimeout(
Expand Down Expand Up @@ -5927,6 +5927,11 @@ func TestChangefeedErrors(t *testing.T) {
t, `unknown on_error: not_valid, valid values are 'pause' and 'fail'`,
`CREATE CHANGEFEED FOR foo into $1 WITH on_error='not_valid'`,
`kafka://nope`)
// Sanity check for options compatibility validation.
sqlDB.ExpectErrWithTimeout(
t, `this sink is incompatible with option compression`,
`CREATE CHANGEFEED FOR foo into $1 WITH compression='gzip'`,
`kafka://nope`)
}

func TestChangefeedDescription(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaReg
var CloudStorageValidOptions = makeStringSet(OptCompression)

// WebhookValidOptions is options exclusive to webhook sink
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig)
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression)

// PubsubValidOptions is options exclusive to pubsub sink
var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)
Expand Down Expand Up @@ -922,12 +922,17 @@ type WebhookSinkOptions struct {
JSONConfig SinkSpecificJSONConfig
AuthHeader string
ClientTimeout *time.Duration
Compression string
}

// GetWebhookSinkOptions includes arbitrary json to be interpreted
// by the webhook sink.
func (s StatementOptions) GetWebhookSinkOptions() (WebhookSinkOptions, error) {
o := WebhookSinkOptions{JSONConfig: s.getJSONValue(OptWebhookSinkConfig), AuthHeader: s.m[OptWebhookAuthHeader]}
o := WebhookSinkOptions{
JSONConfig: s.getJSONValue(OptWebhookSinkConfig),
AuthHeader: s.m[OptWebhookAuthHeader],
Compression: s.m[OptCompression],
}
timeout, err := s.getDurationValue(OptWebhookClientTimeout)
if err != nil {
return o, err
Expand Down
Loading

0 comments on commit e8616aa

Please sign in to comment.