# cspell:: disable
"""Pytest plugin that records per-test-case timing to a CSV.

Used by ``run_performance_logging.sh`` to capture, for every data-integrity
case, the daemon under test, pass/fail (or the failure summary), the case wall
time, and the aggregate ``nc.log_*`` call timings (avg/max + per-label detail).

It is non-invasive: loaded via ``PYTEST_ADDOPTS="-p perf_logging_plugin"`` with
this directory on ``PYTHONPATH``, and writes nothing unless ``NDD_CSV_PATH`` is
set. A row is appended the moment each test finishes (after teardown, when that
case's timer stats have been recorded into the suite's ``SESSION_RUNS``).

Env:
    NDD_CSV_PATH       CSV file to append per-case rows to (enables recording)
    NDD_DAEMON         "rust" / "python" (recorded in each row)
    NDD_RUN_INDEX      run identifier recorded in each row
    NDD_STARTED_AT     ISO timestamp of the run
    NDD_CSV_JSON_DIR   optional dir for a per-process JSON backup
"""

from __future__ import annotations

import csv
import fcntl
import json
import os
import time
from pathlib import Path

CSV_COLUMNS = [
    "run_index",
    "started_at",
    "daemon",
    "result",
    "issue_type",
    "case_id",
    "nodeid",
    "wall_clock_s",
    "log_avg_ms",
    "log_max_ms",
    "log_call_count",
    "log_label_detail",
    "pytest_duration_s",
]

# Timer labels representing a user-facing data-logging call; their avg/max are
# the "log timings" recorded per case.
LOG_LABEL_PREFIX = "nc.log_"

# Longest failure/skip summary stored in the ``issue_type`` column.
_MAX_ISSUE_CHARS = 500

# nodeid -> aggregated outcome record for this pytest process.
_reports: dict[str, dict] = {}
# nodeids already written to the CSV (guard against double writes).
_written: set[str] = set()
# High-water mark into SESSION_RUNS so each test claims only its own entries.
_session_mark = 0
# Cached reference to the suite's SESSION_RUNS list (appended to in place).
_session_runs: list | None = None


def _get_session_runs() -> list:
    """Return the suite's ``SESSION_RUNS`` list, importing it lazily once.

    If the import fails for any reason an empty list is cached instead, so the
    plugin still writes rows (without ``nc.log_*`` timings) rather than
    breaking the run.
    """
    global _session_runs
    if _session_runs is None:
        try:
            from tests.integration.platform.data_daemon.shared.test_case.build_test_case import (  # noqa: E501
                SESSION_RUNS,
            )

            _session_runs = SESSION_RUNS
        except Exception:  # noqa: BLE001 - best effort: the suite may be absent
            _session_runs = []
    return _session_runs


# ---------------------------------------------------------------------------
# Row assembly
# ---------------------------------------------------------------------------


def _to_float(value: object) -> float:
    """Coerce *value* to ``float``; ``None`` or non-numeric input yields 0.0."""
    try:
        return float(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return 0.0


def _param_id(nodeid: str) -> str | None:
    """Return the text inside the trailing ``[...]`` of *nodeid*.

    Returns ``None`` when *nodeid* contains no ``[`` or does not end in ``]``.
    """
    if "[" not in nodeid or not nodeid.endswith("]"):
        return None
    return nodeid[nodeid.rfind("[") + 1 : -1]


def _matches(case_param_id: str | None, case_id: str | None) -> bool:
    """Tell whether a ``SESSION_RUNS`` ``case_id`` belongs to a parametrize id.

    Accepted forms: exact equality, ``case_id`` ending in ``-<param>`` or
    ``/<param>``, or the param id starting with ``<case_id>-``. Empty or
    ``None`` on either side never matches.
    """
    if not case_param_id or not case_id:
        return False
    if case_param_id == case_id:
        return True
    if case_id.endswith(("-" + case_param_id, "/" + case_param_id)):
        return True
    return case_param_id.startswith(case_id + "-")


def _aggregate_log_stats(session_runs: list[dict]) -> dict:
    """Fold the ``nc.log_*`` timer stats of *session_runs* into CSV fields.

    Each run may carry a ``timer_stats`` mapping of label ->
    ``{"count", "total", "max"}``. Only labels starting with
    ``LOG_LABEL_PREFIX`` are considered; ``total`` and ``max`` are multiplied
    by 1000 for the ``*_ms`` outputs. Missing, ``None`` or non-numeric values
    count as 0 instead of raising, so one malformed entry cannot cost the
    whole CSV row.

    Returns a dict with ``log_avg_ms`` / ``log_max_ms`` (``""`` when no call
    was counted), ``log_call_count`` and ``log_label_detail`` (JSON text keyed
    by label, or ``""`` when there is nothing to report).
    """
    per_label: dict[str, dict[str, float]] = {}
    for run in session_runs:
        for label, stats in (run.get("timer_stats") or {}).items():
            if not label.startswith(LOG_LABEL_PREFIX):
                continue
            acc = per_label.setdefault(label, {"count": 0.0, "total": 0.0, "max": 0.0})
            acc["count"] += _to_float(stats.get("count"))
            acc["total"] += _to_float(stats.get("total"))
            acc["max"] = max(acc["max"], _to_float(stats.get("max")))

    total_count = sum(acc["count"] for acc in per_label.values())
    total_time = sum(acc["total"] for acc in per_label.values())
    overall_max = max((acc["max"] for acc in per_label.values()), default=0.0)

    detail = {
        label: {
            "n": int(acc["count"]),
            "avg_ms": (
                round((acc["total"] / acc["count"]) * 1000, 3) if acc["count"] else None
            ),
            "max_ms": round(acc["max"] * 1000, 3),
        }
        for label, acc in sorted(per_label.items())
    }
    return {
        "log_avg_ms": (
            round((total_time / total_count) * 1000, 3) if total_count else ""
        ),
        "log_max_ms": round(overall_max * 1000, 3) if total_count else "",
        "log_call_count": int(total_count) if total_count else 0,
        "log_label_detail": json.dumps(detail) if detail else "",
    }


def _build_row(report: dict, matched_runs: list[dict]) -> dict:
    """Assemble one CSV row from a test's outcome record and its runs.

    ``wall_clock_s`` is the largest ``test_wall_s`` among *matched_runs*; when
    no run carries one it falls back to the summed pytest phase durations.
    Run metadata (run index, start time, daemon) is read from the ``NDD_*``
    environment variables.
    """
    test_walls = [
        run["test_wall_s"] for run in matched_runs if run.get("test_wall_s") is not None
    ]
    pytest_duration = float(report.get("duration_s", 0.0))
    wall_clock = max(test_walls) if test_walls else pytest_duration

    row = {
        "run_index": os.environ.get("NDD_RUN_INDEX", ""),
        "started_at": os.environ.get("NDD_STARTED_AT", ""),
        "daemon": os.environ.get("NDD_DAEMON", ""),
        "result": report.get("outcome", "unknown"),
        "issue_type": report.get("issue") or "",
        "case_id": _param_id(report["nodeid"]) or "",
        "nodeid": report["nodeid"],
        "wall_clock_s": round(wall_clock, 3),
        "pytest_duration_s": round(pytest_duration, 3),
    }
    row.update(_aggregate_log_stats(matched_runs))
    return row


def _append_row(csv_path: str, row: dict) -> None:
    """Append *row* to the CSV at *csv_path*, creating file and parents.

    The write happens under an exclusive ``flock``. The header is emitted only
    when the file is still empty once the lock is held, and the row is flushed
    and fsynced before the lock is released.
    """
    path = Path(csv_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", newline="", encoding="utf-8") as handle:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
        try:
            # Re-check the size under the lock: another process may have
            # written the header between open() and flock().
            handle.seek(0, os.SEEK_END)
            writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS)
            if handle.tell() == 0:
                writer.writeheader()
            writer.writerow({key: row.get(key, "") for key in CSV_COLUMNS})
            handle.flush()
            os.fsync(handle.fileno())
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)


# ---------------------------------------------------------------------------
# Hooks
# ---------------------------------------------------------------------------


def _crash_message(report) -> str | None:
    """Return a one-line summary (<= 500 chars) of why *report* failed/skipped.

    Sources, in order of preference:

    1. a ``(path, lineno, reason)`` tuple ``longrepr``, as pytest produces for
       skipped tests -> first line of the reason;
    2. ``longrepr.reprcrash.message`` -> its first line;
    3. the last non-blank line of ``str(longrepr)``.

    Returns ``None`` when the report carries no ``longrepr`` or no text.
    """
    longrepr = getattr(report, "longrepr", None)
    if longrepr is None:
        return None
    if isinstance(longrepr, tuple) and len(longrepr) == 3:
        # Without this branch the summary would be the repr of the whole tuple.
        reason = str(longrepr[2]).strip().splitlines()
        if reason:
            return reason[0][:_MAX_ISSUE_CHARS]
    crash = getattr(longrepr, "reprcrash", None)
    if crash is not None and getattr(crash, "message", None):
        first = str(crash.message).strip().splitlines()
        if first:
            return first[0][:_MAX_ISSUE_CHARS]
    text = str(longrepr).strip()
    lines = [line for line in text.splitlines() if line.strip()]
    return lines[-1][:_MAX_ISSUE_CHARS] if lines else None


def pytest_runtest_logreport(report) -> None:
    """Fold one phase report (setup / call / teardown) into the test's record.

    Durations of all phases are summed. The first failing phase decides the
    outcome ("error" for setup/teardown, "failed" otherwise) and the recorded
    issue; a later failing phase does not overwrite it, so a teardown error
    cannot hide the call failure that preceded it. A skip is recorded only
    while the outcome is still "passed".
    """
    record = _reports.setdefault(
        report.nodeid,
        {
            "nodeid": report.nodeid,
            "outcome": "passed",
            "duration_s": 0.0,
            "issue": None,
        },
    )
    record["duration_s"] += float(getattr(report, "duration", 0.0) or 0.0)
    if report.failed:
        if record["outcome"] in ("failed", "error"):
            return  # keep the primary failure
        record["outcome"] = (
            "error" if report.when in ("setup", "teardown") else "failed"
        )
        record["issue"] = _crash_message(report)
    elif report.skipped and record["outcome"] == "passed":
        record["outcome"] = "skipped"
        record["issue"] = _crash_message(report)


def pytest_runtest_logfinish(nodeid, location) -> None:
    """Append the finished test's CSV row (only when ``NDD_CSV_PATH`` is set).

    ``SESSION_RUNS`` entries added since the previous test are attributed to
    this one. Entries whose ``case_id`` matches the parametrize id are
    preferred; if none match, all new entries are used. Failures while writing
    are printed and swallowed so reporting never breaks the run.
    """
    global _session_mark
    csv_path = os.environ.get("NDD_CSV_PATH")
    if not csv_path:
        return

    # SESSION_RUNS entries appended since the previous test belong to this one
    # (tests run serially within a process). Claim them before any early
    # return, otherwise they would be credited to the next test.
    session_runs = _get_session_runs()
    new_runs = session_runs[_session_mark:]
    _session_mark = len(session_runs)

    report = _reports.get(nodeid)
    if report is None or nodeid in _written:
        return

    case_param = _param_id(nodeid)
    matched = [run for run in new_runs if _matches(case_param, run.get("case_id"))]
    if not matched and new_runs:
        matched = list(new_runs)

    try:
        _append_row(csv_path, _build_row(report, matched))
        _written.add(nodeid)
    except Exception as exc:  # noqa: BLE001 - never let reporting break the run
        print(f"[perf_logging_plugin] failed to write row for {nodeid}: {exc}")


def pytest_sessionfinish(session, exitstatus) -> None:
    """Write the per-process JSON backup when ``NDD_CSV_JSON_DIR`` is set.

    The payload holds the pid, exit status, finish time, every outcome record
    and the full ``SESSION_RUNS`` list. Values JSON cannot encode are
    stringified. Failures are printed and swallowed, like CSV write failures.
    """
    out_dir = os.environ.get("NDD_CSV_JSON_DIR")
    if not out_dir:
        return
    try:
        payload = {
            "pid": os.getpid(),
            "exitstatus": int(exitstatus),
            "finished_at": time.time(),
            "reports": list(_reports.values()),
            "session_runs": list(_get_session_runs()),
        }
        target = Path(out_dir)
        target.mkdir(parents=True, exist_ok=True)
        (target / f"cases_{os.getpid()}_{int(time.time() * 1000)}.json").write_text(
            json.dumps(payload, default=str), encoding="utf-8"
        )
    except Exception as exc:  # noqa: BLE001 - never let reporting break the run
        print(f"[perf_logging_plugin] failed to write JSON backup: {exc}")
#!/usr/bin/env bash
# Run the data-daemon integration tests against staging with the Rust daemon
# enabled (NCD_RUST_DAEMON=1).
#
# The script cleans every piece of host state the daemon touches before
# starting — a previous SIGKILL'd run can otherwise leave behind iceoryx2
# nodes, /dev/shm slot segments, pid/socket files, and stale recordings that
# make a fresh run fail in confusing ways.
#
# Tests are ordered from fastest to slowest so a failure earlier in the
# pipeline fails fast instead of waiting for the long-running performance
# suites. With --exitfirst pytest stops at the first failure.

set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
workspace_root="$(cd "$script_dir/.." && pwd)"
repo_root="$(cd "$workspace_root/.." && pwd)"

cd "$repo_root"

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

# Daemon ipc/socket artefacts (BASE_DIR in neuracore/data_daemon/const.py).
ndd_base_dir="/tmp/ndd"
# iceoryx2 dead-node registry and per-service files (see
# rust/data_daemon/src/lifecycle/recovery.rs).
iceoryx_dir="/tmp/iceoryx2"
# Default daemon pid file (helpers.get_daemon_pid_path / config::env::pid_path).
default_pid_path="$HOME/.neuracore/daemon.pid"
# Default daemon state (logs / DB / recordings live here when no env override).
default_state_dir="$HOME/.neuracore/data_daemon"
# Test-local daemon state — the conftest pins
# NEURACORE_DAEMON_RECORDINGS_ROOT / NEURACORE_DAEMON_DB_PATH at this dir, so
# log files, state.db and recordings end up under it during a test run.
test_state_dir="$repo_root/.data_daemon_test_state"

log_dir="$repo_root/.integration_test_logs"
mkdir -p "$log_dir"
run_stamp="$(date +%Y%m%d_%H%M%S)"
# stdout/stderr (pytest progress + script output)
log_file="$log_dir/run_${run_stamp}.log"
# pytest structured --log-file (DEBUG records). Kept separate from $log_file
# because pytest opens its --log-file in truncate mode, racing the tee that
# appends script output otherwise.
#
# $pytest_log_file itself is only the base name: each pytest phase writes its
# own file (below), so these two are the paths that actually exist after a run.
pytest_log_file="$log_dir/pytest_${run_stamp}.log"
pytest_log_phase1="${pytest_log_file%.log}_phase1.log"
pytest_log_phase2="${pytest_log_file%.log}_phase2.log"

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Print a '==> '-prefixed message.
log() {
    # Mirror to stdout *and* the log file so a tail -f works while pytest runs.
    printf '==> %s\n' "$*" | tee -a "$log_file"
}

