Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions Gradata/src/gradata/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,23 @@ def cmd_health(args):


def cmd_report(args):
report_type = args.type
if report_type == "case-study-seed":
from gradata.enhancements.case_study_seed import (
generate_case_study_seed,
render_case_study_markdown,
)

brain_root = _resolve_brain_root(args)
seed = generate_case_study_seed(brain_root / "system.db")
if getattr(args, "json", False):
print(json.dumps(seed, indent=2))
else:
print(render_case_study_markdown(seed), end="")
return

brain = _get_brain(args)

try:
try:
from gradata_cloud.scoring.reports import (
Expand Down Expand Up @@ -2102,9 +2118,14 @@ def main():
p_health.add_argument("--json", action="store_true")

# report
p_report = sub.add_parser("report", help="Generate reports (csv, metrics, rules)")
p_report.add_argument("type", choices=["csv", "metrics", "rules", "health"], help="Report type")
p_report = sub.add_parser("report", help="Generate reports (csv, metrics, rules, health, case-study-seed)")
p_report.add_argument(
"type",
choices=["csv", "metrics", "rules", "health", "case-study-seed"],
help="Report type",
)
p_report.add_argument("--window", type=int, default=20, help="Rolling window size")
p_report.add_argument("--json", action="store_true", help="Output supported reports as JSON")

# watch — sidecar file watcher
p_watch = sub.add_parser("watch", help="Watch a directory for AI-generated file edits")
Expand Down
198 changes: 198 additions & 0 deletions Gradata/src/gradata/enhancements/case_study_seed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
"""Case-study seed reports from local Gradata evidence.

The report is intentionally evidence-first: it summarizes correction/rule/application
counts without emitting raw prompts or drafts by default.
"""

from __future__ import annotations

import contextlib
import json
import sqlite3
from collections import Counter
from pathlib import Path
from typing import Any

# Event ``type`` values that generate_case_study_seed() treats as graduated-rule evidence.
_RULE_EVENT_TYPES = {"RULE_GRADUATED"}
# Event ``type`` values that generate_case_study_seed() counts as rule/lesson
# injections or applications.
_INJECTION_EVENT_TYPES = {
"LESSON_APPLIED",
"LESSON_FIRED",
"RULE_APPLICATION",
"JIT_INJECTION",
}


def _rows(db_path: Path) -> list[dict[str, Any]]:
    """Load every row of the ``events`` table as a plain dict, ordered by ``id``.

    Returns an empty list when the database file does not exist or when the
    query fails with ``sqlite3.OperationalError`` (for example, the ``events``
    table is absent). Each ``data_json`` value is decoded into the ``data``
    key; payloads that are not valid JSON, or that decode to anything other
    than a JSON object, become ``{}``.
    """
    # Guard first: sqlite3.connect() would otherwise create a new, empty file.
    if not db_path.exists():
        return []

    query = "SELECT ts, session, type, source, data_json FROM events ORDER BY id ASC"
    with contextlib.closing(sqlite3.connect(str(db_path))) as connection:
        connection.row_factory = sqlite3.Row
        try:
            fetched = connection.execute(query).fetchall()
        except sqlite3.OperationalError:
            return []

    events: list[dict[str, Any]] = []
    for record in fetched:
        try:
            payload = json.loads(record["data_json"] or "{}")
        except json.JSONDecodeError:
            payload = {}
        if not isinstance(payload, dict):
            payload = {}
        event = {column: record[column] for column in ("ts", "session", "type", "source")}
        event["data"] = payload
        events.append(event)
    return events


def _event_data(event: dict[str, Any]) -> dict[str, Any]:
    """Return the event's ``data`` payload, or an empty dict when it is not a dict."""
    payload = event.get("data")
    if not isinstance(payload, dict):
        return {}
    return payload


def _category_pattern(event: dict[str, Any]) -> tuple[str, str]:
    """Derive the ``(category, pattern)`` grouping key for an event.

    For each part the first truthy payload value among its candidate keys is
    used; when none is set the placeholders ``"uncategorized"`` and
    ``"unspecified pattern"`` apply. Both parts are coerced to ``str``.
    """
    payload = _event_data(event)

    def first_truthy(keys: tuple[str, ...], fallback: str) -> Any:
        # Mirrors an ``a or b or fallback`` chain over the payload lookups.
        for key in keys:
            value = payload.get(key)
            if value:
                return value
        return fallback

    category = first_truthy(("category", "rule_category"), "uncategorized")
    pattern = first_truthy(
        ("pattern", "lesson_description", "rule", "text"),
        "unspecified pattern",
    )
    return str(category), str(pattern)


def _matches(event: dict[str, Any], category: str, pattern: str) -> bool:
    """Tell whether an event belongs to ``category`` and relates to ``pattern``.

    The category must be equal; the pattern matches when either string
    contains the other.
    """
    event_category, event_pattern = _category_pattern(event)
    if event_category != category:
        return False
    # Equal strings contain each other, so two-way containment also covers
    # the exact-equality case.
    return pattern in event_pattern or event_pattern in pattern


def _safe_before_after(event: dict[str, Any]) -> dict[str, Any]:
    """Build a before/after evidence entry from summary fields only.

    Reads ``before_summary``/``draft_summary`` and ``after_summary``/
    ``final_summary`` from the event payload; a side with no truthy summary
    is reported as ``"raw content omitted"``. No other payload field is read.
    """
    payload = _event_data(event)

    def summary(*keys: str) -> str:
        # First truthy summary wins; otherwise fall back to the placeholder.
        for key in keys:
            value = payload.get(key)
            if value:
                return str(value)
        return "raw content omitted"

    return {
        "session": event.get("session"),
        "before_summary": summary("before_summary", "draft_summary"),
        "after_summary": summary("after_summary", "final_summary"),
    }


def generate_case_study_seed(db_path: str | Path) -> dict[str, Any]:
    """Return a privacy-safe case-study seed from local event evidence.

    Raw fields such as ``before``, ``after``, ``draft``, ``final``, and prompts are
    not included. Callers get summaries/counts/caveats suitable for requesting a
    real customer's publication permission; not a synthetic testimonial.

    Args:
        db_path: Location of the SQLite event database to summarize.

    Returns:
        A dict describing the most frequent correction ``(category, pattern)``,
        up to five associated ``RULE_GRADUATED`` rules, up to three before/after
        summaries, event counts, and fixed privacy/caveat notes. ``source_db``
        carries only the database file name, never its directory.
    """
    db = Path(db_path)
    events = _rows(db)
    corrections = [e for e in events if e.get("type") == "CORRECTION"]
    rule_events = [e for e in events if e.get("type") in _RULE_EVENT_TYPES]
    injection_events = [e for e in events if e.get("type") in _INJECTION_EVENT_TYPES]

    counts = Counter(_category_pattern(e) for e in corrections)
    if counts:
        (category, pattern), matching_count = counts.most_common(1)[0]
    else:
        category, pattern, matching_count = "none", "no correction evidence found", 0

    matching_corrections = [e for e in corrections if _category_pattern(e) == (category, pattern)]

    associated_rules = []
    for event in rule_events:
        if not _matches(event, category, pattern):
            continue
        data = _event_data(event)
        associated_rules.append(
            {
                "session": event.get("session"),
                "rule": str(data.get("rule") or data.get("text") or data.get("pattern") or pattern),
                "category": str(data.get("category") or category),
            }
        )

    # Filtering injections by category alone lets unrelated injections in the
    # same category inflate the evidence for the selected pattern. An injection
    # event can describe the applied rule (e.g. ``lesson_description``) rather
    # than the mistake wording, so it counts when it lines up with either the
    # selected pattern or the text of one of its associated rules.
    match_targets = {pattern, *(rule["rule"] for rule in associated_rules)}
    matching_injections = [
        e
        for e in injection_events
        if any(_matches(e, category, target) for target in match_targets)
    ]

    return {
        "report": "case-study-seed",
        # File name only: the full path can reveal host/user directory names
        # once the seed is shared outside the machine it was generated on.
        "source_db": db.name,
        "top_repeated_mistake": {
            "category": category,
            "pattern": pattern,
            "correction_count": matching_count,
        },
        "associated_rules": associated_rules[:5],
        "before_after_evidence": [_safe_before_after(e) for e in matching_corrections[:3]],
        "event_counts": {
            "corrections": len(corrections),
            "matching_corrections": len(matching_corrections),
            "rules_graduated": len(associated_rules),
            "injections_or_applications": len(matching_injections),
        },
        "privacy": {
            "raw_prompt_content_included": False,
            "redaction_note": "Raw drafts/prompts/finals are omitted by default; only summaries/counts are emitted.",
        },
        "caveats": [
            "This is a seed for human review and customer permission, not a publish-ready claim.",
            "Counts are local to this brain's system.db and may miss unsynced/cloud-only events.",
            "Before/after evidence uses summary fields when present; otherwise it records that raw content was omitted.",
        ],
    }


def render_case_study_markdown(seed: dict[str, Any]) -> str:
    """Render a case-study seed dict as a Markdown document.

    Missing keys fall back to placeholder values, and the "Associated rules"
    and "Before/after evidence" sections each emit a single explanatory bullet
    when they have no entries. The returned text ends with a newline.
    """
    top = seed.get("top_repeated_mistake", {})
    counts = seed.get("event_counts", {})

    header = [
        "# Case-study seed",
        "",
        "Evidence-only draft for human review and publication permission.",
        "",
        "## Top repeated mistake",
        f"- Category: {top.get('category', 'unknown')}",
        f"- Pattern: {top.get('pattern', 'unknown')}",
        f"- Matching corrections: {top.get('correction_count', 0)}",
        "",
        "## Associated rules",
    ]

    rule_lines = [f"- {rule.get('rule', '')}" for rule in seed.get("associated_rules") or []]
    if not rule_lines:
        rule_lines = ["- None found in local RULE_GRADUATED events."]

    evidence_lines = [
        f"- Session {item.get('session')}: before={item.get('before_summary')} -> after={item.get('after_summary')}"
        for item in seed.get("before_after_evidence") or []
    ]
    if not evidence_lines:
        evidence_lines = ["- No correction summaries available."]

    summary = [
        "",
        "## Evidence counts",
        f"- Corrections: {counts.get('corrections', 0)}",
        f"- Matching corrections: {counts.get('matching_corrections', 0)}",
        f"- Rules graduated: {counts.get('rules_graduated', 0)}",
        f"- Injections/applications: {counts.get('injections_or_applications', 0)}",
        "",
        "## Privacy",
        f"- Raw prompt content included: {seed.get('privacy', {}).get('raw_prompt_content_included', False)}",
        f"- Note: {seed.get('privacy', {}).get('redaction_note', '')}",
        "",
        "## Caveats",
    ]
    caveat_lines = [f"- {caveat}" for caveat in seed.get("caveats", [])]

    document = [
        *header,
        *rule_lines,
        "",
        "## Before/after evidence",
        *evidence_lines,
        *summary,
        *caveat_lines,
    ]
    return "\n".join(document) + "\n"
125 changes: 125 additions & 0 deletions Gradata/tests/test_case_study_seed_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations

import json
import sqlite3
from pathlib import Path
from types import SimpleNamespace

from gradata.cli import cmd_report
from gradata.enhancements.case_study_seed import (
generate_case_study_seed,
render_case_study_markdown,
)


def _seed_brain(brain_dir: Path) -> None:
    """Create ``brain_dir`` (with parents) and an empty ``events`` table in its ``system.db``.

    Raises:
        sqlite3.OperationalError: If the table cannot be created, e.g. it already exists.
    """
    brain_dir.mkdir(parents=True, exist_ok=True)
    con = sqlite3.connect(brain_dir / "system.db")
    try:
        con.execute(
            """
            CREATE TABLE events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                session INTEGER,
                type TEXT NOT NULL,
                source TEXT,
                data_json TEXT
            )
            """
        )
        con.commit()
    finally:
        # Close even when a statement fails so the handle never outlives the helper.
        con.close()


def _event(
    brain_dir: Path,
    etype: str,
    data: dict,
    *,
    session: int = 1,
    ts: str = "2026-06-08T10:00:00+00:00",
) -> None:
    """Insert one event row into ``brain_dir / "system.db"``.

    ``data`` is stored JSON-encoded in ``data_json`` and ``source`` is always
    ``"test"``.
    """
    con = sqlite3.connect(brain_dir / "system.db")
    try:
        con.execute(
            "INSERT INTO events(ts, session, type, source, data_json) VALUES (?,?,?,?,?)",
            (ts, session, etype, "test", json.dumps(data)),
        )
        con.commit()
    finally:
        # Close even when the insert fails so the handle never outlives the helper.
        con.close()


def test_case_study_seed_uses_top_repeated_mistake_and_omits_raw_prompt(tmp_path):
    """The seed reports the most repeated correction and never echoes raw draft text."""
    brain_dir = tmp_path / "brain"
    _seed_brain(brain_dir)

    tone_correction = {
        "category": "tone",
        "pattern": "AI draft sounds too formal",
        "before": "Dear Jane, confidential enterprise pricing is attached",
        "after": "Jane — quick note with pricing next steps",
        "before_summary": "Overly formal outreach",
        "after_summary": "Short direct AE-style note",
    }
    format_correction = {
        "category": "format",
        "pattern": "Too many bullets",
        "before": "SECRET",
        "after": "ok",
    }
    graduated_rule = {
        "category": "tone",
        "pattern": "AI draft sounds too formal",
        "rule": "Use concise AE-style language.",
    }
    applied_lesson = {
        "category": "tone",
        "lesson_description": "Use concise AE-style language.",
    }

    # Three identical tone corrections make "tone" the top repeated mistake.
    for session in range(1, 4):
        _event(brain_dir, "CORRECTION", tone_correction, session=session)
    _event(brain_dir, "CORRECTION", format_correction, session=4)
    _event(brain_dir, "RULE_GRADUATED", graduated_rule, session=5)
    _event(brain_dir, "LESSON_APPLIED", applied_lesson, session=6)

    seed = generate_case_study_seed(brain_dir / "system.db")
    top = seed["top_repeated_mistake"]
    evidence = seed["before_after_evidence"]

    assert top["category"] == "tone"
    assert top["pattern"] == "AI draft sounds too formal"
    assert seed["event_counts"] == {
        "corrections": 4,
        "matching_corrections": 3,
        "rules_graduated": 1,
        "injections_or_applications": 1,
    }
    assert evidence[0] == {
        "session": 1,
        "before_summary": "Overly formal outreach",
        "after_summary": "Short direct AE-style note",
    }
    assert len(evidence) == 3
    # Raw draft text must not appear anywhere in the serialized seed.
    assert "confidential enterprise pricing" not in json.dumps(seed)
    assert seed["privacy"]["raw_prompt_content_included"] is False


def test_render_case_study_markdown_is_evidence_not_testimonial(tmp_path):
    """The rendered Markdown has the expected sections and never says "testimonial"."""
    brain_dir = tmp_path / "brain"
    _seed_brain(brain_dir)
    _event(brain_dir, "CORRECTION", {"category": "testing", "pattern": "Skipped focused tests"})

    seed = generate_case_study_seed(brain_dir / "system.db")
    markdown = render_case_study_markdown(seed)

    for fragment in ("# Case-study seed", "Top repeated mistake", "Evidence counts", "Caveats"):
        assert fragment in markdown
    assert "testimonial" not in markdown.lower()


def test_report_case_study_seed_json_cli_output(tmp_path, capsys, monkeypatch):
    """``cmd_report`` with ``json=True`` prints the case-study seed as parseable JSON.

    ``GRADATA_BRAIN`` and ``BRAIN_DIR`` are removed from the environment first
    (presumably so only ``brain_dir`` selects the brain; confirm against the
    CLI's brain-resolution helper).
    """
    for env_var in ("GRADATA_BRAIN", "BRAIN_DIR"):
        monkeypatch.delenv(env_var, raising=False)

    brain_dir = tmp_path / "brain"
    _seed_brain(brain_dir)
    _event(brain_dir, "CORRECTION", {"category": "api", "pattern": "Invented API fields"})

    cli_args = SimpleNamespace(brain_dir=brain_dir, type="case-study-seed", window=20, json=True)
    cmd_report(cli_args)

    captured = capsys.readouterr()
    payload = json.loads(captured.out)
    assert payload["top_repeated_mistake"]["pattern"] == "Invented API fields"
Loading