Skip to content

Commit

Permalink
fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585
Browse files Browse the repository at this point in the history
  • Loading branch information
wangmm0220 committed Jan 20, 2025
1 parent 86f1c84 commit d8806a5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
2 changes: 1 addition & 1 deletion source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ void taos_cleanup(void) {
taosCloseRef(id);

nodesDestroyAllocatorSet();
// cleanupAppInfo();
cleanupAppInfo();
rpcCleanup();
tscDebug("rpc cleanup");

Expand Down
28 changes: 27 additions & 1 deletion source/client/src/clientTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,24 @@ void tmqMgmtClose(void) {
}

if (tmqMgmt.rsetId >= 0) {
tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
int64_t refId = 0;

while (tmq) {
refId = tmq->refId;
if (refId == 0) {
break;
}
taosWLockLatch(&tmq->lock);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
taosWUnLockLatch(&tmq->lock);

if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) {
qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno));
}

tmq = taosIterateRef(tmqMgmt.rsetId, refId);
}
taosCloseRef(tmqMgmt.rsetId);
tmqMgmt.rsetId = -1;
}
Expand Down Expand Up @@ -2617,15 +2635,23 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {

int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
int32_t code = 0;
taosWLockLatch(&tmq->lock);
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
goto end;
}
tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
int32_t code = tmq_unsubscribe(tmq);
code = tmq_unsubscribe(tmq);
if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
if (code != 0){
tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
}
}

end:
taosWUnLockLatch(&tmq->lock);
return code;
}

Expand Down

0 comments on commit d8806a5

Please sign in to comment.