Skip to content

Commit

Permalink
Merge pull request #37 from jxsl13/feature-pause-rework
Browse files Browse the repository at this point in the history
fix edge case when queue is deleted while the consumer is paused
  • Loading branch information
jxsl13 authored Nov 17, 2023
2 parents 459aecd + cb618fb commit 05c96ef
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
run: docker-compose up -d

- name: Code Coverage
run: go test ./... -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt
run: go test -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
Expand Down
149 changes: 149 additions & 0 deletions amqpx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,155 @@ func testBatchHandlerPauseAndResume(t *testing.T) {
assertActive(t, handler01, false)
}

// TestQueueDeletedConsumerReconnect verifies the pause/resume edge case:
// a paused consumer whose queue was deleted must fail to resume, and must
// resume successfully again once the queue has been redeclared.
func TestQueueDeletedConsumerReconnect(t *testing.T) {
	queueName := "TestQueueDeletedConsumerReconnect-01"
	var err error
	// fix: the second signal was a duplicated SIGINT; SIGTERM is the intended pair.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	log := logging.NewTestLogger(t)
	defer func() {
		assert.NoError(t, amqpx.Reset())
	}()

	ts, closer := newTransientSession(t, connectURL)
	defer closer()

	// declare the queue on startup and delete it again on shutdown
	// (no messages are published in this test).
	amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
		_, err := t.QueueDeclare(queueName)
		if err != nil {
			return err
		}
		return nil
	})
	amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
		_, err := t.QueueDelete(queueName)
		if err != nil {
			return err
		}
		return nil
	})

	h := amqpx.RegisterHandler(queueName, func(msg pool.Delivery) (err error) {
		return nil
	})

	// handler is not active before Start
	assertActive(t, h, false)

	err = amqpx.Start(
		connectURL,
		amqpx.WithLogger(log),
	)
	if err != nil {
		assert.NoError(t, err)
		return
	}

	assert.NoError(t, h.Pause(context.Background()))
	assertActive(t, h, false)

	// delete the queue out from under the paused consumer
	_, err = ts.QueueDelete(queueName)
	assert.NoError(t, err)

	// resuming must fail: the queue no longer exists
	tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	err = h.Resume(tctx)
	cancel()
	assert.Error(t, err)
	assertActive(t, h, false)

	// recreate the queue; resuming must now succeed
	_, err = ts.QueueDeclare(queueName)
	assert.NoError(t, err)

	tctx, cancel = context.WithTimeout(ctx, 5*time.Second)
	err = h.Resume(tctx)
	cancel()
	assert.NoError(t, err)
	assertActive(t, h, true)
}

// TestQueueDeletedBatchConsumerReconnect is the batch-handler variant of
// TestQueueDeletedConsumerReconnect: a paused batch consumer whose queue was
// deleted must fail to resume, and must resume once the queue is redeclared.
func TestQueueDeletedBatchConsumerReconnect(t *testing.T) {
	queueName := "TestQueueDeletedBatchConsumerReconnect-01"
	var err error
	// fix: the second signal was a duplicated SIGINT; SIGTERM is the intended pair.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	log := logging.NewTestLogger(t)
	defer func() {
		assert.NoError(t, amqpx.Reset())
	}()

	ts, closer := newTransientSession(t, connectURL)
	defer closer()

	// declare the queue on startup and delete it again on shutdown
	// (no messages are published in this test).
	amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
		_, err := t.QueueDeclare(queueName)
		if err != nil {
			return err
		}
		return nil
	})
	amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
		_, err := t.QueueDelete(queueName)
		if err != nil {
			return err
		}
		return nil
	})

	h := amqpx.RegisterBatchHandler(queueName, func(msg []pool.Delivery) (err error) {
		return nil
	})

	// handler is not active before Start
	assertActive(t, h, false)

	err = amqpx.Start(
		connectURL,
		amqpx.WithLogger(log),
	)
	if err != nil {
		assert.NoError(t, err)
		return
	}

	assert.NoError(t, h.Pause(context.Background()))
	assertActive(t, h, false)

	// delete the queue out from under the paused consumer
	_, err = ts.QueueDelete(queueName)
	assert.NoError(t, err)

	// resuming must fail: the queue no longer exists
	tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	err = h.Resume(tctx)
	cancel()
	assert.Error(t, err)
	assertActive(t, h, false)

	// recreate the queue; resuming must now succeed
	_, err = ts.QueueDeclare(queueName)
	assert.NoError(t, err)

	tctx, cancel = context.WithTimeout(ctx, 5*time.Second)
	err = h.Resume(tctx)
	cancel()
	assert.NoError(t, err)
	assertActive(t, h, true)
}

// newTransientSession creates a single-connection, single-session pool and
// returns one session from it together with a closer that releases the
// session and then the pool. The session is independent of the global amqpx
// state, so tests can manipulate topology (e.g. delete queues) out-of-band.
func newTransientSession(t *testing.T, connectUrl string) (session *pool.Session, closer func()) {
	p, err := pool.New(connectUrl, 1, 1, pool.WithLogger(logging.NewTestLogger(t)))
	require.NoError(t, err)

	s, err := p.GetSession()
	require.NoError(t, err)

	return s, func() {
		// fix: the original asserted the same session-close error twice
		// (the second assert re-checked a stale err after p.Close(), whose
		// result was never captured). Assert the close error exactly once.
		assert.NoError(t, s.Close())
		p.Close()
	}
}

