Skip to content

Commit

Permalink
add select test and fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon9997 committed Jan 17, 2025
1 parent 2026565 commit d23e071
Show file tree
Hide file tree
Showing 11 changed files with 72,383 additions and 23 deletions.
1 change: 1 addition & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM TAOS_DEF_ERROR_CODE(0, 0x6201)
#define TSDB_CODE_VTABLE_PRIMTS_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x6202)
#define TSDB_CODE_VTABLE_NOT_VIRTUAL_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x6203)
#define TSDB_CODE_VTABLE_SCAN_NO_REF TAOS_DEF_ERROR_CODE(0, 0x6204)
#ifdef __cplusplus
}
#endif
Expand Down
2 changes: 2 additions & 0 deletions source/libs/executor/src/operator.c
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
}
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
} else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
code = createVirtualTableMergeOperatorInfo(NULL, pHandle, NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
} else {
code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code;
Expand Down
29 changes: 22 additions & 7 deletions source/libs/executor/src/virtualtablescanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SVirtualTableScanInfo* pSortMergeInfo = &pInfo->virtualScanInfo;

int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize;
int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize;

pSortMergeInfo->pSortHandle = NULL;
if (pOperator->numOfDownstream == 0) {
return TSDB_CODE_SUCCESS;
}

VTS_ERR_RET(tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize,
numOfBufPage, pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pSortMergeInfo->pSortHandle));

Expand Down Expand Up @@ -94,7 +97,7 @@ int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
}

int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
int32_t code = 0;
int32_t code = 0;

if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
Expand Down Expand Up @@ -189,8 +192,11 @@ int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
blockDataCleanup(pDataBlock);

if (pHandle == NULL) {
return TSDB_CODE_SUCCESS;
}

if (pSortMergeInfo->pIntermediateBlock == NULL) {
pSortMergeInfo->pIntermediateBlock = NULL;
VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock));
if (pSortMergeInfo->pIntermediateBlock == NULL) {
return TSDB_CODE_SUCCESS;
Expand Down Expand Up @@ -235,12 +241,14 @@ int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
return TSDB_CODE_SUCCESS;
}

VTS_ERR_RET(pOperator->fpSet._openFn(pOperator));
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
SVirtualTableScanInfo* pSortMergeInfo = &pInfo->virtualScanInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

VTS_ERR_RET(pOperator->fpSet._openFn(pOperator));

while(1) {
VTS_ERR_RET(doVirtualTableMerge(pOperator, pResBlock));
if (*pResBlock == NULL) {
Expand Down Expand Up @@ -307,8 +315,12 @@ int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlot
size_t numOfCols = LIST_LENGTH(pNodeList);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
*tsSlotId = -1;

if (numOfCols == 0) {
return code;
}

*tsSlotId = -1;
*pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno);

Expand Down Expand Up @@ -401,7 +413,10 @@ int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, SReadHa
createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
optrDefaultBufFn, getVirtualTableScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);

VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
if (NULL != pDownstream) {
VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
}


*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
Expand Down
49 changes: 35 additions & 14 deletions source/libs/planner/src/planLogicCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,6 @@ static int32_t scanAddCol(SLogicNode* pLogicNode, SColRef* colRef, const SSchema
pRefTableScanCol->tableHasPk = false;
pRefTableScanCol->numOfPKs = 0;
PLAN_ERR_JRET(nodesListAppend(pLogicScan->pScanCols, (SNode*)pRefTableScanCol));
nodesDestroyList(pLogicScan->node.pTargets);
PLAN_ERR_JRET(createColumnByRewriteExprs(pLogicScan->pScanCols, &pLogicScan->node.pTargets));
return code;
_return:
nodesDestroyNode((SNode*)pRefTableScanCol);
Expand All @@ -724,17 +722,27 @@ static int32_t checkColRefType(const SSchema* vtbSchema, const SSchema* refSchem
}

static int32_t addSubScanNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SVirtualTableNode* pVirtualTable,
SVirtualScanLogicNode *pVtableScan, SNode* pRefTable,
SLogicNode* pRefScan, int32_t index) {
int32_t index, SHashObj *refTablesMap) {
int32_t code = TSDB_CODE_SUCCESS;
col_id_t colId = 0;
SColRef* pColRef = &pVirtualTable->pMeta->colRef[index];
SNode *pRefTable = NULL;

PLAN_ERR_JRET(findRefTableNode(pVirtualTable->refTables, pColRef->refTableName, &pRefTable));
PLAN_ERR_JRET(findRefColId(pRefTable, pColRef->refColName, &colId));
PLAN_ERR_JRET(createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pRefTable, &pRefScan));
PLAN_ERR_JRET(checkColRefType(&pVirtualTable->pMeta->schema[index], &((SRealTableNode*)pRefTable)->pMeta->schema[colId - 1]));
PLAN_ERR_JRET(scanAddCol(pRefScan, pColRef, &pVirtualTable->pMeta->schema[index], colId));
PLAN_ERR_JRET(nodesListStrictAppend(pVtableScan->node.pChildren, (SNode*)pRefScan));

