diff --git a/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go b/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go index c8676afa3bec..edc791bae88b 100644 --- a/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go +++ b/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go @@ -6,15 +6,49 @@ package cdctest import ( + "compress/gzip" "crypto/tls" "io" "net/http" "net/http/httptest" + "sync" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) +var ( + encoders = sync.Pool{} + decoders = sync.Pool{} + getEncoder = func(dst io.Writer) *gzip.Writer { + if v := encoders.Get(); v != nil { + enc := v.(*gzip.Writer) + enc.Reset(dst) + return enc + } + return gzip.NewWriter(dst) + } + putEncoder = func(enc *gzip.Writer) { + encoders.Put(enc) + } + getDecoder = func(src io.Reader) (*gzip.Reader, error) { + if v := decoders.Get(); v != nil { + dec := v.(*gzip.Reader) + err := dec.Reset(src) + if err != nil { + dec = nil + return gzip.NewReader(src) + + } + return dec, nil + } + return gzip.NewReader(src) + } + putDecoder = func(dec *gzip.Reader) { + decoders.Put(dec) + } +) + // MockWebhookSink is the Webhook sink used in tests. type MockWebhookSink struct { basicAuth bool @@ -23,9 +57,11 @@ type MockWebhookSink struct { mu struct { syncutil.Mutex numCalls int + responseBodies map[int][]byte statusCodes []int statusCodesIndex int rows []string + lastHeaders http.Header notify chan struct{} } } @@ -37,6 +73,13 @@ func StartMockWebhookSinkInsecure() (*MockWebhookSink, error) { return s, nil } +// LastRequestHeaders returns the headers from the most recent request. +func (s *MockWebhookSink) LastRequestHeaders() http.Header { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.lastHeaders +} + // StartMockWebhookSink creates and starts a mock webhook sink for tests. func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error) { s := makeMockWebhookSink() @@ -51,7 +94,7 @@ func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error } // StartMockWebhookSinkSecure creates and starts a mock webhook sink server that -// requires clients to provide client certificates for authentication +// requires clients to provide client certificates for authentication. func StartMockWebhookSinkSecure(certificate *tls.Certificate) (*MockWebhookSink, error) { s := makeMockWebhookSink() if certificate == nil { @@ -88,6 +131,7 @@ func StartMockWebhookSinkWithBasicAuth( func makeMockWebhookSink() *MockWebhookSink { s := &MockWebhookSink{} s.mu.statusCodes = []int{http.StatusOK} + s.mu.responseBodies = make(map[int][]byte) s.server = httptest.NewUnstartedServer(http.HandlerFunc(s.requestHandler)) return s } @@ -114,6 +158,24 @@ func (s *MockWebhookSink) SetStatusCodes(statusCodes []int) { s.mu.statusCodesIndex = 0 } +// SetResponse sets the response body and status code to use when responding to +// a request. Useful for testing error handling behavior on client side. +func (s *MockWebhookSink) SetResponse(statusCode int, responseBody []byte) { + s.mu.Lock() + defer s.mu.Unlock() + numOfStatusCodes := len(s.mu.statusCodes) + s.mu.statusCodes = append(s.mu.statusCodes, statusCode) + s.mu.responseBodies[numOfStatusCodes] = responseBody +} + +// ClearStatusCodes resets status codes to empty list and resets the index. +func (s *MockWebhookSink) ClearStatusCodes() { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.statusCodes = []int{} + s.mu.statusCodesIndex = 0 +} + // Close closes the mock Webhook sink. func (s *MockWebhookSink) Close() { s.server.Close() @@ -131,7 +193,7 @@ func (s *MockWebhookSink) Latest() string { return latest } -// Pop deletes and returns the oldest message from MockWebhookSink +// Pop deletes and returns the oldest message from MockWebhookSink. func (s *MockWebhookSink) Pop() string { s.mu.Lock() defer s.mu.Unlock() @@ -181,22 +243,69 @@ func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Reques func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) error { defer hr.Body.Close() - row, err := io.ReadAll(hr.Body) - if err != nil { - return err - } + s.mu.Lock() - defer s.mu.Unlock() + s.mu.lastHeaders = hr.Header.Clone() s.mu.numCalls++ - if s.mu.statusCodes[s.mu.statusCodesIndex] >= http.StatusOK && s.mu.statusCodes[s.mu.statusCodesIndex] < http.StatusMultipleChoices { - s.mu.rows = append(s.mu.rows, string(row)) - if s.mu.notify != nil { - close(s.mu.notify) - s.mu.notify = nil + s.mu.Unlock() + + gzCompression := hr.Header.Get("Content-Encoding") == "gzip" + statusCode := s.mu.statusCodes[s.mu.statusCodesIndex] + resBody, hasResBody := s.mu.responseBodies[s.mu.statusCodesIndex] + s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes) + + if statusCode < http.StatusOK || statusCode > http.StatusMultipleChoices { + if !hasResBody { + hw.WriteHeader(statusCode) + return nil + } + + if gzCompression { + hw.Header().Set("Content-Encoding", "gzip") + gw := getEncoder(hw) + defer putEncoder(gw) + hw.WriteHeader(statusCode) + if hasResBody { + if _, err := gw.Write(resBody); err != nil { + return errors.Wrap(err, "failed to write response body") + } + if err := gw.Close(); err != nil { + return errors.Wrap(err, "failed to flush gzip writer") + } + } + return nil } } - hw.WriteHeader(s.mu.statusCodes[s.mu.statusCodesIndex]) - s.mu.statusCodesIndex = (s.mu.statusCodesIndex + 1) % len(s.mu.statusCodes) + var row []byte + if gzCompression { + gzReader, err := getDecoder(hr.Body) + if err != nil { + return errors.Wrap(err, "failed to create gzip reader") + } + defer putDecoder(gzReader) + defer gzReader.Close() + row, err = io.ReadAll(gzReader) + if err != nil { + return errors.Wrap(err, "failed to read compressed request body") + } + } else { + var err error + row, err = io.ReadAll(hr.Body) + if err != nil { + return errors.Wrap(err, "failed to read plain request body") + } + } + + s.mu.rows = append(s.mu.rows, string(row)) + if s.mu.notify != nil { + close(s.mu.notify) + s.mu.notify = nil + } + + if _, err := hw.Write(resBody); err != nil { + return errors.Wrap(err, "failed to write response body") + } + return nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 37df5ed9ef94..049394fe697f 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -5883,8 +5883,8 @@ func TestChangefeedErrors(t *testing.T) { `webhook-https://fake-host`, ) sqlDB.ExpectErrWithTimeout( - t, `this sink is incompatible with option compression`, - `CREATE CHANGEFEED FOR foo INTO $1 WITH compression='gzip'`, + t, `unknown compression: invalid, valid values are 'gzip' and 'zstd'`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH compression='invalid'`, `webhook-https://fake-host`, ) sqlDB.ExpectErrWithTimeout( @@ -5927,6 +5927,11 @@ func TestChangefeedErrors(t *testing.T) { t, `unknown on_error: not_valid, valid values are 'pause' and 'fail'`, `CREATE CHANGEFEED FOR foo into $1 WITH on_error='not_valid'`, `kafka://nope`) + // Sanity check for options compatibility validation. + sqlDB.ExpectErrWithTimeout( + t, `this sink is incompatible with option compression`, + `CREATE CHANGEFEED FOR foo into $1 WITH compression='gzip'`, + `kafka://nope`) } func TestChangefeedDescription(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 988fede6cfca..fca2849f7991 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -395,7 +395,7 @@ var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaReg var CloudStorageValidOptions = makeStringSet(OptCompression) // WebhookValidOptions is options exclusive to webhook sink -var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig) +var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig, OptCompression) // PubsubValidOptions is options exclusive to pubsub sink var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig) @@ -922,12 +922,17 @@ type WebhookSinkOptions struct { JSONConfig SinkSpecificJSONConfig AuthHeader string ClientTimeout *time.Duration + Compression string } // GetWebhookSinkOptions includes arbitrary json to be interpreted // by the webhook sink. func (s StatementOptions) GetWebhookSinkOptions() (WebhookSinkOptions, error) { - o := WebhookSinkOptions{JSONConfig: s.getJSONValue(OptWebhookSinkConfig), AuthHeader: s.m[OptWebhookAuthHeader]} + o := WebhookSinkOptions{ + JSONConfig: s.getJSONValue(OptWebhookSinkConfig), + AuthHeader: s.m[OptWebhookAuthHeader], + Compression: s.m[OptCompression], + } timeout, err := s.getDurationValue(OptWebhookClientTimeout) if err != nil { return o, err diff --git a/pkg/ccl/changefeedccl/compression.go b/pkg/ccl/changefeedccl/compression.go index 390e88a064dc..04901aef5cf0 100644 --- a/pkg/ccl/changefeedccl/compression.go +++ b/pkg/ccl/changefeedccl/compression.go @@ -9,6 +9,7 @@ import ( stdgzip "compress/gzip" "io" "strings" + "sync" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" @@ -17,41 +18,128 @@ import ( "github.com/klauspost/pgzip" ) -var useFastGzip = settings.RegisterBoolSetting( - settings.ApplicationLevel, - "changefeed.fast_gzip.enabled", - "use fast gzip implementation", - metamorphic.ConstantWithTestBool( - "changefeed.fast_gzip.enabled", true, - ), - settings.WithPublic) +var ( + useFastGzip = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "changefeed.fast_gzip.enabled", + "use fast gzip implementation", + metamorphic.ConstantWithTestBool( + "changefeed.fast_gzip.enabled", true, + ), + settings.WithPublic) + gzipEncoderPool = sync.Pool{} + fastGzipEncoderPool = sync.Pool{} + zstdEncoderPool = sync.Pool{} + gzipDecoderPool = sync.Pool{} + zstdDecoderPool = sync.Pool{} +) type compressionAlgo string -const sinkCompressionGzip compressionAlgo = "gzip" -const sinkCompressionZstd compressionAlgo = "zstd" +const ( + sinkCompressionGzip compressionAlgo = "gzip" + sinkCompressionZstd compressionAlgo = "zstd" +) func (a compressionAlgo) enabled() bool { return a != "" } +type zstdDecoder struct { + *zstd.Decoder +} + +func (z *zstdDecoder) Close() error { + z.Decoder.Close() + return nil +} + +func getFromPullOrCreate[T any]( + pool *sync.Pool, create func() (T, error), reset func(v T) error, +) (_ T, done func(), _ error) { + var zero T + createDone := func(v T) func() { + return func() { + pool.Put(v) + } + } + if p := pool.Get(); p != nil { + v, ok := p.(T) + if !ok { + return zero, nil, errors.AssertionFailedf("failed to cast pool value to %T", zero) + } + if err := reset(v); err != nil { + return zero, nil, errors.Wrapf(err, "failed to reset coder %T", zero) + } + return v, createDone(v), nil + } + + v, err := create() + + if err != nil { + return zero, nil, err + } + return v, createDone(v), nil +} + // newCompressionCodec returns compression codec for the specified algorithm, // which writes compressed data to the destination. // TODO(yevgeniy): Support compression configuration (level, speed, etc). // TODO(yevgeniy): Add telemetry. func newCompressionCodec( algo compressionAlgo, sv *settings.Values, dest io.Writer, -) (io.WriteCloser, error) { +) (_ io.WriteCloser, done func(), _ error) { switch algo { case sinkCompressionGzip: if useFastGzip.Get(sv) { - return pgzip.NewWriterLevel(dest, pgzip.DefaultCompression) + create := func() (*pgzip.Writer, error) { + return pgzip.NewWriterLevel(dest, pgzip.DefaultCompression) + } + reset := func(v *pgzip.Writer) error { v.Reset(dest); return nil } + return getFromPullOrCreate(&fastGzipEncoderPool, create, reset) + } + + create := func() (*stdgzip.Writer, error) { + return stdgzip.NewWriterLevel(dest, stdgzip.DefaultCompression) + } + reset := func(v *stdgzip.Writer) error { v.Reset(dest); return nil } + return getFromPullOrCreate(&gzipEncoderPool, create, reset) + case sinkCompressionZstd: + create := func() (*zstd.Encoder, error) { + return zstd.NewWriter(dest, zstd.WithEncoderLevel(zstd.SpeedFastest)) + } + reset := func(v *zstd.Encoder) error { v.Reset(dest); return nil } + return getFromPullOrCreate(&zstdEncoderPool, create, reset) + default: + return nil, nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo) + } +} + +// newDecompressionReader returns decompression reader for the specified algorithm +func newDecompressionReader( + algo compressionAlgo, src io.Reader, +) (_ io.ReadCloser, done func(), _ error) { + switch algo { + case sinkCompressionGzip: + // since we are using decompression only for reading error response body, we can use default reader + create := func() (*pgzip.Reader, error) { + return pgzip.NewReader(src) } - return stdgzip.NewWriterLevel(dest, stdgzip.DefaultCompression) + reset := func(v *pgzip.Reader) error { return v.Reset(src) } + return getFromPullOrCreate(&gzipDecoderPool, create, reset) case sinkCompressionZstd: - return zstd.NewWriter(dest, zstd.WithEncoderLevel(zstd.SpeedFastest)) + // zstd reader does not implement io.Closer interface, so we need to wrap it + create := func() (*zstdDecoder, error) { + decoder, err := zstd.NewReader(src) + if err != nil { + return nil, err + } + return &zstdDecoder{decoder}, nil + } + reset := func(v *zstdDecoder) error { return v.Decoder.Reset(src) } + return getFromPullOrCreate(&zstdDecoderPool, create, reset) default: - return nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo) + return nil, nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo) } } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 0f547b216dd0..2001cef49324 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -40,6 +40,11 @@ import ( _ "github.com/klauspost/pgzip" ) +type encoder struct { + io.WriteCloser + Done func() +} + func isCloudStorageSink(u *url.URL) bool { switch u.Scheme { case changefeedbase.SinkSchemeCloudStorageS3, changefeedbase.SinkSchemeCloudStorageGCS, @@ -68,7 +73,7 @@ func cloudStorageFormatTime(ts hlc.Timestamp) string { type cloudStorageSinkFile struct { cloudStorageSinkKey created time.Time - codec io.WriteCloser + codec *encoder rawSize int numMessages int buf bytes.Buffer @@ -531,11 +536,11 @@ func (s *cloudStorageSink) getOrCreateFile( } if s.compression.enabled() { - codec, err := newCompressionCodec(s.compression, &s.settings.SV, &f.buf) + codec, done, err := newCompressionCodec(s.compression, &s.settings.SV, &f.buf) if err != nil { return nil, err } - f.codec = codec + f.codec = &encoder{codec, done} } s.files.ReplaceOrInsert(f) return f, nil @@ -884,7 +889,9 @@ func (f *cloudStorageSinkFile) flushToStorage( } if f.codec != nil { - if err := f.codec.Close(); err != nil { + err := f.codec.Close() + f.codec.Done() + if err != nil { return err } } @@ -910,6 +917,7 @@ func (s *cloudStorageSink) closeAllCodecs() (err error) { f := i.(*cloudStorageSinkFile) if f.codec != nil { cErr := f.codec.Close() + f.codec.Done() f.codec = nil if err == nil { err = cErr diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 18bdae6ff39b..7fd69c5f8569 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -101,8 +101,8 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe // test an insert row entry var pool testAllocPool - require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.Flush(ctx)) testutils.SucceedsSoon(t, func() error { remaining := pool.used() @@ -113,18 +113,18 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe }) require.Equal(t, - "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}", sinkDest.Latest(), + `{"payload":[{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"}],"length":1}`, sinkDest.Latest(), "sink %s expected to receive message %s", sinkDest.URL(), - "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}") + `{"payload":[{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"}],"length":1}`) // test a delete row entry - require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1002]"), []byte("{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1002]"), []byte(`{"after":null,"key":[1002],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.Flush(ctx)) require.Equal(t, - "{\"payload\":[{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}],\"length\":1}", sinkDest.Latest(), + `{"payload":[{"after":null,"key":[1002],"topic:":"foo"}],"length":1}`, sinkDest.Latest(), "sink %s expected to receive message %s", sinkDest.URL(), - "{\"payload\":[{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}],\"length\":1}") + `{"payload":[{"after":null,"key":[1002],"topic:":"foo"}],"length":1}`) opts, err := getGenericWebhookSinkOptions().GetEncodingOptions() require.NoError(t, err) @@ -136,9 +136,9 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe require.NoError(t, sinkSrc.Flush(ctx)) require.Equal(t, - "{\"resolved\":\"2.0000000000\"}", sinkDest.Latest(), + `{"resolved":"2.0000000000"}`, sinkDest.Latest(), "sink %s expected to receive message %s", sinkDest.URL(), - "{\"resolved\":\"2.0000000000\"}") + `{"resolved":"2.0000000000"}`) } func TestWebhookSink(t *testing.T) { @@ -175,7 +175,7 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, err) // now sink's client accepts no custom certs, should reject the server's cert and fail - require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) @@ -190,7 +190,7 @@ func TestWebhookSink(t *testing.T) { // sink should throw an error if server is unreachable sinkDest.Close() - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) err = sinkSrc.Flush(context.Background()) require.Error(t, err) @@ -204,7 +204,7 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, err) // sink's client should not accept the endpoint's use of HTTP (expects HTTPS) - require.NoError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()), fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(), @@ -303,7 +303,7 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { delete(details.Opts, changefeedbase.OptWebhookAuthHeader) sinkSrcNoCreds, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ") @@ -314,7 +314,7 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { sinkSrcWrongCreds, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ") @@ -401,7 +401,7 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ") @@ -443,7 +443,7 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ") @@ -485,18 +485,18 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1003},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1004},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.Flush(context.Background())) - require.Equal(t, sinkDest.Pop(), "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":5}") + require.Equal(t, sinkDest.Pop(), `{"payload":[{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1003},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1004},"key":[1001],"topic:":"foo"}],"length":5}`) testutils.SucceedsSoon(t, func() error { // wait for the timer in batch worker to be set (1 hour from now, as specified by config) before advancing time. @@ -512,8 +512,8 @@ func TestWebhookSinkConfig(t *testing.T) { require.Equal(t, sinkDest.Latest(), "") // messages without a full batch should not send - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.Equal(t, sinkDest.Latest(), "") require.NoError(t, sinkSrc.Close()) @@ -548,22 +548,22 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1003},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1004},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.Flush(context.Background())) - require.Equal(t, sinkDest.Pop(), "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1002},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1003},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1004},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":5}") + require.Equal(t, sinkDest.Pop(), `{"payload":[{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1002},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1003},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1004},"key":[1001],"topic:":"foo"}],"length":5}`) // messages without a full batch should not send - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) require.Equal(t, sinkDest.Latest(), "") require.NoError(t, sinkSrc.Close()) @@ -608,8 +608,8 @@ func TestWebhookSinkConfig(t *testing.T) { } // send incomplete batch - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) - require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc())) // no messages at first require.Equal(t, sinkDest.Latest(), "") @@ -623,8 +623,8 @@ func TestWebhookSinkConfig(t *testing.T) { mt.Advance(time.Hour) require.NoError(t, sinkSrc.Flush(context.Background())) // batch should send after time expires even if message quota has not been met - require.Equal(t, sinkDest.Pop(), "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"},"+ - "{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":2}") + require.Equal(t, sinkDest.Pop(), `{"payload":[{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"},`+ + `{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic:":"foo"}],"length":2}`) mt.Advance(time.Hour) require.NoError(t, sinkSrc.Flush(context.Background())) @@ -676,7 +676,7 @@ func TestWebhookSinkShutsDownOnError(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) // error should be propagated immediately in the next call require.EqualError(t, sinkSrc.Flush(ctx), "500 Internal Server Error: ") @@ -755,11 +755,236 @@ func TestWebhookSinkRetry(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, 1 /* parallelism */, timeutil.DefaultTimeSource{}) require.NoError(t, err) - require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc)) require.NoError(t, sinkSrc.Flush(ctx)) - require.Equal(t, "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}],\"length\":1}", sinkDest.Pop()) + require.Equal(t, `{"payload":[{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}],"length":1}`, sinkDest.Pop()) sinkDest.Close() require.NoError(t, sinkSrc.Close()) } + +func TestWebhookSinkCompression(t *testing.T) { + defer leaktest.AfterTest(t)() + + webhookSinkCompressionTestFn := func(parallelism int) { + // Create a test certificate + cert, certEncoded, err := cdctest.NewCACertBase64Encoded() + require.NoError(t, err) + + // Start mock webhook sink + sinkDest, err := cdctest.StartMockWebhookSink(cert) + require.NoError(t, err) + + // Get sink options with compression enabled + opts := getGenericWebhookSinkOptions(struct { + key string + value string + }{ + key: changefeedbase.OptCompression, + value: "gzip", + }) + + sinkDestHost, err := url.Parse(sinkDest.URL()) + require.NoError(t, err) + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamCACert, certEncoded) + sinkDestHost.RawQuery = params.Encode() + + details := jobspb.ChangefeedDetails{ + SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()), + Opts: opts.AsMap(), + } + + sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) + require.NoError(t, err) + + // Test with compression + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, + []byte("[1001]"), + []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), + zeroTS, + zeroTS, + zeroAlloc)) + require.NoError(t, sinkSrc.Flush(context.Background())) + + // Verify compression headers are present + require.Equal(t, "gzip", sinkDest.LastRequestHeaders().Get("Content-Encoding")) + require.Equal(t, "gzip", sinkDest.LastRequestHeaders().Get("Accept-Encoding")) + + // Verify the content can be decompressed and matches expected + expected := `{"payload":[{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}],"length":1}` + require.Equal(t, expected, sinkDest.Latest()) + + // Test invalid compression type + invalidOpts := getGenericWebhookSinkOptions(struct { + key string + value string + }{ + key: changefeedbase.OptCompression, + value: "invalid", + }) + + details.Opts = invalidOpts.AsMap() + _, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) + require.Error(t, err) + require.Contains(t, err.Error(), "unsupported compression type") + + require.NoError(t, sinkSrc.Close()) + sinkDest.Close() + } + + // Run tests with parallelism from 1-4 like other webhook sink tests + for i := 1; i <= 4; i++ { + webhookSinkCompressionTestFn(i) + } +} + +func TestWebhookSinkCompressionWithBatching(t *testing.T) { + defer leaktest.AfterTest(t)() + + batchingWithCompressionTestFn := func(parallelism int) { + cert, certEncoded, err := cdctest.NewCACertBase64Encoded() + require.NoError(t, err) + sinkDest, err := cdctest.StartMockWebhookSink(cert) + require.NoError(t, err) + + // Configure both compression and batching + opts := getGenericWebhookSinkOptions( + struct { + key string + value string + }{ + key: changefeedbase.OptCompression, + value: "gzip", + }, + struct { + key string + value string + }{ + key: changefeedbase.OptWebhookSinkConfig, + value: `{"Flush":{"Messages": 2, "Frequency": "1h"}}`, + }, + ) + + sinkDestHost, err := url.Parse(sinkDest.URL()) + require.NoError(t, err) + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamCACert, certEncoded) + sinkDestHost.RawQuery = params.Encode() + + details := jobspb.ChangefeedDetails{ + SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()), + Opts: opts.AsMap(), + } + + mt := timeutil.NewManualTime(timeutil.Now()) + sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt) + require.NoError(t, err) + + // Send first message - should not trigger batch + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, + []byte("[1001]"), + []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), + zeroTS, + zeroTS, + zeroAlloc)) + require.Equal(t, "", sinkDest.Latest()) + + // Send second message - should trigger batch + require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, + []byte("[1002]"), + []byte(`{"after":{"col1":"val2","rowid":1001},"key":[1002],"topic:":"foo"}`), + zeroTS, + zeroTS, + zeroAlloc)) + require.NoError(t, sinkSrc.Flush(context.Background())) + + // Verify compression headers + require.Equal(t, "gzip", sinkDest.LastRequestHeaders().Get("Content-Encoding")) + require.Equal(t, "gzip", sinkDest.LastRequestHeaders().Get("Accept-Encoding")) + + // Verify batched content + expected := `{"payload":[` + + `{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"},` + + `{"after":{"col1":"val2","rowid":1001},"key":[1002],"topic:":"foo"}` + + `],"length":2}` + require.Equal(t, expected, sinkDest.Latest()) + + require.NoError(t, sinkSrc.Close()) + sinkDest.Close() + } + + // Run tests with parallelism from 1-4 + for i := 1; i <= 4; i++ { + batchingWithCompressionTestFn(i) + } +} + +func TestWebhookSinkErrorCompressedResponse(t *testing.T) { + defer leaktest.AfterTest(t)() + + webhookSinkGzippedErrorTestFn := func(parallelism int) { + ctx := context.Background() + cert, certEncoded, err := cdctest.NewCACertBase64Encoded() + require.NoError(t, err) + sinkDest, err := cdctest.StartMockWebhookSink(cert) + require.NoError(t, err) + + // Configure sink to return error and use compression + opts := getGenericWebhookSinkOptions(struct { + key string + value string + }{ + key: changefeedbase.OptCompression, + value: "gzip", + }) + + // Configure error response + responseBody := "Test error response" + // we have to clear the default status code set by the sink mock factory + sinkDest.ClearStatusCodes() + sinkDest.SetResponse(http.StatusInternalServerError, []byte(responseBody)) + + sinkDestHost, err := url.Parse(sinkDest.URL()) + require.NoError(t, err) + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamCACert, certEncoded) + sinkDestHost.RawQuery = params.Encode() + + details := jobspb.ChangefeedDetails{ + SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()), + Opts: opts.AsMap(), + } + + sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, parallelism, timeutil.DefaultTimeSource{}) + require.NoError(t, err) + + // Send data and expect error with compressed response + require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, + []byte("[1001]"), + []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), + zeroTS, + zeroTS, + zeroAlloc)) + + err = sinkSrc.Flush(ctx) + require.Error(t, err) + // Verify error body is decompressed + require.Equal(t, fmt.Sprintf(`500 Internal Server Error: %s`, responseBody), err.Error()) + + // Verify no messages delivered + require.Equal(t, "", sinkDest.Pop()) + + require.NoError(t, sinkSrc.Close()) + sinkDest.Close() + } + + // Run tests with parallelism from 1-4 + for i := 1; i <= 4; i++ { + webhookSinkGzippedErrorTestFn(i) + } +} diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index 32680aee9c52..8f40a8853afe 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -29,14 +29,16 @@ import ( ) const ( - applicationTypeJSON = `application/json` - applicationTypeCSV = `text/csv` - authorizationHeader = `Authorization` + applicationTypeJSON = `application/json` + applicationTypeCSV = `text/csv` + authorizationHeader = `Authorization` + contentEncodingHeader = `Content-Encoding` + acceptEncodingHeader = `Accept-Encoding` ) func isWebhookSink(u *url.URL) bool { switch u.Scheme { - // allow HTTP here but throw an error later to make it clear HTTPS is required + // Allow HTTP here but throw an error later to make it clear HTTPS is required. case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: return true default: @@ -45,12 +47,14 @@ func isWebhookSink(u *url.URL) bool { } type webhookSinkClient struct { - ctx context.Context - format changefeedbase.FormatType - url *changefeedbase.SinkURL - authHeader string - batchCfg sinkBatchConfig - client *httputil.Client + ctx context.Context + format changefeedbase.FormatType + url *changefeedbase.SinkURL + authHeader string + batchCfg sinkBatchConfig + client *httputil.Client + settings *cluster.Settings + compression compressionAlgo } var _ SinkClient = (*webhookSinkClient)(nil) @@ -64,19 +68,31 @@ func makeWebhookSinkClient( batchCfg sinkBatchConfig, parallelism int, m metricsRecorder, + settings *cluster.Settings, ) (SinkClient, error) { err := validateWebhookOpts(u, encodingOpts, opts) if err != nil { return nil, err } + var compression compressionAlgo + if opts.Compression != "" { + algo, _, err := compressionFromString(strings.ToLower(opts.Compression)) + if err != nil { + return nil, errors.Wrap(err, "unsupported compression type") + } + compression = algo + } + u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`) sinkClient := &webhookSinkClient{ - ctx: ctx, - authHeader: opts.AuthHeader, - format: encodingOpts.Format, - batchCfg: batchCfg, + ctx: ctx, + authHeader: opts.AuthHeader, + format: encodingOpts.Format, + batchCfg: batchCfg, + settings: settings, + compression: compression, } var connTimeout time.Duration @@ -88,7 +104,7 @@ func makeWebhookSinkClient( return nil, err } - // remove known query params from sink URL before setting in sink config + // Remove known query params from sink URL before setting in sink config. sinkURLParsed, err := url.Parse(u.String()) if err != nil { return nil, err @@ -178,25 +194,37 @@ func makeWebhookClient( } func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) { - req, err := http.NewRequestWithContext(sc.ctx, http.MethodPost, sc.url.String(), bytes.NewReader(body)) + var finalBytes []byte + if sc.compression.enabled() { + var buf bytes.Buffer + codec, done, err := newCompressionCodec(sc.compression, &sc.settings.SV, &buf) + if err != nil { + return nil, errors.Wrap(err, "failed to create compression codec") + } + if _, err := codec.Write(body); err != nil { + return nil, errors.Wrap(err, "failed to compress payload") + } + if err := codec.Close(); err != nil { + return nil, errors.Wrap(err, "failed to close compression codec") + } + defer done() + + finalBytes = buf.Bytes() + } else { + finalBytes = body + } + + req, err := http.NewRequestWithContext(sc.ctx, http.MethodPost, sc.url.String(), bytes.NewReader(finalBytes)) if err != nil { return nil, err } - switch sc.format { - case changefeedbase.OptFormatJSON: - req.Header.Set("Content-Type", applicationTypeJSON) - case changefeedbase.OptFormatCSV: - req.Header.Set("Content-Type", applicationTypeCSV) - } - if sc.authHeader != "" { - req.Header.Set(authorizationHeader, sc.authHeader) - } + sc.setRequestHeaders(req) return req, nil } -// FlushResolvedPayload implements the SinkClient interface +// FlushResolvedPayload implements the SinkClient interface. func (sc *webhookSinkClient) FlushResolvedPayload( ctx context.Context, body []byte, _ func(func(topic string) error) error, retryOpts retry.Options, ) error { @@ -209,7 +237,32 @@ func (sc *webhookSinkClient) FlushResolvedPayload( }) } -// Flush implements the SinkClient interface +// readResponseBody handles response body reading and decompression if needed. +func (sc *webhookSinkClient) readResponseBody(res *http.Response) ([]byte, error) { + encoding := res.Header.Get(contentEncodingHeader) + if encoding == "" { + return io.ReadAll(res.Body) + } + + // Convert the content-encoding header to our internal compression algorithm type. + algo, _, err := compressionFromString(encoding) + if err != nil { + return nil, errors.Wrapf(err, + "webhook endpoint returned unsupported content encoding: %s", encoding) + } + + reader, done, err := newDecompressionReader(algo, res.Body) + if err != nil { + return nil, errors.Wrapf(err, + "failed to create decompression reader for algorithm %s", algo) + } + defer done() + defer reader.Close() + + return io.ReadAll(reader) +} + +// Flush implements the SinkClient interface. func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error { req := batch.(*http.Request) b, err := req.GetBody() @@ -224,7 +277,8 @@ func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error defer res.Body.Close() if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) { - resBody, err := io.ReadAll(res.Body) + // Response body may be compressed, so we need to use our reader with decompression support. + resBody, err := sc.readResponseBody(res) if err != nil { return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode) } @@ -233,7 +287,7 @@ func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error return nil } -// Close implements the SinkClient interface +// Close implements the SinkClient interface. func (sc *webhookSinkClient) Close() error { sc.client.CloseIdleConnections() return nil @@ -243,6 +297,25 @@ func (sc *webhookSinkClient) CheckConnection(ctx context.Context) error { return nil } +func (sc *webhookSinkClient) setRequestHeaders(req *http.Request) { + switch sc.format { + case changefeedbase.OptFormatJSON: + req.Header.Set("Content-Type", applicationTypeJSON) + case changefeedbase.OptFormatCSV: + req.Header.Set("Content-Type", applicationTypeCSV) + } + + if sc.compression.enabled() { + compression := string(sc.compression) + req.Header.Set(acceptEncodingHeader, compression) + req.Header.Set(contentEncodingHeader, compression) + } + + if sc.authHeader != "" { + req.Header.Set(authorizationHeader, sc.authHeader) + } +} + func validateWebhookOpts( u *changefeedbase.SinkURL, encodingOpts changefeedbase.EncodingOptions, @@ -284,18 +357,18 @@ type webhookCSVBuffer struct { var _ BatchBuffer = (*webhookCSVBuffer)(nil) -// Append implements the BatchBuffer interface +// Append implements the BatchBuffer interface. func (cb *webhookCSVBuffer) Append(key []byte, value []byte, _ attributes) { cb.bytes = append(cb.bytes, value...) cb.messageCount += 1 } -// ShouldFlush implements the BatchBuffer interface +// ShouldFlush implements the BatchBuffer interface. func (cb *webhookCSVBuffer) ShouldFlush() bool { return shouldFlushBatch(len(cb.bytes), cb.messageCount, cb.sc.batchCfg) } -// Close implements the BatchBuffer interface +// Close implements the BatchBuffer interface. func (cb *webhookCSVBuffer) Close() (SinkPayload, error) { return cb.sc.makePayloadForBytes(cb.bytes) } @@ -308,24 +381,24 @@ type webhookJSONBuffer struct { var _ BatchBuffer = (*webhookJSONBuffer)(nil) -// Append implements the BatchBuffer interface +// Append implements the BatchBuffer interface. func (jb *webhookJSONBuffer) Append(key []byte, value []byte, _ attributes) { jb.messages = append(jb.messages, value) jb.numBytes += len(value) } -// ShouldFlush implements the BatchBuffer interface +// ShouldFlush implements the BatchBuffer interface. func (jb *webhookJSONBuffer) ShouldFlush() bool { return shouldFlushBatch(jb.numBytes, len(jb.messages), jb.sc.batchCfg) } -// Close implements the BatchBuffer interface +// Close implements the BatchBuffer interface. func (jb *webhookJSONBuffer) Close() (SinkPayload, error) { var buffer bytes.Buffer prefix := "{\"payload\":[" suffix := fmt.Sprintf("],\"length\":%d}", len(jb.messages)) - // Grow all at once to avoid reallocations + // Grow all at once to avoid reallocations. buffer.Grow(len(prefix) + jb.numBytes /* msgs */ + len(jb.messages) /* commas */ + len(suffix)) buffer.WriteString(prefix) @@ -339,7 +412,7 @@ func (jb *webhookJSONBuffer) Close() (SinkPayload, error) { return jb.sc.makePayloadForBytes(buffer.Bytes()) } -// MakeBatchBuffer implements the SinkClient interface +// MakeBatchBuffer implements the SinkClient interface. func (sc *webhookSinkClient) MakeBatchBuffer(topic string) BatchBuffer { if sc.format == changefeedbase.OptFormatCSV { return &webhookCSVBuffer{sc: sc} @@ -369,7 +442,7 @@ func makeWebhookSink( return nil, err } - sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism, m) + sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism, m, settings) if err != nil { return nil, err }