Skip to content

Commit

Permalink
enh: avoid goroutine leaks when subscription fails
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Nov 28, 2024
1 parent 8f35374 commit 7bd82f5
Showing 1 changed file with 37 additions and 29 deletions.
66 changes: 37 additions & 29 deletions controller/ws/tmq/tmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,74 +531,82 @@ func (t *TMQ) subscribe(ctx context.Context, session *melody.Session, req *TMQSu
wsTMQErrorMsg(ctx, session, logger, 0xffff, err.Error(), action, req.ReqID, nil)
return
}
topicList := wrapper.TMQListNew()
defer func() {
wrapper.TMQListDestroy(topicList)
}()
for _, topic := range req.Topics {
logger.Tracef("tmq append topic:%s", topic)
errCode = wrapper.TMQListAppend(topicList, topic)
if errCode != 0 {
errStr := wrapper.TMQErr2Str(errCode)
logger.Errorf("tmq list append error, tpic:%s, code:%d, msg:%s", topic, errCode, errStr)
t.closeConsumerWithErrLog(logger, isDebug, cPointer)
wsTMQErrorMsg(ctx, session, logger, int(errCode), errStr, action, req.ReqID, nil)
return
}
}

errCode = t.wrapperSubscribe(logger, isDebug, cPointer, topicList)
if errCode != 0 {
errStr := wrapper.TMQErr2Str(errCode)
logger.Errorf("tmq subscribe error:%d %s", errCode, errStr)
t.closeConsumerWithErrLog(logger, isDebug, cPointer)
wsTMQErrorMsg(ctx, session, logger, int(errCode), errStr, action, req.ReqID, nil)
return
}
conn := wrapper.TMQGetConnect(cPointer)
logger.Trace("get whitelist")
whitelist, err := tool.GetWhitelist(conn)
if err != nil {
logger.Errorf("get whitelist error:%s", err.Error())
t.wrapperCloseConsumer(logger, isDebug, cPointer)
t.closeConsumerWithErrLog(logger, isDebug, cPointer)
wstool.WSError(ctx, session, logger, err, action, req.ReqID)
return
}
logger.Tracef("check whitelist, ip:%s, whitelist:%s", t.ipStr, tool.IpNetSliceToString(whitelist))
valid := tool.CheckWhitelist(whitelist, t.ip)
if !valid {
logger.Errorf("whitelist prohibits current IP access, ip:%s, whitelist:%s", t.ipStr, tool.IpNetSliceToString(whitelist))
t.wrapperCloseConsumer(logger, isDebug, cPointer)
t.closeConsumerWithErrLog(logger, isDebug, cPointer)
wstool.WSErrorMsg(ctx, session, logger, 0xffff, "whitelist prohibits current IP access", action, req.ReqID)
return
}
logger.Trace("register change whitelist")
err = tool.RegisterChangeWhitelist(conn, t.whitelistChangeHandle)
if err != nil {
logger.Errorf("register change whitelist error:%s", err)
t.wrapperCloseConsumer(logger, isDebug, cPointer)
t.closeConsumerWithErrLog(logger, isDebug, cPointer)
wstool.WSError(ctx, session, logger, err, action, req.ReqID)
return
}
logger.Trace("register drop user")
err = tool.RegisterDropUser(conn, t.dropUserHandle)
if err != nil {
logger.Errorf("register drop user error:%s", err)
t.wrapperCloseConsumer(logger, isDebug, cPointer)
t.closeConsumerWithErrLog(logger, isDebug, cPointer)
wstool.WSError(ctx, session, logger, err, action, req.ReqID)
return
}
t.conn = conn
t.consumer = cPointer
logger.Trace("start to wait signal")
go t.waitSignal(t.logger)
topicList := wrapper.TMQListNew()
defer func() {
wrapper.TMQListDestroy(topicList)
}()
for _, topic := range req.Topics {
logger.Tracef("tmq append topic:%s", topic)
errCode = wrapper.TMQListAppend(topicList, topic)
if errCode != 0 {
errStr := wrapper.TMQErr2Str(errCode)
logger.Errorf("tmq list append error, tpic:%s, code:%d, msg:%s", topic, errCode, errStr)
t.wrapperCloseConsumer(logger, isDebug, cPointer)
wsTMQErrorMsg(ctx, session, logger, int(errCode), errStr, action, req.ReqID, nil)
return
}
}

errCode = t.wrapperSubscribe(logger, isDebug, cPointer, topicList)
if errCode != 0 {
errStr := wrapper.TMQErr2Str(errCode)
logger.Errorf("tmq subscribe error:%d %s", errCode, errStr)
t.wrapperCloseConsumer(logger, isDebug, cPointer)
wsTMQErrorMsg(ctx, session, logger, int(errCode), errStr, action, req.ReqID, nil)
return
}
t.consumer = cPointer
wstool.WSWriteJson(session, logger, &TMQSubscribeResp{
Action: action,
ReqID: req.ReqID,
Timing: wstool.GetDuration(ctx),
})
}

func (t *TMQ) closeConsumerWithErrLog(logger *logrus.Entry, isDebug bool, consumer unsafe.Pointer) {
errCode := t.wrapperCloseConsumer(logger, isDebug, consumer)
if errCode != 0 {
errMsg := wrapper.TMQErr2Str(errCode)
logger.Errorf("tmq close consumer error, consumer:%p, code:%d, msg:%s", t.consumer, errCode, errMsg)
}
}

type TMQCommitReq struct {
ReqID uint64 `json:"req_id"`
MessageID uint64 `json:"message_id"` // unused
Expand Down

0 comments on commit 7bd82f5

Please sign in to comment.