SLogicNode *pRefScan = NULL;
SLogicNode **ppRefScan = (SLogicNode **)taosHashGet(refTablesMap, &pColRef->refTableName, strlen(pColRef->refTableName));
if (NULL == ppRefScan) {
PLAN_ERR_JRET(createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pRefTable, &pRefScan));
PLAN_ERR_JRET(checkColRefType(&pVirtualTable->pMeta->schema[index], &((SRealTableNode*)pRefTable)->pMeta->schema[colId - 1]));
PLAN_ERR_JRET(scanAddCol(pRefScan, pColRef, &pVirtualTable->pMeta->schema[index], colId));
PLAN_ERR_JRET(taosHashPut(refTablesMap, &pColRef->refTableName, strlen(pColRef->refTableName), &pRefScan, POINTER_BYTES));
} else {
pRefScan = *ppRefScan;
PLAN_ERR_JRET(checkColRefType(&pVirtualTable->pMeta->schema[index], &((SRealTableNode*)pRefTable)->pMeta->schema[colId - 1]));
PLAN_ERR_JRET(scanAddCol(pRefScan, pColRef, &pVirtualTable->pMeta->schema[index], colId));
}

_return:
return code;
Expand All @@ -759,9 +767,10 @@ static int32_t createVirtualTableLogicNode(SLogicPlanContext* pCxt, SSelectStmt*
SVirtualTableNode* pVirtualTable, SLogicNode** pLogicNode) {
int32_t code = TSDB_CODE_SUCCESS;
SVirtualScanLogicNode *pVtableScan = NULL;
SLogicNode *pRefScan = NULL;
SNode *pRefTable = NULL;
SHashObj *pRefTablesMap = NULL;

PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN, (SNode**)&pVtableScan));

PLAN_ERR_JRET(nodesMakeList(&pVtableScan->node.pChildren));

