Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/libs/nodes/plannodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ typedef struct SPhysiNode {
bool dynamicOp;
EOrder inputTsOrder;
EOrder outputTsOrder;
EDataOrderLevel requireDataOrder;
EDataOrderLevel resultDataOrder;
SDataBlockDescNode* pOutputDataBlockDesc;
SNode* pConditions;
SNodeList* pChildren;
Expand Down
19 changes: 19 additions & 0 deletions source/libs/executor/inc/executorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,27 @@ typedef struct SOptrBasicInfo {
bool mergeResultBlock;
int32_t inputTsOrder;
int32_t outputTsOrder;
EDataOrderLevel inputDataOrder;
EDataOrderLevel outputDataOrder;
} SOptrBasicInfo;

static FORCE_INLINE void setOptrBasicInfoOrder(SOptrBasicInfo* pInfo, const SPhysiNode* pNode) {
pInfo->inputTsOrder = pNode->inputTsOrder;
pInfo->outputTsOrder = pNode->outputTsOrder;
pInfo->inputDataOrder = pNode->requireDataOrder;
pInfo->outputDataOrder = pNode->resultDataOrder;
}

static FORCE_INLINE bool optrHasOrderedInput(const SOptrBasicInfo* pInfo, EDataOrderLevel minLevel) {
return (pInfo->inputTsOrder == ORDER_ASC || pInfo->inputTsOrder == ORDER_DESC) &&
pInfo->inputDataOrder >= minLevel;
}

static FORCE_INLINE bool optrHasOrderedOutput(const SOptrBasicInfo* pInfo, EDataOrderLevel minLevel) {
return (pInfo->outputTsOrder == ORDER_ASC || pInfo->outputTsOrder == ORDER_DESC) &&
pInfo->outputDataOrder >= minLevel;
}
Comment on lines +535 to +543
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The helper functions optrHasOrderedInput and optrHasOrderedOutput use ORDER_ASC and ORDER_DESC (likely from the EOrder enum), while other parts of the executor (e.g., timewindowoperator.c:787, externalwindowoperator.c:1647) use TSDB_ORDER_ASC. Mixing these different constant sets for the same semantic meaning can lead to subtle bugs if their values ever diverge and makes the code harder to maintain. It is recommended to use a consistent set of constants throughout the executor layer.


typedef struct SIndefRowsWindowState {
STimeWindow win; // logical window range for this state
uint64_t groupId; // source group id of this logical window
Expand Down
3 changes: 1 addition & 2 deletions source/libs/executor/src/aggregateoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
pInfo->groupId = UINT64_MAX;
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pAggNode->node);
pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
Expand Down
4 changes: 1 addition & 3 deletions source/libs/executor/src/anomalywindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
QUERY_CHECK_CODE(code, lino, _error);

initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = pAnomalyNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAnomalyNode->window.node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pAnomalyNode->window.node);

pInfo->anomalyCols = taosArrayInit(LIST_LENGTH(pColNodes), sizeof(SColumn));
pInfo->anomalyData = taosArrayInit(LIST_LENGTH(pColNodes), sizeof(SStateKeys));
Expand Down Expand Up @@ -885,4 +884,3 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
void destroyForecastInfo(void* param) {}

#endif

3 changes: 1 addition & 2 deletions source/libs/executor/src/countwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
QUERY_CHECK_CODE(code, lino, _error);

initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, physiNode);
pInfo->windowCount = pCountWindowNode->windowCount;
pInfo->windowSliding = pCountWindowNode->windowSliding;
// sizeof(SCountWindowResult)
Expand Down
3 changes: 1 addition & 2 deletions source/libs/executor/src/eventwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
QUERY_CHECK_CODE(code, lino, _error);

initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, physiNode);
pInfo->winSup.lastTs = INT64_MIN;

code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
Expand Down
13 changes: 7 additions & 6 deletions source/libs/executor/src/externalwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1644,8 +1644,9 @@ int32_t createMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstream, SPh

pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
pExtW->mode = pPhynode->window.pFuncs ? EEXT_MODE_AGG : EEXT_MODE_SCALAR;
pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder;
setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node);
pExtW->needGroupSort = pPhynode->needGroupSort;
pExtW->calcWithPartition = pPhynode->calcWithPartition;
pExtW->extWinSplit = pPhynode->extWinSplit;
Expand Down Expand Up @@ -4359,8 +4360,9 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo
pExtW->fillMode = pPhynode->extFill.mode;
pExtW->pFillExprs = pPhynode->extFill.pFillExprs;
pExtW->pFillValues = pPhynode->extFill.pFillValues;
pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder;
setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node);
pExtW->isDynWindow = false;

qDebug("%s create extWin operator, execModel:%d, phySubquery:%p", GET_TASKID(pTaskInfo), pTaskInfo->execModel,
Expand Down Expand Up @@ -4489,8 +4491,7 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo
pTaskInfo->pStreamRuntimeInfo);
TSDB_CHECK_CODE(code, lino, _error);

pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
setOptrBasicInfoOrder(&pExtW->binfo, pNode);
code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
TSDB_CHECK_CODE(code, lino, _error);

Expand Down
3 changes: 1 addition & 2 deletions source/libs/executor/src/groupoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,7 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pAggNode->node);

pInfo->pOperator = pOperator;

Expand Down
3 changes: 1 addition & 2 deletions source/libs/executor/src/mergeoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
pOperator->pPhyNode = pPhyNode;
pInfo->groupMerge = pMergePhyNode->groupSort;
pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pMergePhyNode->node);
pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;

pInfo->type = pMergePhyNode->type;
Expand Down
6 changes: 2 additions & 4 deletions source/libs/executor/src/projectoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
TSDB_CHECK_CODE(code, lino, _error);

pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pProjPhyNode->node);
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;

Expand Down Expand Up @@ -733,8 +732,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
TSDB_CHECK_CODE(code, lino, _error);

pInfo->binfo.pRes = pResBlock;
pInfo->binfo.inputTsOrder = pNode->inputTsOrder;
pInfo->binfo.outputTsOrder = pNode->outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, pNode);
code = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr, &pInfo->pPseudoColInfo);
TSDB_CHECK_CODE(code, lino, _error);

Expand Down
6 changes: 2 additions & 4 deletions source/libs/executor/src/sortoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
}
if (code != TSDB_CODE_SUCCESS) goto _error;

pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pSortNode->node);
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);

setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
Expand Down Expand Up @@ -959,8 +958,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);

pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pSortPhyNode->node);

int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
Expand Down
13 changes: 7 additions & 6 deletions source/libs/executor/src/streamexternalwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,9 @@ int32_t createStreamMergeAlignedExternalWindowOperator(SOperatorInfo* pDownstrea

pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : EEXT_MODE_AGG;
pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder;
setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node);

size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
Expand Down Expand Up @@ -2359,8 +2360,9 @@ int32_t createStreamExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNod

pExtW->primaryTsIndex = ((SColumnNode*)pPhynode->window.pTspk)->slotId;
pExtW->mode = pPhynode->window.pProjs ? EEXT_MODE_SCALAR : (pPhynode->window.indefRowsFunc ? EEXT_MODE_INDEFR_FUNC : EEXT_MODE_AGG);
pExtW->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pExtW->binfo.outputTsOrder = pExtW->binfo.inputTsOrder;
pPhynode->window.node.inputTsOrder = TSDB_ORDER_ASC;
pPhynode->window.node.outputTsOrder = pPhynode->window.node.inputTsOrder;
setOptrBasicInfoOrder(&pExtW->binfo, &pPhynode->window.node);
pExtW->isDynWindow = false;

if (pTaskInfo->pStreamRuntimeInfo != NULL){
Expand Down Expand Up @@ -2466,8 +2468,7 @@ int32_t createStreamExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNod
pTaskInfo->pStreamRuntimeInfo);
TSDB_CHECK_CODE(code, lino, _error);

pExtW->binfo.inputTsOrder = pNode->inputTsOrder;
pExtW->binfo.outputTsOrder = pNode->outputTsOrder;
setOptrBasicInfoOrder(&pExtW->binfo, pNode);
code = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfExpr, &pExtW->pPseudoColInfo);
TSDB_CHECK_CODE(code, lino, _error);

Expand Down
20 changes: 7 additions & 13 deletions source/libs/executor/src/timewindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
uint64_t tableGroupId = pBlock->info.id.groupId;
bool ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
SResultRow* pResult = NULL;
bool sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC || tsCols == NULL;
bool sorted = optrHasOrderedInput(&pInfo->binfo, DATA_ORDER_LEVEL_IN_BLOCK) || tsCols == NULL;
TSKEY ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];
int32_t ret = TSDB_CODE_SUCCESS;

Expand Down Expand Up @@ -2325,8 +2325,7 @@ static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIn

pIntervalInfo->cleanGroupResInfo = false;
pIntervalInfo->handledGroupNum = 0;
pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
setOptrBasicInfoOrder(&pIntervalInfo->binfo, &pPhynode->window.node);

taosArrayDestroy(pIntervalInfo->pInterpCols);
pIntervalInfo->pInterpCols = NULL;
Expand Down Expand Up @@ -2410,8 +2409,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
};

pInfo->win = pTaskInfo->window;
pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pPhyNode->window.node);
pInfo->interval = interval;
pInfo->twAggSup = as;
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
Expand Down Expand Up @@ -2793,8 +2791,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhy
goto _error;
}
}
pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pStateNode->window.node);

