Skip to content

Commit

Permalink
[WIP] refactor pause and resume for handler & batch handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jxsl13 committed Nov 15, 2023
1 parent 16a8200 commit 6291ef5
Show file tree
Hide file tree
Showing 25 changed files with 1,250 additions and 718 deletions.
453 changes: 453 additions & 0 deletions DEBUG.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
environment:
docker-compose up -d

down:
docker-compose down

test:
go test -v -race -count=1 ./...
go test -v -race -count=1 ./...
24 changes: 14 additions & 10 deletions amqpx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package amqpx

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -210,14 +211,20 @@ func (a *AMQPX) Start(connectUrl string, options ...Option) (err error) {
if err != nil {
return
}
a.sub = pool.NewSubscriber(subPool, pool.SubscriberWithAutoClosePool(true))
a.sub = pool.NewSubscriber(subPool,
pool.SubscriberWitCloseTimeout(a.closeTimeout),
pool.SubscriberWithAutoClosePool(true),
)
for _, h := range a.handlers {
a.sub.RegisterHandler(h)
}
for _, bh := range a.batchHandlers {
a.sub.RegisterBatchHandler(bh)
}
a.sub.Start()
err = a.sub.Start()
if err != nil {
return
}
}
})
return err
Expand All @@ -233,15 +240,14 @@ func (a *AMQPX) close() (err error) {
a.closeOnce.Do(func() {

if a.sub != nil {
a.sub.Close()
err = errors.Join(err, a.sub.Close())
}

if a.pub != nil {
a.pub.Close()
}

if a.pubPool != nil {

if a.pubPool != nil && len(a.topologyDeleters) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), a.closeTimeout)
defer cancel()

Expand All @@ -251,13 +257,11 @@ func (a *AMQPX) close() (err error) {
pool.TopologerWithTransientSessions(true),
)
for _, f := range a.topologyDeleters {
e := f(topologer)
if e != nil {
err = e
return
}
err = errors.Join(err, f(topologer))
}
}

if a.pubPool != nil {
// finally close the publisher pool
// which is also used for topology.
a.pubPool.Close()
Expand Down
Loading

0 comments on commit 6291ef5

Please sign in to comment.