diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index bb3cee9aeb05..7b3a25abbe31 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -31,6 +31,7 @@ typedef struct SScalarExtraInfo { void* pStreamInfo; void* pStreamRange; void* pSubJobCtx; + bool isStream; sclFetchFromRemote fp; } SScalarExtraInfo; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 86d6c7278269..e11798442122 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -342,6 +342,9 @@ static int32_t mmProcessStreamFetchMsg(SMnodeMgmt *pMgmt, SRpcMsg* pMsg) { STREAM_CHECK_NULL_GOTO(pResList, terrno); uint64_t ts = 0; bool hasNext = false; + // Init thread_local scalar ctx; mnode workers may not have set it for + // this task, breaking scalar-subquery re-fetch on later events. + setTaskScalarExtraInfo(sStreamReaderCalcInfo->pTaskInfo); STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, true)); for(size_t i = 0; i < taosArrayGetSize(pResList); i++){ diff --git a/source/dnode/vnode/src/vnd/vnodeStream.c b/source/dnode/vnode/src/vnd/vnodeStream.c index 98706ba19e4f..55d21caa370e 100644 --- a/source/dnode/vnode/src/vnd/vnodeStream.c +++ b/source/dnode/vnode/src/vnd/vnodeStream.c @@ -4063,6 +4063,12 @@ static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueI pResList = taosArrayInit(4, POINTER_BYTES); STREAM_CHECK_NULL_GOTO(pResList, terrno); uint64_t ts = 0; + // Stream fetches enter via vnode worker threads that may not have + // initialized the thread_local gTaskScalarExtra for this task. Without + // this, scalar subqueries embedded in the operator (e.g. WHERE ts >= + // (SELECT last_row(ts) FROM ref)) fail with "no subJob ctx" on whichever + // event happens to land on a fresh worker thread. + setTaskScalarExtraInfo(sStreamReaderCalcInfo->pTaskInfo); STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL)); for(size_t i = 0; i < taosArrayGetSize(pResList); i++){ diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1bda531d35d0..8a7a9d001ee1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -3213,7 +3213,97 @@ static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* ran return code; } -static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) { +static EDealRes condHasRemoteValueWalker(SNode* pNode, void* pContext) { + if (nodeType(pNode) == QUERY_NODE_REMOTE_VALUE || + nodeType(pNode) == QUERY_NODE_REMOTE_VALUE_LIST || + nodeType(pNode) == QUERY_NODE_REMOTE_ROW || + nodeType(pNode) == QUERY_NODE_REMOTE_TABLE || + nodeType(pNode) == QUERY_NODE_REMOTE_ZERO_ROWS) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + +static bool condHasRemoteValue(SNode* pNode) { + bool found = false; + nodesWalkExpr(pNode, condHasRemoteValueWalker, &found); + return found; +} + +static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict, bool isStream) { + // In stream mode, a WHERE primary-key cond that references a remote + // subquery (e.g. 
WHERE ts >= (SELECT last_row(ts) FROM ref)) must not be + // folded to a literal time range here; doing so freezes the bound at + // task-build time and the same range is reused for every trigger event. + // Keep the original cond intact so the caller merges it into pConditions + // and the runtime filter re-evaluates the RemoteValueNode per fetch. + // Still attempt to extract any independent static primary-key bounds + // from the original AND-expression so e.g. + // ts >= '2026-05-01' AND ts < '2026-05-02' AND ts >= (SELECT ...) + // can use the literal range for scan pruning while the remote predicate + // is enforced by the residual filter. We strip remote-bearing children + // from a clone of the AND before calling filterGetTimeRange because the + // collector aborts when it sees any uncollectable child. + if (isStream && condHasRemoteValue(*pPrimaryKeyCond)) { + *isStrict = false; + // Preserve the incoming window (seeded by initQueryTableDataCond from + // scanRange/winRange). We will intersect any literal bound extracted + // below with it; if extraction yields no bound the incoming range + // remains as-is rather than being widened to a full scan. + STimeWindow incoming = *pTimeRange; + + SNode* pProbe = NULL; + int32_t cloneCode = nodesCloneNode(*pPrimaryKeyCond, &pProbe); + if (TSDB_CODE_SUCCESS != cloneCode || NULL == pProbe) { + nodesDestroyNode(pProbe); + return TSDB_CODE_SUCCESS != cloneCode ? cloneCode : TSDB_CODE_SUCCESS; + } + + if (nodeType(pProbe) == QUERY_NODE_LOGIC_CONDITION && + ((SLogicConditionNode*)pProbe)->condType == LOGIC_COND_TYPE_AND) { + SLogicConditionNode* pAnd = (SLogicConditionNode*)pProbe; + SListCell* pCell = pAnd->pParameterList ? pAnd->pParameterList->pHead : NULL; + while (pCell != NULL) { + if (condHasRemoteValue(pCell->pNode)) { + pCell = nodesListErase(pAnd->pParameterList, pCell); + } else { + pCell = pCell->pNext; + } + } + int32_t remaining = pAnd->pParameterList ? (int32_t)pAnd->pParameterList->length : 0; + if (remaining == 0) { + nodesDestroyNode(pProbe); + return TSDB_CODE_SUCCESS; + } + if (remaining == 1) { + SNode* pOnly = pAnd->pParameterList->pHead->pNode; + pAnd->pParameterList->pHead->pNode = NULL; + nodesDestroyNode(pProbe); + pProbe = pOnly; + } + } else if (condHasRemoteValue(pProbe)) { + nodesDestroyNode(pProbe); + return TSDB_CODE_SUCCESS; + } + + STimeWindow extracted = TSWINDOW_INITIALIZER; + bool probeStrict = true; + int32_t code = filterGetTimeRange(pProbe, &extracted, &probeStrict, NULL); + nodesDestroyNode(pProbe); + if (TSDB_CODE_SUCCESS == code) { + // Intersect the extracted literal range with the incoming window. + pTimeRange->skey = TMAX(incoming.skey, extracted.skey); + pTimeRange->ekey = TMIN(incoming.ekey, extracted.ekey); + } else { + *pTimeRange = incoming; + } + // Force isStrict=false: the residual remote predicate still has to + // be evaluated by the runtime filter on every event. 
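+    // For example, WHERE ts >= '2026-05-01' AND ts >= (SELECT
+    // last_row(ts) FROM ref) keeps the literal lower bound in
+    // pTimeRange for scan pruning, while the remote comparison stays
+    // in pConditions and is re-checked against a fresh fetch on every
+    // trigger event.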
+ *isStrict = false; + return code; + } + SNode* pNew = NULL; int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pNew); if (TSDB_CODE_SUCCESS == code) { @@ -3292,7 +3382,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode* if (pTableScanNode->pPrimaryCond) { bool isStrict = false; - code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict); + bool isStream = (readHandle != NULL && readHandle->streamRtInfo != NULL); + code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict, isStream); if (code || !isStrict) { code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond); } @@ -4454,7 +4545,21 @@ int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SValueNode* pRes, SSDataBlock* pRes->flag &= (~VALUE_FLAG_VAL_UNSET); pRes->translate = true; - + // Reset isNull so a previously-NULL placeholder (e.g. from an earlier + // stream event whose subquery returned no rows) isn't re-emitted as + // NULL when this fetch produced a real value. + pRes->isNull = false; + + // For variable-length / DECIMAL types, datum.p owns a heap buffer that + // was transferred from the response block on the previous fetch. In + // stream mode this function is called per trigger event, so free the + // prior buffer before nodesSetValueNodeValueExt overwrites datum.p, + // otherwise we leak one buffer per event. + if (IS_VAR_DATA_TYPE(pRes->node.resType.type) || + pRes->node.resType.type == TSDB_DATA_TYPE_DECIMAL) { + taosMemoryFreeClear(pRes->datum.p); + } + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0); if (colDataIsNull_s(pCol, 0)) { pRes->isNull = true; @@ -4479,7 +4584,8 @@ void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetri if (IS_STREAM_MODE(pTaskInfo)) { SNode** ppRes = taosArrayGet(ctx->subResNodes, pParam->subQIdx); if (NULL == *ppRes && 0 == pRsp->numOfRows) { - pRemote->val.node.type = QUERY_NODE_VALUE; + // First-call empty result: keep node type as REMOTE_VALUE so the + // scalar walker re-dispatches sclWalkRemoteValue per evaluation. pRemote->val.isNull = true; pRemote->val.translate = true; pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET); @@ -4502,7 +4608,11 @@ void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetri blockDataDestroy(pResBlock); } else if (NULL != *ppRes && 0 == pRsp->numOfRows) { - pRemote->val.node.type = QUERY_NODE_VALUE; + // Completion sentinel after a data row: do NOT mutate the AST node + // type to QUERY_NODE_VALUE. In stream mode the node must remain + // REMOTE_VALUE so the next per-event walker pass re-fires the fetch + // (otherwise every subsequent trigger event replays the literal + // captured on the first event). pRsp->completed = true; } @@ -4724,7 +4834,11 @@ void handleRemoteRowRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetriev blockDataDestroy(pResBlock); } else if (NULL != *ppRes && 0 == pRsp->numOfRows) { - pRemote->val.node.type = QUERY_NODE_VALUE; + // EOF after data in stream mode: keep the node typed as REMOTE_ROW + // (do NOT rewrite to QUERY_NODE_VALUE) so the next per-event walker + // pass re-dispatches the case in sclInitParam and re-fires the + // fetch; otherwise > ANY / row subqueries would permanently replay + // the previous event's value. 
pRsp->completed = true; } @@ -5218,8 +5332,34 @@ int32_t qFetchRemoteNode(void* pCtx, int32_t subQIdx, SNode* pRes) { return TSDB_CODE_QRY_SUBQ_NOT_FOUND; } + // In stream mode the subquery must be re-evaluated for every trigger event; + // a cached SNode from an earlier event would replay a stale value and turn + // dynamic subqueries (e.g. WHERE ts >= (SELECT last_row(ts) FROM ref)) + // into constants. Always go to the wire so req.reset=true reaches the + // vnode reader and rebuilds its task with the current ranges. + // + // Note: pRes here is a pointer into the operator's AST (the SRemoteValueNode + // placeholder), not a heap-owned node. In non-stream mode we still cache + // that borrowed pointer in subResNodes[] so subsequent calls memcpy from it. + // In stream mode the slot must not retain a stale pRes across trigger + // events or be treated as owning pRes: a stale entry would create a + // dangling alias once the AST is rebuilt, and freeing through that slot + // would double-free the AST node. Stream response handlers (e.g. + // handleRemoteValueRes) may still use subResNodes[] transiently during a + // single fetch to distinguish data from the completion sentinel; the + // requirement here is to clear the slot to NULL before each refetch so + // that only stale per-event state is invalidated, not to take ownership. SNode** ppRes = taosArrayGet(ctx->subResNodes, subQIdx); - if (NULL == *ppRes) { + if (ctx->isStream) { + // Clear the slot before refetching so that an empty result from this + // event takes handleRemoteValueRes's first-call branch (which marks + // the placeholder NULL) instead of the "EOF after data" branch + // (which would silently retain the previous event's value). + if (ppRes) { + *ppRes = NULL; + } + TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes)); + } else if (NULL == *ppRes) { TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes)); *ppRes = pRes; } else { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b47bf1995a28..a1f56d8cb490 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -44,6 +44,8 @@ SGlobalExecInfo gExecInfo = {0}; void setTaskScalarExtraInfo(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; gTaskScalarExtra.pSubJobCtx = pTaskInfo->pSubJobCtx; + gTaskScalarExtra.isStream = + (pTaskInfo->pSubJobCtx != NULL) && ((STaskSubJobCtx*)pTaskInfo->pSubJobCtx)->isStream; gTaskScalarExtra.fp = qFetchRemoteNode; } diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index ca73c6324280..2d33b7bdbb40 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -54,6 +54,7 @@ typedef struct SScalarCtx { SOperatorValueType type; SScalarStreamCtx stream; void* pSubJobCtx; + bool isStream; sclFetchFromRemote fetchFp; } SScalarCtx; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0e010a93b877..0f5e4a823e08 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -694,11 +694,43 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t param->colAlloced = false; break; } - case QUERY_NODE_REMOTE_VALUE: - SCL_ERR_RET(TSDB_CODE_QRY_SUBQ_EXEC_ERROR); + case QUERY_NODE_REMOTE_VALUE: { + // After handleRemoteValueRes settles the inner SValueNode, treat the + // node as a plain value for parameter setup. 
Pass the inner val and + // temporarily flip its type so the QUERY_NODE_VALUE branch runs; + // restore so subsequent walker passes (in stream mode) keep + // re-dispatching sclWalkRemoteValue. + SRemoteValueNode *pRemote = (SRemoteValueNode *)node; + ENodeType oldType = pRemote->val.node.type; + pRemote->val.node.type = QUERY_NODE_VALUE; + int32_t code = sclInitParam((SNode *)&pRemote->val, param, ctx, rowNum); + pRemote->val.node.type = oldType; + SCL_ERR_RET(code); break; + } case QUERY_NODE_REMOTE_VALUE_LIST: { SRemoteValueListNode* pRemote = (SRemoteValueListNode*)node; + // In stream mode the IN-list subquery must be re-evaluated for every + // trigger event; otherwise the cached hash filter from event 1 would + // be reused indefinitely. Free the prior hash and force a refetch. + if (ctx->isStream && !(pRemote->flag & VALUELIST_FLAG_VAL_UNSET)) { + if (pRemote->hashAllocated) { + taosHashCleanup(pRemote->pHashFilter); + taosHashCleanup(pRemote->pHashFilterOthers); + pRemote->pHashFilter = NULL; + pRemote->pHashFilterOthers = NULL; + pRemote->hashAllocated = false; + } + // Also reset the cached presence flags; otherwise an event that + // returns zero rows after a non-empty event would keep + // hasValue=true and handleRemoteValueListRes would skip + // rebuilding the empty-list hash, leaving the param with stale + // flags pointing at NULL hashes. + pRemote->hasValue = false; + pRemote->hasNull = false; + pRemote->hasNotNull = false; + pRemote->flag |= VALUELIST_FLAG_VAL_UNSET; + } if (!(pRemote->flag & VALUELIST_FLAG_VAL_UNSET)) { sclDebug("remoteValueList already got res, node:%p, hasValue:%d, hasNull:%d, hasNotNull:%d, pHashFilter:%p,%d, pHashFilterOthers:%p,%d", node, pRemote->hasValue, pRemote->hasNull, pRemote->hasNotNull, pRemote->pHashFilter, pRemote->pHashFilter ? taosHashGetSize(pRemote->pHashFilter) : 0, @@ -750,6 +782,12 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_QRY_SUBQ_NOT_FOUND; } + // Force per-event refetch in stream mode so a value/flags cached on + // a previous event isn't replayed for the current trigger. + if (ctx->isStream) { + pRemote->valSet = false; + } + if (!pRemote->valSet) { SCL_ERR_RET((*ctx->fetchFp)(ctx->pSubJobCtx, pRemote->subQIdx, node)); } @@ -779,7 +817,16 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t SCL_ERR_RET((*ctx->fetchFp)(ctx->pSubJobCtx, pRemote->subQIdx, node)); - SCL_ERR_RET(sclInitParam(node, param, ctx, rowNum)); + // setZeroRowsResValue rewrites node->type to QUERY_NODE_VALUE so the + // recursive sclInitParam below dispatches the literal-value branch. + // In stream mode the next per-event walker pass must re-dispatch + // this REMOTE_ZERO_ROWS case (otherwise EXISTS / NOT EXISTS replay + // event 1's row count forever); restore the type after the read. 
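+      // (In non-stream mode the restore is deliberately skipped: the
+      // node stays QUERY_NODE_VALUE, so the settled row count keeps
+      // folding as a constant for the rest of the one-shot query.)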
+ int32_t code = sclInitParam(node, param, ctx, rowNum); + if (ctx->isStream) { + pRemote->val.node.type = QUERY_NODE_REMOTE_ZERO_ROWS; + } + SCL_ERR_RET(code); break; } @@ -2655,6 +2702,7 @@ int32_t sclCalcConstants(SNode *pNode, bool dual, bool remoteIncluded, bool null ctx.stream.pStreamRuntimeFuncInfo = pExtra->pStreamInfo; ctx.stream.streamTsRange = pExtra->pStreamRange; ctx.pSubJobCtx = pExtra->pSubJobCtx; + ctx.isStream = pExtra->isStream; ctx.fetchFp = pExtra->fp; } @@ -2837,6 +2885,7 @@ int32_t scalarCalculateInRange(SNode *pNode, SArray *pBlockList, SScalarParam *p ctx.stream.pStreamRuntimeFuncInfo = pExtra->pStreamInfo; ctx.stream.streamTsRange = pExtra->pStreamRange; ctx.pSubJobCtx = pExtra->pSubJobCtx; + ctx.isStream = pExtra->isStream; ctx.fetchFp = pExtra->fp; } diff --git a/test/cases/18-StreamProcessing/02-Stream/test_stream_subquery_per_event.py b/test/cases/18-StreamProcessing/02-Stream/test_stream_subquery_per_event.py new file mode 100644 index 000000000000..d4d07f8af0cd --- /dev/null +++ b/test/cases/18-StreamProcessing/02-Stream/test_stream_subquery_per_event.py @@ -0,0 +1,921 @@ +import time + +from new_test_framework.utils import ( + tdLog, + tdSql, + tdStream, + StreamCheckItem, +) + + +class TestStreamSubqueryPerEvent: + """Per-event re-evaluation of stream WHERE subqueries (customer ticket). + + The customer reproducer originally relied on a scalar subquery in the + stream body to provide a per-event lower bound on a secondary source: + + WHERE ts >= (SELECT last_row(ts) FROM inicio_descarga) + + In stream mode every trigger event MUST refetch the subquery; the + older code cached the result on the first event and silently + replayed it forever. This file pins the per-event semantics for the + original reproducer, the workaround control path, and all four + remote-subquery flavours that flow through sclInitParam: + + 1. test_where_subquery (REMOTE_VALUE) + The original SQL shape with a scalar subquery in WHERE. + Exercises qFetchRemoteNode, sclInitParam REMOTE_VALUE, + setTaskScalarExtraInfo on every fetch worker thread, and + (event 4) the slot-clear that makes empty later events take + the first-call NULL branch instead of replaying the prior + value, plus (event 5) setValueFromResBlock resetting + pRes->isNull so the next non-NULL fetch isn't masked. + + 2. test_twstart_workaround + The customer-suggested workaround using _twstart. Control + test for the trigger / count_window engine, independent of + the subquery code. + + 3. test_in_list_subquery (REMOTE_VALUE_LIST) + WHERE x IN (subquery) re-evaluation. Pins the LIST cache + invalidation: pHashFilter must be freed and + VALUELIST_FLAG_VAL_UNSET re-armed every event. + + 4. test_row_subquery (REMOTE_ROW) + WHERE x > ANY (subquery) re-evaluation. Pins the ROW cache + invalidation: pRemote->valSet must be cleared every event. + + 5. test_exists_subquery (REMOTE_ZERO_ROWS) + EXISTS (subquery) re-evaluation. Pins the ZERO_ROWS cache + invalidation: setZeroRowsResValue rewrites node->type to + QUERY_NODE_VALUE, and stream mode must restore it to + QUERY_NODE_REMOTE_ZERO_ROWS so later events refetch. + """ + + def setup_class(cls): + tdLog.debug(f"start to execute {__file__}") + try: + tdStream.createSnode() + except Exception as e: + if "Only one snode" not in str(e): + raise + + def test_where_subquery(self): + """WHERE scalar subquery is re-evaluated per trigger event. + + 1. Build linea_descarga (trigger), cumple_descarga (secondary + source), inicio_descarga (referenced by WHERE subquery). + 2. 
Pre-seed cumple_descarga with three rows so SUM differs by + the lower bound the subquery returns. + 3. Insert one row into inicio_descarga BEFORE creating the + stream so the subquery is resolvable at plan time. + 4. Create a count_window(1,1,pressure) stream whose body filters + cumple_descarga by ts >= (select last_row(ts) from inicio_descarga). + 5. Drive three trigger events one at a time, advancing + inicio_descarga before each one - the customer pattern. + 6. Verify cumulative output after every event: + after event 1: 1 row, SUM=(3, 3) + after event 2: 2 rows, (3,3) then (2,2) + after event 3: 3 rows, (3,3) (2,2) (1,1) + A regression to constant SUM=3 means the subquery has been + constant-folded again and is no longer per-event. + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-05-11 Created to pin customer reproducer behavior + - 2026-05-13 Updated to dynamic per-event semantics after fix + """ + + streams = [self.WhereSubqueryDynamic()] + tdStream.checkAll(streams) + + def test_twstart_workaround(self): + """inicio_descarga as trigger + _twstart in body (control test). + + This documents the workaround the customer used before the + engine fix and serves as a regression guard for the trigger / + _twstart path, independent of the scalar-subquery code. + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-05-11 Created to demonstrate workaround for ticket + """ + + streams = [self.SubqueryWorkaround()] + tdStream.checkAll(streams) + + class WhereSubqueryDynamic(StreamCheckItem): + def __init__(self): + self.db = "test_subq_where" + + def create(self): + tdLog.info(f"=== create db {self.db} and source tables ===") + tdSql.execute(f"drop database if exists {self.db}") + # drop is async; wait for the dnode to fully release vgroups + for _ in range(60): + try: + tdSql.execute(f"create database {self.db} vgroups 1 buffer 8") + break + except Exception as e: + if 'dropping' in str(e): + time.sleep(1) + continue + raise + tdSql.execute(f"use {self.db}") + + tdSql.execute( + "create table linea_descarga (ts timestamp, pressure int)" + ) + tdSql.execute( + "create table cumple_descarga (ts timestamp, cumple int, total int)" + ) + tdSql.execute( + "create table inicio_descarga (ts timestamp, dummy int)" + ) + + # Pre-seed cumple_descarga so SUM differs by lower bound. + tdSql.execute( + "insert into cumple_descarga values " + "('2026-05-01 00:00:00', 1, 1)," + "('2026-05-01 00:00:01', 1, 1)," + "('2026-05-01 00:00:02', 1, 1)" + ) + + # Subquery must resolve at CREATE STREAM time -> seed one row. 
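+            # The poll below waits for this seed row to become
+            # queryable, since CREATE STREAM resolves the subquery at
+            # plan time and may otherwise race the insert.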
+ tdSql.execute( + "insert into inicio_descarga values ('2026-05-01 00:00:00', 1)" + ) + + deadline = time.time() + 30 + while True: + tdSql.query("select last_row(ts) from inicio_descarga") + if ( + tdSql.queryResult + and tdSql.queryResult[0] + and tdSql.queryResult[0][0] is not None + ): + break + if time.time() >= deadline: + raise AssertionError( + "Timed out waiting for inicio_descarga seed row to become " + f"visible (got {tdSql.queryResult!r})" + ) + time.sleep(0.5) + + tdLog.info("=== create stream analisis_68 ===") + tdSql.execute( + f"create stream analisis_68 count_window(1, 1, pressure) " + f"from linea_descarga " + f"into resultado_descarga as " + f"select _twstart as ts, " + f" sum(cumple) as acumulado_cumple, " + f" sum(total) as acumulado_total " + f"from cumple_descarga " + f"where ts >= (select last_row(ts) from inicio_descarga)" + ) + + def insert1(self): + tdLog.info("=== event 1: trigger at 00:00:00 (inicio last_row=00:00:00) ===") + tdSql.execute( + "insert into linea_descarga values ('2026-05-01 00:00:00', 1)" + ) + + def check1(self): + tdLog.info("=== check after event 1: 1 row, SUM=(3, 3) ===") + tdSql.checkResultsByFunc( + sql=f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts", + func=lambda: tdSql.getRows() == 1 + and tdSql.compareData(0, 0, 3) + and tdSql.compareData(0, 1, 3), + ) + + def insert2(self): + tdLog.info( + "=== advance inicio to 00:00:01, then trigger at 00:00:01 ===" + ) + tdSql.execute( + "insert into inicio_descarga values ('2026-05-01 00:00:01', 1)" + ) + tdSql.execute( + "insert into linea_descarga values ('2026-05-01 00:00:01', 1)" + ) + + def check2(self): + tdLog.info("=== check after event 2: 2 rows, (3,3) then (2,2) ===") + tdSql.checkResultsByFunc( + sql=f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts", + func=lambda: tdSql.getRows() == 2 + and tdSql.compareData(0, 0, 3) + and tdSql.compareData(0, 1, 3) + and tdSql.compareData(1, 0, 2) + and tdSql.compareData(1, 1, 2), + ) + + def insert3(self): + tdLog.info( + "=== advance inicio to 00:00:02, then trigger at 00:00:02 ===" + ) + tdSql.execute( + "insert into inicio_descarga values ('2026-05-01 00:00:02', 1)" + ) + tdSql.execute( + "insert into linea_descarga values ('2026-05-01 00:00:02', 1)" + ) + + def check3(self): + tdLog.info("=== check after event 3: 3 rows, (3,3) (2,2) (1,1) ===") + # Per-event re-evaluation: each event sees inicio_descarga's + # last_row(ts) at trigger time (00:00:00, 00:00:01, 00:00:02), + # so the matching cumple_descarga rows shrink with each event. + tdSql.checkResultsByFunc( + sql=f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts", + func=lambda: tdSql.getRows() == 3 + and tdSql.compareData(0, 0, 3) + and tdSql.compareData(0, 1, 3) + and tdSql.compareData(1, 0, 2) + and tdSql.compareData(1, 1, 2) + and tdSql.compareData(2, 0, 1) + and tdSql.compareData(2, 1, 1), + ) + self._rows_after_e3 = 3 + + def insert4(self): + # Empty inicio_descarga, then trigger event 4. The subquery + # now returns ZERO rows. Without the qFetchRemoteNode stream + # branch clearing the per-subquery slot before refetch, + # handleRemoteValueRes would fall into its "EOF after data" + # branch and silently retain event 3's lower bound (00:00:02), + # so event 4 would emit acumulado_cumple=1 just like event 3. 
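+            # With the fix, the refetch sees zero rows, the placeholder
+            # is marked NULL, the WHERE matches nothing, and event 4
+            # emits no output row at all (asserted in check4 and
+            # re-checked in check5).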
+ tdLog.info( + "=== empty inicio_descarga, then trigger event 4 ===" + ) + tdSql.execute("delete from inicio_descarga") + tdSql.execute( + "insert into linea_descarga values ('2026-05-01 00:00:03', 1)" + ) + + def check4(self): + # Validate the event-4 suppression against the known-good + # row count captured after event 3, rather than taking a new + # post-event-4 baseline. This avoids missing a stale row + # that is already present when we look, and also catches a + # delayed event-4 row before event 5 is driven. + sql = ( + f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts" + ) + expected_rows = self._rows_after_e3 + deadline = time.time() + 5 + while time.time() < deadline: + tdSql.query(sql) + rows = tdSql.getRows() + if rows != expected_rows: + raise AssertionError( + f"event 4 unexpectedly produced output before event 5: " + f"expected {expected_rows} rows, got {rows}" + ) + time.sleep(0.1) + self._rows_pre_e5 = expected_rows + tdLog.info( + f"=== check4 verified no event-4 output: rows={self._rows_pre_e5} " + f"(stale-(1,1) check remains deferred to check5) ===" + ) + + def insert5(self): + # Re-populate inicio_descarga and trigger again. This checks + # the NULL-to-non-NULL transition after event 4's empty fetch: + # setValueFromResBlock must reset pRes->isNull = false so + # event 5's newly fetched value is not masked by the + # isNull=true left over from event 4. With the bug, event 5's + # WHERE evaluates against a NULL lower bound and matches no + # cumple rows -> aggregate NULL. With the fix, the lower + # bound is 00:00:01 again and exactly two cumple rows match + # -> SUM=(2, 2). + tdLog.info( + "=== event 5: re-insert inicio @ 00:00:01, trigger linea ===" + ) + tdSql.execute( + f"insert into {self.db}.inicio_descarga " + f"values ('2026-05-01 00:00:01', 1)" + ) + tdSql.execute( + f"insert into {self.db}.linea_descarga " + f"values ('2026-05-01 00:00:04', 1)" + ) + + def check5(self): + sql = ( + f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts" + ) + rows_pre_e5 = self._rows_pre_e5 + # Wait for event 5's (2,2) row to arrive; this also flushes + # any pending event-4 output, so we can scan the new tail + # for the stale (1,1) marker. + tdSql.checkResultsByFunc( + sql=sql, + func=lambda: tdSql.getRows() > rows_pre_e5 + and tdSql.getData(tdSql.getRows() - 1, 0) == 2 + and tdSql.getData(tdSql.getRows() - 1, 1) == 2, + ) + tdSql.query(sql) + rows_after_e5 = tdSql.getRows() + assert rows_after_e5 > rows_pre_e5, ( + f"event 5 produced no new row (was {rows_pre_e5}, " + f"now {rows_after_e5}); stream stalled after empty event" + ) + # Verify event 4 did not silently reuse event 3's value (1,1). + # Any row appended between rows_pre_e5 and the final event-5 + # row (last_row) belongs to event 4. + last_row = rows_after_e5 - 1 + for i in range(rows_pre_e5, last_row): + v0 = tdSql.getData(i, 0) + v1 = tdSql.getData(i, 1) + assert not (v0 == 1 and v1 == 1), ( + f"event 4 reused event 3's stale subquery value " + f"(1,1) at row {i}; qFetchRemoteNode stream branch " + f"is not clearing the subResNodes slot before refetch" + ) + v0 = tdSql.getData(last_row, 0) + v1 = tdSql.getData(last_row, 1) + assert v0 == 2 and v1 == 2, ( + f"event 5 produced ({v0},{v1}); expected (2,2). " + f"setValueFromResBlock did not reset pRes->isNull, so " + f"event 4's NULL state masked the new subquery value." 
+ ) + + class SubqueryWorkaround(StreamCheckItem): + def __init__(self): + self.db = "test_subq_workaround" + + def create(self): + tdLog.info(f"=== create db {self.db} and source tables ===") + tdSql.execute(f"drop database if exists {self.db}") + # drop is async; wait for the dnode to fully release vgroups + for _ in range(60): + try: + tdSql.execute(f"create database {self.db} vgroups 1 buffer 8") + break + except Exception as e: + if 'dropping' in str(e): + time.sleep(1) + continue + raise + tdSql.execute(f"use {self.db}") + + tdSql.execute( + "create table linea_descarga (ts timestamp, pressure int)" + ) + tdSql.execute( + "create table cumple_descarga (ts timestamp, cumple int, total int)" + ) + tdSql.execute( + "create table inicio_descarga (ts timestamp, dummy int)" + ) + + tdSql.execute( + "insert into cumple_descarga values " + "('2026-05-01 00:00:00', 1, 1)," + "('2026-05-01 00:00:01', 1, 1)," + "('2026-05-01 00:00:02', 1, 1)" + ) + + tdLog.info("=== create workaround stream analisis_wa ===") + # inicio_descarga is the trigger; each row forms its own + # count_window(1) window; _twstart binds the per-window + # lower bound dynamically into the cumple_descarga filter. + tdSql.execute( + f"create stream analisis_wa count_window(1, 1, dummy) " + f"from inicio_descarga " + f"into resultado_descarga as " + f"select _twstart as ts, " + f" sum(cumple) as acumulado_cumple, " + f" sum(total) as acumulado_total " + f"from cumple_descarga " + f"where ts >= _twstart" + ) + + def insert1(self): + tdLog.info("=== inicio at 00:00:00 (matches all 3 cumple rows) ===") + tdSql.execute( + "insert into inicio_descarga values ('2026-05-01 00:00:00', 1)" + ) + + def check1(self): + tdLog.info("=== check after inicio 1: 1 row, SUM=(3, 3) ===") + tdSql.checkResultsByFunc( + sql=f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts", + func=lambda: tdSql.getRows() == 1 + and tdSql.compareData(0, 0, 3) + and tdSql.compareData(0, 1, 3), + ) + + def insert2(self): + tdLog.info( + "=== inicio at 00:00:01 (matches cumple rows at 01, 02) ===" + ) + tdSql.execute( + "insert into inicio_descarga values ('2026-05-01 00:00:01', 1)" + ) + + def check2(self): + tdLog.info( + "=== check after inicio 2: 2 rows, second SUM=(2, 2) ===" + ) + tdSql.checkResultsByFunc( + sql=f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts", + func=lambda: tdSql.getRows() == 2 + and tdSql.compareData(0, 0, 3) + and tdSql.compareData(0, 1, 3) + and tdSql.compareData(1, 0, 2) + and tdSql.compareData(1, 1, 2), + ) + + def insert3(self): + tdLog.info( + "=== inicio at 00:00:02 (matches only cumple row at 02) ===" + ) + tdSql.execute( + "insert into inicio_descarga values ('2026-05-01 00:00:02', 1)" + ) + + def check3(self): + tdLog.info( + "=== check after inicio 3: 3 rows, third SUM=(1, 1) ===" + ) + tdSql.checkResultsByFunc( + sql=f"select acumulado_cumple, acumulado_total " + f"from {self.db}.resultado_descarga order by ts", + func=lambda: tdSql.getRows() == 3 + and tdSql.compareData(0, 0, 3) + and tdSql.compareData(0, 1, 3) + and tdSql.compareData(1, 0, 2) + and tdSql.compareData(1, 1, 2) + and tdSql.compareData(2, 0, 1) + and tdSql.compareData(2, 1, 1), + ) + + # ------------------------------------------------------------------ + # IN-list subquery (REMOTE_VALUE_LIST) + # ------------------------------------------------------------------ + + def test_in_list_subquery(self): + """REMOTE_VALUE_LIST must be refreshed per stream event. 
+ + Bug: in stream mode, the LIST cache check in sclInitParam() + short-circuited once VALUELIST_FLAG_VAL_UNSET was cleared on + the first event. Every subsequent trigger event reused the same + pHashFilter, so the IN-list never reflected later changes to + the source table. + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-05-14 Created to pin LIST cache invalidation + """ + streams = [self.InListPerEvent()] + tdStream.checkAll(streams) + + class InListPerEvent(StreamCheckItem): + def __init__(self): + self.db = "test_subq_inlist" + + def _wait_seed_visible(self, count_sql, label, timeout=15): + deadline = time.time() + timeout + while time.time() < deadline: + try: + tdSql.query(count_sql) + if tdSql.getData(0, 0) == 1: + return + except Exception: + pass + time.sleep(0.5) + raise RuntimeError( + f"{label} seed row did not become queryable before CREATE STREAM" + ) + + def create(self): + tdLog.info(f"=== create db {self.db} ===") + tdSql.execute(f"drop database if exists {self.db}") + # drop is async; wait for the dnode to fully release vgroups + for _ in range(60): + try: + tdSql.execute(f"create database {self.db} vgroups 1 buffer 8") + break + except Exception as e: + if 'dropping' in str(e): + time.sleep(1) + continue + raise + tdSql.execute(f"use {self.db}") + + tdSql.execute("create table linea (ts timestamp, p int)") + tdSql.execute("create table data (ts timestamp, f1 int, v int)") + tdSql.execute("create table whitelist (ts timestamp, id int)") + + tdSql.execute( + "insert into data values " + "('2026-05-01 00:00:00', 1, 10)," + "('2026-05-01 00:00:01', 2, 20)," + "('2026-05-01 00:00:02', 3, 30)" + ) + # Seed whitelist so the IN-subquery resolves at CREATE STREAM. + tdSql.execute( + "insert into whitelist values ('2026-05-01 00:00:00', 1)" + ) + + self._wait_seed_visible( + "select count(*) from whitelist " + "where ts = '2026-05-01 00:00:00' and id = 1", + "whitelist", + ) + tdLog.info("=== create stream sum_in_whitelist ===") + tdSql.execute( + f"create stream sum_in_whitelist count_window(1, 1, p) " + f"from linea " + f"into r as " + f"select _twstart as ts, sum(v) as total " + f"from data " + f"where f1 in (select id from whitelist)" + ) + + def insert1(self): + tdLog.info("=== event 1: whitelist={1} -> match f1=1 -> SUM=10 ===") + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:00', 1)" + ) + + def check1(self): + tdSql.checkResultsByFunc( + sql=f"select total from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 1 + and tdSql.compareData(0, 0, 10), + ) + + def insert2(self): + tdLog.info("=== add id=2 to whitelist, trigger event 2 ===") + tdSql.execute( + "insert into whitelist values ('2026-05-01 00:00:01', 2)" + ) + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:01', 1)" + ) + + def check2(self): + # Event 2 must see whitelist={1,2}: SUM=10+20=30. + # Bug-without-fix would cache {1} and emit 10 again. 
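+            # This pins the sclInitParam LIST branch: the prior
+            # pHashFilter must be freed and VALUELIST_FLAG_VAL_UNSET
+            # re-armed so event 2 rebuilds the hash from the current
+            # whitelist rows.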
+ tdSql.checkResultsByFunc( + sql=f"select total from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 2 + and tdSql.compareData(0, 0, 10) + and tdSql.compareData(1, 0, 30), + ) + + def insert3(self): + tdLog.info("=== whitelist -> {}, trigger event 3 ===") + tdSql.execute( + "delete from whitelist where ts = '2026-05-01 00:00:00'" + ) + tdSql.execute( + "delete from whitelist where ts = '2026-05-01 00:00:01'" + ) + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:02', 1)" + ) + + def check3(self): + # Event 3 empties the whitelist so no rows match IN and the + # stream body emits no output. Poll for 15 seconds to both + # allow the stream time to process event 3 and verify that no + # stale IN-list row appears. The 15-second window ensures + # event 3 is evaluated while the whitelist is still empty, + # before insert4 re-populates it. + deadline = time.monotonic() + 15.0 + while True: + tdSql.query(f"select total from {self.db}.r order by ts") + assert tdSql.getRows() == 2, ( + f"event 3 stale IN-list: got {tdSql.getRows()} rows, " + f"expected 2 (cached IN-list was not invalidated)" + ) + if time.monotonic() >= deadline: + break + time.sleep(0.5) + + def insert4(self): + # Re-add id=1 to whitelist and trigger event 4. Whitelist is + # now {1} so the correct SUM is 10 (only f1=1, v=10 matches). + # A stale IN-list from event 3 would have used {1,2} and + # produced SUM=30, which is distinct from 10 and raises the + # row count above rows_after_e2+1, catching the regression. + tdLog.info("=== add id=1 to whitelist, trigger event 4 ===") + tdSql.execute( + "insert into whitelist values ('2026-05-01 00:00:03', 1)" + ) + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:03', 1)" + ) + + def check4(self): + # Event 4: whitelist={1}, so only f1=1 (v=10) matches -> SUM=10. + # check3 already verified event 3 produced no stale row, so we + # just wait for the correct event-4 output. + tdSql.checkResultsByFunc( + sql=f"select total from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 3 + and tdSql.compareData(0, 0, 10) + and tdSql.compareData(1, 0, 30) + and tdSql.compareData(2, 0, 10), + ) + + # ------------------------------------------------------------------ + # Row-comparison subquery (REMOTE_ROW) + # ------------------------------------------------------------------ + + def test_row_subquery(self): + """REMOTE_ROW must be refreshed per stream event. + + `> ANY (subquery)` is rewritten by the planner to `> MIN(...)`, + which materialises into a REMOTE_ROW node. In stream mode the + ROW cache check in sclInitParam() short-circuited once + pRemote->valSet was set on the first event, so the threshold + was frozen forever. 
+ + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-05-14 Created to pin ROW cache invalidation + """ + streams = [self.RowPerEvent()] + tdStream.checkAll(streams) + + class RowPerEvent(StreamCheckItem): + def __init__(self): + self.db = "test_subq_row" + + def _wait_seed_visible(self, count_sql, label, timeout=15): + deadline = time.time() + timeout + while time.time() < deadline: + try: + tdSql.query(count_sql) + if tdSql.getData(0, 0) == 1: + return + except Exception: + pass + time.sleep(0.5) + raise RuntimeError( + f"{label} seed row did not become queryable before CREATE STREAM" + ) + + def create(self): + tdLog.info(f"=== create db {self.db} ===") + tdSql.execute(f"drop database if exists {self.db}") + # drop is async; wait for the dnode to fully release vgroups + for _ in range(60): + try: + tdSql.execute(f"create database {self.db} vgroups 1 buffer 8") + break + except Exception as e: + if 'dropping' in str(e): + time.sleep(1) + continue + raise + tdSql.execute(f"use {self.db}") + + tdSql.execute("create table linea (ts timestamp, p int)") + tdSql.execute("create table data (ts timestamp, v int)") + tdSql.execute("create table threshold (ts timestamp, t int)") + + tdSql.execute( + "insert into data values " + "('2026-05-01 00:00:00', 10)," + "('2026-05-01 00:00:01', 20)," + "('2026-05-01 00:00:02', 30)," + "('2026-05-01 00:00:03', 40)" + ) + # Seed threshold so the row-subquery resolves at CREATE STREAM. + tdSql.execute( + "insert into threshold values ('2026-05-01 00:00:00', 35)" + ) + + self._wait_seed_visible( + "select count(*) from threshold " + "where ts = '2026-05-01 00:00:00' and t = 35", + "threshold", + ) + tdLog.info("=== create stream sum_gt_any_threshold ===") + tdSql.execute( + f"create stream sum_gt_any_threshold count_window(1, 1, p) " + f"from linea " + f"into r as " + f"select _twstart as ts, sum(v) as total " + f"from data " + f"where v > any (select t from threshold)" + ) + + def insert1(self): + tdLog.info("=== event 1: threshold={35} -> v>35 -> SUM=40 ===") + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:00', 1)" + ) + + def check1(self): + tdSql.checkResultsByFunc( + sql=f"select total from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 1 + and tdSql.compareData(0, 0, 40), + ) + + def insert2(self): + tdLog.info("=== add t=15 (new min), trigger event 2 ===") + tdSql.execute( + "insert into threshold values ('2026-05-01 00:00:01', 15)" + ) + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:01', 1)" + ) + + def check2(self): + # Event 2 must see new min 15: rows v in {20,30,40}, SUM=90. + # Bug-without-fix would cache 35 and emit 40 again. + tdSql.checkResultsByFunc( + sql=f"select total from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 2 + and tdSql.compareData(0, 0, 40) + and tdSql.compareData(1, 0, 90), + ) + + def insert3(self): + tdLog.info("=== threshold -> {5}, trigger event 3 ===") + tdSql.execute("delete from threshold") + tdSql.execute( + "insert into threshold values ('2026-05-01 00:00:02', 5)" + ) + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:02', 1)" + ) + + def check3(self): + # Event 3: threshold={5}, all 4 rows match, SUM=100. 
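+            # A stale threshold is directly observable here: a cached 35
+            # would replay SUM=40 and a cached 15 would replay SUM=90;
+            # only a fresh fetch of min=5 can produce 100.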
+ tdSql.checkResultsByFunc( + sql=f"select total from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 3 + and tdSql.compareData(0, 0, 40) + and tdSql.compareData(1, 0, 90) + and tdSql.compareData(2, 0, 100), + ) + + def test_exists_subquery(self): + """REMOTE_ZERO_ROWS (EXISTS) must be refreshed per stream event. + + `EXISTS (subquery)` is rewritten by the planner to a + REMOTE_ZERO_ROWS node holding a 0/1 row count. handleRemoteZeroRowsRes + forces the AST node type to QUERY_NODE_VALUE after fetching, so the + scalar walker stops re-dispatching the case and replays the cached + row count for every later event. The fix restores the + QUERY_NODE_REMOTE_ZERO_ROWS type in stream mode so the next trigger + re-fetches. + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-05-14 Created to pin REMOTE_ZERO_ROWS per-event refetch + """ + streams = [self.ExistsPerEvent()] + tdStream.checkAll(streams) + + class ExistsPerEvent(StreamCheckItem): + def __init__(self): + self.db = "test_subq_exists" + + def _wait_seed_visible(self, count_sql, label, timeout=15): + deadline = time.time() + timeout + while time.time() < deadline: + try: + tdSql.query(count_sql) + if tdSql.getData(0, 0) >= 1: + return + except Exception: + pass + time.sleep(0.5) + raise RuntimeError( + f"{label} seed row did not become queryable before CREATE STREAM" + ) + + def create(self): + tdLog.info(f"=== create db {self.db} ===") + tdSql.execute(f"drop database if exists {self.db}") + for _ in range(60): + try: + tdSql.execute(f"create database {self.db} vgroups 1 buffer 8") + break + except Exception as e: + if 'dropping' in str(e): + time.sleep(1) + continue + raise + tdSql.execute(f"use {self.db}") + + tdSql.execute("create table linea (ts timestamp, p int)") + tdSql.execute("create table data (ts timestamp, v int)") + tdSql.execute("create table gate (ts timestamp, on_off int)") + + tdSql.execute( + "insert into data values " + "('2026-05-01 00:00:00', 10)," + "('2026-05-01 00:00:01', 20)," + "('2026-05-01 00:00:02', 30)" + ) + # Seed gate so EXISTS resolves to TRUE at CREATE STREAM. + tdSql.execute( + "insert into gate values ('2026-05-01 00:00:00', 1)" + ) + self._wait_seed_visible( + "select count(*) from gate", "gate", + ) + + tdLog.info("=== create stream sum_when_gate_open ===") + # Use EXISTS inside the projection so every trigger event emits a + # row regardless of gate state. flag is 1 when gate has rows, + # 0 otherwise. Without the REMOTE_ZERO_ROWS type-restore fix, + # the AST node is rewritten to QUERY_NODE_VALUE on event 1 and + # the walker stops re-dispatching the case, so event 2 keeps + # replaying flag=1 even after the gate is emptied. 
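+            # Putting EXISTS in WHERE instead would emit nothing when
+            # the gate is empty, so a stale replay would be
+            # indistinguishable from a stalled stream; the projection
+            # form keeps both cases observable as flag values.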
+ tdSql.execute( + f"create stream sum_when_gate_open count_window(1, 1, p) " + f"from linea " + f"into r as " + f"select _twstart as ts, " + f"max(case when exists (select * from gate) " + f"then 1 else 0 end) as flag " + f"from data" + ) + + def insert1(self): + tdLog.info("=== event 1: gate non-empty -> flag=1 ===") + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:00', 1)" + ) + + def check1(self): + tdSql.checkResultsByFunc( + sql=f"select flag from {self.db}.r order by ts", + func=lambda: tdSql.getRows() == 1 + and tdSql.compareData(0, 0, 1), + ) + + def insert2(self): + tdLog.info("=== event 2: empty gate -> flag must be 0 ===") + tdSql.execute("delete from gate") + tdSql.execute( + "insert into linea values ('2026-05-01 00:00:01', 1)" + ) + + def check2(self): + sql = f"select flag from {self.db}.r order by ts" + deadline = time.time() + 60 + while time.time() < deadline: + tdSql.query(sql) + if tdSql.getRows() >= 2: + break + time.sleep(2) + tdSql.query(sql) + rows = tdSql.getRows() + for i in range(rows): + tdLog.info(f"check2 row{i} flag={tdSql.getData(i, 0)!r}") + assert rows == 2, ( + f"event 2 produced no new row (rows={rows}); stream stalled " + f"or output was suppressed" + ) + v = tdSql.getData(1, 0) + assert v == 0, ( + f"event 2 emitted flag={v} (expected 0); REMOTE_ZERO_ROWS " + f"node replayed event 1's cached row count instead of " + f"refetching after the gate was emptied" + ) diff --git a/test/ci/cases.task b/test/ci/cases.task index 9f7b722866e5..e3625c1ae353 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -741,6 +741,7 @@ ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_same_name.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_window_query.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_output_table.py +,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/test_stream_subquery_per_event.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/stream_drop.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/stream_fetch.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/02-Stream/stream_subquery.py