code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
pTaskInfo->pStreamRuntimeInfo);
Expand Down Expand Up @@ -2968,8 +2965,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pSessionNode->window.node);

if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
Expand Down Expand Up @@ -3305,8 +3301,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge

miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
setOptrBasicInfoOrder(&iaInfo->binfo, &pNode->window.node);
iaInfo->interval = interval;
iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
Expand Down Expand Up @@ -3664,11 +3659,10 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva

SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
pIntervalInfo->win = pTaskInfo->window;
pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
setOptrBasicInfoOrder(&pIntervalInfo->binfo, &pIntervalPhyNode->window.node);
pIntervalInfo->interval = interval;
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;

SExprSupp* pExprSupp = &pOperator->exprSupp;
pExprSupp->hasWindowOrGroup = true;
Expand Down
3 changes: 1 addition & 2 deletions source/libs/executor/src/virtualtablescanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,7 @@ int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t

pOperator->pPhyNode = pVirtualScanPhyNode;

pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
setOptrBasicInfoOrder(&pInfo->binfo, &pVirtualScanPhyNode->scan.node);

SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
Expand Down
2 changes: 2 additions & 0 deletions source/libs/nodes/src/nodesCloneFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,8 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
CLONE_NODE_LIST_FIELD(pChildren);
COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder);
COPY_SCALAR_FIELD(requireDataOrder);
COPY_SCALAR_FIELD(resultDataOrder);
COPY_SCALAR_FIELD(dynamicOp);
COPY_SCALAR_FIELD(forceCreateNonBlockingOptr);
return TSDB_CODE_SUCCESS;
Expand Down
14 changes: 14 additions & 0 deletions source/libs/nodes/src/nodesCodeFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,8 @@ static const char* jkPhysiPlanLimit = "Limit";
static const char* jkPhysiPlanSlimit = "SLimit";
static const char* jkPhysiPlanInputTsOrder = "InputOrder";
static const char* jkPhysiPlanOutputTsOrder = "OutputOrder";
static const char* jkPhysiPlanRequireDataOrder = "RequireDataOrder";
static const char* jkPhysiPlanResultDataOrder = "ResultDataOrder";
static const char* jkPhysiPlanDynamicOp = "DynamicOp";
static const char* jkPhysiPlanForceCreateNonBlockingOptr = "ForceCreateNonBlockingOptr";

Expand All @@ -2195,6 +2197,12 @@ static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkPhysiPlanOutputTsOrder, pNode->outputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkPhysiPlanRequireDataOrder, pNode->requireDataOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkPhysiPlanResultDataOrder, pNode->resultDataOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkPhysiPlanDynamicOp, pNode->dynamicOp);
}
Expand Down Expand Up @@ -2227,6 +2235,12 @@ static int32_t jsonToPhysicPlanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkPhysiPlanOutputTsOrder, pNode->outputTsOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkPhysiPlanRequireDataOrder, pNode->requireDataOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkPhysiPlanResultDataOrder, pNode->resultDataOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkPhysiPlanDynamicOp, &pNode->dynamicOp);
}
Expand Down
14 changes: 14 additions & 0 deletions source/libs/nodes/src/nodesMsgFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2332,6 +2332,8 @@ enum {
PHY_NODE_CODE_SLIMIT,
PHY_NODE_CODE_INPUT_TS_ORDER,
PHY_NODE_CODE_OUTPUT_TS_ORDER,
PHY_NODE_CODE_REQUIRE_DATA_ORDER,
PHY_NODE_CODE_RESULT_DATA_ORDER,
PHY_NODE_CODE_DYNAMIC_OP,
PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR
};
Expand All @@ -2358,6 +2360,12 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_REQUIRE_DATA_ORDER, pNode->requireDataOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_RESULT_DATA_ORDER, pNode->resultDataOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_DYNAMIC_OP, pNode->dynamicOp);
}
Expand Down Expand Up @@ -2396,6 +2404,12 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_NODE_CODE_OUTPUT_TS_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder));
break;
case PHY_NODE_CODE_REQUIRE_DATA_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->requireDataOrder, sizeof(pNode->requireDataOrder));
break;
case PHY_NODE_CODE_RESULT_DATA_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->resultDataOrder, sizeof(pNode->resultDataOrder));
break;
case PHY_NODE_CODE_DYNAMIC_OP:
code = tlvDecodeBool(pTlv, &pNode->dynamicOp);
break;
Expand Down
2 changes: 2 additions & 0 deletions source/libs/planner/inc/planInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTagAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
void planPromoteScanToTableMerge(SScanLogicNode* pScan, EDataOrderLevel requireLevel,
EDataOrderLevel resultLevel);
bool keysHasCol(SNodeList* pKeys);
bool keysHasTbname(SNodeList* pKeys);
bool projectCouldMergeUnsortDataBlock(SProjectLogicNode* pProject);
Expand Down
Loading
Loading