Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): record the failure of dispatch msg, and set the update nodeId #29107

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ typedef struct {

typedef struct {
int32_t nodeId;
SEpSet epset;
} SDownstreamTaskEpset;

typedef enum {
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mnode/impl/src/mndStreamUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,7 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons
return TSDB_CODE_INVALID_PARA;
}

pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);

if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
Expand Down
1 change: 1 addition & 0 deletions source/libs/stream/inc/streamInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);

void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
Expand Down
16 changes: 7 additions & 9 deletions source/libs/stream/src/streamCheckStatus.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec

static void processDownstreamReadyRsp(SStreamTask* pTask);
static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static void rspMonitorFn(void* param, void* tmrId);
static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
Expand Down Expand Up @@ -226,13 +225,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}

streamMetaAddFailedTaskSelf(pTask, now);
Expand Down Expand Up @@ -373,11 +372,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
}
}

int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;
;
bool existed = false;
bool existed = false;

streamMutexLock(&pTask->lock);

Expand Down Expand Up @@ -675,8 +673,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
code = addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list",
id, vgId, p->taskId, p->vgId);
}
}
Expand Down Expand Up @@ -717,7 +715,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {

// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
// of restart in timer thread will result in a dead lock.
// of restart in timer thread will result in a deadlock.
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
}
Expand Down
13 changes: 12 additions & 1 deletion source/libs/stream/src/streamDispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,6 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
int32_t* pFailed, const char* id) {
int32_t numOfRsp = 0;
int32_t numOfFailed = 0;

bool allRsp = false;
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);

Expand Down Expand Up @@ -1639,6 +1638,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t notRsp = 0;
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
bool addFailure = false;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t tmpCheckpointId = -1;
int32_t tmpTranId = -1;
Expand Down Expand Up @@ -1698,6 +1698,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} else {
if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the role-changed during checkpoint generation, add test case
addFailure = true;
stError(
"s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or "
"restart already, treat it as success",
Expand Down Expand Up @@ -1745,6 +1746,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}

if (addFailure) { // add failure downstream node id, and start the nodeEp update procedure
// ignore the return error and continue
int32_t unused = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}

// all msg rsp already, continue
// we need to re-try send dispatch msg to downstream tasks
if (allRsp && (numOfFailed == 0)) {
Expand Down Expand Up @@ -1866,6 +1872,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
}

#if 0
// inject errors, and always refuse the upstream dispatch msg and trigger the task nodeEpset update trans.
status = TASK_INPUT_STATUS__REFUSED;
#endif

{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
Expand Down
Loading