Skip to content

Commit

Permalink
feat: implement feature allowing tracking progress of events consumpt…
Browse files Browse the repository at this point in the history
…ion #124 (#125)

* Implement feature allowing tracking progress of events consuption #124

Signed-off-by: tomasz.ziolkowski <[email protected]>

* fix typo, remove debug print

---------

Signed-off-by: tomasz.ziolkowski <[email protected]>
Co-authored-by: tomasz.ziolkowski <[email protected]>
  • Loading branch information
erayarslan and ziollek authored Feb 13, 2025
1 parent 44d5bd5 commit a2c8c59
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
49 changes: 40 additions & 9 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type dcp struct {
version *couchbase.Version
bucketInfo *couchbase.BucketInfo
healthCheck couchbase.HealthCheck
listener models.Listener
consumer models.Consumer
readyCh chan struct{}
cancelCh chan os.Signal
stopCh chan struct{}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *dcp) Start() {

s.stream = stream.NewStream(
s.client, s.metadata, s.config, s.version, s.bucketInfo, s.vBucketDiscovery,
s.listener, collectionIDs, s.stopCh, s.bus, s.eventHandler,
s.consumer, collectionIDs, s.stopCh, s.bus, s.eventHandler,
tc,
)

Expand Down Expand Up @@ -245,7 +245,7 @@ func (s *dcp) GetVersion() *couchbase.Version {
return s.version
}

func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
func newDcp(config *config.Dcp, consumer models.Consumer) (Dcp, error) {
config.ApplyDefaults()
copyOfConfig := config
printConfiguration(*copyOfConfig)
Expand Down Expand Up @@ -292,7 +292,7 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {

return &dcp{
client: client,
listener: listener,
consumer: consumer,
config: config,
version: version,
bucketInfo: bucketInfo,
Expand All @@ -306,29 +306,60 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
}, nil
}

type simplifiedConsumer struct {
listener models.Listener
}

func NewSimpleConsumer(listener models.Listener) models.Consumer {
return &simplifiedConsumer{listener: listener}
}

func (s *simplifiedConsumer) ConsumeEvent(ctx *models.ListenerContext) {
s.listener(ctx)
}

func (s *simplifiedConsumer) TrackOffset(vbID uint16, offset *models.Offset) {}

// NewExtendedDcp creates a new Dcp client
//
// config: path to a configuration file or a configuration struct
// consumer must implement models.Consumer interface containing both ConsumeEvent and TrackOffset methods
func NewExtendedDcp(cfg any, consumer models.Consumer) (Dcp, error) {
switch v := cfg.(type) {
case *config.Dcp:
return newDcp(v, consumer)
case config.Dcp:
return newDcp(&v, consumer)
case string:
return newDcpWithPath(v, consumer)
default:
return nil, errors.New("invalid config")
}
}

// NewDcp creates a new Dcp client
//
// config: path to a configuration file or a configuration struct
// listener is a callback function that will be called when a mutation, deletion or expiration event occurs
func NewDcp(cfg any, listener models.Listener) (Dcp, error) {
switch v := cfg.(type) {
case *config.Dcp:
return newDcp(v, listener)
return newDcp(v, NewSimpleConsumer(listener))
case config.Dcp:
return newDcp(&v, listener)
return newDcp(&v, NewSimpleConsumer(listener))
case string:
return newDcpWithPath(v, listener)
return newDcpWithPath(v, NewSimpleConsumer(listener))
default:
return nil, errors.New("invalid config")
}
}

func newDcpWithPath(path string, listener models.Listener) (Dcp, error) {
func newDcpWithPath(path string, consumer models.Consumer) (Dcp, error) {
c, err := newDcpConfig(path)
if err != nil {
return nil, err
}
return newDcp(&c, listener)
return newDcp(&c, consumer)
}

func newDcpConfig(path string) (config.Dcp, error) {
Expand Down
5 changes: 5 additions & 0 deletions models/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ type (
ListenerCh chan ListenerArgs
ListenerEndCh chan DcpStreamEndContext
)

type Consumer interface {
ConsumeEvent(ctx *ListenerContext)
TrackOffset(vbID uint16, offset *Offset)
}
9 changes: 5 additions & 4 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type stream struct {
vbIDRange *models.VbIDRange
dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool]
stopCh chan struct{}
listener models.Listener
consumer models.Consumer
bucketInfo *couchbase.BucketInfo
finishStreamWithEndEventCh chan struct{}
finishStreamWithCloseCh chan struct{}
Expand Down Expand Up @@ -94,6 +94,7 @@ func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
return
}
s.offsets.Store(vbID, offset)
s.consumer.TrackOffset(vbID, offset)
if !dirty {
return
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func (s *stream) waitAndForward(

start := time.Now()

s.listener(ctx)
s.consumer.ConsumeEvent(ctx)

s.metric.ProcessLatency = time.Since(start).Milliseconds()
}
Expand Down Expand Up @@ -466,7 +467,7 @@ func NewStream(client couchbase.Client,
version *couchbase.Version,
bucketInfo *couchbase.BucketInfo,
vBucketDiscovery VBucketDiscovery,
listener models.Listener,
consumer models.Consumer,
collectionIDs map[uint32]string,
stopCh chan struct{},
bus EventBus.Bus,
Expand All @@ -476,7 +477,7 @@ func NewStream(client couchbase.Client,
stream := &stream{
client: client,
metadata: metadata,
listener: listener,
consumer: consumer,
config: config,
bucketInfo: bucketInfo,
vBucketDiscovery: vBucketDiscovery,
Expand Down

0 comments on commit a2c8c59

Please sign in to comment.