type handlerStats interface {
Queue() string
IsActive(ctx context.Context) (active bool, err error)
Expand Down
77 changes: 60 additions & 17 deletions pool/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,56 @@ func (s *Subscriber) Start() (err error) {
return nil
}

// retry runs f in a loop until the subscriber shuts down or f reports
// ErrClosed, applying an exponential backoff between failed attempts.
// consumerType is used only in log messages (e.g. "consumer",
// "batch consumer"); h supplies the current queue configuration and the
// pause signal.
func (s *Subscriber) retry(consumerType string, h handler, f func() (err error)) {
	var (
		backoff = newDefaultBackoffPolicy(1*time.Millisecond, 5*time.Second)
		retry   = 0
		err     error
		timer   = time.NewTimer(0)
		// drained tracks whether timer.C has been received from, so that
		// closeTimer/resetTimer can safely drain the channel when needed.
		drained = false
	)
	defer closeTimer(timer, &drained)

	for {
		// re-read the config on every iteration; the consumer tag/queue may
		// have been changed while the handler was paused
		opts := h.QueueConfig()

		err = f()
		// only return if closed
		if errors.Is(err, ErrClosed) {
			return
		}
		// consume was canceled due to context being closed (paused)
		if errors.Is(err, context.Canceled) {
			s.infoConsumer(opts.ConsumerTag, fmt.Sprintf("%s paused: pausing %v", consumerType, err))
			continue
		}

		// the delivery channel closing is expected while pausing, but an
		// unexpected failure (e.g. the queue was deleted) otherwise
		if errors.Is(err, ErrDeliveryClosed) {
			select {
			case <-h.pausing().Done():
				s.infoConsumer(opts.ConsumerTag, fmt.Sprintf("%s paused: %v", consumerType, err))
				continue
			default:
				// not paused
			}
		}

		s.error(opts.ConsumerTag, opts.Queue, err, fmt.Sprintf("%s closed unexpectedly: %v", consumerType, err))

		// unexpected failure: back off before retrying, but abort
		// immediately on subscriber shutdown
		retry++
		resetTimer(timer, backoff(retry), &drained)

		select {
		case <-s.catchShutdown():
			return
		case <-timer.C:
			// at this point we know that the timer channel has been drained
			drained = true
			continue
		}
	}
}

func (s *Subscriber) consumer(h *Handler, wg *sync.WaitGroup) {
defer wg.Done()
defer h.close()
Expand All @@ -227,17 +277,14 @@ func (s *Subscriber) consumer(h *Handler, wg *sync.WaitGroup) {
return
}

for {
s.retry("consumer", h, func() error {
select {
case <-s.catchShutdown():
return
return ErrClosed
case <-h.resuming().Done():
err = s.consume(h)
if errors.Is(err, ErrClosed) {
return
}
return s.consume(h)
}
}
})
}

func (s *Subscriber) consume(h *Handler) (err error) {
Expand Down Expand Up @@ -346,17 +393,14 @@ func (s *Subscriber) batchConsumer(h *BatchHandler, wg *sync.WaitGroup) {
return
}

for {
s.retry("batch consumer", h, func() error {
select {
case <-s.catchShutdown():
return
return ErrClosed
case <-h.resuming().Done():
err = s.batchConsume(h)
if errors.Is(err, ErrClosed) {
return
}
return s.batchConsume(h)
}
}
})
}

func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
Expand Down Expand Up @@ -510,13 +554,12 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag
}

type handler interface {
ConsumeOptions() ConsumeOptions
Queue() string
QueueConfig() QueueConfig
pausing() doner
}

func (s *Subscriber) returnSession(h handler, session *Session, err error) {
opts := h.ConsumeOptions()
opts := h.QueueConfig()

if errors.Is(err, ErrClosed) {
// graceful shutdown
Expand Down
10 changes: 10 additions & 0 deletions pool/subscriber_batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ func (h *BatchHandler) Config() BatchHandlerConfig {
return h.configUnguarded()
}

// QueueConfig returns a read-only snapshot of the batch handler's queue name
// and consume options, taken under the handler mutex.
func (h *BatchHandler) QueueConfig() QueueConfig {
	h.mu.Lock()
	cfg := QueueConfig{
		Queue:          h.queue,
		ConsumeOptions: h.consumeOpts,
	}
	h.mu.Unlock()
	return cfg
}

func (h *BatchHandler) configUnguarded() BatchHandlerConfig {
return BatchHandlerConfig{
Queue: h.queue,
Expand Down
17 changes: 17 additions & 0 deletions pool/subscriber_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ func (h *Handler) Config() HandlerConfig {
return h.configUnguarded()
}

// QueueConfig returns a read-only snapshot of the handler's queue name and
// consume options, taken under the handler mutex.
func (h *Handler) QueueConfig() (cfg QueueConfig) {
	h.mu.Lock()
	defer h.mu.Unlock()

	cfg.Queue = h.queue
	cfg.ConsumeOptions = h.consumeOpts
	return cfg
}

func (h *Handler) configUnguarded() HandlerConfig {
return HandlerConfig{
Queue: h.queue,
Expand Down Expand Up @@ -171,3 +181,10 @@ func (h *Handler) SetConsumeOptions(consumeOpts ConsumeOptions) {
defer h.mu.Unlock()
h.consumeOpts = consumeOpts
}

// QueueConfig is a read only snapshot of the current handler's queue configuration.
// It is the common configuration between the handler and the batch handler,
// allowing the subscriber retry loop to treat both uniformly.
type QueueConfig struct {
	// Queue is the name of the queue the handler consumes from.
	Queue string
	// ConsumeOptions carries the consumer tag and consume flags shared by
	// both handler types.
	ConsumeOptions
}

0 comments on commit 05c96ef

Please sign in to comment.