# Remove a path tree if it exists; never fail the script on cleanup races.
purge_path() {
    local target="$1"
    if [[ -e "$target" || -L "$target" ]]; then
        rm -rf -- "$target" 2>>"$log_file" || true
    fi
}

# Remove files matching a glob; expand inside the function so the glob is
# evaluated even when no matches exist (nullglob).
#
# NOTE: the unquoted `$pattern` below relies on word-splitting + globbing, which
# is only safe because every caller passes a fixed, space-free prefix (e.g.
# '/dev/shm/neuracore-*'). Do not pass a pattern that can contain spaces.
purge_glob() {
    local pattern="$1"
    shopt -s nullglob
    # shellcheck disable=SC2206 # intentional glob/word-split of a space-free pattern
    local matches=( $pattern )
    shopt -u nullglob
    if (( ${#matches[@]} > 0 )); then
        rm -rf -- "${matches[@]}" 2>>"$log_file" || true
    fi
}

# ---------------------------------------------------------------------------
# Stop any running daemon
# ---------------------------------------------------------------------------

stop_daemon() {
    log "stopping any running data daemon"

    # Try the CLI first; falls through silently if no daemon is up.
    neuracore data-daemon stop >>"$log_file" 2>&1 || true

    # Belt-and-braces: kill anything the CLI missed by name. The Rust binary is
    # 'data-daemon'; the Python entry point is '-m neuracore.data_daemon'.
    pkill -TERM -f 'neuracore[.]data_daemon' 2>/dev/null || true
    pkill -TERM -x 'data-daemon' 2>/dev/null || true
    # Give the TERM'd processes a moment before escalating to KILL.
    sleep 1
    pkill -KILL -f 'neuracore[.]data_daemon' 2>/dev/null || true
    pkill -KILL -x 'data-daemon' 2>/dev/null || true
}

# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------

cleanup_state() {
    log "removing iceoryx2 dead-node/service files in $iceoryx_dir"
    purge_path "$iceoryx_dir"

    log "removing daemon ipc dir $ndd_base_dir"
    purge_path "$ndd_base_dir"

    log "removing neuracore shared-memory segments from /dev/shm"
    # _NEURACORE_SHARED_SLOT_PREFIX in
    # neuracore/data_daemon/lifecycle/runtime_recovery.py.
    purge_glob '/dev/shm/neuracore-*'
    # iceoryx2 also stores its shared memory under /dev/shm.
    purge_glob '/dev/shm/iox2_*'

    log "removing default daemon pid/lock at $default_pid_path"
    purge_path "$default_pid_path"
    purge_path "${default_pid_path}.lock"

    log "removing default daemon state dir $default_state_dir"
    purge_path "$default_state_dir"

    log "removing test-local daemon state dir $test_state_dir"
    purge_path "$test_state_dir"

    log "removing previous integration-test logs (keeping current $log_file)"
    shopt -s nullglob
    for existing in "$log_dir"/*.log; do
        if [[ "$existing" != "$log_file" ]]; then
            rm -f -- "$existing" || true
        fi
    done
    shopt -u nullglob
}

# ---------------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------------

# Staging API endpoint and Rust-daemon selection per the task brief.
export NEURACORE_API_URL="https://staging.api.neuracore.com/api"
# Default to the Rust daemon, but honour a caller-provided value so the same
# script can also exercise the Python daemon (NCD_RUST_DAEMON=0) on repeat runs.
export NCD_RUST_DAEMON="${NCD_RUST_DAEMON:-1}"

# Quiet the SSE consumer + WebRTC producer loops for the duration of the
# integration suite.
export NEURACORE_CONSUME_LIVE_DATA=no
export NEURACORE_PROVIDE_LIVE_DATA=no

# Highest level of logging across the Python and Rust surfaces.
# Honour a caller-provided value so a harness can disable debug logging.
export NDD_DEBUG="${NDD_DEBUG:-true}"
export PYTHONUNBUFFERED=1
# Rust tracing — see https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html
export RUST_LOG="${RUST_LOG:-trace}"
export RUST_BACKTRACE=1

# ---------------------------------------------------------------------------
# Test ordering (fastest to slowest)
#
# Each entry is a pytest target relative to repo root. The order is hand-rolled
# so a failure surfaces in the cheapest suite first:
#   1. behavioural_correctness/test_startup            — single startup smoke
#   2. behavioural_correctness/test_signal_cleanup     — many short signal tests
#   3. behavioural_correctness/test_cancel_recording   — short, no upload
#   4. behavioural_correctness/test_offline_to_online  — recorded -> online flip
#   5. data_integrity/test_pre_network                 — disk-only integrity
#   6. data_integrity/test_network                     — adds upload + dataset wait
#   7. performance/test_pre_network                    — disk-only perf budgets
#   8. performance/test_network                        — full network perf run
#
# The 300s (5-minute) 1920x1080 performance cases are pulled OUT of the normal
# order and run last (see `run_tests`): they dominate wall time and their
# stochastic-timing / upload-readiness budgets are the most fragile under host
# load, so every cheaper test gets a chance to fail fast first. The 300s
# pre-network (disk-only) cases run immediately before their network (upload)
# equivalents. Both perf files keep their case-ids containing "300s", which is
# the keyword the two phases select on.
# ---------------------------------------------------------------------------

test_targets=(
    "tests/integration/platform/data_daemon/behavioural_correctness/test_startup.py"
    "tests/integration/platform/data_daemon/behavioural_correctness/test_signal_cleanup.py"
    "tests/integration/platform/data_daemon/behavioural_correctness/test_cancel_recording.py"
    "tests/integration/platform/data_daemon/behavioural_correctness/test_offline_to_online.py"
    "tests/integration/platform/data_daemon/data_integrity/test_pre_network.py"
    "tests/integration/platform/data_daemon/data_integrity/test_network.py"
    "tests/integration/platform/data_daemon/performance/test_pre_network.py"
    "tests/integration/platform/data_daemon/performance/test_network.py"
)

# Substring shared by every 300s case-id; the two perf files are the last two
# `test_targets` entries and the only place these cases live.
heavy_case_filter="300s"
perf_targets=( "${test_targets[@]: -2}" )

# Run one pytest phase. $1 is the --log-file path, $2 the -k expression, and the
# remaining args are the pytest targets.
#
# --exitfirst stops on the first failure so the fast-fail ordering pays off.
# --log-cli-level=DEBUG turns on live structured logging from the SUT into
# pytest's captured stdout (so it ends up in $log_file via tee). --log-file
# captures the same at DEBUG level into a separate file so the structured
# records survive even if the tee buffer is cut short; each phase gets its own
# --log-file because pytest opens it in truncate mode and would otherwise
# clobber the previous phase's records.
run_pytest_phase() {
    local phase_log_file="$1"
    local keyword_expr="$2"
    shift 2
    pytest \
        --exitfirst \
        --tb=short \
        -vv \
        -o log_cli=true \
        -o log_cli_level=DEBUG \
        --log-file="$phase_log_file" \
        --log-file-level=DEBUG \
        -k "$keyword_expr" \
        "$@" \
        2>&1 | tee -a "$log_file"
    # Return pytest's exit code, not tee's.
    return "${PIPESTATUS[0]}"
}

# Run both phases; returns the exit code of the first phase that fails, else
# the exit code of phase 2. Must be called with errexit disabled (see `main`).
run_tests() {
    log "running pytest with NCD_RUST_DAEMON=$NCD_RUST_DAEMON, NEURACORE_API_URL=$NEURACORE_API_URL"
    log "stdout log: $log_file"
    log "pytest --log-file (phase 1): $pytest_log_phase1"
    log "pytest --log-file (phase 2): $pytest_log_phase2"

    # Phase 1: every suite EXCEPT the heavy 300s performance cases, in the
    # fastest-to-slowest order above, so a real bug surfaces in the cheapest suite
    # and the lighter performance cases (incl. network upload) aren't blocked by a
    # flaky 300s case.
    log "phase 1/2: all suites except the 300s performance cases (-k 'not $heavy_case_filter')"
    run_pytest_phase "$pytest_log_phase1" "not $heavy_case_filter" "${test_targets[@]}"
    local phase1_rc=$?
    if [[ $phase1_rc -ne 0 ]]; then
        return "$phase1_rc"
    fi

    # Phase 2: the heavy 300s cases at the very end — the pre-network (disk-only)
    # ones first, immediately before their network (upload) equivalents (perf
    # files are ordered pre-network then network in `perf_targets`).
    log "phase 2/2: 300s performance cases — pre-network then network (-k '$heavy_case_filter')"
    run_pytest_phase "$pytest_log_phase2" "$heavy_case_filter" "${perf_targets[@]}"
    return "$?"
}

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

main() {
    log "==== integration test run starting ===="
    stop_daemon
    cleanup_state

    # errexit off so a failing pytest phase is reported below instead of
    # aborting the script before the summary.
    set +e
    run_tests
    exit_code=$?
    set -e

    log "==== integration test run finished (exit=$exit_code) ===="
    log "stdout log: $log_file"
    # Phase 2 is skipped when phase 1 fails, so its file may not exist.
    log "pytest --log-file (phase 1): $pytest_log_phase1"
    log "pytest --log-file (phase 2): $pytest_log_phase2"
    exit "$exit_code"
}

main "$@"
#!/usr/bin/env bash
# Run the data-integrity suite and record per-case timing to a CSV.
#
# This is the timing/logging harness (not a pass/fail gate): it runs every
# data-integrity case and, via perf_logging_plugin.py, appends one CSV row per
# case capturing the daemon under test, the result (or failure summary), the
# case wall time, and the aggregate nc.log_* call timings (avg/max, call count,
# and per-label detail). Rows append across invocations, so running it twice —
# once with NCD_RUST_DAEMON=1 and once with NCD_RUST_DAEMON=0 — accumulates a
# rust-vs-python comparison in a single CSV.
#
# Logging defaults to RUST_LOG=warn / NDD_DEBUG=false so per-frame daemon
# logging does not skew the measured latencies; override either for diagnosis.
#
# Output (the CSV + a structured pytest --log-file) is written under
# .integration_test_logs by default (override the CSV with NDD_CSV_PATH or the
# directory with NDD_PERF_LOG_DIR). The per-case nc.log_* timings are stored
# inline in the CSV's `log_label_detail` column. The diagnostic Timer._samples
# percentiles are left untouched.
#
# Not run with --exitfirst: every case runs so one breach does not hide the
# rest of the timing data.

set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
workspace_root="$(cd "$script_dir/.." && pwd)"
repo_root="$(cd "$workspace_root/.." && pwd)"

cd "$repo_root"

# ---------------------------------------------------------------------------
# Paths (mirror run_integration_tests.sh so the same host state is cleaned)
# ---------------------------------------------------------------------------

ndd_base_dir="/tmp/ndd"
iceoryx_dir="/tmp/iceoryx2"
default_pid_path="$HOME/.neuracore/daemon.pid"
default_state_dir="$HOME/.neuracore/data_daemon"
test_state_dir="$repo_root/.data_daemon_test_state"

log_dir="${NDD_PERF_LOG_DIR:-$repo_root/.integration_test_logs}"
mkdir -p "$log_dir"
run_stamp="$(date +%Y%m%d_%H%M%S)"
started_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
# Stable CSV so repeat runs accumulate; override with NDD_CSV_PATH.
csv_path="${NDD_CSV_PATH:-$log_dir/data_integrity_timings.csv}"
log_file="$log_dir/perf_logging_${run_stamp}.log"
pytest_log_file="$log_dir/perf_logging_pytest_${run_stamp}.log"

# Print a '==> '-prefixed message to stdout and append it to $log_file.
log() {
    printf '==> %s\n' "$*" | tee -a "$log_file"
}

# Remove a path tree if it exists; rm failures are logged, never fatal.
purge_path() {
    local target="$1"
    if [[ -e "$target" || -L "$target" ]]; then
        rm -rf -- "$target" 2>>"$log_file" || true
    fi
}

# Remove everything matching a glob. The unquoted `$pattern` relies on
# word-splitting + globbing, so only pass fixed, space-free patterns.
purge_glob() {
    local pattern="$1"
    shopt -s nullglob
    # shellcheck disable=SC2206 # intentional glob/word-split of a space-free pattern
    local matches=( $pattern )
    shopt -u nullglob
    if (( ${#matches[@]} > 0 )); then
        rm -rf -- "${matches[@]}" 2>>"$log_file" || true
    fi
}

# Stop any running daemon: CLI first, then TERM and KILL by process name.
stop_daemon() {
    log "stopping any running data daemon"
    neuracore data-daemon stop >>"$log_file" 2>&1 || true
    pkill -TERM -f 'neuracore[.]data_daemon' 2>/dev/null || true
    pkill -TERM -x 'data-daemon' 2>/dev/null || true
    sleep 1
    pkill -KILL -f 'neuracore[.]data_daemon' 2>/dev/null || true
    pkill -KILL -x 'data-daemon' 2>/dev/null || true
}

# Clean the daemon's host state, but leave prior logs/CSV in place so timing
# results accumulate under $log_dir.
cleanup_state() {
    purge_path "$iceoryx_dir"
    purge_path "$ndd_base_dir"
    purge_glob '/dev/shm/neuracore-*'
    purge_glob '/dev/shm/iox2_*'
    purge_path "$default_pid_path"
    purge_path "${default_pid_path}.lock"
    purge_path "$default_state_dir"
    purge_path "$test_state_dir"
}

# ---------------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------------

export NEURACORE_API_URL="https://staging.api.neuracore.com/api"
export NCD_RUST_DAEMON="${NCD_RUST_DAEMON:-1}"

export NEURACORE_CONSUME_LIVE_DATA=no
export NEURACORE_PROVIDE_LIVE_DATA=no

# Low-overhead logging so it does not skew the measured latencies; override for
# diagnosis.
export NDD_DEBUG="${NDD_DEBUG:-false}"
export RUST_LOG="${RUST_LOG:-warn}"
export PYTHONUNBUFFERED=1
export RUST_BACKTRACE=1

# Daemon label for the CSV.
if [[ "$NCD_RUST_DAEMON" =~ ^(1|true|yes|on|TRUE|True)$ ]]; then
    daemon_label="rust"
else
    daemon_label="python"
fi

# Wire the CSV-recording plugin in non-invasively.
export PYTHONPATH="$script_dir${PYTHONPATH:+:$PYTHONPATH}"
export PYTEST_ADDOPTS="${PYTEST_ADDOPTS:-} -p perf_logging_plugin"
export NDD_CSV_PATH="$csv_path"
export NDD_RUN_INDEX="$run_stamp"
export NDD_DAEMON="$daemon_label"
export NDD_STARTED_AT="$started_at"

data_integrity_targets=(
    "tests/integration/platform/data_daemon/data_integrity/test_pre_network.py"
    "tests/integration/platform/data_daemon/data_integrity/test_network.py"
)

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

main() {
    log "==== data-integrity timing run starting ===="
    log "daemon=$daemon_label RUST_LOG=$RUST_LOG NDD_DEBUG=$NDD_DEBUG"
    log "csv: $csv_path"
    log "stdout log: $log_file"

    stop_daemon
    cleanup_state

    # errexit off so a failing case still reaches the summary below.
    set +e
    pytest \
        --tb=short \
        -vv \
        -o log_cli=true \
        -o log_cli_level=INFO \
        --log-file="$pytest_log_file" \
        --log-file-level=DEBUG \
        "${data_integrity_targets[@]}" \
        2>&1 | tee -a "$log_file"
    # pytest's status, not tee's.
    exit_code=${PIPESTATUS[0]}
    set -e

    log "==== data-integrity timing run finished (exit=$exit_code) ===="
    if [[ -f "$csv_path" ]]; then
        log "rows for this run ($daemon_label):"
        # The summary is informational only. errexit/pipefail are back on here,
        # so without the `||` guard a failing summary (no python3, a CSV with a
        # different header) would abort the script before `exit "$exit_code"`
        # and replace pytest's exit status with its own.
        python3 - "$csv_path" "$run_stamp" <<'PY' | tee -a "$log_file" || log "warning: could not summarise $csv_path"
import csv
import sys

path, run_index = sys.argv[1], sys.argv[2]
with open(path, newline="") as handle:
    rows = [r for r in csv.DictReader(handle) if r.get("run_index") == run_index]
for r in rows:
    print(f" {r['daemon']:6} {r['result']:7} {r['case_id'][:40]:40} "
          f"wall={r['wall_clock_s']:>8}s log_avg={r['log_avg_ms'] or '-':>7}ms "
          f"log_max={r['log_max_ms'] or '-':>8}ms")
print(f" ({len(rows)} case(s); full CSV at {path})")
PY
    fi
    exit "$exit_code"
}

main "$@"
a/tests/integration/platform/data_daemon/behavioural_correctness/test_cancel_recording.py +++ b/tests/integration/platform/data_daemon/behavioural_correctness/test_cancel_recording.py @@ -211,7 +211,7 @@ def test_cancel_then_start_new_recording( MAX_TIME_TO_START_S, label="nc.start_recording", always_log=True ): nc.start_recording(robot_name=robot_name) - resumed_recording_id = robot.get_current_recording_id() + resumed_recording_id = robot.get_cloud_recording_id() assert resumed_recording_id is not None log_frames(spec, recording_index=0, marker_name="marker_resume") diff --git a/tests/integration/platform/data_daemon/behavioural_correctness/test_offline_to_online.py b/tests/integration/platform/data_daemon/behavioural_correctness/test_offline_to_online.py index e22ca54d2..a0ca795c6 100644 --- a/tests/integration/platform/data_daemon/behavioural_correctness/test_offline_to_online.py +++ b/tests/integration/platform/data_daemon/behavioural_correctness/test_offline_to_online.py @@ -2,11 +2,14 @@ import pytest +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from tests.integration.platform.data_daemon.shared.assertions import ( assert_exactly_one_daemon_pid, verify_cloud_results, ) from tests.integration.platform.data_daemon.shared.db_helpers import ( + assert_offline_recordings_pending, + resolve_cloud_recording_ids, wait_for_all_traces_written, wait_for_upload_complete_in_db, ) @@ -80,14 +83,20 @@ def test_offline_pending_data_recovers_when_online( assert_exactly_one_daemon_pid() results = run_case_contexts(case, specs=specs) wait_for_all_traces_written(results=results) + assert_offline_recordings_pending(results) # offline_daemon_running() stops the daemon on exit, preserving # offline artefacts for the online recovery phase below. 
with online_daemon_running(): for result in results: - for recording_id in result.recording_ids: - wait_for_upload_complete_in_db(str(recording_id)) + if rust_daemon_enabled(): + for recording_index in result.recording_indexes: + wait_for_upload_complete_in_db(recording_index) + else: + for recording_id in result.recording_ids: + wait_for_upload_complete_in_db(str(recording_id)) + results = resolve_cloud_recording_ids(results) verify_cloud_results(results=results, case=case) finally: diff --git a/tests/integration/platform/data_daemon/behavioural_correctness/test_signal_cleanup.py b/tests/integration/platform/data_daemon/behavioural_correctness/test_signal_cleanup.py index 03f25c917..379579977 100644 --- a/tests/integration/platform/data_daemon/behavioural_correctness/test_signal_cleanup.py +++ b/tests/integration/platform/data_daemon/behavioural_correctness/test_signal_cleanup.py @@ -86,7 +86,7 @@ def test_cli_stop_exits_daemon_and_cleans_up() -> None: with online_daemon_running(): pid = _single_runner_pid() logger.info("CLI stop for daemon pid=%d", pid) - stop_daemon() + stop_daemon(method="cli", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) def test_cli_stop_removes_pid_file() -> None: @@ -95,7 +95,7 @@ def test_cli_stop_removes_pid_file() -> None: with online_daemon_running(): pid_path = get_daemon_pid_path() assert pid_path.exists(), f"PID file missing before stop: {pid_path}" - stop_daemon() + stop_daemon(method="cli", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) assert_no_pid_file() @@ -104,7 +104,7 @@ def test_cli_stop_unlinks_socket() -> None: """Unix domain socket is removed after a clean CLI stop.""" with online_daemon_running(): - stop_daemon() + stop_daemon(method="cli", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) assert_socket_unlinked() @@ -125,14 +125,14 @@ def test_sigterm_exits_daemon_and_cleans_up() -> None: with online_daemon_running(): pid = _single_runner_pid() logger.info("SIGTERM to daemon pid=%d", pid) - stop_daemon(method="sigterm") + 
stop_daemon(method="sigterm", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) def test_sigterm_removes_pid_file() -> None: """PID file is absent after the daemon receives SIGTERM.""" with online_daemon_running(): - stop_daemon(method="sigterm") + stop_daemon(method="sigterm", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) assert_no_pid_file() @@ -141,7 +141,7 @@ def test_sigterm_unlinks_socket() -> None: """Unix domain socket is removed after the daemon receives SIGTERM.""" with online_daemon_running(): - stop_daemon(method="sigterm") + stop_daemon(method="sigterm", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) assert_socket_unlinked() @@ -162,14 +162,14 @@ def test_sigint_exits_daemon_and_cleans_up() -> None: with online_daemon_running(): pid = _single_runner_pid() logger.info("SIGINT to daemon pid=%d", pid) - stop_daemon(method="sigint") + stop_daemon(method="sigint", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) def test_sigint_removes_pid_file() -> None: """PID file is absent after the daemon receives SIGINT.""" with online_daemon_running(): - stop_daemon(method="sigint") + stop_daemon(method="sigint", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) assert_no_pid_file() @@ -178,7 +178,7 @@ def test_sigint_unlinks_socket() -> None: """Unix domain socket is removed after the daemon receives SIGINT.""" with online_daemon_running(): - stop_daemon(method="sigint") + stop_daemon(method="sigint", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) assert_socket_unlinked() @@ -244,9 +244,9 @@ def test_sigterm_then_cli_stop_is_idempotent() -> None: """ with online_daemon_running(): - stop_daemon(method="sigterm") + stop_daemon(method="sigterm", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) # Daemon is gone; CLI stop must handle the already-stopped case cleanly. 
- stop_daemon(method="cli") + stop_daemon(method="cli", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) def test_sigint_then_sigterm_exits_cleanly() -> None: @@ -257,8 +257,8 @@ def test_sigint_then_sigterm_exits_cleanly() -> None: """ with online_daemon_running(): - stop_daemon(method="sigint") - stop_daemon(method="sigterm") + stop_daemon(method="sigint", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) + stop_daemon(method="sigterm", graceful_timeout_s=GRACEFUL_EXIT_TIMEOUT_S) # --------------------------------------------------------------------------- @@ -368,7 +368,6 @@ def test_sigkill_after_recording_allows_clean_restart(case: DataDaemonTestCase) logger.info("Restarted daemon pid=%d", pid_second) -@pytest.mark.skip @pytest.mark.parametrize("case", _KILL_RESTART_CASES, ids=case_ids(_KILL_RESTART_CASES)) def test_sigkill_mid_recording_allows_clean_restart(case: DataDaemonTestCase) -> None: """Daemon restarts cleanly after SIGKILL interrupts an in-progress recording. diff --git a/tests/integration/platform/data_daemon/daemon_test_cases.py b/tests/integration/platform/data_daemon/daemon_test_cases.py index 920afea03..8196f0972 100644 --- a/tests/integration/platform/data_daemon/daemon_test_cases.py +++ b/tests/integration/platform/data_daemon/daemon_test_cases.py @@ -65,30 +65,30 @@ mode=MODE_STAGGERED, timestamp_mode=TIMESTAMP_MODE_REAL, ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=1, - # video_count=1, - # image_height=120, - # image_width=120, - # video_fps=120, - # joint_fps=1000, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_REAL, - # wait=False, - # ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=4, - # video_count=1, - # image_height=120, - # image_width=120, - # video_fps=120, - # joint_fps=500, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, - # wait=False, - # ), + DataDaemonTestCase( + duration_sec=10, + recording_count=1, + video_count=1, + 
image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, # previously 1000 but flaky due to sync + producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_REAL, + wait=False, + ), + DataDaemonTestCase( + duration_sec=10, + recording_count=4, + video_count=1, + image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, # previously 500 but flaky due to sync + producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, + wait=False, + ), ) NETWORK_INTEGRITY_CASES = ( @@ -130,7 +130,6 @@ producer_channels=PRODUCER_PER_THREAD, parallel_contexts=2, mode=MODE_STAGGERED, - skip=True, ), DataDaemonTestCase( duration_sec=10, @@ -146,40 +145,39 @@ parallel_contexts=2, mode=MODE_STAGGERED, timestamp_mode=TIMESTAMP_MODE_REAL, - skip=True, - ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=1, - # video_count=1, - # image_height=120, - # image_width=120, - # video_fps=120, - # joint_fps=1000, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_REAL, - # wait=False, - # ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=4, - # video_count=1, - # image_height=120, - # image_width=120, - # video_fps=120, - # joint_fps=500, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, - # wait=False, - # ), - # DataDaemonTestCase( - # duration_sec=5, - # joint_count=7, - # parallel_contexts=1, - # recording_count=1, - # joint_fps=600, - # wait=True, - # ), + ), + DataDaemonTestCase( + duration_sec=10, + recording_count=1, + video_count=1, + image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, # previously 1000 but flaky due to sync + producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_REAL, + wait=False, + ), + DataDaemonTestCase( + duration_sec=10, + recording_count=4, + video_count=1, + image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, # previously 500 but flaky due to sync 
+ producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, + wait=False, + ), + DataDaemonTestCase( + duration_sec=5, + joint_count=7, + parallel_contexts=1, + recording_count=1, + joint_fps=600, + wait=True, + ), DataDaemonTestCase( duration_sec=10, joint_count=7, @@ -195,12 +193,11 @@ mode=MODE_STAGGERED, timestamp_mode=TIMESTAMP_MODE_REAL, wait=True, - skip=True, ), ) PRE_NETWORK_PERFORMANCE_CASES = ( - # High frequency robot control at 210Hz joint data + # High frequency robot control at 100Hz joint data # Tests: high-frequency sampling, temporal jitter, joint-only streaming DataDaemonTestCase( duration_sec=60, @@ -209,8 +206,7 @@ parallel_contexts=1, recording_count=5, context_duration_mode=DURATION_MODE_FIXED, - joint_fps=210, - skip=True, + joint_fps=100, ), # High number of medium-throughput robots with synchronized # recordings. Tests: multi-robot contention, mixed data types, @@ -228,17 +224,16 @@ producer_channels=PRODUCER_PER_THREAD, context_duration_mode=DURATION_MODE_VARIABLE, video_fps=30, - skip=True, ), # Large number of joints without cameras (1000 joints) # Tests: high joint dimensionality, memory efficiency, sensor-only workload - # DataDaemonTestCase( - # duration_sec=30, - # joint_count=1000, - # video_count=0, - # parallel_contexts=1, - # recording_count=3, - # ), + DataDaemonTestCase( + duration_sec=30, + joint_count=1000, + video_count=0, + parallel_contexts=1, + recording_count=3, + ), # 3x longer duration recordings # Tests: long-running stability, memory leak detection, large dataset # accumulation @@ -251,7 +246,7 @@ parallel_contexts=2, recording_count=16, context_duration_mode=DURATION_MODE_FIXED, - skip=True, + joint_fps=15, ), DataDaemonTestCase( duration_sec=300, @@ -265,68 +260,41 @@ video_fps=30, joint_fps=15, timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, - skip=True, - ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=1, - # video_count=1, - # image_height=120, - # image_width=120, - # 
video_fps=120, - # joint_fps=1000, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_REAL, - # wait=False, - # ), + ), + DataDaemonTestCase( + duration_sec=10, + recording_count=1, + video_count=1, + image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, # previously 1000 but flaky due to sync + producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_REAL, + wait=False, + ), ) NETWORK_PERFORMANCE_CASES = ( - # Joint-only online frequency sweep for VM limit discovery. - # - # Workload held constant: - # - 7 joints - # - no camera - # - 1 context - # - 3 recordings x 30s - # - wait=True - # - # Observed on VM: - # - 210Hz: pass - # - 225Hz: pass - # - 250Hz: clean pass - # - 275Hz: reaches readiness but stop_recording exceeds 15s diagnostic target - # - 300Hz: does not complete reliably; stop_recording can hang - # - # Conservative required-suite boundary: 250Hz. - DataDaemonTestCase( - duration_sec=30, - joint_count=7, - video_count=0, - parallel_contexts=1, - recording_count=3, - context_duration_mode=DURATION_MODE_FIXED, - joint_fps=210, - wait=True, - ), + # High frequency robot control at 100Hz joint data + # Tests: high-frequency sampling, temporal jitter, joint-only streaming DataDaemonTestCase( - duration_sec=30, + duration_sec=60, joint_count=7, video_count=0, parallel_contexts=1, - recording_count=3, + recording_count=5, context_duration_mode=DURATION_MODE_FIXED, - joint_fps=225, - wait=True, + joint_fps=100, ), DataDaemonTestCase( - duration_sec=30, + duration_sec=60, joint_count=7, video_count=0, parallel_contexts=1, - recording_count=3, + recording_count=5, context_duration_mode=DURATION_MODE_FIXED, - joint_fps=250, + joint_fps=100, wait=True, ), # High number of medium-throughput robots with synchronized @@ -345,7 +313,6 @@ producer_channels=PRODUCER_PER_THREAD, context_duration_mode=DURATION_MODE_VARIABLE, video_fps=30, - skip=True, ), DataDaemonTestCase( duration_sec=20, @@ -360,7 +327,6 @@ 
context_duration_mode=DURATION_MODE_VARIABLE, video_fps=30, wait=True, - skip=True, ), # Large number of joints without cameras (1000 joints) # Tests: high joint dimensionality, memory efficiency, sensor-only workload @@ -370,7 +336,6 @@ video_count=0, parallel_contexts=1, recording_count=3, - skip=True, ), DataDaemonTestCase( duration_sec=30, @@ -379,7 +344,6 @@ parallel_contexts=1, recording_count=3, wait=True, - skip=True, ), # 3x longer duration recordings # Tests: long-running stability, memory leak detection, large dataset @@ -393,7 +357,7 @@ parallel_contexts=2, recording_count=16, context_duration_mode=DURATION_MODE_FIXED, - skip=True, + joint_fps=15, ), DataDaemonTestCase( duration_sec=300, @@ -404,8 +368,8 @@ parallel_contexts=2, recording_count=16, context_duration_mode=DURATION_MODE_FIXED, + joint_fps=15, wait=True, - skip=True, ), DataDaemonTestCase( duration_sec=300, @@ -419,7 +383,6 @@ video_fps=30, joint_fps=15, timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, - skip=True, ), DataDaemonTestCase( duration_sec=300, @@ -434,31 +397,30 @@ joint_fps=15, timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, wait=True, - skip=True, - ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=1, - # video_count=1, - # image_height=120, - # image_width=120, - # video_fps=120, - # joint_fps=1000, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_REAL, - # ), - # DataDaemonTestCase( - # duration_sec=10, - # recording_count=1, - # video_count=1, - # image_height=120, - # image_width=120, - # video_fps=120, - # joint_fps=1000, - # producer_channels=PRODUCER_PER_THREAD, - # timestamp_mode=TIMESTAMP_MODE_REAL, - # wait=True, - # ), + ), + DataDaemonTestCase( + duration_sec=10, + recording_count=1, + video_count=1, + image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, + producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_REAL, + ), + DataDaemonTestCase( + duration_sec=10, + recording_count=1, + video_count=1, + 
image_height=120, + image_width=120, + video_fps=120, + joint_fps=250, + producer_channels=PRODUCER_PER_THREAD, + timestamp_mode=TIMESTAMP_MODE_REAL, + wait=True, + ), DataDaemonTestCase( duration_sec=10, recording_count=4, @@ -466,10 +428,9 @@ image_height=120, image_width=120, video_fps=120, - joint_fps=500, + joint_fps=250, producer_channels=PRODUCER_PER_THREAD, timestamp_mode=TIMESTAMP_MODE_STOCHASTIC, wait=False, - skip=True, ), ) diff --git a/tests/integration/platform/data_daemon/data_integrity/test_network.py b/tests/integration/platform/data_daemon/data_integrity/test_network.py index 0f8937e7d..53bd74211 100644 --- a/tests/integration/platform/data_daemon/data_integrity/test_network.py +++ b/tests/integration/platform/data_daemon/data_integrity/test_network.py @@ -4,14 +4,16 @@ import pytest +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from tests.integration.platform.data_daemon.daemon_test_cases import ( - NETWORK_INTEGRITY_CASES, + PRE_NETWORK_INTEGRITY_CASES, ) from tests.integration.platform.data_daemon.shared.assertions import ( assert_exactly_one_daemon_pid, verify_cloud_results, ) from tests.integration.platform.data_daemon.shared.db_helpers import ( + resolve_cloud_recording_ids, wait_for_upload_complete_in_db, ) from tests.integration.platform.data_daemon.shared.runners import online_daemon_running @@ -37,7 +39,7 @@ ) _CASES = DataDaemonTestBatch( - cases=NETWORK_INTEGRITY_CASES, + cases=PRE_NETWORK_INTEGRITY_CASES, storage_state_action=STORAGE_STATE_DELETE, stop_method=STOP_METHOD_CLI, ).as_cases() @@ -51,10 +53,22 @@ def _assert_online_verification_invariants( """Block until every recording in *results* has reached ``upload_complete`` in the platform DB. Must be called before cloud frame verification so that downloaded data reflects the fully-committed upload state. 
+ + Upload completion is tracked in the daemon DB by the active correlation key: + the local ``recording_index`` under the Rust daemon, the cloud + ``recording_id`` under the legacy daemon. """ for result in results: - for recording_id in result.recording_ids: - wait_for_upload_complete_in_db(str(recording_id), timeout_s=timeout_seconds) + if rust_daemon_enabled(): + for recording_index in result.recording_indexes: + wait_for_upload_complete_in_db( + recording_index, timeout_s=timeout_seconds + ) + else: + for recording_id in result.recording_ids: + wait_for_upload_complete_in_db( + str(recording_id), timeout_s=timeout_seconds + ) # --------------------------------------------------------------------------- @@ -101,6 +115,7 @@ def test_cloud_data_integrity( assert_exactly_one_daemon_pid() results = run_case_contexts(case, specs=specs) _assert_online_verification_invariants(results) + results = resolve_cloud_recording_ids(results) verify_cloud_results(results=results, case=case) finally: diff --git a/tests/integration/platform/data_daemon/data_integrity/test_pre_network.py b/tests/integration/platform/data_daemon/data_integrity/test_pre_network.py index 2d2aef369..755220c36 100644 --- a/tests/integration/platform/data_daemon/data_integrity/test_pre_network.py +++ b/tests/integration/platform/data_daemon/data_integrity/test_pre_network.py @@ -31,7 +31,7 @@ ) from tests.integration.platform.data_daemon.shared.test_case.constants import ( STOP_METHOD_CLI, - STORAGE_STATE_PRESERVE, + STORAGE_STATE_DELETE, ) from tests.integration.platform.data_daemon.shared.test_infrastructure import ( scoped_storage_state, @@ -41,7 +41,7 @@ CASES = DataDaemonTestBatch( cases=PRE_NETWORK_INTEGRITY_CASES, - storage_state_action=STORAGE_STATE_PRESERVE, + storage_state_action=STORAGE_STATE_DELETE, stop_method=STOP_METHOD_CLI, ).as_cases() diff --git a/tests/integration/platform/data_daemon/performance/test_pre_network.py 
b/tests/integration/platform/data_daemon/performance/test_pre_network.py index 8526cb96a..4ece7db6e 100644 --- a/tests/integration/platform/data_daemon/performance/test_pre_network.py +++ b/tests/integration/platform/data_daemon/performance/test_pre_network.py @@ -23,7 +23,7 @@ ) from tests.integration.platform.data_daemon.shared.test_case.constants import ( STOP_METHOD_CLI, - STORAGE_STATE_PRESERVE, + STORAGE_STATE_DELETE, ) from tests.integration.platform.data_daemon.shared.test_infrastructure import ( scoped_storage_state, @@ -31,7 +31,7 @@ _CASES = DataDaemonTestBatch( cases=PRE_NETWORK_PERFORMANCE_CASES, - storage_state_action=STORAGE_STATE_PRESERVE, + storage_state_action=STORAGE_STATE_DELETE, stop_method=STOP_METHOD_CLI, ).as_cases() diff --git a/tests/integration/platform/data_daemon/shared/assertions.py b/tests/integration/platform/data_daemon/shared/assertions.py index 944fcb358..1427a72cb 100644 --- a/tests/integration/platform/data_daemon/shared/assertions.py +++ b/tests/integration/platform/data_daemon/shared/assertions.py @@ -48,6 +48,7 @@ from neuracore.core.data.recording import Recording from neuracore.data_daemon.const import SOCKET_PATH from neuracore.data_daemon.helpers import get_daemon_pid_path +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from tests.integration.platform.data_daemon.shared.db_helpers import ( wait_for_dataset_ready, wait_for_recordings_finalized, @@ -90,7 +91,10 @@ def assert_context_mode(case: DataDaemonTestCase, results: list[ContextResult]) -> None: """Assert that context timing matches the expected mode.""" - active_results = [result for result in results if result.recording_ids] + if rust_daemon_enabled(): + active_results = [result for result in results if result.recording_indexes] + else: + active_results = [result for result in results if result.recording_ids] if len(active_results) < 2: return diff --git a/tests/integration/platform/data_daemon/shared/db_constants.py 
b/tests/integration/platform/data_daemon/shared/db_constants.py index 3138153dd..db8c71b24 100644 --- a/tests/integration/platform/data_daemon/shared/db_constants.py +++ b/tests/integration/platform/data_daemon/shared/db_constants.py @@ -5,13 +5,16 @@ RECORDINGS_TABLE = "recordings" TRACES_TABLE = "traces" +COLUMN_RECORDING_INDEX = "recording_index" COLUMN_RECORDING_ID = "recording_id" COLUMN_TRACE_ID = "trace_id" +COLUMN_ROBOT_ID = "robot_id" +COLUMN_ROBOT_INSTANCE = "robot_instance" +COLUMN_STARTED_AT = "started_at" COLUMN_EXPECTED_TRACE_COUNT = "expected_trace_count" COLUMN_EXPECTED_TRACE_COUNT_REPORTED = "expected_trace_count_reported" COLUMN_PROGRESS_REPORTED = "progress_reported" COLUMN_STOPPED_AT = "stopped_at" -COLUMN_TRACE_COUNT = "trace_count" COLUMN_WRITE_STATUS = "write_status" COLUMN_REGISTRATION_STATUS = "registration_status" COLUMN_UPLOAD_STATUS = "upload_status" diff --git a/tests/integration/platform/data_daemon/shared/db_helpers.py b/tests/integration/platform/data_daemon/shared/db_helpers.py index 58131e06c..4bf1be824 100644 --- a/tests/integration/platform/data_daemon/shared/db_helpers.py +++ b/tests/integration/platform/data_daemon/shared/db_helpers.py @@ -24,25 +24,34 @@ from __future__ import annotations +import dataclasses import logging import sqlite3 import time from collections.abc import Callable, Iterable -from typing import Any +from typing import TYPE_CHECKING, Any import pytest +if TYPE_CHECKING: + from tests.integration.platform.data_daemon.shared.test_case.build_test_case_context import ( # noqa: E501 + ContextResult, + ) + import neuracore as nc from neuracore.data_daemon.helpers import get_daemon_db_path +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from tests.integration.platform.data_daemon.shared.db_constants import ( COLUMN_EXPECTED_TRACE_COUNT, COLUMN_EXPECTED_TRACE_COUNT_REPORTED, COLUMN_LAST_UPDATED, COLUMN_PROGRESS_REPORTED, COLUMN_RECORDING_ID, + COLUMN_RECORDING_INDEX, COLUMN_REGISTRATION_STATUS, 
+ COLUMN_ROBOT_ID, + COLUMN_ROBOT_INSTANCE, COLUMN_STOPPED_AT, - COLUMN_TRACE_COUNT, COLUMN_TRACE_ID, COLUMN_UPLOAD_STATUS, COLUMN_WRITE_STATUS, @@ -71,7 +80,9 @@ ) from tests.integration.platform.data_daemon.shared.disk_helpers import ( list_recording_ids_on_disk, + list_recording_indexes_on_disk, normalize_recording_ids, + normalize_recording_indexes, ) from tests.integration.platform.data_daemon.shared.process_control import Timer from tests.integration.platform.data_daemon.shared.test_case.constants import ( @@ -81,6 +92,17 @@ logger = logging.getLogger(__name__) +def _recording_correlation_column() -> str: + """Return the recordings/traces correlation column for the active daemon. + + Rust daemon: recordings are keyed by the local INTEGER ``recording_index`` + (the cloud ``recording_id`` is a separate, nullable column). Legacy Python + daemon: recordings are keyed by the cloud ``recording_id`` (TEXT PK), which + is also the traces foreign key. + """ + return COLUMN_RECORDING_INDEX if rust_daemon_enabled() else COLUMN_RECORDING_ID + + class DaemonDbStore: """Encapsulate SQLite access patterns for daemon integration tests.""" @@ -143,28 +165,61 @@ def table_columns( rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall() # noqa: S608 return {str(row[1]) for row in rows} - def fetch_recording(self, recording_id: str) -> dict[str, Any] | None: - """Return the recording row for ``recording_id`` if it exists.""" + def fetch_recording(self, recording_key: int | str) -> dict[str, Any] | None: + """Return the recording row for ``recording_key`` if it exists. + + ``recording_key`` is the daemon's correlation key for the active mode: + the local INTEGER ``recording_index`` under the Rust daemon, or the cloud + ``recording_id`` (TEXT) under the legacy Python daemon. 
+ """ + correlation_column = _recording_correlation_column() with self.connect() as conn: row = conn.execute( - f"SELECT * FROM {RECORDINGS_TABLE} " f"WHERE {COLUMN_RECORDING_ID} = ?", - (recording_id,), + f"SELECT * FROM {RECORDINGS_TABLE} " f"WHERE {correlation_column} = ?", + (recording_key,), ).fetchone() return dict(row) if row is not None else None + def fetch_recordings_for_source( + self, + robot_id: str, + robot_instance: int, + ) -> list[dict[str, Any]]: + """Return recording rows for one source ordered by ``recording_index``. + + Source identity is ``(robot_id, robot_instance)``. This mirrors the + daemon's own source lookup (``idx_recordings_source``) and is how tests + correlate a worker's recordings to daemon-minted ``recording_index`` + values without seeing the local handle. + """ + with self.connect() as conn: + rows = conn.execute( + f"SELECT * FROM {RECORDINGS_TABLE} " + f"WHERE {COLUMN_ROBOT_ID} = ? AND {COLUMN_ROBOT_INSTANCE} = ? " + f"ORDER BY {COLUMN_RECORDING_INDEX}", + (robot_id, robot_instance), + ).fetchall() + return [dict(row) for row in rows] + def fetch_all_traces( self, - recording_id: str, + recording_key: int | str, *, columns: Iterable[str] | None = None, ) -> list[dict[str, Any]]: - """Return trace rows for one recording, limited to available columns.""" + """Return trace rows for one recording, limited to available columns. + + Traces join to recordings on the active correlation column: the integer + ``recording_index`` under the Rust daemon, or the cloud ``recording_id`` + under the legacy Python daemon. 
+ """ + correlation_column = _recording_correlation_column() with self.connect_read_only() as conn: if not self.table_exists(conn, TRACES_TABLE): return [] trace_columns = self.table_columns(conn, TRACES_TABLE) - if COLUMN_RECORDING_ID not in trace_columns: + if correlation_column not in trace_columns: return [] if columns is None: @@ -186,8 +241,8 @@ def fetch_all_traces( rows = conn.execute( f"SELECT {', '.join(selected_columns)} " f"FROM {TRACES_TABLE} " - f"WHERE {COLUMN_RECORDING_ID} = ?{order_by_clause}", - (recording_id,), + f"WHERE {correlation_column} = ?{order_by_clause}", + (recording_key,), ).fetchall() return [{column: row[column] for column in selected_columns} for row in rows] @@ -229,18 +284,170 @@ def sqlite_table_columns( return _TEST_STORE.table_columns(conn, table) -def fetch_recording(recording_id: str) -> dict[str, Any] | None: - """Return the recording row for ``recording_id`` if it exists.""" - return _TEST_STORE.fetch_recording(recording_id) +def fetch_recording(recording_key: int | str) -> dict[str, Any] | None: + """Return the recording row for the active-mode correlation key, if present.""" + return _TEST_STORE.fetch_recording(recording_key) + + +def fetch_recordings_for_source( + robot_id: str, + robot_instance: int, +) -> list[dict[str, Any]]: + """Return recording rows for one source ordered by ``recording_index``.""" + return _TEST_STORE.fetch_recordings_for_source(robot_id, robot_instance) def fetch_all_traces( - recording_id: str, + recording_key: int | str, *, columns: Iterable[str] | None = None, ) -> list[dict[str, Any]]: """Return trace rows for one recording, limited to available columns.""" - return _TEST_STORE.fetch_all_traces(recording_id, columns=columns) + return _TEST_STORE.fetch_all_traces(recording_key, columns=columns) + + +def wait_for_recording_index_for_source( + robot_id: str, + robot_instance: int, + *, + after_index: int = 0, + timeout_s: float = 30.0, + poll_interval_s: float = 0.1, +) -> int: + """Block until 
the daemon assigns a recording with index > ``after_index``. + + Returns the highest ``recording_index`` for ``(robot_id, robot_instance)`` + once it exceeds ``after_index`` — i.e. the recording just started. The daemon + assigns ``recording_index`` the moment it sees ``StartRecording``, so a short + poll covers the window between the producer's ``start_recording`` call and the + daemon persisting the row. + + Gating on a *new, higher* index rather than a raw row count keeps this robust + to the recording reaper, which deletes a recording's row once it is fully + uploaded: the just-started recording is always the highest-indexed row for the + source and is never reaped before the next recording starts, but older + fully-uploaded rows can vanish mid-session, so the row *count* is not + monotonic. Pass the previously-resolved index as ``after_index`` (``0`` for + the first recording of a source). + + Raises: + TimeoutError: If no recording with index > ``after_index`` appears in time. + """ + deadline = time.monotonic() + timeout_s + newest: int | None = None + while True: + try: + rows = fetch_recordings_for_source(robot_id, robot_instance) + except sqlite3.OperationalError: + # Daemon has created the DB file but not yet the schema; retry. + rows = [] + indices = [int(row[COLUMN_RECORDING_INDEX]) for row in rows] + newest = max(indices) if indices else None + if newest is not None and newest > after_index: + return newest + if time.monotonic() >= deadline: + raise TimeoutError( + "Daemon did not assign a new recording_index for source " + f"({robot_id}, {robot_instance}); expected an index greater than " + f"{after_index}, newest seen {newest} after {timeout_s}s." + ) + time.sleep(poll_interval_s) + + +def assert_offline_recordings_pending(results: list[ContextResult]) -> None: + """Assert offline recordings exist by ``recording_index`` with cloud id NULL. 
+ + Rust daemon only: in offline mode the daemon assigns ``recording_index`` + immediately but the cloud ``recording_id`` stays NULL until the daemon is + online and ``/recording/start`` lands. This confirms that contract per + ``recording_index`` captured by each worker. Under the legacy daemon the + cloud ``recording_id`` is the recording's identity from the moment recording + starts, so this offline-NULL contract does not apply and the check is a + no-op. + + Raises: + AssertionError: If a recording row is missing or already has a cloud id. + """ + if not rust_daemon_enabled(): + return + for result in results: + rows_by_index = { + row[COLUMN_RECORDING_INDEX]: row + for row in fetch_recordings_for_source(result.source[0], result.source[1]) + } + for recording_index in result.recording_indexes: + row = rows_by_index.get(recording_index) + assert row is not None, ( + f"Offline recording_index {recording_index} for source " + f"{result.source} has no recordings row" + ) + assert row.get(COLUMN_RECORDING_ID) is None, ( + f"Offline recording_index {recording_index} unexpectedly already " + f"has cloud recording_id={row.get(COLUMN_RECORDING_ID)!r}" + ) + + +def resolve_cloud_recording_ids( + results: list[ContextResult], + *, + timeout_s: float = 60.0, +) -> list[ContextResult]: + """Return copies of *results* with cloud ``recording_ids`` resolved per mode. + + Legacy Python daemon: ``nc.start_recording()`` already returns the cloud + ``recording_id``, so ``result.recording_ids`` is authoritative and the + results are returned unchanged. + + Rust daemon: the cloud ``recording_id`` is populated asynchronously by the + start-notifier once the daemon is online, so the captured ids may have been + NULL at record time. 
This reads the daemon DB directly by ``recording_index`` + (which the recording worker already captured) and waits until each row's + cloud ``recording_id`` is filled in, so cloud verification (which matches the + dataset's ``recording.id``) has the correct ids. + + Resolving from the DB rather than ``nc.get_cloud_recording_id`` is required + here: recordings are made in worker subprocesses, so the verifying process + has no active recording context to resolve against — only the daemon-assigned + ``recording_index`` values carried on each result. + + Raises: + AssertionError: If any recording's cloud id is not populated in time. + """ + if not rust_daemon_enabled(): + return results + + resolved: list[ContextResult] = [] + for result in results: + cloud_ids: list[str] = [] + for recording_index in result.recording_indexes: + cloud_id = _wait_for_cloud_recording_id( + recording_index, timeout_s=timeout_s + ) + assert cloud_id, ( + f"Cloud recording_id never populated for recording_index " + f"{recording_index} (robot {result.robot_name!r}) within " + f"{timeout_s}s" + ) + cloud_ids.append(cloud_id) + resolved.append(dataclasses.replace(result, recording_ids=cloud_ids)) + return resolved + + +def _wait_for_cloud_recording_id( + recording_index: int, + *, + timeout_s: float, + poll_interval_s: float = 0.5, +) -> str | None: + """Poll the daemon DB until ``recording_index`` has a cloud id, or time out.""" + deadline = time.monotonic() + timeout_s + while True: + row = fetch_recording(recording_index) + if row is not None and row.get(COLUMN_RECORDING_ID): + return str(row[COLUMN_RECORDING_ID]) + if time.monotonic() >= deadline: + return None + time.sleep(poll_interval_s) # --------------------------------------------------------------------------- @@ -248,20 +455,22 @@ def fetch_all_traces( # --------------------------------------------------------------------------- -def fetch_trace_registration_stats(recording_id: str) -> tuple[int, int]: +def 
fetch_trace_registration_stats(recording_key: int | str) -> tuple[int, int]: """Return ``(total_traces, non_pending_traces)`` for a recording. - Queries the ``traces`` table and counts all rows for ``recording_id`` plus - the subset whose ``registration_status`` is not ``"pending"``. + Queries the ``traces`` table and counts all rows for ``recording_key`` + plus the subset whose ``registration_status`` is not ``"pending"``. Args: - recording_id: The recording ID to query. + recording_key: The active-mode correlation key (``recording_index`` + under the Rust daemon, cloud ``recording_id`` under the legacy + daemon) to query. Returns: A two-tuple of ``(total_trace_count, non_pending_registration_count)``. """ traces = fetch_all_traces( - recording_id, + recording_key, columns=[COLUMN_REGISTRATION_STATUS], ) non_pending = sum( @@ -272,24 +481,26 @@ def fetch_trace_registration_stats(recording_id: str) -> tuple[int, int]: return len(traces), non_pending -def fetch_expected_trace_count_reported(recording_id: str) -> int | None: +def fetch_expected_trace_count_reported(recording_key: int | str) -> int | None: """Return the ``expected_trace_count_reported`` value for a recording row. Args: - recording_id: The recording ID to look up. + recording_key: The active-mode correlation key (``recording_index`` + under the Rust daemon, cloud ``recording_id`` under the legacy + daemon) to look up. Returns: The integer value of ``expected_trace_count_reported``, or ``None`` when the recording row is not found. """ - row = fetch_recording(recording_id) + row = fetch_recording(recording_key) if row is None or row.get(COLUMN_EXPECTED_TRACE_COUNT_REPORTED) is None: return None return int(row[COLUMN_EXPECTED_TRACE_COUNT_REPORTED]) def fetch_recording_online_verification_stats( - recording_id: str, + recording_key: int | str, ) -> dict[str, int | str | None]: """Fetch a comprehensive set of online-verification stats for one recording. 
@@ -299,7 +510,9 @@ def fetch_recording_online_verification_stats( default dict. Args: - recording_id: The recording ID to inspect. + recording_key: The active-mode correlation key (``recording_index`` + under the Rust daemon, cloud ``recording_id`` under the legacy + daemon) to inspect. Returns: A dict with the following keys: @@ -329,9 +542,9 @@ def fetch_recording_online_verification_stats( } try: - recording_row = fetch_recording(recording_id) + recording_row = fetch_recording(recording_key) traces = fetch_all_traces( - recording_id, + recording_key, columns=[COLUMN_REGISTRATION_STATUS, COLUMN_UPLOAD_STATUS], ) except sqlite3.OperationalError: @@ -383,7 +596,9 @@ def fetch_recording_online_verification_stats( } -def fetch_recording_trace_upload_stats(recording_id: str) -> dict[str, object]: +def fetch_recording_trace_upload_stats( + recording_key: int | str, +) -> dict[str, object]: """Fetch per-trace upload and registration status counts for a recording. Returns aggregate counts broken down by ``write_status``, @@ -391,7 +606,9 @@ def fetch_recording_trace_upload_stats(recording_id: str) -> dict[str, object]: per-trace row snapshots. Handles missing tables and columns gracefully. Args: - recording_id: The recording ID to inspect. + recording_key: The active-mode correlation key (``recording_index`` + under the Rust daemon, cloud ``recording_id`` under the legacy + daemon) to inspect. 
Returns: A dict containing: @@ -412,7 +629,7 @@ def fetch_recording_trace_upload_stats(recording_id: str) -> dict[str, object]: selected_columns = list(TRACE_UPLOAD_DETAIL_COLUMNS) try: - rows = fetch_all_traces(recording_id, columns=selected_columns) + rows = fetch_all_traces(recording_key, columns=selected_columns) except sqlite3.OperationalError: return default_result @@ -594,7 +811,7 @@ def wait_for_recordings_finalized( def wait_for_offline_db_ready( timeout_s: float = MAX_TIME_TO_START_S, *, - expected_recording_ids: Iterable[str] | None = None, + expected_recording_keys: Iterable[int | str] | None = None, ) -> None: """Block until the offline daemon's SQLite DB schema is ready for queries. @@ -605,9 +822,11 @@ def wait_for_offline_db_ready( Args: timeout_s: Maximum seconds to wait before raising. - expected_recording_ids: When supplied, also waits until at least one - of the expected recording directories exists on disk, preventing a + expected_recording_keys: When supplied, also waits until at least one of + the expected recording directories exists on disk, preventing a false-positive ``ready`` result from an empty-but-initialised DB. + Keys are ``recording_index`` values under the Rust daemon and cloud + ``recording_id`` strings under the legacy daemon. Raises: AssertionError: If the DB is not ready within ``timeout_s`` seconds. 
@@ -618,7 +837,20 @@ def wait_for_offline_db_ready( RECORDINGS_TABLE, TRACES_TABLE, } - target_recording_ids = normalize_recording_ids(expected_recording_ids) + if rust_daemon_enabled(): + target_recording_keys: set[int] | set[str] = normalize_recording_indexes( + expected_recording_keys + ) + list_on_disk: Callable[[], set[int]] | Callable[[], set[str]] = ( + list_recording_indexes_on_disk + ) + else: + target_recording_keys = normalize_recording_ids( + None + if expected_recording_keys is None + else (str(key) for key in expected_recording_keys) + ) + list_on_disk = list_recording_ids_on_disk with Timer(timeout_s, label="daemon.offline_db_ready", always_log=True): while time.monotonic() < deadline: @@ -627,7 +859,7 @@ def wait_for_offline_db_ready( time.sleep(0.1) continue - if not target_recording_ids and not list_recording_ids_on_disk(): + if not target_recording_keys and not list_on_disk(): time.sleep(0.1) continue @@ -654,7 +886,7 @@ def wait_for_offline_db_ready( raise AssertionError( "Offline daemon DB did not become ready within " f"{timeout_s}s. db_path={db_path} exists={db_path.exists()} " - f"recordings_on_disk={sorted(list_recording_ids_on_disk())} " + f"recordings_on_disk={sorted(list_on_disk())} " f"tables={sorted(existing_tables)} last_error={last_error!r}" ) @@ -666,21 +898,24 @@ def wait_for_all_traces_written( ) -> None: """Block until every trace for every recording in *results* has been written. - Uses the recordings root directory as the source of truth for which - recording IDs to check — this catches recordings the daemon started that - the client-side results list may not reflect (e.g. due to an - ``already_started`` race on reconnect). + Recordings are correlated by the active daemon's key: the daemon-assigned + INTEGER ``recording_index`` under the Rust daemon (the on-disk directory name + and the traces join key, since the cloud ``recording_id`` is nullable / minted + async), or the cloud ``recording_id`` under the legacy Python daemon. 
The + recordings root directory is the source of truth for which keys to check — + this catches recordings the daemon started that the client-side results may + not reflect (e.g. an ``already_started`` race on reconnect). Blocks until all of the following are true for every recording in scope: - A matching row exists in the DB with ``stopped_at`` set. - - ``trace_count`` on the recording row equals the number of trace rows. + - Every recording has at least one trace row. - Every trace row has ``write_status == 'written'``. Args: timeout_s: Maximum seconds to wait before raising. results: List of :class:`~build_test_case_context.ContextResult` objects - whose recording IDs are used to scope the check. + whose recording keys scope the check. Raises: AssertionError: If the condition is not met within ``timeout_s``. @@ -688,10 +923,24 @@ def wait_for_all_traces_written( min_poll_interval_s = 0.05 max_poll_interval_s = 1.0 + use_rust = rust_daemon_enabled() + correlation_column = COLUMN_RECORDING_INDEX if use_rust else COLUMN_RECORDING_ID + + def _raw_keys() -> list: + if use_rust: + return [key for result in results for key in result.recording_indexes] + return [key for result in results for key in result.recording_ids] + + if use_rust: + expected_keys: set[int] | set[str] = normalize_recording_indexes(_raw_keys()) + list_keys_on_disk: Callable[[], set[int]] | Callable[[], set[str]] = ( + list_recording_indexes_on_disk + ) + else: + expected_keys = normalize_recording_ids(_raw_keys()) + list_keys_on_disk = list_recording_ids_on_disk + deadline = time.monotonic() + timeout_s - expected_ids = normalize_recording_ids( - str(recording_id) for result in results for recording_id in result.recording_ids - ) poll_interval_s = min_poll_interval_s last_state: tuple[int, int, int, int, int] | None = None @@ -712,32 +961,32 @@ def _sleep_for_next_poll(*, progress_made: bool) -> None: wait_for_offline_db_ready( timeout_s=max(0.0, deadline - time.monotonic()), - 
expected_recording_ids=expected_ids, + expected_recording_keys=expected_keys, ) while time.monotonic() < deadline: - recording_ids = expected_ids or list_recording_ids_on_disk() - if not recording_ids: + recording_keys = expected_keys or list_keys_on_disk() + if not recording_keys: _sleep_for_next_poll(progress_made=False) continue try: recordings = { - row[COLUMN_RECORDING_ID]: row + row[correlation_column]: row for row in fetch_all_rows(RECORDINGS_TABLE) - if row[COLUMN_RECORDING_ID] in recording_ids + if row[correlation_column] in recording_keys } traces = [ trace for trace in fetch_all_rows(TRACES_TABLE) - if trace[COLUMN_RECORDING_ID] in recording_ids + if trace[correlation_column] in recording_keys ] except sqlite3.OperationalError: _sleep_for_next_poll(progress_made=False) continue - traces_by_recording: dict[str, list[dict[str, Any]]] = {} + traces_by_recording: dict[Any, list[dict[str, Any]]] = {} for trace in traces: - traces_by_recording.setdefault(trace[COLUMN_RECORDING_ID], []).append(trace) + traces_by_recording.setdefault(trace[correlation_column], []).append(trace) stopped_count = sum( 1 for row in recordings.values() if row[COLUMN_STOPPED_AT] is not None @@ -746,7 +995,7 @@ def _sleep_for_next_poll(*, progress_made: bool) -> None: 1 for trace in traces if trace[COLUMN_WRITE_STATUS] == TRACE_WRITE_WRITTEN ) current_state = ( - len(recording_ids), + len(recording_keys), len(recordings), stopped_count, len(traces), @@ -755,7 +1004,7 @@ def _sleep_for_next_poll(*, progress_made: bool) -> None: progress_made = current_state != last_state last_state = current_state - if len(recordings) < len(recording_ids): + if len(recordings) < len(recording_keys): _sleep_for_next_poll(progress_made=progress_made) continue @@ -763,27 +1012,22 @@ def _sleep_for_next_poll(*, progress_made: bool) -> None: _sleep_for_next_poll(progress_made=progress_made) continue + # Intentionally weakened to "every recording has >= 1 trace row": the + # Rust schema dropped the 
`trace_count` column, so we can no longer + # assert an exact per-recording trace count here. all_have_traces = all( - len(traces_by_recording.get(recording_id, [])) > 0 - for recording_id in recording_ids + len(traces_by_recording.get(recording_key, [])) > 0 + for recording_key in recording_keys ) if not all_have_traces: _sleep_for_next_poll(progress_made=progress_made) continue - counts_match = all( - row[COLUMN_TRACE_COUNT] == len(traces_by_recording.get(recording_id, [])) - for recording_id, row in recordings.items() - ) - if not counts_match: - _sleep_for_next_poll(progress_made=progress_made) - continue - if written_count == len(traces): return _sleep_for_next_poll(progress_made=progress_made) - recording_ids = expected_ids or list_recording_ids_on_disk() + recording_keys = expected_keys or list_keys_on_disk() try: recordings = fetch_all_rows(RECORDINGS_TABLE) traces = fetch_all_rows(TRACES_TABLE) @@ -794,32 +1038,34 @@ def _sleep_for_next_poll(*, progress_made: bool) -> None: unfinished = [ { COLUMN_TRACE_ID: t[COLUMN_TRACE_ID], - COLUMN_RECORDING_ID: t[COLUMN_RECORDING_ID], + correlation_column: t[correlation_column], COLUMN_WRITE_STATUS: t[COLUMN_WRITE_STATUS], } for t in traces - if t[COLUMN_RECORDING_ID] in recording_ids + if t[correlation_column] in recording_keys and t[COLUMN_WRITE_STATUS] != TRACE_WRITE_WRITTEN ] missing_in_db = sorted( - recording_ids - {row[COLUMN_RECORDING_ID] for row in recordings} + recording_keys - {row[correlation_column] for row in recordings} ) not_stopped = sorted( - row[COLUMN_RECORDING_ID] + row[correlation_column] for row in recordings - if row[COLUMN_RECORDING_ID] in recording_ids and row[COLUMN_STOPPED_AT] is None + if row[correlation_column] in recording_keys and row[COLUMN_STOPPED_AT] is None ) recordings_without_traces = sorted( - recording_id - for recording_id in recording_ids - if not any(trace[COLUMN_RECORDING_ID] == recording_id for trace in traces) + recording_key + for recording_key in recording_keys + if not 
any(trace[correlation_column] == recording_key for trace in traces) + ) + all_raw_keys = _raw_keys() + duplicate_keys = sorted( + {key for key in all_raw_keys if all_raw_keys.count(key) > 1} ) - all_raw_ids = [str(rec_id) for result in results for rec_id in result.recording_ids] - duplicate_ids = sorted({i for i in all_raw_ids if all_raw_ids.count(i) > 1}) raise AssertionError( f"Daemon did not finish writing all traces within {timeout_s}s.\n" - f" Duplicate recording IDs across contexts: {duplicate_ids}\n" - f" Recordings on disk with no DB row: {missing_in_db}\n" + f" Duplicate recording keys across contexts: {duplicate_keys}\n" + f" Recording keys on disk with no DB row: {missing_in_db}\n" f" Recordings not yet stopped (stopped_at is NULL): {not_stopped}\n" f" Recordings with no trace rows: {recordings_without_traces}\n" f" Traces still in non-written state ({len(unfinished)}):\n" @@ -828,7 +1074,7 @@ def _sleep_for_next_poll(*, progress_made: bool) -> None: def assert_recording_db_statuses( - recording_id: str, + recording_index: int, *, check_cloud_statuses: bool = False, ) -> None: @@ -841,7 +1087,7 @@ def assert_recording_db_statuses( online upload cycle. Args: - recording_id: The recording ID to inspect. + recording_index: The local ``recording_index`` to inspect. check_cloud_statuses: When ``True``, also assert registration and upload statuses in addition to write status. Pass this for cloud (online) test cases only. 
@@ -851,7 +1097,7 @@ def assert_recording_db_statuses( """ try: traces = fetch_all_traces( - recording_id, + recording_index, columns=[ COLUMN_TRACE_ID, COLUMN_WRITE_STATUS, @@ -861,10 +1107,10 @@ def assert_recording_db_statuses( ) except sqlite3.OperationalError as exc: raise AssertionError( - f"Cannot query traces for recording {recording_id}: {exc}" + f"Cannot query traces for recording_index {recording_index}: {exc}" ) from exc - assert traces, f"No trace rows found in DB for recording {recording_id}" + assert traces, f"No trace rows found in DB for recording_index {recording_index}" non_written = [ { @@ -875,7 +1121,7 @@ def assert_recording_db_statuses( if t.get(COLUMN_WRITE_STATUS) != TRACE_WRITE_WRITTEN ] assert not non_written, ( - f"Recording {recording_id}: traces not in 'written' state " + f"Recording {recording_index}: traces not in 'written' state " f"({len(non_written)}/{len(traces)}):\n" + "\n".join(f" {t}" for t in non_written) ) @@ -892,7 +1138,7 @@ def assert_recording_db_statuses( if t.get(COLUMN_REGISTRATION_STATUS) != TRACE_REGISTRATION_REGISTERED ] assert not non_registered, ( - f"Recording {recording_id}: traces not in 'registered' state " + f"Recording {recording_index}: traces not in 'registered' state " f"({len(non_registered)}/{len(traces)}):\n" + "\n".join(f" {t}" for t in non_registered) ) @@ -906,14 +1152,14 @@ def assert_recording_db_statuses( if t.get(COLUMN_UPLOAD_STATUS) != TRACE_UPLOAD_UPLOADED ] assert not non_uploaded, ( - f"Recording {recording_id}: traces not in 'uploaded' state " + f"Recording {recording_index}: traces not in 'uploaded' state " f"({len(non_uploaded)}/{len(traces)}):\n" + "\n".join(f" {t}" for t in non_uploaded) ) def wait_for_upload_complete_in_db( - recording_id: str, + recording_key: int | str, timeout_s: float = 90.0, ) -> None: """Block until all known traces for a recording are uploaded per the daemon DB. @@ -932,7 +1178,9 @@ def wait_for_upload_complete_in_db( verify that data is present in the cloud. 
Args: - recording_id: The recording ID to wait on. + recording_key: The active-mode correlation key (``recording_index`` + under the Rust daemon, cloud ``recording_id`` under the legacy + daemon) to wait on. timeout_s: Base no-progress timeout in seconds. Raises: @@ -948,7 +1196,7 @@ def wait_for_upload_complete_in_db( last_state: tuple[int | str | None, ...] | None = None while time.monotonic() < deadline: - stats = fetch_recording_online_verification_stats(recording_id) + stats = fetch_recording_online_verification_stats(recording_key) if _is_online_upload_complete(stats): return @@ -980,11 +1228,11 @@ def wait_for_upload_complete_in_db( time.sleep(sleep_s) poll_interval_s = min(max_poll_interval_s, poll_interval_s * 2) - stats = fetch_recording_online_verification_stats(recording_id) - trace_upload_stats = fetch_recording_trace_upload_stats(recording_id) + stats = fetch_recording_online_verification_stats(recording_key) + trace_upload_stats = fetch_recording_trace_upload_stats(recording_key) pytest.fail( - "Online upload did not complete for " - f"recording {recording_id} within {progress_timeout_s:.1f}s of last progress; " + "Online upload did not complete for recording " + f"{recording_key!r} within {progress_timeout_s:.1f}s of last progress; " f"stats={stats}; trace_upload_stats={trace_upload_stats}" ) diff --git a/tests/integration/platform/data_daemon/shared/disk_helpers.py b/tests/integration/platform/data_daemon/shared/disk_helpers.py index 0506bc409..43096fa19 100644 --- a/tests/integration/platform/data_daemon/shared/disk_helpers.py +++ b/tests/integration/platform/data_daemon/shared/disk_helpers.py @@ -10,10 +10,11 @@ from typing import TYPE_CHECKING from neuracore.data_daemon.helpers import get_daemon_recordings_root_path +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from tests.integration.platform.data_daemon.shared.test_case.constants import ( - STOCHASTIC_JITTER_S, TIMESTAMP_MODE_REAL, TIMESTAMP_MODE_STOCHASTIC, + 
stochastic_jitter_window, ) if TYPE_CHECKING: @@ -72,7 +73,13 @@ def render(self) -> list[str]: def list_recording_ids_on_disk() -> set[str]: - """Return recording IDs that exist as subdirectories under the recordings root.""" + """Return cloud recording IDs that exist as subdirectories on disk. + + Legacy Python daemon: the on-disk layout is + ``{recordings_root}/{recording_id}/...`` — the top directory segment is the + cloud ``recording_id``. Under the Rust daemon use + :func:`list_recording_indexes_on_disk` instead. + """ recordings_root = get_daemon_recordings_root_path() if not recordings_root.exists(): return set() @@ -82,7 +89,7 @@ def list_recording_ids_on_disk() -> set[str]: def normalize_recording_ids( expected_recording_ids: Iterable[str] | None, ) -> set[str]: - """Return a clean set of non-empty recording ID strings.""" + """Return a clean set of non-empty cloud recording ID strings.""" if expected_recording_ids is None: return set() return { @@ -90,6 +97,58 @@ def normalize_recording_ids( } +def list_recording_indexes_on_disk() -> set[int]: + """Return ``recording_index`` values that exist under the recordings root. + + Thin-shipper rewrite: the on-disk layout is now + ``{recordings_root}/{recording_index}/{data_type}/{trace_id}/`` — the top + directory segment is the daemon-assigned INTEGER ``recording_index``, not a + cloud recording id. Only integer-named directories are recording roots. 
+ """ + recordings_root = get_daemon_recordings_root_path() + if not recordings_root.exists(): + return set() + indexes: set[int] = set() + for child in recordings_root.iterdir(): + if not child.is_dir(): + continue + try: + indexes.add(int(child.name)) + except ValueError: + continue + return indexes + + +def normalize_recording_indexes( + expected_recording_indexes: Iterable[int | str] | None, +) -> set[int]: + """Return a clean set of integer ``recording_index`` values.""" + if expected_recording_indexes is None: + return set() + normalized: set[int] = set() + for recording_index in expected_recording_indexes: + if recording_index is None or recording_index == "": + continue + normalized.add(int(recording_index)) + return normalized + + +def _result_recording_keys(result: ContextResult) -> list[tuple[str, int | str]]: + """Yield ``(disk_dir_name, db_correlation_key)`` per recording in *result*. + + Rust daemon: the on-disk directory and traces join key are both the integer + ``recording_index``. Legacy Python daemon: both are the cloud ``recording_id`` + string. Keeping these two values together lets the assertion body stay + identical across modes. + """ + if rust_daemon_enabled(): + return [ + (str(recording_index), recording_index) + for recording_index in result.recording_indexes + ] + return [(recording_id, recording_id) for recording_id in result.recording_ids] + + def _collect_trace_timestamps_per_file(recording_dir: Path) -> dict[str, list[float]]: """Return mapping of trace file key (joint/camera name) to timestamps from every trace.json under a recording dir.""" @@ -300,13 +359,15 @@ def _assert_stochastic_timestamps( expected_timestamps: list[float], failures: list[TraceFailure], durations: dict[str, float], + fps: int, ) -> None: - """Assert every timestamp is within STOCHASTIC_JITTER_S of its intended value. + """Assert every timestamp is within the fps-derived jitter window. The intended timestamps are the pre-jitter values (``start + i / fps``). 
- Each actual timestamp must satisfy - ``|actual - intended| <= STOCHASTIC_JITTER_S``. + Each actual timestamp must satisfy ``|actual - intended| <= window``, where + ``window`` is :func:`stochastic_jitter_window` of the trace's ``fps``. """ + window = stochastic_jitter_window(fps) if len(timestamps) != len(expected_timestamps): failures.append( TraceFailure( @@ -322,7 +383,7 @@ def _assert_stochastic_timestamps( out_of_window = [ (i, actual, intended) for i, (actual, intended) in enumerate(zip(timestamps, expected_timestamps)) - if abs(actual - intended) > STOCHASTIC_JITTER_S + if abs(actual - intended) > window ] if out_of_window: examples = "; ".join( @@ -332,7 +393,7 @@ def _assert_stochastic_timestamps( ) body = ( f"{len(out_of_window)}/{len(timestamps)} timestamp(s) outside" - f" ±{STOCHASTIC_JITTER_S}s jitter window — {examples}" + f" ±{window}s jitter window — {examples}" + (f" (+ {len(out_of_window) - 3} more)" if len(out_of_window) > 3 else "") ) failures.append(TraceFailure(trace_key=trace_key, body=body)) @@ -386,12 +447,12 @@ def assert_disk_recording_properties( for result in results: use_real = result.timestamp_mode == TIMESTAMP_MODE_REAL use_stochastic = result.timestamp_mode == TIMESTAMP_MODE_STOCHASTIC - for recording_id in result.recording_ids: - recording_dir = recordings_root / recording_id + for recording_key, fetch_key in _result_recording_keys(result): + recording_dir = recordings_root / recording_key if not recording_dir.exists(): all_failures.append( RecordingFailures( - recording_id=recording_id, + recording_id=recording_key, recording_error=( f"directory not found on disk ({recording_dir})" ), @@ -404,7 +465,7 @@ def assert_disk_recording_properties( if not trace_timestamps: all_failures.append( RecordingFailures( - recording_id=recording_id, + recording_id=recording_key, recording_error=( f"no timestamps found in any trace.json" f" under {recording_dir}" @@ -419,7 +480,7 @@ def assert_disk_recording_properties( # 
"JOINT_POSITIONS/vx300s_left\waist", "RGB_IMAGES/camera_0", # "CUSTOM_1D/marker". trace_rows = fetch_all_traces( - recording_id, + fetch_key, columns=["trace_id", "data_type", "data_type_name"], ) uuid_to_semantic: dict[str, str] = {} @@ -450,7 +511,7 @@ def assert_disk_recording_properties( else _assert_manual_timestamps ) per_recording = ( - result.expected_timestamps.by_recording.get(recording_id) + result.expected_timestamps.by_recording.get(recording_key) if result.expected_timestamps is not None else None ) @@ -462,10 +523,10 @@ def assert_disk_recording_properties( ) all_failures.append( RecordingFailures( - recording_id=recording_id, + recording_id=recording_key, recording_error=( f"no expected timestamps —" - f" known recording IDs: {known}" + f" known recording_index keys: {known}" ), trace_failures=[], ) @@ -476,7 +537,7 @@ def assert_disk_recording_properties( for trace_key, timestamps in mapped_trace_timestamps.items(): if use_real: _assert_real_timestamps( - recording_id=recording_id, + recording_id=recording_key, trace_key=trace_key, timestamps=timestamps, wall_started_at=result.wall_started_at, @@ -499,19 +560,27 @@ def assert_disk_recording_properties( ) ) continue + # The stochastic assertion sizes its tolerance from the + # trace's fps; the manual assertion takes no tolerance. 
+ extra = ( + {"fps": per_recording.by_trace_fps[trace_key]} + if use_stochastic + else {} + ) assert_ts( - recording_id=recording_id, + recording_id=recording_key, trace_key=trace_key, timestamps=timestamps, expected_timestamps=expected[trace_key], failures=trace_failures, durations=durations, + **extra, ) if trace_failures: all_failures.append( RecordingFailures( - recording_id=recording_id, + recording_id=recording_key, recording_error=None, trace_failures=trace_failures, ) diff --git a/tests/integration/platform/data_daemon/shared/process_control.py b/tests/integration/platform/data_daemon/shared/process_control.py index ee85e425e..86ced73c1 100644 --- a/tests/integration/platform/data_daemon/shared/process_control.py +++ b/tests/integration/platform/data_daemon/shared/process_control.py @@ -32,6 +32,7 @@ ) from tests.integration.platform.data_daemon.shared.test_case.constants import ( STOP_METHOD_CLI, + STOP_METHOD_SIGINT, STOP_METHOD_SIGKILL, STOP_METHOD_SIGTERM, ) @@ -54,11 +55,6 @@ HIGH_TIME_TO_DATASET_READY_S = 500 """Upper bound on waiting for an online dataset to become ready, in seconds.""" -GRACEFUL_TIMEOUT_STOP_S = 15 -"""Seconds to wait for graceful exit before escalating to SIGKILL in stop_daemon().""" - -SIGKILL_TIMEOUT_STOP_S = 60 -"""Seconds to wait for exit after sending SIGKILL in stop_daemon().""" # --------------------------------------------------------------------------- # Timer @@ -179,7 +175,13 @@ def assert_on_schedule(deadline: float, tolerance: float, label: str) -> None: def get_runner_pids() -> set[int]: - """Return the PIDs of all running neuracore data-daemon runner processes.""" + """Return the PIDs of all running neuracore data-daemon runner processes. + + Matches either the Python runner entry point + (``neuracore.data_daemon.runner_entry``) or the bundled Rust binary + (``neuracore/data_daemon/bin/data-daemon``) — the latter is what runs + when ``NCD_RUST_DAEMON=1`` and the wheel includes the compiled binary. 
+ """ env = {**os.environ, "COLUMNS": "32768"} output = subprocess.check_output(["ps", "-eo", "pid=,args="], text=True, env=env) runner_pids: set[int] = set() @@ -188,7 +190,10 @@ def get_runner_pids() -> set[int]: if len(parts) != 2: continue pid_text, args = parts - if "neuracore.data_daemon.runner_entry" in args: + if ( + "neuracore.data_daemon.runner_entry" in args + or "neuracore/data_daemon/bin/data-daemon" in args + ): runner_pids.add(int(pid_text)) return runner_pids @@ -219,7 +224,7 @@ def _collect_candidate_pids() -> set[int]: def _send_initial_stop(method: str, candidate_pids: set[int]) -> None: """Deliver the initial stop signal or CLI command for ``method``.""" - if method == "cli": + if method == STOP_METHOD_CLI: subprocess.run( [sys.executable, "-m", "neuracore.data_daemon", "stop"], check=False, @@ -230,7 +235,7 @@ def _send_initial_stop(method: str, candidate_pids: set[int]) -> None: for pid in sorted(candidate_pids): if pid_is_running(pid): terminate_pid(pid) - elif method == "sigint": + elif method == STOP_METHOD_SIGINT: for pid in sorted(candidate_pids): if pid_is_running(pid): try: @@ -245,22 +250,15 @@ def _send_initial_stop(method: str, candidate_pids: set[int]) -> None: raise ValueError(f"Unknown stop method: {method!r}") -def _wait_and_escalate( - candidate_pids: set[int], - *, - graceful_timeout_s: float = GRACEFUL_TIMEOUT_STOP_S, - sigkill_timeout_s: float = SIGKILL_TIMEOUT_STOP_S, -) -> None: +def _wait_and_escalate(candidate_pids: set[int], *, graceful_timeout_s: float) -> None: """Wait for each PID to exit, escalating to SIGKILL on timeout.""" for pid in sorted(candidate_pids): if not pid_is_running(pid): continue if not wait_for_exit(pid, timeout_s=graceful_timeout_s): - with Timer( - sigkill_timeout_s, label="stop_daemon_escalated", assert_deadline=False - ): + with Timer(5.0, label="stop_daemon_escalated", assert_deadline=False): force_kill(pid) - wait_for_exit(pid, timeout_s=sigkill_timeout_s) + wait_for_exit(pid, timeout_s=5.0) 
def _remove_ipc_artefacts() -> None: @@ -280,8 +278,7 @@ def _remove_ipc_artefacts() -> None: def stop_daemon( *, method: str = STOP_METHOD_CLI, - graceful_timeout_s: float = GRACEFUL_TIMEOUT_STOP_S, - sigkill_timeout_s: float = SIGKILL_TIMEOUT_STOP_S, + graceful_timeout_s: float = 10.0, ) -> None: """Stop all daemon processes and clean up IPC artefacts. @@ -290,20 +287,14 @@ def stop_daemon( graceful_timeout_s: Seconds to wait for graceful exit before escalating to SIGKILL. Ignored when ``method="sigkill"``. """ - with Timer( - graceful_timeout_s, label=f"stop_daemon[{method}]", assert_deadline=False - ): + with Timer(15.0, label=f"stop_daemon[{method}]", assert_deadline=False): candidate_pids = _collect_candidate_pids() _send_initial_stop(method, candidate_pids) if method == STOP_METHOD_SIGKILL: for pid in sorted(candidate_pids): - wait_for_exit(pid, timeout_s=sigkill_timeout_s) + wait_for_exit(pid, timeout_s=5.0) else: - _wait_and_escalate( - candidate_pids, - graceful_timeout_s=graceful_timeout_s, - sigkill_timeout_s=sigkill_timeout_s, - ) + _wait_and_escalate(candidate_pids, graceful_timeout_s=graceful_timeout_s) _remove_ipc_artefacts() diff --git a/tests/integration/platform/data_daemon/shared/runners.py b/tests/integration/platform/data_daemon/shared/runners.py index 423a8de69..3cc5c45ff 100644 --- a/tests/integration/platform/data_daemon/shared/runners.py +++ b/tests/integration/platform/data_daemon/shared/runners.py @@ -28,8 +28,6 @@ ) from tests.integration.platform.data_daemon.shared.test_case.constants import ( MAX_TIME_TO_START_S, -) -from tests.integration.platform.data_daemon.shared.test_infrastructure import ( OFFLINE_DB_PATH, OFFLINE_RECORDINGS_ROOT, ) diff --git a/tests/integration/platform/data_daemon/shared/storage_assertions.py b/tests/integration/platform/data_daemon/shared/storage_assertions.py index 27c54ecd7..0a14707c4 100644 --- a/tests/integration/platform/data_daemon/shared/storage_assertions.py +++ 
b/tests/integration/platform/data_daemon/shared/storage_assertions.py @@ -6,6 +6,7 @@ from __future__ import annotations +import os import sqlite3 from pathlib import Path @@ -13,15 +14,42 @@ get_daemon_db_path, get_daemon_recordings_root_path, ) +from neuracore.data_daemon.rust_selection import rust_daemon_enabled from tests.integration.platform.data_daemon.shared.test_case.constants import ( + OFFLINE_DB_PATH, + OFFLINE_RECORDINGS_ROOT, STORAGE_STATE_DELETE, STORAGE_STATE_EMPTY, ) +def harness_db_path() -> Path: + """Return the DB path the harness should clean and assert against. + + The Rust daemon only sees ``NEURACORE_DAEMON_DB_PATH`` while it runs (set by + ``scoped_daemon_storage_env``); the clean/check helpers run *outside* that + scope, where the production getter falls back to ``~/.neuracore`` and would + target the wrong folder. When the Rust daemon is active and the env var is + unset, resolve the real shared test-state path the daemon actually used. + """ + if rust_daemon_enabled() and not os.getenv("NEURACORE_DAEMON_DB_PATH"): + return OFFLINE_DB_PATH + return get_daemon_db_path() + + +def harness_recordings_root() -> Path: + """Return the recordings root the harness should clean and assert against. + + See :func:`harness_db_path` for why the Rust daemon needs special handling. 
+ """ + if rust_daemon_enabled() and not os.getenv("NEURACORE_DAEMON_RECORDINGS_ROOT"): + return OFFLINE_RECORDINGS_ROOT + return get_daemon_recordings_root_path() + + def assert_db_absent() -> None: """Fail if the active daemon DB file or its WAL/SHM sidecars still exist.""" - db_path = get_daemon_db_path() + db_path = harness_db_path() for candidate in ( db_path, Path(str(db_path) + "-wal"), @@ -34,15 +62,32 @@ def assert_db_absent() -> None: def assert_recordings_folder_absent() -> None: """Fail if the active daemon recordings root directory still exists.""" - recordings_root = get_daemon_recordings_root_path() + recordings_root = harness_recordings_root() assert ( not recordings_root.exists() ), f"Recordings folder still present: {recordings_root}" +_INFRA_TABLES = frozenset({ + # sqlx migration bookkeeping (Rust daemon) + "_sqlx_migrations", + # Alembic migration bookkeeping (legacy Python daemon) + "alembic_version", + # SQLite internal sequence table + "sqlite_sequence", +}) + + def assert_db_empty() -> None: - """Fail if any known daemon DB tables contain rows.""" - db_path = get_daemon_db_path() + """Fail if any user-data daemon DB tables contain rows. + + Migration-bookkeeping tables (``_sqlx_migrations``, ``alembic_version``) + and SQLite's internal ``sqlite_sequence`` are excluded — they're owned + by the migration framework, not by the daemon's domain model, so a + non-zero row count there is expected after the daemon has started even + once. 
+ """ + db_path = harness_db_path() if not db_path.exists(): return with sqlite3.connect(str(db_path)) as conn: @@ -54,6 +99,8 @@ def assert_db_empty() -> None: } non_empty: list[str] = [] for table in sorted(tables): + if table in _INFRA_TABLES: + continue with sqlite3.connect(str(db_path)) as conn: count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[ 0 @@ -67,7 +114,7 @@ def assert_db_empty() -> None: def assert_recordings_folder_empty() -> None: """Fail if the recordings root contains any files.""" - recordings_root = get_daemon_recordings_root_path() + recordings_root = harness_recordings_root() if not recordings_root.exists(): return leftover = [p for p in recordings_root.rglob("*") if p.is_file()] diff --git a/tests/integration/platform/data_daemon/shared/test_case/build_test_case_context.py b/tests/integration/platform/data_daemon/shared/test_case/build_test_case_context.py index df8608b11..00d912c16 100644 --- a/tests/integration/platform/data_daemon/shared/test_case/build_test_case_context.py +++ b/tests/integration/platform/data_daemon/shared/test_case/build_test_case_context.py @@ -47,10 +47,13 @@ MODE_STAGGERED, PRODUCER_PER_THREAD, SCHEDULER_TOLERANCE_S, - STOCHASTIC_JITTER_S, + STOP_RECORDING_NO_WAIT_SLA_S, STOP_RECORDING_OVERHEAD_PER_SEC, + STOP_RECORDING_UPLOAD_SLA_PER_JOINT_SAMPLE_S, + STOP_RECORDING_UPLOAD_SLA_PER_VIDEO_PIXEL_S, TIMESTAMP_MODE_REAL, TIMESTAMP_MODE_STOCHASTIC, + stochastic_jitter_window, ) logger = logging.getLogger(__name__) @@ -102,7 +105,7 @@ def encode_frame_number(frame_num: int, width: int, height: int) -> np.ndarray: class RecordingExpectedTimestamps: """Expected timestamps per trace for one recording, keyed by semantic trace name. 
- Produced during the recording loop (once the recording ID is known) and + Produced during the recording loop (once the recording key is known) and consumed by :func:`~disk_helpers.assert_disk_recording_properties` to verify on-disk trace.json files match the manually-supplied timestamps that were logged. @@ -111,9 +114,13 @@ class RecordingExpectedTimestamps: by_trace: Maps semantic trace key (e.g. ``"JOINT_POSITIONS"``, ``"camera_0"``) to the ordered list of expected timestamps for that trace within this recording. + by_trace_fps: Maps the same semantic trace key to the producer fps for + that trace, so the stochastic assertion can size its jitter window + from the case's frame rate. """ by_trace: dict[str, list[float]] + by_trace_fps: dict[str, int] @dataclass(frozen=True, slots=True) @@ -121,7 +128,10 @@ class ContextExpectedTimestamps: """Expected timestamps for all recordings produced by one context worker. Attributes: - by_recording: Maps recording ID to its :class:`RecordingExpectedTimestamps`. + by_recording: Maps the on-disk recording directory name to its + :class:`RecordingExpectedTimestamps`. The directory name is the + integer ``recording_index`` (as a string) under the Rust daemon, or + the cloud ``recording_id`` under the legacy daemon. """ by_recording: dict[str, RecordingExpectedTimestamps] @@ -140,6 +150,41 @@ class ContextCaseSpec: wait: bool timestamp_mode: str + @property + def stop_recording_sla_s(self) -> float: + """Seconds allowed for the ``nc.stop_recording`` call. + + ``wait=False`` is fire-and-forget — the call never blocks on the + upload pipeline — so it gets a flat constant. ``wait=True`` blocks + until every trace has uploaded, so its budget is the sum of the + joint-data and video-data upload costs: total joint samples + (``duration_sec * joint_count * joint_fps``) and total video pixels + (``duration_sec * video_fps * video_count * image_width * + image_height``), each times an observed per-unit upload cost. 
The + budget is floored at the duration-based overhead so short or + low-volume recordings keep a sane minimum. + """ + if not self.wait: + return STOP_RECORDING_NO_WAIT_SLA_S + duration_floor = self.duration_sec * STOP_RECORDING_OVERHEAD_PER_SEC + joint_budget = ( + self.duration_sec + * self.joint_count + * self.joint_fps + * STOP_RECORDING_UPLOAD_SLA_PER_JOINT_SAMPLE_S + ) + video_budget = 0.0 + if self.video_count and self.image_width and self.image_height: + video_budget = ( + self.duration_sec + * self.video_fps + * self.video_count + * self.image_width + * self.image_height + * STOP_RECORDING_UPLOAD_SLA_PER_VIDEO_PIXEL_S + ) + return max(duration_floor, joint_budget + video_budget) + @dataclass(frozen=True, slots=True) class ContextResult: @@ -147,6 +192,26 @@ class ContextResult: Produced by :func:`context_worker` and consumed by assertion helpers and verification functions throughout the test suite. + + A recording is addressed by: + + - ``recording_ids`` — the cloud ``recording_id`` (TEXT) for each recording. + These are what cloud verification (``verify_cloud_results``) matches + against the dataset's ``recording.id``. Under the legacy daemon + ``nc.start_recording()`` returns this directly. Under the Rust daemon the + daemon mints it asynchronously, so an entry may be an empty string until + the test resolves it (via ``resolve_cloud_recording_ids``) once online. + + The remaining fields apply only under the Rust daemon (the daemon owns + recording identity); they are left empty under the legacy daemon, which uses + ``recording_ids`` for every correlation: + + - ``recording_indexes`` — the daemon-assigned local INTEGER + ``recording_index`` for each recording, resolved from the source DB. + These are the on-disk directory names and the daemon-DB join key. + - ``source`` is the ``(robot_id, robot_instance)`` identity used to correlate + a worker's recordings to daemon-minted ``recording_index`` values without + relying on the local handle. 
""" dataset_name: str @@ -169,6 +234,8 @@ class ContextResult: timestamp_mode: str expected_timestamps: ContextExpectedTimestamps | None = None timer_stats: dict[str, dict[str, float]] = field(default_factory=dict) + recording_indexes: list[int] = field(default_factory=list) + source: tuple[str, int] = ("", 0) @dataclass(frozen=True, slots=True) @@ -279,11 +346,10 @@ def _cleanup_test_worker_robot(robot: object | None) -> None: robot._daemon_recording_context = None -def get_jitter(use_stochastic_timestamps: bool) -> float: +def get_jitter(use_stochastic_timestamps: bool, fps: int) -> float: if use_stochastic_timestamps: - return STOCHASTIC_TIMESTAMP_RANDOM.uniform( - -STOCHASTIC_JITTER_S, STOCHASTIC_JITTER_S - ) + window = stochastic_jitter_window(fps) + return STOCHASTIC_TIMESTAMP_RANDOM.uniform(-window, window) return 0.0 @@ -320,7 +386,12 @@ def log_synchronous_frames( ): joint_due = joint_index < joint_frame_count video_due = camera_name_list and video_index < video_frame_count - jitter = get_jitter(use_stochastic_timestamps) + # One jitter is shared by both deadlines/timestamps this iteration, so + # size it to the tighter (higher-fps) window to stay within both. 
+ jitter = get_jitter( + use_stochastic_timestamps, + max(joint_fps, video_fps) if camera_name_list else joint_fps, + ) joint_deadline = ( recording_wall_start + (joint_index / joint_fps) + jitter @@ -477,7 +548,7 @@ def worker(role_spec: dict[str, object]) -> None: fps = video_fps if is_rgb else joint_fps thread_wall_start = time.time() for frame_index in range(frame_count): - jitter = get_jitter(use_stochastic_timestamps) + jitter = get_jitter(use_stochastic_timestamps, fps) frame_deadline = thread_wall_start + (frame_index / fps) + jitter remaining = frame_deadline - time.time() if remaining > 0: @@ -680,12 +751,19 @@ def _subprocess_context_worker(spec: ContextSpec) -> ContextResult: def context_worker(spec: ContextSpec) -> ContextResult: """Execute recordings for a single parallel context.""" + from neuracore.data_daemon.rust_selection import rust_daemon_enabled + from tests.integration.platform.data_daemon.shared.db_helpers import ( + wait_for_recording_index_for_source, + ) + + use_rust = rust_daemon_enabled() case = spec.case use_real_timestamps = case.timestamp_mode == TIMESTAMP_MODE_REAL joint_name_list = joint_names_for_count(case.joint_count) camera_name_list = camera_names(case.video_count) marker_names: list[str] = [] recording_ids: list[str] = [] + recording_indexes: list[int] = [] robot = None if spec.start_delay_s > 0.0: @@ -699,13 +777,21 @@ def context_worker(spec: ContextSpec) -> ContextResult: with Timer(MAX_TIME_TO_START_S, label="nc.connect_robot", always_log=True): robot = nc.connect_robot(spec.robot_name, overwrite=False) + source: tuple[str, int] = (str(robot.id), int(robot.instance)) + expected_by_recording: dict[str, RecordingExpectedTimestamps] | None = ( {} if not use_real_timestamps else None ) - for recording_index in range(spec.recordings_per_context): + for recording_ordinal in range(spec.recordings_per_context): recording_timestamp_start_s = ( - spec.timestamp_start_s + recording_index * case.duration_sec + spec.timestamp_start_s 
+ recording_ordinal * case.duration_sec + ) + recording_capture_start_s = None if use_real_timestamps else time.time() + recording_capture_stop_s = ( + None + if recording_capture_start_s is None + else recording_capture_start_s + case.duration_sec ) with Timer( @@ -714,16 +800,36 @@ def context_worker(spec: ContextSpec) -> ContextResult: always_log=True, assert_deadline=spec.assert_deadline, ): - nc.start_recording(robot_name=spec.robot_name) + nc.start_recording( + robot_name=spec.robot_name, timestamp=recording_capture_start_s + ) if wall_started_at is None: wall_started_at = time.time() - recording_id = str(robot.get_current_recording_id() or "") - recording_ids.append(recording_id) - # Build per-recording expected timestamps once the recording ID is known. - # Keys use "data_type/data_type_name" to match the semantic keys resolved - # from the DB in daemon_disk_helpers. data_type_name is the storage name - # produced by validate_safe_name (e.g. "vx300s_left\waist" for joint names). + if use_rust: + previous_index = recording_indexes[-1] if recording_indexes else 0 + daemon_recording_index = wait_for_recording_index_for_source( + source[0], + source[1], + after_index=previous_index, + timeout_s=MAX_TIME_TO_START_S, + ) + recording_indexes.append(daemon_recording_index) + + cloud_recording_id = robot.get_cloud_recording_id(timeout_s=0.0) + recording_ids.append(str(cloud_recording_id or "")) + + disk_recording_key = str(daemon_recording_index) + else: + recording_id = str(robot.get_current_recording_id() or "") + recording_ids.append(recording_id) + disk_recording_key = recording_id + + # Build per-recording expected timestamps once the recording key is + # known. Trace keys use "data_type/data_type_name" to match the + # semantic keys resolved from the DB in disk_helpers. data_type_name is + # the storage name produced by validate_safe_name (e.g. + # "vx300s_left\waist" for joint names). 
if expected_by_recording is not None: from neuracore_types.utils import validate_safe_name @@ -760,31 +866,44 @@ def context_worker(spec: ContextSpec) -> ContextResult: else: safe_marker = validate_safe_name("marker_synchronous") by_trace[f"CUSTOM_1D/{safe_marker}"] = joint_ts - expected_by_recording[recording_id] = RecordingExpectedTimestamps( - by_trace=by_trace + by_trace_fps = { + trace_key: ( + case.video_fps if timestamps is video_ts else case.joint_fps + ) + for trace_key, timestamps in by_trace.items() + } + expected_by_recording[disk_recording_key] = RecordingExpectedTimestamps( + by_trace=by_trace, + by_trace_fps=by_trace_fps, ) current_marker_names = log_frames( spec, - recording_index=recording_index, + recording_index=recording_ordinal, marker_name="marker_synchronous", ) if not marker_names: marker_names = current_marker_names with Timer( - case.duration_sec * STOP_RECORDING_OVERHEAD_PER_SEC, + case.stop_recording_sla_s, label="nc.stop_recording", always_log=True, assert_deadline=spec.assert_deadline, ): - nc.stop_recording(robot_name=spec.robot_name, wait=case.wait) + nc.stop_recording( + robot_name=spec.robot_name, + wait=case.wait, + timestamp=recording_capture_stop_s, + ) wall_stopped_at = time.time() captured_timer_stats = {k: dict(v) for k, v in Timer._stats.items()} return ContextResult( dataset_name=spec.dataset_name, recording_ids=recording_ids, + recording_indexes=recording_indexes, + source=source, robot_name=spec.robot_name, joint_names=joint_name_list, camera_names=camera_name_list, diff --git a/tests/integration/platform/data_daemon/shared/test_case/constants.py b/tests/integration/platform/data_daemon/shared/test_case/constants.py index 53c0da8fd..1d53d7db6 100644 --- a/tests/integration/platform/data_daemon/shared/test_case/constants.py +++ b/tests/integration/platform/data_daemon/shared/test_case/constants.py @@ -1,7 +1,27 @@ """Shared constants for data-daemon test configuration.""" +import time +from pathlib import Path from typing 
import Literal +# --------------------------------------------------------------------------- +# Test-state directories and path constants +# --------------------------------------------------------------------------- + +DATA_DAEMON_TEST_STATE_ROOT = Path(".data_daemon_test_state") +"""Root directory for all test-local daemon state (DB, recordings, artifacts).""" + +DATA_DAEMON_TEST_ARTIFACTS_DIR = ( + DATA_DAEMON_TEST_STATE_ROOT / "artifacts" / time.strftime("%Y%m%d_%H%M%S") +) +"""Timestamped directory where per-test artifact copies are stored.""" + +OFFLINE_RECORDINGS_ROOT = DATA_DAEMON_TEST_STATE_ROOT / "recordings" +"""Directory used as the offline daemon's recordings root in tests.""" + +OFFLINE_DB_PATH = DATA_DAEMON_TEST_STATE_ROOT / "state.db" +"""Path used for the offline daemon's SQLite state DB in tests.""" + # --------------------------------------------------------------------------- # Environment variable values # --------------------------------------------------------------------------- @@ -9,6 +29,7 @@ # stop_method STOP_METHOD_CLI = "cli" STOP_METHOD_SIGTERM = "sigterm" +STOP_METHOD_SIGINT = "sigint" STOP_METHOD_SIGKILL = "sigkill" # storage_state_action (governs both the SQLite DB and the recordings folder) @@ -34,10 +55,22 @@ TIMESTAMP_MODE_MANUAL = "manual" TIMESTAMP_MODE_REAL = "real" TIMESTAMP_MODE_STOCHASTIC = "stochastic" -STOCHASTIC_JITTER_S = 0.05 +# Jitter amplitude as a proportion of half the inter-frame interval, so the +# window scales with the case's fps instead of being pinned to one frame rate. +STOCHASTIC_JITTER_FACTOR = 0.5 # OS-scheduler slack budget for the deadline-lateness assertion in stochastic mode. SCHEDULER_TOLERANCE_S = 0.05 + +def stochastic_jitter_window(fps: int) -> float: + """Max jitter amplitude (seconds) for a stream running at ``fps``. + + A fraction (:data:`STOCHASTIC_JITTER_FACTOR`) of half the inter-frame + interval, keeping jitter comfortably below the gap between frames. 
+ """ + return 1 / fps / 2 * STOCHASTIC_JITTER_FACTOR + + # --------------------------------------------------------------------------- # Value sets (tuples for static validation) # --------------------------------------------------------------------------- @@ -67,6 +100,10 @@ MAX_TIME_TO_START_S = 20.0 STOP_RECORDING_OVERHEAD_PER_SEC = 0.5 +STOP_RECORDING_NO_WAIT_SLA_S = 1.0 +STOP_RECORDING_UPLOAD_SLA_PER_JOINT_SAMPLE_S = 1.3e-4 +STOP_RECORDING_UPLOAD_SLA_PER_VIDEO_PIXEL_S = 3.0e-7 + BASE_DATASET_READY_TIMEOUT_S = 180.0 MAX_DATASET_READY_TIMEOUT_S = 3600.0 DATASET_POLL_INTERVAL_S = 0.25 diff --git a/tests/integration/platform/data_daemon/shared/test_infrastructure.py b/tests/integration/platform/data_daemon/shared/test_infrastructure.py index ee35fa4c4..99a5ea0a9 100644 --- a/tests/integration/platform/data_daemon/shared/test_infrastructure.py +++ b/tests/integration/platform/data_daemon/shared/test_infrastructure.py @@ -9,8 +9,8 @@ import logging import os +import sqlite3 import sys -import time from collections.abc import Generator from contextlib import contextmanager from pathlib import Path @@ -19,18 +19,17 @@ import pytest import neuracore as nc -from neuracore.data_daemon.helpers import ( - get_daemon_db_path, - get_daemon_recordings_root_path, -) from tests.integration.platform.data_daemon.shared.storage_assertions import ( assert_post_test_storage_state, + harness_db_path, + harness_recordings_root, ) from tests.integration.platform.data_daemon.shared.test_case.build_test_case import ( case_id, log_run_analysis, ) from tests.integration.platform.data_daemon.shared.test_case.constants import ( + DATA_DAEMON_TEST_ARTIFACTS_DIR, STORAGE_STATE_DELETE, STORAGE_STATE_EMPTY, ) @@ -51,24 +50,6 @@ logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# Test-state directories and path constants -# --------------------------------------------------------------------------- - -DATA_DAEMON_TEST_STATE_ROOT 
= Path(".data_daemon_test_state") -"""Root directory for all test-local daemon state (DB, recordings, artifacts).""" - -DATA_DAEMON_TEST_ARTIFACTS_DIR = ( - DATA_DAEMON_TEST_STATE_ROOT / "artifacts" / time.strftime("%Y%m%d_%H%M%S") -) -"""Timestamped directory where per-test artifact copies are stored.""" - -OFFLINE_RECORDINGS_ROOT = DATA_DAEMON_TEST_STATE_ROOT / "recordings" -"""Directory used as the offline daemon's recordings root in tests.""" - -OFFLINE_DB_PATH = DATA_DAEMON_TEST_STATE_ROOT / "state.db" -"""Path used for the offline daemon's SQLite state DB in tests.""" - # --------------------------------------------------------------------------- # Shared mutable test state # --------------------------------------------------------------------------- @@ -162,26 +143,22 @@ def apply_storage_state_action(storage_state_action: str) -> None: """ import shutil - db_path = get_daemon_db_path() - recordings_root = get_daemon_recordings_root_path() + db_path = harness_db_path() + recordings_root = harness_recordings_root() if storage_state_action == STORAGE_STATE_EMPTY: - db_state_paths = ( - db_path, - Path(str(db_path) + "-wal"), - Path(str(db_path) + "-shm"), - ) - - for db_state_path in db_state_paths: + if db_path.exists(): + connection = sqlite3.connect(str(db_path)) try: - db_state_path.unlink(missing_ok=True) - except OSError: - logger.warning( - "Failed to remove daemon DB state file during test cleanup: %s", - db_state_path, - exc_info=True, - ) - + for table in ("traces", "recordings"): + try: + connection.execute(f"DELETE FROM {table}") + except sqlite3.OperationalError: + pass + connection.commit() + connection.execute("PRAGMA wal_checkpoint(TRUNCATE)") + finally: + connection.close() if recordings_root.exists(): shutil.rmtree(recordings_root, ignore_errors=True) recordings_root.mkdir(parents=True, exist_ok=True)