Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implements flush.AsyncMessageSink #4

Merged
merged 8 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ Provides wrappers for both message source and message sink that add prometheus m
Is a message source wrapper that wraps any number of sources. It consumes messages from all of them and passes them on to the user.
It ensures that the acknowledgements are passed to the correct source.

### Flush
Is a message flushing wrapper whose `Flush` method blocks until every produced message has been acked. When the user must perform an action only after a message has been produced, the flushing wrapper guarantees that the action is performed only once the message has been successfully sunk.

## Other

### Message
Expand Down
61 changes: 61 additions & 0 deletions examples/flush/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/uw-labs/substrate"
"github.com/uw-labs/substrate-tools/flush"
"github.com/uw-labs/substrate/kafka"
)

// messages is the number of messages the example publishes, one goroutine per message.
const messages = 200

// main demonstrates the flushing sink: it wraps a Kafka sink, publishes
// `messages` messages concurrently and flushes before exiting.
func main() {
	asyncSink, err := kafka.NewAsyncMessageSink(kafka.AsyncMessageSinkConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "example-stratesub-topic",
		Version: "2.0.1",
	})
	if err != nil {
		panic(err)
	}

	// `cancel` is used to terminate `sink.Run`. The timeout prevents
	// `sink.Flush` from blocking indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	sink := flush.NewAsyncMessageSink(asyncSink, flush.WithAckFunc(func(msg substrate.Message) error {
		fmt.Println(string(msg.Data()))
		return nil
	}))
	// Deferred calls run last-in-first-out, so Flush runs before `cancel`
	// and therefore still has a live context to wait on.
	defer func() {
		if err := sink.Flush(ctx); err != nil {
			panic(err)
		}
	}()

	go func() {
		// Use a goroutine-local err rather than assigning to main's err,
		// which would share a variable across goroutines.
		if err := sink.Run(ctx); err != nil {
			panic(err)
		}
	}()

	var wg sync.WaitGroup
	wg.Add(messages)

	for i := 0; i < messages; i++ {
		go func(i int) {
			defer wg.Done()

			// Convert through rune explicitly: string(int) yields the UTF-8
			// encoding of that code point and is flagged by `go vet`.
			sink.PublishMessage([]byte(string(rune('A' + i))))
		}(i)
	}

	wg.Wait()
}
152 changes: 152 additions & 0 deletions flush/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Package flush provides a message flushing wrapper around `substrate.AsyncMessageSink`.
package flush

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/uw-labs/substrate"
	"github.com/uw-labs/substrate-tools/message"
	"github.com/uw-labs/sync/rungroup"
)

// Default channel capacities used by NewAsyncMessageSink when the corresponding
// option is not supplied.
const (
// defaultMsgBufferSize is the default capacity of the message channel.
defaultMsgBufferSize = 50
// defaultAckBufferSize is the default capacity of the ack channel.
defaultAckBufferSize = 100
)

// An AsyncMessageSinkOption is a function which sets an AsyncMessageSink configuration option.
// Options are applied by NewAsyncMessageSink, in the order given, before the channels are created.
type AsyncMessageSinkOption func(ams *AsyncMessageSink)

// WithAckFunc sets the AckFunc callback, which `Run` invokes for every ack it
// receives. If the AckFunc returns an error, it will be returned by `Run`.
func WithAckFunc(ackFn AckFunc) AsyncMessageSinkOption {
	return func(s *AsyncMessageSink) { s.ackFn = ackFn }
}

// WithMsgBufferSize overrides the capacity of the message channel, which
// otherwise defaults to defaultMsgBufferSize.
func WithMsgBufferSize(size int) AsyncMessageSinkOption {
	return func(s *AsyncMessageSink) { s.msgBufferSize = size }
}

// WithAckBufferSize overrides the capacity of the ack channel, which
// otherwise defaults to defaultAckBufferSize.
func WithAckBufferSize(size int) AsyncMessageSinkOption {
	return func(s *AsyncMessageSink) { s.ackBufferSize = size }
}

// AckFunc is a callback function executed by `Run` for each message successfully produced, i.e.
// for each message received on the ack channel. The ack counter is only incremented after this
// function returns a nil error; a non-nil error stops the ack loop before the counter is
// incremented. If no AckFunc is provided the ack counter is always incremented.
type AckFunc func(msg substrate.Message) error

// AsyncMessageSink wraps substrate.AsyncMessageSink and provides an interface for interaction with
// the underlying sink, as well as the capability to flush messages.
type AsyncMessageSink struct {
// sink is the wrapped sink that performs the actual publishing.
sink substrate.AsyncMessageSink
// msgs counts messages handed to PublishMessage; accessed only via sync/atomic.
msgs uint64
// msgBufferSize is the capacity used when creating msgCh.
msgBufferSize int
// msgCh carries messages from PublishMessage to the underlying sink.
msgCh chan substrate.Message
// acks counts acks processed by Run; accessed only via sync/atomic.
acks uint64
// ackBufferSize is the capacity used when creating ackCh.
ackBufferSize int
// ackCh carries acks from the underlying sink to the ack loop in Run.
ackCh chan substrate.Message
// ackFn is the optional per-ack callback; nil means no callback.
ackFn AckFunc
}

