diff --git a/graph/src/planner/mod.rs b/graph/src/planner/mod.rs index 094f3c80..f52f75f1 100644 --- a/graph/src/planner/mod.rs +++ b/graph/src/planner/mod.rs @@ -244,6 +244,113 @@ pub fn subtree_contains( .any(|n| predicate(n.data())) } +/// Returns true if a QueryExpr tree contains any non-deterministic function call. +fn expr_has_non_deterministic(expr: &DynTree>) -> bool { + expr.root() + .walk_with(&mut Traversal.bfs().over_nodes()) + .any(|n| matches!(n.data(), ExprIR::FuncInvocation(func) if func.non_deterministic)) +} + +/// Returns true if a SetItem references any non-deterministic expression. +fn set_item_has_non_deterministic(item: &SetItem, Variable>) -> bool { + match item { + SetItem::Attribute { target, value, .. } => { + expr_has_non_deterministic(target) || expr_has_non_deterministic(value) + } + SetItem::Label { .. } => false, + } +} + +/// Returns true if a QueryGraph (CREATE/MERGE pattern) contains non-deterministic expressions. +fn query_graph_has_non_deterministic(qg: &QueryGraph, Arc, Variable>) -> bool { + for node in qg.nodes() { + if expr_has_non_deterministic(&node.attrs) { + return true; + } + } + for rel in qg.relationships() { + if expr_has_non_deterministic(&rel.attrs) { + return true; + } + } + false +} + +/// Returns true if an IndexQuery tree contains any non-deterministic function call. +fn index_query_has_non_deterministic(query: &IndexQuery>) -> bool { + match query { + IndexQuery::Range { min, max, .. } => { + min.as_ref().is_some_and(|e| expr_has_non_deterministic(e)) + || max.as_ref().is_some_and(|e| expr_has_non_deterministic(e)) + } + IndexQuery::And(queries) | IndexQuery::Or(queries) => { + queries.iter().any(index_query_has_non_deterministic) + } + IndexQuery::Point { point, radius, .. } => { + expr_has_non_deterministic(point) || expr_has_non_deterministic(radius) + } + IndexQuery::InList { list, .. } => expr_has_non_deterministic(list), + IndexQuery::Equal { value, .. } | IndexQuery::ArrayContains { value, .. } => { + expr_has_non_deterministic(value) + } + } +} + +/// Returns true if the execution plan contains any non-deterministic function call. +#[must_use] +pub fn plan_is_non_deterministic(plan: &DynTree) -> bool { + plan.root() + .walk_with(&mut Traversal.bfs().over_nodes()) + .any(|node| match node.data() { + IR::Create(qg) => query_graph_has_non_deterministic(qg), + IR::Merge { + pattern, + on_create, + on_match, + } => { + query_graph_has_non_deterministic(pattern) + || on_create.iter().any(set_item_has_non_deterministic) + || on_match.iter().any(set_item_has_non_deterministic) + } + IR::Set(items) => items.iter().any(set_item_has_non_deterministic), + IR::Remove(exprs) | IR::Delete { exprs, .. } => { + exprs.iter().any(|e| expr_has_non_deterministic(e)) + } + IR::Unwind { expr, .. } + | IR::Filter(expr) + | IR::Skip(expr) + | IR::Limit(expr) + | IR::ForEach { list: expr, .. } => expr_has_non_deterministic(expr), + IR::Sort(exprs) => exprs.iter().any(|(e, _)| expr_has_non_deterministic(e)), + IR::Project { exprs, .. } => exprs.iter().any(|(_, e)| expr_has_non_deterministic(e)), + IR::Aggregate { + keys, aggregations, .. + } => { + keys.iter().any(|(_, e)| expr_has_non_deterministic(e)) + || aggregations + .iter() + .any(|(_, e)| expr_has_non_deterministic(e)) + } + IR::ProcedureCall { args, .. } => args.iter().any(|e| expr_has_non_deterministic(e)), + IR::LoadCsv { + file_path, + delimiter, + .. + } => expr_has_non_deterministic(file_path) || expr_has_non_deterministic(delimiter), + IR::ValueHashJoin { lhs_exp, rhs_exp } => { + expr_has_non_deterministic(lhs_exp) || expr_has_non_deterministic(rhs_exp) + } + IR::NodeByIndexScan { query, .. } => index_query_has_non_deterministic(query), + IR::NodeByFulltextScan { label, query, .. } => { + expr_has_non_deterministic(label) || expr_has_non_deterministic(query) + } + IR::NodeByLabelAndIdScan { filter, .. } | IR::NodeByIdSeek { filter, .. } => { + filter.iter().any(|(e, _)| expr_has_non_deterministic(e)) + } + _ => false, + }) +} + /// Formats a relationship for CondTraverse/ExpandInto display. /// Shows node labels and hides anonymous edge aliases. fn fmt_rel_with_labels(rel: &QueryRelationship, Arc, Variable>) -> String { diff --git a/graph/src/runtime/functions/aggregation.rs b/graph/src/runtime/functions/aggregation.rs index 9fcc674f..e445adb1 100644 --- a/graph/src/runtime/functions/aggregation.rs +++ b/graph/src/runtime/functions/aggregation.rs @@ -257,6 +257,7 @@ pub fn register(funcs: &mut Functions) { "percentileCont", percentile, false, + false, vec![ Type::Union(vec![Type::Int, Type::Float, Type::Null]), Type::Union(vec![Type::Int, Type::Float]), @@ -307,6 +308,7 @@ pub fn register(funcs: &mut Functions) { "stDevP", stdev, false, + false, vec![Type::Union(vec![Type::Int, Type::Float, Type::Null])], FnType::Aggregation { initial: Value::List(Arc::new(thin_vec![ diff --git a/graph/src/runtime/functions/conversion.rs b/graph/src/runtime/functions/conversion.rs index 59dfb67e..1b0c4dd4 100644 --- a/graph/src/runtime/functions/conversion.rs +++ b/graph/src/runtime/functions/conversion.rs @@ -35,7 +35,7 @@ use super::{FnType, Functions, Type}; use crate::runtime::{runtime::Runtime, value::Value}; use std::sync::Arc; -use thin_vec::{ThinVec, thin_vec}; +use thin_vec::ThinVec; pub fn register(funcs: &mut Functions) { cypher_fn!(funcs, "tointeger", @@ -84,6 +84,7 @@ pub fn register(funcs: &mut Functions) { "toIntegerOrNull", value_to_integer, false, + false, vec![Type::Any], FnType::Function, Type::Union(vec![Type::Int, Type::Null]), @@ -105,6 +106,7 @@ pub fn register(funcs: &mut Functions) { "toFloatOrNull", value_to_float, false, + false, vec![Type::Any], FnType::Function, Type::Union(vec![Type::Float, Type::Null]), @@ -140,6 +142,7 @@ pub fn register(funcs: &mut Functions) { "tostringornull", value_to_string, false, + false, vec![Type::Any], FnType::Function, Type::Union(vec![Type::String, Type::Null]), @@ -197,6 +200,7 @@ pub fn register(funcs: &mut Functions) { "toBooleanOrNull", to_boolean, false, + false, vec![Type::Any], FnType::Function, Type::Union(vec![Type::Bool, Type::Null]), diff --git a/graph/src/runtime/functions/math.rs b/graph/src/runtime/functions/math.rs index 5c72f8e1..4d7838a0 100644 --- a/graph/src/runtime/functions/math.rs +++ b/graph/src/runtime/functions/math.rs @@ -139,6 +139,7 @@ pub fn register(funcs: &mut Functions) { cypher_fn!(funcs, "randomUUID", args: [], ret: Type::String, + non_deterministic, fn random_uuid(_, _args) { // Generate 16 random bytes (128 bits) let mut rng = rand::rng(); @@ -184,6 +185,7 @@ pub fn register(funcs: &mut Functions) { cypher_fn!(funcs, "rand", args: [], ret: Type::Float, + non_deterministic, #[allow(clippy::needless_pass_by_value)] fn rand(_, args) { debug_assert!(args.is_empty()); diff --git a/graph/src/runtime/functions/mod.rs b/graph/src/runtime/functions/mod.rs index df6c5027..ede7c272 100644 --- a/graph/src/runtime/functions/mod.rs +++ b/graph/src/runtime/functions/mod.rs @@ -72,6 +72,58 @@ /// | `procedure:` | read-only procedure | `Procedure(yields)` | /// | `write procedure:` | write procedure | `Procedure(yields)` | macro_rules! cypher_fn { + // ── Non-deterministic scalar function (fixed args) ── + ($funcs:ident, $name:expr, + args: [$($arg:expr),* $(,)?], + ret: $ret:expr, + non_deterministic, + $(#[$attr:meta])* + fn $fn_name:ident($rt:pat, $args:pat) $body:block + ) => { + $(#[$attr])* + fn $fn_name( + $rt: &Runtime, + $args: ThinVec, + ) -> Result + $body + + $funcs.add( + $name, + $fn_name, + false, + true, + vec![$($arg),*], + FnType::Function, + $ret, + ); + }; + + // ── Non-deterministic variable-length argument function ── + ($funcs:ident, $name:expr, + var_arg: $arg_type:expr, + ret: $ret:expr, + non_deterministic, + $(#[$attr:meta])* + fn $fn_name:ident($rt:pat, $args:pat) $body:block + ) => { + $(#[$attr])* + fn $fn_name( + $rt: &Runtime, + $args: ThinVec, + ) -> Result + $body + + $funcs.add_var_len( + $name, + $fn_name, + false, + true, + $arg_type, + FnType::Function, + $ret, + ); + }; + // ── Scalar function (FnType::Function, write=false, fixed args) ── ($funcs:ident, $name:expr, args: [$($arg:expr),* $(,)?], @@ -90,6 +142,7 @@ macro_rules! cypher_fn { $name, $fn_name, false, + false, vec![$($arg),*], FnType::Function, $ret, @@ -114,6 +167,7 @@ macro_rules! cypher_fn { $name, $fn_name, false, + false, $arg_type, FnType::Function, $ret, @@ -139,6 +193,7 @@ macro_rules! cypher_fn { $name, $fn_name, false, + false, vec![$($arg),*], FnType::Aggregation { initial: $init, finalizer: None }, $ret, @@ -165,6 +220,7 @@ macro_rules! cypher_fn { $name, $fn_name, false, + false, vec![$($arg),*], FnType::Aggregation { initial: $init, finalizer: Some(Box::new($finalizer)) }, $ret, @@ -190,6 +246,7 @@ macro_rules! cypher_fn { $name, $fn_name, false, + false, vec![$($arg),*], FnType::Internal, $ret, @@ -215,6 +272,7 @@ macro_rules! cypher_fn { $name, $fn_name, false, + false, vec![$($arg),*], FnType::Procedure(vec![$(String::from($yield_col)),*]), $ret, @@ -240,6 +298,7 @@ macro_rules! cypher_fn { $name, $fn_name, true, + false, vec![$($arg),*], FnType::Procedure(vec![$(String::from($yield_col)),*]), $ret, @@ -457,6 +516,7 @@ pub struct GraphFn { pub name: String, pub func: RuntimeFn, pub write: bool, + pub non_deterministic: bool, pub args_type: FnArguments, pub fn_type: FnType, pub ret_type: Type, @@ -470,6 +530,7 @@ impl Debug for GraphFn { f.debug_struct("GraphFn") .field("name", &self.name) .field("write", &self.write) + .field("non_deterministic", &self.non_deterministic) .field("args_type", &self.args_type) .field("fn_type", &self.fn_type) .field("ret_type", &self.ret_type) @@ -483,6 +544,7 @@ impl GraphFn { name: &str, func: fn(&Runtime, ThinVec) -> Result, write: bool, + non_deterministic: bool, args_type: FnArguments, fn_type: FnType, ret_type: Type, @@ -491,6 +553,7 @@ impl GraphFn { name: String::from(name), func: Arc::new(func), write, + non_deterministic, args_type, fn_type, ret_type, @@ -506,6 +569,7 @@ impl GraphFn { crate::udf::js_context::call_udf_bridge(&udf_name, rt, &args) }), write: false, + non_deterministic: false, args_type: FnArguments::VarLength(Type::Any), fn_type: FnType::Udf, ret_type: Type::Any, @@ -622,11 +686,13 @@ impl Functions { Self::default() } + #[allow(clippy::too_many_arguments)] pub fn add( &mut self, name: &str, func: fn(&Runtime, ThinVec) -> Result, write: bool, + non_deterministic: bool, args_type: Vec, fn_type: FnType, ret_type: Type, @@ -640,6 +706,7 @@ impl Functions { name, func, write, + non_deterministic, FnArguments::Fixed(args_type), fn_type, ret_type, @@ -647,11 +714,13 @@ impl Functions { self.functions.insert(lower_name, graph_fn); } + #[allow(clippy::too_many_arguments)] pub fn add_var_len( &mut self, name: &str, func: fn(&Runtime, ThinVec) -> Result, write: bool, + non_deterministic: bool, arg_type: Type, fn_type: FnType, ret_type: Type, @@ -665,6 +734,7 @@ impl Functions { &name, func, write, + non_deterministic, FnArguments::VarLength(arg_type), fn_type, ret_type, diff --git a/graph/src/runtime/functions/temporal.rs b/graph/src/runtime/functions/temporal.rs index 668bc789..f0cb9f87 100644 --- a/graph/src/runtime/functions/temporal.rs +++ b/graph/src/runtime/functions/temporal.rs @@ -419,6 +419,7 @@ pub fn register(funcs: &mut Functions) { cypher_fn!(funcs, "timestamp", args: [], ret: Type::Int, + non_deterministic, fn timestamp_fn(_, args) { debug_assert!(args.is_empty()); let now = Utc::now(); @@ -428,8 +429,9 @@ pub fn register(funcs: &mut Functions) { // ── date() ── cypher_fn!(funcs, "date", - args: [Type::Union(vec![Type::Map, Type::String, Type::Null])], + var_arg: Type::Union(vec![Type::Map, Type::String, Type::Null]), ret: Type::Union(vec![Type::Date, Type::Null]), + non_deterministic, fn date_fn(_, args) { let mut iter = args.into_iter(); match iter.next() { @@ -444,6 +446,12 @@ pub fn register(funcs: &mut Functions) { Ok(Value::Date(ts)) } Some(Value::Null) => Ok(Value::Null), + None => { + // Zero args: return current date + let now = Utc::now().date_naive(); + let ts = now.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(); + Ok(Value::Date(ts)) + } _ => unreachable!(), } } @@ -451,8 +459,9 @@ pub fn register(funcs: &mut Functions) { // ── localtime() ── cypher_fn!(funcs, "localtime", - args: [Type::Union(vec![Type::Map, Type::String, Type::Null])], + var_arg: Type::Union(vec![Type::Map, Type::String, Type::Null]), ret: Type::Union(vec![Type::Time, Type::Null]), + non_deterministic, fn localtime_fn(_, args) { let mut iter = args.into_iter(); match iter.next() { @@ -472,6 +481,14 @@ pub fn register(funcs: &mut Functions) { Ok(Value::Time(ts)) } Some(Value::Null) => Ok(Value::Null), + None => { + // Zero args: return current local time + let now = Utc::now().time(); + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let dt = NaiveDateTime::new(epoch, now); + let ts = dt.and_utc().timestamp(); + Ok(Value::Time(ts)) + } _ => unreachable!(), } } @@ -479,8 +496,9 @@ pub fn register(funcs: &mut Functions) { // ── localdatetime() ── cypher_fn!(funcs, "localdatetime", - args: [Type::Union(vec![Type::Map, Type::String, Type::Null])], + var_arg: Type::Union(vec![Type::Map, Type::String, Type::Null]), ret: Type::Union(vec![Type::Datetime, Type::Null]), + non_deterministic, fn localdatetime_fn(_, args) { let mut iter = args.into_iter(); match iter.next() { @@ -497,6 +515,12 @@ pub fn register(funcs: &mut Functions) { Ok(Value::Datetime(ts)) } Some(Value::Null) => Ok(Value::Null), + None => { + // Zero args: return current local datetime + let now = Utc::now().naive_utc(); + let ts = now.and_utc().timestamp(); + Ok(Value::Datetime(ts)) + } _ => unreachable!(), } } @@ -530,4 +554,42 @@ pub fn register(funcs: &mut Functions) { } } ); + + // ── date.transaction() ── + cypher_fn!(funcs, "date.transaction", + args: [], + ret: Type::Date, + non_deterministic, + fn date_transaction_fn(rt, _args) { + let now = rt.transaction_timestamp.date_naive(); + let ts = now.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(); + Ok(Value::Date(ts)) + } + ); + + // ── localtime.transaction() ── + cypher_fn!(funcs, "localtime.transaction", + args: [], + ret: Type::Time, + non_deterministic, + fn localtime_transaction_fn(rt, _args) { + let now = rt.transaction_timestamp.time(); + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let dt = NaiveDateTime::new(epoch, now); + let ts = dt.and_utc().timestamp(); + Ok(Value::Time(ts)) + } + ); + + // ── localdatetime.transaction() ── + cypher_fn!(funcs, "localdatetime.transaction", + args: [], + ret: Type::Datetime, + non_deterministic, + fn localdatetime_transaction_fn(rt, _args) { + let now = rt.transaction_timestamp.naive_utc(); + let ts = now.and_utc().timestamp(); + Ok(Value::Datetime(ts)) + } + ); } diff --git a/graph/src/runtime/runtime.rs b/graph/src/runtime/runtime.rs index bd30d212..4b86eec9 100644 --- a/graph/src/runtime/runtime.rs +++ b/graph/src/runtime/runtime.rs @@ -63,6 +63,7 @@ use crate::{ }, }; use atomic_refcell::AtomicRefCell; +use chrono::{DateTime, Utc}; use once_cell::unsync::Lazy; use orx_tree::{Bfs, Dyn, DynNode, DynTree, MemoryPolicy, NodeIdx, NodeRef}; use roaring::RoaringTreemap; @@ -144,6 +145,10 @@ pub struct Runtime<'a> { pub env_pool: &'a Pool, /// Maximum number of result rows to return. Negative means unlimited. pub result_set_size: i64, + /// Timestamp captured at the start of the transaction/query. + /// Used by `date.transaction()`, `localtime.transaction()`, and `localdatetime.transaction()` + /// so every call in the same transaction returns the same value. + pub transaction_timestamp: DateTime, } pub trait GetVariables { @@ -343,6 +348,7 @@ impl<'a> Runtime<'a> { merge_pattern_cache: RefCell::new(HashMap::new()), env_pool, result_set_size, + transaction_timestamp: Utc::now(), } }