diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 15ac9d7..d828a01 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -71,7 +71,7 @@ jobs:
         run: docker-compose up -d
 
       - name: Code Coverage
-        run: go test ./... -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt
+        run: go test -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt ./...
 
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v3
diff --git a/amqpx_test.go b/amqpx_test.go
index 68061b8..c8e541f 100644
--- a/amqpx_test.go
+++ b/amqpx_test.go
@@ -677,6 +677,155 @@ func testBatchHandlerPauseAndResume(t *testing.T) {
 	assertActive(t, handler01, false)
 }
 
+func TestQueueDeletedConsumerReconnect(t *testing.T) {
+	queueName := "TestQueueDeletedConsumerReconnect-01"
+	var err error
+	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer cancel()
+
+	log := logging.NewTestLogger(t)
+	defer func() {
+		assert.NoError(t, amqpx.Reset())
+	}()
+
+	ts, closer := newTransientSession(t, connectURL)
+	defer closer()
+
+	// declare the test queue and register its cleanup
+	amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
+		_, err := t.QueueDeclare(queueName)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+	amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
+		_, err := t.QueueDelete(queueName)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+
+	h := amqpx.RegisterHandler(queueName, func(msg pool.Delivery) (err error) {
+		return nil
+	})
+
+	assertActive(t, h, false)
+
+	err = amqpx.Start(
+		connectURL,
+		amqpx.WithLogger(log),
+	)
+	if err != nil {
+		assert.NoError(t, err)
+		return
+	}
+
+	assert.NoError(t, h.Pause(context.Background()))
+	assertActive(t, h, false)
+
+	_, err = ts.QueueDelete(queueName)
+	assert.NoError(t, err)
+
+	tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	err = h.Resume(tctx)
+	cancel()
+	assert.Error(t, err)
+	assertActive(t, h, false)
+
+	_, err = ts.QueueDeclare(queueName)
+	assert.NoError(t, err)
+
+	tctx, cancel = context.WithTimeout(ctx, 5*time.Second)
+	err = h.Resume(tctx)
+	cancel()
+	assert.NoError(t, err)
+	assertActive(t, h, true)
+}
+
+func TestQueueDeletedBatchConsumerReconnect(t *testing.T) {
+	queueName := "TestQueueDeletedBatchConsumerReconnect-01"
+	var err error
+	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer cancel()
+
+	log := logging.NewTestLogger(t)
+	defer func() {
+		assert.NoError(t, amqpx.Reset())
+	}()
+
+	ts, closer := newTransientSession(t, connectURL)
+	defer closer()
+
+	// declare the test queue and register its cleanup
+	amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
+		_, err := t.QueueDeclare(queueName)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+	amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
+		_, err := t.QueueDelete(queueName)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+
+	h := amqpx.RegisterBatchHandler(queueName, func(msg []pool.Delivery) (err error) {
+		return nil
+	})
+
+	assertActive(t, h, false)
+
+	err = amqpx.Start(
+		connectURL,
+		amqpx.WithLogger(log),
+	)
+	if err != nil {
+		assert.NoError(t, err)
+		return
+	}
+
+	assert.NoError(t, h.Pause(context.Background()))
+	assertActive(t, h, false)
+
+	_, err = ts.QueueDelete(queueName)
+	assert.NoError(t, err)
+
+	tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	err = h.Resume(tctx)
+	cancel()
+	assert.Error(t, err)
+	assertActive(t, h, false)
+
+	_, err = ts.QueueDeclare(queueName)
+	assert.NoError(t, err)
+
+	tctx, cancel = context.WithTimeout(ctx, 5*time.Second)
+	err = h.Resume(tctx)
+	cancel()
+	assert.NoError(t, err)
+	assertActive(t, h, true)
+}
+
+func newTransientSession(t *testing.T, connectUrl string) (session *pool.Session, closer func()) {
+	p, err := pool.New(connectUrl, 1, 1, pool.WithLogger(logging.NewTestLogger(t)))
+	require.NoError(t, err)
+
+	s, err := p.GetSession()
+	require.NoError(t, err)
+
+	return s, func() {
+		err = s.Close()
+		assert.NoError(t, err)
+		p.Close()
+		assert.NoError(t, err)
+	}
+}
+
 type handlerStats interface {
 	Queue() string
 	IsActive(ctx context.Context) (active bool, err error)
diff --git a/pool/subscriber.go b/pool/subscriber.go
index b1de22c..50e7d98 100644
--- a/pool/subscriber.go
+++ b/pool/subscriber.go
@@ -214,6 +214,56 @@ func (s *Subscriber) Start() (err error) {
 	return nil
 }
 
+func (s *Subscriber) retry(consumerType string, h handler, f func() (err error)) {
+	var (
+		backoff = newDefaultBackoffPolicy(1*time.Millisecond, 5*time.Second)
+		retry   = 0
+		err     error
+		timer   = time.NewTimer(0)
+		drained = false
+	)
+	defer closeTimer(timer, &drained)
+
+	for {
+		opts := h.QueueConfig()
+
+		err = f()
+		// only return if closed
+		if errors.Is(err, ErrClosed) {
+			return
+		}
+		// consume was canceled due to context being closed (paused)
+		if errors.Is(err, context.Canceled) {
+			s.infoConsumer(opts.ConsumerTag, fmt.Sprintf("%s paused: pausing %v", consumerType, err))
+			continue
+		}
+
+		if errors.Is(err, ErrDeliveryClosed) {
+			select {
+			case <-h.pausing().Done():
+				s.infoConsumer(opts.ConsumerTag, fmt.Sprintf("%s paused: %v", consumerType, err))
+				continue
+			default:
+				// not paused
+			}
+		}
+
+		s.error(opts.ConsumerTag, opts.Queue, err, fmt.Sprintf("%s closed unexpectedly: %v", consumerType, err))
+
+		retry++
+		resetTimer(timer, backoff(retry), &drained)
+
+		select {
+		case <-s.catchShutdown():
+			return
+		case <-timer.C:
+			// at this point we know that the timer channel has been drained
+			drained = true
+			continue
+		}
+	}
+}
+
 func (s *Subscriber) consumer(h *Handler, wg *sync.WaitGroup) {
 	defer wg.Done()
 	defer h.close()
@@ -227,17 +277,14 @@
 		return
 	}
 
-	for {
+	s.retry("consumer", h, func() error {
 		select {
 		case <-s.catchShutdown():
-			return
+			return ErrClosed
 		case <-h.resuming().Done():
-			err = s.consume(h)
-			if errors.Is(err, ErrClosed) {
-				return
-			}
+			return s.consume(h)
 		}
-	}
+	})
 }
 
 func (s *Subscriber) consume(h *Handler) (err error) {
@@ -346,17 +393,14 @@ func (s *Subscriber) batchConsumer(h *BatchHandler, wg *sync.WaitGroup) {
 		return
 	}
 
-	for {
+	s.retry("batch consumer", h, func() error {
 		select {
 		case <-s.catchShutdown():
-			return
+			return ErrClosed
 		case <-h.resuming().Done():
-			err = s.batchConsume(h)
-			if errors.Is(err, ErrClosed) {
-				return
-			}
+			return s.batchConsume(h)
 		}
-	}
+	})
 }
 
 func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
