diff --git a/Gradata/src/gradata/cli.py b/Gradata/src/gradata/cli.py
index 34fe147d..e4c6c412 100644
--- a/Gradata/src/gradata/cli.py
+++ b/Gradata/src/gradata/cli.py
@@ -665,7 +665,24 @@ def cmd_health(args):
 
 
 def cmd_report(args):
+    report_type = args.type
+    if report_type == "case-study-seed":
+        # Reads system.db directly, so it returns before _get_brain() is called.
+        from gradata.enhancements.case_study_seed import (
+            generate_case_study_seed,
+            render_case_study_markdown,
+        )
+
+        brain_root = _resolve_brain_root(args)
+        seed = generate_case_study_seed(brain_root / "system.db")
+        if getattr(args, "json", False):
+            print(json.dumps(seed, indent=2))
+        else:
+            print(render_case_study_markdown(seed), end="")
+        return
+
     brain = _get_brain(args)
+
     try:
         try:
             from gradata_cloud.scoring.reports import (
@@ -2102,9 +2119,14 @@ def main():
     p_health.add_argument("--json", action="store_true")
 
     # report
-    p_report = sub.add_parser("report", help="Generate reports (csv, metrics, rules)")
-    p_report.add_argument("type", choices=["csv", "metrics", "rules", "health"], help="Report type")
+    p_report = sub.add_parser("report", help="Generate reports (csv, metrics, rules, health, case-study-seed)")
+    p_report.add_argument(
+        "type",
+        choices=["csv", "metrics", "rules", "health", "case-study-seed"],
+        help="Report type",
+    )
     p_report.add_argument("--window", type=int, default=20, help="Rolling window size")
+    p_report.add_argument("--json", action="store_true", help="Output supported reports as JSON")
 
     # watch — sidecar file watcher
     p_watch = sub.add_parser("watch", help="Watch a directory for AI-generated file edits")
diff --git a/Gradata/src/gradata/enhancements/case_study_seed.py b/Gradata/src/gradata/enhancements/case_study_seed.py
new file mode 100644
index 00000000..36b4fbcd
--- /dev/null
+++ b/Gradata/src/gradata/enhancements/case_study_seed.py
@@ -0,0 +1,228 @@
+"""Case-study seed reports from local Gradata evidence.
+
+The report is intentionally evidence-first: it summarizes correction/rule/application
+counts without emitting raw prompts or drafts by default.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import json
+import sqlite3
+from collections import Counter
+from pathlib import Path
+from typing import Any
+
+_RULE_EVENT_TYPES = {"RULE_GRADUATED"}
+_INJECTION_EVENT_TYPES = {
+    "LESSON_APPLIED",
+    "LESSON_FIRED",
+    "RULE_APPLICATION",
+    "JIT_INJECTION",
+}
+
+
+def _rows(db_path: Path) -> list[dict[str, Any]]:
+    """Load every ``events`` row as a plain dict, ordered by ascending id.
+
+    Returns an empty list when the file is missing or cannot be queried, so a
+    seed can still be produced for a brain without usable local evidence.
+    """
+    if not db_path.exists():
+        # Checked first because sqlite3.connect would create an empty file.
+        return []
+    try:
+        with contextlib.closing(sqlite3.connect(str(db_path))) as con:
+            con.row_factory = sqlite3.Row
+            rows = con.execute(
+                "SELECT ts, session, type, source, data_json FROM events ORDER BY id ASC"
+            ).fetchall()
+    except sqlite3.Error:
+        # Covers a missing table/column as well as a corrupt, unreadable or
+        # non-SQLite file; none of these should crash report generation.
+        return []
+    out: list[dict[str, Any]] = []
+    for row in rows:
+        try:
+            data = json.loads(row["data_json"] or "{}")
+        except (ValueError, TypeError):
+            # Malformed JSON, undecodable bytes, or a non-text column value:
+            # keep the event but treat its payload as empty.
+            data = {}
+        out.append(
+            {
+                "ts": row["ts"],
+                "session": row["session"],
+                "type": row["type"],
+                "source": row["source"],
+                "data": data if isinstance(data, dict) else {},
+            }
+        )
+    return out
+
+
+def _event_data(event: dict[str, Any]) -> dict[str, Any]:
+    """Return the event's decoded payload, or an empty dict if it is not a dict."""
+    data = event.get("data")
+    return data if isinstance(data, dict) else {}
+
+
+def _category_pattern(event: dict[str, Any]) -> tuple[str, str]:
+    """Return the ``(category, pattern)`` grouping key for an event.
+
+    The first non-empty payload field wins; placeholder strings are used when
+    none is present so every event still has a key.
+    """
+    data = _event_data(event)
+    category = str(data.get("category") or data.get("rule_category") or "uncategorized")
+    pattern = str(
+        data.get("pattern")
+        or data.get("lesson_description")
+        or data.get("rule")
+        or data.get("text")
+        or "unspecified pattern"
+    )
+    return category, pattern
+
+
+def _matches(event: dict[str, Any], category: str, pattern: str) -> bool:
+    """Return True when the event shares *category* and its pattern equals, contains, or is contained in *pattern*."""
+    ev_category, ev_pattern = _category_pattern(event)
+    return ev_category == category and (ev_pattern == pattern or pattern in ev_pattern or ev_pattern in pattern)
+
+
+def _safe_before_after(event: dict[str, Any]) -> dict[str, Any]:
+    """Return the session plus before/after summaries, never the raw before/after text."""
+    data = _event_data(event)
+    before_summary = data.get("before_summary") or data.get("draft_summary") or "raw content omitted"
+    after_summary = data.get("after_summary") or data.get("final_summary") or "raw content omitted"
+    return {
+        "session": event.get("session"),
+        "before_summary": str(before_summary),
+        "after_summary": str(after_summary),
+    }
+
+
+def _one_line(value: Any) -> str:
+    """Collapse all whitespace runs so *value* cannot break a Markdown bullet."""
+    return " ".join(str(value).split())
+
+
+def generate_case_study_seed(db_path: str | Path) -> dict[str, Any]:
+    """Return a privacy-safe case-study seed from local event evidence.
+
+    Raw fields such as ``before``, ``after``, ``draft``, ``final``, and prompts are
+    not included. Callers get summaries/counts/caveats suitable for requesting a
+    real customer's publication permission; not a synthetic testimonial.
+    """
+    db = Path(db_path)
+    events = _rows(db)
+    corrections = [e for e in events if e.get("type") == "CORRECTION"]
+    rule_events = [e for e in events if e.get("type") in _RULE_EVENT_TYPES]
+    injection_events = [e for e in events if e.get("type") in _INJECTION_EVENT_TYPES]
+
+    # Ties between equally frequent keys go to the one seen first.
+    counts = Counter(_category_pattern(e) for e in corrections)
+    if counts:
+        (category, pattern), matching_count = counts.most_common(1)[0]
+    else:
+        category, pattern, matching_count = "none", "no correction evidence found", 0
+
+    matching_corrections = [e for e in corrections if _category_pattern(e) == (category, pattern)]
+    associated_rules = []
+    for event in rule_events:
+        if _matches(event, category, pattern):
+            data = _event_data(event)
+            associated_rules.append(
+                {
+                    "session": event.get("session"),
+                    "rule": str(data.get("rule") or data.get("text") or data.get("pattern") or pattern),
+                    "category": str(data.get("category") or category),
+                }
+            )
+
+    # Injection events are matched on category only, not on pattern.
+    matching_injections = [e for e in injection_events if _category_pattern(e)[0] == category]
+
+    return {
+        "report": "case-study-seed",
+        "source_db": str(db),
+        "top_repeated_mistake": {
+            "category": category,
+            "pattern": pattern,
+            "correction_count": matching_count,
+        },
+        "associated_rules": associated_rules[:5],
+        "before_after_evidence": [_safe_before_after(e) for e in matching_corrections[:3]],
+        "event_counts": {
+            "corrections": len(corrections),
+            "matching_corrections": len(matching_corrections),
+            # Counts every matching rule, not just the five listed above.
+            "rules_graduated": len(associated_rules),
+            "injections_or_applications": len(matching_injections),
+        },
+        "privacy": {
+            "raw_prompt_content_included": False,
+            "redaction_note": "Raw drafts/prompts/finals are omitted by default; only summaries/counts are emitted.",
+        },
+        "caveats": [
+            "This is a seed for human review and customer permission, not a publish-ready claim.",
+            "Counts are local to this brain's system.db and may miss unsynced/cloud-only events.",
+            "Before/after evidence uses summary fields when present; otherwise it records that raw content was omitted.",
+        ],
+    }
+
+
+def render_case_study_markdown(seed: dict[str, Any]) -> str:
+    """Render a seed from ``generate_case_study_seed`` as a Markdown document.
+
+    Missing sections fall back to placeholders. Free-text values are collapsed
+    to a single line so embedded newlines cannot break the list structure.
+    """
+    top = seed.get("top_repeated_mistake", {})
+    counts = seed.get("event_counts", {})
+    privacy = seed.get("privacy", {})
+    lines = [
+        "# Case-study seed",
+        "",
+        "Evidence-only draft for human review and publication permission.",
+        "",
+        "## Top repeated mistake",
+        f"- Category: {_one_line(top.get('category', 'unknown'))}",
+        f"- Pattern: {_one_line(top.get('pattern', 'unknown'))}",
+        f"- Matching corrections: {top.get('correction_count', 0)}",
+        "",
+        "## Associated rules",
+    ]
+    rules = seed.get("associated_rules") or []
+    if rules:
+        lines.extend(f"- {_one_line(rule.get('rule', ''))}" for rule in rules)
+    else:
+        lines.append("- None found in local RULE_GRADUATED events.")
+    lines.extend(["", "## Before/after evidence"])
+    evidence = seed.get("before_after_evidence") or []
+    if evidence:
+        for item in evidence:
+            before = _one_line(item.get("before_summary"))
+            after = _one_line(item.get("after_summary"))
+            lines.append(f"- Session {item.get('session')}: before={before} -> after={after}")
+    else:
+        lines.append("- No correction summaries available.")
+    lines.extend(
+        [
+            "",
+            "## Evidence counts",
+            f"- Corrections: {counts.get('corrections', 0)}",
+            f"- Matching corrections: {counts.get('matching_corrections', 0)}",
+            f"- Rules graduated: {counts.get('rules_graduated', 0)}",
+            f"- Injections/applications: {counts.get('injections_or_applications', 0)}",
+            "",
+            "## Privacy",
+            f"- Raw prompt content included: {privacy.get('raw_prompt_content_included', False)}",
+            f"- Note: {_one_line(privacy.get('redaction_note', ''))}",
+            "",
+            "## Caveats",
+        ]
+    )
+    lines.extend(f"- {_one_line(caveat)}" for caveat in seed.get("caveats", []))
+    return "\n".join(lines) + "\n"
diff --git a/Gradata/tests/test_case_study_seed_report.py b/Gradata/tests/test_case_study_seed_report.py
new file mode 100644
index 00000000..3e0fc179
--- /dev/null
+++ b/Gradata/tests/test_case_study_seed_report.py
@@ -0,0 +1,146 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+from pathlib import Path
+from types import SimpleNamespace
+
+from gradata.cli import cmd_report
+from gradata.enhancements.case_study_seed import (
+    generate_case_study_seed,
+    render_case_study_markdown,
+)
+
+
+def _seed_brain(brain_dir: Path) -> None:
+    brain_dir.mkdir(parents=True, exist_ok=True)
+    con = sqlite3.connect(brain_dir / "system.db")
+    con.execute(
+        """
+        CREATE TABLE events (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            ts TEXT NOT NULL,
+            session INTEGER,
+            type TEXT NOT NULL,
+            source TEXT,
+            data_json TEXT
+        )
+        """
+    )
+    con.commit()
+    con.close()
+
+
+def _event(brain_dir: Path, etype: str, data: dict, *, session: int = 1, ts: str = "2026-06-08T10:00:00+00:00") -> None:
+    con = sqlite3.connect(brain_dir / "system.db")
+    con.execute(
+        "INSERT INTO events(ts, session, type, source, data_json) VALUES (?,?,?,?,?)",
+        (ts, session, etype, "test", json.dumps(data)),
+    )
+    con.commit()
+    con.close()
+
+
+def test_case_study_seed_uses_top_repeated_mistake_and_omits_raw_prompt(tmp_path):
+    brain_dir = tmp_path / "brain"
+    _seed_brain(brain_dir)
+    for session in (1, 2, 3):
+        _event(
+            brain_dir,
+            "CORRECTION",
+            {
+                "category": "tone",
+                "pattern": "AI draft sounds too formal",
+                "before": "Dear Jane, confidential enterprise pricing is attached",
+                "after": "Jane — quick note with pricing next steps",
+                "before_summary": "Overly formal outreach",
+                "after_summary": "Short direct AE-style note",
+            },
+            session=session,
+        )
+    _event(
+        brain_dir,
+        "CORRECTION",
+        {"category": "format", "pattern": "Too many bullets", "before": "SECRET", "after": "ok"},
+        session=4,
+    )
+    _event(
+        brain_dir,
+        "RULE_GRADUATED",
+        {"category": "tone", "pattern": "AI draft sounds too formal", "rule": "Use concise AE-style language."},
+        session=5,
+    )
+    _event(
+        brain_dir,
+        "LESSON_APPLIED",
+        {"category": "tone", "lesson_description": "Use concise AE-style language."},
+        session=6,
+    )
+
+    seed = generate_case_study_seed(brain_dir / "system.db")
+
+    assert seed["top_repeated_mistake"]["category"] == "tone"
+    assert seed["top_repeated_mistake"]["pattern"] == "AI draft sounds too formal"
+    assert seed["event_counts"] == {
+        "corrections": 4,
+        "matching_corrections": 3,
+        "rules_graduated": 1,
+        "injections_or_applications": 1,
+    }
+    assert seed["before_after_evidence"][0] == {
+        "session": 1,
+        "before_summary": "Overly formal outreach",
+        "after_summary": "Short direct AE-style note",
+    }
+    assert len(seed["before_after_evidence"]) == 3
+    assert "confidential enterprise pricing" not in json.dumps(seed)
+    assert seed["privacy"]["raw_prompt_content_included"] is False
+
+
+def test_render_case_study_markdown_is_evidence_not_testimonial(tmp_path):
+    brain_dir = tmp_path / "brain"
+    _seed_brain(brain_dir)
+    _event(brain_dir, "CORRECTION", {"category": "testing", "pattern": "Skipped focused tests"})
+
+    markdown = render_case_study_markdown(generate_case_study_seed(brain_dir / "system.db"))
+
+    assert "# Case-study seed" in markdown
+    assert "Top repeated mistake" in markdown
+    assert "Evidence counts" in markdown
+    assert "Caveats" in markdown
+    assert "testimonial" not in markdown.lower()
+
+
+def test_report_case_study_seed_json_cli_output(tmp_path, capsys, monkeypatch):
+    monkeypatch.delenv("GRADATA_BRAIN", raising=False)
+    monkeypatch.delenv("BRAIN_DIR", raising=False)
+    brain_dir = tmp_path / "brain"
+    _seed_brain(brain_dir)
+    _event(brain_dir, "CORRECTION", {"category": "api", "pattern": "Invented API fields"})
+
+    cmd_report(SimpleNamespace(brain_dir=brain_dir, type="case-study-seed", window=20, json=True))
+
+    out = capsys.readouterr().out
+    data = json.loads(out)
+    assert data["top_repeated_mistake"]["pattern"] == "Invented API fields"
+
+
+def test_case_study_seed_tolerates_unreadable_db(tmp_path):
+    db_path = tmp_path / "system.db"
+    db_path.write_bytes(b"not a sqlite database file " * 64)
+
+    seed = generate_case_study_seed(db_path)
+
+    assert seed["top_repeated_mistake"]["correction_count"] == 0
+    assert seed["event_counts"]["corrections"] == 0
+
+
+def test_render_case_study_markdown_keeps_values_on_one_line(tmp_path):
+    brain_dir = tmp_path / "brain"
+    _seed_brain(brain_dir)
+    _event(brain_dir, "CORRECTION", {"category": "tone", "pattern": "Too formal\n## Injected heading"})
+
+    markdown = render_case_study_markdown(generate_case_study_seed(brain_dir / "system.db"))
+
+    assert "- Pattern: Too formal ## Injected heading" in markdown
+    assert "\n## Injected heading" not in markdown