Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
491f2f2
fix(subq/cache): bypass subquery cache per event
stephenkgu May 14, 2026
f05d247
test suites for bypass subquery result cache per event
stephenkgu May 14, 2026
0bb85be
use the form of old type for clarity
stephenkgu May 14, 2026
2e85723
new fix for slot updating, and new suite to cateh this edge case
stephenkgu May 14, 2026
47d27ea
Potential fix for pull request finding
stephenkgu May 14, 2026
1a47156
Potential fix for pull request finding
stephenkgu May 14, 2026
10d15af
Potential fix for pull request finding
stephenkgu May 14, 2026
1eead38
Potential fix for pull request finding
stephenkgu May 14, 2026
71048f2
fix: per-event refetch for REMOTE_VALUE_LIST and REMOTE_ROW; reset is…
stephenkgu May 14, 2026
6b728f0
test: actively wait for event 4 to settle in test_where_subquery
stephenkgu May 14, 2026
4bdb269
test: poll for seed-row visibility instead of fixed 10s sleeps
stephenkgu May 15, 2026
04c4e75
test: remove unused expected_ts in seed-row poll
stephenkgu May 15, 2026
3ee4f60
Potential fix for pull request finding
stephenkgu May 15, 2026
4c24913
fix: per-event refetch for REMOTE_ZERO_ROWS in stream mode
stephenkgu May 15, 2026
e0ac5e3
fix: reset cached IN-list presence flags on stream invalidation
stephenkgu May 15, 2026
137ca30
fix: keep REMOTE_ROW typed as remote on stream completion
stephenkgu May 15, 2026
2fa5f04
Merge branch 'main' into fix/6984935627
stephenkgu May 15, 2026
00614a5
test: split event 5 into insert5/check5 step methods
stephenkgu May 15, 2026
b2a6aa3
test: defer event-4 stale-(1,1) check to check5 to avoid 60s wait
stephenkgu May 15, 2026
b70a74a
test: aggregate EXISTS projection so each event emits one row
stephenkgu May 15, 2026
c721799
doc: clarify stream-mode subResNodes[] usage in qFetchRemoteNode
stephenkgu May 15, 2026
c11b25a
fix: extract literal PK bounds in stream remote-value cond
stephenkgu May 15, 2026
be6e943
test: document EXISTS coverage in class docstring
stephenkgu May 15, 2026
c50fe41
Potential fix for pull request finding
stephenkgu May 15, 2026
c6e6b3f
Potential fix for pull request finding
stephenkgu May 15, 2026
9b37877
fix: remove dead non-stream branch in handleRemoteRowRes EOF path
stephenkgu May 15, 2026
059be77
Merge branch 'fix/6984935627' of https://github.com/taosdata/TDengine…
stephenkgu May 15, 2026
ff12863
fix: free prior var-length datum.p in setValueFromResBlock
stephenkgu May 15, 2026
86e1af4
test: set _rows_after_e3 in check3 for check4 baseline
stephenkgu May 15, 2026
3a6ea8f
fix: strip remote children before extracting PK time range in stream
stephenkgu May 15, 2026
a17ef4e
fix: propagate clone failure in stream PK time-range probe
stephenkgu May 15, 2026
1949f2f
Potential fix for pull request finding
stephenkgu May 15, 2026
a4a9b62
fix: preserve and intersect incoming scan window in stream PK probe
stephenkgu May 15, 2026
ad0163c
test: clarify event 5 NULL-mask comment without review-thread reference
stephenkgu May 15, 2026
2d5da53
Potential fix for pull request finding
stephenkgu May 15, 2026
819bae4
test: fix IN-list check3 race by waiting for event 3 before insert4
stephenkgu May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/libs/scalar/scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ typedef struct SScalarExtraInfo {
void* pStreamInfo;
void* pStreamRange;
void* pSubJobCtx;
bool isStream;
Comment thread
stephenkgu marked this conversation as resolved.
sclFetchFromRemote fp;
} SScalarExtraInfo;

Expand Down
3 changes: 3 additions & 0 deletions source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) {
STREAM_CHECK_NULL_GOTO(pResList, terrno);
uint64_t ts = 0;
bool hasNext = false;
// Init thread_local scalar ctx; mnode workers may not have set it for
// this task, breaking scalar-subquery re-fetch on later events.
setTaskScalarExtraInfo(sStreamReaderCalcInfo->pTaskInfo);
STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true));

