Skip to content

Commit

Permalink
fix: stmt operation adds a mutex lock
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Mar 15, 2024
1 parent 023df72 commit cdd0b7b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
34 changes: 26 additions & 8 deletions controller/ws/ws/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,8 @@ func (h *messageHandler) handleStmtPrepare(_ context.Context, request Request, l
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}
stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("get thread lock cost:", log.GetLogDuration(isDebug, s))
code := wrapper.TaosStmtPrepare(stmtItem.stmt, req.SQL)
Expand Down Expand Up @@ -746,6 +748,8 @@ func (h *messageHandler) handleStmtSetTableName(_ context.Context, request Reque
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}
stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("get thread lock cost:", log.GetLogDuration(isDebug, s))
code := wrapper.TaosStmtSetTBName(stmtItem.stmt, req.Name)
Expand Down Expand Up @@ -781,7 +785,8 @@ func (h *messageHandler) handleStmtSetTags(_ context.Context, request Request, l
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_get_tag_fields get thread lock cost:", log.GetLogDuration(isDebug, s))
code, tagNums, tagFields := wrapper.TaosStmtGetTagFields(stmtItem.stmt)
Expand Down Expand Up @@ -843,7 +848,8 @@ func (h *messageHandler) handleStmtBind(_ context.Context, request Request, logg
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_get_col_fields get thread lock cost:", log.GetLogDuration(isDebug, s))
code, colNums, colFields := wrapper.TaosStmtGetColFields(stmtItem.stmt)
Expand Down Expand Up @@ -892,7 +898,8 @@ func (h *messageHandler) handleBindMessage(_ context.Context, req dealBinaryRequ
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.id)
}

stmtItem.Lock()
defer stmtItem.Unlock()
var data [][]driver.Value
var fieldTypes []*types.ColumnType
if stmtItem.isInsert {
Expand Down Expand Up @@ -1036,7 +1043,8 @@ func (h *messageHandler) handleStmtAddBatch(_ context.Context, request Request,
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("get thread lock cost:", log.GetLogDuration(isDebug, s))
code := wrapper.TaosStmtAddBatch(stmtItem.stmt)
Expand Down Expand Up @@ -1073,6 +1081,8 @@ func (h *messageHandler) handleStmtExec(_ context.Context, request Request, logg
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}
stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_execute get thread lock cost:", log.GetLogDuration(isDebug, s))
code := wrapper.TaosStmtExecute(stmtItem.stmt)
Expand Down Expand Up @@ -1133,7 +1143,8 @@ func (h *messageHandler) handleStmtGetColFields(_ context.Context, request Reque
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_get_col_fields get thread lock cost:", log.GetLogDuration(isDebug, s))
code, colNums, colFields := wrapper.TaosStmtGetColFields(stmtItem.stmt)
Expand Down Expand Up @@ -1177,6 +1188,8 @@ func (h *messageHandler) handleStmtGetTagFields(_ context.Context, request Reque
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}
stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_get_tag_fields get thread lock cost:", log.GetLogDuration(isDebug, s))
code, tagNums, tagFields := wrapper.TaosStmtGetTagFields(stmtItem.stmt)
Expand Down Expand Up @@ -1224,6 +1237,8 @@ func (h *messageHandler) handleStmtUseResult(_ context.Context, request Request,
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}
stmtItem.Lock()
defer stmtItem.Unlock()
result := wrapper.TaosStmtUseResult(stmtItem.stmt)
if result == nil {
errStr := wrapper.TaosStmtErrStr(stmtItem.stmt)
Expand Down Expand Up @@ -1261,7 +1276,8 @@ func (h *messageHandler) handleSetTagsMessage(_ context.Context, req dealBinaryR
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.id)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_get_tag_fields get thread lock cost:", log.GetLogDuration(isDebug, s))
code, tagNums, tagFields := wrapper.TaosStmtGetTagFields(stmtItem.stmt)
Expand Down Expand Up @@ -1470,7 +1486,8 @@ func (h *messageHandler) handleStmtNumParams(_ context.Context, request Request,
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_num_params get thread lock cost:", log.GetLogDuration(isDebug, s))
count, code := wrapper.TaosStmtNumParams(stmtItem.stmt)
Expand Down Expand Up @@ -1509,7 +1526,8 @@ func (h *messageHandler) handleStmtGetParam(_ context.Context, request Request,
if stmtItem == nil {
return wsStmtErrorMsg(0xffff, "stmt is nil", req.StmtID)
}

stmtItem.Lock()
defer stmtItem.Unlock()
thread.Lock()
logger.Debugln("stmt_get_param get thread lock cost:", log.GetLogDuration(isDebug, s))
dataType, length, err := wrapper.TaosStmtGetParam(stmtItem.stmt, req.Index)
Expand Down
2 changes: 1 addition & 1 deletion controller/ws/ws/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type StmtItem struct {
index uint64
stmt unsafe.Pointer
isInsert bool
sync.RWMutex
sync.Mutex
}

func (s *StmtItem) free() {
Expand Down

0 comments on commit cdd0b7b

Please sign in to comment.