diff --git a/context.go b/context.go index a4c2355..c8d6821 100644 --- a/context.go +++ b/context.go @@ -4,20 +4,34 @@ import ( "context" "errors" "math" + "time" "github.com/cloudevents/sdk-go/v2/event" ) const abortIndex int8 = math.MaxInt8 >> 1 +type ResultNackWithRedeliveryDelay struct { + delay time.Duration +} + +func (r ResultNackWithRedeliveryDelay) Error() string { + return "nack with delay" +} + +func (r ResultNackWithRedeliveryDelay) Delay() time.Duration { + return r.delay +} + type Context struct { engine *Engine Request *Request - handlers HandlersChain - index int8 - nack bool + handlers HandlersChain + index int8 + nack bool + nackRedeliveryDelay time.Duration Errors equeueErrors } @@ -26,6 +40,7 @@ func (c *Context) reset() { c.handlers = nil c.index = -1 c.nack = false + c.nackRedeliveryDelay = 0 c.Errors = c.Errors[:0] } @@ -64,6 +79,11 @@ func (c *Context) Nack() { c.nack = true } +func (c *Context) NackWithRedeliveryDelay(delay time.Duration) { + c.nack = true + c.nackRedeliveryDelay = delay +} + func (c *Context) IsNack() bool { return c.nack } diff --git a/driver/nats/consumer.go b/driver/nats/consumer.go index 9edcf4c..9d6c9bf 100644 --- a/driver/nats/consumer.go +++ b/driver/nats/consumer.go @@ -95,6 +95,10 @@ func (n *natsReceiveMessage) Finish(err error) error { if protocol.IsACK(err) { return n.msg.Ack() } else { + var nack *equeue.ResultNackWithRedeliveryDelay + if errors.As(err, &nack) { + return n.msg.NakWithDelay(nack.Delay()) + } return n.msg.Nak() } } diff --git a/equeue.go b/equeue.go index 6994966..1ebec1c 100644 --- a/equeue.go +++ b/equeue.go @@ -322,7 +322,11 @@ func (w *worker) run() { c.Next() if c.IsNack() { - w.message.Finish(protocol.ResultNACK) + if c.nackRedeliveryDelay > 0 { + w.message.Finish(&ResultNackWithRedeliveryDelay{delay: c.nackRedeliveryDelay}) + } else { + w.message.Finish(protocol.ResultNACK) + } } else { w.message.Finish(protocol.ResultACK) }