for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
Expand Down
6 changes: 6 additions & 0 deletions source/dnode/vnode/src/vnd/vnodeStream.c
Original file line number Diff line number Diff line change
Expand Up @@ -4063,6 +4063,12 @@ static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueI
pResList = taosArrayInit(4, POINTER_BYTES);
STREAM_CHECK_NULL_GOTO(pResList, terrno);
uint64_t ts = 0;
// Stream fetches enter via vnode worker threads that may not have
// initialized the thread_local gTaskScalarExtra for this task. Without
// this, scalar subqueries embedded in the operator (e.g. WHERE ts >=
// (SELECT last_row(ts) FROM ref)) fail with "no subJob ctx" on whichever
// event happens to land on a fresh worker thread.
setTaskScalarExtraInfo(sStreamReaderCalcInfo->pTaskInfo);
STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
Expand Down
68 changes: 63 additions & 5 deletions source/libs/executor/src/executil.c
Original file line number Diff line number Diff line change
Expand Up @@ -3213,7 +3213,38 @@ static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* ran
return code;
}

static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) {
static EDealRes condHasRemoteValueWalker(SNode* pNode, void* pContext) {
if (nodeType(pNode) == QUERY_NODE_REMOTE_VALUE ||
nodeType(pNode) == QUERY_NODE_REMOTE_VALUE_LIST ||
nodeType(pNode) == QUERY_NODE_REMOTE_ROW ||
nodeType(pNode) == QUERY_NODE_REMOTE_TABLE ||
nodeType(pNode) == QUERY_NODE_REMOTE_ZERO_ROWS) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}

static bool condHasRemoteValue(SNode* pNode) {
bool found = false;
nodesWalkExpr(pNode, condHasRemoteValueWalker, &found);
return found;
}

static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict, bool isStream) {
// In stream mode, a WHERE primary-key cond that references a remote
// subquery (e.g. WHERE ts >= (SELECT last_row(ts) FROM ref)) must not be
// folded to a literal time range here; doing so freezes the bound at
// task-build time and the same range is reused for every trigger event.
// Leave the cond intact so the caller merges it into pConditions and the
// runtime filter re-evaluates the RemoteValueNode per fetch. Combined
// with qFetchRemoteNode's stream-mode cache bypass, this re-fires the
// wire request (req.reset=true) for each event.
if (isStream && condHasRemoteValue(*pPrimaryKeyCond)) {
*isStrict = false;
return TSDB_CODE_SUCCESS;
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
}

SNode* pNew = NULL;
int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pNew);
if (TSDB_CODE_SUCCESS == code) {
Expand Down Expand Up @@ -3292,7 +3323,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode*

if (pTableScanNode->pPrimaryCond) {
bool isStrict = false;
code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict);
bool isStream = (readHandle != NULL && readHandle->streamRtInfo != NULL);
code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict, isStream);
if (code || !isStrict) {
code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond);
}
Expand Down Expand Up @@ -4479,7 +4511,8 @@ void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetri
if (IS_STREAM_MODE(pTaskInfo)) {
SNode** ppRes = taosArrayGet(ctx->subResNodes, pParam->subQIdx);
if (NULL == *ppRes && 0 == pRsp->numOfRows) {
pRemote->val.node.type = QUERY_NODE_VALUE;
// First-call empty result: keep node type as REMOTE_VALUE so the
// scalar walker re-dispatches sclWalkRemoteValue per evaluation.
pRemote->val.isNull = true;
pRemote->val.translate = true;
Comment on lines +4587 to 4590
pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET);
Comment on lines +4587 to 4591
Comment on lines +4587 to 4591
Comment thread
stephenkgu marked this conversation as resolved.
Expand All @@ -4502,7 +4535,11 @@ void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetri

blockDataDestroy(pResBlock);
} else if (NULL != *ppRes && 0 == pRsp->numOfRows) {
pRemote->val.node.type = QUERY_NODE_VALUE;
// Completion sentinel after a data row: do NOT mutate the AST node
// type to QUERY_NODE_VALUE. In stream mode the node must remain
// REMOTE_VALUE so the next per-event walker pass re-fires the fetch
// (otherwise every subsequent trigger event replays the literal
// captured on the first event).
Comment on lines +4611 to +4615
pRsp->completed = true;
}

Expand Down Expand Up @@ -5218,8 +5255,29 @@ int32_t qFetchRemoteNode(void* pCtx, int32_t subQIdx, SNode* pRes) {
return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
}