PLAN_ERR_JRET(makeVirtualScanLogicNode(pCxt, pVirtualTable, pSelect->hasRepeatScanFuncs, pVtableScan));
Expand All @@ -776,6 +785,8 @@ static int32_t createVirtualTableLogicNode(SLogicPlanContext* pCxt, SSelectStmt*
int32_t slotId = 0;
bool onlyTs = true;

pRefTablesMap = taosHashInit(LIST_LENGTH(pVtableScan->pScanCols), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

FOREACH(pNode, pVtableScan->pScanCols) {
SColumnNode *pCol = (SColumnNode*)pNode;
col_id_t index = (col_id_t)(pCol->colId - 1);
Expand All @@ -789,7 +800,7 @@ static int32_t createVirtualTableLogicNode(SLogicPlanContext* pCxt, SSelectStmt*
tstrncpy(pCol->refColName, pColRef->refColName, TSDB_COL_NAME_LEN);
pCol->hasRef = true;

PLAN_ERR_JRET(addSubScanNode(pCxt, pSelect, pVirtualTable, pVtableScan, pRefTable, pRefScan, index));
PLAN_ERR_JRET(addSubScanNode(pCxt, pSelect, pVirtualTable, index, pRefTablesMap));
} else if (pCol->isPrimTs || pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
// do nothing
} else {
Expand All @@ -799,7 +810,7 @@ static int32_t createVirtualTableLogicNode(SLogicPlanContext* pCxt, SSelectStmt*
}

if (onlyTs) {
if (0 != LIST_LENGTH(pVtableScan->node.pChildren)) {
if (0 != taosHashGetSize(pRefTablesMap)) {
PLAN_ERR_JRET(TSDB_CODE_VTABLE_SCAN_UNMATCHED_COLUMN);
}
for (int32_t i = 0; i < pVirtualTable->pMeta->tableInfo.numOfColumns; i++) {
Expand All @@ -808,21 +819,31 @@ static int32_t createVirtualTableLogicNode(SLogicPlanContext* pCxt, SSelectStmt*
} else {
col_id_t index = (col_id_t)(pVirtualTable->pMeta->schema[i].colId - 1);
if (pVirtualTable->pMeta->colRef[index].hasRef) {
PLAN_ERR_JRET(addSubScanNode(pCxt, pSelect, pVirtualTable, pVtableScan, pRefTable, pRefScan, index));
PLAN_ERR_JRET(addSubScanNode(pCxt, pSelect, pVirtualTable, index, pRefTablesMap));
}
}
}
}

void *pIter = NULL;
while ((pIter = taosHashIterate(pRefTablesMap, pIter))) {
SScanLogicNode **pRefScanNode = (SScanLogicNode**)pIter;
nodesDestroyList((*pRefScanNode)->node.pTargets);
PLAN_ERR_JRET(createColumnByRewriteExprs((*pRefScanNode)->pScanCols, &(*pRefScanNode)->node.pTargets));
PLAN_ERR_JRET(nodesListStrictAppend(pVtableScan->node.pChildren, (SNode*)(*pRefScanNode)));
}

// set output
PLAN_ERR_JRET(createColumnByRewriteExprs(pVtableScan->pScanCols, &pVtableScan->node.pTargets));
PLAN_ERR_JRET(createColumnByRewriteExprs(pVtableScan->pScanPseudoCols, &pVtableScan->node.pTargets));

*pLogicNode = (SLogicNode*)pVtableScan;
taosHashCleanup(pRefTablesMap);
return code;

_return:
taosHashCleanup(pRefTablesMap);
nodesDestroyNode((SNode*)pVtableScan);
nodesDestroyNode((SNode*)pRefScan);
return code;
}

Expand Down
9 changes: 7 additions & 2 deletions source/libs/planner/src/planPhysiCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,7 @@ static int32_t createVirtualTableScanPhysiNodeFinalize(SPhysiPlanContext* pCxt,
return code;
}

static int32_t createVirtualTableScanPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
static int32_t createVirtualTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SNodeList* pChildren,
SVirtualScanLogicNode * pScanLogicNode, SPhysiNode** pPhyNode) {
int32_t code = TSDB_CODE_SUCCESS;
SVirtualScanPhysiNode * pVirtualScan =
Expand All @@ -1768,6 +1768,11 @@ static int32_t createVirtualTableScanPhysiNode(SPhysiPlanContext* pCxt, SNodeLis
return terrno;
}

if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
}

PLAN_ERR_RET(createVirtualTableScanPhysiNodeFinalize(pCxt, pChildren, pScanLogicNode, (SVirtualScanPhysiNode*)pVirtualScan, pPhyNode));
PLAN_ERR_RET(setVtableListSlotId(pCxt, pChildren, pScanLogicNode->node.pTargets, &pVirtualScan->pTargets));
return code;
Expand Down Expand Up @@ -2983,7 +2988,7 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
return createDynQueryCtrlPhysiNode(pCxt, pChildren, (SDynQueryCtrlLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN:
return createVirtualTableScanPhysiNode(pCxt, pChildren, (SVirtualScanLogicNode*)pLogicNode, pPhyNode);
return createVirtualTableScanPhysiNode(pCxt, pSubplan, pChildren, (SVirtualScanLogicNode*)pLogicNode, pPhyNode);
default:
break;
}
Expand Down
1 change: 1 addition & 0 deletions source/util/src/terror.c
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_UNMATCHED_COLUMN, "Virtual table scan
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM, "Virtual table scan invalid downstream operator type")
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_PRIMTS_HAS_REF, "Virtual table prim timestamp column should not has ref column")
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_VIRTUAL_SUPER_TABLE, "Create virtual child table must use virtual super table")
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_NO_REF, "Virtual table has no reference table")
#ifdef TAOS_ERROR_C
};
#endif
Expand Down
Loading

0 comments on commit d23e071

Please sign in to comment.