Skip to content

Commit

Permalink
chore: add tmq poll debug logs
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Apr 24, 2024
1 parent 5ffa28a commit b75851e
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions controller/ws/tmq/tmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,30 +578,43 @@ func (t *TMQ) poll(ctx context.Context, session *melody.Session, req *TMQPollReq
wsTMQErrorMsg(ctx, session, 0xffff, "tmq not init", TMQPoll, req.ReqID, nil)
return
}
if t.isAutoCommit && time.Now().After(t.nextTime) {
now := time.Now()
isDebug := log.IsDebug()
logger := wstool.GetLogger(session).WithField("action", TMQPoll).WithField(config.ReqIDKey, req.ReqID)
if t.isAutoCommit && now.After(t.nextTime) {
t.asyncLocker.Lock()
s := log.GetLogNow(isDebug)
logger.Debugln("get commit async lock cost:", log.GetLogDuration(isDebug, s))
s = log.GetLogNow(isDebug)
asynctmq.TaosaTMQCommitA(t.thread, t.consumer, nil, t.handler.Handler)
errCode := <-t.handler.Caller.CommitResult
logger.Debugln("tmq_commit_sync cost:", log.GetLogDuration(isDebug, s))
t.asyncLocker.Unlock()
if errCode != 0 {
logger := wstool.GetLogger(session).WithField("action", TMQPoll).WithField(config.ReqIDKey, req.ReqID)
errStr := wrapper.TMQErr2Str(errCode)
logger.Errorln("tmq autocommit error:", taoserrors.NewError(int(errCode), errStr))
}
t.nextTime = time.Now().Add(t.autocommitInterval)
t.nextTime = now.Add(t.autocommitInterval)
}
s := log.GetLogNow(isDebug)
t.asyncLocker.Lock()
logger.Debugln("get async lock cost:", log.GetLogDuration(isDebug, s))
s = log.GetLogNow(isDebug)
asynctmq.TaosaTMQPollA(t.thread, t.consumer, req.BlockingTime, t.handler.Handler)
message := <-t.handler.Caller.PollResult
logger.Debugln("tmq_poll cost:", log.GetLogDuration(isDebug, s))
t.asyncLocker.Unlock()
resp := &TMQPollResp{
Action: TMQPoll,
ReqID: req.ReqID,
}
if message != nil {
s = log.GetLogNow(isDebug)
t.freeMessage()
logger.Debugln("free message cost:", log.GetLogDuration(isDebug, s))
messageType := wrapper.TMQGetResType(message)
if messageTypeIsValid(messageType) {
s = log.GetLogNow(isDebug)
index := atomic.AddUint64(&t.tmpMessage.Index, 1)

t.tmpMessage.Index = index
Expand All @@ -618,6 +631,7 @@ func (t *TMQ) poll(ctx context.Context, session *melody.Session, req *TMQPollReq
resp.MessageID = t.tmpMessage.Index
resp.MessageType = messageType
resp.Offset = t.tmpMessage.Offset
logger.Debugln("get message cost:", log.GetLogDuration(isDebug, s))
} else {
wsTMQErrorMsg(ctx, session, 0xffff, "unavailable tmq type:"+strconv.Itoa(int(messageType)), TMQPoll, req.ReqID, nil)
return
Expand Down Expand Up @@ -1022,6 +1036,11 @@ func (t *TMQ) Close(logger logrus.FieldLogger) {
if t.closed {
return
}
s := time.Now()
logger.Info("tmq close")
defer func() {
logger.Info("tmq close end, cost:", time.Since(s).String())
}()
defer func() {
t.asyncLocker.Lock()
asynctmq.DestroyTMQThread(t.thread)
Expand Down

0 comments on commit b75851e

Please sign in to comment.