fix(subq/cache): bypass subquery cache per event #35335
Open
stephenkgu wants to merge 36 commits into
Pull request overview
This PR attempts to make stream scalar subqueries re-evaluate per trigger event instead of reusing cached remote subquery results.
Changes:
- Propagates stream-mode state through scalar/executor context.
- Keeps `REMOTE_VALUE` nodes dispatchable in stream mode and bypasses the remote-node cache.
- Adds stream fetch worker initialization and a CI test entry.
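The behavior change described above can be modeled with a small sketch. This is a toy illustration only: `RemoteCache`, `fetch`, and `is_stream` are hypothetical names chosen for the example, not TDengine code. It shows why a cached scalar subquery result goes stale across trigger events and how a stream-mode bypass avoids that.

```python
from typing import Callable, Dict, Optional


class RemoteCache:
    """Toy model of a per-subquery result cache (hypothetical, not TDengine code)."""

    def __init__(self, fetch: Callable[[int], Optional[int]]) -> None:
        self._fetch = fetch                      # stands in for the remote fetch
        self._slots: Dict[int, Optional[int]] = {}
        self.fetch_count = 0

    def get(self, sub_q_idx: int, is_stream: bool) -> Optional[int]:
        # Batch mode: fetch once, then reuse the cached slot.
        # Stream mode: always refetch so each trigger event sees fresh data.
        if is_stream or sub_q_idx not in self._slots:
            self.fetch_count += 1
            self._slots[sub_q_idx] = self._fetch(sub_q_idx)
        return self._slots[sub_q_idx]
```

With a cache like this, a non-stream caller keeps reading the first fetched value, while a stream caller pays one fetch per event in exchange for correctness.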
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| test/ci/cases.task | Adds a new stream subquery per-event pytest case to CI. |
| include/libs/scalar/scalar.h | Adds isStream to scalar extra info. |
| source/libs/scalar/inc/sclInt.h | Adds isStream to scalar runtime context. |
| source/libs/scalar/src/scalar.c | Handles REMOTE_VALUE as a value while preserving remote-node dispatch and propagates stream state. |
| source/libs/executor/src/executor.c | Initializes thread-local scalar extra info with stream state. |
| source/libs/executor/src/executil.c | Changes stream remote fetch/cache behavior and primary time-range handling for remote subquery conditions. |
| source/dnode/vnode/src/vnd/vnodeStream.c | Initializes scalar extra info before vnode stream fetch execution. |
| source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | Initializes scalar extra info before mnode stream fetch execution. |
Comment on lines +5271 to +5272

    if (ctx->isStream) {
      TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
Comment on lines +4514 to 4517

    // First-call empty result: keep node type as REMOTE_VALUE so the
    // scalar walker re-dispatches sclWalkRemoteValue per evaluation.
    pRemote->val.isNull = true;
    pRemote->val.translate = true;

    ctx.stream.pStreamRuntimeFuncInfo = pExtra->pStreamInfo;
    ctx.stream.streamTsRange = pExtra->pStreamRange;
    ctx.pSubJobCtx = pExtra->pSubJobCtx;
    ctx.isStream = pExtra->isStream;
Comment on lines +4514 to 4518

    // First-call empty result: keep node type as REMOTE_VALUE so the
    // scalar walker re-dispatches sclWalkRemoteValue per evaluation.
    pRemote->val.isNull = true;
    pRemote->val.translate = true;
    pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET);
Comment on lines +5271 to +5279

    if (ctx->isStream) {
      // Clear the slot before refetching so that an empty result from this
      // event takes handleRemoteValueRes's first-call branch (which marks
      // the placeholder NULL) instead of the "EOF after data" branch
      // (which would silently retain the previous event's value).
      if (ppRes) {
        *ppRes = NULL;
      }
      TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
Comment on lines +225 to +228

    tdSql.execute("delete from inicio_descarga")
    tdSql.execute(
        "insert into linea_descarga values ('2026-05-01 00:00:03', 1)"
    )
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Comment on lines +5273 to +5281

    if (ctx->isStream) {
      // Clear the slot before refetching so that an empty result from this
      // event takes handleRemoteValueRes's first-call branch (which marks
      // the placeholder NULL) instead of the "EOF after data" branch
      // (which would silently retain the previous event's value).
      if (ppRes) {
        *ppRes = NULL;
      }
      TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
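The reasoning in that code comment can be sketched as a toy model. Everything here is an assumption made for illustration: `Placeholder`, `handle_result`, and `fetch_for_event` are hypothetical stand-ins, and the two branches are a simplified reading of the comment rather than the actual `handleRemoteValueRes` logic.

```python
from typing import List, Optional


class Placeholder:
    """Stand-in for the REMOTE_VALUE node's value fields (hypothetical)."""

    def __init__(self) -> None:
        self.value: Optional[int] = None
        self.is_null = False


def handle_result(slot: List[Optional[int]], rows: List[int], node: Placeholder) -> None:
    # slot is a one-element list standing in for the *ppRes result slot.
    if rows:
        slot[0] = rows[0]
        node.value, node.is_null = rows[0], False
    elif slot[0] is None:
        # First-call empty result: mark the placeholder NULL.
        node.value, node.is_null = None, True
    # else: "EOF after data" branch, the previous value is retained.


def fetch_for_event(slot: List[Optional[int]], rows: List[int],
                    node: Placeholder, is_stream: bool) -> None:
    if is_stream:
        slot[0] = None  # clear before refetching, as in the diff above
    handle_result(slot, rows, node)
```

In this model, an event whose subquery returns no rows leaves the previous event's value in place unless the slot is cleared first, which is the stale-value case the comment warns about.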
Comment on lines +4538 to +4542

    // Completion sentinel after a data row: do NOT mutate the AST node
    // type to QUERY_NODE_VALUE. In stream mode the node must remain
    // REMOTE_VALUE so the next per-event walker pass re-fires the fetch
    // (otherwise every subsequent trigger event replays the literal
    // captured on the first event).
…Null in setValueFromResBlock
- setValueFromResBlock: clear pRes->isNull so a NULL placeholder from an earlier empty stream-event subquery does not mask a later non-NULL value.
- sclInitParam REMOTE_VALUE_LIST: in stream mode free cached hash filter and re-arm VALUELIST_FLAG_VAL_UNSET so each trigger re-evaluates the IN list.
- sclInitParam REMOTE_ROW: in stream mode force valSet=false so each trigger re-fetches the row subquery (e.g. > ANY rewrites).
- Tests cover IN-list, ROW, and the empty->non-empty WHERE transition.
The previous tdSql.checkResultsByFunc call could return on the pre-event 3-row snapshot, missing a stale (1,1) row emitted seconds later. Replace with a bounded poll that waits for either rows>=4 or a 60s deadline, then asserts the stale value is absent. Verified the test still catches the regression by reverting the qFetchRemoteNode slot-clear.
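A bounded poll of the kind described here might look like the following sketch. It does not use the `tdSql` helpers from the test suite; `poll_until`, `query`, and `done` are hypothetical names, and the row shape is assumed to be a list of tuples.

```python
import time
from typing import Callable, List, Tuple

Row = Tuple[int, ...]


def poll_until(query: Callable[[], List[Row]],
               done: Callable[[List[Row]], bool],
               timeout_s: float = 60.0,
               interval_s: float = 0.1) -> List[Row]:
    """Re-run query until done(rows) is true or the deadline passes.

    Returns the last snapshot either way, so the caller can assert on it.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        rows = query()
        if done(rows) or time.monotonic() >= deadline:
            return rows
        time.sleep(interval_s)


def assert_no_stale_row(rows: List[Row], stale: Row) -> None:
    # The regression shows up as a late row carrying the previous event's value.
    assert stale not in rows, f"stale row {stale} present in {rows}"
```

A test would call `poll_until(query, lambda rows: len(rows) >= 4)` and then `assert_no_stale_row(rows, (1, 1))`, so the assertion runs on a snapshot taken after the late row had a chance to appear.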
Previously when a stream WHERE cond contained a remote subquery, all primary-key range pruning was disabled. Now still call filterGetTimeRange to extract independent literal bounds (e.g. `ts >= '2026-05-01' AND ts < '2026-05-02' AND ts >= (SELECT ...)`) for scan pruning while keeping the residual remote predicate; isStrict=false ensures the cond stays in pConditions for per-event re-evaluation.
unreachable; non-stream EOF is handled below at the post-return path. Drop the dead conditional and clarify the comment.
…into fix/6984935627
Streams call this per trigger event; for VAR/DECIMAL types datum.p owns a heap buffer (ownership transferred from the response block via nodesSetValueNodeValueExt with needFree=false). Without freeing the previous buffer first, every event leaks one allocation in stream REMOTE_VALUE / REMOTE_ROW subqueries with string/binary results.
filterGetTimeRange aborts collection when any AND-child is uncollectable (e.g. ts >= remote-subquery), so passing the original cond returned a full window and disabled scan pruning even when independent literal bounds were present. Clone the cond, drop remote-bearing children from the AND, then call filterGetTimeRange so literal bounds in `ts >= '2026-05-01' AND ts < '2026-05-02' AND ts >= (SELECT ...)` are extracted; isStrict stays false so the residual remote predicate is evaluated per event.
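The clone-and-drop step can be illustrated with a toy model. This is a sketch under stated assumptions: predicates are `(operator, bound)` pairs on an integer `ts`, `REMOTE` is a hypothetical sentinel for a subquery operand, and `get_time_range` only imitates the all-or-nothing behavior attributed to `filterGetTimeRange` above.

```python
from typing import List, Tuple

REMOTE = object()                     # hypothetical marker for a remote subquery operand
Pred = Tuple[str, object]             # (operator, bound) on the ts column
INT64_MIN, INT64_MAX = -(2 ** 63), 2 ** 63 - 1
FULL_WINDOW = (INT64_MIN, INT64_MAX)


def get_time_range(and_children: List[Pred]) -> Tuple[int, int]:
    """Toy extractor: gives up with the full window if any child is uncollectable."""
    lo, hi = FULL_WINDOW
    for op, bound in and_children:
        if bound is REMOTE:
            return FULL_WINDOW
        if op == ">=":
            lo = max(lo, bound)
        elif op == ">":
            lo = max(lo, bound + 1)
        elif op == "<=":
            hi = min(hi, bound)
        elif op == "<":
            hi = min(hi, bound - 1)
        else:
            return FULL_WINDOW
    return lo, hi


def literal_time_range(and_children: List[Pred]) -> Tuple[int, int]:
    # Work on a filtered copy; the original condition keeps its remote child
    # so it can still be evaluated per event.
    literal_only = [p for p in and_children if p[1] is not REMOTE]
    return get_time_range(literal_only)
```

For `ts >= 100 AND ts < 200 AND ts >= (SELECT ...)`, the toy extractor on the original list yields the full window, while the filtered copy yields the literal bounds.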
initQueryTableDataCond seeds pCond->twindows from scanRange/winRange before calling getPrimaryTimeRange. The previous stream branch reset that to TSWINDOW_INITIALIZER (full scan) when no static bound could be extracted, widening the scan past the trigger window. Save the incoming window, intersect any extracted literal range with it, and fall back to the original on extraction failure.
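The save, intersect, and fall-back sequence reduces to a few lines. This is a minimal sketch, assuming windows are inclusive `(start, end)` integer pairs and that extraction failure is signalled with `None`; `narrow_window` is a hypothetical helper, and how the real code treats an empty intersection is not stated in the commit message.

```python
from typing import Optional, Tuple

Window = Tuple[int, int]  # inclusive (skey, ekey)


def narrow_window(incoming: Window, extracted: Optional[Window]) -> Window:
    """Intersect an extracted literal range with the incoming scan window.

    Never widens: with no extracted range, the incoming window is returned as-is.
    """
    if extracted is None:
        return incoming
    lo = max(incoming[0], extracted[0])
    hi = min(incoming[1], extracted[1])
    # Assumption: an empty intersection is returned as lo > hi for the caller to handle.
    return (lo, hi)
```

The key property is that the result is always contained in the incoming window, so the scan cannot grow past the trigger window.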
Comment on lines +618 to +626

    deadline = time.monotonic() + 1.0
    while True:
        tdSql.checkResultsByFunc(
            sql=f"select total from {self.db}.r order by ts",
            func=expected_rows,
        )
        if time.monotonic() >= deadline:
            break
        time.sleep(0.1)
Event 3 empties the whitelist. check3 ran without waiting, so insert4 could re-populate the whitelist before the stream evaluated event 3's subquery. The stream then saw id=1 and produced a stale SUM=30 row. Fix: replace the two conflicting check3 methods with one that polls for 15 s while asserting rows stays at 2. This both detects a stale row immediately and ensures event 3 is processed with an empty whitelist before insert4 runs. Simplify check4 accordingly: it now just waits for the correct (10, 30, 10) sequence.
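The "poll while asserting the row count stays put" idea differs from a wait-until poll: it fails as soon as an extra row appears and only succeeds after the full hold period. A sketch, with `assert_stable` and `query` as hypothetical names not taken from the test file:

```python
import time
from typing import Callable, List, Sequence


def assert_stable(query: Callable[[], List[Sequence]],
                  expected_rows: int,
                  hold_s: float = 15.0,
                  interval_s: float = 0.1) -> None:
    """Fail immediately if the row count deviates; return once hold_s has elapsed."""
    deadline = time.monotonic() + hold_s
    while True:
        count = len(query())
        if count != expected_rows:
            raise AssertionError(
                f"expected {expected_rows} rows during hold, saw {count}"
            )
        if time.monotonic() >= deadline:
            return
        time.sleep(interval_s)
```

Running this between the event that empties the whitelist and the next insert both catches a stale row early and gives the stream time to process the event before the data changes again.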