Skip to content

Commit

Permalink
Merge pull request #28851 from taosdata/ehn/vnode-open-log
Browse files Browse the repository at this point in the history
ehn/vnode-open-log
  • Loading branch information
guanshengliang authored Nov 25, 2024
2 parents 00d01ac + 246a375 commit ed3cc87
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
14 changes: 8 additions & 6 deletions source/dnode/mgmt/mgmt_vnode/src/vmInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
}

(void)taosThreadRwlockWrlock(&pMgmt->lock);

SVnodeObj *pOld = NULL;

int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (r != 0) {
dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
Expand All @@ -248,15 +250,15 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
if (r != 0) {
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
}
if (pOld) {
if (pOld != NULL) {
vmFreeVnodeObj(&pOld);
dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
if (r != 0) {
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
}
}

dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
if (r != 0) {
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);

return code;
Expand Down
10 changes: 10 additions & 0 deletions source/dnode/vnode/src/vnd/vnodeOpen.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
info.config = vnodeCfgDefault;

// load vnode info
vInfo("vgId:%d, start to vnode load info %s", info.config.vgId, dir);
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
Expand Down Expand Up @@ -429,22 +430,26 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
int8_t rollback = vnodeShouldRollback(pVnode);

// open buffer pool
vInfo("vgId:%d, start to open vnode buffer pool", TD_VID(pVnode));
if (vnodeOpenBufPool(pVnode) < 0) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

// open meta
vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

vInfo("vgId:%d, start to upgrade meta", TD_VID(pVnode));
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
}

// open tsdb
vInfo("vgId:%d, start to open vnode tsdb", TD_VID(pVnode));
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force) < 0) {
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
Expand All @@ -455,6 +460,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
ret = taosRealPath(tdir, NULL, sizeof(tdir));
TAOS_UNUSED(ret);

vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode));
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) {
vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
Expand All @@ -467,25 +473,29 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
TAOS_UNUSED(ret);

// open query
vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}

// sma required the tq is initialized before the vnode open
vInfo("vgId:%d, start to open vnode tq", TD_VID(pVnode));
if (tqOpen(tdir, pVnode)) {
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

// open sma
vInfo("vgId:%d, start to open vnode sma", TD_VID(pVnode));
if (smaOpen(pVnode, rollback, force)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

// vnode begin
vInfo("vgId:%d, start to begin vnode", TD_VID(pVnode));
if (vnodeBegin(pVnode) < 0) {
vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
Expand Down
2 changes: 1 addition & 1 deletion source/libs/sync/src/syncAppendEntries.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}

int32_t nRef = atomic_fetch_add_32(&ths->recvCount, 1);
int32_t nRef = atomic_add_fetch_32(&ths->recvCount, 1);
if (nRef <= 0) {
sError("vgId:%d, recv count is %d", ths->vgId, nRef);
}
Expand Down
4 changes: 2 additions & 2 deletions source/libs/sync/src/syncReplication.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI

int32_t nRef = 0;
if (pSyncNode != NULL) {
nRef = atomic_fetch_add_32(&pSyncNode->sendCount, 1);
nRef = atomic_add_fetch_32(&pSyncNode->sendCount, 1);
if (nRef <= 0) {
sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef);
}
}

SSyncLogReplMgr* mgr = syncNodeGetLogReplMgr(pSyncNode, (SRaftId*)destRaftId);
if (mgr != NULL) {
nRef = atomic_fetch_add_32(&mgr->sendCount, 1);
nRef = atomic_add_fetch_32(&mgr->sendCount, 1);
if (nRef <= 0) {
sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef);
}
Expand Down

0 comments on commit ed3cc87

Please sign in to comment.