diff --git a/CHANGELOG.md b/CHANGELOG.md index a0e5639f9b..9fe264d4db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - **GFQL public schema declarations (#1337)**: Added experimental `graphistry.schema` exports for `NodeType`, `EdgeType`, `GraphSchema`, and `EdgeTopology`, plus top-level `graphistry` re-exports. `NodeType` and `EdgeType` accept Arrow-first `pyarrow.Schema` declarations, preserve dtype/nullability through GFQL `RowSchema`, and export back to Arrow with label/type columns via `to_arrow()`. `graphistry.bind(..., schema=schema)` / `g.bind(schema=schema)` attach public schema declarations to plotters, and Cypher preflight validation consumes the adapted internal `GraphSchemaCatalog` for declared labels, properties, relationship types, and source/destination topology checks. `GraphSchema(strict=False)` makes schema-bound `g.gfql_validate(...)` permissive by default while explicit call-level `strict=True` still forces strict validation. ### Changed +- **GFQL schema physical-column concordance (#1640)**: `GraphSchema` now rejects incompatible logical types for the same physical node or edge property column across declared labels/types, including Arrow-imported declarations. Same-type nullability differences remain type-local, and merged table Arrow schemas mark the aggregate field nullable when needed. - **GFQL call executor implementation shrink (#1058)**: DRYed private call execution, postcall graph-stat selection, and policy exception enrichment while preserving validated `call()` execution, postcall-on-error behavior, and policy-denial precedence. - **AI feature test/runtime performance (#1058)**: Reused normalized `SentenceTransformer` model instances within each Python process during `encode_textual()` calls, reducing repeated model construction in `test-full-ai` and user workflows that encode with the same model repeatedly. Added `test-full-ai` duration reporting for continued CI profiling. - **GFQL Cypher result postprocess shrink (#1058)**: Collapsed private result-projection alias/metadata helpers while preserving prefixed alias whole-row rendering, reentry entity metadata, and pandas/cuDF projection behavior. diff --git a/docs/source/gfql/schema.rst b/docs/source/gfql/schema.rst index 9d99b41f41..ae87a815b4 100644 --- a/docs/source/gfql/schema.rst +++ b/docs/source/gfql/schema.rst @@ -94,7 +94,12 @@ Schema Objects Groups node/edge contracts and adapts them to the internal ``GraphSchemaCatalog`` used by binder/preflight validation. ``strict=False`` makes schema-bound ``g.gfql_validate(...)`` permissive by default; callers can - still override per call with ``g.gfql_validate(..., strict=True)``. + still override per call with ``g.gfql_validate(..., strict=True)``. A physical + node property column must have the same logical type for every node type that + declares it, and a physical edge property column must have the same logical + type for every edge type that declares it. Use separate column names when two + labels or relationship types need incompatible values under the same property + name. ``NodeType.to_arrow()`` and ``EdgeType.to_arrow()`` Export declarations as ``pyarrow.Schema`` objects through GFQL's row-schema @@ -110,7 +115,14 @@ Schema Objects Export/import a declaration payload containing per-node/per-edge Arrow schemas plus merged ``nodes`` and ``edges`` table schemas. The merged schemas are useful for dataframe boundary validation; the per-type entries preserve - type names and edge topology. + type names and edge topology. When the same column is declared with the same + Arrow type but different nullability, merged table schemas mark that column as + nullable while the per-type declarations keep their original nullability. + +Nullability is type-local. If ``Cat.lives`` is declared non-nullable and +``House`` does not declare ``lives``, ``Cat.lives`` remains non-nullable in the +``Cat`` declaration. Boundary validation against a full fused node table still +accounts for which labels are active in each row. What Preflight Checks --------------------- diff --git a/graphistry/schema.py b/graphistry/schema.py index 203e08c771..ef670202ba 100644 --- a/graphistry/schema.py +++ b/graphistry/schema.py @@ -6,6 +6,7 @@ from __future__ import annotations +import json from dataclasses import dataclass, field from typing import Any, Dict, FrozenSet, Iterable, Mapping, Optional, Tuple, Union, cast @@ -18,6 +19,7 @@ NodeRefInput = Union["NodeType", str, Iterable[str]] PropertySchemaInput = Union[Mapping[str, Any], RowSchema, Any] GraphArrowDeclaration = Mapping[str, Any] +_METADATA_LOGICAL_TYPE_KEY = b"gfql.logical_type" def _is_arrow_schema(value: Any) -> bool: @@ -145,6 +147,63 @@ def _labels_from_node_ref(value: NodeRefInput) -> FrozenSet[str]: return frozenset(str(label) for label in value if str(label)) +def _logical_type_concordance_key(logical_type: LogicalType) -> Tuple[Any, ...]: + if isinstance(logical_type, ScalarType): + return ("scalar", logical_type.kind.lower()) + if isinstance(logical_type, ListType): + return ("list", _logical_type_concordance_key(logical_type.element_type)) + if isinstance(logical_type, NodeRef): + return ("node", tuple(sorted(logical_type.labels))) + if isinstance(logical_type, EdgeRef): + return ( + "edge", + logical_type.type, + logical_type.src_label, + logical_type.dst_label, + ) + if isinstance(logical_type, PathType): + return ("path", logical_type.min_hops, logical_type.max_hops) + return ("unknown", repr(logical_type)) + + +def _validate_physical_column_concordance( + *, + kind: str, + declarations: Iterable[Tuple[str, Mapping[str, LogicalType]]], +) -> None: + seen: Dict[str, Tuple[str, LogicalType, Tuple[Any, ...]]] = {} + for owner, properties in declarations: + for column, logical_type in properties.items(): + key = _logical_type_concordance_key(logical_type) + existing = seen.get(column) + if existing is None: + seen[column] = (owner, logical_type, key) + continue + + existing_owner, existing_type, existing_key = existing + if existing_key != key: + raise ValueError( + f"Conflicting GraphSchema declaration for {kind} column {column!r}: " + f"{existing_owner} has {existing_type!r}; {owner} has {logical_type!r}. " + "Use one physical column type across all labels/types, or rename one " + "property column." + ) + + +def _validate_graph_schema_concordance( + node_types: Iterable["NodeType"], + edge_types: Iterable["EdgeType"], +) -> None: + _validate_physical_column_concordance( + kind="nodes", + declarations=((node_type.name, node_type.properties) for node_type in node_types), + ) + _validate_physical_column_concordance( + kind="edges", + declarations=((edge_type.name, edge_type.properties) for edge_type in edge_types), + ) + + @dataclass(frozen=True) class NodeType: """Experimental declarative node contract for GFQL schema validation. @@ -342,8 +401,11 @@ def __init__( edge_source_column: Optional[str] = None, edge_destination_column: Optional[str] = None, ) -> None: - object.__setattr__(self, "node_types", tuple(node_types)) - object.__setattr__(self, "edge_types", tuple(edge_types)) + node_types_tuple = tuple(node_types) + edge_types_tuple = tuple(edge_types) + _validate_graph_schema_concordance(node_types_tuple, edge_types_tuple) + object.__setattr__(self, "node_types", node_types_tuple) + object.__setattr__(self, "edge_types", edge_types_tuple) object.__setattr__(self, "strict", bool(strict)) object.__setattr__(self, "node_id_column", node_id_column) object.__setattr__(self, "edge_source_column", edge_source_column) @@ -538,6 +600,15 @@ def _merge_arrow_schemas(schemas: Iterable[Any], *, kind: str) -> Any: for arrow_field in schema: existing = fields.get(arrow_field.name) if existing is not None and existing != arrow_field: + merged_metadata = _merge_field_metadata_for_nullability(existing, arrow_field) + if existing.type == arrow_field.type and merged_metadata is not None: + fields[arrow_field.name] = pa.field( + arrow_field.name, + arrow_field.type, + nullable=existing.nullable or arrow_field.nullable, + metadata=merged_metadata, + ) + continue raise ValueError( f"Conflicting Arrow declaration for {kind} column {arrow_field.name!r}: " f"{existing!r} vs {arrow_field!r}" @@ -546,6 +617,64 @@ def _merge_arrow_schemas(schemas: Iterable[Any], *, kind: str) -> Any: return pa.schema(list(fields.values()), metadata=metadata) +def _merge_field_metadata_for_nullability(existing: Any, incoming: Any) -> Optional[Mapping[bytes, bytes]]: + if existing.metadata == incoming.metadata: + return existing.metadata + + existing_metadata = dict(existing.metadata or {}) + incoming_metadata = dict(incoming.metadata or {}) + existing_payload_raw = existing_metadata.get(_METADATA_LOGICAL_TYPE_KEY) + incoming_payload_raw = incoming_metadata.get(_METADATA_LOGICAL_TYPE_KEY) + if existing_payload_raw is None or incoming_payload_raw is None: + return None + + existing_other = { + key: value + for key, value in existing_metadata.items() + if key != _METADATA_LOGICAL_TYPE_KEY + } + incoming_other = { + key: value + for key, value in incoming_metadata.items() + if key != _METADATA_LOGICAL_TYPE_KEY + } + if existing_other != incoming_other: + return None + + try: + existing_payload = json.loads(existing_payload_raw.decode("utf-8")) + incoming_payload = json.loads(incoming_payload_raw.decode("utf-8")) + except (AttributeError, json.JSONDecodeError, UnicodeDecodeError): + return None + + if not ( + isinstance(existing_payload, dict) + and isinstance(incoming_payload, dict) + and existing_payload.get("family") == "scalar" + and incoming_payload.get("family") == "scalar" + ): + return None + + existing_compare = dict(existing_payload) + incoming_compare = dict(incoming_payload) + existing_compare.pop("nullable", None) + incoming_compare.pop("nullable", None) + if existing_compare != incoming_compare: + return None + + merged_payload = dict(incoming_payload) + merged_payload["nullable"] = bool( + existing_payload.get("nullable", True) + or incoming_payload.get("nullable", True) + ) + merged_metadata = dict(incoming_metadata) + merged_metadata[_METADATA_LOGICAL_TYPE_KEY] = json.dumps( + merged_payload, + sort_keys=True, + ).encode("utf-8") + return merged_metadata + + def _edge_type_from_arrow_entry( name: str, entry: Any, diff --git a/graphistry/tests/compute/gfql/test_public_schema.py b/graphistry/tests/compute/gfql/test_public_schema.py index 163ab90877..6ed0e023fd 100644 --- a/graphistry/tests/compute/gfql/test_public_schema.py +++ b/graphistry/tests/compute/gfql/test_public_schema.py @@ -7,7 +7,7 @@ from graphistry.exceptions import SchemaValidationError from graphistry.compute.exceptions import ErrorCode, GFQLValidationError from graphistry.compute.gfql.ir.logical_plan import RowSchema -from graphistry.compute.gfql.ir.types import ScalarType +from graphistry.compute.gfql.ir.types import ListType, NodeRef, ScalarType from graphistry.schema import EdgeType, GraphSchema, NodeType @@ -185,6 +185,149 @@ def test_graph_schema_arrow_declaration_round_trip() -> None: assert imported.edge_types[0].topology.as_metadata() == schema.edge_types[0].topology.as_metadata() +def test_graph_schema_rejects_conflicting_node_physical_column_types() -> None: + with pytest.raises(ValueError) as exc_info: + GraphSchema( + node_types=[ + NodeType("Cat", {"age": int}), + NodeType("Dog", {"age": str}), + ] + ) + + message = str(exc_info.value) + assert "nodes" in message + assert "age" in message + assert "Cat" in message + assert "Dog" in message + assert "int64" in message + assert "string" in message + + +def test_graph_schema_rejects_conflicting_edge_physical_column_types() -> None: + with pytest.raises(ValueError) as exc_info: + GraphSchema( + edge_types=[ + EdgeType("LIKES", source="Cat", destination="Toy", properties={"weight": int}), + EdgeType("CHASES", source="Dog", destination="Toy", properties={"weight": str}), + ] + ) + + message = str(exc_info.value) + assert "edges" in message + assert "weight" in message + assert "LIKES" in message + assert "CHASES" in message + assert "int64" in message + assert "string" in message + + +def test_graph_schema_from_arrow_rejects_conflicting_node_physical_column_types() -> None: + pa = pytest.importorskip("pyarrow") + + with pytest.raises(ValueError) as exc_info: + GraphSchema.from_arrow( + { + "node_types": { + "Cat": pa.schema([pa.field("age", pa.int64())]), + "Dog": pa.schema([pa.field("age", pa.large_string())]), + }, + "edge_types": {}, + } + ) + + message = str(exc_info.value) + assert "nodes" in message + assert "age" in message + assert "Cat" in message + assert "Dog" in message + + +def test_graph_schema_from_arrow_rejects_conflicting_edge_physical_column_types() -> None: + pa = pytest.importorskip("pyarrow") + + with pytest.raises(ValueError) as exc_info: + GraphSchema.from_arrow( + { + "node_types": {}, + "edge_types": { + "LIKES": { + "source": ("Cat",), + "destination": ("Toy",), + "schema": pa.schema([pa.field("weight", pa.int64())]), + }, + "CHASES": { + "source": ("Dog",), + "destination": ("Toy",), + "schema": pa.schema([pa.field("weight", pa.large_string())]), + }, + }, + } + ) + + message = str(exc_info.value) + assert "edges" in message + assert "weight" in message + assert "LIKES" in message + assert "CHASES" in message + + +def test_graph_schema_rejects_conflicting_list_physical_column_types() -> None: + with pytest.raises(ValueError) as exc_info: + GraphSchema( + node_types=[ + NodeType("Cat", {"tags": ListType(ScalarType("int64"))}), + NodeType("Dog", {"tags": ListType(ScalarType("string"))}), + ] + ) + + message = str(exc_info.value) + assert "nodes" in message + assert "tags" in message + assert "Cat" in message + assert "Dog" in message + + +def test_graph_schema_rejects_conflicting_node_ref_physical_column_types() -> None: + with pytest.raises(ValueError) as exc_info: + GraphSchema( + node_types=[ + NodeType("Cat", {"friend": NodeRef(frozenset({"Cat"}))}), + NodeType("Dog", {"friend": NodeRef(frozenset({"Dog"}))}), + ] + ) + + message = str(exc_info.value) + assert "nodes" in message + assert "friend" in message + assert "Cat" in message + assert "Dog" in message + + +def test_graph_schema_allows_type_local_nullability_for_shared_node_column() -> None: + pa = pytest.importorskip("pyarrow") + + cat = NodeType("Cat", pa.schema([pa.field("lives", pa.int64(), nullable=False)])) + dog = NodeType("Dog", pa.schema([pa.field("lives", pa.int64(), nullable=True)])) + + schema = GraphSchema(node_types=[cat, dog]) + + assert schema.node_types[0].properties["lives"] == ScalarType("int64", nullable=False) + assert schema.node_types[1].properties["lives"] == ScalarType("int64", nullable=True) + assert schema.node_arrow().field("lives").nullable is True + + +def test_graph_schema_absent_property_does_not_change_type_local_nullability() -> None: + pa = pytest.importorskip("pyarrow") + + cat = NodeType("Cat", pa.schema([pa.field("lives", pa.int64(), nullable=False)])) + house = NodeType("House", pa.schema([pa.field("address", pa.large_string())])) + + schema = GraphSchema(node_types=[cat, house]) + + assert schema.node_types[0].properties["lives"] == ScalarType("int64", nullable=False) + assert "lives" not in schema.node_types[1].properties + + def test_bind_schema_is_chainable_and_used_by_preflight() -> None: schema = _schema() g = _graph(schema).bind(point_color="name")