// In stream mode the subquery must be re-evaluated for every trigger event;
// a cached SNode from an earlier event would replay a stale value and turn
// dynamic subqueries (e.g. WHERE ts >= (SELECT last_row(ts) FROM ref))
// into constants. Always go to the wire so req.reset=true reaches the
// vnode reader and rebuilds its task with the current ranges.
//
// Note: pRes here is a pointer into the operator's AST (the SRemoteValueNode
// placeholder), not a heap-owned node. In non-stream mode we still cache
// that borrowed pointer in subResNodes[] so subsequent calls memcpy from it,
// but in stream mode we must NOT touch the slot: writing pRes would create
// a dangling alias once the AST is rebuilt, and freeing it would double-free
// the AST node.
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
SNode** ppRes = taosArrayGet(ctx->subResNodes, subQIdx);
if (NULL == *ppRes) {
if (ctx->isStream) {
// Clear the slot before refetching so that an empty result from this
// event takes handleRemoteValueRes's first-call branch (which marks
// the placeholder NULL) instead of the "EOF after data" branch
// (which would silently retain the previous event's value).
Comment thread
stephenkgu marked this conversation as resolved.
if (ppRes) {
*ppRes = NULL;
}
TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
Comment on lines +5353 to +5361
Comment thread
stephenkgu marked this conversation as resolved.
Comment on lines +5353 to +5361
Comment on lines +5353 to +5361
Comment on lines +5353 to +5361
Comment on lines +5353 to +5361
Comment on lines +5353 to +5361
Comment on lines +5353 to +5361
Comment thread
stephenkgu marked this conversation as resolved.
Comment thread
stephenkgu marked this conversation as resolved.
} else if (NULL == *ppRes) {
TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
*ppRes = pRes;
} else {
Expand Down
2 changes: 2 additions & 0 deletions source/libs/executor/src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ SGlobalExecInfo gExecInfo = {0};
void setTaskScalarExtraInfo(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
gTaskScalarExtra.pSubJobCtx = pTaskInfo->pSubJobCtx;
gTaskScalarExtra.isStream =
(pTaskInfo->pSubJobCtx != NULL) && ((STaskSubJobCtx*)pTaskInfo->pSubJobCtx)->isStream;
gTaskScalarExtra.fp = qFetchRemoteNode;
}

Expand Down
1 change: 1 addition & 0 deletions source/libs/scalar/inc/sclInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct SScalarCtx {
SOperatorValueType type;
SScalarStreamCtx stream;
void* pSubJobCtx;
bool isStream;
Comment thread
stephenkgu marked this conversation as resolved.
sclFetchFromRemote fetchFp;
} SScalarCtx;

Expand Down
17 changes: 15 additions & 2 deletions source/libs/scalar/src/scalar.c
Original file line number Diff line number Diff line change
Expand Up @@ -694,9 +694,20 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
param->colAlloced = false;
break;
}
case QUERY_NODE_REMOTE_VALUE:
SCL_ERR_RET(TSDB_CODE_QRY_SUBQ_EXEC_ERROR);
case QUERY_NODE_REMOTE_VALUE: {
// After handleRemoteValueRes settles the inner SValueNode, treat the
// node as a plain value for parameter setup. Pass the inner val and
// temporarily flip its type so the QUERY_NODE_VALUE branch runs;
// restore so subsequent walker passes (in stream mode) keep
// re-dispatching sclWalkRemoteValue.
SRemoteValueNode *pRemote = (SRemoteValueNode *)node;
ENodeType oldType = pRemote->val.node.type;
pRemote->val.node.type = QUERY_NODE_VALUE;
int32_t code = sclInitParam((SNode *)&pRemote->val, param, ctx, rowNum);
pRemote->val.node.type = oldType;
SCL_ERR_RET(code);
Comment thread
stephenkgu marked this conversation as resolved.
break;
}
case QUERY_NODE_REMOTE_VALUE_LIST: {
SRemoteValueListNode* pRemote = (SRemoteValueListNode*)node;
if (!(pRemote->flag & VALUELIST_FLAG_VAL_UNSET)) {
Expand Down Expand Up @@ -2655,6 +2666,7 @@ int32_t sclCalcConstants(SNode *pNode, bool dual, bool remoteIncluded, bool null
ctx.stream.pStreamRuntimeFuncInfo = pExtra->pStreamInfo;
ctx.stream.streamTsRange = pExtra->pStreamRange;
ctx.pSubJobCtx = pExtra->pSubJobCtx;
ctx.isStream = pExtra->isStream;
ctx.fetchFp = pExtra->fp;
}

Expand Down Expand Up @@ -2837,6 +2849,7 @@ int32_t scalarCalculateInRange(SNode *pNode, SArray *pBlockList, SScalarParam *p
ctx.stream.pStreamRuntimeFuncInfo = pExtra->pStreamInfo;
ctx.stream.streamTsRange = pExtra->pStreamRange;
ctx.pSubJobCtx = pExtra->pSubJobCtx;
ctx.isStream = pExtra->isStream;
ctx.fetchFp = pExtra->fp;
}

Expand Down
Loading
Loading