Skip to content

Commit

Permalink
Merge pull request #28711 from taosdata/fix/optGetConnRule
Browse files Browse the repository at this point in the history
opt:(rpc) opt get conn rule from connheap
  • Loading branch information
guanshengliang authored Nov 11, 2024
2 parents 94e5bbd + 0087ba3 commit 82cf873
Showing 1 changed file with 68 additions and 56 deletions.
124 changes: 68 additions & 56 deletions source/libs/transport/src/transCli.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later
// handle except about conn

#define REQS_ON_CONN(conn) (conn ? (transQueueSize(&conn->reqsToSend) + transQueueSize(&conn->reqsSentOut)) : 0)
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code);
// handle req from app
static void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq);
Expand Down Expand Up @@ -289,7 +290,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
static int8_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn, SCliConn** pNewConn);

// thread obj
static int32_t createThrdObj(void* trans, SCliThrd** pThrd);
Expand Down Expand Up @@ -327,14 +328,18 @@ typedef struct {
int64_t lastConnFailTs;
} SHeap;

int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
void transHeapDestroy(SHeap* heap);
int32_t transHeapGet(SHeap* heap, SCliConn** p);
int32_t transHeapInsert(SHeap* heap, SCliConn* p);
int32_t transHeapDelete(SHeap* heap, SCliConn* p);
int32_t transHeapBalance(SHeap* heap, SCliConn* p);
int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p);
static int32_t compareHeapNode(const HeapNode* a, const HeapNode* b);
static int32_t transHeapInit(SHeap* heap, int32_t (*cmpFunc)(const HeapNode* a, const HeapNode* b));
static void transHeapDestroy(SHeap* heap);

static int32_t transHeapGet(SHeap* heap, SCliConn** p);
static int32_t transHeapInsert(SHeap* heap, SCliConn* p);
static int32_t transHeapDelete(SHeap* heap, SCliConn* p);
static int32_t transHeapBalance(SHeap* heap, SCliConn* p);
static int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p);
static int32_t transHeapMayBalance(SHeap* heap, SCliConn* p);

static FORCE_INLINE void logConnMissHit(SCliConn* pConn);

#define CLI_RELEASE_UV(loop) \
do { \
Expand Down Expand Up @@ -494,47 +499,30 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 &&
taosHashGetSize(conn->pQTable) == 0) {
cliResetConnTimer(conn);
conn->forceDelFromHeap = 1;
code = delConnFromHeapCache(pThrd->connHeapCache, conn);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tstrerror(code));

TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
return 1;
} else {
if (code != 0) {
tDebug("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn,
tstrerror(code));
return 0;
}
}
addConnToPool(pThrd->pool, conn);
return 1;
} else if ((transQueueSize(&conn->reqsToSend) == 0) && (transQueueSize(&conn->reqsSentOut) == 0) &&
(taosHashGetSize(conn->pQTable) != 0)) {
tDebug("%s conn %p do balance directly", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(transHeapBalance(conn->heap, conn));
TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
} else {
SCliConn* topConn = NULL;
if (conn->heap != NULL) {
code = transHeapGet(conn->heap, &topConn);
if (code != 0) {
tDebug("%s conn %p failed to get top conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
return 0;
}

if (topConn == conn) {
return 0;
}
int32_t topReqs = transQueueSize(&topConn->reqsSentOut) + transQueueSize(&topConn->reqsToSend);
int32_t currReqs = transQueueSize(&conn->reqsSentOut) + transQueueSize(&conn->reqsToSend);
if (topReqs <= currReqs) {
tTrace("%s conn %p not balance conn heap since top conn has less req, topConnReqs:%d, currConnReqs:%d",
CONN_GET_INST_LABEL(conn), conn, topReqs, currReqs);
return 0;
} else {
tDebug("%s conn %p do balance conn heap since top conn has more reqs, topConnReqs:%d, currConnReqs:%d",
CONN_GET_INST_LABEL(conn), conn, topReqs, currReqs);
TAOS_UNUSED(transHeapBalance(conn->heap, conn));
}
}
tTrace("%s conn %p may do balance", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(transHeapMayBalance(conn->heap, conn));
}
return 0;
}
Expand Down Expand Up @@ -785,15 +773,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
if (transQueueSize(&conn->reqsSentOut) == 0) {
return;
}
code = cliConnRemoveTimeoutMsg(conn);
if (code != 0) {
tDebug("%s conn %p do remove timeout msg", CONN_GET_INST_LABEL(conn), conn);
if (!cliMayRecycleConn(conn)) {
TAOS_UNUSED(transHeapBalance(conn->heap, conn));
}
} else {
TAOS_UNUSED(cliMayRecycleConn(conn));
}
TAOS_UNUSED(cliConnRemoveTimeoutMsg(conn));
TAOS_UNUSED(cliMayRecycleConn(conn));
}
void cliConnTimeout__checkReq(uv_timer_t* handle) {
SCliConn* conn = handle->data;
Expand Down Expand Up @@ -3804,6 +3785,8 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
int32_t totalReqs = reqsNum + reqsSentOut;

if (totalReqs >= pInst->shareConnLimit) {
logConnMissHit(pConn);

if (pConn->list == NULL && pConn->dstAddr != NULL) {
pConn->list = taosHashGet((SHashObj*)pThrd->pool, pConn->dstAddr, strlen(pConn->dstAddr));
if (pConn->list != NULL) {
Expand Down Expand Up @@ -3860,11 +3843,12 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
} else {
tTrace("conn %p get conn from heap cache for key:%s", pConn, key);
if (shouldSWitchToOtherConn(pConn, key)) {
code = balanceConnHeapCache(pConnHeapCache, pConn);
if (code != 0) {
tTrace("failed to balance conn heap cache for key:%s", key);
SCliConn* pNewConn = NULL;
code = balanceConnHeapCache(pConnHeapCache, pConn, &pNewConn);
if (code == 1) {
tTrace("conn %p start to handle reqs", pNewConn);
return pNewConn;
}
logConnMissHit(pConn);
return NULL;
}
}
Expand Down Expand Up @@ -3916,15 +3900,19 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
return code;
}

static int32_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
static int8_t balanceConnHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn, SCliConn** pNewConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
SCliConn* pTopConn = NULL;
if (pConn->heap != NULL && pConn->inHeap != 0) {
SHeap* heap = pConn->heap;
tTrace("conn %p'heap may should do balance, numOfConn:%d", pConn, (int)(heap->heap->nelts));
int64_t now = taosGetTimestampMs();
if (((now - heap->lastUpdateTs) / 1000) > 30) {
heap->lastUpdateTs = now;
tTrace("conn %p'heap do balance, numOfConn:%d", pConn, (int)(heap->heap->nelts));
return transHeapBalance(pConn->heap, pConn);
TAOS_UNUSED(transHeapBalance(pConn->heap, pConn));
if (transHeapGet(pConn->heap, &pTopConn) == 0 && pConn != pTopConn) {
int32_t curReqs = REQS_ON_CONN(pConn);
int32_t topReqs = REQS_ON_CONN(pTopConn);
if (curReqs > topReqs && topReqs < pInst->shareConnLimit) {
*pNewConn = pTopConn;
return 1;
}
}
}
return 0;
Expand All @@ -3934,8 +3922,8 @@ int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
SCliConn* args1 = container_of(a, SCliConn, node);
SCliConn* args2 = container_of(b, SCliConn, node);

int32_t totalReq1 = transQueueSize(&args1->reqsToSend) + transQueueSize(&args1->reqsSentOut);
int32_t totalReq2 = transQueueSize(&args2->reqsToSend) + transQueueSize(&args2->reqsSentOut);
int32_t totalReq1 = REQS_ON_CONN(args1);
int32_t totalReq2 = REQS_ON_CONN(args2);
if (totalReq1 > totalReq2) {
return 0;
}
Expand Down Expand Up @@ -4016,6 +4004,30 @@ int32_t transHeapUpdateFailTs(SHeap* heap, SCliConn* p) {
heap->lastConnFailTs = taosGetTimestampMs();
return 0;
}
int32_t transHeapMayBalance(SHeap* heap, SCliConn* p) {
if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) {
return 0;
}
SCliThrd* pThrd = p->hostThrd;
STrans* pInst = pThrd->pInst;
int32_t balanceLimit = pInst->shareConnLimit >= 4 ? pInst->shareConnLimit / 2 : 2;

SCliConn* topConn = NULL;
int32_t code = transHeapGet(heap, &topConn);
if (code != 0) {
return code;
}

if (topConn == p) return code;

int32_t reqsOnTop = REQS_ON_CONN(topConn);
int32_t reqsOnCur = REQS_ON_CONN(p);

if (reqsOnTop >= balanceLimit && reqsOnCur < balanceLimit) {
TAOS_UNUSED(transHeapBalance(heap, p));
}
return code;
}

int32_t transHeapBalance(SHeap* heap, SCliConn* p) {
if (p->inHeap == 0 || heap == NULL || heap->heap == NULL) {
Expand Down

0 comments on commit 82cf873

Please sign in to comment.