Skip to content

Commit

Permalink
refactor stmt-async-bind loop usleep to Producer Consumer Model
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengrongkun committed Jan 20, 2025
1 parent 3f17782 commit 7d8a9a8
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 16 deletions.
2 changes: 2 additions & 0 deletions source/client/inc/clientStmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ typedef struct SStmtQueue {
SStmtQNode* head;
SStmtQNode* tail;
uint64_t qRemainNum;
TdThreadMutex mutex;
TdThreadCond waitCond;
} SStmtQueue;

typedef struct STscStmt {
Expand Down
35 changes: 27 additions & 8 deletions source/client/src/clientStmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,39 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
}

bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) {
taosUsleep(1);
return false;
(void)taosThreadMutexLock(&pStmt->queue.mutex);
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
}

SStmtQNode* orig = pStmt->queue.head;

SStmtQNode* node = pStmt->queue.head->next;
pStmt->queue.head = pStmt->queue.head->next;
*param = node;

// taosMemoryFreeClear(orig);
(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);

*param = node;

(void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1);
*param = node;

return true;
}

void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);

pStmt->queue.tail->next = param;
pStmt->queue.tail = param;

pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));

(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
}

static int32_t stmtCreateRequest(STscStmt* pStmt) {
Expand Down Expand Up @@ -415,9 +423,11 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head);

(void)taosThreadMutexLock(&pQueue->mutex);
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
pQueue->qRemainNum = 0;
pQueue->head->next = NULL;
(void)taosThreadMutexUnlock(&pQueue->mutex);
}

int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
Expand Down Expand Up @@ -809,6 +819,8 @@ int32_t stmtStartBindThread(STscStmt* pStmt) {
}

int32_t stmtInitQueue(STscStmt* pStmt) {
(void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
(void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
pStmt->queue.tail = pStmt->queue.head;

Expand Down Expand Up @@ -1619,11 +1631,18 @@ int stmtClose(TAOS_STMT* stmt) {

pStmt->queue.stopQueue = true;

(void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);

if (pStmt->bindThreadInUse) {
(void)taosThreadJoin(pStmt->bindThread, NULL);
pStmt->bindThreadInUse = false;
}

(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);

STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
Expand Down
31 changes: 23 additions & 8 deletions source/client/src/clientStmt2.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,35 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void**
}

static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);
while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
taosUsleep(1);
return false;
(void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
return false;
}
}

SStmtQNode* orig = pStmt->queue.head;

SStmtQNode* node = pStmt->queue.head->next;
pStmt->queue.head = pStmt->queue.head->next;

// taosMemoryFreeClear(orig);

*param = node;

(void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);

return true;
}

static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
(void)taosThreadMutexLock(&pStmt->queue.mutex);

pStmt->queue.tail->next = param;
pStmt->queue.tail = param;

pStmt->stat.bindDataNum++;
(void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));

(void)taosThreadMutexUnlock(&pStmt->queue.mutex);
}

static int32_t stmtCreateRequest(STscStmt2* pStmt) {
Expand Down Expand Up @@ -339,9 +343,11 @@ static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
pTblBuf->buffIdx = 1;
pTblBuf->buffOffset = sizeof(*pQueue->head);

(void)taosThreadMutexLock(&pQueue->mutex);
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
pQueue->qRemainNum = 0;
pQueue->head->next = NULL;
(void)taosThreadMutexUnlock(&pQueue->mutex);
}

static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
Expand Down Expand Up @@ -735,6 +741,8 @@ static int32_t stmtStartBindThread(STscStmt2* pStmt) {
}

static int32_t stmtInitQueue(STscStmt2* pStmt) {
(void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
(void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
pStmt->queue.tail = pStmt->queue.head;

Expand Down Expand Up @@ -1748,11 +1756,18 @@ int stmtClose2(TAOS_STMT2* stmt) {

pStmt->queue.stopQueue = true;

(void)taosThreadMutexLock(&pStmt->queue.mutex);
(void)taosThreadCondSignal(&(pStmt->queue.waitCond));
(void)taosThreadMutexUnlock(&pStmt->queue.mutex);

if (pStmt->bindThreadInUse) {
(void)taosThreadJoin(pStmt->bindThread, NULL);
pStmt->bindThreadInUse = false;
}

(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);

if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
tscError("failed to wait asyncQuerySem");
Expand Down

0 comments on commit 7d8a9a8

Please sign in to comment.