diff --git a/README.md b/README.md index 61ef8a3be..4af92d0da 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,11 @@ name: strix-penetration-test on: pull_request: +permissions: + security-events: write + actions: read + contents: read + jobs: security-scan: runs-on: ubuntu-latest @@ -214,13 +219,23 @@ jobs: STRIX_LLM: ${{ secrets.STRIX_LLM }} LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - run: strix -n -t ./ --scan-mode quick + run: strix -n -t ./ --scan-mode quick --sarif-output results.sarif + + - name: Upload SARIF to GitHub code scanning + if: always() + uses: github/codeql-action/upload-sarif@v4 + with: + sarif_file: results.sarif ``` > [!TIP] > In CI pull request runs, Strix automatically scopes quick reviews to changed files. > If diff-scope cannot resolve, ensure checkout uses full history (`fetch-depth: 0`) or pass > `--diff-base` explicitly. +> `--sarif-output` writes GitHub-compatible SARIF before Strix exits with code `2` for +> confirmed vulnerabilities, so `if: always()` preserves code scanning upload on findings. +> Use `--sarif` to write `strix_runs//results.sarif`, or +> `--sarif-output ` to choose the upload path. ### Configuration diff --git a/strix/interface/main.py b/strix/interface/main.py index bc88da673..f29f0695a 100644 --- a/strix/interface/main.py +++ b/strix/interface/main.py @@ -9,11 +9,12 @@ import os import shutil import sys +from importlib.metadata import version from pathlib import Path from typing import Any import litellm -from docker.errors import DockerException +from docker.errors import DockerException # type: ignore[import-untyped] from rich.console import Console from rich.panel import Panel from rich.text import Text @@ -44,6 +45,7 @@ ) from strix.runtime.docker_runtime import HOST_GATEWAY_HOSTNAME # noqa: E402 from strix.telemetry import posthog # noqa: E402 +from strix.telemetry.sarif import write_sarif_report # noqa: E402 from strix.telemetry.tracer import get_global_tracer # noqa: E402 @@ -257,8 +259,6 @@ async def warm_up_llm() -> None: def get_version() -> str: try: - from importlib.metadata import version - return version("strix-agent") except Exception: # noqa: BLE001 return "unknown" @@ -387,6 +387,21 @@ def parse_arguments() -> argparse.Namespace: help="Path to a custom config file (JSON) to use instead of ~/.strix/cli-config.json", ) + parser.add_argument( + "--sarif", + action="store_true", + help="Write GitHub code scanning SARIF results after the scan completes.", + ) + + parser.add_argument( + "--sarif-output", + type=str, + help=( + "Path for SARIF output. Defaults to strix_runs//results.sarif " + "when --sarif is enabled." + ), + ) + args = parser.parse_args() if args.instruction and args.instruction_file: @@ -453,7 +468,7 @@ def display_completion_message(args: argparse.Namespace, results_path: Path) -> stats_text = build_final_stats_text(tracer) - panel_parts = [completion_text, "\n\n", target_text] + panel_parts: list[str | Text] = [completion_text, "\n\n", target_text] if stats_text.plain: panel_parts.extend(["\n", stats_text]) @@ -484,6 +499,22 @@ def display_completion_message(args: argparse.Namespace, results_path: Path) -> console.print() +def write_requested_sarif_output(args: argparse.Namespace, results_path: Path) -> Path | None: + """Write SARIF output when requested and report write failures without crashing.""" + if not args.sarif and not args.sarif_output: + return None + + output_path = Path(args.sarif_output) if args.sarif_output else results_path / "results.sarif" + tracer = get_global_tracer() + vulnerability_reports = tracer.vulnerability_reports if tracer else [] + try: + write_sarif_report(output_path, vulnerability_reports, tool_version=get_version()) + except OSError as error: + Console().print(f"[yellow]Failed to write SARIF:[/] {error}") + return None + return output_path + + def pull_docker_image() -> None: console = Console() client = check_docker_connection() @@ -632,6 +663,7 @@ def main() -> None: # noqa: PLR0912, PLR0915 posthog.end(tracer, exit_reason=exit_reason) results_path = Path("strix_runs") / args.run_name + write_requested_sarif_output(args, results_path) display_completion_message(args, results_path) if args.non_interactive: diff --git a/strix/telemetry/sarif.py b/strix/telemetry/sarif.py new file mode 100644 index 000000000..9bdfaa545 --- /dev/null +++ b/strix/telemetry/sarif.py @@ -0,0 +1,270 @@ +"""Build GitHub-compatible SARIF output from Strix vulnerability reports.""" + +from __future__ import annotations + +import json +from pathlib import PurePosixPath +from typing import TYPE_CHECKING, Any, cast + + +if TYPE_CHECKING: + from pathlib import Path + + +SARIF_SCHEMA = "https://json.schemastore.org/sarif-2.1.0.json" +SARIF_VERSION = "2.1.0" +TOOL_NAME = "Strix" +TOOL_INFORMATION_URI = "https://strix.ai" + + +def build_sarif_report( + vulnerability_reports: list[dict[str, Any]], + *, + tool_version: str | None = None, +) -> dict[str, Any]: + """Return a SARIF 2.1.0 document for findings with safe source locations.""" + rules_by_id: dict[str, dict[str, Any]] = {} + results: list[dict[str, Any]] = [] + locationless_findings: list[dict[str, Any]] = [] + dropped_unsafe_location_findings: list[dict[str, Any]] = [] + + for report in vulnerability_reports: + locations, dropped_location_count = _build_locations(report.get("code_locations")) + if not locations: + locationless_findings.append(_locationless_summary(report)) + continue + if dropped_location_count: + dropped_unsafe_location_findings.append( + _dropped_location_summary(report, dropped_location_count) + ) + + rule_id = _rule_id(report) + rules_by_id.setdefault(rule_id, _build_rule(rule_id, report)) + results.append(_build_result(rule_id, report, locations)) + + driver: dict[str, Any] = { + "name": TOOL_NAME, + "informationUri": TOOL_INFORMATION_URI, + "rules": list(rules_by_id.values()), + } + if tool_version: + driver["version"] = tool_version + + run: dict[str, Any] = { + "tool": {"driver": driver}, + "results": results, + } + if locationless_findings: + run["properties"] = { + "locationlessFindingCount": len(locationless_findings), + "locationlessFindings": locationless_findings, + } + if dropped_unsafe_location_findings: + properties = run.setdefault("properties", {}) + properties["droppedUnsafeLocationCount"] = sum( + finding["droppedLocationCount"] for finding in dropped_unsafe_location_findings + ) + properties["droppedUnsafeLocationFindings"] = dropped_unsafe_location_findings + + return { + "version": SARIF_VERSION, + "$schema": SARIF_SCHEMA, + "runs": [run], + } + + +def write_sarif_report( + output_path: Path, + vulnerability_reports: list[dict[str, Any]], + *, + tool_version: str | None = None, +) -> None: + """Write a SARIF report to disk, creating parent directories first.""" + output_path.parent.mkdir(parents=True, exist_ok=True) + sarif = build_sarif_report(vulnerability_reports, tool_version=tool_version) + with output_path.open("w", encoding="utf-8") as sarif_file: + json.dump(sarif, sarif_file, ensure_ascii=False, indent=2) + sarif_file.write("\n") + + +def _build_rule(rule_id: str, report: dict[str, Any]) -> dict[str, Any]: + """Build a SARIF rule descriptor from a Strix finding.""" + title = _string_value(report.get("title")) or rule_id + full_description = _string_value(report.get("description")) or title + rule: dict[str, Any] = { + "id": rule_id, + "name": title, + "shortDescription": {"text": title}, + "fullDescription": {"text": full_description}, + "help": {"text": _help_text(report, full_description)}, + } + + tags = [_string_value(report.get(key)) for key in ("cwe", "cve")] + rule_tags = [tag for tag in tags if tag] + if rule_tags: + rule["properties"] = {"tags": rule_tags} + + return rule + + +def _build_result( + rule_id: str, + report: dict[str, Any], + locations: list[dict[str, Any]], +) -> dict[str, Any]: + """Build one SARIF result using validated physical locations.""" + title = _string_value(report.get("title")) or rule_id + return { + "ruleId": rule_id, + "level": _sarif_level(report.get("severity")), + "message": {"text": title}, + "locations": locations, + "properties": _result_properties(report), + } + + +def _result_properties(report: dict[str, Any]) -> dict[str, Any]: + """Return non-empty Strix finding metadata for SARIF properties.""" + properties: dict[str, Any] = {} + for key in ( + "id", + "severity", + "cvss", + "target", + "endpoint", + "method", + "cve", + "cwe", + "impact", + "remediation_steps", + ): + value = report.get(key) + if value not in (None, ""): + properties[key] = value + return properties + + +def _build_locations(raw_locations: Any) -> tuple[list[dict[str, Any]], int]: + """Return SARIF locations and a count of dropped unsafe locations.""" + if not isinstance(raw_locations, list): + return [], 0 + + raw_locations_list = cast("list[Any]", raw_locations) # type: ignore[redundant-cast] + locations: list[dict[str, Any]] = [] + dropped_location_count = 0 + for raw_location in raw_locations_list: + if not isinstance(raw_location, dict): + dropped_location_count += 1 + continue + + location = cast("dict[str, Any]", raw_location) + file_path = _string_value(location.get("file")) + start_line = location.get("start_line") + end_line = location.get("end_line") + if not file_path or type(start_line) is not int or start_line < 1: + dropped_location_count += 1 + continue + uri = _sarif_uri(file_path) + if uri is None: + dropped_location_count += 1 + continue + + region: dict[str, Any] = {"startLine": start_line} + if type(end_line) is int and end_line >= start_line: + region["endLine"] = end_line + + snippet = _string_value(location.get("snippet")) + if snippet: + region["snippet"] = {"text": snippet} + + locations.append( + { + "physicalLocation": { + "artifactLocation": {"uri": uri}, + "region": region, + } + } + ) + + return locations, dropped_location_count + + +def _rule_id(report: dict[str, Any]) -> str: + """Choose a stable SARIF rule id from CWE, CVE, id, or title.""" + for key in ("cwe", "cve", "id"): + value = _string_value(report.get(key)) + if value: + return value + + title = _string_value(report.get("title")) or "strix-finding" + return _slugify(title) + + +def _sarif_level(severity: Any) -> str: + """Map Strix severity labels to SARIF result levels.""" + normalized = (_string_value(severity) or "").lower() + if normalized in {"critical", "high"}: + return "error" + if normalized == "medium": + return "warning" + return "note" + + +def _sarif_uri(file_path: str) -> str | None: + """Return a safe repo-relative SARIF URI, or None for unsafe paths.""" + uri = PurePosixPath(file_path.replace("\\", "/")).as_posix() + parts = PurePosixPath(uri).parts + if not uri or uri.startswith("/") or not parts: + return None + if ":" in parts[0] or any(part == ".." for part in parts): + return None + return uri + + +def _string_value(value: Any) -> str | None: + """Return a stripped non-empty string value, or None.""" + if isinstance(value, str): + stripped = value.strip() + return stripped or None + return None + + +def _slugify(value: str) -> str: + """Convert arbitrary finding text into a stable lowercase slug.""" + chars = [char.lower() if char.isalnum() else "-" for char in value] + slug = "-".join(part for part in "".join(chars).split("-") if part) + return slug or "strix-finding" + + +def _help_text(report: dict[str, Any], fallback: str) -> str: + """Assemble SARIF help text from finding details and remediation.""" + sections = [ + _string_value(report.get("description")), + _string_value(report.get("impact")), + _string_value(report.get("remediation_steps")), + ] + help_text = "\n\n".join(section for section in sections if section) + return help_text or fallback + + +def _locationless_summary(report: dict[str, Any]) -> dict[str, Any]: + """Summarize findings that cannot be emitted as code-scanning alerts.""" + summary: dict[str, Any] = {} + for key in ("id", "title", "severity", "cwe", "cve", "target", "endpoint", "method"): + value = report.get(key) + if value not in (None, ""): + summary[key] = value + return summary + + +def _dropped_location_summary( + report: dict[str, Any], + dropped_location_count: int, +) -> dict[str, Any]: + """Summarize unsafe locations dropped from a partially emitted finding.""" + summary: dict[str, Any] = {"droppedLocationCount": dropped_location_count} + for key in ("id", "title"): + value = report.get(key) + if value not in (None, ""): + summary[key] = value + return summary diff --git a/tests/interface/test_sarif_cli.py b/tests/interface/test_sarif_cli.py new file mode 100644 index 000000000..17fb2c68f --- /dev/null +++ b/tests/interface/test_sarif_cli.py @@ -0,0 +1,78 @@ +import importlib +import sys +from argparse import Namespace +from pathlib import Path +from typing import Any + +from strix.interface.main import parse_arguments, write_requested_sarif_output + + +main_module = importlib.import_module("strix.interface.main") + + +def test_parse_arguments_accepts_sarif_flags(monkeypatch) -> None: + monkeypatch.setattr( + sys, + "argv", + [ + "strix", + "--target", + "./", + "--non-interactive", + "--sarif", + "--sarif-output", + "results.sarif", + ], + ) + + args = parse_arguments() + + assert args.sarif is True + assert args.sarif_output == "results.sarif" + + +def test_write_requested_sarif_output_writes_before_non_interactive_exit( + monkeypatch, + tmp_path: Path, +) -> None: + output_path = tmp_path / "results.sarif" + args = Namespace(sarif=False, sarif_output=str(output_path)) + + class FakeTracer: + def __init__(self) -> None: + self.vulnerability_reports: list[dict[str, Any]] = [ + { + "id": "vuln-0001", + "title": "Unsafe eval", + "severity": "high", + "code_locations": [{"file": "src/app.py", "start_line": 10, "end_line": 10}], + } + ] + + def get_fake_tracer() -> FakeTracer: + return FakeTracer() + + monkeypatch.setattr(main_module, "get_global_tracer", get_fake_tracer) + + written_path = write_requested_sarif_output(args, tmp_path / "strix_runs" / "demo") + + assert written_path == output_path + assert output_path.exists() + + +def test_write_requested_sarif_output_reports_write_errors( + monkeypatch, + capsys, + tmp_path: Path, +) -> None: + args = Namespace(sarif=True, sarif_output=str(tmp_path / "results.sarif")) + + def raise_write_error(*_args: Any, **_kwargs: Any) -> None: + raise OSError("disk full") + + monkeypatch.setattr(main_module, "write_sarif_report", raise_write_error) + + written_path = write_requested_sarif_output(args, tmp_path / "strix_runs" / "demo") + + assert written_path is None + assert "Failed to write SARIF" in capsys.readouterr().out diff --git a/tests/telemetry/test_sarif.py b/tests/telemetry/test_sarif.py new file mode 100644 index 000000000..091f87fed --- /dev/null +++ b/tests/telemetry/test_sarif.py @@ -0,0 +1,164 @@ +import json +from pathlib import Path +from typing import Any + +from strix.telemetry.sarif import build_sarif_report, write_sarif_report + + +def _finding(**overrides: Any) -> dict[str, Any]: + finding: dict[str, Any] = { + "id": "vuln-0001", + "title": "Unsanitized redirect target", + "severity": "high", + "description": "A user-controlled redirect target is trusted without validation.", + "impact": "Attackers can redirect users to phishing pages.", + "remediation_steps": "Allow-list trusted redirect destinations.", + "cvss": 8.1, + "cwe": "CWE-601", + "cve": "CVE-2026-0001", + "target": "./demo-app", + "endpoint": "/login", + "method": "GET", + "code_locations": [ + { + "file": "src/auth/redirects.py", + "start_line": 42, + "end_line": 45, + "snippet": "return redirect(request.args['next'])", + "label": "User-controlled redirect sink", + } + ], + } + finding.update(overrides) + return finding + + +def test_build_sarif_maps_code_location_and_metadata() -> None: + sarif = build_sarif_report([_finding()], tool_version="0.8.3") + + assert sarif["version"] == "2.1.0" + assert sarif["$schema"] == "https://json.schemastore.org/sarif-2.1.0.json" + + run = sarif["runs"][0] + driver = run["tool"]["driver"] + assert driver["name"] == "Strix" + assert driver["version"] == "0.8.3" + rule = driver["rules"][0] + assert rule["id"] == "CWE-601" + assert rule["fullDescription"]["text"] == ( + "A user-controlled redirect target is trusted without validation." + ) + assert "Allow-list trusted redirect destinations." in rule["help"]["text"] + + result = run["results"][0] + assert result["ruleId"] == "CWE-601" + assert result["level"] == "error" + assert result["message"]["text"] == "Unsanitized redirect target" + assert result["properties"]["cvss"] == 8.1 + assert result["properties"]["cve"] == "CVE-2026-0001" + assert result["properties"]["target"] == "./demo-app" + + location = result["locations"][0]["physicalLocation"] + assert location["artifactLocation"]["uri"] == "src/auth/redirects.py" + assert location["region"] == { + "startLine": 42, + "endLine": 45, + "snippet": {"text": "return redirect(request.args['next'])"}, + } + + +def test_build_sarif_maps_severity_levels() -> None: + findings = [ + _finding(id="vuln-critical", severity="critical"), + _finding(id="vuln-high", severity="high"), + _finding(id="vuln-medium", severity="medium"), + _finding(id="vuln-low", severity="low"), + _finding(id="vuln-info", severity="info"), + ] + + levels = [result["level"] for result in build_sarif_report(findings)["runs"][0]["results"]] + + assert levels == ["error", "error", "warning", "note", "note"] + + +def test_build_sarif_summarizes_locationless_findings_without_code_scanning_results() -> None: + sarif = build_sarif_report([_finding(code_locations=None, cwe=None, cve=None)]) + + run = sarif["runs"][0] + assert run["results"] == [] + assert run["properties"]["locationlessFindingCount"] == 1 + assert run["properties"]["locationlessFindings"][0]["id"] == "vuln-0001" + + +def test_build_sarif_drops_unsafe_code_locations() -> None: + sarif = build_sarif_report( + [ + _finding( + code_locations=[ + {"file": "/tmp/app.py", "start_line": 1, "end_line": 1}, + {"file": "../app.py", "start_line": 2, "end_line": 2}, + {"file": "C:\\Users\\app.py", "start_line": 3, "end_line": 3}, + {"file": "mailto:x", "start_line": 4, "end_line": 4}, + {"file": "foo:bar.py", "start_line": 5, "end_line": 5}, + {"file": "src/app.py", "start_line": 0, "end_line": 1}, + {"file": "src/other.py", "start_line": True, "end_line": True}, + ] + ) + ] + ) + + run = sarif["runs"][0] + assert run["results"] == [] + assert run["properties"]["locationlessFindingCount"] == 1 + assert "droppedUnsafeLocationCount" not in run["properties"] + assert "droppedUnsafeLocationFindings" not in run["properties"] + + +def test_build_sarif_keeps_locations_without_valid_end_line() -> None: + sarif = build_sarif_report( + [ + _finding( + code_locations=[ + {"file": "src/app.py", "start_line": 10}, + {"file": "src/reversed.py", "start_line": 20, "end_line": 19}, + ] + ) + ] + ) + + regions = [ + location["physicalLocation"]["region"] + for location in sarif["runs"][0]["results"][0]["locations"] + ] + assert regions == [{"startLine": 10}, {"startLine": 20}] + + +def test_build_sarif_summarizes_dropped_unsafe_locations_when_safe_locations_remain() -> None: + sarif = build_sarif_report( + [ + _finding( + code_locations=[ + {"file": "src/app.py", "start_line": 10, "end_line": 12}, + {"file": "foo:bar.py", "start_line": 1, "end_line": 1}, + ] + ) + ] + ) + + run = sarif["runs"][0] + assert len(run["results"]) == 1 + assert run["properties"]["droppedUnsafeLocationCount"] == 1 + assert run["properties"]["droppedUnsafeLocationFindings"][0] == { + "id": "vuln-0001", + "title": "Unsanitized redirect target", + "droppedLocationCount": 1, + } + + +def test_write_sarif_report_creates_parent_directories(tmp_path: Path) -> None: + output_path = tmp_path / "nested" / "results.sarif" + + write_sarif_report(output_path, [_finding()], tool_version="0.8.3") + + saved = json.loads(output_path.read_text(encoding="utf-8")) + assert saved["runs"][0]["results"][0]["ruleId"] == "CWE-601"