From 639cd8d2e0b0f5f0dbe539810fbf2afffb7f6092 Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Wed, 15 Jul 2020 14:50:40 +0100 Subject: [PATCH 1/8] implements flush.AsyncMessageSink --- README.md | 3 + examples/flush/sink.go | 61 +++++++++++ flush/sink.go | 152 ++++++++++++++++++++++++++ flush/sink_test.go | 243 +++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 20 ++++ 6 files changed, 480 insertions(+) create mode 100644 examples/flush/sink.go create mode 100644 flush/sink.go create mode 100644 flush/sink_test.go diff --git a/README.md b/README.md index 1cc3c4e..0502752 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,9 @@ Provides wrappers for both message source and message sink that add prometheus m Is a message source wrapper that wraps any number of sources. It consumes messages from all of them and passes them on to the user. It ensures that the acknowledgements are passed to the correct source. +### Flush +Is a message flushing wrapper which blocks until all produced messages have been acked by the user. In the scenario that the user performs an action only when the sink has acknowledge the write, the flushing wrapper provides a guarantee that such an action is only performed on a successful sink. + ## Other ### Message diff --git a/examples/flush/sink.go b/examples/flush/sink.go new file mode 100644 index 0000000..4b8fb46 --- /dev/null +++ b/examples/flush/sink.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/uw-labs/substrate" + "github.com/uw-labs/substrate-tools/flush" + "github.com/uw-labs/substrate/kafka" +) + +const messages = 200 + +func main() { + asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "example-stratesub-topic", + Version: "2.0.1", + }) + if err != nil { + panic(err) + } + + // `cancel` is used to terminate `sink.Run`. Timeout is used to prevent + // `sink.Flush` being able to infinitely block. 
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + sink := flush.NewAsyncMessageSink(asyncSink, flush.WithAckFunc(func(msg substrate.Message) error { + fmt.Println(string(msg.Data())) + return nil + })) + defer func() { + err := sink.Flush(ctx) + if err != nil { + panic(err) + } + }() + + go func() { + err = sink.Run(ctx) + if err != nil { + panic(err) + } + }() + + var wg sync.WaitGroup + wg.Add(messages) + + for i := 0; i < messages; i++ { + go func(i int) { + defer wg.Done() + + sink.PublishMessage([]byte(string('A' + i))) + }(i) + } + + wg.Wait() +} diff --git a/flush/sink.go b/flush/sink.go new file mode 100644 index 0000000..aacf3c4 --- /dev/null +++ b/flush/sink.go @@ -0,0 +1,152 @@ +// Package flush provides a message flushing wrapper around `substrate.AsyncMessageSink`. +package flush + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/uw-labs/substrate" + "github.com/uw-labs/substrate-tools/message" + "github.com/uw-labs/sync/rungroup" +) + +const ( + defaultMsgBufferSize = 50 + defaultAckBufferSize = 100 +) + +// An AsyncMessageSinkOption is a function which sets an AsyncMessageSink configuration option. +type AsyncMessageSinkOption func(ams *AsyncMessageSink) + +// WithAckFunc sets the AckFunc callback function, which is called for each ack received. If the +// AckFunc returns an error, it will be returned by `Run`. +func WithAckFunc(ackFn AckFunc) AsyncMessageSinkOption { + return func(ams *AsyncMessageSink) { + ams.ackFn = ackFn + } +} + +// WithMsgBufferSize sets the msg channel buffer size. +func WithMsgBufferSize(size int) AsyncMessageSinkOption { + return func(ams *AsyncMessageSink) { + ams.msgBufferSize = size + } +} + +// WithAckBufferSize sets the ack channel buffer size. 
+func WithAckBufferSize(size int) AsyncMessageSinkOption { + return func(ams *AsyncMessageSink) { + ams.ackBufferSize = size + } +} + +// AckFunc is a callback function executed by `Run` for each message successfully produced. The ack +// counter is only incremented once this function returns without error. If no AckFunc is provided the ack counter +// is always incremented. +type AckFunc func(msg substrate.Message) error + +// AsyncMessageSink wraps substrate.AsyncMessageSink and provides an interface for interaction with +// the underlying sink, as well as the capability to flush messages. +type AsyncMessageSink struct { + sink substrate.AsyncMessageSink + msgs uint64 + msgBufferSize int + msgCh chan substrate.Message + acks uint64 + ackBufferSize int + ackCh chan substrate.Message + ackFn AckFunc +} + +// NewAsyncMessageSink returns a pointer to a new AsyncMessageSink. +// See examples/flush/sink.go for example usage. +func NewAsyncMessageSink(sink substrate.AsyncMessageSink, opts ...AsyncMessageSinkOption) *AsyncMessageSink { + ams := &AsyncMessageSink{ + sink: sink, + msgBufferSize: defaultMsgBufferSize, + ackBufferSize: defaultAckBufferSize, + } + + for _, opt := range opts { + opt(ams) + } + + ams.msgCh = make(chan substrate.Message, ams.msgBufferSize) + ams.ackCh = make(chan substrate.Message, ams.ackBufferSize) + + return ams +} + +// Run initialises message publishing using the underlying sink and blocks until either an error +// occurs or the context is done. If an AckFunc is configured, Run will execute it for each ack +// received. If an error is returned, the user should cancel the context to prevent `Flush` from +// blocking. 
+func (ams *AsyncMessageSink) Run(ctx context.Context) error { + group, groupctx := rungroup.New(ctx) + + group.Go(func() error { + return ams.sink.PublishMessages(groupctx, ams.ackCh, ams.msgCh) + }) + + group.Go(func() error { + for { + select { + case <-groupctx.Done(): + return nil + case msg := <-ams.ackCh: + if ams.ackFn != nil { + err := ams.ackFn(msg) + if err != nil { + return err + } + } + + atomic.AddUint64(&ams.acks, 1) + } + } + }) + + return group.Wait() +} + +// PublishMessage publishes a message to the underlying sink. PublishMessage is designed to be +// called concurrently by the user. +func (ams *AsyncMessageSink) PublishMessage(msg []byte) { + atomic.AddUint64(&ams.msgs, 1) + ams.msgCh <- message.NewMessage(msg) +} + +// Close closes the underlying sink and releases its resources. +func (ams *AsyncMessageSink) Close() error { + return ams.sink.Close() +} + +// Status calls the Status method on the underlying sink. +func (ams *AsyncMessageSink) Status() (*substrate.Status, error) { + return ams.sink.Status() +} + +// Flush blocks until the AsyncMessageSink has consumed as many acks as messages produced or ctx +// is done. Flush returns an error if the context is cancelled before all messages produced have +// been acked. 
+func (ams *AsyncMessageSink) Flush(ctx context.Context) error { + defer ams.Close() + + for { + select { + case <-ctx.Done(): + if atomic.LoadUint64(&ams.msgs) > atomic.LoadUint64(&ams.acks) { + return fmt.Errorf("incomplete flush: %d left to ack", atomic.LoadUint64(&ams.msgs)-atomic.LoadUint64(&ams.acks)) + } + + return nil + default: + if atomic.LoadUint64(&ams.acks) != atomic.LoadUint64(&ams.msgs) { + continue + } + + return nil + } + } +} diff --git a/flush/sink_test.go b/flush/sink_test.go new file mode 100644 index 0000000..d074e91 --- /dev/null +++ b/flush/sink_test.go @@ -0,0 +1,243 @@ +package flush_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/uw-labs/substrate" + "github.com/uw-labs/substrate-tools/flush" +) + +type asyncMessageSinkMock struct { + substrate.AsyncMessageSink + publishMessageMock func(context.Context, chan<- substrate.Message, <-chan substrate.Message) error +} + +func (m asyncMessageSinkMock) PublishMessages(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + return m.publishMessageMock(ctx, acks, msgs) +} + +func (m asyncMessageSinkMock) Close() error { + return nil +} + +const messages = 100 + +func TestAsyncMessageSinkSuccess(t *testing.T) { + ctx := context.TODO() + + mock := asyncMessageSinkMock{ + publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg := <-msgs: + acks <- msg + } + } + }, + } + + var ackedMessages string + sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + ackedMessages += string(msg.Data()) + return nil + })) + + var wg sync.WaitGroup + wg.Add(messages) + + go sink.Run(ctx) + for i := 0; i < messages; i++ { + go func(i int) { + defer wg.Done() + + sink.PublishMessage([]byte(string('A' + i))) + }(i) + } + + wg.Wait() // wait for the messages to publish + + // Usually we 
could defer `sink.Flush()`. However, as `Flush` blocks until ack + // processing is complete, and we want to test the `ackedMessages` value *only* + // once processing is complete, we must call `sink.Flush` first excplicitly - + // only then we can safely test `ackedMessages`. + err := sink.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + if ackedMessages == "" { + t.Fatal("recieved no acks") + } + + if ackedMessages == "ABCDEFGHIJ" { + t.Fatal("recieved messages in synchronous order") + } +} + +func TestAsyncMessageSinkInterruption(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + + mock := asyncMessageSinkMock{ + publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg := <-msgs: + acks <- msg + } + } + }, + } + + var ackedMessages int + sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + time.Sleep(1 * time.Second) // simulating doing some work + ackedMessages++ + return nil + })) + + var wg sync.WaitGroup + wg.Add(messages) + + go sink.Run(ctx) + for i := 0; i < messages; i++ { + go func(i int) { + defer wg.Done() + + sink.PublishMessage([]byte(string('A' + i))) + }(i) + } + + wg.Wait() // wait for the messages to publish + + // Cancel production before we process the acks are processed. + cancel() + + // Usually we could defer `sink.Flush()`. However, as `Flush` blocks until ack + // processing is complete, and we want to test the `ackedMessages` value *only* + // once processing is complete, we must call `sink.Flush` first excplicitly - + // only then we can safely test `ackedMessages`. 
+ err := sink.Flush(ctx) + if err == nil { + t.Fatal("expected an error") + } +} + +func TestAsyncMessageSinkAckError(t *testing.T) { + ctx := context.TODO() + + mock := asyncMessageSinkMock{ + publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg := <-msgs: + acks <- msg + } + } + }, + } + + sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + return fmt.Errorf("error: %s", msg.Data()) + })) + + sink.PublishMessage([]byte("dummy-message")) + + err := sink.Run(ctx) + if err == nil { + t.Fatal("expected an error") + } +} + +func BenchmarkAsyncMessageSink_1_50(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkBufferSizes(b, 1, 50) + } +} + +func BenchmarkAsyncMessageSink_50_100(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkBufferSizes(b, 50, 100) + } +} + +func BenchmarkAsyncMessageSink_100_100(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkBufferSizes(b, 100, 100) + } +} + +func BenchmarkAsyncMessageSink_100_50(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkBufferSizes(b, 100, 50) + } +} + +func BenchmarkAsyncMessageSink_1000_1000(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkBufferSizes(b, 1000, 1000) + } +} + +func benchmarkBufferSizes(b *testing.B, msgBufferSize, ackBufferSize int) { + ctx := context.TODO() + + mock := asyncMessageSinkMock{ + publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg := <-msgs: + acks <- msg + } + } + }, + } + + var ackedMessages string + sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + ackedMessages += string(msg.Data()) + return nil + }), flush.WithMsgBufferSize(msgBufferSize), flush.WithAckBufferSize(ackBufferSize)) + + var wg sync.WaitGroup + 
wg.Add(messages) + + go sink.Run(ctx) + for i := 0; i < messages; i++ { + go func(i int) { + defer wg.Done() + + sink.PublishMessage([]byte(string('A' + i))) + }(i) + } + + wg.Wait() // wait for the messages to publish + + // Usually we could defer `sink.Flush()`. However, as `Flush` blocks until ack + // processing is complete, and we want to test the `ackedMessages` value *only* + // once processing is complete, we must call `sink.Flush` first excplicitly - + // only then we can safely test `ackedMessages`. + err := sink.Flush(ctx) + if err != nil { + b.Fatal(err) + } + + if ackedMessages == "" { + b.Fatal("recieved no acks") + } + + if ackedMessages == "ABCDEFGHIJ" { + b.Fatal("recieved messages in synchronous order") + } +} diff --git a/go.mod b/go.mod index 88b7ca3..326a19c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.2.1 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 + github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.4.0 github.com/uw-labs/substrate v0.0.0-20200128100231-abc43d668589 github.com/uw-labs/sync v0.0.0-20190307114256-1bb306bf6e71 diff --git a/go.sum b/go.sum index 702a7f5..ded6c34 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Shopify/sarama v1.22.1/go.mod h1:FRzlvRpMFO/639zY1SDxUxkqH97Y0ndM5CbGj6oG3As= +github.com/Shopify/sarama v1.24.1 h1:svn9vfN3R1Hz21WR2Gj0VW9ehaDGkiOS+VqlIcZOkMI= github.com/Shopify/sarama v1.24.1/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template 
v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -18,6 +19,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA= github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= @@ -27,8 +29,11 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ= @@ -50,6 +55,7 @@ 
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -72,6 +78,7 @@ github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uP github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -80,6 +87,7 @@ github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= github.com/jcmturner/gofork 
v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -88,6 +96,7 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -124,6 +133,7 @@ github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw= github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -151,8 +161,10 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= 
github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8= github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -163,6 +175,7 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= +github.com/utilitywarehouse/stratesub v0.0.0-20200714154232-118cd0c10114 h1:KPJ9tHA3Zp28HS8ylLm9V+etn5rkVU1guNl/Enc7/TI= github.com/uw-labs/freezer v0.0.0-20190920143022-fb6f336e053f/go.mod h1:th0wddpLXRokc3BjOsQSMy0B/PY6xGuo0Mwrchu1HtE= github.com/uw-labs/proximo v0.0.0-20190913093050-8229af78f5dd/go.mod h1:qvhJ61jBx9RI4vrThNU4jroHCMpbU850pzmm+1b4fYk= github.com/uw-labs/straw v0.0.0-20191028082632-df455ab35be2/go.mod h1:bVgIBcoAnl11UG58LiS78c/+ocH5s0QXKloPod/ODiQ= @@ -184,6 +197,7 @@ golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaE golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 h1:7KByu05hhLed2MO29w7p1XfZvZ13m8mub3shuVftRs0= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -203,6 +217,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -212,6 +227,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync 
v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -262,10 +278,14 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From 5c87687ceb792fd0c926fa3bab87d7fe3b26c41e Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Wed, 15 Jul 2020 15:25:56 +0100 Subject: [PATCH 2/8] fix flushing wrapper readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0502752..2aa0376 100644 --- 
a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Is a message source wrapper that wraps any number of sources. It consumes messag It ensures that the acknowledgements are passed to the correct source. ### Flush -Is a message flushing wrapper which blocks until all produced messages have been acked by the user. In the scenario that the user performs an action only when the sink has acknowledge the write, the flushing wrapper provides a guarantee that such an action is only performed on a successful sink. +Is a message flushing wrapper which blocks until all produced messages have been acked by the user. In the scenario that the user performs an action only after a message has been produced, the flushing wrapper provides a guarantee that such an action is only performed on a successful sink. ## Other From 7e89c674b4ce3174221478c3c2577cb442894325 Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Wed, 15 Jul 2020 15:57:50 +0100 Subject: [PATCH 3/8] update flushing example and add to readme --- README.md | 60 ++++++++++++++++++++++++++++++++++++++++++ examples/flush/sink.go | 28 +++++++++++++------- 2 files changed, 79 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 2aa0376..1a08e9a 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,66 @@ It ensures that the acknowledgements are passed to the correct source. ### Flush Is a message flushing wrapper which blocks until all produced messages have been acked by the user. In the scenario that the user performs an action only after a message has been produced, the flushing wrapper provides a guarantee that such an action is only performed on a successful sink. 
+#### Example usage + +```go + asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "example-stratesub-topic", + Version: "2.0.1", + }) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + sink := flush.NewAsyncMessageSink(asyncSink, flush.WithAckFunc(func(msg substrate.Message) error { + println(string(msg.Data())) + return nil + })) + defer func() { + err := sink.Flush(ctx) + if err != nil { + panic(err) + } + }() + + go func() { + err = sink.Run(ctx) + if err != nil { + panic(err) + } + }() + + messages := []string{ + "message one", + "message two", + "message three", + "message four", + "message five", + "message six", + "message seven", + "message eight", + "message nine", + "message ten", + } + + var wg sync.WaitGroup + wg.Add(len(messages)) + + for _, msg := range messages { + go func(msg string) { + defer wg.Done() + + sink.PublishMessage([]byte(msg)) + }(msg) + } + + wg.Wait() +``` + ## Other ### Message diff --git a/examples/flush/sink.go b/examples/flush/sink.go index 4b8fb46..455118c 100644 --- a/examples/flush/sink.go +++ b/examples/flush/sink.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "sync" "time" @@ -11,8 +10,6 @@ import ( "github.com/uw-labs/substrate/kafka" ) -const messages = 200 - func main() { asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ Brokers: []string{"localhost:9092"}, @@ -29,7 +26,7 @@ func main() { defer cancel() sink := flush.NewAsyncMessageSink(asyncSink, flush.WithAckFunc(func(msg substrate.Message) error { - fmt.Println(string(msg.Data())) + println(string(msg.Data())) return nil })) defer func() { @@ -46,15 +43,28 @@ func main() { } }() + messages := []string{ + "message one", + "message two", + "message three", + "message four", + "message five", + "message six", + "message seven", + "message eight", + "message nine", + "message ten", + } + 
var wg sync.WaitGroup - wg.Add(messages) + wg.Add(len(messages)) - for i := 0; i < messages; i++ { - go func(i int) { + for _, msg := range messages { + go func(msg string) { defer wg.Done() - sink.PublishMessage([]byte(string('A' + i))) - }(i) + sink.PublishMessage([]byte(msg)) + }(msg) } wg.Wait() From 127e1fc2502886ad611775e2575cb1906c7be083 Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Thu, 16 Jul 2020 10:21:39 +0100 Subject: [PATCH 4/8] improve goroutine lifecycle management, context propagation, channel management and greater unit test coverage --- README.md | 15 ++++--- examples/flush/sink.go | 23 +++++++--- flush/sink.go | 67 ++++++++++++++++++--------- flush/sink_test.go | 100 ++++++++++++++++++++++++++++++++++------- 4 files changed, 155 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 1a08e9a..3c9cb27 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Is a message flushing wrapper which blocks until all produced messages have been #### Example usage ```go - asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ + asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ Brokers: []string{"localhost:9092"}, Topic: "example-stratesub-topic", Version: "2.0.1", @@ -37,19 +37,21 @@ Is a message flushing wrapper which blocks until all produced messages have been ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sink := flush.NewAsyncMessageSink(asyncSink, flush.WithAckFunc(func(msg substrate.Message) error { + ackFn := flush.WithAckFunc(func(msg substrate.Message) error { println(string(msg.Data())) return nil - })) + }) + + sink := flush.NewAsyncMessageSink(ctx, asyncSink, ackFn) defer func() { - err := sink.Flush(ctx) + err := sink.Flush() if err != nil { panic(err) } }() go func() { - err = sink.Run(ctx) + err = sink.Run() if err != nil { panic(err) } @@ -74,8 +76,7 @@ Is a message flushing wrapper which blocks until all produced messages have been for 
_, msg := range messages { go func(msg string) { defer wg.Done() - - sink.PublishMessage([]byte(msg)) + sink.PublishMessage(context.Background(), []byte(msg)) }(msg) } diff --git a/examples/flush/sink.go b/examples/flush/sink.go index 455118c..e3b6988 100644 --- a/examples/flush/sink.go +++ b/examples/flush/sink.go @@ -20,24 +20,30 @@ func main() { panic(err) } - // `cancel` is used to terminate `sink.Run`. Timeout is used to prevent - // `sink.Flush` being able to infinitely block. + // This context is used to control the goroutines initialised by the flushing wrapper. + // Cancelling this context closes the underlying sink and allows `Flush` to return. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sink := flush.NewAsyncMessageSink(asyncSink, flush.WithAckFunc(func(msg substrate.Message) error { + ackFn := flush.WithAckFunc(func(msg substrate.Message) error { println(string(msg.Data())) return nil - })) + }) + + sink := flush.NewAsyncMessageSink(ctx, asyncSink, ackFn) defer func() { - err := sink.Flush(ctx) + // Flush will block until all messages produced using `PublishMessage` have been acked + // by the AckFunc, if provided. + err := sink.Flush() if err != nil { panic(err) } }() go func() { - err = sink.Run(ctx) + // Run blocks until the sink is closed or an error occurs. If the AckFn returns an error + // it will be returned by `Run`. + err = sink.Run() if err != nil { panic(err) } @@ -63,7 +69,10 @@ func main() { go func(msg string) { defer wg.Done() - sink.PublishMessage([]byte(msg)) + // This context is used to control the publishing of the message. This ctx + // could, and probably should, be different to the lifecycle context passed + // into the constructor. 
+ sink.PublishMessage(context.Background(), []byte(msg)) }(msg) } diff --git a/flush/sink.go b/flush/sink.go index aacf3c4..def24f5 100644 --- a/flush/sink.go +++ b/flush/sink.go @@ -47,8 +47,11 @@ func WithAckBufferSize(size int) AsyncMessageSinkOption { type AckFunc func(msg substrate.Message) error // AsyncMessageSink wraps substrate.AsyncMessageSink and provides an interface for interaction with -// the underlying sink, as well as the capability to flush messages. +// the underlying sink, as well as the capability to flush the message buffer. type AsyncMessageSink struct { + ctx context.Context + cancel context.CancelFunc + wait func() error sink substrate.AsyncMessageSink msgs uint64 msgBufferSize int @@ -61,8 +64,12 @@ type AsyncMessageSink struct { // NewAsyncMessageSink returns a pointer a new AsyncMessageSink. // See examples/flush/sink.go for example usage. -func NewAsyncMessageSink(sink substrate.AsyncMessageSink, opts ...AsyncMessageSinkOption) *AsyncMessageSink { +func NewAsyncMessageSink(ctx context.Context, sink substrate.AsyncMessageSink, opts ...AsyncMessageSinkOption) *AsyncMessageSink { + ctx, cancel := context.WithCancel(ctx) + ams := &AsyncMessageSink{ + ctx: ctx, + cancel: cancel, sink: sink, msgBufferSize: defaultMsgBufferSize, ackBufferSize: defaultAckBufferSize, @@ -79,11 +86,12 @@ func NewAsyncMessageSink(sink substrate.AsyncMessageSink, opts ...AsyncMessageSi } // Run initialises message publishing using the underlying sink and blocks until either an error -// occurs or the context is done. If an AckFunc is configured, Run will execute it for each ack -// recieved. If an error is returned, the user should cancel the context to prevent `Flush` from -// blocking. -func (ams *AsyncMessageSink) Run(ctx context.Context) error { - group, groupctx := rungroup.New(ctx) +// occurs or the constructor context is done. If an AckFunc is configured, Run will execute it for +// each ack received. 
If an error is returned, the user should cancel the constructor context to +// prevent `Flush` from blocking. +func (ams *AsyncMessageSink) Run() error { + group, groupctx := rungroup.New(ams.ctx) + ams.wait = group.Wait group.Go(func() error { return ams.sink.PublishMessages(groupctx, ams.ackCh, ams.msgCh) @@ -94,7 +102,11 @@ func (ams *AsyncMessageSink) Run(ctx context.Context) error { select { case <-groupctx.Done(): return nil - case msg := <-ams.ackCh: + case msg, ok := <-ams.ackCh: + if !ok { + return nil + } + if ams.ackFn != nil { err := ams.ackFn(msg) if err != nil { @@ -107,18 +119,33 @@ func (ams *AsyncMessageSink) Run(ctx context.Context) error { } }) - return group.Wait() + return ams.wait() } // PublishMessage publishes a message to the underlying sink. PublishMessage is desgined to be -// called concurrently by the user. -func (ams *AsyncMessageSink) PublishMessage(msg []byte) { - atomic.AddUint64(&ams.msgs, 1) - ams.msgCh <- message.NewMessage(msg) +// called concurrently by the user. The ctx passed to PublishMessage controls only the publishing +// of the message and is a seperate concern to the constructor context. +func (ams *AsyncMessageSink) PublishMessage(ctx context.Context, msg []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ams.msgCh <- message.NewMessage(msg): + atomic.AddUint64(&ams.msgs, 1) + } + + return nil } -// Close closes the underlying sink and releases it's resources. +// Close permanently closes the underlying sink and releases its resources. If Run has been called +// before Close, Close will block until any active AckFuncs return. func (ams *AsyncMessageSink) Close() error { + ams.cancel() + + if ams.wait != nil { + ams.wait() + } + + close(ams.msgCh) return ams.sink.Close() } @@ -127,15 +154,13 @@ func (ams *AsyncMessageSink) Status() (*substrate.Status, error) { return ams.sink.Status() } -// Flush blocks until the AsyncMessageSink has consumed as many acks as messages produced or ctx -// is done. 
Flush returns an error if the context is cancelled before all messages produced have -// been acked. -func (ams *AsyncMessageSink) Flush(ctx context.Context) error { - defer ams.Close() - +// Flush blocks until the AsyncMessageSink has consumed as many acks as messages produced or the +// constructor ctx is done. Flush returns an error if the context is cancelled before all messages +// produced have been acked. +func (ams *AsyncMessageSink) Flush() error { for { select { - case <-ctx.Done(): + case <-ams.ctx.Done(): if atomic.LoadUint64(&ams.msgs) > atomic.LoadUint64(&ams.acks) { return fmt.Errorf("incomplete flush: %d left to ack", atomic.LoadUint64(&ams.msgs)-atomic.LoadUint64(&ams.acks)) } diff --git a/flush/sink_test.go b/flush/sink_test.go index d074e91..605f1d5 100644 --- a/flush/sink_test.go +++ b/flush/sink_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -43,7 +44,7 @@ func TestAsyncMessageSinkSuccess(t *testing.T) { } var ackedMessages string - sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + sink := flush.NewAsyncMessageSink(ctx, mock, flush.WithAckFunc(func(msg substrate.Message) error { ackedMessages += string(msg.Data()) return nil })) @@ -51,12 +52,12 @@ func TestAsyncMessageSinkSuccess(t *testing.T) { var wg sync.WaitGroup wg.Add(messages) - go sink.Run(ctx) + go sink.Run() for i := 0; i < messages; i++ { go func(i int) { defer wg.Done() - sink.PublishMessage([]byte(string('A' + i))) + sink.PublishMessage(ctx, []byte(string('A'+i))) }(i) } @@ -66,7 +67,7 @@ func TestAsyncMessageSinkSuccess(t *testing.T) { // processing is complete, and we want to test the `ackedMessages` value *only* // once processing is complete, we must call `sink.Flush` first excplicitly - // only then we can safely test `ackedMessages`. 
- err := sink.Flush(ctx) + err := sink.Flush() if err != nil { t.Fatal(err) } @@ -97,7 +98,7 @@ func TestAsyncMessageSinkInterruption(t *testing.T) { } var ackedMessages int - sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + sink := flush.NewAsyncMessageSink(ctx, mock, flush.WithAckFunc(func(msg substrate.Message) error { time.Sleep(1 * time.Second) // simulating doing some work ackedMessages++ return nil @@ -106,12 +107,12 @@ func TestAsyncMessageSinkInterruption(t *testing.T) { var wg sync.WaitGroup wg.Add(messages) - go sink.Run(ctx) + go sink.Run() for i := 0; i < messages; i++ { go func(i int) { defer wg.Done() - sink.PublishMessage([]byte(string('A' + i))) + sink.PublishMessage(ctx, []byte(string('A'+i))) }(i) } @@ -124,7 +125,7 @@ func TestAsyncMessageSinkInterruption(t *testing.T) { // processing is complete, and we want to test the `ackedMessages` value *only* // once processing is complete, we must call `sink.Flush` first excplicitly - // only then we can safely test `ackedMessages`. 
- err := sink.Flush(ctx) + err := sink.Flush() if err == nil { t.Fatal("expected an error") } @@ -146,18 +147,87 @@ func TestAsyncMessageSinkAckError(t *testing.T) { }, } - sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + sink := flush.NewAsyncMessageSink(ctx, mock, flush.WithAckFunc(func(msg substrate.Message) error { return fmt.Errorf("error: %s", msg.Data()) })) - sink.PublishMessage([]byte("dummy-message")) + sink.PublishMessage(ctx, []byte("dummy-message")) - err := sink.Run(ctx) + err := sink.Run() if err == nil { t.Fatal("expected an error") } } +func TestAsyncMessageSinkPublishError(t *testing.T) { + mock := asyncMessageSinkMock{ + publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg := <-msgs: + acks <- msg + } + } + }, + } + + var acks uint32 + sink := flush.NewAsyncMessageSink(context.TODO(), mock, flush.WithAckFunc(func(msg substrate.Message) error { + atomic.AddUint32(&acks, 1) + return nil + })) + + go sink.Run() + + reqCtx, reqCancel := context.WithCancel(context.TODO()) + reqCancel() + + errCanceled := sink.PublishMessage(reqCtx, []byte("dummy-message")) + + err := sink.PublishMessage(context.TODO(), []byte("dummy-message")) + if err != nil { + t.Fatal(err) + } + + sink.Flush() + + a := atomic.LoadUint32(&acks) + if (a == 1 && errCanceled == nil) && a != 2 { + t.Fatalf("expected either 2 acks, or 1 ack and 1 error: got %d and %v", a, errCanceled) + } +} + +func TestAsyncMessageSinkEarlyCloseProducesPanic(t *testing.T) { + ctx := context.TODO() + + mock := asyncMessageSinkMock{ + publishMessageMock: func(ctx context.Context, acks chan<- substrate.Message, msgs <-chan substrate.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg := <-msgs: + acks <- msg + } + } + }, + } + + sink := flush.NewAsyncMessageSink(ctx, mock) + sink.Close() + + defer func() { 
+ if r := recover(); r == nil { + t.Fatal("expected a panic") + } + }() + + go sink.Run() + sink.PublishMessage(ctx, []byte("dummy-message")) // send to a closed channel +} + func BenchmarkAsyncMessageSink_1_50(b *testing.B) { for n := 0; n < b.N; n++ { benchmarkBufferSizes(b, 1, 50) @@ -205,7 +275,7 @@ func benchmarkBufferSizes(b *testing.B, msgBufferSize, ackBufferSize int) { } var ackedMessages string - sink := flush.NewAsyncMessageSink(mock, flush.WithAckFunc(func(msg substrate.Message) error { + sink := flush.NewAsyncMessageSink(ctx, mock, flush.WithAckFunc(func(msg substrate.Message) error { ackedMessages += string(msg.Data()) return nil }), flush.WithMsgBufferSize(msgBufferSize), flush.WithAckBufferSize(ackBufferSize)) @@ -213,12 +283,12 @@ func benchmarkBufferSizes(b *testing.B, msgBufferSize, ackBufferSize int) { var wg sync.WaitGroup wg.Add(messages) - go sink.Run(ctx) + go sink.Run() for i := 0; i < messages; i++ { go func(i int) { defer wg.Done() - sink.PublishMessage([]byte(string('A' + i))) + sink.PublishMessage(ctx, []byte(string('A'+i))) }(i) } @@ -228,7 +298,7 @@ func benchmarkBufferSizes(b *testing.B, msgBufferSize, ackBufferSize int) { // processing is complete, and we want to test the `ackedMessages` value *only* // once processing is complete, we must call `sink.Flush` first excplicitly - // only then we can safely test `ackedMessages`. 
- err := sink.Flush(ctx) + err := sink.Flush() if err != nil { b.Fatal(err) } From a9cc2c27cc4a9af67e48eae633833d6b61257919 Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Thu, 16 Jul 2020 11:48:47 +0100 Subject: [PATCH 5/8] simplify acking goroutine --- flush/sink.go | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/flush/sink.go b/flush/sink.go index def24f5..e583d44 100644 --- a/flush/sink.go +++ b/flush/sink.go @@ -94,29 +94,23 @@ func (ams *AsyncMessageSink) Run() error { ams.wait = group.Wait group.Go(func() error { + defer close(ams.ackCh) return ams.sink.PublishMessages(groupctx, ams.ackCh, ams.msgCh) }) group.Go(func() error { - for { - select { - case <-groupctx.Done(): - return nil - case msg, ok := <-ams.ackCh: - if !ok { - return nil + for msg := range ams.ackCh { + if ams.ackFn != nil { + err := ams.ackFn(msg) + if err != nil { + return err } - - if ams.ackFn != nil { - err := ams.ackFn(msg) - if err != nil { - return err - } - } - - atomic.AddUint64(&ams.acks, 1) } + + atomic.AddUint64(&ams.acks, 1) } + + return nil }) return ams.wait() From 47c3717e53cce2197429401c0f7e1fe92d97eb2c Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Thu, 16 Jul 2020 13:59:35 +0100 Subject: [PATCH 6/8] PublishMessages returns substrate.ErrSinkAlreadyClosed --- flush/sink.go | 8 +++++++- flush/sink_test.go | 14 ++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/flush/sink.go b/flush/sink.go index e583d44..eb2b9ec 100644 --- a/flush/sink.go +++ b/flush/sink.go @@ -120,9 +120,15 @@ func (ams *AsyncMessageSink) Run() error { // called concurrently by the user. The ctx passed to PublishMessage controls only the publishing // of the message and is a seperate concern to the constructor context. 
func (ams *AsyncMessageSink) PublishMessage(ctx context.Context, msg []byte) error { + if ams.ctx.Err() != nil { + return substrate.ErrSinkAlreadyClosed + } + select { + case <-ams.ctx.Done(): + return substrate.ErrSinkAlreadyClosed case <-ctx.Done(): - return ctx.Err() + return substrate.ErrSinkAlreadyClosed case ams.msgCh <- message.NewMessage(msg): atomic.AddUint64(&ams.msgs, 1) } diff --git a/flush/sink_test.go b/flush/sink_test.go index 605f1d5..9b08470 100644 --- a/flush/sink_test.go +++ b/flush/sink_test.go @@ -2,6 +2,7 @@ package flush_test import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -199,7 +200,7 @@ func TestAsyncMessageSinkPublishError(t *testing.T) { } } -func TestAsyncMessageSinkEarlyCloseProducesPanic(t *testing.T) { +func TestAsyncMessageSinkEarlyCloseErrors(t *testing.T) { ctx := context.TODO() mock := asyncMessageSinkMock{ @@ -218,14 +219,11 @@ func TestAsyncMessageSinkEarlyCloseProducesPanic(t *testing.T) { sink := flush.NewAsyncMessageSink(ctx, mock) sink.Close() - defer func() { - if r := recover(); r == nil { - t.Fatal("expected a panic") - } - }() - go sink.Run() - sink.PublishMessage(ctx, []byte("dummy-message")) // send to a closed channel + err := sink.PublishMessage(ctx, []byte("dummy-message")) // send to a closed channel + if err == nil || (err != nil && !errors.Is(err, substrate.ErrSinkAlreadyClosed)) { + t.Fatal("expected a substrate.ErrSinkAlreadyClosed error") + } } func BenchmarkAsyncMessageSink_1_50(b *testing.B) { From 1743c2c538879aed90fcf0eaa106adb47df22053 Mon Sep 17 00:00:00 2001 From: Mike Kaperys Date: Thu, 16 Jul 2020 14:00:44 +0100 Subject: [PATCH 7/8] don't close msgCh --- flush/sink.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flush/sink.go b/flush/sink.go index eb2b9ec..c5790a5 100644 --- a/flush/sink.go +++ b/flush/sink.go @@ -145,7 +145,6 @@ func (ams *AsyncMessageSink) Close() error { ams.wait() } - close(ams.msgCh) return ams.sink.Close() } From ba2007a45450ebf620782076f3a781a8fc12a63a Mon Sep 
17 00:00:00 2001 From: Mike Kaperys Date: Thu, 16 Jul 2020 14:09:29 +0100 Subject: [PATCH 8/8] return ctx.Error() for PublishMessage context --- README.md | 2 +- examples/flush/sink.go | 2 +- flush/sink.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3c9cb27..7c2685d 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Is a message flushing wrapper which blocks until all produced messages have been ```go asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ Brokers: []string{"localhost:9092"}, - Topic: "example-stratesub-topic", + Topic: "example-topic", Version: "2.0.1", }) if err != nil { diff --git a/examples/flush/sink.go b/examples/flush/sink.go index e3b6988..4e47d21 100644 --- a/examples/flush/sink.go +++ b/examples/flush/sink.go @@ -13,7 +13,7 @@ import ( func main() { asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{ Brokers: []string{"localhost:9092"}, - Topic: "example-stratesub-topic", + Topic: "example-topic", Version: "2.0.1", }) if err != nil { diff --git a/flush/sink.go b/flush/sink.go index c5790a5..5826d53 100644 --- a/flush/sink.go +++ b/flush/sink.go @@ -128,7 +128,7 @@ func (ams *AsyncMessageSink) PublishMessage(ctx context.Context, msg []byte) err case <-ams.ctx.Done(): return substrate.ErrSinkAlreadyClosed case <-ctx.Done(): - return substrate.ErrSinkAlreadyClosed + return ctx.Err() case ams.msgCh <- message.NewMessage(msg): atomic.AddUint64(&ams.msgs, 1) }