Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- **GFQL public schema declarations (#1337)**: Added experimental `graphistry.schema` exports for `NodeType`, `EdgeType`, `GraphSchema`, and `EdgeTopology`, plus top-level `graphistry` re-exports. `NodeType` and `EdgeType` accept Arrow-first `pyarrow.Schema` declarations, preserve dtype/nullability through GFQL `RowSchema`, and export back to Arrow with label/type columns via `to_arrow()`. `graphistry.bind(..., schema=schema)` / `g.bind(schema=schema)` attach public schema declarations to plotters, and Cypher preflight validation consumes the adapted internal `GraphSchemaCatalog` for declared labels, properties, relationship types, and source/destination topology checks. `GraphSchema(strict=False)` makes schema-bound `g.gfql_validate(...)` permissive by default while explicit call-level `strict=True` still forces strict validation.

### Changed
- **GFQL schema physical-column concordance (#1640)**: `GraphSchema` now rejects incompatible logical types for the same physical node or edge property column across declared labels/types, including Arrow-imported declarations. Same-type nullability differences remain type-local, and merged table Arrow schemas mark the aggregate field nullable when needed.
- **GFQL call executor implementation shrink (#1058)**: DRYed private call execution, postcall graph-stat selection, and policy exception enrichment while preserving validated `call()` execution, postcall-on-error behavior, and policy-denial precedence.
- **AI feature test/runtime performance (#1058)**: Reused normalized `SentenceTransformer` model instances within each Python process during `encode_textual()` calls, reducing repeated model construction in `test-full-ai` and user workflows that encode with the same model repeatedly. Added `test-full-ai` duration reporting for continued CI profiling.
- **GFQL Cypher result postprocess shrink (#1058)**: Collapsed private result-projection alias/metadata helpers while preserving prefixed alias whole-row rendering, reentry entity metadata, and pandas/cuDF projection behavior.
Expand Down
16 changes: 14 additions & 2 deletions docs/source/gfql/schema.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ Schema Objects
Groups node/edge contracts and adapts them to the internal
``GraphSchemaCatalog`` used by binder/preflight validation. ``strict=False``
makes schema-bound ``g.gfql_validate(...)`` permissive by default; callers can
still override per call with ``g.gfql_validate(..., strict=True)``.
still override per call with ``g.gfql_validate(..., strict=True)``. A physical
node property column must have the same logical type for every node type that
declares it, and a physical edge property column must have the same logical
type for every edge type that declares it. Use separate column names when two
labels or relationship types need incompatible values under the same property
name.

``NodeType.to_arrow()`` and ``EdgeType.to_arrow()``
Export declarations as ``pyarrow.Schema`` objects through GFQL's row-schema
Expand All @@ -110,7 +115,14 @@ Schema Objects
Export/import a declaration payload containing per-node/per-edge Arrow
schemas plus merged ``nodes`` and ``edges`` table schemas. The merged schemas
are useful for dataframe boundary validation; the per-type entries preserve
type names and edge topology.
type names and edge topology. When the same column is declared with the same
Arrow type but different nullability, merged table schemas mark that column as
nullable while the per-type declarations keep their original nullability.

Nullability is type-local. If ``Cat.lives`` is declared non-nullable and
``House`` does not declare ``lives``, ``Cat.lives`` remains non-nullable in the
``Cat`` declaration. Boundary validation against a full fused node table still
accounts for which labels are active in each row.

What Preflight Checks
---------------------
Expand Down
133 changes: 131 additions & 2 deletions graphistry/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any, Dict, FrozenSet, Iterable, Mapping, Optional, Tuple, Union, cast

Expand All @@ -18,6 +19,7 @@
NodeRefInput = Union["NodeType", str, Iterable[str]]
PropertySchemaInput = Union[Mapping[str, Any], RowSchema, Any]
GraphArrowDeclaration = Mapping[str, Any]
_METADATA_LOGICAL_TYPE_KEY = b"gfql.logical_type"


def _is_arrow_schema(value: Any) -> bool:
Expand Down Expand Up @@ -145,6 +147,63 @@ def _labels_from_node_ref(value: NodeRefInput) -> FrozenSet[str]:
return frozenset(str(label) for label in value if str(label))


def _logical_type_concordance_key(logical_type: LogicalType) -> Tuple[Any, ...]:
if isinstance(logical_type, ScalarType):
return ("scalar", logical_type.kind.lower())
if isinstance(logical_type, ListType):
return ("list", _logical_type_concordance_key(logical_type.element_type))
if isinstance(logical_type, NodeRef):
return ("node", tuple(sorted(logical_type.labels)))
if isinstance(logical_type, EdgeRef):
return (
"edge",
logical_type.type,
logical_type.src_label,
logical_type.dst_label,
)
if isinstance(logical_type, PathType):
return ("path", logical_type.min_hops, logical_type.max_hops)
return ("unknown", repr(logical_type))


def _validate_physical_column_concordance(
*,
kind: str,
declarations: Iterable[Tuple[str, Mapping[str, LogicalType]]],
) -> None:
seen: Dict[str, Tuple[str, LogicalType, Tuple[Any, ...]]] = {}
for owner, properties in declarations:
for column, logical_type in properties.items():
key = _logical_type_concordance_key(logical_type)
existing = seen.get(column)
if existing is None:
seen[column] = (owner, logical_type, key)
continue

existing_owner, existing_type, existing_key = existing
if existing_key != key:
raise ValueError(
f"Conflicting GraphSchema declaration for {kind} column {column!r}: "
f"{existing_owner} has {existing_type!r}; {owner} has {logical_type!r}. "
"Use one physical column type across all labels/types, or rename one "
"property column."
)


def _validate_graph_schema_concordance(
node_types: Iterable["NodeType"],
edge_types: Iterable["EdgeType"],
) -> None:
_validate_physical_column_concordance(
kind="nodes",
declarations=((node_type.name, node_type.properties) for node_type in node_types),
)
_validate_physical_column_concordance(
kind="edges",
declarations=((edge_type.name, edge_type.properties) for edge_type in edge_types),
)


@dataclass(frozen=True)
class NodeType:
"""Experimental declarative node contract for GFQL schema validation.
Expand Down Expand Up @@ -342,8 +401,11 @@ def __init__(
edge_source_column: Optional[str] = None,
edge_destination_column: Optional[str] = None,
) -> None:
object.__setattr__(self, "node_types", tuple(node_types))
object.__setattr__(self, "edge_types", tuple(edge_types))
node_types_tuple = tuple(node_types)
edge_types_tuple = tuple(edge_types)
_validate_graph_schema_concordance(node_types_tuple, edge_types_tuple)
object.__setattr__(self, "node_types", node_types_tuple)
object.__setattr__(self, "edge_types", edge_types_tuple)
object.__setattr__(self, "strict", bool(strict))
object.__setattr__(self, "node_id_column", node_id_column)
object.__setattr__(self, "edge_source_column", edge_source_column)
Expand Down Expand Up @@ -538,6 +600,15 @@ def _merge_arrow_schemas(schemas: Iterable[Any], *, kind: str) -> Any:
for arrow_field in schema:
existing = fields.get(arrow_field.name)
if existing is not None and existing != arrow_field:
merged_metadata = _merge_field_metadata_for_nullability(existing, arrow_field)
if existing.type == arrow_field.type and merged_metadata is not None:
fields[arrow_field.name] = pa.field(
arrow_field.name,
arrow_field.type,
nullable=existing.nullable or arrow_field.nullable,
metadata=merged_metadata,
)
continue
raise ValueError(
f"Conflicting Arrow declaration for {kind} column {arrow_field.name!r}: "
f"{existing!r} vs {arrow_field!r}"
Expand All @@ -546,6 +617,64 @@ def _merge_arrow_schemas(schemas: Iterable[Any], *, kind: str) -> Any:
return pa.schema(list(fields.values()), metadata=metadata)


def _merge_field_metadata_for_nullability(existing: Any, incoming: Any) -> Optional[Mapping[bytes, bytes]]:
if existing.metadata == incoming.metadata:
return existing.metadata

existing_metadata = dict(existing.metadata or {})
incoming_metadata = dict(incoming.metadata or {})
existing_payload_raw = existing_metadata.get(_METADATA_LOGICAL_TYPE_KEY)
incoming_payload_raw = incoming_metadata.get(_METADATA_LOGICAL_TYPE_KEY)
if existing_payload_raw is None or incoming_payload_raw is None:
return None

existing_other = {
key: value
for key, value in existing_metadata.items()
if key != _METADATA_LOGICAL_TYPE_KEY
}
incoming_other = {
key: value
for key, value in incoming_metadata.items()
if key != _METADATA_LOGICAL_TYPE_KEY
}
if existing_other != incoming_other:
return None

try:
existing_payload = json.loads(existing_payload_raw.decode("utf-8"))
incoming_payload = json.loads(incoming_payload_raw.decode("utf-8"))
except (AttributeError, json.JSONDecodeError, UnicodeDecodeError):
return None

if not (
isinstance(existing_payload, dict)
and isinstance(incoming_payload, dict)
and existing_payload.get("family") == "scalar"
and incoming_payload.get("family") == "scalar"
):
return None

existing_compare = dict(existing_payload)
incoming_compare = dict(incoming_payload)
existing_compare.pop("nullable", None)
incoming_compare.pop("nullable", None)
if existing_compare != incoming_compare:
return None

merged_payload = dict(incoming_payload)
merged_payload["nullable"] = bool(
existing_payload.get("nullable", True)
or incoming_payload.get("nullable", True)
)
merged_metadata = dict(incoming_metadata)
merged_metadata[_METADATA_LOGICAL_TYPE_KEY] = json.dumps(
merged_payload,
sort_keys=True,
).encode("utf-8")
return merged_metadata


def _edge_type_from_arrow_entry(
name: str,
entry: Any,
Expand Down
145 changes: 144 additions & 1 deletion graphistry/tests/compute/gfql/test_public_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from graphistry.exceptions import SchemaValidationError
from graphistry.compute.exceptions import ErrorCode, GFQLValidationError
from graphistry.compute.gfql.ir.logical_plan import RowSchema
from graphistry.compute.gfql.ir.types import ScalarType
from graphistry.compute.gfql.ir.types import ListType, NodeRef, ScalarType
from graphistry.schema import EdgeType, GraphSchema, NodeType


Expand Down Expand Up @@ -185,6 +185,149 @@ def test_graph_schema_arrow_declaration_round_trip() -> None:
assert imported.edge_types[0].topology.as_metadata() == schema.edge_types[0].topology.as_metadata()


def test_graph_schema_rejects_conflicting_node_physical_column_types() -> None:
with pytest.raises(ValueError) as exc_info:
GraphSchema(
node_types=[
NodeType("Cat", {"age": int}),
NodeType("Dog", {"age": str}),
]
)

message = str(exc_info.value)
assert "nodes" in message
assert "age" in message
assert "Cat" in message
assert "Dog" in message
assert "int64" in message
assert "string" in message


def test_graph_schema_rejects_conflicting_edge_physical_column_types() -> None:
with pytest.raises(ValueError) as exc_info:
GraphSchema(
edge_types=[
EdgeType("LIKES", source="Cat", destination="Toy", properties={"weight": int}),
EdgeType("CHASES", source="Dog", destination="Toy", properties={"weight": str}),
]
)

message = str(exc_info.value)
assert "edges" in message
assert "weight" in message
assert "LIKES" in message
assert "CHASES" in message
assert "int64" in message
assert "string" in message


def test_graph_schema_from_arrow_rejects_conflicting_node_physical_column_types() -> None:
pa = pytest.importorskip("pyarrow")

with pytest.raises(ValueError) as exc_info:
GraphSchema.from_arrow(
{
"node_types": {
"Cat": pa.schema([pa.field("age", pa.int64())]),
"Dog": pa.schema([pa.field("age", pa.large_string())]),
},
"edge_types": {},
}
)

message = str(exc_info.value)
assert "nodes" in message
assert "age" in message
assert "Cat" in message
assert "Dog" in message


def test_graph_schema_from_arrow_rejects_conflicting_edge_physical_column_types() -> None:
pa = pytest.importorskip("pyarrow")

with pytest.raises(ValueError) as exc_info:
GraphSchema.from_arrow(
{
"node_types": {},
"edge_types": {
"LIKES": {
"source": ("Cat",),
"destination": ("Toy",),
"schema": pa.schema([pa.field("weight", pa.int64())]),
},
"CHASES": {
"source": ("Dog",),
"destination": ("Toy",),
"schema": pa.schema([pa.field("weight", pa.large_string())]),
},
},
}
)

message = str(exc_info.value)
assert "edges" in message
assert "weight" in message
assert "LIKES" in message
assert "CHASES" in message


def test_graph_schema_rejects_conflicting_list_physical_column_types() -> None:
with pytest.raises(ValueError) as exc_info:
GraphSchema(
node_types=[
NodeType("Cat", {"tags": ListType(ScalarType("int64"))}),
NodeType("Dog", {"tags": ListType(ScalarType("string"))}),
]
)

message = str(exc_info.value)
assert "nodes" in message
assert "tags" in message
assert "Cat" in message
assert "Dog" in message


def test_graph_schema_rejects_conflicting_node_ref_physical_column_types() -> None:
with pytest.raises(ValueError) as exc_info:
GraphSchema(
node_types=[
NodeType("Cat", {"friend": NodeRef(frozenset({"Cat"}))}),
NodeType("Dog", {"friend": NodeRef(frozenset({"Dog"}))}),
]
)

message = str(exc_info.value)
assert "nodes" in message
assert "friend" in message
assert "Cat" in message
assert "Dog" in message


def test_graph_schema_allows_type_local_nullability_for_shared_node_column() -> None:
pa = pytest.importorskip("pyarrow")

cat = NodeType("Cat", pa.schema([pa.field("lives", pa.int64(), nullable=False)]))
dog = NodeType("Dog", pa.schema([pa.field("lives", pa.int64(), nullable=True)]))

schema = GraphSchema(node_types=[cat, dog])

assert schema.node_types[0].properties["lives"] == ScalarType("int64", nullable=False)
assert schema.node_types[1].properties["lives"] == ScalarType("int64", nullable=True)
assert schema.node_arrow().field("lives").nullable is True


def test_graph_schema_absent_property_does_not_change_type_local_nullability() -> None:
pa = pytest.importorskip("pyarrow")

cat = NodeType("Cat", pa.schema([pa.field("lives", pa.int64(), nullable=False)]))
house = NodeType("House", pa.schema([pa.field("address", pa.large_string())]))

schema = GraphSchema(node_types=[cat, house])

assert schema.node_types[0].properties["lives"] == ScalarType("int64", nullable=False)
assert "lives" not in schema.node_types[1].properties


def test_bind_schema_is_chainable_and_used_by_preflight() -> None:
schema = _schema()
g = _graph(schema).bind(point_color="name")
Expand Down
Loading