Skip to content

Commit

Permalink
feat: 지연된 메시지 재전송 지원 (#3)
Browse files Browse the repository at this point in the history
* feat: nack delay 추가

* refactor: 이름 변경

* refactor: 이름 변경
  • Loading branch information
HHongSeungWoo authored May 16, 2024
1 parent aec4449 commit 9b17e1d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
26 changes: 23 additions & 3 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,34 @@ import (
"context"
"errors"
"math"
"time"

"github.com/cloudevents/sdk-go/v2/event"
)

const abortIndex int8 = math.MaxInt8 >> 1

type ResultNackWithRedeliveryDelay struct {
delay time.Duration
}

func (r ResultNackWithRedeliveryDelay) Error() string {
return "nack with delay"
}

func (r ResultNackWithRedeliveryDelay) Delay() time.Duration {
return r.delay
}

type Context struct {
engine *Engine

Request *Request

handlers HandlersChain
index int8
nack bool
handlers HandlersChain
index int8
nack bool
nackRedeliveryDelay time.Duration

Errors equeueErrors
}
Expand All @@ -26,6 +40,7 @@ func (c *Context) reset() {
c.handlers = nil
c.index = -1
c.nack = false
c.nackRedeliveryDelay = 0

c.Errors = c.Errors[:0]
}
Expand Down Expand Up @@ -64,6 +79,11 @@ func (c *Context) Nack() {
c.nack = true
}

func (c *Context) NackWithRedeliveryDelay(delay time.Duration) {
c.nack = true
c.nackRedeliveryDelay = delay
}

func (c *Context) IsNack() bool {
return c.nack
}
Expand Down
4 changes: 4 additions & 0 deletions driver/nats/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (n *natsReceiveMessage) Finish(err error) error {
if protocol.IsACK(err) {
return n.msg.Ack()
} else {
var nack *equeue.ResultNackWithRedeliveryDelay
if errors.As(err, &nack) {
return n.msg.NakWithDelay(nack.Delay())
}
return n.msg.Nak()
}
}
6 changes: 5 additions & 1 deletion equeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,11 @@ func (w *worker) run() {
c.Next()

if c.IsNack() {
w.message.Finish(protocol.ResultNACK)
if c.nackRedeliveryDelay > 0 {
w.message.Finish(&ResultNackWithRedeliveryDelay{delay: c.nackRedeliveryDelay})
} else {
w.message.Finish(protocol.ResultNACK)
}
} else {
w.message.Finish(protocol.ResultACK)
}
Expand Down

0 comments on commit 9b17e1d

Please sign in to comment.