@@ -510,13 +554,12 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag
 }
 
 type handler interface {
-	ConsumeOptions() ConsumeOptions
-	Queue() string
+	QueueConfig() QueueConfig
 	pausing() doner
 }
 
 func (s *Subscriber) returnSession(h handler, session *Session, err error) {
-	opts := h.ConsumeOptions()
+	opts := h.QueueConfig()
 
 	if errors.Is(err, ErrClosed) {
 		// graceful shutdown
diff --git a/pool/subscriber_batch_handler.go b/pool/subscriber_batch_handler.go
index 52c59bf..374bd68 100644
--- a/pool/subscriber_batch_handler.go
+++ b/pool/subscriber_batch_handler.go
@@ -95,6 +95,16 @@ func (h *BatchHandler) Config() BatchHandlerConfig {
 	return h.configUnguarded()
 }
 
+func (h *BatchHandler) QueueConfig() QueueConfig {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	return QueueConfig{
+		Queue:          h.queue,
+		ConsumeOptions: h.consumeOpts,
+	}
+}
+
 func (h *BatchHandler) configUnguarded() BatchHandlerConfig {
 	return BatchHandlerConfig{
 		Queue: h.queue,
diff --git a/pool/subscriber_handler.go b/pool/subscriber_handler.go
index e903d48..35ab51f 100644
--- a/pool/subscriber_handler.go
+++ b/pool/subscriber_handler.go
@@ -87,6 +87,16 @@ func (h *Handler) Config() HandlerConfig {
 	return h.configUnguarded()
 }
 
+func (h *Handler) QueueConfig() QueueConfig {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	return QueueConfig{
+		Queue:          h.queue,
+		ConsumeOptions: h.consumeOpts,
+	}
+}
+
 func (h *Handler) configUnguarded() HandlerConfig {
 	return HandlerConfig{
 		Queue: h.queue,
@@ -171,3 +181,10 @@ func (h *Handler) SetConsumeOptions(consumeOpts ConsumeOptions) {
 	defer h.mu.Unlock()
 	h.consumeOpts = consumeOpts
 }
+
+// QueueConfig is a read only snapshot of the current handler's queue configuration.
+// It is the common configuration between the handler and the batch handler.
+type QueueConfig struct {
+	Queue string
+	ConsumeOptions
+}