
Implement RDB serialization and deserialization for graph data #359

Closed
AviAvni wants to merge 47 commits into main from rdb

Conversation

@AviAvni (Contributor) commented Apr 12, 2026

  • Added `decoder/mod.rs` to handle loading graphs from RDB streams, supporting both single-key and multi-key formats.
  • Implemented the `rdb_load_graph` function to decode graph headers, schemas, and payloads, managing pending graphs for multi-key scenarios.
  • Introduced `encoder/mod.rs` for encoding graphs into RDB format, including functions for single-key and multi-key payload distribution.
  • Created `mod.rs` to manage the serialization modules, including global state for virtual-key management and decoding.
  • Updated tests in `test_persistency.py` and `test_replication.py` to reflect changes in index creation syntax and ensure compatibility with the new serialization methods.

Summary by CodeRabbit

  • New Features

    • RDB v19: single- and multi-key graph save/load with virtual-key orchestration
    • New commands: GRAPH.DEBUG (persistence helpers) and GRAPH.EFFECT (apply serialized mutation buffers)
  • Enhancements

    • Effects-buffer replication path, schema-version validation on writes, and EFFECTS_THRESHOLD now runtime-configurable
    • Broader on-disk property support (vectors, temporal types) and improved restore with synchronous index population and multi-key reconstruction
  • Tests

    • Flow tests updated: persistency/replication assertions adjusted, new RDB compatibility tests added, some legacy decoder tests removed or skipped

@coderabbitai Bot commented Apr 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds v19 RDB persistence and effects replication: new GraphBLAS Encode/Decode traits and impls, buffered RDB I/O, encoder/decoder with multi-key virtual-key orchestration, Redis persistence hooks and virtual graphmeta keys, GRAPH.DEBUG/EFFECT commands, schema-versioning/effects buffering, and related runtime/indexing and test changes.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Dependencies | `Cargo.toml` | Added `roaring = "0.11.3"` and `thin-vec = "0.2.14"`. |
| Test runner & lists | `flow.sh`, `flow_tests_done.txt`, `flow_tests_todo.txt` | RLTest invoked with `--enable-debug-command` and `--enable-protected-configs`; several tests moved from TODO to DONE and the todo list pruned. |
| Serialization core (GraphBLAS) | `graph/src/graph/graphblas/serialization.rs`, `graph/src/graph/graphblas/mod.rs` | Introduces Reader/Writer and generic Encode/Decode traits, SI-type tags, EncodeState/PayloadEntry, and RoaringTreemap Encode/Decode; module exported. |
| GraphBLAS types & impls | `graph/src/graph/graphblas/matrix.rs`, `vector.rs`, `tensor.rs`, `versioned_matrix.rs` | Added Encode/Decode<19> for Matrix/Vector/Tensor/VersionedMatrix; UINT64 vector/matrix support, iterator generalization, select/clear/extract helpers, and related FFI expansions. |
| AttributeStore & Graph core | `graph/src/graph/attribute_store.rs`, `graph/src/graph/graph.rs` | AttributeStore: materializing get_all_attrs_by_id to Vec, import_attrs, encode-context fields and Encode/Decode<19> impls. Graph: name & schema_version, restore/rebuild_derived_matrices, encode_payload, sync index creation/population, and new accessors. |
| MVCC / ThreadedGraph & replication | `graph/src/graph/mvcc_graph.rs`, `src/graph_core.rs` | Added MvccGraph::from_graph; commit bumps schema_version on schema diffs; ThreadedGraph::from_mvcc; execute_query_write now returns optional effects buffer + modified flag and includes replication helpers. |
| Runtime, pending & effects | `graph/src/runtime/{pending.rs,value.rs}`, `graph/src/runtime/runtime.rs`, `graph/src/runtime/ops/{commit.rs,set.rs}` | Pending now tracks schema baseline, splits new/existing staged attrs, builds effects buffer API; Value SI-type Encode/Decode<19>; Runtime holds effects buffer/count and transaction timestamp; commit snapshots effects pre-commit; SET logic refined for deleted relationships. |
| Indexer | `graph/src/index/indexer.rs` | Added get_all_pending_fields() snapshot API to list labels with pending index fields. |
| Serializers module & state | `src/serializers/mod.rs` | New serializers module exporting buffered_io/decoder/encoder, ENCODING_VERSION = 19, global VKEY_STATE/DECODE_STATE, and Header/Schema/PendingGraph types. |
| Buffered I/O | `src/serializers/buffered_io.rs` | Adds BufferedWriter/BufferedReader implementing Writer/Reader for typed/chunked RDB I/O with inline small buffers and blob handling. |
| Encoder | `src/serializers/encoder/mod.rs` | rdb_save_graph/rdb_save_graph_key, payload construction, and multi-key partitioning (build_multi_key_payloads) including matrix placement semantics. |
| Decoder | `src/serializers/decoder/mod.rs` | rdb_load_graph supporting single- and multi-key flows with PendingGraph accumulation, finalize_pending_graph, payload decoding into pending, and index rebuild helpers. |
| Redis persistence & types | `src/redis_type.rs`, `src/module_init.rs`, `src/lib.rs` | Implements multi-key/virtual-key orchestration, GRAPHMETA_TYPE, on_persistence lifecycle, AUX save/load branching, virtual-key creation/deletion, and module wiring changes. |
| Commands: debug & effect | `src/commands/{debug.rs,effect.rs}`, `src/commands/mod.rs` | Added graph_debug for AUX START/END virtual-key ops and graph_effect to apply effects buffers; commands exported. |
| Config & query behavior | `src/commands/config_cmd.rs`, `src/config.rs`, `src/commands/query.rs` | EFFECTS_THRESHOLD made runtime-settable; graph query accepts version <u64> and rejects mismatched schema_version with a structured error reply. |
| Planner & non-determinism | `graph/src/planner/mod.rs`, `graph/src/planner/optimizer/*` | Added plan_is_non_deterministic and helpers; improved id-filter detection via references_var. |
| Function registry | `graph/src/runtime/functions/*` | Added non_deterministic flag to GraphFn and registration APIs; macro arms and registrations updated; several functions marked non-deterministic or signature-adjusted. |
| Effects application | `src/commands/effect.rs`, `graph/src/runtime/ops/commit.rs` | apply_effects parser/applier implementing the effects wire format with schema/index ops; CommitOp snapshots effects and updates baseline. |
| Tests & utilities | `tests/flow/*`, `tests/flow/graph_utils.py` | Flow tests updated for v19 persistence/effects: timezone/float expectations, index creation queries adjusted, some tests skipped, deterministic ordering fixes, constraints check relaxed; new RDB compat tests added, previous decoder-version test removed. |
| Dev/Test environment | `.devcontainer/Dockerfile`, `build/Dockerfile`, `tests/requirements.txt` | Added falkordb-bulk-loader to test/dev Python dependencies and Docker build steps. |
| Misc | assorted files | RLTest flags, small iterator/ownership signature changes, iterator/FFI adjustments, CLI wiring and registration tweaks. |

Sequence Diagram(s)

sequenceDiagram
    participant RedisModule as Redis Module
    participant Graph
    participant Encoder
    participant Decoder
    participant BufferedIO as BufferedWriter/Reader
    participant RDB as RDB Stream

    rect rgba(0, 128, 255, 0.5)
        Note over RedisModule,RDB: RDB Save (multi-key)
        RedisModule->>Graph: build_multi_key_payloads(key_count)
        RedisModule->>Encoder: rdb_save_graph_key(payloads)
        Encoder->>BufferedIO: write Header/Schema/PayloadDirectory
        Encoder->>Graph: graph.encode_payload -> payload writes
        BufferedIO->>RDB: flush typed chunks / blobs
    end

    rect rgba(0, 200, 100, 0.5)
        Note over RDB,RedisModule: RDB Load (multi-key)
        RDB->>RedisModule: deliver key chunk
        RedisModule->>Decoder: rdb_load_graph(chunk)
        Decoder->>Decoder: decode Header/Schema, append payloads to PendingGraph
        alt keys_remaining > 0
            Decoder->>RedisModule: store pending state, wait for more keys
        else keys_remaining == 0
            Decoder->>Graph: finalize_pending_graph -> restore Graph, rebuild matrices
            Graph->>Graph: populate_indexes_sync()
            Graph-->>RedisModule: finalized Graph installed
        end
    end

Estimated Code Review Effort

🎯 5 (Critical) | ⏱️ ~150 minutes

Poem

🐇 I nibble bytes beneath the moon,
I stitch matrices in careful tune.
Virtual keys hop, then stay,
Effects ripple on their way.
Dawn brings graphs restored by day.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the primary change: implementing RDB serialization and deserialization for graph data across multiple files and modules. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 88.42%, which is sufficient; the required threshold is 80.00%. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@coderabbitai Bot left a comment


Actionable comments posted: 12

🧹 Nitpick comments (2)
graph/src/index/indexer.rs (1)

442-452: Method name and behavior documentation are inconsistent; clarify intent.

get_all_pending_fields (lines 444–452) returns fields for all labels (per the doc comment "for all labels") regardless of pending state, yet the method name suggests filtering to only pending labels. This creates a potential for misuse by future callers. Either add filtering by pending_count() if only pending labels should be included, or rename to get_all_fields() to match the documented and actual behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/index/indexer.rs` around lines 442 - 452, The method
get_all_pending_fields currently returns fields for all labels but its name
implies it should only return labels with pending work; either change its
behavior to filter by pending_count() or rename it to get_all_fields and update
the doc comment. To fix, choose one approach: (A) if you want only pending
labels, update get_all_pending_fields to call
self.index.read().iter().filter(|(_, idx)| idx.pending_count() > 0).map(...) so
only labels with pending_count() > 0 are returned and adjust the doc comment
accordingly; or (B) if the current behavior is correct, rename the function to
get_all_fields and change the doc comment to "Get fields for all labels" so the
name, implementation, and docs match. Ensure references to
get_all_pending_fields elsewhere are updated to the new name or behavior.
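Option (A) might look like the following minimal sketch. `LabelIndex` and the `HashMap`-backed index are simplified stand-ins for the real indexer types; only `pending_count()` and the field list matter here:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the per-label index.
struct LabelIndex {
    fields: Vec<String>,
    pending: usize,
}

impl LabelIndex {
    fn pending_count(&self) -> usize {
        self.pending
    }
}

// Option (A): return fields only for labels that still have pending work,
// so the name and behavior agree.
fn get_all_pending_fields(index: &HashMap<String, LabelIndex>) -> Vec<(String, Vec<String>)> {
    index
        .iter()
        .filter(|(_, idx)| idx.pending_count() > 0)
        .map(|(label, idx)| (label.clone(), idx.fields.clone()))
        .collect()
}

fn main() {
    let mut index = HashMap::new();
    index.insert("Person".to_string(), LabelIndex { fields: vec!["name".into()], pending: 2 });
    index.insert("City".to_string(), LabelIndex { fields: vec!["zip".into()], pending: 0 });
    let pending = get_all_pending_fields(&index);
    // Only "Person" has pending index work, so only it is returned.
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[0].0, "Person");
}
```

Under option (B), the `filter` line is simply dropped and the function renamed `get_all_fields`.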
graph/src/graph/mvcc_graph.rs (1)

95-99: Initialize the indexer self-reference inside from_graph.

Every current load site immediately follows this with read() + set_indexer_graph(...) (src/redis_type.rs:57-62, 71-77, 689-707). Moving that into the constructor would keep the restored-graph invariant in one place and avoid future call sites forgetting it.

Suggested refactor
 pub fn from_graph(graph: Graph) -> Self {
+    let graph = Arc::new(AtomicRefCell::new(graph));
+    graph.borrow_mut().set_indexer_graph(graph.clone());
     Self {
-        graph: Arc::new(AtomicRefCell::new(graph)),
+        graph,
         write: AtomicBool::new(false),
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/mvcc_graph.rs` around lines 95 - 99, The constructor
from_graph currently returns an MVCCGraph wrapping the restored Graph but leaves
the Graph's back-reference (set_indexer_graph) to be set by callers; move that
responsibility into from_graph by, after building Self (the MVCCGraph),
obtaining a reference to the inner graph via self.graph.read() and calling
set_indexer_graph(...) with a Weak/weak Arc pointing to the newly created
MVCCGraph (use Arc::downgrade on the MVCCGraph Arc you create), so callers no
longer need to call read() + set_indexer_graph; update from_graph to construct
the Arc-wrapped MVCCGraph, then set_indexer_graph on the inner Graph before
returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@graph/src/graph/attribute_store.rs`:
- Around line 139-142: The fields encode_deleted and encode_max_id on
AttributeStore use RefCell, which breaks the type's unsafe impl Sync; replace
the unsynchronized interior mutability with thread-safe primitives or remove the
shared mutable state entirely. Specifically, change encode_deleted:
RefCell<Option<RoaringTreemap>> to a thread-safe container (e.g.,
Mutex<Option<RoaringTreemap>> or RwLock<Option<RoaringTreemap>>) and change
encode_max_id: RefCell<u64> to a Mutex<u64> or AtomicU64 as appropriate, then
update set_encode_context(), encode paths and any callers to acquire the lock
(or use atomic operations) when reading/writing these fields; alternatively,
refactor set_encode_context() and encode_* functions to accept the encode
context as an explicit parameter and remove these fields from AttributeStore and
the unsafe impl Sync entirely.
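The thread-safe replacement could be sketched as follows. `EncodeContext` is a hypothetical slice of `AttributeStore` holding just the two fields from the comment, with `Vec<u64>` standing in for `RoaringTreemap`; `AtomicU64` replaces `RefCell<u64>` and `Mutex` replaces `RefCell<Option<...>>`, making the type `Sync` without any `unsafe impl`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

// Hypothetical slice of AttributeStore: only the encode-context fields.
struct EncodeContext {
    encode_max_id: AtomicU64,                // was RefCell<u64>
    encode_deleted: Mutex<Option<Vec<u64>>>, // was RefCell<Option<RoaringTreemap>>
}

impl EncodeContext {
    fn set_encode_context(&self, max_id: u64, deleted: Vec<u64>) {
        self.encode_max_id.store(max_id, Ordering::Relaxed);
        *self.encode_deleted.lock().unwrap() = Some(deleted);
    }

    fn max_id(&self) -> u64 {
        self.encode_max_id.load(Ordering::Relaxed)
    }
}

fn main() {
    let ctx = EncodeContext {
        encode_max_id: AtomicU64::new(0),
        encode_deleted: Mutex::new(None),
    };
    ctx.set_encode_context(42, vec![1, 7]);
    assert_eq!(ctx.max_id(), 42);

    // The compiler now proves Sync; no unsafe impl needed.
    fn assert_sync<T: Sync>(_: &T) {}
    assert_sync(&ctx);
}
```

The alternative in the comment — passing the encode context as an explicit parameter — avoids the shared state entirely and is preferable when the encode paths can be threaded through.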

In `@graph/src/graph/graphblas/matrix.rs`:
- Around line 437-458: The code currently copies raw GxB_Container bytes (using
container_bytes and copy_nonoverlapping with CONTAINER_STRUCT_SIZE after
GxB_Container_new/MaybeUninit) which preserves process-local pointers
(vector_future, matrix_future, void_future) and risks OOB reads; instead
validate that container_bytes.len() >= CONTAINER_STRUCT_SIZE and reject
undersized input, then zero-initialize the newly allocated container (after
GxB_Container_new/container.assume_init()), and only deserialize the scalar
metadata fields (nrows, ncols, nrows_nonempty, ncols_nonempty, nvals, format,
orientation, iso, jumbled) by reading those fields from container_bytes into the
container struct; explicitly set all pointer/array fields (x,h,b,i,p,Y and
vector_future/matrix_future/void_future) to null_mut() to avoid resurrecting
stale pointers. Ensure the same fix is applied at the other occurrence
referenced (lines ~501-504) where raw container bytes are copied.

In `@graph/src/graph/graphblas/serialization.rs`:
- Around line 174-210: The decode and decode_with_count implementations accept
buffers with misaligned or extra bytes; change decode(r: &mut dyn Reader) to
fail if bytes.len() % 8 != 0 (return Err with a clear "misaligned deleted
entities buffer" message) so partial 8-byte ids are rejected, and change
decode_with_count(&mut self, r: &mut dyn Reader, count: u64) to require
bytes.len() == expected_len (not just >=) and return an error if it is longer or
shorter; keep using u64::from_le_bytes and the same insert logic but only after
these strict length checks and update the error strings to reflect exact-length
mismatch.
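A strict-length version of the two decode paths might look like this sketch, with `&[u8]` standing in for the `Reader`'s buffer and `Vec<u64>` for the treemap insert logic:

```rust
// Reject partial 8-byte ids instead of silently dropping trailing bytes.
fn decode_ids(bytes: &[u8]) -> Result<Vec<u64>, String> {
    if bytes.len() % 8 != 0 {
        return Err(format!("misaligned deleted entities buffer: {} bytes", bytes.len()));
    }
    Ok(bytes
        .chunks_exact(8)
        .map(|c| u64::from_le_bytes(c.try_into().unwrap()))
        .collect())
}

// Require the buffer to be exactly count * 8 bytes, not merely >=.
fn decode_ids_with_count(bytes: &[u8], count: u64) -> Result<Vec<u64>, String> {
    let expected = count as usize * 8;
    if bytes.len() != expected {
        return Err(format!("expected exactly {expected} bytes, got {}", bytes.len()));
    }
    decode_ids(bytes)
}

fn main() {
    assert_eq!(decode_ids(&7u64.to_le_bytes()).unwrap(), vec![7]);
    assert!(decode_ids(&[0u8; 5]).is_err()); // partial 8-byte id rejected
    assert!(decode_ids_with_count(&[0u8; 16], 1).is_err()); // extra bytes rejected
    assert_eq!(decode_ids_with_count(&[0u8; 16], 2).unwrap(), vec![0, 0]);
}
```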

In `@graph/src/graph/graphblas/vector.rs`:
- Around line 149-160: The code mixes allocators: GxB_Vector_serialize() returns
a buffer allocated by GraphBLAS but the code frees it with std::alloc::dealloc,
and Vector<bool>::decode currently allocates with std::alloc::alloc then passes
that buffer to GxB_Vector_load() which frees it with GraphBLAS — causing heap
corruption when a custom allocator is used. Fix by using the GraphBLAS allocator
pair for both sides: change the free in the serialization path to call the
GraphBLAS deallocator (use the GraphBLAS free function provided by your
GraphBLAS binding) instead of std::alloc::dealloc, and in Vector<bool>::decode
either allocate the load buffer via the GraphBLAS allocator or let GraphBLAS
allocate (pass null if supported) so that GxB_Vector_load() frees with the same
allocator; update code around GxB_Vector_serialize, Vector<bool>::decode and
GxB_Vector_load accordingly.

In `@graph/src/runtime/pending.rs`:
- Around line 157-160: The resize logic only considers self.set_node_labels and
can shrink self.remove_node_labels, dropping staged removals; change the
computation of new_nrows/new_ncols to take the max across both matrices
(self.set_node_labels and self.remove_node_labels) before calling resize so
neither buffer is reduced inadvertently—update the calculations that produce
new_nrows/new_ncols (used with set_node_labels.resize and
remove_node_labels.resize) to use the maximum of corresponding nrows/ncols from
both matrices.
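The max-across-both-matrices computation is small enough to sketch directly. `Dims` is a hypothetical stand-in for a matrix's row/column counts; resizing both label matrices to `target_dims` ensures neither can shrink below the other's staged data:

```rust
#[derive(Debug, PartialEq)]
struct Dims {
    nrows: u64,
    ncols: u64,
}

// Take the max across set_node_labels, remove_node_labels, and the newly
// required size before resizing either matrix.
fn target_dims(set: &Dims, remove: &Dims, needed: &Dims) -> Dims {
    Dims {
        nrows: set.nrows.max(remove.nrows).max(needed.nrows),
        ncols: set.ncols.max(remove.ncols).max(needed.ncols),
    }
}

fn main() {
    let set = Dims { nrows: 10, ncols: 4 };
    let remove = Dims { nrows: 25, ncols: 4 }; // staged removals beyond row 10
    let needed = Dims { nrows: 12, ncols: 4 };
    let t = target_dims(&set, &remove, &needed);
    // Sizing only from `set` would have shrunk `remove` to 12 rows,
    // dropping the staged removals in rows 12..25.
    assert_eq!(t, Dims { nrows: 25, ncols: 4 });
}
```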

In `@graph/src/runtime/value.rs`:
- Around line 1831-1834: The current encoder in runtime/value.rs silently writes
si_type::T_NULL for Self::Map, Self::Node, Self::Relationship, and Self::Path
which corrupts data; update the match in the serializer (the branch handling
Self::Null | Self::Map(_) | Self::Node(_) | Self::Relationship(_) |
Self::Path(_)) to either implement proper encoding for Map, Node, Relationship,
and Path (add per-variant encode routines called from this match) or, if those
variants are unsupported for persisted payloads, return an explicit error
instead of calling w.write_unsigned(si_type::T_NULL); ensure you use the same
error type returned by the surrounding write/encode function so callers get a
failure signal rather than silent corruption.
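Returning an explicit error for unsupported variants could look like the following sketch. The `Value` enum, `si_type` tags, and `Vec<u64>` output are heavily simplified stand-ins for the real wire format; the point is that the unsupported arm propagates an `Err` instead of writing `T_NULL`:

```rust
// Simplified stand-ins for the runtime Value and its SI-type tags.
enum Value {
    Null,
    Int(i64),
    Map, // stands in for Map/Node/Relationship/Path
}

mod si_type {
    pub const T_NULL: u64 = 0;
    pub const T_INT: u64 = 1;
}

fn encode(v: &Value, out: &mut Vec<u64>) -> Result<(), String> {
    match v {
        Value::Null => {
            out.push(si_type::T_NULL);
            Ok(())
        }
        Value::Int(i) => {
            out.push(si_type::T_INT);
            out.push(*i as u64);
            Ok(())
        }
        // Unsupported for persisted payloads: fail loudly so the caller
        // aborts the write instead of persisting a corrupted payload.
        Value::Map => Err("cannot encode Map/Node/Relationship/Path values".into()),
    }
}

fn main() {
    let mut out = Vec::new();
    assert!(encode(&Value::Int(5), &mut out).is_ok());
    assert!(encode(&Value::Map, &mut out).is_err());
    assert_eq!(out, vec![si_type::T_INT, 5]);
}
```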

In `@src/module_init.rs`:
- Around line 81-87: The persistence subscribe result is only asserted with
debug_assert_eq! (so ignored in release), which can silently disable
persistence; change the check after calling RedisModule_SubscribeToServerEvent
(with ctx.ctx, RedisModuleEvent_Persistence, Some(on_persistence)) into a
runtime validation: if res != REDISMODULE_OK as c_int then log or emit an error
and return Status::Err from the surrounding init routine (instead of
panicking/asserting), ensuring virtual-key persistence registration failures
prevent the module from loading.

In `@src/redis_type.rs`:
- Around line 83-84: The current code only swaps placeholder_tg.graph, leaving
the placeholder ThreadedGraph's sender/receiver/write_loop bound to the old
instance which breaks write serialization; instead, replace the entire
ThreadedGraph instance (not just placeholder_tg.graph) or reinitialize all
queue-bound members (sender, receiver, write_loop) together. Locate the code
that uses ThreadedGraph::new to create the placeholder (the
Arc<RwLock<ThreadedGraph>> assigned to arc) and update the reload logic where
placeholder_tg.graph is swapped (references to placeholder_tg.graph) so that you
either construct a new ThreadedGraph with the loaded graph and swap the whole
ThreadedGraph into the Arc<RwLock>, or re-create and assign new sender,
receiver, and write_loop alongside the new graph to preserve the queue
semantics.
- Around line 43-50: The code unsafely converts bytes returned from
RedisModule_StringPtrLen into UTF-8 using std::str::from_utf8_unchecked (e.g.,
when building key_name after calling raw::RedisModule_GetKeyNameFromIO and in
other cursor/key conversions), which can cause UB for binary-safe Redis data;
change those conversions to use
String::from_utf8_lossy(std::slice::from_raw_parts(ptr.cast(), len)).to_string()
(preserve the existing null-checks and unsafe pointer reads, but replace
from_utf8_unchecked with from_utf8_lossy for the final String) for all
occurrences where bytes from raw::RedisModule_StringPtrLen are turned into Rust
Strings (including key_name and the SCAN/cursor conversions).
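The lossy conversion the comment asks for can be sketched as below. `key_name_from_bytes` is a hypothetical helper, not the module's real signature; it keeps the null check while replacing `from_utf8_unchecked` (UB on non-UTF-8 input) with `from_utf8_lossy` (invalid bytes become U+FFFD):

```rust
// Hypothetical helper: convert (ptr, len) from RedisModule_StringPtrLen
// into an owned String without risking UB on binary-safe key names.
fn key_name_from_bytes(ptr: *const u8, len: usize) -> String {
    // Preserve the null check the original code performs.
    if ptr.is_null() {
        return String::new();
    }
    let bytes = unsafe { std::slice::from_raw_parts(ptr, len) };
    String::from_utf8_lossy(bytes).to_string() // was: from_utf8_unchecked
}

fn main() {
    let valid = b"graph:users";
    assert_eq!(key_name_from_bytes(valid.as_ptr(), valid.len()), "graph:users");

    // Redis keys are binary-safe; 0xFF/0xFE are never valid UTF-8.
    let binary = [0x67u8, 0xFF, 0xFE];
    let s = key_name_from_bytes(binary.as_ptr(), binary.len());
    assert_eq!(s, "g\u{FFFD}\u{FFFD}");
}
```

If lossy replacement is unacceptable (e.g., the key must round-trip byte-for-byte), the alternative is to keep the data as `Vec<u8>` rather than `String`.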

In `@src/serializers/buffered_io.rs`:
- Around line 68-124: Change the host-endian serialization to explicit
little-endian in buffered_io.rs: replace all uses of to_ne_bytes with
to_le_bytes and all from_ne_bytes with from_le_bytes for the primitive
serializers/readers (e.g., write_unsigned, write_signed, write_double,
write_float and the buffer-length encoding in write_buffer) and their
corresponding deserializers/read methods referenced in the file so on-disk RDB
payload is little-endian and portable across architectures.
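The endianness fix can be sketched as follows. `write_unsigned`, `read_unsigned`, and `write_buffer` mirror the names in the comment, with `Vec<u8>` standing in for the RDB writer; the only change is `to_le_bytes`/`from_le_bytes` in place of the `_ne_` variants:

```rust
fn write_unsigned(out: &mut Vec<u8>, v: u64) {
    out.extend_from_slice(&v.to_le_bytes()); // was: to_ne_bytes()
}

fn read_unsigned(buf: &[u8]) -> u64 {
    u64::from_le_bytes(buf[..8].try_into().unwrap()) // was: from_ne_bytes()
}

fn write_buffer(out: &mut Vec<u8>, blob: &[u8]) {
    write_unsigned(out, blob.len() as u64); // length prefix, also little-endian
    out.extend_from_slice(blob);
}

fn main() {
    let mut out = Vec::new();
    write_unsigned(&mut out, 0x0102_0304);
    // The same byte order on every host, so an RDB written on a big-endian
    // machine loads correctly on a little-endian one and vice versa.
    assert_eq!(&out[..4], &[0x04, 0x03, 0x02, 0x01]);
    assert_eq!(read_unsigned(&out), 0x0102_0304);

    let mut framed = Vec::new();
    write_buffer(&mut framed, b"rdb");
    assert_eq!(read_unsigned(&framed), 3);
}
```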

In `@src/serializers/encoder/mod.rs`:
- Around line 64-67: The calculation of key_count when vkey_max == 0 incorrectly
sets key_count = 1 while remaining_capacity is later initialized from vkey_max,
causing the distribution loop (which emits
Nodes/DeletedNodes/Edges/DeletedEdges) to never run and all payloads to collapse
to key 0; update the logic in the encoder where key_count, remaining_capacity,
and loop initialization are computed (references: key_count, total_entities,
vkey_max, remaining_capacity and the distribution loop that emits
Nodes/DeletedNodes/Edges/DeletedEdges — also apply same fix to the analogous
block around lines 88-111) so that when vkey_max == 0 you either bypass division
and initialize remaining_capacity to a nonzero value or handle the zero-capacity
case by setting key_count = 0 and skipping emission correctly, ensuring the
distribution loop can emit entity payloads appropriately.
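One possible resolution can be sketched as below: treat `vkey_max == 0` as single-key mode with capacity covering all entities, so the distribution loop still emits every payload. Names follow the review comment; the fallback policy itself is an assumption, not the encoder's actual behavior:

```rust
// Returns (key_count, per_key_capacity) for payload distribution.
fn plan_keys(total_entities: u64, vkey_max: u64) -> (u64, u64) {
    if vkey_max == 0 {
        // Multi-key saving disabled: one key whose capacity covers
        // everything, so remaining_capacity never starts at zero and the
        // Nodes/DeletedNodes/Edges/DeletedEdges emission loop still runs.
        (1, total_entities.max(1))
    } else {
        (total_entities.div_ceil(vkey_max).max(1), vkey_max)
    }
}

fn main() {
    // vkey_max == 0: one key sized to hold every entity.
    assert_eq!(plan_keys(1000, 0), (1, 1000));
    // Normal multi-key split: 1000 entities at 300 per key -> 4 keys.
    assert_eq!(plan_keys(1000, 300), (4, 300));
    // Empty graph still gets one (empty) key.
    assert_eq!(plan_keys(0, 0), (1, 1));
}
```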

In `@tests/flow/test_persistency.py`:
- Around line 109-110: A critical end-to-end persistence test, test_save_load,
was entirely skipped with `@skip()`, which removes the main regression net;
re-enable it by removing the `@skip()` decorator on the test_save_load function
and either (a) narrow its assertions to only the parts currently stable, or (b)
mark specific failing cases as xfail (e.g., using pytest.mark.xfail on those
subcases) so the full test remains active; locate the test function named
test_save_load in tests/flow/test_persistency.py and update the
decorator/assertions accordingly.

---

Nitpick comments:
In `@graph/src/graph/mvcc_graph.rs`:
- Around line 95-99: The constructor from_graph currently returns an MVCCGraph
wrapping the restored Graph but leaves the Graph's back-reference
(set_indexer_graph) to be set by callers; move that responsibility into
from_graph by, after building Self (the MVCCGraph), obtaining a reference to the
inner graph via self.graph.read() and calling set_indexer_graph(...) with a
Weak/weak Arc pointing to the newly created MVCCGraph (use Arc::downgrade on the
MVCCGraph Arc you create), so callers no longer need to call read() +
set_indexer_graph; update from_graph to construct the Arc-wrapped MVCCGraph,
then set_indexer_graph on the inner Graph before returning.

In `@graph/src/index/indexer.rs`:
- Around line 442-452: The method get_all_pending_fields currently returns
fields for all labels but its name implies it should only return labels with
pending work; either change its behavior to filter by pending_count() or rename
it to get_all_fields and update the doc comment. To fix, choose one approach:
(A) if you want only pending labels, update get_all_pending_fields to call
self.index.read().iter().filter(|(_, idx)| idx.pending_count() > 0).map(...) so
only labels with pending_count() > 0 are returned and adjust the doc comment
accordingly; or (B) if the current behavior is correct, rename the function to
get_all_fields and change the doc comment to "Get fields for all labels" so the
name, implementation, and docs match. Ensure references to
get_all_pending_fields elsewhere are updated to the new name or behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 35f5a4b1-cbbd-48cc-b21a-7c6714b99da1

📥 Commits

Reviewing files that changed from the base of the PR and between 1384df5 and 65c0416.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (26)
  • Cargo.toml
  • flow.sh
  • flow_tests_done.txt
  • flow_tests_todo.txt
  • graph/src/graph/attribute_store.rs
  • graph/src/graph/graph.rs
  • graph/src/graph/graphblas/matrix.rs
  • graph/src/graph/graphblas/mod.rs
  • graph/src/graph/graphblas/serialization.rs
  • graph/src/graph/graphblas/tensor.rs
  • graph/src/graph/graphblas/vector.rs
  • graph/src/graph/graphblas/versioned_matrix.rs
  • graph/src/graph/mvcc_graph.rs
  • graph/src/index/indexer.rs
  • graph/src/runtime/pending.rs
  • graph/src/runtime/value.rs
  • src/graph_core.rs
  • src/lib.rs
  • src/module_init.rs
  • src/redis_type.rs
  • src/serializers/buffered_io.rs
  • src/serializers/decoder/mod.rs
  • src/serializers/encoder/mod.rs
  • src/serializers/mod.rs
  • tests/flow/test_persistency.py
  • tests/flow/test_replication.py
💤 Files with no reviewable changes (1)
  • flow_tests_todo.txt

Comment thread graph/src/graph/graphblas/matrix.rs Outdated
Comment on lines +437 to +458
let container_bytes = r.read_buffer()?;
unsafe {
    let mut container: MaybeUninit<super::GxB_Container> = MaybeUninit::uninit();
    let info = GxB_Container_new(container.as_mut_ptr());
    debug_assert_eq!(info, GrB_Info::GrB_SUCCESS);
    let container = container.assume_init();

    // Copy struct data into the allocated container
    std::ptr::copy_nonoverlapping(
        container_bytes.as_ptr(),
        container.cast::<u8>(),
        CONTAINER_STRUCT_SIZE,
    );

    // Nullify vector/matrix pointers (will be populated below)
    (*container).x = null_mut();
    (*container).h = null_mut();
    (*container).b = null_mut();
    (*container).i = null_mut();
    (*container).p = null_mut();
    (*container).Y = null_mut();


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== GxB_Container_struct fields =="
rg -n -A30 -B2 'struct GxB_Container_struct|vector_future|matrix_future|void_future' graph/src/graph/graphblas/mod.rs

echo
echo "== Matrix serializer/deserializer handling =="
rg -n -A20 -B5 'copy_nonoverlapping|CONTAINER_STRUCT_SIZE|\(\*container\)\.(x|h|b|i|p|Y) = null_mut|GxB_load_Matrix_from_Container|write_buffer\(container_bytes\)' graph/src/graph/graphblas/matrix.rs

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 11545


Serialize only scalar metadata, not raw GxB_Container_struct bytes.

This code copies process-local pointer fields into the serialized image and later restores them unsafely. The deserializer only nulls x/h/p/i/b/Y but leaves vector_future (11-element pointer array), matrix_future (15-element pointer array), and void_future (16-element void pointer array) as serialized garbage. This causes undefined behavior when GraphBLAS accesses these stale pointers. Additionally, copy_nonoverlapping() at line 445 does not validate container_bytes.len() >= CONTAINER_STRUCT_SIZE, allowing corrupted payloads to trigger out-of-bounds reads before GraphBLAS initialization completes.

Extract and serialize only the scalar metadata fields (nrows, ncols, nrows_nonempty, ncols_nonempty, nvals, format, orientation, iso, jumbled), validate buffer size on deserialization, zero-initialize the container, and reject undersized inputs.

Also applies to: 501-504

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graphblas/matrix.rs` around lines 437 - 458, The code
currently copies raw GxB_Container bytes (using container_bytes and
copy_nonoverlapping with CONTAINER_STRUCT_SIZE after
GxB_Container_new/MaybeUninit) which preserves process-local pointers
(vector_future, matrix_future, void_future) and risks OOB reads; instead
validate that container_bytes.len() >= CONTAINER_STRUCT_SIZE and reject
undersized input, then zero-initialize the newly allocated container (after
GxB_Container_new/container.assume_init()), and only deserialize the scalar
metadata fields (nrows, ncols, nrows_nonempty, ncols_nonempty, nvals, format,
orientation, iso, jumbled) by reading those fields from container_bytes into the
container struct; explicitly set all pointer/array fields (x,h,b,i,p,Y and
vector_future/matrix_future/void_future) to null_mut() to avoid resurrecting
stale pointers. Ensure the same fix is applied at the other occurrence
referenced (lines ~501-504) where raw container bytes are copied.
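The scalar-metadata-only approach can be illustrated with the following sketch. `ContainerMeta` and its on-wire layout are illustrative only — the real `GxB_Container` struct has more fields and a different layout — but the pattern is the one the review asks for: encode only scalars, validate length on decode, and never read pointer fields from the stream:

```rust
#[derive(Debug, PartialEq)]
struct ContainerMeta {
    nrows: u64,
    ncols: u64,
    nvals: u64,
    iso: bool,
    jumbled: bool,
}

fn encode_meta(m: &ContainerMeta, out: &mut Vec<u8>) {
    out.extend_from_slice(&m.nrows.to_le_bytes());
    out.extend_from_slice(&m.ncols.to_le_bytes());
    out.extend_from_slice(&m.nvals.to_le_bytes());
    out.push(m.iso as u8);
    out.push(m.jumbled as u8);
}

fn decode_meta(bytes: &[u8]) -> Result<ContainerMeta, String> {
    const LEN: usize = 8 * 3 + 2;
    // Reject undersized input before touching any field.
    if bytes.len() < LEN {
        return Err(format!("container metadata truncated: {} bytes", bytes.len()));
    }
    let u = |i: usize| u64::from_le_bytes(bytes[i..i + 8].try_into().unwrap());
    // Pointer fields (x, h, b, i, p, Y, *_future arrays) are never part of
    // the stream; the caller re-attaches freshly loaded buffers afterwards.
    Ok(ContainerMeta {
        nrows: u(0),
        ncols: u(8),
        nvals: u(16),
        iso: bytes[24] != 0,
        jumbled: bytes[25] != 0,
    })
}

fn main() {
    let m = ContainerMeta { nrows: 3, ncols: 4, nvals: 7, iso: false, jumbled: true };
    let mut buf = Vec::new();
    encode_meta(&m, &mut buf);
    assert_eq!(decode_meta(&buf).unwrap(), m);
    assert!(decode_meta(&buf[..10]).is_err()); // undersized input rejected
}
```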

Comment thread graph/src/graph/graphblas/vector.rs
Comment on lines +149 to +160
unsafe {
    let mut blob: *mut c_void = null_mut();
    let mut blob_size: u64 = 0;

    let info = GxB_Vector_serialize(&raw mut blob, &raw mut blob_size, self.v, null_mut());
    debug_assert_eq!(info, GrB_Info::GrB_SUCCESS);

    let blob_slice = std::slice::from_raw_parts(blob.cast::<u8>(), blob_size as usize);
    w.write_buffer(blob_slice);

    let layout = std::alloc::Layout::from_size_align(blob_size as usize, 8).unwrap();
    std::alloc::dealloc(blob.cast::<u8>(), layout);

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== GraphBLAS init allocator hooks =="
rg -n -A12 -B3 'pub fn init\(|GxB_init\(' graph/src/graph/graphblas/matrix.rs

echo
echo "== Vector serialization buffer ownership =="
rg -n -A18 -B5 'GxB_Vector_serialize|GxB_Vector_load|std::alloc::dealloc|std::alloc::alloc' graph/src/graph/graphblas/vector.rs

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 5385


Fix allocator ownership mismatch in vector serialization.

GxB_Vector_serialize() allocates memory via GraphBLAS's configured allocator (line 153), but line 160 frees it with std::alloc::dealloc(). Similarly, in Vector<bool>::decode(), line 270 allocates with std::alloc::alloc() and passes it to GxB_Vector_load() at line 277, which frees it via GraphBLAS's allocator. When GraphBLAS is initialized with custom allocators (Redis) via matrix.rs::init(), this mixing of allocator families causes heap corruption.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graphblas/vector.rs` around lines 149 - 160, The code mixes
allocators: GxB_Vector_serialize() returns a buffer allocated by GraphBLAS but
the code frees it with std::alloc::dealloc, and Vector<bool>::decode currently
allocates with std::alloc::alloc then passes that buffer to GxB_Vector_load()
which frees it with GraphBLAS — causing heap corruption when a custom allocator
is used. Fix by using the GraphBLAS allocator pair for both sides: change the
free in the serialization path to call the GraphBLAS deallocator (use the
GraphBLAS free function provided by your GraphBLAS binding) instead of
std::alloc::dealloc, and in Vector<bool>::decode either allocate the load buffer
via the GraphBLAS allocator or let GraphBLAS allocate (pass null if supported)
so that GxB_Vector_load() frees with the same allocator; update code around
GxB_Vector_serialize, Vector<bool>::decode and GxB_Vector_load accordingly.

Comment thread graph/src/runtime/pending.rs Outdated
Comment thread src/redis_type.rs Outdated
Comment thread src/redis_type.rs
Comment thread src/serializers/buffered_io.rs
Comment thread src/serializers/encoder/mod.rs
Comment on lines +109 to +110
# TODO: enable after indexes completed
@skip()

⚠️ Potential issue | 🟠 Major

Please keep an end-to-end save/load test enabled in this PR.

test_save_load is the broadest RDB round-trip check in this file, and skipping it removes the main regression net for the persistence rewrite. If index rebuild is still incomplete, narrow the assertions or xfail the specific failing cases instead of disabling the whole test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/flow/test_persistency.py` around lines 109 - 110, A critical end-to-end
persistence test, test_save_load, was entirely skipped with `@skip`() which
removes the main regression net; re-enable it by removing the `@skip`() decorator
on the test_save_load function and either (a) narrow its assertions to only the
parts currently stable, or (b) mark specific failing cases as xfail (e.g., using
pytest.mark.xfail on those subcases) so the full test remains active; locate the
test function named test_save_load in tests/flow/test_persistency.py and update
the decorator/assertions accordingly.

@codecov

codecov Bot commented Apr 12, 2026

Codecov Report

❌ Patch coverage is 85.71429% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 40.70%. Comparing base (4d5a2b4) to head (7c14cc8).
⚠️ Report is 53 commits behind head on main.

Files with missing lines | Patch % | Lines
src/lib.rs | 85.71% | 1 Missing ⚠️

❗ There is a different number of reports uploaded between BASE (4d5a2b4) and HEAD (7c14cc8).

HEAD has 1 upload less than BASE
Flag BASE (4d5a2b4) HEAD (7c14cc8)
1 0
Additional details and impacted files
@@             Coverage Diff             @@
##             main     #359       +/-   ##
===========================================
- Coverage   81.97%   40.70%   -41.27%     
===========================================
  Files         111       97       -14     
  Lines       30424    28484     -1940     
===========================================
- Hits        24939    11595    -13344     
- Misses       5485    16889    +11404     
Flag | Coverage | Δ
fuzz | 40.70% <85.71%> | (-0.13%) ⬇️

Flags with carried forward coverage won't be shown.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (3)
graph/src/graph/graphblas/matrix.rs (1)

454-467: ⚠️ Potential issue | 🔴 Critical

Stop serializing raw GxB_Container_struct memory.

The size check on Line 440 fixed the short-buffer case, but this still round-trips GraphBLAS container pointers/padding verbatim and only nulls x/h/b/i/p/Y on load. vector_future/matrix_future and the rest of the non-scalar container state are still copied back as stale garbage, so restore can hand invalid pointers back to GraphBLAS. Please serialize only scalar metadata and rebuild the container explicitly.

#!/bin/bash
set -euo pipefail

printf '== GxB_Container_struct fields ==\n'
rg -n -A30 -B2 'pub struct GxB_Container_struct' graph/src/graph/graphblas/mod.rs

printf '\n== Raw container serialization in matrix.rs ==\n'
rg -n -A20 -B5 'copy_nonoverlapping|CONTAINER_STRUCT_SIZE|\(\*container\)\.(x|h|b|i|p|Y)|vector_future|matrix_future|void_future' graph/src/graph/graphblas/matrix.rs

Also applies to: 511-514

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graphblas/matrix.rs` around lines 454 - 467, The code
currently restores a GxB_Container_struct by memcpy'ing raw container bytes
(CONTAINER_STRUCT_SIZE) which rehydrates stale pointer/padding (container,
vector_future, matrix_future, void_future, etc.); instead, stop using
std::ptr::copy_nonoverlapping on container_bytes and implement field-by-field
deserialization: allocate a fresh GxB_Container_struct, parse and assign only
scalar metadata fields (IDs, dimensions, types, flags) into the new container,
explicitly nullify pointer fields ((*container).x/h/b/i/p/Y = null_mut()), and
reconstruct any non-scalar state (vectors, matrices, futures) from their
serialized representations into new heap allocations and set the container
pointers accordingly; remove any restoration of raw padding or pointer bytes so
no stale pointers are handed back to GraphBLAS.
graph/src/runtime/value.rs (1)

1831-1838: ⚠️ Potential issue | 🟠 Major

Unsupported Value variants still serialize as NULL.

Map, Node, Relationship, and Path still collapse to T_NULL, so any accidental Encode<19> call on those variants becomes a silent lossy round-trip instead of a hard failure. Since this impl lives on Value itself, callers get no signal that data was discarded. As per coding guidelines, "Value enum (runtime/value.rs) must support all data types: Null, Bool, Int, Float, String, List, Map, Node, Relationship, Path, Point, etc."

src/redis_type.rs (1)

497-518: ⚠️ Potential issue | 🔴 Critical

from_utf8_unchecked is still used on Redis reply bytes.

These SCAN cursor/key conversions have the same problem called out earlier: Redis reply strings are binary-safe, so from_utf8_unchecked can invoke UB on non-UTF-8 data and mis-handle binary keys. The StringPtrLen paths were fixed, but the CallReplyStringPtr paths still need the same treatment.

#!/bin/bash
set -euo pipefail
rg -n -C2 'from_utf8_unchecked' src/redis_type.rs

Expected result: no remaining from_utf8_unchecked conversions on bytes returned by Redis module APIs.

Also applies to: 586-607

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/redis_type.rs` around lines 497 - 518, The code is calling
raw::RedisModule_CallReplyStringPtr and then using std::str::from_utf8_unchecked
on the returned bytes (variables cursor_ptr/new_cursor and kptr/key_name), which
is UB for non-UTF-8 Redis binary replies; replace these unchecked conversions
with a binary-safe approach: create a &[u8] via
std::slice::from_raw_parts(cursor_ptr.cast(), cursor_len) (and similarly for
kptr), then either parse with std::str::from_utf8(...) and handle the Err
(propagate or map to a clear error) or use String::from_utf8_lossy(...) if lossy
conversion is acceptable for keys/cursor; apply the same change to both the
cursor conversion (new_cursor) and the key conversion (key_name) and update any
callers that expect &str to accept owned Strings/Results as needed, referencing
raw::RedisModule_CallReplyStringPtr, raw::call_reply_array_element,
raw::call_reply_length, cursor_ptr/new_cursor and kptr/key_name.
🧹 Nitpick comments (1)
graph/src/graph/attribute_store.rs (1)

380-386: Avoid forcing a Vec clone on every attribute read.

This accessor now allocates/clones even for one-pass reads. graph/src/graph/graph.rs:2063-2075 calls it from entity_size_bytes(), so memory accounting now clones each entity’s attribute list just to iterate once. Please keep a borrowed/iterator-based path for hot reads and use an owned variant only where serialization actually needs ownership.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/attribute_store.rs` around lines 380 - 386, The current
get_all_attrs_by_id forces a Vec clone on every read; add a borrowed/iterator
path and use the owned variant only when needed: keep get_all_attrs_by_id as an
owned method if callers need ownership, and introduce a new borrow-friendly
method (e.g., get_attrs_by_id or iter_attrs_by_id) that returns either a
slice/iterator or a Cow<&[ (u16, Value) ]>/Iterator from
self.cache.get_entity(...) without cloning; update hot callers (like
entity_size_bytes) to use the new borrow/iterator method and leave serialization
code to call the owned get_all_attrs_by_id or an explicit to_owned conversion
that calls populate_cache_from_fjall only when necessary.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@graph/src/graph/attribute_store.rs`:
- Around line 665-700: encode_with_range currently rescans 0..=max_id and
re-skips live ids for every call, causing O(key_count * max_id); instead build
an iterator over the live IDs and apply .skip(offset).take(count) so we only
visit the needed ids once. Concretely: inside encode_with_range use let ids =
(0..=max_id).filter(|id| !deleted.contains(*id)).skip(offset as
usize).take(count as usize) and then loop over ids to call
get_all_attrs_by_id(id) and write payload; keep existing uses of encode_deleted,
encode_max_id and get_all_attrs_by_id unchanged but replace the for id in
0..=max_id loop with the filtered iterator approach.
- Around line 720-726: The loop currently casts r.read_unsigned()? to u16 which
silently wraps oversized values; instead, after reading the unsigned id via
r.read_unsigned(), explicitly validate that it fits into u16 (or into
attrs_name.len()) and return a decode/load error if it exceeds u16::MAX or the
number of known attributes so we don't silently place attributes in the wrong
column; update the attr_id handling in the loop (where attr_id is computed and
used to index self.attrs_name and insert into entity_attrs) to perform this
bounds/overflow check and fail the load rather than truncating.

In `@src/lib.rs`:
- Line 78: The registration of the sensitive command ["graph.DEBUG",
graph_debug, ...] must be gated so it cannot be registered unconditionally;
update the code that pushes this tuple in src/lib.rs to only register it when an
admin/test-only switch is enabled (e.g., a compile feature like cfg(feature =
"admin_commands") or a runtime check such as is_admin_commands_enabled()),
referencing the graph_debug handler and the registration array so you only
push/insert that entry when the switch returns true; ensure normal
builds/clients never see the "graph.DEBUG" command unless the admin/test flag is
explicitly enabled.

In `@src/serializers/encoder/mod.rs`:
- Around line 41-44: EncodeState::RelationMatrices currently omits writing its
item count to the stream (unlike EncodeState::LabelsMatrices) and instead relies
on hdr.relationship_count during decode; change the encoder so RelationMatrices
writes a self-describing count before iterating items (mirror the pattern used
by EncodeState::LabelsMatrices), and update the corresponding decoder to read
that embedded count instead of using hdr.relationship_count so decoding
validates bounds locally; look for EncodeState::RelationMatrices and
EncodeState::LabelsMatrices in the encoder and the decode path that references
hdr.relationship_count and make the writes/reads consistent.

---

Duplicate comments:
In `@graph/src/graph/graphblas/matrix.rs`:
- Around line 454-467: The code currently restores a GxB_Container_struct by
memcpy'ing raw container bytes (CONTAINER_STRUCT_SIZE) which rehydrates stale
pointer/padding (container, vector_future, matrix_future, void_future, etc.);
instead, stop using std::ptr::copy_nonoverlapping on container_bytes and
implement field-by-field deserialization: allocate a fresh GxB_Container_struct,
parse and assign only scalar metadata fields (IDs, dimensions, types, flags)
into the new container, explicitly nullify pointer fields
((*container).x/h/b/i/p/Y = null_mut()), and reconstruct any non-scalar state
(vectors, matrices, futures) from their serialized representations into new heap
allocations and set the container pointers accordingly; remove any restoration
of raw padding or pointer bytes so no stale pointers are handed back to
GraphBLAS.

In `@src/redis_type.rs`:
- Around line 497-518: The code is calling raw::RedisModule_CallReplyStringPtr
and then using std::str::from_utf8_unchecked on the returned bytes (variables
cursor_ptr/new_cursor and kptr/key_name), which is UB for non-UTF-8 Redis binary
replies; replace these unchecked conversions with a binary-safe approach: create
a &[u8] via std::slice::from_raw_parts(cursor_ptr.cast(), cursor_len) (and
similarly for kptr), then either parse with std::str::from_utf8(...) and handle
the Err (propagate or map to a clear error) or use String::from_utf8_lossy(...)
if lossy conversion is acceptable for keys/cursor; apply the same change to both
the cursor conversion (new_cursor) and the key conversion (key_name) and update
any callers that expect &str to accept owned Strings/Results as needed,
referencing raw::RedisModule_CallReplyStringPtr, raw::call_reply_array_element,
raw::call_reply_length, cursor_ptr/new_cursor and kptr/key_name.

---

Nitpick comments:
In `@graph/src/graph/attribute_store.rs`:
- Around line 380-386: The current get_all_attrs_by_id forces a Vec clone on
every read; add a borrowed/iterator path and use the owned variant only when
needed: keep get_all_attrs_by_id as an owned method if callers need ownership,
and introduce a new borrow-friendly method (e.g., get_attrs_by_id or
iter_attrs_by_id) that returns either a slice/iterator or a Cow<&[ (u16, Value)
]>/Iterator from self.cache.get_entity(...) without cloning; update hot callers
(like entity_size_bytes) to use the new borrow/iterator method and leave
serialization code to call the owned get_all_attrs_by_id or an explicit to_owned
conversion that calls populate_cache_from_fjall only when necessary.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6684b023-e1d9-455c-b111-b11df504a5a1

📥 Commits

Reviewing files that changed from the base of the PR and between 65c0416 and 3ce7c6d.

📒 Files selected for processing (15)
  • flow_tests_done.txt
  • flow_tests_todo.txt
  • graph/src/graph/attribute_store.rs
  • graph/src/graph/graphblas/matrix.rs
  • graph/src/graph/graphblas/serialization.rs
  • graph/src/runtime/pending.rs
  • graph/src/runtime/value.rs
  • src/commands/debug.rs
  • src/commands/mod.rs
  • src/lib.rs
  • src/module_init.rs
  • src/redis_type.rs
  • src/serializers/buffered_io.rs
  • src/serializers/encoder/mod.rs
  • tests/flow/test_rdb_load.py
💤 Files with no reviewable changes (1)
  • flow_tests_todo.txt
🚧 Files skipped from review as they are similar to previous changes (1)
  • flow_tests_done.txt

Comment thread graph/src/graph/attribute_store.rs Outdated
Comment thread graph/src/graph/attribute_store.rs Outdated
Comment on lines +720 to +726
for _ in 0..attr_count {
    let attr_id = r.read_unsigned()? as u16;
    let value = Value::decode(r)?;

    if (attr_id as usize) < self.attrs_name.len() {
        let attr_name = self.attrs_name[attr_id as usize].clone();
        entity_attrs.insert(attr_name, value);

⚠️ Potential issue | 🟠 Major

Reject oversized attribute ids instead of truncating them.

r.read_unsigned()? as u16 will silently wrap malformed values above u16::MAX, so a corrupted RDB can land attributes in the wrong column instead of failing the load.

Suggested fix
-                let attr_id = r.read_unsigned()? as u16;
+                let attr_id = u16::try_from(r.read_unsigned()?)
+                    .map_err(|_| "attribute id exceeds u16".to_string())?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 for _ in 0..attr_count {
-    let attr_id = r.read_unsigned()? as u16;
+    let attr_id = u16::try_from(r.read_unsigned()?)
+        .map_err(|_| "attribute id exceeds u16".to_string())?;
     let value = Value::decode(r)?;
     if (attr_id as usize) < self.attrs_name.len() {
         let attr_name = self.attrs_name[attr_id as usize].clone();
         entity_attrs.insert(attr_name, value);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/attribute_store.rs` around lines 720 - 726, The loop
currently casts r.read_unsigned()? to u16 which silently wraps oversized values;
instead, after reading the unsigned id via r.read_unsigned(), explicitly
validate that it fits into u16 (or into attrs_name.len()) and return a
decode/load error if it exceeds u16::MAX or the number of known attributes so we
don't silently place attributes in the wrong column; update the attr_id handling
in the loop (where attr_id is computed and used to index self.attrs_name and
insert into entity_attrs) to perform this bounds/overflow check and fail the
load rather than truncating.

Comment thread src/lib.rs
["graph.MEMORY", graph_memory, "readonly deny-script", 2, 2, 1, ""],
["graph.CONFIG", graph_config, "readonly deny-script allow-busy", 0, 0, 0, ""],
["graph.UDF", graph_udf, "write deny-script", 0, 0, 0, ""],
["graph.DEBUG", graph_debug, "write deny-script", 0, 0, 0, ""],

⚠️ Potential issue | 🟠 Major

Gate graph.DEBUG behind a test/admin-only switch.

This command can clear global decode state, create/delete virtual keys, and finalize pending RDB loads. Registering it unconditionally makes those internal persistence controls callable by any client on a production server.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` at line 78, The registration of the sensitive command
["graph.DEBUG", graph_debug, ...] must be gated so it cannot be registered
unconditionally; update the code that pushes this tuple in src/lib.rs to only
register it when an admin/test-only switch is enabled (e.g., a compile feature
like cfg(feature = "admin_commands") or a runtime check such as
is_admin_commands_enabled()), referencing the graph_debug handler and the
registration array so you only push/insert that entry when the switch returns
true; ensure normal builds/clients never see the "graph.DEBUG" command unless
the admin/test flag is explicitly enabled.

Comment on lines +41 to +44
// --- Payload data ---
for p in payloads {
    graph.encode_payload(&mut w, p);
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '%s\n' '--- encoder arms ---'
rg -n -C3 'EncodeState::(LabelsMatrices|RelationMatrices)' graph/src/graph/graph.rs

printf '\n%s\n' '--- decoder arms ---'
rg -n -C4 'EncodeState::(LabelsMatrices|RelationMatrices)' src/serializers/decoder/mod.rs

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 2305


LabelsMatrices and RelationMatrices use inconsistent framing in the encoder.

EncodeState::LabelsMatrices writes its count to the stream (line 2106), making it self-describing during decode. EncodeState::RelationMatrices does not—it only writes per-item ids during iteration and relies entirely on hdr.relationship_count in the decoder (lines 167–171, 244–248). This inconsistency creates fragility: LabelsMatrices can validate its bounds independently, but RelationMatrices silently desynchronizes if the header's relationship count ever becomes incorrect or is parsed wrong.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/serializers/encoder/mod.rs` around lines 41 - 44,
EncodeState::RelationMatrices currently omits writing its item count to the
stream (unlike EncodeState::LabelsMatrices) and instead relies on
hdr.relationship_count during decode; change the encoder so RelationMatrices
writes a self-describing count before iterating items (mirror the pattern used
by EncodeState::LabelsMatrices), and update the corresponding decoder to read
that embedded count instead of using hdr.relationship_count so decoding
validates bounds locally; look for EncodeState::RelationMatrices and
EncodeState::LabelsMatrices in the encoder and the decode path that references
hdr.relationship_count and make the writes/reads consistent.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (4)
src/redis_type.rs (1)

513-519: ⚠️ Potential issue | 🔴 Critical

from_utf8_unchecked is still used on SCAN key names.

The remaining key-name conversions still turn binary-safe Redis reply bytes into &str with from_utf8_unchecked. For non-UTF-8 keys that is UB, and these paths then feed the result back into key lookup/delete. This is the same safety issue that was fixed earlier for RedisModule_StringPtrLen, but it is still present in the SCAN helpers.

Are values returned by RedisModule_CallReplyStringPtr binary-safe bytes, and does Rust std::str::from_utf8_unchecked require valid UTF-8?

Also applies to: 603-609

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/redis_type.rs` around lines 513 - 519, The code uses
std::str::from_utf8_unchecked on bytes returned by
RedisModule_CallReplyStringPtr (see variables elem, kptr, key_len, key_name in
the SCAN helpers), which is undefined behavior for non-UTF-8 keys; instead treat
the reply as binary-safe bytes: build a &[u8] or Vec<u8> from
std::slice::from_raw_parts(kptr.cast(), key_len) and pass that binary slice to
downstream key-lookup/delete operations (or create a RedisModuleString via the
appropriate Redis API that accepts a pointer+len) rather than converting to
&str/String with from_utf8_unchecked; update any downstream code that expects
key_name to accept bytes or a RedisModuleString accordingly.
graph/src/runtime/value.rs (1)

1831-1841: ⚠️ Potential issue | 🟠 Major

Don't silently serialize unsupported Value variants as NULL.

Map, Node, Relationship, and Path still round-trip lossily here: once they hit this encoder, the persisted payload becomes indistinguishable from an actual NULL. If these variants are invalid in attribute storage, reject them before this code path or hard-fail here instead of emitting T_NULL.

As per coding guidelines, Value enum (runtime/value.rs) must support all data types: Null, Bool, Int, Float, String, List, Map, Node, Relationship, Path, Point, etc.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/value.rs` around lines 1831 - 1841, The current branch
silently encodes unsupported variants (Self::Map, Self::Node,
Self::Relationship, Self::Path) as si_type::T_NULL via w.write_unsigned, causing
real NULLs and these types to be indistinguishable; replace the
debug_assert+write_null with an explicit hard failure that cannot be ignored —
either return/propagate an error from the enclosing serializer function or panic
with a clear message (e.g., "unsupported value type in property storage: ...")
so callers cannot persist these variants; update any callers to handle the error
if you choose to return a Result instead of panicking.
graph/src/graph/attribute_store.rs (2)

726-733: ⚠️ Potential issue | 🟠 Major

Reject invalid attribute ids instead of truncating or dropping them.

r.read_unsigned()? as u16 can wrap malformed ids, and the subsequent bounds check silently discards anything still out of range. A corrupted RDB should fail the load here, not remap or lose attributes.

🛡️ Suggested change
-                let attr_id = r.read_unsigned()? as u16;
+                let attr_id = u16::try_from(r.read_unsigned()?)
+                    .map_err(|_| "attribute id exceeds u16".to_string())?;
                 let value = Value::decode(r)?;
 
-                if (attr_id as usize) < self.attrs_name.len() {
-                    let attr_name = self.attrs_name[attr_id as usize].clone();
-                    entity_attrs.insert(attr_name, value);
-                }
+                if (attr_id as usize) >= self.attrs_name.len() {
+                    return Err(format!("unknown attribute id {attr_id}"));
+                }
+
+                let attr_name = self.attrs_name[attr_id as usize].clone();
+                entity_attrs.insert(attr_name, value);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/attribute_store.rs` around lines 726 - 733, The code
currently reads attr ids with r.read_unsigned()? as u16 which can wrap malformed
values and then silently drop out-of-range ids; change the logic in the loop
that reads attributes (symbols: attr_count, attr_id, Value::decode,
self.attrs_name, entity_attrs) to parse the unsigned id into a sufficiently-wide
integer (avoid a direct cast to u16), check that it fits in the valid index
range (0..self.attrs_name.len()) and if it does not, return a decoding/load
error instead of truncating or skipping; on success convert to usize for
indexing, clone the attribute name and insert into entity_attrs as before, and
propagate a clear error (with context) when an invalid id is encountered so
corrupted RDBs fail to load.

671-707: ⚠️ Potential issue | 🟠 Major

encode_with_range() still rescans the full ID space for every chunk.

When multi-key saves advance offset, each call starts from 0..=max_id again and re-skips the earlier live ids. That keeps serialization roughly proportional to chunk_count * max_id instead of the number of entities actually written.

♻️ Suggested change
-        let mut skipped = 0u64;
-        let mut encoded = 0u64;
-
-        for id in 0..=max_id {
-            if deleted.contains(id) {
-                continue;
-            }
-            if skipped < offset {
-                skipped += 1;
-                continue;
-            }
+        for id in (0..=max_id)
+            .filter(|id| !deleted.contains(*id))
+            .skip(offset as usize)
+            .take(count as usize)
+        {
             w.write_unsigned(id);
 
             let props: Vec<(u16, Value)> = self.get_all_attrs_by_id(id);
             w.write_unsigned(props.len() as u64);
 
             for (attr_id, value) in props {
                 w.write_unsigned(attr_id as u64);
                 value.encode(w);
             }
-
-            encoded += 1;
-            if encoded >= count {
-                break;
-            }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/attribute_store.rs` around lines 671 - 707, encode_with_range
currently iterates 0..=max_id and re-skips live IDs for every chunk, causing
O(chunk_count * max_id) work; instead produce an iterator of live IDs from the
current encode_deleted binding (using the same binding variable) and advance
that iterator by offset then take count items to encode, eliminating the skipped
counter and repeated rescans. In practice modify encode_with_range to (1)
acquire the encode_deleted lock and derive an iterator over live ids (the
complement of deleted) up to encode_max_id, (2) call iterator.nth(offset-1) or
use iterator.skip(offset) to position at the first id to emit, (3) loop over
iterator.take(count) writing id, fetching props via get_all_attrs_by_id(id) and
encoding them via value.encode(w), and (4) stop when take(count) is exhausted;
keep references to encode_max_id, encode_deleted, get_all_attrs_by_id, and
Writer.write_unsigned to locate the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@graph/src/graph/attribute_store.rs`:
- Around line 144-147: The shared encode context fields encode_deleted and
encode_max_id on AttributeStore can be clobbered by concurrent serializations;
change the implementation so set_encode_context() and encode_with_range()
execute logically atomically by removing reliance on shared state: make
set_encode_context() return an owned context struct (e.g., EncodeContext {
deleted: RoaringTreemap, max_id: u64 }) and pass that context as a parameter
into encode_with_range(&self, ctx: EncodeContext, ...) (or alternatively have a
single public encode_with_range_from_context(...) that both sets and consumes
the context internally), and remove or ignore the shared Mutex/Atomic fields for
encoding so two overlapping serializations use separate context instances and
cannot overwrite each other.

In `@src/redis_type.rs`:
- Around line 224-234: The AUX loader graphmeta_aux_load ignores errors from
load_unsigned(rdb) and always returns success; change graphmeta_aux_load to
check the result of load_unsigned(rdb) and propagate a non-zero
error/early-return if it fails instead of continuing to call
finalize_pending_graphs(), and apply the same fix to the other AUX handler that
also calls load_unsigned (the second occurrence in the file) so both handlers
validate load_unsigned's return and abort the load on failure.
- Around line 181-198: graphmeta_rdb_load currently materializes dummy values
into the live keyspace which survive restart; instead, keep returning the
non-null dummy Redis requires but ensure any virtual/internal keys created
during rdb_load are tracked and removed during the AFTER_RDB finalization path
(graphmeta_aux_load/graphmeta_free) — modify graphmeta_rdb_load to record
created virtual key names into the pending-graph structure rather than leaving
materialized keys, then update graphmeta_aux_load (and the finalizer path
referenced around the lines noted) to iterate that recorded list and delete each
key from Redis (using the module delete API) as part of the finalization so no
internal keys remain visible after load/save.
- Around line 42-52: The code currently collapses null IO key names into the
shared sentinel "<unknown>", causing different graphs to share keys; change the
behavior in the unsafe block that calls raw::RedisModule_GetKeyNameFromIO (and
raw::RedisModule_StringPtrLen) so that if rm_key_name is null you do not return
"<unknown>" but instead fail the load (return an Err or propagate a decode
error) or else derive a per-graph key using the decoded graph name available in
your decode context; update callers that populate DECODE_STATE.finalized and
DECODE_STATE.placeholders to handle the error or the per-graph key instead of a
global sentinel so payloads cannot be aliased across graphs.

---

Duplicate comments:
In `@graph/src/graph/attribute_store.rs`:
- Around line 726-733: The code currently reads attr ids with r.read_unsigned()?
as u16 which can wrap malformed values and then silently drop out-of-range ids;
change the logic in the loop that reads attributes (symbols: attr_count,
attr_id, Value::decode, self.attrs_name, entity_attrs) to parse the unsigned id
into a sufficiently-wide integer (avoid a direct cast to u16), check that it
fits in the valid index range (0..self.attrs_name.len()) and if it does not,
return a decoding/load error instead of truncating or skipping; on success
convert to usize for indexing, clone the attribute name and insert into
entity_attrs as before, and propagate a clear error (with context) when an
invalid id is encountered so corrupted RDBs fail to load.
- Around line 671-707: encode_with_range currently iterates 0..=max_id and
re-skips live IDs for every chunk, causing O(chunk_count * max_id) work; instead
produce an iterator of live IDs from the current encode_deleted binding (using
the same binding variable) and advance that iterator by offset then take count
items to encode, eliminating the skipped counter and repeated rescans. In
practice modify encode_with_range to (1) acquire the encode_deleted lock and
derive an iterator over live ids (the complement of deleted) up to
encode_max_id, (2) call iterator.nth(offset-1) or use iterator.skip(offset) to
position at the first id to emit, (3) loop over iterator.take(count) writing id,
fetching props via get_all_attrs_by_id(id) and encoding them via
value.encode(w), and (4) stop when take(count) is exhausted; keep references to
encode_max_id, encode_deleted, get_all_attrs_by_id, and Writer.write_unsigned to
locate the change.
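The chunk-positioning idea from the `encode_with_range` comment can be sketched with a plain iterator. `live_ids_chunk` is a toy stand-in: a `HashSet` plays the role of `encode_deleted`, and the returned IDs are what the real code would encode via `get_all_attrs_by_id` and `Writer::write_unsigned`.

```rust
use std::collections::HashSet;

/// Yield the chunk of live IDs starting at `offset`, at most `count`
/// long. Live IDs are 0..=max_id minus the deleted set; skip/take
/// replaces the hand-rolled `skipped` counter from the original loop.
fn live_ids_chunk(
    deleted: &HashSet<u64>,
    max_id: u64,
    offset: usize,
    count: usize,
) -> Vec<u64> {
    (0..=max_id)
        .filter(|id| !deleted.contains(id))
        .skip(offset) // position at the first ID of this chunk
        .take(count)  // stop when the chunk is full
        .collect()
}
```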

In `@graph/src/runtime/value.rs`:
- Around line 1831-1841: The current branch silently encodes unsupported
variants (Self::Map, Self::Node, Self::Relationship, Self::Path) as
si_type::T_NULL via w.write_unsigned, causing real NULLs and these types to be
indistinguishable; replace the debug_assert+write_null with an explicit hard
failure that cannot be ignored — either return/propagate an error from the
enclosing serializer function or panic with a clear message (e.g., "unsupported
value type in property storage: ...") so callers cannot persist these variants;
update any callers to handle the error if you choose to return a Result instead
of panicking.
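A minimal sketch of the hard-failure alternative, with a toy enum standing in for the real value type (the tag constants here are illustrative, not the actual `si_type` values):

```rust
/// Toy stand-in for the runtime value enum; only the shape matters.
enum Val {
    Null,
    Int(i64),
    Map, // unsupported in property storage, per the comment above
}

/// Return an error for unsupported variants instead of silently
/// writing a NULL tag, so real NULLs stay distinguishable.
fn encode_type_tag(v: &Val) -> Result<u8, String> {
    match v {
        Val::Null => Ok(0),   // T_NULL (illustrative tag)
        Val::Int(_) => Ok(1), // T_INT64 (illustrative tag)
        Val::Map => Err("unsupported value type in property storage: Map".into()),
    }
}
```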

In `@src/redis_type.rs`:
- Around line 513-519: The code uses std::str::from_utf8_unchecked on bytes
returned by RedisModule_CallReplyStringPtr (see variables elem, kptr, key_len,
key_name in the SCAN helpers), which is undefined behavior for non-UTF-8 keys;
instead treat the reply as binary-safe bytes: build a &[u8] or Vec<u8> from
std::slice::from_raw_parts(kptr.cast(), key_len) and pass that binary slice to
downstream key-lookup/delete operations (or create a RedisModuleString via the
appropriate Redis API that accepts a pointer+len) rather than converting to
&str/String with from_utf8_unchecked; update any downstream code that expects
key_name to accept bytes or a RedisModuleString accordingly.
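A sketch of the binary-safe treatment, assuming only that the FFI call hands back a valid pointer/length pair; the function name is hypothetical and stands in for the code that currently calls `from_utf8_unchecked`:

```rust
/// Build a binary-safe key from a raw pointer + length, as returned by
/// RedisModule_CallReplyStringPtr. Keeping the bytes as Vec<u8> avoids
/// undefined behavior on non-UTF-8 key names.
///
/// Safety: `ptr` must be valid for reads of `len` bytes.
unsafe fn key_bytes(ptr: *const u8, len: usize) -> Vec<u8> {
    std::slice::from_raw_parts(ptr, len).to_vec()
}
```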

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 13b7a771-b8a5-401f-8913-76951d2de07a

📥 Commits

Reviewing files that changed from the base of the PR and between 3ce7c6d and 788c6f2.

📒 Files selected for processing (5)
  • graph/src/graph/attribute_store.rs
  • graph/src/runtime/value.rs
  • src/commands/debug.rs
  • src/redis_type.rs
  • src/serializers/encoder/mod.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/commands/debug.rs
  • src/serializers/encoder/mod.rs

Comment on lines +224 to +234
unsafe extern "C" fn graphmeta_aux_load(
rdb: *mut RedisModuleIO,
_encver: i32,
when: i32,
) -> i32 {
let Ok(count) = load_unsigned(rdb) else {
return 1; // REDISMODULE_ERR
};
let _ = load_unsigned(rdb);
if when == raw::Aux::After as i32 {
// AFTER_RDB: All graphmeta keys are loaded. Finalize pending graphs.
finalize_pending_graphs();
}
0

⚠️ Potential issue | 🟠 Major

Propagate AFTER_RDB AUX decode failures.

Both AFTER_RDB branches ignore load_unsigned(rdb) errors and still return success. A truncated/corrupt AUX section would then continue into finalize_pending_graphs() on partial state instead of aborting the load.

Suggested fix
 unsafe extern "C" fn graphmeta_aux_load(
     rdb: *mut RedisModuleIO,
     _encver: i32,
     when: i32,
 ) -> i32 {
-    let _ = load_unsigned(rdb);
+    if load_unsigned(rdb).is_err() {
+        return 1;
+    }
     if when == raw::Aux::After as i32 {
         // AFTER_RDB: All graphmeta keys are loaded. Finalize pending graphs.
         finalize_pending_graphs();
     }
     0
 }
@@
     } else {
         // AFTER_RDB: Read placeholder, finalize pending multi-key graphs.
-        let _ = load_unsigned(rdb);
+        if load_unsigned(rdb).is_err() {
+            return 1;
+        }
         // Note: finalization may also happen in graphmeta_aux_load(AFTER_RDB)
         // if graphmeta keys are loaded after this callback.
         finalize_pending_graphs();
         0

Also applies to: 297-303

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/redis_type.rs` around lines 224 - 234, The AUX loader graphmeta_aux_load
ignores errors from load_unsigned(rdb) and always returns success; change
graphmeta_aux_load to check the result of load_unsigned(rdb) and propagate a
non-zero error/early-return if it fails instead of continuing to call
finalize_pending_graphs(), and apply the same fix to the other AUX handler that
also calls load_unsigned (the second occurrence in the file) so both handlers
validate load_unsigned's return and abort the load on failure.


@coderabbitai (bot) left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/commands/query.rs (1)

76-88: Consider extracting version check logic into a helper function.

The version validation and error response logic is duplicated across three locations. This could be extracted into a helper function to reduce duplication and improve maintainability.

♻️ Suggested refactor
/// Returns `true` if validation passed or no version check was requested.
/// Returns `false` if version mismatch - response has been sent.
fn validate_schema_version(
    ctx: &Context,
    version_check: Option<u64>,
    current_schema_version: u64,
) -> bool {
    if let Some(provided_version) = version_check {
        if provided_version != current_schema_version {
            raw::reply_with_array(ctx.ctx, 2);
            let err_msg = CString::new("ERR invalid graph version").unwrap();
            raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
            raw::reply_with_long_long(ctx.ctx, current_schema_version as i64);
            return false;
        }
    }
    true
}

Then at each call site:

let current_schema_version = graph.read().graph.read().borrow().schema_version;
if !validate_schema_version(ctx, version_check, current_schema_version) {
    return Ok(redis_module::RedisValue::NoReply);
}

Also applies to: 100-112, 121-132

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/commands/query.rs` around lines 76 - 88, Extract the duplicated version
validation and error-response logic into a helper like
validate_schema_version(...) that takes the request Context (ctx), the
Option<u64> version_check, and the current_schema_version (from
graph.read().graph.read().borrow().schema_version), and returns a bool
indicating success; move the raw::reply_with_array / raw::reply_with_error /
raw::reply_with_long_long calls into that helper so callers no longer duplicate
the response path, and replace each inline block (the one that drops
read_key/graph and returns NoReply) with: compute current_schema_version, call
validate_schema_version(ctx, version_check, current_schema_version) and if it
returns false then return Ok(redis_module::RedisValue::NoReply). Ensure the
helper does not touch read_key/graph drops (drop them at call site before
calling helper) and reuse the same CString error text.
graph/src/graph/graph.rs (2)

527-532: Consider optimizing node iteration in rebuild path.

The loop iterates 0..max_id and checks deleted_nodes.contains(id) for each ID. For graphs with high churn, this could be slow. An alternative would be to directly iterate the node IDs that are known to exist (e.g., from a decoded list or by iterating label matrices).

Since this only runs during RDB restore, the current approach is acceptable, but worth noting for future optimization if restore times become an issue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 527 - 532, The rebuild loop currently
iterates 0..max_id and checks deleted_nodes.contains(id) for each id (using
node_count, deleted_nodes, max_id and all_nodes_matrix.set), which is O(max_id)
and slow with high churn; instead, iterate only existing node IDs by sourcing
them from the decoded node list or by iterating the existing label/index
matrices used during RDB restore (or by iterating deleted_nodes complement if
you have a fast iterator), and call all_nodes_matrix.set(id, id, true) only for
those ids; update the rebuild code path that constructs the all_nodes_matrix to
use that direct iterator to avoid per-id contains checks.

2089-2089: Remove unnecessary let this = &self; rebindings.

self is already an immutable reference in this method. The rebinding to this adds no value.

♻️ Suggested diff
         EncodeState::Nodes => {
-            let this = &self;
             let count = p.count;
             let offset = p.offset;
-            this.node_attrs.set_encode_context(
-                &this.deleted_nodes,
-                this.max_node_id(),
+            self.node_attrs.set_encode_context(
+                &self.deleted_nodes,
+                self.max_node_id(),
                 &global_attrs,
             );
-            this.node_attrs.encode_with_range(w, count, offset);
+            self.node_attrs.encode_with_range(w, count, offset);
         }

(Similarly for EncodeState::Edges)

Also applies to: 2103-2103

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` at line 2089, Remove the redundant rebinding "let
this = &self;" wherever it appears (e.g. in the method containing that line and
in EncodeState::Edges), since "self" is already an immutable reference; replace
any uses of "this" with "self" and delete the unnecessary let-binding to
simplify the code and avoid pointless aliases.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@graph/src/graph/graph.rs`:
- Around line 480-482: The restore logic currently recomputes schema_version as
"(node_labels.len() + relationship_types.len()) as u64" which loses runtime
increments done in MvccGraph::commit; instead persist schema_version in the RDB
payload: update the RDB serialization path to write the current schema_version
value (the variable named schema_version) and update the RDB
deserialization/restore path to read and assign that persisted schema_version
rather than recomputing it from node_labels/relationship_types; ensure the
serialized field is named/ordered consistently with existing RDB fields and that
MvccGraph::commit continues to increment the in-memory schema_version as before.

---

Nitpick comments:
In `@graph/src/graph/graph.rs`:
- Around line 527-532: The rebuild loop currently iterates 0..max_id and checks
deleted_nodes.contains(id) for each id (using node_count, deleted_nodes, max_id
and all_nodes_matrix.set), which is O(max_id) and slow with high churn; instead,
iterate only existing node IDs by sourcing them from the decoded node list or by
iterating the existing label/index matrices used during RDB restore (or by
iterating deleted_nodes complement if you have a fast iterator), and call
all_nodes_matrix.set(id, id, true) only for those ids; update the rebuild code
path that constructs the all_nodes_matrix to use that direct iterator to avoid
per-id contains checks.
- Line 2089: Remove the redundant rebinding "let this = &self;" wherever it
appears (e.g. in the method containing that line and in EncodeState::Edges),
since "self" is already an immutable reference; replace any uses of "this" with
"self" and delete the unnecessary let-binding to simplify the code and avoid
pointless aliases.

In `@src/commands/query.rs`:
- Around line 76-88: Extract the duplicated version validation and
error-response logic into a helper like validate_schema_version(...) that takes
the request Context (ctx), the Option<u64> version_check, and the
current_schema_version (from graph.read().graph.read().borrow().schema_version),
and returns a bool indicating success; move the raw::reply_with_array /
raw::reply_with_error / raw::reply_with_long_long calls into that helper so
callers no longer duplicate the response path, and replace each inline block
(the one that drops read_key/graph and returns NoReply) with: compute
current_schema_version, call validate_schema_version(ctx, version_check,
current_schema_version) and if it returns false then return
Ok(redis_module::RedisValue::NoReply). Ensure the helper does not touch
read_key/graph drops (drop them at call site before calling helper) and reuse
the same CString error text.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5432b004-e113-40ce-9d95-dcf8a473bf54

📥 Commits

Reviewing files that changed from the base of the PR and between 788c6f2 and 179e0be.

📒 Files selected for processing (11)
  • .devcontainer/Dockerfile
  • build/Dockerfile
  • flow_tests_done.txt
  • flow_tests_todo.txt
  • graph/src/graph/attribute_store.rs
  • graph/src/graph/graph.rs
  • graph/src/graph/mvcc_graph.rs
  • src/commands/config_cmd.rs
  • src/commands/query.rs
  • src/config.rs
  • tests/requirements.txt
💤 Files with no reviewable changes (1)
  • flow_tests_todo.txt
✅ Files skipped from review due to trivial changes (3)
  • tests/requirements.txt
  • build/Dockerfile
  • .devcontainer/Dockerfile
🚧 Files skipped from review as they are similar to previous changes (2)
  • flow_tests_done.txt
  • graph/src/graph/mvcc_graph.rs

Comment thread graph/src/graph/graph.rs
Comment on lines +480 to +482
let node_cap = node_count + deleted_nodes.len();
let relationship_cap = relationship_count + deleted_relationships.len();
let schema_version = (node_labels.len() + relationship_types.len()) as u64;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if schema_version is encoded/decoded in the RDB serialization
rg -n "schema_version" src/serializers/ -B 2 -A 2

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 55


🏁 Script executed:

# Find and examine encode_payload and schema_version usage
fd -e rs | xargs grep -l "encode_payload" | head -5

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 128


🏁 Script executed:

# Search for schema_version usage across the codebase
rg -n "schema_version" graph/src/ | head -30

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 750


🏁 Script executed:

# Check the restore function implementation
sed -n '456,511p' graph/src/graph/graph.rs

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 2215


🏁 Script executed:

# Check how schema_version is used - search for reads/comparisons beyond mvcc_graph
rg -n "schema_version" graph/src/ -A 2 -B 2 | grep -v "schema_version:" | head -50

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 2262


🏁 Script executed:

# Check encode_payload in encoder to see what is actually being persisted
rg -n "fn encode_payload" graph/src/graph/graph.rs -A 50 | head -80

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 2305


Persist schema_version in RDB format instead of recalculating it.

The restored schema_version is calculated as (node_labels.len() + relationship_types.len()) as u64 (line 482), but at runtime in MvccGraph::commit, schema_version is incremented only when the schema actually changes (labels, types, or attributes). This creates a semantic mismatch: a database that undergoes many schema changes will have a much higher schema_version at runtime than after RDB restore, which resets it to the cardinality of labels and types. Since schema_version is not currently serialized in the RDB format, it is lost during persistence.

To maintain consistency and correctness, encode and decode schema_version directly in the RDB payload.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 480 - 482, The restore logic currently
recomputes schema_version as "(node_labels.len() + relationship_types.len()) as
u64" which loses runtime increments done in MvccGraph::commit; instead persist
schema_version in the RDB payload: update the RDB serialization path to write
the current schema_version value (the variable named schema_version) and update
the RDB deserialization/restore path to read and assign that persisted
schema_version rather than recomputing it from node_labels/relationship_types;
ensure the serialized field is named/ordered consistently with existing RDB
fields and that MvccGraph::commit continues to increment the in-memory
schema_version as before.
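A minimal sketch of persisting the counter as an explicit field; the little-endian framing and offset-cursor style here are illustrative, not the actual RDB layout:

```rust
/// Write schema_version as a fixed-width little-endian u64 field.
fn encode_schema_version(buf: &mut Vec<u8>, schema_version: u64) {
    buf.extend_from_slice(&schema_version.to_le_bytes());
}

/// Read the persisted value back instead of recomputing it from
/// label/type counts; None signals a truncated payload.
fn decode_schema_version(buf: &[u8], offset: &mut usize) -> Option<u64> {
    let end = offset.checked_add(8)?;
    let bytes = buf.get(*offset..end)?;
    let mut arr = [0u8; 8];
    arr.copy_from_slice(bytes);
    *offset = end;
    Some(u64::from_le_bytes(arr))
}
```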

- Introduced a new command `GRAPH.EFFECT` to apply serialized effects for maintaining replica consistency.
- Enhanced the `Pending` struct to track schema baselines and effects count.
- Updated the `commit` operation to build an effects buffer before clearing pending data.
- Modified the `set` operation to account for created relationships when checking for deletions.
- Added logic to determine when to use effects replication based on execution time and effects count.
- Updated the `ThreadedGraph` to capture effects buffer during query execution.
- Adjusted tests to reflect the new behavior of effects replication and properties set.

@coderabbitai (bot) left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/graph_core.rs (1)

388-402: ⚠️ Potential issue | 🔴 Critical

Commit the primary write before replicating it.

In the queued write path, replicate_effects() runs before graph.graph.commit(g). That exposes a publish-before-commit window where replicas can apply a mutation the primary has not made visible yet. query_sync() already uses the safer order, so the queued path should stay consistent with that.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/graph_core.rs` around lines 388 - 402, The queued-write path currently
calls replicate_effects(&ctx, &key_name, effects_buffer, &query) before
committing the graph, creating a publish-before-commit race; change the order so
graph.graph.commit(g) is called and completed (commit the primary write) before
calling replicate_effects, mirroring the order used in query_sync; update the
block that handles Ok((g, effects_buffer)) to call graph.graph.commit(g) (or the
appropriate commit method on g) while still holding the GIL/locks, then signal
the key and finally call replicate_effects with the same effects_buffer and
query to ensure replicas only see committed state.
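The commit-then-replicate ordering can be sketched with closures standing in for `graph.graph.commit(g)`, the key signal, and `replicate_effects(...)`; the function name is hypothetical:

```rust
/// Finish a queued write in the safe order: commit first so the write
/// is visible on the primary, then signal waiters, and only then ship
/// effects to replicas.
fn finish_queued_write(
    commit: impl FnOnce(),
    signal_key: impl FnOnce(),
    replicate: impl FnOnce(),
) {
    commit();     // primary state becomes visible
    signal_key(); // wake readers blocked on the key
    replicate();  // replicas only ever see committed state
}
```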
graph/src/runtime/functions/mod.rs (1)

564-576: ⚠️ Potential issue | 🟠 Major

Default UDFs to non-deterministic unless you can prove otherwise.

GraphFn::new_udf() hard-codes non_deterministic: false, but these functions execute arbitrary JS. That means write queries containing UDF calls can still be replicated via GRAPH.QUERY even when the UDF reads time, randomness, or external state. Either default UDFs to non-deterministic or make determinism an explicit part of UDF registration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/functions/mod.rs` around lines 564 - 576, GraphFn::new_udf
currently hardcodes non_deterministic: false but UDFs run arbitrary JS; change
the default to non_deterministic: true so new UDFs are treated non-deterministic
until explicitly marked otherwise. Update the new_udf constructor (the new_udf
function and the non_deterministic field) to set non_deterministic: true, and if
needed add or use an explicit API (e.g., a new_udf_with_determinism or a setter
on GraphFn) to allow callers to mark a UDF deterministic when they can prove it.
graph/src/runtime/functions/temporal.rs (1)

431-455: ⚠️ Potential issue | 🟠 Major

var_arg removes validation for these constructors.

Switching date(), localtime(), and localdatetime() to var_arg drops both arity and type validation because FnArguments::VarLength currently accepts any argument list in graph/src/runtime/functions/mod.rs. That means invalid calls can now hit the _ => unreachable!() arm, and extra arguments are ignored instead of rejected. If the goal is just to allow zero arguments, these need an explicit 0-or-1 argument check rather than the generic varargs registration.

Also applies to: 461-494, 498-525

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/functions/temporal.rs` around lines 431 - 455, The change
to var_arg removed arity/type validation so date(), localtime(), and
localdatetime() (functions date_fn, localtime_fn, localdatetime_fn) can receive
any number/types of args and hit the unreachable!() arm; restore explicit 0-or-1
argument validation instead of using FnArguments::VarLength (or update
FnArguments handling) by checking args.len() at the start of each *_fn and
returning a proper arity/type error when len()>1, and when len()==1 validate the
single Value is Map | String | Null before proceeding; ensure the same explicit
checks are applied to the other two function registrations referenced in the
comment.
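The explicit 0-or-1 argument check can be sketched with a toy value enum; `Value` and `check_temporal_args` here are illustrative names, not the runtime's actual types:

```rust
/// Toy argument values for the temporal constructors.
enum Value {
    Map,
    #[allow(dead_code)]
    Str(String),
    Null,
    Int(i64),
}

/// Accept exactly zero or one argument, and require Map | String |
/// Null when one is given, instead of a blanket varargs registration.
fn check_temporal_args(args: &[Value]) -> Result<(), String> {
    match args {
        [] => Ok(()), // zero args: use the current instant
        [Value::Map | Value::Str(_) | Value::Null] => Ok(()),
        [_] => Err("invalid argument type for temporal constructor".into()),
        _ => Err(format!("expected 0 or 1 arguments, got {}", args.len())),
    }
}
```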
🧹 Nitpick comments (2)
graph/src/graph/graph.rs (2)

2103-2109: Consider caching global_attrs to avoid repeated computation.

build_global_attrs() is called on every encode_payload invocation, which creates a new HashSet and iterates both attribute name collections each time. For multi-key encoding with many payload entries, this overhead adds up.

Consider computing global_attrs once in the encoder and passing it as a parameter to encode_payload.

♻️ Suggested signature change
     /// Encode a single payload entry.
     pub fn encode_payload(
         &self,
         w: &mut dyn Writer,
         p: &PayloadEntry,
+        global_attrs: &[Arc<String>],
     ) {
-        let global_attrs = self.build_global_attrs();
         match p.state {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 2103 - 2109, The call to
build_global_attrs() inside encode_payload causes repeated allocation/iteration;
compute and cache the global_attrs once on the encoder (e.g., as a field on the
struct or computed once before batch encoding) and change encode_payload to
accept a reference to that cached set (or read the field) so
encode_payload(&self, w: &mut dyn Writer, p: &PayloadEntry, global_attrs:
&HashSet<String>) uses the precomputed set instead of calling
build_global_attrs() each time; update callers that invoke encode_payload
accordingly and remove the internal build_global_attrs() call from
encode_payload.

526-532: Consider using iterator over live nodes instead of range iteration.

The current approach iterates over all IDs from 0 to max_id and checks each against deleted_nodes. For large graphs with many deleted nodes, this is O(max_id). Consider using all_nodes_matrix or inverting the logic to iterate non-deleted IDs directly if a more efficient approach exists.

That said, since this runs only during RDB load, the current implementation is acceptable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 526 - 532, The loop currently iterates
0..max_id and checks self.deleted_nodes.contains(id) for every id, which is
O(max_id); replace it with an iterator that yields only live node IDs (e.g. add
or use a method like live_node_ids() that yields ids not in self.deleted_nodes
or iterate over self.deleted_nodes' complement) and then call
self.all_nodes_matrix.set(id, id, true) for each yielded id; reference symbols:
self.node_count, self.deleted_nodes, max_id calculation, and
all_nodes_matrix.set.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@graph/src/planner/mod.rs`:
- Around line 279-325: The plan_is_non_deterministic function misses scan-owned
lookup expressions; update its match arms to also inspect the scan/seek/fulltext
IR variants (e.g., IR::NodeByIndexScan, IR::NodeByIdSeek /
IR::NodeByLabelAndIdScan, IR::NodeByFulltextScan) and run
expr_has_non_deterministic on their lookup/input expression fields (and any
delimiter/path fields they own) just like other arms do; reference
plan_is_non_deterministic and expr_has_non_deterministic when locating where to
add these checks so those operators contribute true when they contain
non-deterministic expressions.

In `@graph/src/runtime/ops/commit.rs`:
- Around line 73-84: The code currently builds and writes effects into
self.runtime.effects_buffer and updates self.runtime.effects_count before
calling pending.commit(), which can leave effects for failed commits; instead,
in the commit path (e.g., inside the method that calls pending.commit()), first
build effects into a local Vec via pending.build_effects_buffer(&self.runtime.g,
&mut local_buf), then call pending.commit(); only if pending.commit() returns
success append local_buf into self.runtime.effects_buffer
(get_or_insert_with(Vec::new)) and increment self.runtime.effects_count by
local_buf.len(); apply the same change for the similar block that touches
effects_buffer/effects_count (the second occurrence mentioned) so effects are
only persisted after a successful pending.commit().
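A minimal sketch of the stage-locally-then-publish pattern, with closures standing in for `build_effects_buffer` and `pending.commit()`; the `Runtime` struct is a toy stand-in for the real runtime state:

```rust
/// Toy runtime holding the shared effects state.
struct Runtime {
    effects_buffer: Option<Vec<u8>>,
    effects_count: usize,
}

/// Build effects into a local buffer, commit, and only append to the
/// shared buffer once the commit has succeeded, so failed commits
/// leave no stray effects behind.
fn commit_with_effects(
    rt: &mut Runtime,
    build_effects: impl Fn(&mut Vec<u8>),
    commit: impl Fn() -> Result<(), String>,
) -> Result<(), String> {
    let mut local = Vec::new();
    build_effects(&mut local); // 1. stage locally
    commit()?;                 // 2. bail out before publishing on failure
    rt.effects_count += local.len();
    rt.effects_buffer.get_or_insert_with(Vec::new).extend(local); // 3. publish
    Ok(())
}
```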

In `@graph/src/runtime/pending.rs`:
- Around line 1005-1021: The deserialization code (e.g., read_string) uses
decoded lengths directly in arithmetic and allocations which allows integer
overflow or huge allocations from a crafted payload; change all places that read
a length (the u64->usize conversion and subsequent *offset + len checks and any
with_capacity(...) calls used when deserializing lists/vectors) to first verify
bounds with checked_add/checked_sub to ensure *offset + len does not overflow
and that enough bytes remain, enforce a sane maximum length before calling
with_capacity or allocating, and only update *offset after successful
validation; apply the same pattern for the vector/list parsing blocks that call
with_capacity to prevent trusting unvalidated lengths.
- Around line 673-684: effects_count() currently sums individual collections and
bitcounts but does not match how records are emitted in build_effects_buffer()
(which folds node/edge creation, labels, and attrs into single
EFFECT_CREATE_NODE/EFFECT_CREATE_EDGE records and also includes schema/attribute
additions); update effects_count to mirror build_effects_buffer's grouping logic
by counting one create record per created node/relationship (not per attr/label
bit), including counts for schema/attribute-add records that
build_effects_buffer emits, or alternatively rename effects_count to indicate
it's a heuristic; locate and update the effects_count function and use the same
grouping/iteration logic and helper functions/constants (e.g.,
build_effects_buffer, EFFECT_CREATE_NODE, EFFECT_CREATE_EDGE) so the returned
number matches the serialized buffer.
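The checked-length pattern from the deserialization comment above can be sketched over a plain byte slice; the 64 MiB cap and function shape are illustrative choices, not the codebase's actual limits:

```rust
/// Illustrative upper bound on any single decoded string.
const MAX_LEN: usize = 64 * 1024 * 1024;

/// Read a u64-length-prefixed string, validating every bound with
/// checked arithmetic before allocating, and advancing the offset
/// only after all checks pass.
fn read_string(buf: &[u8], offset: &mut usize) -> Result<String, String> {
    let len_end = offset.checked_add(8).ok_or("offset overflow")?;
    let len_bytes = buf.get(*offset..len_end).ok_or("truncated length prefix")?;
    let mut arr = [0u8; 8];
    arr.copy_from_slice(len_bytes);
    let len = u64::from_le_bytes(arr) as usize;
    if len > MAX_LEN {
        return Err("declared length exceeds cap".into());
    }
    let data_end = len_end.checked_add(len).ok_or("length overflow")?;
    let data = buf.get(len_end..data_end).ok_or("truncated payload")?;
    let s = String::from_utf8(data.to_vec()).map_err(|e| e.to_string())?;
    *offset = data_end; // advance only after successful validation
    Ok(s)
}
```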

In `@tests/flow/graph_utils.py`:
- Around line 49-53: The constraint check in graph_eq was commented out causing
schema mismatches to be ignored; re-enable the constraints comparison in
graph_eq but guard it by detecting at runtime whether the connected environment
supports constraints (e.g., attempt the CALL db.constraints() query or a
feature-capability probe and catch/handle exceptions indicating unsupported
capability). If the probe/query succeeds, fetch and include the constraints
result in the equality comparison; if it fails with a known "unsupported" error,
skip the constraints check and log/annotate that constraints were not compared.
Update graph_eq (and any helper that collects schema info) to follow this
pattern so unsupported environments still pass safely while supported ones
enforce constraint equality.

---

Outside diff comments:
In `@graph/src/runtime/functions/mod.rs`:
- Around line 564-576: GraphFn::new_udf currently hardcodes non_deterministic:
false but UDFs run arbitrary JS; change the default to non_deterministic: true
so new UDFs are treated non-deterministic until explicitly marked otherwise.
Update the new_udf constructor (the new_udf function and the non_deterministic
field) to set non_deterministic: true, and if needed add or use an explicit API
(e.g., a new_udf_with_determinism or a setter on GraphFn) to allow callers to
mark a UDF deterministic when they can prove it.

In `@graph/src/runtime/functions/temporal.rs`:
- Around line 431-455: The change to var_arg removed arity/type validation so
date(), localtime(), and localdatetime() (functions date_fn, localtime_fn,
localdatetime_fn) can receive any number/types of args and hit the
unreachable!() arm; restore explicit 0-or-1 argument validation instead of using
FnArguments::VarLength (or update FnArguments handling) by checking args.len()
at the start of each *_fn and returning a proper arity/type error when len()>1,
and when len()==1 validate the single Value is Map | String | Null before
proceeding; ensure the same explicit checks are applied to the other two
function registrations referenced in the comment.

In `@src/graph_core.rs`:
- Around line 388-402: The queued-write path currently calls
replicate_effects(&ctx, &key_name, effects_buffer, &query) before committing the
graph, creating a publish-before-commit race; change the order so
graph.graph.commit(g) is called and completed (commit the primary write) before
calling replicate_effects, mirroring the order used in query_sync; update the
block that handles Ok((g, effects_buffer)) to call graph.graph.commit(g) (or the
appropriate commit method on g) while still holding the GIL/locks, then signal
the key and finally call replicate_effects with the same effects_buffer and
query to ensure replicas only see committed state.

---

Nitpick comments:
In `@graph/src/graph/graph.rs`:
- Around line 2103-2109: The call to build_global_attrs() inside encode_payload
causes repeated allocation/iteration; compute and cache the global_attrs once on
the encoder (e.g., as a field on the struct or computed once before batch
encoding) and change encode_payload to accept a reference to that cached set (or
read the field) so encode_payload(&self, w: &mut dyn Writer, p: &PayloadEntry,
global_attrs: &HashSet<String>) uses the precomputed set instead of calling
build_global_attrs() each time; update callers that invoke encode_payload
accordingly and remove the internal build_global_attrs() call from
encode_payload.
- Around line 526-532: The loop currently iterates 0..max_id and checks
self.deleted_nodes.contains(id) for every id, which is O(max_id); replace it
with an iterator that yields only live node IDs (e.g. add or use a method like
live_node_ids() that yields ids not in self.deleted_nodes or iterate over
self.deleted_nodes' complement) and then call self.all_nodes_matrix.set(id, id,
true) for each yielded id; reference symbols: self.node_count,
self.deleted_nodes, max_id calculation, and all_nodes_matrix.set.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: eff5619d-e45b-4b3e-b6b8-744e35bafd8f

📥 Commits

Reviewing files that changed from the base of the PR and between 179e0be and 6c29a53.

📒 Files selected for processing (19)
  • graph/src/graph/graph.rs
  • graph/src/planner/mod.rs
  • graph/src/planner/optimizer/utilize_node_by_id.rs
  • graph/src/runtime/functions/aggregation.rs
  • graph/src/runtime/functions/conversion.rs
  • graph/src/runtime/functions/math.rs
  • graph/src/runtime/functions/mod.rs
  • graph/src/runtime/functions/procedures.rs
  • graph/src/runtime/functions/temporal.rs
  • graph/src/runtime/ops/commit.rs
  • graph/src/runtime/ops/set.rs
  • graph/src/runtime/pending.rs
  • graph/src/runtime/runtime.rs
  • src/commands/effect.rs
  • src/commands/mod.rs
  • src/graph_core.rs
  • src/lib.rs
  • tests/flow/graph_utils.py
  • tests/flow/test_effects.py
✅ Files skipped from review due to trivial changes (1)
  • src/lib.rs

Comment thread graph/src/planner/mod.rs
Comment thread graph/src/runtime/ops/commit.rs Outdated
Comment thread graph/src/runtime/pending.rs
Comment on lines +1005 to +1021
pub fn read_string(
    buf: &[u8],
    offset: &mut usize,
) -> Result<Arc<String>, String> {
    if *offset + 8 > buf.len() {
        return Err("effects buffer truncated".to_string());
    }
    let len = u64::from_le_bytes(buf[*offset..*offset + 8].try_into().unwrap()) as usize;
    *offset += 8;
    if *offset + len > buf.len() {
        return Err("effects buffer truncated".to_string());
    }
    let s = std::str::from_utf8(&buf[*offset..*offset + len])
        .map_err(|e| format!("invalid utf8 in effects buffer: {e}"))?;
    *offset += len;
    Ok(Arc::new(s.to_string()))
}

⚠️ Potential issue | 🔴 Critical

Validate decoded lengths before using them for arithmetic or allocation.

Line 1014 does *offset + len unchecked, so a crafted string length can overflow and panic. Lines 1091 and 1119 also trust serialized list/vector lengths for with_capacity(...) before proving enough bytes remain, which can force huge allocations from a malformed effects payload. Use checked_add/remaining-byte bounds first, and only reserve after validating a sane upper bound.

Also applies to: 1090-1131

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/pending.rs` around lines 1005 - 1021, The deserialization
code (e.g., read_string) uses decoded lengths directly in arithmetic and
allocations which allows integer overflow or huge allocations from a crafted
payload; change all places that read a length (the u64->usize conversion and
subsequent *offset + len checks and any with_capacity(...) calls used when
deserializing lists/vectors) to first verify bounds with checked_add/checked_sub
to ensure *offset + len does not overflow and that enough bytes remain, enforce
a sane maximum length before calling with_capacity or allocating, and only
update *offset after successful validation; apply the same pattern for the
vector/list parsing blocks that call with_capacity to prevent trusting
unvalidated lengths.
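The checked-bounds pattern this comment asks for can be sketched as a standalone function. This is an illustrative hardening sketch, not FalkorDB's actual code: it mirrors the 8-byte little-endian length prefix from the snippet above, and every decoded length is validated with `checked_add` before it is used to slice or allocate.

```rust
use std::sync::Arc;

// Sketch: validate wire-decoded lengths before arithmetic or allocation.
fn read_string_checked(buf: &[u8], offset: &mut usize) -> Result<Arc<String>, String> {
    // Read the 8-byte little-endian length prefix, bounds-checked.
    let hdr_end = offset
        .checked_add(8)
        .filter(|&e| e <= buf.len())
        .ok_or_else(|| "effects buffer truncated".to_string())?;
    let len = u64::from_le_bytes(buf[*offset..hdr_end].try_into().unwrap()) as usize;
    *offset = hdr_end;
    // Validate before touching the buffer, so a crafted length can neither
    // overflow the addition nor force a huge allocation.
    let end = offset
        .checked_add(len)
        .filter(|&e| e <= buf.len())
        .ok_or_else(|| "effects buffer truncated".to_string())?;
    let s = std::str::from_utf8(&buf[*offset..end])
        .map_err(|e| format!("invalid utf8 in effects buffer: {e}"))?;
    *offset = end;
    Ok(Arc::new(s.to_string()))
}

fn main() {
    let mut buf = 5u64.to_le_bytes().to_vec();
    buf.extend_from_slice(b"hello");
    let mut off = 0usize;
    assert_eq!(read_string_checked(&buf, &mut off).unwrap().as_str(), "hello");
    // A declared length that cannot fit is rejected instead of panicking.
    let mut bad = u64::MAX.to_le_bytes().to_vec();
    bad.extend_from_slice(b"hi");
    let mut off2 = 0usize;
    assert!(read_string_checked(&bad, &mut off2).is_err());
}
```

The same `checked_add`-then-compare shape applies to the list/vector branches: bound the element count against the remaining bytes before calling `with_capacity`.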

Comment thread tests/flow/graph_utils.py
Comment on lines +49 to +53
# TODO: enable once constraints are supported
# ('constraints', """CALL db.constraints()
# YIELD type, label, properties, entitytype, status
# RETURN type, label, properties, entitytype, status
# ORDER BY type, label, properties, entitytype, status""")

⚠️ Potential issue | 🟠 Major

Do not fully disable constraint equality checks in graph_eq.

At Line 49-53, commenting out constraints allows false positives (graphs may compare equal while schema constraints differ). Keep this validation, but guard it with runtime capability detection so unsupported environments still pass safely.

Suggested patch
 def graph_eq(A, B):
+    def supports_constraints(graph):
+        try:
+            graph.ro_query("CALL db.constraints() YIELD type RETURN type LIMIT 1")
+            return True
+        except Exception:
+            return False
+
     queries = [
@@
-            # constraints
-            # TODO: enable once constraints are supported
-            # ('constraints', """CALL db.constraints()
-            #    YIELD type, label, properties, entitytype, status
-            #    RETURN type, label, properties, entitytype, status
-            #    ORDER BY type, label, properties, entitytype, status""")
             ]
+
+    if supports_constraints(A) and supports_constraints(B):
+        queries.append(
+            ('constraints', """CALL db.constraints()
+               YIELD type, label, properties, entitytype, status
+               RETURN type, label, properties, entitytype, status
+               ORDER BY type, label, properties, entitytype, status""")
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/flow/graph_utils.py` around lines 49 - 53, The constraint check in
graph_eq was commented out causing schema mismatches to be ignored; re-enable
the constraints comparison in graph_eq but guard it by detecting at runtime
whether the connected environment supports constraints (e.g., attempt the CALL
db.constraints() query or a feature-capability probe and catch/handle exceptions
indicating unsupported capability). If the probe/query succeeds, fetch and
include the constraints result in the equality comparison; if it fails with a
known "unsupported" error, skip the constraints check and log/annotate that
constraints were not compared. Update graph_eq (and any helper that collects
schema info) to follow this pattern so unsupported environments still pass
safely while supported ones enforce constraint equality.

@AviAvni
Contributor Author

AviAvni commented Apr 13, 2026

@coderabbitai resume

@coderabbitai
Contributor

coderabbitai Bot commented Apr 13, 2026

✅ Actions performed

Reviews resumed.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
graph/src/runtime/functions/mod.rs (1)

563-576: ⚠️ Potential issue | 🟠 Major

Treat dynamic UDFs as non-deterministic by default.

new_udf() hardcodes non_deterministic: false, but these functions execute arbitrary JS via call_udf_bridge. That lets write queries using UDFs bypass the new planner check and fall back to query replay even when the UDF reads time, randomness, or other ambient state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/functions/mod.rs` around lines 563 - 576, The new_udf
constructor currently sets non_deterministic: false but UDFs execute arbitrary
JS via call_udf_bridge, so update new_udf to mark these dynamic UDFs
non-deterministic by default: set the struct field non_deterministic to true in
the return value of new_udf (in the same block that sets name, func, write,
args_type, fn_type, ret_type) so UDFs are treated as non-deterministic by the
planner; ensure you only change the non_deterministic field in new_udf and
run/update any tests relying on its previous value.
graph/src/runtime/functions/temporal.rs (1)

431-526: ⚠️ Potential issue | 🟠 Major

Reject extra arguments for these 0-or-1 arg temporal constructors.

Switching date, localtime, and localdatetime to add_var_len removes the upper-bound arity check, so calls like date(x, y) now succeed and silently ignore everything after the first argument. Either keep fixed arity with optional support, or explicitly error once iter.next() yields a second value.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/functions/temporal.rs` around lines 431 - 526, The temporal
constructors date_fn, localtime_fn, and localdatetime_fn currently accept any
extra arguments because they use var_arg; after processing the first argument
(or the zero-arg case) explicitly check if iter.next().is_some() and return an
argument-arity error instead of silently ignoring extras—update each of date_fn,
localtime_fn and localdatetime_fn to validate no additional arguments remain (or
switch to a fixed 0-or-1 arity) and return a clear runtime error when a second
argument is supplied.
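The suggested guard can be sketched as follows; this is an illustrative stand-in, not the engine's real `Value` type or `date_fn` signature (the real function would also accept Map inputs and construct an actual temporal value). The point is to consume at most one argument, then error if anything remains instead of silently ignoring it.

```rust
// Minimal stand-in for the engine's value type (real one also has Map, etc.).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    String(String),
}

// Sketch of a 0-or-1 arity guard for a var-arg temporal constructor.
fn date_fn_sketch(args: Vec<Value>) -> Result<Value, String> {
    let mut iter = args.into_iter();
    let first = iter.next(); // zero or one argument is valid
    if iter.next().is_some() {
        // Reject extras instead of silently dropping them.
        return Err("date() expects at most 1 argument".to_string());
    }
    match first {
        // stand-in for "current date" in the zero-arg / null case
        None | Some(Value::Null) => Ok(Value::String("now".to_string())),
        // stand-in for parsing a date string
        Some(Value::String(s)) => Ok(Value::String(s)),
    }
}

fn main() {
    assert!(date_fn_sketch(vec![]).is_ok());
    assert!(date_fn_sketch(vec![Value::String("2024-01-01".into())]).is_ok());
    // Extra arguments are now rejected rather than ignored.
    let extra = vec![Value::String("2024-01-01".into()), Value::Null];
    assert!(date_fn_sketch(extra).is_err());
}
```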
♻️ Duplicate comments (4)
src/lib.rs (1)

78-78: ⚠️ Potential issue | 🟠 Major

Do not register graph.DEBUG unconditionally.

This exposes internal persistence/debug controls in normal builds. Please gate the command behind an explicit admin/test-only switch so production deployments never advertise it by default.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` at line 78, The registration of the "graph.DEBUG" command is
unconditional; make it admin/test-only by gating it behind an explicit flag or
feature so production builds don't advertise it. Add a runtime config flag
(e.g., enable_debug_commands) or a Cargo feature (e.g., "admin_debug") and
change the place where the command array entry is created so the ["graph.DEBUG",
graph_debug, ...] item is only added when that flag/feature is enabled (or wrap
the registration in an if-check or cfg(feature = "...") block). Ensure the check
references the existing graph_debug symbol so the command is omitted entirely
when the switch is off.
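One shape this gating could take, sketched with illustrative names only (`command_table` and `enable_debug_commands` are stand-ins, not FalkorDB's actual registration code): the debug entry is appended only when the operator opts in, so production builds never advertise it. A Cargo feature (`#[cfg(feature = "admin_debug")]`) would achieve the same at compile time.

```rust
// Sketch: only advertise graph.DEBUG when explicitly enabled.
fn command_table(enable_debug_commands: bool) -> Vec<&'static str> {
    let mut cmds = vec!["graph.QUERY", "graph.RO_QUERY", "graph.EFFECT"];
    if enable_debug_commands {
        // Opt-in only; the default table omits the debug command entirely.
        cmds.push("graph.DEBUG");
    }
    cmds
}

fn main() {
    assert!(!command_table(false).contains(&"graph.DEBUG"));
    assert!(command_table(true).contains(&"graph.DEBUG"));
}
```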
graph/src/planner/mod.rs (1)

279-325: ⚠️ Potential issue | 🟠 Major

Non-determinism detection still misses scan-owned expressions.

plan_is_non_deterministic() ignores NodeByIndexScan.query, NodeByFulltextScan.{label,query}, NodeByIdSeek / NodeByLabelAndIdScan filters, and CondVarLenTraverse.edge_filter. A write query using rand() / timestamp() there can still be treated as deterministic and replayed via GRAPH.QUERY, which risks replica divergence.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/planner/mod.rs` around lines 279 - 325, plan_is_non_deterministic
currently skips expressions owned by scan/traverse nodes; update its match to
also inspect scan/traverse-specific expression fields (using the existing
helpers expr_has_non_deterministic, set_item_has_non_deterministic,
query_graph_has_non_deterministic) for the IR variants: NodeByIndexScan (check
query), NodeByFulltextScan (check label and query), NodeByIdSeek and
NodeByLabelAndIdScan (check any filter expressions), and CondVarLenTraverse
(check edge_filter). Add arms or extend the existing IR match branches to call
the appropriate helper on those fields so any rand()/timestamp() in
scan/traverse-owned expressions marks the plan non-deterministic.
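The extension can be sketched abstractly; `Expr` and `Ir` below are illustrative stand-ins for the planner's real IR types, not the actual definitions. The point is simply that index/fulltext query expressions and traversal filters must be visited by the same walk as everything else.

```rust
// Stand-in expression and IR types for illustration only.
enum Expr {
    Rand,
    Timestamp,
    Literal(i64),
}

enum Ir {
    NodeByIndexScan { query: Expr },
    NodeByFulltextScan { label: Expr, query: Expr },
    CondVarLenTraverse { edge_filter: Option<Expr> },
}

fn expr_has_non_deterministic(e: &Expr) -> bool {
    matches!(e, Expr::Rand | Expr::Timestamp)
}

// Sketch: scan-owned expressions are checked explicitly instead of skipped.
fn plan_is_non_deterministic(ir: &Ir) -> bool {
    match ir {
        Ir::NodeByIndexScan { query } => expr_has_non_deterministic(query),
        Ir::NodeByFulltextScan { label, query } => {
            expr_has_non_deterministic(label) || expr_has_non_deterministic(query)
        }
        Ir::CondVarLenTraverse { edge_filter } => edge_filter
            .as_ref()
            .map_or(false, expr_has_non_deterministic),
    }
}

fn main() {
    let scan = Ir::NodeByIndexScan { query: Expr::Rand };
    assert!(plan_is_non_deterministic(&scan));
    let traverse = Ir::CondVarLenTraverse { edge_filter: Some(Expr::Literal(1)) };
    assert!(!plan_is_non_deterministic(&traverse));
}
```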
graph/src/runtime/pending.rs (2)

1005-1175: ⚠️ Potential issue | 🔴 Critical

Validate decoded lengths before using them in arithmetic or allocations.

read_string() still does *offset + len unchecked, and the list/vector branches reserve capacity from untrusted lengths before proving the buffer contains enough data. A malformed effects payload can still panic or trigger huge allocations on replicas.

Suggested hardening
-    let len = u64::from_le_bytes(buf[*offset..*offset + 8].try_into().unwrap()) as usize;
+    let len = usize::try_from(u64::from_le_bytes(
+        buf[*offset..*offset + 8].try_into().unwrap(),
+    ))
+    .map_err(|_| "effects buffer length too large".to_string())?;
     *offset += 8;
-    if *offset + len > buf.len() {
+    let end = (*offset)
+        .checked_add(len)
+        .filter(|end| *end <= buf.len())
+        .ok_or_else(|| "effects buffer truncated".to_string())?;
+    if end > buf.len() {
         return Err("effects buffer truncated".to_string());
     }
-    let s = std::str::from_utf8(&buf[*offset..*offset + len])
+    let s = std::str::from_utf8(&buf[*offset..end])
         .map_err(|e| format!("invalid utf8 in effects buffer: {e}"))?;
-    *offset += len;
+    *offset = end;
         VALUE_LIST => {
             let len = read_u64(buf, offset)? as usize;
+            let remaining = buf.len().saturating_sub(*offset);
+            if len > remaining {
+                return Err("effects buffer truncated".to_string());
+            }
             let mut items = thin_vec::ThinVec::with_capacity(len);
             for _ in 0..len {
                 items.push(read_value(buf, offset)?);
             }
             Ok(Value::List(Arc::new(items)))
@@
         VALUE_VECF32 => {
             let len = read_u64(buf, offset)? as usize;
+            len.checked_mul(4)
+                .filter(|bytes| *bytes <= buf.len().saturating_sub(*offset))
+                .ok_or_else(|| "effects buffer truncated".to_string())?;
             let mut v = Vec::with_capacity(len);
             for _ in 0..len {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/pending.rs` around lines 1005 - 1175, The deserialization
routines (read_string, read_value and branches handling VALUE_LIST,
VALUE_VECF32, etc.) use lengths read from the buffer without robust validation
and perform arithmetic/allocation with them, which can overflow or cause huge
allocations; fix by validating decoded lengths before any offset arithmetic or
allocations: in read_string check len using checked_add or compare len <=
buf.len().saturating_sub(*offset) before updating *offset or creating the
string; in read_value, for VALUE_LIST and VALUE_VECF32 read the element
count/length then ensure it is bounded (e.g., len <=
buf.len().saturating_sub(*offset) and below a sane max) before reserving
capacity or looping, and use checked_add/checked_mul for any offset+size
computations to avoid integer overflow/panics. Ensure every path that converts a
length into an allocation or uses it to advance *offset performs these checks
(references: read_string, read_value, VALUE_LIST, VALUE_VECF32, and any other
tag handling multi-byte sequences).

673-684: ⚠️ Potential issue | 🟠 Major

Make effects_count() mirror the records you actually serialize.

This still counts raw map entries and matrix bits, while build_effects_buffer() emits grouped per-entity records and additional schema/attribute-add records. The reported count can therefore drift from the real buffer contents and any guard/threshold logic that relies on it.

As per coding guidelines, "Pending mutations (runtime/pending.rs) must implement batched mutations for transactional semantics."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/pending.rs` around lines 673 - 684, effects_count()
currently tallies raw entries and matrix bits (created_nodes,
created_relationships, deleted_nodes, deleted_relationships, set_nodes_attrs,
set_relationships_attrs, set_node_labels.nvals(), remove_node_labels.nvals())
which diverges from the actual records emitted by build_effects_buffer(); update
effects_count() to mirror build_effects_buffer()’s grouping logic: count one
record per created/deleted entity (each key in
created_nodes/created_relationships/deleted_nodes/deleted_relationships), count
one per-entity attribute-set record for each entity key in set_nodes_attrs and
set_relationships_attrs plus any additional per-attribute-key
"attribute-add/schema" records that build_effects_buffer emits (use the same
attribute-key collection logic), and count per-entity label add/remove records
as emitted, and to avoid drift extract/share the grouping/counting logic into a
helper (or reuse the serializer helper used by build_effects_buffer()) so
effects_count() computes the exact number of serialized records instead of raw
map/matrix sizes.
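The "single source of truth" fix this prompt describes can be sketched as follows. All names here are illustrative stand-ins, not FalkorDB's actual fields: one helper enumerates the logical records, and both the counter and (hypothetically) the serializer consume it, so the count cannot drift from the buffer contents.

```rust
use std::collections::HashMap;

// Stand-in for the pending-mutation state (field names are illustrative).
struct PendingSketch {
    created_nodes: Vec<u64>,
    deleted_nodes: Vec<u64>,
    // entity id -> (attribute, value) pairs, serialized as one grouped record per entity
    set_nodes_attrs: HashMap<u64, Vec<(String, i64)>>,
}

enum RecordSketch {
    CreateNode(u64),
    DeleteNode(u64),
    SetNodeAttrs(u64, usize),
}

impl PendingSketch {
    // Shared enumeration: the serializer and the counter both walk this.
    fn records(&self) -> Vec<RecordSketch> {
        let mut out = Vec::new();
        out.extend(self.created_nodes.iter().map(|&id| RecordSketch::CreateNode(id)));
        out.extend(self.deleted_nodes.iter().map(|&id| RecordSketch::DeleteNode(id)));
        // One grouped record per entity, not one per attribute entry.
        out.extend(
            self.set_nodes_attrs
                .iter()
                .map(|(&id, attrs)| RecordSketch::SetNodeAttrs(id, attrs.len())),
        );
        out
    }

    // Counting the same records the serializer emits keeps the two in sync.
    fn effects_count(&self) -> usize {
        self.records().len()
    }
}

fn main() {
    let mut attrs = HashMap::new();
    attrs.insert(4u64, vec![("a".to_string(), 1), ("b".to_string(), 2)]);
    let p = PendingSketch {
        created_nodes: vec![1, 2],
        deleted_nodes: vec![3],
        set_nodes_attrs: attrs,
    };
    // 2 creates + 1 delete + 1 grouped attribute record = 4, not 5 raw entries.
    assert_eq!(p.effects_count(), 4);
}
```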
🧹 Nitpick comments (1)
graph/src/graph/graph.rs (1)

2103-2159: Consider caching build_global_attrs() result for multi-payload encoding.

build_global_attrs() is called at the start of encode_payload(), which according to the encoder (context snippet 3, lines 41-44) is invoked in a loop for each PayloadEntry. For multi-key RDB scenarios with many payload entries, this rebuilds the same attribute list repeatedly.

Consider computing global_attrs once in the caller and passing it as a parameter, or caching it within the encoding session.

♻️ Suggested optimization
     /// Encode a single payload entry.
     pub fn encode_payload(
         &self,
         w: &mut dyn Writer,
         p: &PayloadEntry,
+        global_attrs: &[Arc<String>],
     ) {
-        let global_attrs = self.build_global_attrs();
         match p.state {
             EncodeState::Nodes => {
                 let this = &self;
                 let count = p.count;
                 let offset = p.offset;
                 this.node_attrs.set_encode_context(
                     &this.deleted_nodes,
                     this.max_node_id(),
-                    &global_attrs,
+                    global_attrs,
                 );

Then in the encoder, compute once before the loop:

let global_attrs = graph.build_global_attrs();
for p in payloads {
    graph.encode_payload(&mut w, p, &global_attrs);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 2103 - 2159, The code currently calls
build_global_attrs() inside encode_payload() on every PayloadEntry which
rebuilds the same data for multi-payload loops; modify encode_payload to accept
a precomputed reference to global attributes (e.g., change signature to
encode_payload(&self, w: &mut dyn Writer, p: &PayloadEntry, global_attrs:
&GlobalAttrsType) or similar) and remove the internal call to
build_global_attrs(), then update the encoder loop to compute let global_attrs =
self.build_global_attrs() once and pass &global_attrs into each encode_payload
call (alternatively, add a short-lived cache field on the encoder/session and
have encode_payload consult it), ensuring you update usages of
node_attrs.set_encode_context and relationship_attrs.set_encode_context to use
the passed/cached global_attrs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@graph/src/runtime/functions/temporal.rs`:
- Around line 558-594: The three temporal functions (date_transaction_fn,
localtime_transaction_fn, localdatetime_transaction_fn) currently call
Utc::now() on each invocation; change them to read and reuse a
transaction-scoped clock captured at the start of the transaction/query from the
runtime context (the first function parameter currently ignored), e.g. read
ctx.transaction_start_time or ctx.clock; if not present, compute it once, store
it into the context for the transaction, and derive the date/time/datetime
values from that single timestamp so every call in the same transaction returns
the same value.
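The transaction-scoped clock can be sketched like this; `Ctx` and its field are illustrative stand-ins for the runtime context the prompt refers to (and `std::time` stands in for whatever clock source the engine uses). The timestamp is captured lazily on first use and reused, so every temporal call in the same transaction observes the same instant.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for the runtime context that would carry the transaction clock.
struct Ctx {
    transaction_start_millis: Option<u128>,
}

impl Ctx {
    fn transaction_clock(&mut self) -> u128 {
        // Capture once on first use, then reuse for the whole transaction.
        *self.transaction_start_millis.get_or_insert_with(|| {
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock before UNIX epoch")
                .as_millis()
        })
    }
}

fn main() {
    let mut ctx = Ctx { transaction_start_millis: None };
    let a = ctx.transaction_clock();
    let b = ctx.transaction_clock();
    // Every call within the transaction sees the same instant.
    assert_eq!(a, b);
}
```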

In `@src/commands/effect.rs`:
- Around line 48-59: Currently you persist a newly created ThreadedGraph into
the key store immediately (via open_key_writable, set_value with GRAPH_TYPE and
ThreadedGraph::new) before calling apply_effects(), so if apply_effects() or
decoding fails the empty key remains; change the flow to create the
Arc<RwLock<ThreadedGraph>> locally and only call key.set_value(&GRAPH_TYPE,
g.clone()) after apply_effects() returns successfully, preserving the existing
behavior of rollback() on error; apply the same change for the second occurrence
around the alternate/new-graph creation block (the code referenced in the
comment for lines 76-90).

---

Outside diff comments:
In `@graph/src/runtime/functions/mod.rs`:
- Around line 563-576: The new_udf constructor currently sets non_deterministic:
false but UDFs execute arbitrary JS via call_udf_bridge, so update new_udf to
mark these dynamic UDFs non-deterministic by default: set the struct field
non_deterministic to true in the return value of new_udf (in the same block that
sets name, func, write, args_type, fn_type, ret_type) so UDFs are treated as
non-deterministic by the planner; ensure you only change the non_deterministic
field in new_udf and run/update any tests relying on its previous value.

In `@graph/src/runtime/functions/temporal.rs`:
- Around line 431-526: The temporal constructors date_fn, localtime_fn, and
localdatetime_fn currently accept any extra arguments because they use var_arg;
after processing the first argument (or the zero-arg case) explicitly check if
iter.next().is_some() and return an argument-arity error instead of silently
ignoring extras—update each of date_fn, localtime_fn and localdatetime_fn to
validate no additional arguments remain (or switch to a fixed 0-or-1 arity) and
return a clear runtime error when a second argument is supplied.

---

Duplicate comments:
In `@graph/src/planner/mod.rs`:
- Around line 279-325: plan_is_non_deterministic currently skips expressions
owned by scan/traverse nodes; update its match to also inspect
scan/traverse-specific expression fields (using the existing helpers
expr_has_non_deterministic, set_item_has_non_deterministic,
query_graph_has_non_deterministic) for the IR variants: NodeByIndexScan (check
query), NodeByFulltextScan (check label and query), NodeByIdSeek and
NodeByLabelAndIdScan (check any filter expressions), and CondVarLenTraverse
(check edge_filter). Add arms or extend the existing IR match branches to call
the appropriate helper on those fields so any rand()/timestamp() in
scan/traverse-owned expressions marks the plan non-deterministic.

In `@graph/src/runtime/pending.rs`:
- Around line 1005-1175: The deserialization routines (read_string, read_value
and branches handling VALUE_LIST, VALUE_VECF32, etc.) use lengths read from the
buffer without robust validation and perform arithmetic/allocation with them,
which can overflow or cause huge allocations; fix by validating decoded lengths
before any offset arithmetic or allocations: in read_string check len using
checked_add or compare len <= buf.len().saturating_sub(*offset) before updating
*offset or creating the string; in read_value, for VALUE_LIST and VALUE_VECF32
read the element count/length then ensure it is bounded (e.g., len <=
buf.len().saturating_sub(*offset) and below a sane max) before reserving
capacity or looping, and use checked_add/checked_mul for any offset+size
computations to avoid integer overflow/panics. Ensure every path that converts a
length into an allocation or uses it to advance *offset performs these checks
(references: read_string, read_value, VALUE_LIST, VALUE_VECF32, and any other
tag handling multi-byte sequences).
- Around line 673-684: effects_count() currently tallies raw entries and matrix
bits (created_nodes, created_relationships, deleted_nodes,
deleted_relationships, set_nodes_attrs, set_relationships_attrs,
set_node_labels.nvals(), remove_node_labels.nvals()) which diverges from the
actual records emitted by build_effects_buffer(); update effects_count() to
mirror build_effects_buffer()’s grouping logic: count one record per
created/deleted entity (each key in
created_nodes/created_relationships/deleted_nodes/deleted_relationships), count
one per-entity attribute-set record for each entity key in set_nodes_attrs and
set_relationships_attrs plus any additional per-attribute-key
"attribute-add/schema" records that build_effects_buffer emits (use the same
attribute-key collection logic), and count per-entity label add/remove records
as emitted, and to avoid drift extract/share the grouping/counting logic into a
helper (or reuse the serializer helper used by build_effects_buffer()) so
effects_count() computes the exact number of serialized records instead of raw
map/matrix sizes.

In `@src/lib.rs`:
- Line 78: The registration of the "graph.DEBUG" command is unconditional; make
it admin/test-only by gating it behind an explicit flag or feature so production
builds don't advertise it. Add a runtime config flag (e.g.,
enable_debug_commands) or a Cargo feature (e.g., "admin_debug") and change the
place where the command array entry is created so the ["graph.DEBUG",
graph_debug, ...] item is only added when that flag/feature is enabled (or wrap
the registration in an if-check or cfg(feature = "...") block). Ensure the check
references the existing graph_debug symbol so the command is omitted entirely
when the switch is off.

---

Nitpick comments:
In `@graph/src/graph/graph.rs`:
- Around line 2103-2159: The code currently calls build_global_attrs() inside
encode_payload() on every PayloadEntry which rebuilds the same data for
multi-payload loops; modify encode_payload to accept a precomputed reference to
global attributes (e.g., change signature to encode_payload(&self, w: &mut dyn
Writer, p: &PayloadEntry, global_attrs: &GlobalAttrsType) or similar) and remove
the internal call to build_global_attrs(), then update the encoder loop to
compute let global_attrs = self.build_global_attrs() once and pass &global_attrs
into each encode_payload call (alternatively, add a short-lived cache field on
the encoder/session and have encode_payload consult it), ensuring you update
usages of node_attrs.set_encode_context and
relationship_attrs.set_encode_context to use the passed/cached global_attrs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 13c67170-4ec7-46c8-82ec-637ac85abc7d

📥 Commits

Reviewing files that changed from the base of the PR and between 179e0be and 6c29a53.

📒 Files selected for processing (19)
  • graph/src/graph/graph.rs
  • graph/src/planner/mod.rs
  • graph/src/planner/optimizer/utilize_node_by_id.rs
  • graph/src/runtime/functions/aggregation.rs
  • graph/src/runtime/functions/conversion.rs
  • graph/src/runtime/functions/math.rs
  • graph/src/runtime/functions/mod.rs
  • graph/src/runtime/functions/procedures.rs
  • graph/src/runtime/functions/temporal.rs
  • graph/src/runtime/ops/commit.rs
  • graph/src/runtime/ops/set.rs
  • graph/src/runtime/pending.rs
  • graph/src/runtime/runtime.rs
  • src/commands/effect.rs
  • src/commands/mod.rs
  • src/graph_core.rs
  • src/lib.rs
  • tests/flow/graph_utils.py
  • tests/flow/test_effects.py
✅ Files skipped from review due to trivial changes (2)
  • graph/src/runtime/functions/procedures.rs
  • graph/src/runtime/functions/math.rs
🚧 Files skipped from review as they are similar to previous changes (6)
  • graph/src/runtime/ops/commit.rs
  • graph/src/runtime/functions/aggregation.rs
  • tests/flow/graph_utils.py
  • graph/src/runtime/ops/set.rs
  • tests/flow/test_effects.py
  • src/graph_core.rs

Comment thread graph/src/runtime/functions/temporal.rs
Comment thread src/commands/effect.rs
Comment on lines +48 to +59
// Open existing graph or create a new one
let key = ctx.open_key_writable(&key_str);
let graph = if let Some(g) = key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
g.clone()
} else {
let g = Arc::new(RwLock::new(ThreadedGraph::new(
*CONFIGURATION_CACHE_SIZE.lock(ctx) as usize,
&key_str.to_string(),
)));
key.set_value(&GRAPH_TYPE, g.clone())?;
g
};

⚠️ Potential issue | 🟠 Major

Defer persisting a newly created graph until the effects buffer has applied successfully.

Lines 53-57 write a brand-new ThreadedGraph into Redis before apply_effects() runs. If decoding or mutation application fails later, rollback() only reverts the graph transaction; the new empty key remains stored, so the replica diverges from the primary on the error path.

Possible fix
-    let graph = if let Some(g) = key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
+    let mut created_new = false;
+    let graph = if let Some(g) = key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
         g.clone()
     } else {
+        created_new = true;
         let g = Arc::new(RwLock::new(ThreadedGraph::new(
             *CONFIGURATION_CACHE_SIZE.lock(ctx) as usize,
             &key_str.to_string(),
         )));
-        key.set_value(&GRAPH_TYPE, g.clone())?;
         g
     };
@@
         Ok(()) => {
             tg.graph.commit(g_arc);
+            if created_new {
+                key.set_value(&GRAPH_TYPE, graph.clone())?;
+            }
             let value = tg.graph.read().borrow().maybe_flush_caches();
             if let Err(e) = value {
                 eprintln!("FalkorDB: cache flush failed: {e}");

Also applies to: 76-90

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/commands/effect.rs` around lines 48 - 59, Currently you persist a newly
created ThreadedGraph into the key store immediately (via open_key_writable,
set_value with GRAPH_TYPE and ThreadedGraph::new) before calling
apply_effects(), so if apply_effects() or decoding fails the empty key remains;
change the flow to create the Arc<RwLock<ThreadedGraph>> locally and only call
key.set_value(&GRAPH_TYPE, g.clone()) after apply_effects() returns
successfully, preserving the existing behavior of rollback() on error; apply the
same change for the second occurrence around the alternate/new-graph creation
block (the code referenced in the comment for lines 76-90).


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
graph/src/runtime/pending.rs (1)

1007-1023: ⚠️ Potential issue | 🟠 Major

Add bounds checks before arithmetic and allocation to prevent overflow/DoS.

The deserialization functions trust serialized lengths without validation:

  1. Line 1014: *offset + len could overflow on 32-bit platforms when len is large
  2. Lines 1094, 1122: with_capacity(len) uses untrusted lengths, enabling memory exhaustion attacks
Suggested fix for read_string
 pub fn read_string(
     buf: &[u8],
     offset: &mut usize,
 ) -> Result<Arc<String>, String> {
     if *offset + 8 > buf.len() {
         return Err("effects buffer truncated".to_string());
     }
     let len = u64::from_le_bytes(buf[*offset..*offset + 8].try_into().unwrap()) as usize;
     *offset += 8;
-    if *offset + len > buf.len() {
+    // Bounds check: ensure offset + len doesn't overflow and fits in buffer
+    let end = offset.checked_add(len).ok_or("length overflow")?;
+    if end > buf.len() {
         return Err("effects buffer truncated".to_string());
     }

Also applies to: 1092-1098, 1120-1133
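The hardened length check can be demonstrated in isolation. This sketch is not the project's `read_string` — it returns a slice rather than an `Arc<String>` — but it shows the two checks the comment asks for: `checked_add` so `offset + len` cannot wrap on 32-bit targets, and a buffer-bound test before slicing.

```rust
// Read an 8-byte little-endian length prefix, validate it, return the payload.
fn read_len_prefixed<'a>(buf: &'a [u8], offset: &mut usize) -> Result<&'a [u8], String> {
    // Ensure the 8-byte length prefix itself fits (checked_add avoids wraparound).
    if offset.checked_add(8).map_or(true, |end| end > buf.len()) {
        return Err("effects buffer truncated".to_string());
    }
    let len = u64::from_le_bytes(buf[*offset..*offset + 8].try_into().unwrap()) as usize;
    *offset += 8;
    // Bounds check: offset + len must neither overflow nor exceed the buffer.
    let end = offset.checked_add(len).ok_or("length overflow")?;
    if end > buf.len() {
        return Err("effects buffer truncated".to_string());
    }
    let out = &buf[*offset..end];
    *offset = end;
    Ok(out)
}

fn main() {
    // Well-formed: 8-byte little-endian length followed by the payload.
    let mut buf = 3u64.to_le_bytes().to_vec();
    buf.extend_from_slice(b"abc");
    let mut off = 0;
    assert_eq!(read_len_prefixed(&buf, &mut off).unwrap(), &b"abc"[..]);

    // Hostile: declared length far exceeds the buffer; rejected, no allocation.
    let bad = u64::MAX.to_le_bytes().to_vec();
    let mut off = 0;
    assert!(read_len_prefixed(&bad, &mut off).is_err());
}
```

The same `checked_add`-then-compare guard should precede any `with_capacity(len)` call on an untrusted length.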

🧹 Nitpick comments (4)
graph/src/graph/graph.rs (1)

2132-2189: Consider handling all EncodeState variants explicitly.

The encode_payload method uses a catch-all _ => {} for unhandled EncodeState variants. While this may be intentional if some states are handled elsewhere, it silently ignores unknown states. Consider explicitly listing all variants or adding a debug assertion for unexpected states.

Suggested improvement
             EncodeState::AdjMatrix => self.adjacancy_matrix.encode(w),
             EncodeState::LblsMatrix => self.node_labels_matrix.encode(w),
-            _ => {}
+            EncodeState::Done | EncodeState::Header | EncodeState::Schema => {
+                // Handled elsewhere or no-op for this encoding path
+            }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 2132 - 2189, The encode_payload
function currently swallows unhandled EncodeState variants via a catch-all `_ =>
{}`; update encode_payload to explicitly match every EncodeState variant (or, if
some are intentionally no-ops, list them with comments) and replace the `_` arm
with either explicit handling or a debug/assertion (e.g., panic!/debug_assert!)
to surface unexpected states; refer to the encode_payload method and the
EncodeState enum to enumerate each variant (Nodes, DeletedNodes, Edges,
DeletedEdges, LabelsMatrices, RelationMatrices, AdjMatrix, LblsMatrix, etc.) and
ensure each is either handled or explicitly marked as unreachable.
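The exhaustive-match suggestion can be shown on a toy enum. The variants below are illustrative, not the real `EncodeState`; the benefit is that adding a new variant becomes a compile error at this match instead of a silently skipped state.

```rust
// Illustrative subset of an encode-state enum.
enum EncodeState {
    Nodes,
    Edges,
    AdjMatrix,
    Done,
}

fn encode_payload(state: &EncodeState, out: &mut Vec<&'static str>) {
    match state {
        EncodeState::Nodes => out.push("nodes"),
        EncodeState::Edges => out.push("edges"),
        EncodeState::AdjMatrix => out.push("adj"),
        // Explicitly a no-op for this encoding path; listing it (rather than
        // using `_`) forces a compile error here when a new variant is added.
        EncodeState::Done => {}
    }
}

fn main() {
    let mut out = Vec::new();
    encode_payload(&EncodeState::Nodes, &mut out);
    encode_payload(&EncodeState::Done, &mut out);
    assert_eq!(out, vec!["nodes"]);
}
```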
src/graph_core.rs (2)

371-375: Replication occurs after commit in MULTI/EXEC path.

In query_sync, the graph is committed (line 372) before replication (line 374). If replication fails after commit, the local state and replicas diverge. This matches the existing pattern elsewhere, but the risk should be documented.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/graph_core.rs` around lines 371 - 375, The MULTI/EXEC path in query_sync
currently calls g.graph.commit(new_graph) before replicate_effects, which can
leave local state committed if replication fails; update query_sync to perform
replication of effects_buffer (call replicate_effects(ctx, &key_name,
effects_buffer, query)) before calling g.graph.commit(new_graph), or implement a
compensating rollback mechanism if replication fails, ensuring g.graph.commit
and replicate_effects are atomic with respect to each other; adjust error
handling so replication failures prevent commit (or roll back committed state)
and surface errors appropriately.

451-479: Clarify the threshold heuristic and edge case handling.

The should_use_effects logic:

  1. Always uses effects for non-deterministic queries (correct)
  2. Returns None if threshold is non-zero and n_effects == 0 (correct)
  3. Computes avg_mod_time_us = (exec_time_ms / n_effects) * 1000.0

However, if n_effects is derived from the inaccurate effects_count() (see pending.rs comment), the threshold comparison may not behave as intended. Also, when threshold == 0, effects are always used regardless of buffer content.

Consider documenting the intended behavior of threshold == 0 (always use effects) vs threshold > 0 (use effects only when average modification time exceeds threshold).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/graph_core.rs` around lines 451 - 479, In should_use_effects, clarify and
harden the threshold heuristic: add a brief doc comment above the function
explaining that threshold == 0 means "always use effects when a non-empty
effects_buffer exists" while threshold > 0 uses the average modification time
heuristic; explicitly handle the n_effects == 0 case before doing the division
(avoid computing avg_mod_time_us when effects_count is zero), ensure the
empty-buffer check (runtime.effects_buffer.borrow_mut().take()) is respected
even when threshold == 0, and keep the non-deterministic early-true branch;
reference the function name should_use_effects and the variables threshold,
n_effects, exec_time_ms, and runtime.effects_buffer to find and update the logic
and add the explanatory comment.
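The documented heuristic can be written out as a pure decision function. The signature and names below are illustrative, not the project's exact API; the behavior encoded is the one the comment asks to document: non-deterministic queries always use effects, `threshold == 0` means always use effects when the buffer is non-empty, and `threshold > 0` compares the average per-effect modification time against the threshold, guarding the division when `n_effects == 0`.

```rust
/// Decide whether to replicate via the effects buffer (illustrative sketch).
fn should_use_effects(
    non_deterministic: bool,
    threshold_us: u64,
    n_effects: u64,
    exec_time_ms: f64,
    buffer_empty: bool,
) -> bool {
    if buffer_empty {
        return false; // nothing to replicate as effects
    }
    if non_deterministic {
        return true; // replaying the query text on replicas would diverge
    }
    if threshold_us == 0 {
        return true; // threshold 0: always ship effects
    }
    if n_effects == 0 {
        return false; // guard the division below
    }
    // Average time spent per modification, in microseconds.
    let avg_mod_time_us = exec_time_ms / n_effects as f64 * 1000.0;
    avg_mod_time_us > threshold_us as f64
}

fn main() {
    assert!(should_use_effects(true, 500, 0, 0.0, false)); // non-deterministic
    assert!(should_use_effects(false, 0, 0, 0.0, false)); // threshold 0 => always
    assert!(!should_use_effects(false, 500, 10, 1.0, false)); // 100us avg < 500us
    assert!(should_use_effects(false, 500, 1, 1.0, false)); // 1000us avg > 500us
    assert!(!should_use_effects(false, 0, 5, 1.0, true)); // empty buffer wins
}
```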
src/commands/effect.rs (1)

270-282: Index creation uses sync variant but population may block the command.

create_index_sync is called for replicated index creation (line 280), and then populate_indexes_sync is called at line 304 if any index operations occurred. This blocks the GRAPH.EFFECT command until all indexes are fully populated, which could cause replica lag for large datasets.

Consider whether async population is acceptable for replicas, or document that index replication may cause temporary lag.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/commands/effect.rs` around lines 270 - 282, The code currently calls
g.create_index_sync(...) for replicated index creation and then relies on
populate_indexes_sync(...) when has_index_ops is true, which blocks the
GRAPH.EFFECT command until full population; change this to avoid blocking
replicas by either (A) using the async/index-creation variant (e.g.,
g.create_index or a create_index_async helper) and scheduling background
population instead of create_index_sync, or (B) keep create_index_sync but
remove/populate_indexes_sync call and instead kick off an asynchronous/populate
task (spawn a background job) that sets index state when complete; locate the
calls to create_index_sync, populate_indexes_sync, the has_index_ops flag, and
the GRAPH.EFFECT handling in effect.rs to implement the non-blocking approach
you choose and ensure any error handling/logging reflects asynchronous start
failures.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 919c0b20-e931-45bf-b505-08296942b5bf

📥 Commits

Reviewing files that changed from the base of the PR and between 6fb14c4 and ec86753.

📒 Files selected for processing (6)
  • graph/src/graph/graph.rs
  • graph/src/index/indexer.rs
  • graph/src/runtime/functions/conversion.rs
  • graph/src/runtime/pending.rs
  • src/commands/effect.rs
  • src/graph_core.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • graph/src/index/indexer.rs
  • graph/src/runtime/functions/conversion.rs

Comment thread graph/src/graph/graph.rs
Comment on lines +917 to +921
/// Increment the reserved node counter without allocating a specific ID.
/// Used by effect replay where the actual ID comes from the primary.
pub const fn inc_reserved_node_count(&mut self) {
    self.reserved_node_count += 1;
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify const fn with &mut self compiles correctly in this Rust version
# and check all call sites of these methods
rg -n "inc_reserved_node_count|inc_reserved_relationship_count" --type rust -C2

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 1496


🏁 Script executed:

#!/bin/bash
# Check Rust version and MSRV
if [ -f "Cargo.toml" ]; then
  echo "=== Cargo.toml (first 50 lines) ==="
  head -50 Cargo.toml
fi

if [ -f "rust-toolchain" ] || [ -f "rust-toolchain.toml" ]; then
  echo -e "\n=== Rust toolchain ==="
  cat rust-toolchain* 2>/dev/null
fi

# Check if cargo check passes
echo -e "\n=== Running cargo check ==="
cd graph
cargo check --all-features 2>&1 | head -100

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 1436


🏁 Script executed:

#!/bin/bash
# Verify the exact method signatures and their contexts
echo "=== Checking method signatures ==="
sed -n '917,922p' graph/src/graph/graph.rs
echo ""
sed -n '1266,1271p' graph/src/graph/graph.rs

# Check if these methods are actually used in const contexts or only at runtime
echo -e "\n=== Checking calling context in effect.rs ==="
sed -n '120,130p' src/commands/effect.rs
echo ""
sed -n '155,165p' src/commands/effect.rs

Repository: FalkorDB/falkordb-rs-next-gen

Length of output: 1760


🌐 Web query:

Rust const fn with mutable reference &mut self

💡 Result:

No, Rust const fn methods cannot take &mut self. Mutable references (&mut T) are allowed as parameters in const fn (stabilized via const_mut_refs in Rust 1.83), but &mut self specifically requires a mutable borrow of self, which conflicts with the const evaluation rules that treat the function body as operating in a const context where mutable borrows have strict limitations.

Key evidence from official sources:

  • The Rust Reference on constant evaluation explicitly forbids "mutable borrows" in general const expressions, with limited exceptions for transient uses inside const fn [15].
  • Error E0764 states mutable references can be used in const fn but not leaked to the final constant value; however, &mut self as the receiver implies mutability of the borrowed data during evaluation [16].
  • Tracking issue #57349 confirms &mut arguments are allowed in const fn, but practical usage shows &mut self fails compilation due to borrow checker restrictions in const contexts [20].
  • RFC 911 (original const fn) limited mutable references to zero-sized types only, and while expanded, self mutability remains restricted [2].

Workarounds:

  • Use &self with interior mutability (e.g., Cell), now supported via const_refs_to_cell [27].
  • Take self by value for consuming methods.
  • Split logic: compute values with &self in const fn, mutate separately at runtime.

This holds as of Rust 1.83+ (2024), confirmed by recent stabilization PRs and issues closed in 2024 [20,27].


Remove const annotation from inc_reserved_node_count and inc_reserved_relationship_count.

The const fn annotation with &mut self is invalid in Rust. Const functions cannot take &mut self as a receiver due to borrow checker restrictions in const contexts. Since these methods perform mutations and are only called at runtime (effect.rs lines 123, 160), remove the const keyword:

pub fn inc_reserved_node_count(&mut self) {
    self.reserved_node_count += 1;
}

Same fix applies to inc_reserved_relationship_count at line 1268.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/graph.rs` around lines 917 - 921, The methods declared as pub
const fn inc_reserved_node_count(&mut self) and pub const fn
inc_reserved_relationship_count(&mut self) are invalid because const fn cannot
take &mut self; remove the const keyword from both declarations so they become
regular mutating methods (pub fn inc_reserved_node_count(&mut self) and pub fn
inc_reserved_relationship_count(&mut self)), keeping their bodies unchanged;
update both function signatures accordingly where they are defined so calls in
effect.rs continue to work.

Comment on lines +1000 to +1005
        _ => {
            debug_assert!(false, "Unsupported value type in effects buffer: {value:?}");
            buf.push(VALUE_NULL); // Fallback for unsupported types
        }
    }
}

⚠️ Potential issue | 🟡 Minor

Fallback to VALUE_NULL for unsupported types may cause silent data loss.

When write_value encounters an unsupported Value variant, it logs a debug assertion but writes VALUE_NULL, which could silently corrupt data on replicas. Consider returning an error or ensuring all property-valid types are handled.

Suggested fix
         _ => {
-            debug_assert!(false, "Unsupported value type in effects buffer: {value:?}");
-            buf.push(VALUE_NULL); // Fallback for unsupported types
+            // This should never happen for valid property types
+            panic!("Unsupported value type in effects buffer: {value:?}");
         }

Alternatively, if you want graceful handling, change write_value to return Result<(), String> and propagate errors.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        _ => {
            debug_assert!(false, "Unsupported value type in effects buffer: {value:?}");
            buf.push(VALUE_NULL); // Fallback for unsupported types
        }
    }
}

becomes:

        _ => {
            // This should never happen for valid property types
            panic!("Unsupported value type in effects buffer: {value:?}");
        }
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/runtime/pending.rs` around lines 1000 - 1005, The current
write_value function silently falls back to pushing VALUE_NULL for unsupported
Value variants which can corrupt replicas; change write_value to return a
Result<(), String> (or a domain error type), remove the unconditional
debug_assert! fallback branch, and instead return an Err describing the
unsupported variant (include the variant debug string); update all callers of
write_value (e.g., where effects buffer is constructed) to propagate or handle
the error so unsupported types are rejected rather than written as VALUE_NULL.
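The Result-returning variant suggested above can be sketched in isolation. The `Value` enum and tag constants here are illustrative stand-ins, not the module's real ones; the point is that unsupported variants are surfaced to the caller instead of being silently encoded as NULL.

```rust
// Illustrative tag bytes and value enum (not the module's real definitions).
const VALUE_NULL: u8 = 0;
const VALUE_INT: u8 = 1;
const VALUE_BOOL: u8 = 2;

#[derive(Debug)]
enum Value {
    Null,
    Int(i64),
    Bool(bool),
    Unsupported,
}

fn write_value(buf: &mut Vec<u8>, value: &Value) -> Result<(), String> {
    match value {
        Value::Null => buf.push(VALUE_NULL),
        Value::Int(i) => {
            buf.push(VALUE_INT);
            buf.extend_from_slice(&i.to_le_bytes());
        }
        Value::Bool(b) => {
            buf.push(VALUE_BOOL);
            buf.push(*b as u8);
        }
        other => {
            // Reject instead of corrupting replicas with a silent NULL.
            return Err(format!("unsupported value type in effects buffer: {other:?}"));
        }
    }
    Ok(())
}

fn main() {
    let mut buf = Vec::new();
    assert!(write_value(&mut buf, &Value::Int(7)).is_ok());
    assert_eq!(buf[0], VALUE_INT);
    assert!(write_value(&mut buf, &Value::Unsupported).is_err());
}
```

Callers building the effects buffer then propagate the error with `?`, so a query touching an unsupported type fails loudly rather than replicating a corrupted buffer.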

AviAvni added a commit that referenced this pull request Apr 14, 2026
…atrix/vector/tensor serialization)

Introduce core serialization infrastructure for RDB persistence:
- Writer/Reader traits abstracting over serialization sinks/sources
- Encode<V>/Decode<V> traits for versioned serialization
- si_type module with type tag constants matching C FalkorDB format
- EncodeState/PayloadEntry for chunked virtual-key encoding
- Matrix, Vector, Tensor, VersionedMatrix Encode<19>/Decode<19> impls
  using GxB_Container export/import
- RoaringTreemap Encode<19>/Decode<19> for deleted entity bitmaps

Part of #359 (RDB persistence split).
AviAvni added a commit that referenced this pull request Apr 14, 2026
- Wrap AttributeCache entries in Arc<Vec> for O(1) clone on cache get
- Update AttributeStore and callers for Arc<Vec> return type
- Use std::mem::size_of_val in rs_array_new for correctness
- Flatten nested if-let into let-chains in utilize_index and index/mod
- Mark int_loses_f64_precision and is_indexable as const fn
- Use is_none_or instead of map_or(true, ...) in node_by_index_scan

Split from #359.
AviAvni added a commit that referenced this pull request Apr 14, 2026
Move EFFECTS_THRESHOLD from read-only to runtime-settable config,
allowing tuning without restart.

Split from #359.
AviAvni added a commit that referenced this pull request Apr 14, 2026
Implement RDB v19 serialization for all Value variants including
Bool, Int, Float, String, List, Point, VecF32, and temporal types.

Split from #359.
AviAvni added a commit that referenced this pull request Apr 14, 2026
- Add `non_deterministic` field to GraphFn for tracking functions with
  non-deterministic output (rand, randomUUID, timestamp, etc.)
- Extend cypher_fn! macro with non-deterministic function variants
- Add plan_is_non_deterministic() to walk execution plans and detect
  non-deterministic function usage (used for effects replication decisions)
- Add zero-arg temporal overloads: date(), localtime(), localdatetime()
- Add .transaction() temporal variants using Runtime::transaction_timestamp
- Add transaction_timestamp field to Runtime struct

Split from #359.
AviAvni added a commit that referenced this pull request Apr 14, 2026
…d to Graph

- Move fjall DATABASE static from Graph to AttributeStore as get_database()
- Remove database parameter from AttributeStore::new()
- Add name: String field to Graph struct for persistence/identification
- Add Graph::name() accessor

Split from #359.
AviAvni added a commit that referenced this pull request Apr 14, 2026
- Add schema_version field to Graph (incremented on schema changes)
- Add MvccGraph::from_graph() constructor and schema version tracking
- Add get_type_id_mut() for get-or-create relationship types
- Add inc_reserved_node_count/inc_reserved_relationship_count
- Add max_relationship_id, deleted_*_count, deleted_relationships accessors
- Add label_matrices(), relationship_tensors() accessors
- Add create_index_sync/populate_indexes_sync for synchronous index rebuild
- Add get_all_pending_fields() to Indexer
- Add attribute name accessors and build_global_attrs
- Extract resize_node_matrices/resize_relationship_matrices from resize()

Split from #359.
- Keep rdb's restore(), rebuild_derived_matrices(), encode_payload()
- Keep rdb's effects_buffer/effects_count in Runtime
- Remove duplicate import_attrs, inc_reserved_relationship_count
- Fix reply.rs to work with impl Iterator return types