// NewAsyncMessageSink returns a pointer to a new AsyncMessageSink wrapping sink.
// The default buffer sizes are set first, then every option is applied in
// order, and only then are the message and ack channels created, so the
// buffer-size options take effect.
// See examples/flush/sink.go for example usage.
func NewAsyncMessageSink(sink substrate.AsyncMessageSink, opts ...AsyncMessageSinkOption) *AsyncMessageSink {
	wrapped := new(AsyncMessageSink)
	wrapped.sink = sink
	wrapped.msgBufferSize = defaultMsgBufferSize
	wrapped.ackBufferSize = defaultAckBufferSize

	for i := range opts {
		opts[i](wrapped)
	}

	// Channels are allocated last so they pick up any configured sizes.
	wrapped.msgCh = make(chan substrate.Message, wrapped.msgBufferSize)
	wrapped.ackCh = make(chan substrate.Message, wrapped.ackBufferSize)

	return wrapped
}

// Run starts publishing through the underlying sink and processes acks. It
// blocks until either an error occurs or the context is done. When an AckFunc
// is configured, it is executed for every ack received, and a non-nil error
// from it stops the ack loop. If Run returns an error, the user should cancel
// the context to prevent `Flush` from blocking.
func (ams *AsyncMessageSink) Run(ctx context.Context) error {
	group, groupctx := rungroup.New(ctx)

	// publish hands the message and ack channels to the wrapped sink.
	publish := func() error {
		return ams.sink.PublishMessages(groupctx, ams.ackCh, ams.msgCh)
	}

	// consumeAcks runs the optional callback for each ack and counts the ack
	// only once the callback has succeeded.
	consumeAcks := func() error {
		for {
			select {
			case <-groupctx.Done():
				return nil
			case acked := <-ams.ackCh:
				if ams.ackFn != nil {
					if err := ams.ackFn(acked); err != nil {
						return err
					}
				}

				atomic.AddUint64(&ams.acks, 1)
			}
		}
	}

	group.Go(publish)
	group.Go(consumeAcks)

	return group.Wait()
}

// PublishMessage wraps msg with message.NewMessage and sends it on the message channel that `Run`
// hands to the underlying sink. PublishMessage is designed to be called concurrently by the user.
// The send blocks while the message channel's buffer is full.
func (ams *AsyncMessageSink) PublishMessage(msg []byte) {
// Count the message before sending it, so the produced counter is already
// updated by the time the message can be acked.
atomic.AddUint64(&ams.msgs, 1)
ams.msgCh <- message.NewMessage(msg)
}

// Close closes the underlying sink and returns any error it reports.
func (ams *AsyncMessageSink) Close() error {
return ams.sink.Close()
}

// Status calls the Status method on the underlying sink and returns its result unchanged.
func (ams *AsyncMessageSink) Status() (*substrate.Status, error) {
return ams.sink.Status()
}

// Flush blocks until the AsyncMessageSink has consumed as many acks as messages
// produced, or until ctx is done. It returns an error if ctx is done before all
// produced messages have been acked, and nil otherwise. The underlying sink is
// closed when Flush returns, whatever the outcome.
//
// Flush polls the counters on a short ticker instead of spinning, so waiting
// does not consume a CPU core.
func (ams *AsyncMessageSink) Flush(ctx context.Context) error {
	// pollInterval is how often the counters are re-checked while waiting.
	const pollInterval = time.Millisecond

	// Closing stays best-effort: Flush reports only the flush outcome, so the
	// error from Close is deliberately discarded.
	defer func() { _ = ams.Close() }()

	// pending reports how many produced messages have not been acked yet.
	// acks is loaded before msgs and the result is clamped at zero, so a
	// concurrent update between the two loads cannot make the unsigned
	// subtraction wrap around.
	pending := func() uint64 {
		acked := atomic.LoadUint64(&ams.acks)
		produced := atomic.LoadUint64(&ams.msgs)
		if acked >= produced {
			return 0
		}

		return produced - acked
	}

	// Fast path: nothing is outstanding, so there is no need to start a ticker.
	if pending() == 0 {
		return nil
	}

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Take a final snapshot so the error reports an accurate count.
			if left := pending(); left > 0 {
				return fmt.Errorf("incomplete flush: %d left to ack", left)
			}

			return nil
		case <-ticker.C:
			if pending() == 0 {
				return nil
			}
		}
	}
}
Loading