diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 26c2d4b9b875..fc3b31f5d47c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -501,6 +501,8 @@ typedef struct SPhysiNode { bool dynamicOp; EOrder inputTsOrder; EOrder outputTsOrder; + EDataOrderLevel requireDataOrder; + EDataOrderLevel resultDataOrder; SDataBlockDescNode* pOutputDataBlockDesc; SNode* pConditions; SNodeList* pChildren; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 3bc7f013ba27..002b6badee91 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -521,8 +521,27 @@ typedef struct SOptrBasicInfo { bool mergeResultBlock; int32_t inputTsOrder; int32_t outputTsOrder; + EDataOrderLevel inputDataOrder; + EDataOrderLevel outputDataOrder; } SOptrBasicInfo; +static FORCE_INLINE void setOptrBasicInfoOrder(SOptrBasicInfo* pInfo, const SPhysiNode* pNode) { + pInfo->inputTsOrder = pNode->inputTsOrder; + pInfo->outputTsOrder = pNode->outputTsOrder; + pInfo->inputDataOrder = pNode->requireDataOrder; + pInfo->outputDataOrder = pNode->resultDataOrder; +} + +static FORCE_INLINE bool optrHasOrderedInput(const SOptrBasicInfo* pInfo, EDataOrderLevel minLevel) { + return (pInfo->inputTsOrder == ORDER_ASC || pInfo->inputTsOrder == ORDER_DESC) && + pInfo->inputDataOrder >= minLevel; +} + +static FORCE_INLINE bool optrHasOrderedOutput(const SOptrBasicInfo* pInfo, EDataOrderLevel minLevel) { + return (pInfo->outputTsOrder == ORDER_ASC || pInfo->outputTsOrder == ORDER_DESC) && + pInfo->outputDataOrder >= minLevel; +} + typedef struct SIndefRowsWindowState { STimeWindow win; // logical window range for this state uint64_t groupId; // source group id of this logical window diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index a9c130f40e50..7b4a18a3b665 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -125,8 +125,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; pInfo->groupId = UINT64_MAX; - pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pAggNode->node); pInfo->hasCountFunc = pAggNode->hasCountLikeFunc; pInfo->pOperator = pOperator; pInfo->cleanGroupResInfo = false; diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index 7d9871bbe1c1..f368e4b5a75d 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -191,8 +191,7 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p QUERY_CHECK_CODE(code, lino, _error); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->binfo.inputTsOrder = pAnomalyNode->window.node.inputTsOrder; - pInfo->binfo.outputTsOrder = pAnomalyNode->window.node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pAnomalyNode->window.node); pInfo->anomalyCols = taosArrayInit(LIST_LENGTH(pColNodes), sizeof(SColumn)); pInfo->anomalyData = taosArrayInit(LIST_LENGTH(pColNodes), sizeof(SStateKeys)); @@ -885,4 +884,3 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p void destroyForecastInfo(void* param) {} #endif - diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index a68c353a80c3..e38c348eb9f4 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -499,8 +499,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy QUERY_CHECK_CODE(code, lino, _error); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; - pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, physiNode); pInfo->windowCount = pCountWindowNode->windowCount; pInfo->windowSliding = pCountWindowNode->windowSliding; // sizeof(SCountWindowResult) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 6eca23d02b01..2de829dc0984 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -133,8 +133,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy QUERY_CHECK_CODE(code, lino, _error); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; - pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, physiNode); pInfo->winSup.lastTs = INT64_MIN; code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); diff --git a/source/libs/executor/src/externalwindowoperator.c b/source/libs/executor/src/externalwindowoperator.c index 79b433a80d05..98a2242a1624 100644 --- a/source/libs/executor/src/externalwindowoperator.c +++ b/source/libs/executor/src/externalwindowoperator.c @@ -1644,8 +1644,9 @@ int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPh pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId; pExtW->mode = pPhynode->window.pFuncs ? EEXT_MODE_AGG : EEXT_MODE_SCALAR; - pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; - pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder; + pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; + pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder; + setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node); pExtW->needGroupSort = pPhynode->needGroupSort; pExtW->calcWithPartition = pPhynode->calcWithPartition; pExtW->extWinSplit = pPhynode->extWinSplit; @@ -4359,8 +4360,9 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo pExtW->fillMode = pPhynode->extFill.mode; pExtW->pFillExprs = pPhynode->extFill.pFillExprs; pExtW->pFillValues = pPhynode->extFill.pFillValues; - pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; - pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder; + pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; + pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder; + setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node); pExtW->isDynWindow = false; qDebug("%s create extWin operator, execModel:%d, phySubquery:%p", GET_TASKID(pTaskInfo), pTaskInfo->execModel, @@ -4489,8 +4491,7 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo pTaskInfo->pStreamRuntimeInfo); TSDB_CHECK_CODE(code, lino, _error); - pExtW->binfo.inputTsOrder = pNode->inputTsOrder; - pExtW->binfo.outputTsOrder = pNode->outputTsOrder; + setOptrBasicInfoOrder(&pExtW->binfo, pNode); code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 6c0b1c8c8544..96a04687d653 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -723,8 +723,7 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo); pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; - pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pAggNode->node); pInfo->pOperator = pOperator; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index f91831a00624..58f9a6b806b5 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -649,8 +649,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS pOperator->pPhyNode = pPhyNode; pInfo->groupMerge = pMergePhyNode->groupSort; pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId; - pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pMergePhyNode->node); pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId; pInfo->type = pMergePhyNode->type; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 50b101acfdf4..73845ce66926 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -340,8 +340,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes); TSDB_CHECK_CODE(code, lino, _error); - pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pProjPhyNode->node); pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup; pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId; @@ -733,8 +732,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* TSDB_CHECK_CODE(code, lino, _error); pInfo->binfo.pRes = pResBlock; - pInfo->binfo.inputTsOrder = pNode->inputTsOrder; - pInfo->binfo.outputTsOrder = pNode->outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, pNode); code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index ac1b3a0221e6..34e348a3f5b1 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -194,8 +194,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN } if (code != TSDB_CODE_SUCCESS) goto _error; - pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pSortNode->node); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -959,8 +958,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); - pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pSortPhyNode->node); int32_t numOfOutputCols = 0; code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, diff --git a/source/libs/executor/src/streamexternalwindowoperator.c b/source/libs/executor/src/streamexternalwindowoperator.c index b55d17b758d9..c176efa4b11f 100644 --- a/source/libs/executor/src/streamexternalwindowoperator.c +++ b/source/libs/executor/src/streamexternalwindowoperator.c @@ -763,8 +763,9 @@ int32_t createStreamMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstrea pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId; pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG; - pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; - pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder; + pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; + pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder; + setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -2359,8 +2360,9 @@ int32_t createStreamExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNod pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId; pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG); - pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; - pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder; + pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC; + pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder; + setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node); pExtW->isDynWindow = false; if (pTaskInfo->pStreamRuntimeInfo != NULL){ @@ -2466,8 +2468,7 @@ int32_t createStreamExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNod pTaskInfo->pStreamRuntimeInfo); TSDB_CHECK_CODE(code, lino, _error); - pExtW->binfo.inputTsOrder = pNode->inputTsOrder; - pExtW->binfo.outputTsOrder = pNode->outputTsOrder; + setOptrBasicInfoOrder(&pExtW->binfo, pNode); code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e092f3d8b94b..935551332e4f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -786,7 +786,7 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul uint64_t tableGroupId = pBlock->info.id.groupId; bool ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC); SResultRow* pResult = NULL; - bool sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC || tsCols == NULL; + bool sorted = optrHasOrderedInput(&pInfo->binfo, DATA_ORDER_LEVEL_IN_BLOCK) || tsCols == NULL; TSKEY ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos]; int32_t ret = TSDB_CODE_SUCCESS; @@ -2325,8 +2325,7 @@ static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIn pIntervalInfo->cleanGroupResInfo = false; pIntervalInfo->handledGroupNum = 0; - pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder; - pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder; + setOptrBasicInfoOrder(&pIntervalInfo->binfo, &pPhynode->window.node); taosArrayDestroy(pIntervalInfo->pInterpCols); pIntervalInfo->pInterpCols = NULL; @@ -2410,8 +2409,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode }; pInfo->win = pTaskInfo->window; - pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder; - pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pPhyNode->window.node); pInfo->interval = interval; pInfo->twAggSup = as; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; @@ -2793,8 +2791,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhy goto _error; } } - pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder; - pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pStateNode->window.node); code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0, pTaskInfo->pStreamRuntimeInfo); @@ -2968,8 +2965,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; pInfo->reptScan = false; - pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder; - pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pSessionNode->window.node); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; @@ -3305,8 +3301,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge miaInfo->curTs = INT64_MIN; iaInfo->win = pTaskInfo->window; - iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder; - iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder; + setOptrBasicInfoOrder(&iaInfo->binfo, &pNode->window.node); iaInfo->interval = interval; iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId; iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock; @@ -3664,11 +3659,10 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo; pIntervalInfo->win = pTaskInfo->window; - pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder; + setOptrBasicInfoOrder(&pIntervalInfo->binfo, &pIntervalPhyNode->window.node); pIntervalInfo->interval = interval; pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock; pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder; SExprSupp* pExprSupp = &pOperator->exprSupp; pExprSupp->hasWindowOrGroup = true; diff --git a/source/libs/executor/src/virtualtablescanoperator.c b/source/libs/executor/src/virtualtablescanoperator.c index 71021cbdb4d4..de8bfdf8eec4 100644 --- a/source/libs/executor/src/virtualtablescanoperator.c +++ b/source/libs/executor/src/virtualtablescanoperator.c @@ -923,8 +923,7 @@ int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->pPhyNode = pVirtualScanPhyNode; - pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder; - pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder; + setOptrBasicInfoOrder(&pInfo->binfo, &pVirtualScanPhyNode->scan.node); SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index a7876a79ea38..ab545ab17e10 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -960,6 +960,8 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) { CLONE_NODE_LIST_FIELD(pChildren); COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(outputTsOrder); + COPY_SCALAR_FIELD(requireDataOrder); + COPY_SCALAR_FIELD(resultDataOrder); COPY_SCALAR_FIELD(dynamicOp); COPY_SCALAR_FIELD(forceCreateNonBlockingOptr); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 83c836e11ad5..dee3f50d6eef 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2170,6 +2170,8 @@ static const char* jkPhysiPlanLimit = "Limit"; static const char* jkPhysiPlanSlimit = "SLimit"; static const char* jkPhysiPlanInputTsOrder = "InputOrder"; static const char* jkPhysiPlanOutputTsOrder = "OutputOrder"; +static const char* jkPhysiPlanRequireDataOrder = "RequireDataOrder"; +static const char* jkPhysiPlanResultDataOrder = "ResultDataOrder"; static const char* jkPhysiPlanDynamicOp = "DynamicOp"; static const char* jkPhysiPlanForceCreateNonBlockingOptr = "ForceCreateNonBlockingOptr"; @@ -2195,6 +2197,12 @@ static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkPhysiPlanOutputTsOrder, pNode->outputTsOrder); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkPhysiPlanRequireDataOrder, pNode->requireDataOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkPhysiPlanResultDataOrder, pNode->resultDataOrder); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkPhysiPlanDynamicOp, pNode->dynamicOp); } @@ -2227,6 +2235,12 @@ static int32_t jsonToPhysicPlanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkPhysiPlanOutputTsOrder, pNode->outputTsOrder, code); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkPhysiPlanRequireDataOrder, pNode->requireDataOrder, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkPhysiPlanResultDataOrder, pNode->resultDataOrder, code); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkPhysiPlanDynamicOp, &pNode->dynamicOp); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index f32c7b683f01..7be946d0e238 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2332,6 +2332,8 @@ enum { PHY_NODE_CODE_SLIMIT, PHY_NODE_CODE_INPUT_TS_ORDER, PHY_NODE_CODE_OUTPUT_TS_ORDER, + PHY_NODE_CODE_REQUIRE_DATA_ORDER, + PHY_NODE_CODE_RESULT_DATA_ORDER, PHY_NODE_CODE_DYNAMIC_OP, PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR }; @@ -2358,6 +2360,12 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_REQUIRE_DATA_ORDER, pNode->requireDataOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_RESULT_DATA_ORDER, pNode->resultDataOrder); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_DYNAMIC_OP, pNode->dynamicOp); } @@ -2396,6 +2404,12 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) { case PHY_NODE_CODE_OUTPUT_TS_ORDER: code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder)); break; + case PHY_NODE_CODE_REQUIRE_DATA_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->requireDataOrder, sizeof(pNode->requireDataOrder)); + break; + case PHY_NODE_CODE_RESULT_DATA_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->resultDataOrder, sizeof(pNode->resultDataOrder)); + break; case PHY_NODE_CODE_DYNAMIC_OP: code = tlvDecodeBool(pTlv, &pNode->dynamicOp); break; diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 4aef4beb28cc..166c6dafb3e4 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -91,6 +91,8 @@ int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); bool isPartTableAgg(SAggLogicNode* pAgg); bool isPartTagAgg(SAggLogicNode* pAgg); bool isPartTableWinodw(SWindowLogicNode* pWindow); +void planPromoteScanToTableMerge(SScanLogicNode* pScan, EDataOrderLevel requireLevel, + EDataOrderLevel resultLevel); bool keysHasCol(SNodeList* pKeys); bool keysHasTbname(SNodeList* pKeys); bool projectCouldMergeUnsortDataBlock(SProjectLogicNode* pProject); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 4c0c6020e5ab..c75d0bb7cf4d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1554,6 +1554,24 @@ static EDataOrderLevel getRequireDataOrder(bool needTimeline, SSelectStmt* pSele : DATA_ORDER_LEVEL_NONE; } +static EDataOrderLevel getWindowMinInputDataOrder(EWindowType winType, SSelectStmt* pSelect) { + switch (winType) { + case WINDOW_TYPE_INTERVAL: + return DATA_ORDER_LEVEL_IN_BLOCK; + default: + return getRequireDataOrder(true, pSelect); + } +} + +static EDataOrderLevel getWindowInitialResultDataOrder(EWindowType winType, SSelectStmt* pSelect) { + switch (winType) { + case WINDOW_TYPE_INTERVAL: + return DATA_ORDER_LEVEL_IN_GROUP; + default: + return getRequireDataOrder(true, pSelect); + } +} + static int32_t addWinJoinPrimKeyToAggFuncs(SSelectStmt* pSelect, SNodeList** pList) { SNodeList* pTargets = *pList; int32_t code = 0; @@ -2204,7 +2222,10 @@ static int32_t createExternalWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SS PLAN_ERR_RET(rewriteExprsForSelect(pWindow->extFill.pFillExprs, pSelect, SQL_CLAUSE_EXT_WINDOW, NULL)); } - pWindow->inputHasOrder = (pWindow->isSingleTable || pWindow->node.requireDataOrder == DATA_ORDER_LEVEL_GLOBAL); + pWindow->inputHasOrder = + (pWindow->isSingleTable || + (pCxt->pCurrRoot != NULL && pCxt->pCurrRoot->resultDataOrder >= DATA_ORDER_LEVEL_GLOBAL && + (pCxt->pCurrRoot->outputTsOrder == ORDER_ASC || pCxt->pCurrRoot->outputTsOrder == ORDER_DESC))); if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { pWindow->node.pConditions = NULL; @@ -2319,8 +2340,9 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); pWindow->windowAlgo = INTERVAL_ALGO_HASH; pWindow->node.groupAction = (NULL != pInterval->pFill ? GROUP_ACTION_KEEP : getGroupAction(pCxt, pSelect)); - pWindow->node.requireDataOrder = (pSelect->hasTimeLineFunc ? getRequireDataOrder(true, pSelect) : DATA_ORDER_LEVEL_NONE); - pWindow->node.resultDataOrder = getRequireDataOrder(true, pSelect); + pWindow->node.requireDataOrder = + (pSelect->hasTimeLineFunc ? getWindowMinInputDataOrder(WINDOW_TYPE_INTERVAL, pSelect) : DATA_ORDER_LEVEL_NONE); + pWindow->node.resultDataOrder = getWindowInitialResultDataOrder(WINDOW_TYPE_INTERVAL, pSelect); pWindow->pTspk = NULL; code = nodesCloneNode(pInterval->pCol, &pWindow->pTspk); if (NULL == pWindow->pTspk) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index dce0e9c9804b..8e6f9c7cd75e 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3360,10 +3360,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS } pScan->node.outputTsOrder = order; if (TSDB_SUPER_TABLE == pScan->tableType && !pScan->phTbnameScan) { - pScan->scanType = SCAN_TYPE_TABLE_MERGE; - pScan->filesetDelimited = true; - pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; - pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + planPromoteScanToTableMerge(pScan, DATA_ORDER_LEVEL_GLOBAL, DATA_ORDER_LEVEL_GLOBAL); } pScan->sortPrimaryKey = true; } else if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pSequencingNode)) { @@ -9697,8 +9694,7 @@ static int32_t createMergeScanNodeByScanNode(SScanLogicNode* pScan, SLogicNode** SScanLogicNode* pMergeScan = NULL; PLAN_ERR_JRET(nodesCloneNode((SNode*)pScan, (SNode**)&pMergeScan)); - pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; - pMergeScan->filesetDelimited = true; + planPromoteScanToTableMerge(pMergeScan, pMergeScan->node.requireDataOrder, pMergeScan->node.resultDataOrder); optResetParent((SLogicNode*)pMergeScan); *pOutputMergeScan = (SLogicNode*)pMergeScan; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index f7a10b8cb8de..b69f06f5441a 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -676,6 +676,8 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode pPhysiNode->dynamicOp = pLogicNode->dynamicOp; pPhysiNode->inputTsOrder = pLogicNode->inputTsOrder; pPhysiNode->outputTsOrder = pLogicNode->outputTsOrder; + pPhysiNode->requireDataOrder = pLogicNode->requireDataOrder; + pPhysiNode->resultDataOrder = pLogicNode->resultDataOrder; code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 28d902c84912..12a038c1f37f 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -871,8 +871,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo static void stbSplSetTableMergeScan(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; - pScan->scanType = SCAN_TYPE_TABLE_MERGE; - pScan->filesetDelimited = true; + planPromoteScanToTableMerge(pScan, pScan->node.requireDataOrder, pScan->node.resultDataOrder); if (NULL != pScan->pGroupTags) { pScan->groupSort = true; } @@ -1515,8 +1514,7 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu SNodeList* pMergeKeys = NULL; if (TSDB_CODE_SUCCESS == code) { - pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; - pMergeScan->filesetDelimited = true; + planPromoteScanToTableMerge(pMergeScan, pMergeScan->node.requireDataOrder, pMergeScan->node.resultDataOrder); pMergeScan->node.pChildren = pChildren; splSetParent((SLogicNode*)pMergeScan); diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 8c4a49c3c756..a9ae19ffa432 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -50,6 +50,20 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...) { return errCode; } +void planPromoteScanToTableMerge(SScanLogicNode* pScan, EDataOrderLevel requireLevel, EDataOrderLevel resultLevel) { + if (requireLevel < DATA_ORDER_LEVEL_IN_BLOCK) { + requireLevel = DATA_ORDER_LEVEL_IN_BLOCK; + } + if (resultLevel < DATA_ORDER_LEVEL_IN_BLOCK) { + resultLevel = DATA_ORDER_LEVEL_IN_BLOCK; + } + + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; + pScan->node.requireDataOrder = requireLevel; + pScan->node.resultDataOrder = resultLevel; +} + typedef struct SCreateColumnCxt { int32_t errCode; SNodeList* pList; @@ -185,8 +199,7 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel if (DATA_ORDER_LEVEL_IN_BLOCK == requirement || pScan->placeholderType == SP_PARTITION_TBNAME || pScan->placeholderType == SP_PARTITION_ROWS) { pScan->scanType = SCAN_TYPE_TABLE; } else if (TSDB_SUPER_TABLE == pScan->tableType) { - pScan->scanType = SCAN_TYPE_TABLE_MERGE; - pScan->filesetDelimited = true; + planPromoteScanToTableMerge(pScan, pScan->node.requireDataOrder, requirement); } if (TSDB_NORMAL_TABLE != pScan->tableType && TSDB_CHILD_TABLE != pScan->tableType) { diff --git a/test/cases/13-TimeSeriesExt/03-TimeWindow/test_interval_bugfix.py b/test/cases/13-TimeSeriesExt/03-TimeWindow/test_interval_bugfix.py index cbf04e004030..83d4dccafec4 100644 --- a/test/cases/13-TimeSeriesExt/03-TimeWindow/test_interval_bugfix.py +++ b/test/cases/13-TimeSeriesExt/03-TimeWindow/test_interval_bugfix.py @@ -47,6 +47,65 @@ def test_interval_bugfix(self): self.td_6739571506_test() self.sliding_month_february() + def test_interval_data_order_level(self): + """Interval: data order level regression + + Validate that a multi-table interval subquery can feed another interval + window without requiring the inner result to be globally ordered. The + inner partition-by-tbname interval output is only group ordered, but the + outer interval should still aggregate correctly. + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + """ + + db = "db_interval_order_level" + self.testSql.execute(f"drop database if exists {db}") + self.testSql.execute(f"create database {db}") + self.testSql.execute(f"use {db}") + self.testSql.execute("create stable st(ts timestamp, v int) tags(g int)") + self.testSql.execute("create table t1 using st tags(1)") + self.testSql.execute("create table t2 using st tags(2)") + self.testSql.execute( + "insert into t1 values " + "('2024-01-01 00:00:00.000', 1) " + "('2024-01-01 00:00:01.000', 2) " + "('2024-01-01 00:00:04.000', 3)" + ) + self.testSql.execute( + "insert into t2 values " + "('2024-01-01 00:00:00.000', 10) " + "('2024-01-01 00:00:02.000', 20) " + "('2024-01-01 00:00:05.000', 30)" + ) + + sql = ( + "select cast(_wstart as bigint), sum(c) from " + "(select _wstart, count(*) as c, tbname from st partition by tbname interval(2s)) " + "interval(4s)" + ) + self.testSql.query(sql) + self.testSql.checkRows(2) + self.testSql.checkData(0, 0, 1704067200000) + self.testSql.checkData(0, 1, 4) + self.testSql.checkData(1, 0, 1704067204000) + self.testSql.checkData(1, 1, 2) + + sql_desc = ( + "select cast(_wstart as bigint), sum(c) from " + "(select _wstart, count(*) as c, tbname from st partition by tbname interval(2s) order by 1 desc) " + "interval(4s)" + ) + self.testSql.query(sql_desc) + self.testSql.checkRows(2) + self.testSql.checkData(0, 0, 1704067200000) + self.testSql.checkData(0, 1, 4) + self.testSql.checkData(1, 0, 1704067204000) + self.testSql.checkData(1, 1, 2) + def sliding_month_february(self): """Validate interval(1n) monthly windows over February with various sliding values. @@ -640,4 +699,4 @@ def td_6739571506_test(self): self.testSql.query("select bottom(q_int,71) from (select * from stb) interval(15n,9n) order by ts;") self.testSql.query("select bottom(q_int,71) from (select * from stb) interval(1088n,500n) order by ts;") - return \ No newline at end of file + return diff --git a/test/cases/13-TimeSeriesExt/08-ExternalWindow/test_external.py b/test/cases/13-TimeSeriesExt/08-ExternalWindow/test_external.py index 2745aa37956d..b7b9069976d2 100644 --- a/test/cases/13-TimeSeriesExt/08-ExternalWindow/test_external.py +++ b/test/cases/13-TimeSeriesExt/08-ExternalWindow/test_external.py @@ -77,6 +77,40 @@ def test_External(self): self.fill_external_window_regression() self.scenario_regression() + def test_external_multitable_order_level(self): + """External: multitable order level regression + + Validate that external_window on a multi-table source still returns the + expected windows when the source is not treated as globally ordered. + This protects the unordered fallback path after inputHasOrder is derived + from actual upstream order properties. + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + """ + + tdLog.debug(f"start to execute {__file__} multi-table order level regression") + self.dbName = "test" + self.prepare_data() + self.prepare_for_orderby_and_alias() + + tdSql.query( + "select cast(_wstart as bigint) as ws, count(*) as c " + "from ext_ord_src external_window((select ts, endtime, mark from ext_ord_win_all) w)" + ) + tdSql.checkRows(4) + tdSql.checkData(0, 0, 1700100000000) + tdSql.checkData(0, 1, 2) + tdSql.checkData(1, 0, 1700100600000) + tdSql.checkData(1, 1, 3) + tdSql.checkData(2, 0, 1700101200000) + tdSql.checkData(2, 1, 1) + tdSql.checkData(3, 0, 1700101800000) + tdSql.checkData(3, 1, 2) + def mock_test_external_window_single_block(self): dbName = "external_window_test_single_block" self.prepare_mock_data(dbName)