From 42ea6db1b2e6368731bd56af8f868baa9e25ecb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E5=BB=BA=E5=8D=8E?= Date: Mon, 27 Nov 2023 11:56:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96mq=E5=8F=91=E9=80=81?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/mq/rabbitmq/consumer.go | 4 ++-- component/mq/rabbitmq/producer.go | 20 ++++++++++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/component/mq/rabbitmq/consumer.go b/component/mq/rabbitmq/consumer.go index b611a41..46d1ef8 100644 --- a/component/mq/rabbitmq/consumer.go +++ b/component/mq/rabbitmq/consumer.go @@ -68,12 +68,12 @@ type MsgHandler func(delivery amqp.Delivery) // Watch 监听连接断开, 然后重连. func (c *Consumer) Watch() { - oldConn := c.conn - oldCh := c.ch watchConsumerLoop: for { select { case <-c.closeNotifyChan: + oldConn := c.conn + oldCh := c.ch if err := c.setup(); err != nil { c.conf.Logger.Errorf("rabbitmq consumer reconnect failed, err: %s", err) time.Sleep(enums.ThreeSecD) diff --git a/component/mq/rabbitmq/producer.go b/component/mq/rabbitmq/producer.go index a88a75a..312d42a 100644 --- a/component/mq/rabbitmq/producer.go +++ b/component/mq/rabbitmq/producer.go @@ -53,13 +53,12 @@ func NewProducer(conf ProducerConf) RabbitProducer { // Watch 监听连接断开, 然后重连. func (p *Producer) Watch() { - oldConn := p.conn - oldCh := p.ch - watchProducerLoop: for { select { case <-p.closeNotifyChan: + oldConn := p.conn + oldCh := p.ch if err := p.Setup(); err != nil { p.conf.Logger.Errorf("rabbitmq producer reconnect failed, err: %s", err) time.Sleep(enums.FiveSecD) @@ -121,6 +120,19 @@ type Msg struct { Delay time.Duration } +func (p *Producer) getChannel() *amqp.Channel { + if p.ch.IsClosed() { + if ch, channelErr := p.conn.Channel(); channelErr != nil { + p.conf.Logger.Error("reconnect channel err, channelErr=", channelErr) + } else { + p.ch = ch + p.conf.Logger.Info("reconnect channel success") + } + } + + return p.ch +} + // SendMsg 发送消息. func (p *Producer) SendMsg(ctx context.Context, msg *Msg) error { body := msg.Body @@ -135,7 +147,7 @@ func (p *Producer) SendMsg(ctx context.Context, msg *Msg) error { headers = amqp.Table{"x-delay": delay.Milliseconds()} // x-delay 消息延时的时间(毫秒) } - err := p.ch.PublishWithContext( + err := p.getChannel().PublishWithContext( ctx, p.conf.Exchange.Name, key,