From 7dab1f2bc25fc79c15920924d793ae683a250778 Mon Sep 17 00:00:00 2001 From: gtarpenning Date: Thu, 11 Jun 2026 11:23:22 -0700 Subject: [PATCH 1/3] feat(weave): validate monitor query fields on save Validate Monitor.query field references against ALLOWED_CALL_FIELDS at obj_create time so an unknown field (e.g. operation_name) is rejected with a 4xx listing the allowed fields, instead of being stored and failing silently on every scoring-worker cycle. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/trace/test_base_object_classes.py | 104 +++++++++++++++++- .../calls_query_builder.py | 66 +++++++++++ .../clickhouse_trace_server_batched.py | 6 + weave/trace_server/sqlite_trace_server.py | 8 ++ 4 files changed, 183 insertions(+), 1 deletion(-) diff --git a/tests/trace/test_base_object_classes.py b/tests/trace/test_base_object_classes.py index c3b770426274..04aeaf98ffff 100644 --- a/tests/trace/test_base_object_classes.py +++ b/tests/trace/test_base_object_classes.py @@ -22,7 +22,7 @@ from weave.trace.refs import ObjectRef from weave.trace.weave_client import WeaveClient from weave.trace_server import trace_server_interface as tsi -from weave.trace_server.errors import ObjectNameTypeCollision +from weave.trace_server.errors import InvalidFieldError, ObjectNameTypeCollision from weave.trace_server.interface.builtin_object_classes.test_only_example import ( TestOnlyNestedBaseModel, ) @@ -1153,3 +1153,105 @@ def test_obj_create_rejects_name_type_collision(client: WeaveClient): } ) ) + + +def _monitor_val(query: dict | None) -> dict: + """A serialized Monitor object val carrying a query and weave class hierarchy.""" + return { + "name": "test-monitor", + "description": None, + "sampling_rate": 1.0, + "scorers": [], + "op_names": [], + "query": query, + "is_traced": True, + "active": False, + "scorer_debounce_config": None, + "_type": "Monitor", + "_class_name": "Monitor", + "_bases": ["Object", "BaseModel"], + } + + +def _create_monitor(client: WeaveClient, object_id: str, query: dict | None): + return client.server.obj_create( + tsi.ObjCreateReq.model_validate( + { + "obj": { + "project_id": client.project_id, + "object_id": object_id, + "val": _monitor_val(query), + } + } + ) + ) + + +def test_monitor_create_rejects_unknown_query_field(client: WeaveClient): + """A Monitor query on an unknown field is rejected at create with the allowed list.""" + bad_query = { + "$expr": {"$eq": [{"$getField": "operation_name"}, {"$literal": "predict"}]} + } + with pytest.raises(InvalidFieldError) as exc_info: + _create_monitor(client, "bad-monitor", bad_query) + message = str(exc_info.value) + assert "Field operation_name is not allowed" in message + assert "op_name" in message + assert "summary.weave.*" in message + assert "inputs.*" in message + + # Nothing was stored for the rejected monitor. + objs_res = client.server.objs_query( + tsi.ObjQueryReq.model_validate( + {"project_id": client.project_id, "filter": {"object_ids": ["bad-monitor"]}} + ) + ) + assert objs_res.objs == [] + + +def test_monitor_create_accepts_valid_query_fields(client: WeaveClient): + """Static, dynamic, and absent monitor queries all create successfully.""" + valid_query = { + "$expr": { + "$and": [ + {"$eq": [{"$getField": "parent_id"}, {"$literal": None}]}, + { + "$eq": [ + {"$getField": "summary.weave.status"}, + {"$literal": "success"}, + ] + }, + ] + } + } + dynamic_query = { + "$expr": {"$eq": [{"$getField": "inputs.foo"}, {"$literal": "bar"}]} + } + + valid_res = _create_monitor(client, "valid-monitor", valid_query) + dynamic_res = _create_monitor(client, "dynamic-monitor", dynamic_query) + no_query_res = _create_monitor(client, "no-query-monitor", None) + + assert valid_res.object_id == "valid-monitor" + assert dynamic_res.object_id == "dynamic-monitor" + assert no_query_res.object_id == "no-query-monitor" + + objs_res = client.server.objs_query( + tsi.ObjQueryReq.model_validate( + { + "project_id": client.project_id, + "filter": { + "object_ids": [ + "valid-monitor", + "dynamic-monitor", + "no-query-monitor", + ] + }, + } + ) + ) + assert {obj.object_id for obj in objs_res.objs} == { + "valid-monitor", + "dynamic-monitor", + "no-query-monitor", + } diff --git a/weave/trace_server/calls_query_builder/calls_query_builder.py b/weave/trace_server/calls_query_builder/calls_query_builder.py index b489436f831f..397c9097af3c 100644 --- a/weave/trace_server/calls_query_builder/calls_query_builder.py +++ b/weave/trace_server/calls_query_builder/calls_query_builder.py @@ -1943,6 +1943,72 @@ def get_field_by_name(name: str) -> CallsMergedField: return ALLOWED_CALL_FIELDS[name] +# Dotted prefixes get_field_by_name accepts beyond ALLOWED_CALL_FIELDS. +ALLOWED_DYNAMIC_FIELD_PREFIXES = ( + "feedback.*", + "annotation_queue_items.queue_id", + "summary.weave.*", + "inputs.*", + "output.*", + "attributes.*", + "summary.*", +) + + +MONITOR_OBJECT_CLASSES = frozenset({"Monitor", "ClassifierMonitor"}) + + +def validate_monitor_query_fields( + base_object_class: str | None, + leaf_object_class: str | None, + val: object, +) -> None: + """Reject a Monitor whose `query` references a field outside the allowed set.""" + if ( + base_object_class not in MONITOR_OBJECT_CLASSES + and leaf_object_class not in MONITOR_OBJECT_CLASSES + ): + return + if not isinstance(val, dict): + return + raw_query = val.get("query") + if raw_query is None: + return + cleaned = _strip_weave_object_keys(raw_query) + validate_query_field_names(tsi_query.Query.model_validate(cleaned)) + + +def validate_query_field_names(query: tsi_query.Query) -> None: + """Validate every field reference in `query` against the allowed call fields.""" + try: + process_query_to_conditions(query, ParamBuilder(), "calls_merged") + except InvalidFieldError as e: + raise InvalidFieldError(_invalid_field_message(str(e))) from e + + +def _strip_weave_object_keys(value: object) -> object: + """Drop weave bookkeeping keys (`_type`, `_class_name`, `_bases`) from a serialized query.""" + if isinstance(value, dict): + return { + k: _strip_weave_object_keys(v) + for k, v in value.items() + if not k.startswith("_") + } + if isinstance(value, list): + return [_strip_weave_object_keys(v) for v in value] + return value + + +def _invalid_field_message(reason: str) -> str: + """Append the allowed field list and dynamic prefixes to a field rejection.""" + allowed = ", ".join(sorted(ALLOWED_CALL_FIELDS)) + prefixes = ", ".join(ALLOWED_DYNAMIC_FIELD_PREFIXES) + return ( + f"{reason}. Allowed fields: {allowed}. " + f"Allowed dynamic field prefixes: {prefixes}" + ) + + def _field_as_sql_maybe_agg( field: CallsMergedField, pb: ParamBuilder, diff --git a/weave/trace_server/clickhouse_trace_server_batched.py b/weave/trace_server/clickhouse_trace_server_batched.py index e025b166d200..1ebd5897c1fc 100644 --- a/weave/trace_server/clickhouse_trace_server_batched.py +++ b/weave/trace_server/clickhouse_trace_server_batched.py @@ -103,6 +103,7 @@ build_calls_complete_update_query, build_calls_stats_query, combine_conditions, + validate_monitor_query_fields, ) from weave.trace_server.calls_query_builder.usage_query_builder import ( build_usage_query, @@ -1988,6 +1989,11 @@ def obj_create(self, req: tsi.ObjCreateReq) -> tsi.ObjCreateRes: actual=digest, label=f"obj {req.obj.object_id!r}", ) + validate_monitor_query_fields( + digest_result.base_object_class, + digest_result.leaf_object_class, + processed_val, + ) kind = get_kind(processed_val) self._reject_obj_name_type_collision( diff --git a/weave/trace_server/sqlite_trace_server.py b/weave/trace_server/sqlite_trace_server.py index 07bb568fd85d..dbd5eb7bb98e 100644 --- a/weave/trace_server/sqlite_trace_server.py +++ b/weave/trace_server/sqlite_trace_server.py @@ -33,6 +33,9 @@ from weave.trace_server import eval_results_helpers as eval_helpers from weave.trace_server import trace_server_interface as tsi from weave.trace_server.call_stats_helpers import validate_call_stats_range +from weave.trace_server.calls_query_builder.calls_query_builder import ( + validate_monitor_query_fields, +) from weave.trace_server.ch_sentinel_values import EXPIRE_AT_NEVER from weave.trace_server.common_interface import SortBy from weave.trace_server.digest_validation import validate_expected_digest @@ -1907,6 +1910,11 @@ def obj_create(self, req: tsi.ObjCreateReq) -> tsi.ObjCreateRes: actual=digest, label=f"obj {req.obj.object_id!r}", ) + validate_monitor_query_fields( + digest_result.base_object_class, + digest_result.leaf_object_class, + processed_val, + ) project_id, object_id, wb_user_id = ( req.obj.project_id, req.obj.object_id, From f7b2c1c63cf355f2b549c87caeac91a2b5491195 Mon Sep 17 00:00:00 2001 From: gtarpenning Date: Thu, 11 Jun 2026 11:53:36 -0700 Subject: [PATCH 2/3] test(weave): real serialized monitor query + derive allowed prefixes - Create Monitors via the client serialization so the stored query carries weave bookkeeping keys, exercising the strip path (proven load-bearing: removing the strip now fails the test). - Assert the complete InvalidFieldError message with ==, not substrings. - Derive ALLOWED_DYNAMIC_FIELD_PREFIXES from ALLOWED_CALL_FIELDS so they can't drift as *_dump dynamic fields change. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/trace/test_base_object_classes.py | 58 ++++++++----------- .../calls_query_builder.py | 19 ++++-- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/tests/trace/test_base_object_classes.py b/tests/trace/test_base_object_classes.py index 04aeaf98ffff..604d23552551 100644 --- a/tests/trace/test_base_object_classes.py +++ b/tests/trace/test_base_object_classes.py @@ -20,8 +20,13 @@ import weave from weave.trace import base_objects from weave.trace.refs import ObjectRef -from weave.trace.weave_client import WeaveClient +from weave.trace.serialization.serialize import to_json +from weave.trace.weave_client import WeaveClient, map_to_refs from weave.trace_server import trace_server_interface as tsi +from weave.trace_server.calls_query_builder.calls_query_builder import ( + ALLOWED_CALL_FIELDS, + ALLOWED_DYNAMIC_FIELD_PREFIXES, +) from weave.trace_server.errors import InvalidFieldError, ObjectNameTypeCollision from weave.trace_server.interface.builtin_object_classes.test_only_example import ( TestOnlyNestedBaseModel, @@ -1155,32 +1160,17 @@ def test_obj_create_rejects_name_type_collision(client: WeaveClient): ) -def _monitor_val(query: dict | None) -> dict: - """A serialized Monitor object val carrying a query and weave class hierarchy.""" - return { - "name": "test-monitor", - "description": None, - "sampling_rate": 1.0, - "scorers": [], - "op_names": [], - "query": query, - "is_traced": True, - "active": False, - "scorer_debounce_config": None, - "_type": "Monitor", - "_class_name": "Monitor", - "_bases": ["Object", "BaseModel"], - } - - -def _create_monitor(client: WeaveClient, object_id: str, query: dict | None): +def _create_monitor(client: WeaveClient, name: str, query: dict | None): + """Create a Monitor via the client's serialization so the stored `query` carries weave bookkeeping keys, exercising the server-side strip + validation.""" + monitor = weave.Monitor(name=name, scorers=[], query=query) + val = to_json(map_to_refs(monitor), client.project_id, client) return client.server.obj_create( tsi.ObjCreateReq.model_validate( { "obj": { "project_id": client.project_id, - "object_id": object_id, - "val": _monitor_val(query), + "object_id": name, + "val": val, } } ) @@ -1188,17 +1178,19 @@ def _create_monitor(client: WeaveClient, object_id: str, query: dict | None): def test_monitor_create_rejects_unknown_query_field(client: WeaveClient): - """A Monitor query on an unknown field is rejected at create with the allowed list.""" + """A Monitor query on an unknown field is rejected with the complete allowed-field list.""" bad_query = { "$expr": {"$eq": [{"$getField": "operation_name"}, {"$literal": "predict"}]} } with pytest.raises(InvalidFieldError) as exc_info: _create_monitor(client, "bad-monitor", bad_query) - message = str(exc_info.value) - assert "Field operation_name is not allowed" in message - assert "op_name" in message - assert "summary.weave.*" in message - assert "inputs.*" in message + + expected_message = ( + "Field operation_name is not allowed. " + f"Allowed fields: {', '.join(sorted(ALLOWED_CALL_FIELDS))}. " + f"Allowed dynamic field prefixes: {', '.join(ALLOWED_DYNAMIC_FIELD_PREFIXES)}" + ) + assert str(exc_info.value) == expected_message # Nothing was stored for the rejected monitor. objs_res = client.server.objs_query( @@ -1228,13 +1220,9 @@ def test_monitor_create_accepts_valid_query_fields(client: WeaveClient): "$expr": {"$eq": [{"$getField": "inputs.foo"}, {"$literal": "bar"}]} } - valid_res = _create_monitor(client, "valid-monitor", valid_query) - dynamic_res = _create_monitor(client, "dynamic-monitor", dynamic_query) - no_query_res = _create_monitor(client, "no-query-monitor", None) - - assert valid_res.object_id == "valid-monitor" - assert dynamic_res.object_id == "dynamic-monitor" - assert no_query_res.object_id == "no-query-monitor" + _create_monitor(client, "valid-monitor", valid_query) + _create_monitor(client, "dynamic-monitor", dynamic_query) + _create_monitor(client, "no-query-monitor", None) objs_res = client.server.objs_query( tsi.ObjQueryReq.model_validate( diff --git a/weave/trace_server/calls_query_builder/calls_query_builder.py b/weave/trace_server/calls_query_builder/calls_query_builder.py index 397c9097af3c..a7310bdaef36 100644 --- a/weave/trace_server/calls_query_builder/calls_query_builder.py +++ b/weave/trace_server/calls_query_builder/calls_query_builder.py @@ -1943,18 +1943,25 @@ def get_field_by_name(name: str) -> CallsMergedField: return ALLOWED_CALL_FIELDS[name] -# Dotted prefixes get_field_by_name accepts beyond ALLOWED_CALL_FIELDS. -ALLOWED_DYNAMIC_FIELD_PREFIXES = ( +# Dotted prefixes get_field_by_name accepts beyond ALLOWED_CALL_FIELDS. The +# dynamic-field prefixes are derived from ALLOWED_CALL_FIELDS so they can't +# drift as `*_dump` dynamic fields are added or removed. +_DUMP_SUFFIX = "_dump" +_SPECIAL_DYNAMIC_FIELD_PREFIXES = ( "feedback.*", "annotation_queue_items.queue_id", "summary.weave.*", - "inputs.*", - "output.*", - "attributes.*", - "summary.*", +) +ALLOWED_DYNAMIC_FIELD_PREFIXES = _SPECIAL_DYNAMIC_FIELD_PREFIXES + tuple( + f"{name[: -len(_DUMP_SUFFIX)]}.*" + for name, field in ALLOWED_CALL_FIELDS.items() + if isinstance(field, CallsMergedDynamicField) and name.endswith(_DUMP_SUFFIX) ) +# Serialized `_class_name`/`_bases` values for Monitor objects. These mirror the +# SDK classes in weave/flow/monitor.py but are kept as server-side strings on +# purpose: the trace server must not import from weave/flow. MONITOR_OBJECT_CLASSES = frozenset({"Monitor", "ClassifierMonitor"}) From e0e4c5283675b8c5dc9e43ba32cfdf27807eebcf Mon Sep 17 00:00:00 2001 From: gtarpenning Date: Thu, 11 Jun 2026 16:32:10 -0700 Subject: [PATCH 3/3] fix(weave): reject malformed monitor queries cleanly + hide _dump field names Address review on monitor query validation: - guard tsi Query.model_validate so an object that merely shares the Monitor class name with a non-query payload is left untouched, not 500'd - map ValueError/TypeError from compilation (e.g. empty $and) to InvalidRequest instead of leaking a raw 500 - drop internal `*_dump` column names from the allowed-fields error message - strip only the three weave bookkeeping keys, not every `_`-prefixed key - rename validate_query_field_names -> validate_query_compiles Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/trace/test_base_object_classes.py | 43 +++++++++++++++++-- .../calls_query_builder.py | 35 ++++++++++----- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/tests/trace/test_base_object_classes.py b/tests/trace/test_base_object_classes.py index 604d23552551..17c178749bd2 100644 --- a/tests/trace/test_base_object_classes.py +++ b/tests/trace/test_base_object_classes.py @@ -24,10 +24,15 @@ from weave.trace.weave_client import WeaveClient, map_to_refs from weave.trace_server import trace_server_interface as tsi from weave.trace_server.calls_query_builder.calls_query_builder import ( + _DUMP_SUFFIX, ALLOWED_CALL_FIELDS, ALLOWED_DYNAMIC_FIELD_PREFIXES, ) -from weave.trace_server.errors import InvalidFieldError, ObjectNameTypeCollision +from weave.trace_server.errors import ( + InvalidFieldError, + InvalidRequest, + ObjectNameTypeCollision, +) from weave.trace_server.interface.builtin_object_classes.test_only_example import ( TestOnlyNestedBaseModel, ) @@ -1185,17 +1190,28 @@ def test_monitor_create_rejects_unknown_query_field(client: WeaveClient): with pytest.raises(InvalidFieldError) as exc_info: _create_monitor(client, "bad-monitor", bad_query) + allowed = ", ".join( + sorted(k for k in ALLOWED_CALL_FIELDS if not k.endswith(_DUMP_SUFFIX)) + ) expected_message = ( "Field operation_name is not allowed. " - f"Allowed fields: {', '.join(sorted(ALLOWED_CALL_FIELDS))}. " + f"Allowed fields: {allowed}. " f"Allowed dynamic field prefixes: {', '.join(ALLOWED_DYNAMIC_FIELD_PREFIXES)}" ) assert str(exc_info.value) == expected_message + assert _DUMP_SUFFIX not in str(exc_info.value) - # Nothing was stored for the rejected monitor. + # A structurally invalid query (empty $and) is a bad request, not a field error. + with pytest.raises(InvalidRequest): + _create_monitor(client, "empty-and-monitor", {"$expr": {"$and": []}}) + + # Neither rejected monitor was stored. objs_res = client.server.objs_query( tsi.ObjQueryReq.model_validate( - {"project_id": client.project_id, "filter": {"object_ids": ["bad-monitor"]}} + { + "project_id": client.project_id, + "filter": {"object_ids": ["bad-monitor", "empty-and-monitor"]}, + } ) ) assert objs_res.objs == [] @@ -1224,6 +1240,23 @@ def test_monitor_create_accepts_valid_query_fields(client: WeaveClient): _create_monitor(client, "dynamic-monitor", dynamic_query) _create_monitor(client, "no-query-monitor", None) + # A Monitor-classed object whose `query` is not a recognizable query shape is + # left untouched (we only validate queries we understand), never 500'd. + monitor = weave.Monitor(name="opaque-monitor", scorers=[], query=None) + opaque_val = to_json(map_to_refs(monitor), client.project_id, client) + opaque_val["query"] = "not-a-query" + client.server.obj_create( + tsi.ObjCreateReq.model_validate( + { + "obj": { + "project_id": client.project_id, + "object_id": "opaque-monitor", + "val": opaque_val, + } + } + ) + ) + objs_res = client.server.objs_query( tsi.ObjQueryReq.model_validate( { @@ -1233,6 +1266,7 @@ def test_monitor_create_accepts_valid_query_fields(client: WeaveClient): "valid-monitor", "dynamic-monitor", "no-query-monitor", + "opaque-monitor", ] }, } @@ -1242,4 +1276,5 @@ def test_monitor_create_accepts_valid_query_fields(client: WeaveClient): "valid-monitor", "dynamic-monitor", "no-query-monitor", + "opaque-monitor", } diff --git a/weave/trace_server/calls_query_builder/calls_query_builder.py b/weave/trace_server/calls_query_builder/calls_query_builder.py index a7310bdaef36..42b8cf921247 100644 --- a/weave/trace_server/calls_query_builder/calls_query_builder.py +++ b/weave/trace_server/calls_query_builder/calls_query_builder.py @@ -31,7 +31,7 @@ from dataclasses import dataclass from typing import Any, Literal, cast -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ValidationError from typing_extensions import Self from weave.shared.trace_server_interface_util import ( @@ -64,7 +64,7 @@ trace_id_index_expr, ) from weave.trace_server.common_interface import SortBy -from weave.trace_server.errors import InvalidFieldError +from weave.trace_server.errors import InvalidFieldError, InvalidRequest from weave.trace_server.interface import query as tsi_query from weave.trace_server.interface.feedback_types import MULTI_VALUE_FEEDBACK_TYPES from weave.trace_server.interface.query import ( @@ -1943,9 +1943,11 @@ def get_field_by_name(name: str) -> CallsMergedField: return ALLOWED_CALL_FIELDS[name] -# Dotted prefixes get_field_by_name accepts beyond ALLOWED_CALL_FIELDS. The -# dynamic-field prefixes are derived from ALLOWED_CALL_FIELDS so they can't -# drift as `*_dump` dynamic fields are added or removed. +# Field references get_field_by_name accepts beyond exact ALLOWED_CALL_FIELDS +# keys, used only to build the user-facing error message. The `*_dump` prefixes +# are derived so they can't drift; the special entries must be kept in sync by +# hand with the explicit branches in get_field_by_name above +# (`annotation_queue_items.queue_id` is an exact ref, not a prefix). _DUMP_SUFFIX = "_dump" _SPECIAL_DYNAMIC_FIELD_PREFIXES = ( "feedback.*", @@ -1982,15 +1984,26 @@ def validate_monitor_query_fields( if raw_query is None: return cleaned = _strip_weave_object_keys(raw_query) - validate_query_field_names(tsi_query.Query.model_validate(cleaned)) + try: + query = tsi_query.Query.model_validate(cleaned) + except ValidationError: + # Not a recognizable query (e.g. a user object that happens to share + # the Monitor class name); leave the write untouched. + return + validate_query_compiles(query) -def validate_query_field_names(query: tsi_query.Query) -> None: - """Validate every field reference in `query` against the allowed call fields.""" +def validate_query_compiles(query: tsi_query.Query) -> None: + """Validate that `query` references only allowed call fields and is well-formed.""" try: process_query_to_conditions(query, ParamBuilder(), "calls_merged") except InvalidFieldError as e: raise InvalidFieldError(_invalid_field_message(str(e))) from e + except (ValueError, TypeError) as e: + raise InvalidRequest(f"Invalid query: {e}") from e + + +_WEAVE_BOOKKEEPING_KEYS = frozenset({"_type", "_class_name", "_bases"}) def _strip_weave_object_keys(value: object) -> object: @@ -1999,7 +2012,7 @@ def _strip_weave_object_keys(value: object) -> object: return { k: _strip_weave_object_keys(v) for k, v in value.items() - if not k.startswith("_") + if k not in _WEAVE_BOOKKEEPING_KEYS } if isinstance(value, list): return [_strip_weave_object_keys(v) for v in value] @@ -2008,7 +2021,9 @@ def _strip_weave_object_keys(value: object) -> object: def _invalid_field_message(reason: str) -> str: """Append the allowed field list and dynamic prefixes to a field rejection.""" - allowed = ", ".join(sorted(ALLOWED_CALL_FIELDS)) + allowed = ", ".join( + sorted(k for k in ALLOWED_CALL_FIELDS if not k.endswith(_DUMP_SUFFIX)) + ) prefixes = ", ".join(ALLOWED_DYNAMIC_FIELD_PREFIXES) return ( f"{reason}. Allowed fields: {allowed}. "