Skip to content

Commit

Permalink
feat: 优化mq发送消息
Browse files Browse the repository at this point in the history
  • Loading branch information
吴建华 committed Nov 27, 2023
1 parent 76d93b5 commit 42ea6db
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
4 changes: 2 additions & 2 deletions component/mq/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ type MsgHandler func(delivery amqp.Delivery)

// Watch 监听连接断开, 然后重连.
func (c *Consumer) Watch() {
oldConn := c.conn
oldCh := c.ch
watchConsumerLoop:
for {
select {
case <-c.closeNotifyChan:
oldConn := c.conn
oldCh := c.ch
if err := c.setup(); err != nil {
c.conf.Logger.Errorf("rabbitmq consumer reconnect failed, err: %s", err)
time.Sleep(enums.ThreeSecD)
Expand Down
20 changes: 16 additions & 4 deletions component/mq/rabbitmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ func NewProducer(conf ProducerConf) RabbitProducer {

// Watch 监听连接断开, 然后重连.
func (p *Producer) Watch() {
oldConn := p.conn
oldCh := p.ch

watchProducerLoop:
for {
select {
case <-p.closeNotifyChan:
oldConn := p.conn
oldCh := p.ch
if err := p.Setup(); err != nil {
p.conf.Logger.Errorf("rabbitmq producer reconnect failed, err: %s", err)
time.Sleep(enums.FiveSecD)
Expand Down Expand Up @@ -121,6 +120,19 @@ type Msg struct {
Delay time.Duration
}

func (p *Producer) getChannel() *amqp.Channel {
if p.ch.IsClosed() {
if ch, channelErr := p.conn.Channel(); channelErr != nil {
p.conf.Logger.Error("reconnect channel err, channelErr=", channelErr)
} else {
p.ch = ch
p.conf.Logger.Info("reconnect channel success")
}
}

return p.ch
}

// SendMsg 发送消息.
func (p *Producer) SendMsg(ctx context.Context, msg *Msg) error {
body := msg.Body
Expand All @@ -135,7 +147,7 @@ func (p *Producer) SendMsg(ctx context.Context, msg *Msg) error {
headers = amqp.Table{"x-delay": delay.Milliseconds()} // x-delay 消息延时的时间(毫秒)
}

err := p.ch.PublishWithContext(
err := p.getChannel().PublishWithContext(
ctx,
p.conf.Exchange.Name,
key,
Expand Down

0 comments on commit 42ea6db

Please sign in to comment.