diff --git a/docs/rust_data_daemon_development.md b/docs/rust_data_daemon_development.md index d4d792a6..be75f789 100644 --- a/docs/rust_data_daemon_development.md +++ b/docs/rust_data_daemon_development.md @@ -41,7 +41,7 @@ flowchart LR DISP --> ACT["per-trace actors"] ACT -->|fire-and-forget| TW["trace_writer
batched DB write-behind"] ACT -->|fire-and-forget| JW["json_writer (IO thread)"] - ACT -->|spawn_blocking| FF["ffmpeg chunk encode"] + ACT -->|async subprocess| FF["ffmpeg chunk encode"] TW --> DB[("SQLite WAL")] LIS -->|recording-id queries| DB subgraph CLOUD["cloud coordinators"] @@ -99,7 +99,7 @@ CI uses `stable` (via `dtolnay/rust-toolchain@stable`), so any recent stable too ### System dependencies -- **ffmpeg + ffprobe** — required by the video-encoder subprocess and the `encoding::video_encoder` / `encoding::nut_writer` test suites (tests that need ffmpeg self-skip if it's missing, but the daemon itself depends on it at runtime): +- **ffmpeg + ffprobe** — required by the video-encoder subprocess and the daemon's `encoding::video_encoder` and the producer's `data_daemon_producer::nut_writer` test suites (tests that need ffmpeg self-skip if it's missing, but the daemon itself depends on it at runtime): ```bash sudo apt-get update && sudo apt-get install -y ffmpeg @@ -144,7 +144,7 @@ cargo test -p data_daemon_shared # A specific module or test name (partial match) cargo test -p data-daemon pipeline::dispatcher -cargo test -p data-daemon encoding::metadata::fixture_matches_python_video_trace_output +cargo test -p data-daemon encoding::metadata::fixture_matches_expected_video_trace_output ``` Tests that shell out to `ffmpeg` / `ffprobe` self-skip on hosts without those binaries — install them (see above) to exercise the full encoding suite. diff --git a/neuracore-dictionary.txt b/neuracore-dictionary.txt index bfb437d5..6b3aad59 100644 --- a/neuracore-dictionary.txt +++ b/neuracore-dictionary.txt @@ -127,8 +127,10 @@ embs Emika ENOENT enoexec +ENOTDIR EPERM EPIPE +eprintln erfinv errno ESRCH @@ -189,12 +191,16 @@ gptj GPTJ groot Groot +hdlc hookwrapper hparams hstack huggingface hyperparameters iceoryx +idat +iend +ihdr iiwa imageio imagenet @@ -262,6 +268,7 @@ metadatas metafunc metas mimsave +miniz Mish mjcf MJFC @@ -269,7 +276,9 @@ mline mocap moov movflags +mpng mpsa +mpsc Mujoco mujocoinclude multihead @@ -394,6 +403,8 @@ rtype rustls rustup Safetensors +scanline +scanlines schematypens SCTP sdecode @@ -427,6 +438,7 @@ synchronizable syncpoint syncpoints targetbody +tdefl TDVB teleop temb @@ -450,6 +462,7 @@ torq tqdm traj triu +truecolour trunc tryfirst TTYNTK diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 94bea9cd..00229f26 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "aho-corasick" version = "1.1.4" @@ -448,6 +454,7 @@ dependencies = [ "data_daemon_shared", "iceoryx2", "libc", + "miniz_oxide", "pyo3", "serde", "serde_json", @@ -1622,6 +1629,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "1.2.0" diff --git a/rust/data_daemon/src/api/client.rs b/rust/data_daemon/src/api/client.rs index 66fb8d27..e66b1368 100644 --- a/rust/data_daemon/src/api/client.rs +++ b/rust/data_daemon/src/api/client.rs @@ -690,6 +690,71 @@ mod tests { assert_eq!(calls.load(Ordering::SeqCst), 1); } + #[tokio::test] + async fn exhausts_retry_budget_then_surfaces_status() { + // A persistently failing retryable status must stop after the budget + // (`max_retries` attempts total) and surface the error rather than + // retrying forever. `expect(3)` pins the bounded attempt count. + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/traces/batch-register")) + .respond_with(ResponseTemplate::new(503)) + .expect(3) + .mount(&server) + .await; + + let client = client(&server); + let error = client.batch_register("org-1", &[]).await.unwrap_err(); + match error { + ApiClientError::Status { status, .. } => { + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + } + other => panic!("unexpected error: {other:?}"), + } + } + + #[tokio::test] + async fn reloads_auth_at_most_once_on_repeated_401() { + // A 401 triggers exactly one token reload; a second 401 surfaces as an + // error instead of looping forever on reload + retry. + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/traces/batch-register")) + .respond_with(ResponseTemplate::new(401)) + .mount(&server) + .await; + + let calls = Arc::new(AtomicUsize::new(0)); + struct CountingProvider { + calls: Arc, + } + #[async_trait::async_trait] + impl AuthProvider for CountingProvider { + async fn bearer_token(&self) -> Result { + Ok("token".to_string()) + } + async fn reload(&self) -> Result<(), AuthError> { + self.calls.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + let auth = Arc::new(CountingProvider { + calls: Arc::clone(&calls), + }); + let client = ApiClient::new(options(server.uri()), auth).unwrap(); + + let error = client.batch_register("org-1", &[]).await.unwrap_err(); + assert!( + matches!(error, ApiClientError::Status { status, .. } if status == StatusCode::UNAUTHORIZED), + "a repeated 401 surfaces as an error, not an infinite reload loop" + ); + assert_eq!( + calls.load(Ordering::SeqCst), + 1, + "auth is reloaded exactly once, not on every 401" + ); + } + #[tokio::test] async fn non_retryable_status_surfaces_error() { let server = MockServer::start().await; diff --git a/rust/data_daemon/src/cloud/coordinators/progress.rs b/rust/data_daemon/src/cloud/coordinators/progress.rs index dc02e4f4..01658355 100644 --- a/rust/data_daemon/src/cloud/coordinators/progress.rs +++ b/rust/data_daemon/src/cloud/coordinators/progress.rs @@ -534,4 +534,175 @@ mod tests { ProgressReportStatus::Reported )); } + + #[tokio::test] + async fn sweep_skips_when_org_id_unset() { + // No current org (e.g. not logged in / no org selected): the sweep must + // skip the recording without POSTing anything, leaving it pending. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/org/org-1/recording/rec-1/expected-trace-count")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/rec-1/traces-metadata")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let recording_index = seed_recording(&store, "rec-1").await; + store + .create_trace(recording_index, "t-1", Some("JOINT_POSITIONS"), None) + .await + .unwrap(); + store + .update_trace( + "t-1", + TraceUpdate { + write_status: Some(TraceWriteStatus::Written), + total_bytes: Some(5), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + store + .mark_recording_stopped(recording_index, 0) + .await + .unwrap(); + + sweep_once(&Arc::new(store.clone()), &client(&server), &org_rx(None)).await; + + let recording = store.get_recording(recording_index).await.unwrap().unwrap(); + assert_eq!( + recording.expected_trace_count_reported, 0, + "no org → nothing reported" + ); + assert!(matches!( + recording.progress_reported, + ProgressReportStatus::Pending + )); + } + + #[tokio::test] + async fn progress_post_failure_rolls_back_to_pending() { + // The expected-count PUT succeeds but the traces-metadata POST fails: + // the recording's progress status must roll Reporting → Pending so the + // next tick retries, never wedging in the transient Reporting state. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/org/org-1/recording/rec-1/expected-trace-count")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/rec-1/traces-metadata")) + .respond_with(ResponseTemplate::new(500)) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let recording_index = seed_recording(&store, "rec-1").await; + for (trace_id, bytes) in [("t-1", 10), ("t-2", 20)] { + store + .create_trace(recording_index, trace_id, Some("JOINT_POSITIONS"), None) + .await + .unwrap(); + store + .update_trace( + trace_id, + TraceUpdate { + write_status: Some(TraceWriteStatus::Written), + total_bytes: Some(bytes), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + } + store + .mark_recording_stopped(recording_index, 0) + .await + .unwrap(); + + sweep_once( + &Arc::new(store.clone()), + &client(&server), + &org_rx(Some("org-1")), + ) + .await; + + let recording = store.get_recording(recording_index).await.unwrap().unwrap(); + assert_eq!( + recording.expected_trace_count_reported, 2, + "the expected-count PUT succeeded" + ); + assert!( + matches!(recording.progress_reported, ProgressReportStatus::Pending), + "a failed progress POST rolls Reporting back to Pending for retry" + ); + } + + #[tokio::test] + async fn expected_count_put_failure_leaves_it_unreported() { + // The count is persisted locally first, then PUT to the backend. If the + // PUT fails it must NOT be marked reported (the next tick re-sends it), + // but the local count is retained. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/org/org-1/recording/rec-1/expected-trace-count")) + .respond_with(ResponseTemplate::new(500)) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/rec-1/traces-metadata")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let recording_index = seed_recording(&store, "rec-1").await; + store + .create_trace(recording_index, "t-1", Some("JOINT_POSITIONS"), None) + .await + .unwrap(); + store + .update_trace( + "t-1", + TraceUpdate { + write_status: Some(TraceWriteStatus::Written), + total_bytes: Some(7), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + store + .mark_recording_stopped(recording_index, 0) + .await + .unwrap(); + + sweep_once( + &Arc::new(store.clone()), + &client(&server), + &org_rx(Some("org-1")), + ) + .await; + + let recording = store.get_recording(recording_index).await.unwrap().unwrap(); + assert_eq!( + recording.expected_trace_count, + Some(1), + "the count is persisted locally first" + ); + assert_eq!( + recording.expected_trace_count_reported, 0, + "a failed PUT does not mark the count reported" + ); + } } diff --git a/rust/data_daemon/src/cloud/coordinators/registration.rs b/rust/data_daemon/src/cloud/coordinators/registration.rs index d4c38a87..9fbf5425 100644 --- a/rust/data_daemon/src/cloud/coordinators/registration.rs +++ b/rust/data_daemon/src/cloud/coordinators/registration.rs @@ -765,4 +765,127 @@ mod tests { Some("Unexpected error during registration") ); } + + #[tokio::test] + async fn silent_omission_retries_under_budget() { + // The backend returns neither a registered nor a failed entry for the + // trace. It must be retried (rolled back to pending) under the bounded + // budget, never silently dropped. + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/traces/batch-register")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "registered_traces": [], + "failed_traces": [] + }))) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + seed_written_trace(&store, "trace-1", Some("cloud-rec-1")).await; + let bus = EventBus::new(); + let api = client(&server); + + let claimed = store + .claim_traces_for_registration(BATCH_SIZE, 0.0) + .await + .unwrap(); + submit_batch( + &Arc::new(store.clone()), + &bus, + &api, + &org_rx(Some("org-1")), + claimed, + &mut HashMap::new(), + ) + .await; + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_eq!( + trace.registration_status, + TraceRegistrationStatus::Pending, + "a silently-omitted trace is retried, not lost" + ); + } + + #[tokio::test] + async fn drain_once_registers_and_emits_ready() { + // End-to-end through the coordinator's own drain: claim → register → + // promotion sweep. Exercises drain_once (which the per-call tests above + // bypass by calling submit_batch directly). + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/traces/batch-register")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "registered_traces": [{ + "trace_id": "trace-1", + "upload_session_uris": {"JOINT_POSITIONS/arm0/trace.json": "https://upload/abc"} + }], + "failed_traces": [] + }))) + .expect(1) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + seed_written_trace(&store, "trace-1", Some("cloud-rec-1")).await; + let bus = EventBus::new(); + let mut subscriber = bus.subscribe(); + let api = client(&server); + + // `Duration::ZERO` max_wait claims the freshly-seeded trace immediately. + drain_once( + &Arc::new(store.clone()), + &bus, + &api, + &org_rx(Some("org-1")), + Duration::from_secs(0), + &mut HashMap::new(), + ) + .await; + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_eq!( + trace.registration_status, + TraceRegistrationStatus::Registered + ); + assert_eq!(trace.upload_status, TraceUploadStatus::Queued); + let mut saw_ready = false; + while let Ok(event) = subscriber.try_recv() { + if matches!(event, DaemonEvent::ReadyForUpload { .. }) { + saw_ready = true; + } + } + assert!( + saw_ready, + "drain_once promotes the registered+written trace to ReadyForUpload" + ); + } + + #[tokio::test] + async fn drain_once_with_no_claimable_traces_is_a_noop() { + // Nothing claimable → no register POST, no panic (the empty-claim early + // return). + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/org/org-1/recording/traces/batch-register")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let bus = EventBus::new(); + let api = client(&server); + + drain_once( + &Arc::new(store.clone()), + &bus, + &api, + &org_rx(Some("org-1")), + Duration::from_secs(0), + &mut HashMap::new(), + ) + .await; + } } diff --git a/rust/data_daemon/src/cloud/coordinators/status.rs b/rust/data_daemon/src/cloud/coordinators/status.rs index d52f0612..5e7080e5 100644 --- a/rust/data_daemon/src/cloud/coordinators/status.rs +++ b/rust/data_daemon/src/cloud/coordinators/status.rs @@ -371,6 +371,50 @@ impl RecordingBatch { mod tests { use super::*; + use crate::api::auth::StaticAuthProvider; + use crate::api::client::ApiClientOptions; + use crate::state::store::NewRecording; + use tempfile::TempDir; + use wiremock::matchers::{body_json, method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + async fn open_store() -> (SqliteStateStore, TempDir) { + let dir = TempDir::new().unwrap(); + let store = SqliteStateStore::open(&dir.path().join("state.db")) + .await + .unwrap(); + (store, dir) + } + + fn client(server: &MockServer) -> Arc { + let auth = Arc::new(StaticAuthProvider::new("test")); + let mut options = ApiClientOptions::new(server.uri()); + options.max_backoff = Duration::from_millis(10); + Arc::new(ApiClient::new(options, auth).unwrap()) + } + + /// A live-org receiver fixed at `org`. The sender is leaked so the channel + /// stays open for the test's duration (matches `progress.rs`). + fn org_rx(org: Option<&str>) -> OrgIdRx { + let (org_tx, org_rx) = tokio::sync::watch::channel(org.map(str::to_string)); + Box::leak(Box::new(org_tx)); + org_rx + } + + /// Create a recording stamped with the given cloud `recording_id` so the + /// wiremock URL expectations resolve. Returns the local `recording_index`. + async fn seed_recording(store: &SqliteStateStore, cloud_recording_id: &str) -> i64 { + let recording = store + .create_recording(NewRecording::default()) + .await + .unwrap(); + store + .mark_recording_start_notified(recording.recording_index, cloud_recording_id) + .await + .unwrap(); + recording.recording_index + } + #[test] fn batch_records_completion_flag() { let mut batch = RecordingBatch::new(1); @@ -418,4 +462,273 @@ mod tests { batch.add(StatusUpdate::completed(1, "t".to_string(), 1)); assert!(batch.deadline() < baseline); } + + #[tokio::test] + async fn flush_batch_sends_coalesced_updates() { + // The whole point of the coordinator: the per-trace coalesced state + // reaches the backend in one batch-update PUT. The body asserts the + // coalescing — t1's later byte count supersedes the earlier one, and + // t2 carries its completion status + totals. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/org/org-1/recording/rec-1/traces/batch-update")) + .and(body_json(serde_json::json!({ + "updates": { + "t1": {"uploaded_bytes": 30}, + "t2": {"status": "UPLOAD_COMPLETE", "uploaded_bytes": 200, "total_bytes": 200} + } + }))) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let index = seed_recording(&store, "rec-1").await; + let mut batch = RecordingBatch::new(index); + batch.add(StatusUpdate::in_progress(index, "t1".to_string(), 10)); + batch.add(StatusUpdate::in_progress(index, "t1".to_string(), 30)); // supersedes 10 + batch.add(StatusUpdate::completed(index, "t2".to_string(), 200)); + + let result = flush_batch( + Arc::new(store.clone()), + client(&server), + org_rx(Some("org-1")), + batch, + ) + .await; + assert!(result.is_none(), "a sent batch is not re-queued"); + } + + #[tokio::test] + async fn flush_batch_defers_when_recording_row_missing() { + // The start notifier hasn't written the recording row yet. The batch + // must be re-queued with its deadline pushed into the future so the + // select loop doesn't busy-wait on a permanently-past deadline. + let server = MockServer::start().await; // no mock mounted: nothing is sent + let (store, _dir) = open_store().await; + let mut batch = RecordingBatch::new(999); // never created + batch.add(StatusUpdate::in_progress(999, "t1".to_string(), 5)); + + let before = Instant::now(); + let result = flush_batch( + Arc::new(store.clone()), + client(&server), + org_rx(Some("org-1")), + batch, + ) + .await; + let deferred = result.expect("a missing recording row re-queues the batch"); + assert!( + deferred.deadline() > before, + "the re-queued batch's deadline is pushed into the future" + ); + } + + #[tokio::test] + async fn flush_batch_defers_when_org_id_unset() { + let server = MockServer::start().await; // nothing is sent + let (store, _dir) = open_store().await; + let index = seed_recording(&store, "rec-1").await; + let mut batch = RecordingBatch::new(index); + batch.add(StatusUpdate::in_progress(index, "t1".to_string(), 5)); + + let result = flush_batch( + Arc::new(store.clone()), + client(&server), + org_rx(None), + batch, + ) + .await; + assert!( + result.is_some(), + "no org_id → the batch is deferred, not sent" + ); + } + + #[tokio::test] + async fn flush_batch_defers_when_cloud_recording_id_unset() { + let server = MockServer::start().await; // nothing is sent + let (store, _dir) = open_store().await; + // Created but NOT start-notified, so the cloud recording_id is absent. + let index = store + .create_recording(NewRecording::default()) + .await + .unwrap() + .recording_index; + let mut batch = RecordingBatch::new(index); + batch.add(StatusUpdate::in_progress(index, "t1".to_string(), 5)); + + let result = flush_batch( + Arc::new(store.clone()), + client(&server), + org_rx(Some("org-1")), + batch, + ) + .await; + assert!( + result.is_some(), + "no cloud recording_id → the batch is deferred, not sent" + ); + } + + #[tokio::test] + async fn flush_batch_skips_empty_batch() { + let server = MockServer::start().await; // nothing is sent + let (store, _dir) = open_store().await; + let index = seed_recording(&store, "rec-1").await; + let batch = RecordingBatch::new(index); // no updates added + + let result = flush_batch( + Arc::new(store.clone()), + client(&server), + org_rx(Some("org-1")), + batch, + ) + .await; + assert!( + result.is_none(), + "an empty batch is a no-op, nothing is sent" + ); + } + + #[tokio::test] + async fn flush_all_drains_every_pending_batch() { + // The shutdown path: every pending recording's batch is flushed and the + // map left empty. + let server = MockServer::start().await; + for recording_id in ["rec-1", "rec-2"] { + Mock::given(method("PUT")) + .and(path(format!( + "/org/org-1/recording/{recording_id}/traces/batch-update" + ))) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + } + + let (store, _dir) = open_store().await; + let first = seed_recording(&store, "rec-1").await; + let second = seed_recording(&store, "rec-2").await; + let mut pending: HashMap = HashMap::new(); + let mut first_batch = RecordingBatch::new(first); + first_batch.add(StatusUpdate::in_progress(first, "t1".to_string(), 1)); + pending.insert(first, first_batch); + let mut second_batch = RecordingBatch::new(second); + second_batch.add(StatusUpdate::completed(second, "t2".to_string(), 9)); + pending.insert(second, second_batch); + + flush_all( + &Arc::new(store.clone()), + &client(&server), + &org_rx(Some("org-1")), + &mut pending, + ) + .await; + assert!(pending.is_empty(), "flush_all drains the pending map"); + } + + #[tokio::test] + async fn flush_due_spawns_only_batches_past_their_deadline() { + // The periodic tick must flush only batches whose deadline has elapsed, + // leaving younger batches pending to keep coalescing. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/org/org-1/recording/rec-1/traces/batch-update")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let due_index = seed_recording(&store, "rec-1").await; + let fresh_index = seed_recording(&store, "rec-2").await; + let mut pending: HashMap = HashMap::new(); + // A batch whose deadline is already well in the past. + let mut due = RecordingBatch::new(due_index); + due.add(StatusUpdate::in_progress(due_index, "t1".to_string(), 1)); + due.opened_at = Instant::now() - Duration::from_secs(60); + pending.insert(due_index, due); + // A just-opened batch whose deadline is comfortably in the future. + let mut fresh = RecordingBatch::new(fresh_index); + fresh.add(StatusUpdate::in_progress(fresh_index, "t2".to_string(), 1)); + pending.insert(fresh_index, fresh); + + let mut background: JoinSet> = JoinSet::new(); + flush_due( + &Arc::new(store.clone()), + &client(&server), + &org_rx(Some("org-1")), + &mut pending, + &mut background, + ); + + assert_eq!(background.len(), 1, "only the past-deadline batch flushes"); + assert!( + pending.contains_key(&fresh_index), + "the fresh batch keeps coalescing" + ); + assert!( + !pending.contains_key(&due_index), + "the due batch was taken for flushing" + ); + + // Drain the spawned flush so the task doesn't outlive the test. + while background.join_next().await.is_some() {} + } + + #[tokio::test] + async fn run_loop_flushes_a_full_batch_via_the_inbox() { + // Drive the public updater end to end: MAX_BATCH_SIZE distinct-trace + // updates for one recording trip the size trigger, which spawns a + // background flush. Exercises run()'s inbox + max-batch + join_next arms. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/org/org-1/recording/rec-1/traces/batch-update")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let (store, _dir) = open_store().await; + let index = seed_recording(&store, "rec-1").await; + + let (tx, rx) = mpsc::unbounded_channel::(); + let (shutdown_tx, shutdown_rx) = broadcast::channel(4); + let handle = spawn_status_updater( + store.clone(), + client(&server), + org_rx(Some("org-1")), + rx, + shutdown_rx, + ); + + for n in 0..MAX_BATCH_SIZE { + tx.send(StatusUpdate::in_progress(index, format!("t-{n}"), n as i64)) + .unwrap(); + } + + // Poll the mock until the size-triggered flush lands, breaking as soon + // as it does rather than sleeping a fixed duration. The only requests + // this server sees are batch-update PUTs. + let mut flushed = false; + for _ in 0..100 { + if let Some(requests) = server.received_requests().await { + if !requests.is_empty() { + flushed = true; + break; + } + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + assert!( + flushed, + "a full batch is flushed to the backend by the run loop" + ); + + // Closing the inbox stops the loop (recv → None → break); keep the + // shutdown sender alive until then so the shutdown arm doesn't pre-empt. + drop(tx); + handle.join().await; + drop(shutdown_tx); + } } diff --git a/rust/data_daemon/src/cloud/coordinators/upload_transfer.rs b/rust/data_daemon/src/cloud/coordinators/upload_transfer.rs index 8260ffd8..d4a5dff2 100644 --- a/rust/data_daemon/src/cloud/coordinators/upload_transfer.rs +++ b/rust/data_daemon/src/cloud/coordinators/upload_transfer.rs @@ -42,8 +42,6 @@ pub const CHUNK_SIZE: usize = 16 * 1024 * 1024; /// (`parse_resume_offset`) re-derives the committed offset from the server on /// restart, so a stale DB offset only costs re-sending at most this many chunks. const PROGRESS_PERSIST_EVERY_CHUNKS: u32 = 4; -/// Cap on the exponential backoff for transient upload failures. -const MAX_BACKOFF: Duration = Duration::from_secs(300); /// Maximum retries for a single chunk. const MAX_RETRIES: u32 = 5; /// Hard deadline for a single chunk PUT. Belt-and-braces over the reqwest @@ -373,7 +371,7 @@ async fn put_chunk( } attempt += 1; tracing::warn!(%error, attempt, "upload chunk transport error; retrying"); - sleep(backoff(attempt)).await; + sleep(backoff(attempt, client.options().max_backoff)).await; continue; } Err(_elapsed) => { @@ -390,7 +388,7 @@ async fn put_chunk( )); } attempt += 1; - sleep(backoff(attempt)).await; + sleep(backoff(attempt, client.options().max_backoff)).await; continue; } }; @@ -429,16 +427,20 @@ async fn put_chunk( if matches!(status.as_u16(), 429 | 500 | 502 | 503 | 504) && attempt + 1 < MAX_RETRIES { attempt += 1; tracing::warn!(%status, attempt, "retrying upload chunk after transient failure"); - sleep(backoff(attempt)).await; + sleep(backoff(attempt, client.options().max_backoff)).await; continue; } return Ok(PutChunkOutcome::Failed { status, body }); } } -fn backoff(attempt: u32) -> Duration { +/// Exponential backoff (1s, 2s, 4s, …) for a chunk retry, capped at the API +/// client's configured `max_backoff`. At [`MAX_RETRIES`] the cap never binds in +/// production (the largest delay reached is 8s), but exposing it lets tests +/// drive the retry path without real-time sleeps. +fn backoff(attempt: u32, max: Duration) -> Duration { let secs = 2u64.saturating_pow(attempt.saturating_sub(1)); - Duration::from_secs(secs.min(MAX_BACKOFF.as_secs())) + Duration::from_secs(secs).min(max) } fn parse_resume_offset(headers: &HeaderMap) -> Option { diff --git a/rust/data_daemon/src/cloud/coordinators/uploader.rs b/rust/data_daemon/src/cloud/coordinators/uploader.rs index 6cf13048..56ae41ba 100644 --- a/rust/data_daemon/src/cloud/coordinators/uploader.rs +++ b/rust/data_daemon/src/cloud/coordinators/uploader.rs @@ -651,6 +651,35 @@ mod tests { (recording_index, local) } + /// Drive `upload_single` for `trace_id` with the standard single-trace + /// harness: an owned store handle, a fresh trace-event writer (held alive + /// for the await), the `recordings_root`, and org `org-1`. The caller owns + /// `bus`/`status_tx` so it can observe the emitted events and status updates. + async fn run_upload_single( + store: &SqliteStateStore, + recordings_root: std::path::PathBuf, + api: &Arc, + bus: &EventBus, + status_tx: &mpsc::UnboundedSender, + trace_id: &str, + ) { + let store_arc = Arc::new(store.clone()); + let (trace_writer, _trace_writer_owner) = + crate::state::trace_event_database_writer::spawn(store_arc.clone()); + let recordings_root = Arc::new(recordings_root); + upload_single( + &store_arc, + &trace_writer, + bus, + api, + &recordings_root, + &org_rx(Some("org-1")), + status_tx, + trace_id, + ) + .await; + } + #[tokio::test] async fn bad_server_checksum_marks_trace_retrying() { // Server returns a deliberately wrong CRC32C (0) — the real payload's @@ -688,21 +717,7 @@ mod tests { let bus = EventBus::new(); let (status_tx, mut status_rx) = mpsc::unbounded_channel::(); - let store_arc = Arc::new(store.clone()); - let (trace_writer, _trace_writer_owner) = - crate::state::trace_event_database_writer::spawn(store_arc.clone()); - let recordings_root = Arc::new(recordings_root); - upload_single( - &store_arc, - &trace_writer, - &bus, - &api, - &recordings_root, - &org_rx(Some("org-1")), - &status_tx, - "trace-1", - ) - .await; + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; let trace = store.get_trace("trace-1").await.unwrap().unwrap(); assert_eq!(trace.upload_status, TraceUploadStatus::Retrying); @@ -745,21 +760,7 @@ mod tests { let bus = EventBus::new(); let (status_tx, mut status_rx) = mpsc::unbounded_channel::(); - let store_arc = Arc::new(store.clone()); - let (trace_writer, _trace_writer_owner) = - crate::state::trace_event_database_writer::spawn(store_arc.clone()); - let recordings_root = Arc::new(recordings_root); - upload_single( - &store_arc, - &trace_writer, - &bus, - &api, - &recordings_root, - &org_rx(Some("org-1")), - &status_tx, - "trace-1", - ) - .await; + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; let trace = store.get_trace("trace-1").await.unwrap().unwrap(); assert_eq!(trace.upload_status, TraceUploadStatus::Uploaded); @@ -826,21 +827,7 @@ mod tests { let bus = EventBus::new(); let (status_tx, _status_rx) = mpsc::unbounded_channel::(); - let store_arc = Arc::new(store.clone()); - let (trace_writer, _trace_writer_owner) = - crate::state::trace_event_database_writer::spawn(store_arc.clone()); - let recordings_root = Arc::new(recordings_root); - upload_single( - &store_arc, - &trace_writer, - &bus, - &api, - &recordings_root, - &org_rx(Some("org-1")), - &status_tx, - "trace-1", - ) - .await; + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; let trace = store.get_trace("trace-1").await.unwrap().unwrap(); assert_eq!(trace.upload_status, TraceUploadStatus::Uploaded); @@ -904,21 +891,7 @@ mod tests { let mut subscriber = bus.subscribe(); let (status_tx, mut status_rx) = mpsc::unbounded_channel::(); - let store_arc = Arc::new(store.clone()); - let (trace_writer, _trace_writer_owner) = - crate::state::trace_event_database_writer::spawn(store_arc.clone()); - let recordings_root = Arc::new(recordings_root); - upload_single( - &store_arc, - &trace_writer, - &bus, - &api, - &recordings_root, - &org_rx(Some("org-1")), - &status_tx, - "trace-1", - ) - .await; + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; let trace = store.get_trace("trace-1").await.unwrap().unwrap(); assert_eq!(trace.upload_status, TraceUploadStatus::Failed); @@ -935,4 +908,493 @@ mod tests { let update = status_rx.try_recv().expect("status update enqueued"); assert!(update.completed); } + + #[tokio::test] + async fn empty_session_uris_marks_uploaded_immediately() { + // A registered-but-empty trace (nothing to PUT) settles as Uploaded + // with zero bytes and emits UploadComplete, so the progress gate isn't + // held waiting on an upload that has nothing to do. + let server = MockServer::start().await; + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + let recording = store + .create_recording(NewRecording::default()) + .await + .unwrap(); + store + .mark_recording_start_notified(recording.recording_index, "rec-1") + .await + .unwrap(); + store + .create_trace( + recording.recording_index, + "trace-1", + Some("JOINT_POSITIONS"), + Some("arm"), + ) + .await + .unwrap(); + store + .update_trace( + "trace-1", + TraceUpdate { + write_status: Some(TraceWriteStatus::Written), + upload_status: Some(TraceUploadStatus::Queued), + upload_session_uris: Some("{}".to_string()), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + + let api = client(&server); + let bus = EventBus::new(); + let mut subscriber = bus.subscribe(); + let (status_tx, mut status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_eq!(trace.upload_status, TraceUploadStatus::Uploaded); + assert_eq!(trace.bytes_uploaded, 0); + assert!(matches!( + subscriber.try_recv(), + Ok(DaemonEvent::UploadComplete { .. }) + )); + assert!(status_rx.try_recv().unwrap().completed); + } + + #[tokio::test] + async fn no_session_uris_leaves_trace_queued() { + // Ready-for-upload but no stored session URIs (registration hasn't + // persisted them yet): the uploader returns without touching the trace + // so a later drain retries once they land. + let server = MockServer::start().await; + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + let recording = store + .create_recording(NewRecording::default()) + .await + .unwrap(); + store + .mark_recording_start_notified(recording.recording_index, "rec-1") + .await + .unwrap(); + store + .create_trace( + recording.recording_index, + "trace-1", + Some("JOINT_POSITIONS"), + Some("arm"), + ) + .await + .unwrap(); + store + .update_trace( + "trace-1", + TraceUpdate { + upload_status: Some(TraceUploadStatus::Queued), + ..TraceUpdate::default() // no upload_session_uris + }, + ) + .await + .unwrap(); + + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_eq!( + trace.upload_status, + TraceUploadStatus::Queued, + "no URIs → trace untouched for retry" + ); + } + + #[tokio::test] + async fn missing_cloud_recording_id_defers_upload() { + // The recording row has no cloud id yet (start not notified). The + // uploader must defer (leave the trace queued), never marking it + // Uploading, so the periodic rescan re-enters once the id lands. + let server = MockServer::start().await; + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + let recording = store + .create_recording(NewRecording::default()) + .await + .unwrap(); + // Deliberately NOT start-notified, so recording_id is None. + store + .create_trace( + recording.recording_index, + "trace-1", + Some("JOINT_POSITIONS"), + Some("arm"), + ) + .await + .unwrap(); + let mut uris = HashMap::new(); + uris.insert( + "JOINT_POSITIONS/arm/trace.json".to_string(), + "https://upload/abc".to_string(), + ); + store + .update_trace( + "trace-1", + TraceUpdate { + upload_status: Some(TraceUploadStatus::Queued), + upload_session_uris: Some(serde_json::to_string(&uris).unwrap()), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_eq!( + trace.upload_status, + TraceUploadStatus::Queued, + "no cloud id → upload deferred, not started" + ); + } + + #[tokio::test] + async fn missing_artefact_marks_trace_failed() { + // Session URI + cloud id + data_type all present, but the on-disk file + // is gone. The uploader marks the trace Failed and emits UploadComplete + // so the recording's progress gate isn't wedged on a vanished file. + let server = MockServer::start().await; + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + let recording = store + .create_recording(NewRecording::default()) + .await + .unwrap(); + store + .mark_recording_start_notified(recording.recording_index, "rec-1") + .await + .unwrap(); + store + .create_trace( + recording.recording_index, + "trace-1", + Some("JOINT_POSITIONS"), + Some("arm"), + ) + .await + .unwrap(); + let mut uris = HashMap::new(); + uris.insert( + "JOINT_POSITIONS/arm/trace.json".to_string(), + format!("{}/upload/abc", server.uri()), + ); + store + .update_trace( + "trace-1", + TraceUpdate { + write_status: Some(TraceWriteStatus::Written), + upload_status: Some(TraceUploadStatus::Queued), + upload_session_uris: Some(serde_json::to_string(&uris).unwrap()), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + // Deliberately do NOT create the on-disk artefact. + + let api = client(&server); + let bus = EventBus::new(); + let mut subscriber = bus.subscribe(); + let (status_tx, mut status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_eq!(trace.upload_status, TraceUploadStatus::Failed); + assert!(matches!( + subscriber.try_recv(), + Ok(DaemonEvent::UploadComplete { .. }) + )); + assert!(status_rx.try_recv().unwrap().completed); + } + + #[tokio::test] + async fn spawn_upload_task_skips_already_dispatched_trace() { + // The dedup guard: a trace already in `in_flight_ids` must not be + // dispatched twice (a drain triggered before the task marks itself + // Uploading would otherwise double-upload). + let server = MockServer::start().await; + let (store, tempdir) = open_store().await; + let recordings_root = Arc::new(tempdir.path().join("recordings")); + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + let store_arc = Arc::new(store.clone()); + let (trace_writer, _owner) = + crate::state::trace_event_database_writer::spawn(store_arc.clone()); + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_UPLOADS)); + let mut in_flight: JoinSet = JoinSet::new(); + let mut in_flight_ids: HashSet = HashSet::new(); + in_flight_ids.insert("trace-1".to_string()); + + spawn_upload_task( + &store_arc, + &trace_writer, + &bus, + &api, + &recordings_root, + &org_rx(Some("org-1")), + &status_tx, + &semaphore, + &mut in_flight, + &mut in_flight_ids, + "trace-1".to_string(), + ); + + assert!(in_flight.is_empty(), "a duplicate dispatch is skipped"); + assert_eq!(in_flight_ids.len(), 1); + } + + #[tokio::test] + async fn spawn_upload_task_skips_when_semaphore_full() { + // No permit available → the task is not dispatched and the trace is not + // marked in-flight, so a later drain (which frees permits) retries it. + let server = MockServer::start().await; + let (store, tempdir) = open_store().await; + let recordings_root = Arc::new(tempdir.path().join("recordings")); + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + let store_arc = Arc::new(store.clone()); + let (trace_writer, _owner) = + crate::state::trace_event_database_writer::spawn(store_arc.clone()); + let semaphore = Arc::new(Semaphore::new(0)); // no permits + let mut in_flight: JoinSet = JoinSet::new(); + let mut in_flight_ids: HashSet = HashSet::new(); + + spawn_upload_task( + &store_arc, + &trace_writer, + &bus, + &api, + &recordings_root, + &org_rx(Some("org-1")), + &status_tx, + &semaphore, + &mut in_flight, + &mut in_flight_ids, + "trace-1".to_string(), + ); + + assert!(in_flight.is_empty(), "no permit → nothing dispatched"); + assert!( + in_flight_ids.is_empty(), + "no permit → id not marked in-flight" + ); + } + + // The following drive the wire-level transfer layer (`upload_transfer.rs`) + // through the uploader entry point, exercising branches that only a live + // server reaches: transient-retry, auth refresh, a hard error, and the + // no-progress stall guard. + + #[tokio::test] + async fn transient_5xx_chunk_is_retried_then_uploaded() { + // A 503 on the chunk PUT is transient: put_chunk retries within budget + // and the trace still settles as Uploaded. + let server = MockServer::start().await; + let payload = b"transient-ok"; + let header = format!( + "crc32c={}", + BASE64.encode(crc32c::crc32c(payload).to_be_bytes()) + ); + Mock::given(method("PUT")) + .and(path("/upload/abc")) + .respond_with(ResponseTemplate::new(503)) + .up_to_n_times(1) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/upload/abc")) + .respond_with(move |_req: &Request| { + ResponseTemplate::new(200).insert_header("X-Goog-Hash", header.as_str()) + }) + .expect(1) + .mount(&server) + .await; + + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + seed_ready_trace( + &store, + &recordings_root, + "rec-1", + "trace-1", + "JOINT_POSITIONS", + "arm", + &format!("{}/upload/abc", server.uri()), + payload, + ) + .await; + + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + assert_eq!( + store + .get_trace("trace-1") + .await + .unwrap() + .unwrap() + .upload_status, + TraceUploadStatus::Uploaded + ); + } + + #[tokio::test] + async fn auth_401_on_chunk_is_reloaded_then_uploaded() { + // A 401 mid-upload triggers one token reload and then the retried PUT + // succeeds. + let server = MockServer::start().await; + let payload = b"auth-ok"; + let header = format!( + "crc32c={}", + BASE64.encode(crc32c::crc32c(payload).to_be_bytes()) + ); + Mock::given(method("PUT")) + .and(path("/upload/abc")) + .respond_with(ResponseTemplate::new(401)) + .up_to_n_times(1) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/upload/abc")) + .respond_with(move |_req: &Request| { + ResponseTemplate::new(200).insert_header("X-Goog-Hash", header.as_str()) + }) + .expect(1) + .mount(&server) + .await; + + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + seed_ready_trace( + &store, + &recordings_root, + "rec-1", + "trace-1", + "JOINT_POSITIONS", + "arm", + &format!("{}/upload/abc", server.uri()), + payload, + ) + .await; + + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + assert_eq!( + store + .get_trace("trace-1") + .await + .unwrap() + .unwrap() + .upload_status, + TraceUploadStatus::Uploaded + ); + } + + #[tokio::test] + async fn hard_4xx_chunk_error_rolls_trace_to_retrying() { + // A non-retryable 4xx is a hard error: put_chunk returns Failed, + // upload_one_file errors, and the uploader rolls the trace to Retrying + // for the registration recovery sweep to re-arm. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/upload/abc")) + .respond_with(ResponseTemplate::new(400).set_body_string("bad request")) + .expect(1) + .mount(&server) + .await; + + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + seed_ready_trace( + &store, + &recordings_root, + "rec-1", + "trace-1", + "JOINT_POSITIONS", + "arm", + &format!("{}/upload/abc", server.uri()), + b"hard-error", + ) + .await; + + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + assert_eq!( + store + .get_trace("trace-1") + .await + .unwrap() + .unwrap() + .upload_status, + TraceUploadStatus::Retrying + ); + } + + #[tokio::test] + async fn no_progress_308_stall_marks_trace_retrying() { + // A peer that keeps replying 308 without committing any bytes must not + // wedge the upload task (and its concurrency permit) forever: the + // stall guard bails after MAX_RETRIES and the trace rolls to Retrying. + let server = MockServer::start().await; + Mock::given(method("PUT")) + .and(path("/upload/abc")) + .respond_with(ResponseTemplate::new(308)) // no Range → no forward progress + .mount(&server) + .await; + + let (store, tempdir) = open_store().await; + let recordings_root = tempdir.path().join("recordings"); + seed_ready_trace( + &store, + &recordings_root, + "rec-1", + "trace-1", + "JOINT_POSITIONS", + "arm", + &format!("{}/upload/abc", server.uri()), + b"stall-me", + ) + .await; + + let api = client(&server); + let bus = EventBus::new(); + let (status_tx, _status_rx) = mpsc::unbounded_channel::(); + run_upload_single(&store, recordings_root, &api, &bus, &status_tx, "trace-1").await; + + assert_eq!( + store + .get_trace("trace-1") + .await + .unwrap() + .unwrap() + .upload_status, + TraceUploadStatus::Retrying + ); + } } diff --git a/rust/data_daemon/src/cloud/watchers/recording_reaper.rs b/rust/data_daemon/src/cloud/watchers/recording_reaper.rs index 87b9fa12..7f4af6cc 100644 --- a/rust/data_daemon/src/cloud/watchers/recording_reaper.rs +++ b/rust/data_daemon/src/cloud/watchers/recording_reaper.rs @@ -137,3 +137,203 @@ async fn reclaim( ), } } + +#[cfg(test)] +mod tests { + use super::*; + use std::path::Path; + + use tempfile::TempDir; + + use crate::state::{ + NewRecording, ProgressReportStatus, TraceUpdate, TraceUploadStatus, TraceWriteStatus, + }; + + async fn open_store() -> (SqliteStateStore, TempDir) { + let dir = TempDir::new().unwrap(); + let store = SqliteStateStore::open(&dir.path().join("state.db")) + .await + .unwrap(); + (store, dir) + } + + fn new_recording(instance: i64) -> NewRecording<'static> { + NewRecording { + robot_id: Some("robot-1"), + robot_instance: Some(instance), + dataset_id: Some("ds-1"), + start_timestamp_ns: 1_700_000_000_000_000_000, + } + } + + /// Drive a stopped recording with one fully-uploaded trace through every + /// notify + progress gate so the server-side reclaim filter reports it. + async fn seed_reclaimable_stopped(store: &SqliteStateStore, instance: i64) -> i64 { + let index = store + .create_recording(new_recording(instance)) + .await + .unwrap() + .recording_index; + let trace_id = format!("t-{instance}"); + store + .mark_recording_start_notified(index, &format!("cloud-{instance}")) + .await + .unwrap(); + store + .create_trace(index, &trace_id, Some("J"), None) + .await + .unwrap(); + store + .update_trace( + &trace_id, + TraceUpdate { + write_status: Some(TraceWriteStatus::Written), + upload_status: Some(TraceUploadStatus::Uploaded), + ..TraceUpdate::default() + }, + ) + .await + .unwrap(); + store.mark_recording_stopped(index, 1).await.unwrap(); + store.mark_recording_stop_notified(index).await.unwrap(); + store.set_expected_trace_count(index, 1).await.unwrap(); + store + .set_progress_report_status( + index, + ProgressReportStatus::Pending, + ProgressReportStatus::Reported, + ) + .await + .unwrap(); + index + } + + /// A cancelled recording whose backend cancel has been notified — the + /// reaper is the single owner of removing its files. + async fn seed_reclaimable_cancelled(store: &SqliteStateStore, instance: i64) -> i64 { + let index = store + .create_recording(new_recording(instance)) + .await + .unwrap() + .recording_index; + store + .mark_recording_start_notified(index, &format!("cloud-{instance}")) + .await + .unwrap(); + store.cancel_recording(index, 1).await.unwrap(); + store.mark_recording_cancel_notified(index).await.unwrap(); + index + } + + fn touch(path: &Path) { + std::fs::write(path, b"x").expect("write file"); + } + + #[tokio::test] + async fn sweep_deletes_files_and_rows_of_a_reclaimable_recording() { + let (store, _db_dir) = open_store().await; + let root_dir = TempDir::new().unwrap(); + let root = Arc::new(root_dir.path().to_path_buf()); + let index = seed_reclaimable_stopped(&store, 0).await; + + let dir = recording_dir(&root, index); + std::fs::create_dir_all(&dir).unwrap(); + touch(&dir.join("trace.json")); + assert!(dir.exists()); + + sweep_once(&Arc::new(store.clone()), &root).await; + + assert!(!dir.exists(), "the recording directory is removed"); + assert!( + store.get_recording(index).await.unwrap().is_none(), + "the recording row is removed" + ); + } + + #[tokio::test] + async fn sweep_reclaims_a_cancelled_recording() { + let (store, _db_dir) = open_store().await; + let root_dir = TempDir::new().unwrap(); + let root = Arc::new(root_dir.path().to_path_buf()); + let index = seed_reclaimable_cancelled(&store, 0).await; + + let dir = recording_dir(&root, index); + std::fs::create_dir_all(&dir).unwrap(); + touch(&dir.join("video.mp4")); + + sweep_once(&Arc::new(store.clone()), &root).await; + + assert!(!dir.exists(), "cancelled recording files are reclaimed"); + assert!(store.get_recording(index).await.unwrap().is_none()); + } + + #[tokio::test] + async fn sweep_leaves_an_unsettled_recording_intact() { + // A live (never stopped) recording is not reclaimable: the reaper must + // touch neither its files nor its rows. + let (store, _db_dir) = open_store().await; + let root_dir = TempDir::new().unwrap(); + let root = Arc::new(root_dir.path().to_path_buf()); + let index = store + .create_recording(new_recording(0)) + .await + .unwrap() + .recording_index; + + let dir = recording_dir(&root, index); + std::fs::create_dir_all(&dir).unwrap(); + touch(&dir.join("trace.json")); + + sweep_once(&Arc::new(store.clone()), &root).await; + + assert!(dir.exists(), "a live recording's files are untouched"); + assert!( + store.get_recording(index).await.unwrap().is_some(), + "a live recording's row is untouched" + ); + } + + #[tokio::test] + async fn sweep_deletes_rows_when_directory_already_gone() { + // Crash-recovery: a prior sweep removed the files but died before its + // row delete committed. The next sweep must still drop the rows — + // NotFound on the directory is treated as success and falls through. + let (store, _db_dir) = open_store().await; + let root_dir = TempDir::new().unwrap(); + let root = Arc::new(root_dir.path().to_path_buf()); + let index = seed_reclaimable_stopped(&store, 0).await; + // Deliberately never create the on-disk directory. + assert!(!recording_dir(&root, index).exists()); + + sweep_once(&Arc::new(store.clone()), &root).await; + + assert!( + store.get_recording(index).await.unwrap().is_none(), + "rows are reclaimed even when the directory is already gone" + ); + } + + #[tokio::test] + async fn sweep_retains_rows_when_directory_removal_fails() { + // Files are removed before rows. If the unlink fails the rows must be + // left in place so the next sweep retries — never orphan files with no + // row pointing at them. We force the failure by planting a regular + // file where the directory is expected: `remove_dir_all` then fails + // with ENOTDIR (not NotFound), exercising the retain-and-retry branch. + let (store, _db_dir) = open_store().await; + let root_dir = TempDir::new().unwrap(); + let root = Arc::new(root_dir.path().to_path_buf()); + let index = seed_reclaimable_stopped(&store, 0).await; + + let dir_path = recording_dir(&root, index); + std::fs::write(&dir_path, b"not a directory").unwrap(); + + sweep_once(&Arc::new(store.clone()), &root).await; + + assert!( + store.get_recording(index).await.unwrap().is_some(), + "rows are retained for retry when file removal fails" + ); + assert!(dir_path.exists(), "the undeletable path is left in place"); + } +} diff --git a/rust/data_daemon/src/pipeline/dispatcher.rs b/rust/data_daemon/src/pipeline/dispatcher.rs index 51f9e4cc..789fce59 100644 --- a/rust/data_daemon/src/pipeline/dispatcher.rs +++ b/rust/data_daemon/src/pipeline/dispatcher.rs @@ -1446,4 +1446,125 @@ mod tests { } assert!(saw_cancel, "RecordingCancelled must be published"); } + + #[tokio::test] + async fn reap_idle_force_closes_a_silent_live_window() { + // A producer that crashes without a Stop leaves a live window open. The + // idle reaper must force-close it (open upper bound, so straggler data + // still routes) and mark the recording stopped so it reaches a terminal, + // notifiable state — otherwise the recording leaks forever. + fast_holdback(); + let (store, dir) = open_store().await; + let context = test_context(dir.path().join("recordings"), store.clone()); + let bus = crate::state::EventBus::new(); + let mut sub = bus.subscribe(); + let mut dispatcher = Dispatcher::new( + store.clone(), + context, + DispatcherContext { + event_bus: Some(bus.clone()), + }, + ); + + let source = ("robot-1".to_string(), 0); + let opened_at = Instant::now(); + dispatcher + .handle_start(source.clone(), None, 100, 100, opened_at) + .await; + assert!(dispatcher.windows.get(&source).unwrap().live.is_some()); + + // Advance past the idle horizon (a future instant — no real waiting). + let now = opened_at + IDLE_REAP + Duration::from_secs(1); + dispatcher.reap_idle(now).await; + + let entry = dispatcher.windows.get(&source).unwrap(); + assert!(entry.live.is_none(), "the idle live window is force-closed"); + assert_eq!(entry.closing.len(), 1); + assert_eq!( + entry.closing[0].stopped_at_ns, + Some(i64::MAX), + "the reaped window keeps an open upper bound for stragglers" + ); + + let recordings = store.recordings_for_source("robot-1", 0).await.unwrap(); + assert_eq!(recordings.len(), 1); + assert!( + recordings[0].stopped_at.is_some(), + "the recording row is marked stopped at the reap moment" + ); + + let mut saw_stop = false; + while let Ok(event) = sub.try_recv() { + if matches!(event, DaemonEvent::RecordingStopped { .. }) { + saw_stop = true; + } + } + assert!( + saw_stop, + "RecordingStopped is published for the reaped window" + ); + } + + #[tokio::test] + async fn reap_idle_leaves_a_recently_active_window_open() { + // A window whose source was seen within the idle horizon must NOT be + // reaped — the guard against force-closing a still-live recording. + fast_holdback(); + let (store, dir) = open_store().await; + let context = test_context(dir.path().join("recordings"), store.clone()); + let mut dispatcher = Dispatcher::new(store.clone(), context, DispatcherContext::default()); + + let source = ("robot-1".to_string(), 0); + let opened_at = Instant::now(); + dispatcher + .handle_start(source.clone(), None, 100, 100, opened_at) + .await; + + // Only a short time has passed — well within the idle horizon. + dispatcher + .reap_idle(opened_at + Duration::from_millis(5)) + .await; + + assert!( + dispatcher.windows.get(&source).unwrap().live.is_some(), + "a recently-active window must stay live" + ); + } + + #[tokio::test] + async fn housekeep_evicts_a_closing_window_past_retention() { + // A closing window is retained for 2·holdback (so its in-window data has + // released) and then evicted; without this the window map — and the + // actor handles it holds — leak for the daemon's lifetime. + fast_holdback(); + let (store, dir) = open_store().await; + let context = test_context(dir.path().join("recordings"), store.clone()); + let mut dispatcher = Dispatcher::new(store.clone(), context, DispatcherContext::default()); + + let source = ("robot-1".to_string(), 0); + let opened_at = Instant::now(); + dispatcher + .handle_start(source.clone(), None, 100, 100, opened_at) + .await; + let stopped_at = opened_at + Duration::from_millis(1); + dispatcher + .handle_stop(source.clone(), 200, 200, stopped_at) + .await; + assert_eq!( + dispatcher.windows.get(&source).unwrap().closing.len(), + 1, + "the stopped window is retained as closing" + ); + + // Just past the 2·holdback retention window. + let retention = dispatcher.holdback * 2; + let now = stopped_at + retention + Duration::from_millis(1); + dispatcher.housekeep(now).await; + + let closing = dispatcher + .windows + .get(&source) + .map_or(0, |entry| entry.closing.len()); + assert_eq!(closing, 0, "a closing window past 2·holdback is evicted"); + } } diff --git a/rust/data_daemon/src/pipeline/json_writer.rs b/rust/data_daemon/src/pipeline/json_writer.rs index 41ee884d..36d8a7bd 100644 --- a/rust/data_daemon/src/pipeline/json_writer.rs +++ b/rust/data_daemon/src/pipeline/json_writer.rs @@ -222,3 +222,136 @@ fn writer_gone() -> JsonTraceError { source: std::io::Error::other("json trace writer thread stopped"), } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::paths::TRACE_JSON_FILENAME; + use serde_json::json; + use std::path::Path; + use tempfile::TempDir; + + fn read_back(trace_dir: &Path) -> Value { + let bytes = std::fs::read(trace_dir.join(TRACE_JSON_FILENAME)).expect("read trace.json"); + serde_json::from_slice(&bytes).expect("parse trace.json") + } + + /// Drain the write-behind thread: drop the handle so the channel closes, + /// then join so any in-flight disk write has landed before assertions. + fn shutdown(handle: JsonWriteHandle, join: JoinHandle<()>) { + drop(handle); + join.join().expect("json writer thread join"); + } + + #[tokio::test] + async fn open_append_finish_round_trips_to_disk() { + let tempdir = TempDir::new().unwrap(); + let (handle, join) = spawn(); + + handle.open("t-1", tempdir.path().to_path_buf()); + handle.append("t-1", 1, br#"{"frame":0,"v":1.5}"#.to_vec()); + handle.append("t-1", 2, br#"{"frame":1,"v":2.5}"#.to_vec()); + let bytes = handle.finish("t-1").await.expect("finish"); + + let path = tempdir.path().join(TRACE_JSON_FILENAME); + assert_eq!( + bytes, + std::fs::metadata(&path).unwrap().len(), + "reported byte count must match the file on disk" + ); + assert_eq!( + read_back(tempdir.path()), + json!([{"frame":0,"v":1.5},{"frame":1,"v":2.5}]) + ); + shutdown(handle, join); + } + + #[tokio::test] + async fn already_json_payload_is_forwarded_verbatim() { + // A float the SDK formatted itself must land on disk byte-for-byte: + // the integration matrix compares `trace.json` floats exactly, so the + // already-JSON branch forwards the raw bytes with no serde round trip. + let tempdir = TempDir::new().unwrap(); + let (handle, join) = spawn(); + + handle.open("t-1", tempdir.path().to_path_buf()); + handle.append("t-1", 1, br#"{"v":0.11666666666666667}"#.to_vec()); + handle.finish("t-1").await.expect("finish"); + + let raw = std::fs::read_to_string(tempdir.path().join(TRACE_JSON_FILENAME)).unwrap(); + assert!( + raw.contains("0.11666666666666667"), + "float text must survive verbatim, got {raw}" + ); + shutdown(handle, join); + } + + #[tokio::test] + async fn non_json_payload_is_wrapped_in_fallback_entry() { + let tempdir = TempDir::new().unwrap(); + let (handle, join) = spawn(); + + handle.open("t-1", tempdir.path().to_path_buf()); + handle.append("t-1", 42, b"not-json".to_vec()); + handle.finish("t-1").await.expect("finish"); + + assert_eq!( + read_back(tempdir.path()), + json!([{"timestamp_ns": 42, "payload_len": 8}]), + "a non-JSON payload is replaced by a length-only fallback object" + ); + shutdown(handle, join); + } + + #[tokio::test] + async fn finish_without_open_reports_zero_bytes() { + let (handle, join) = spawn(); + let bytes = handle.finish("never-opened").await.expect("finish"); + assert_eq!(bytes, 0, "finalising an unknown trace is a no-op"); + shutdown(handle, join); + } + + #[tokio::test] + async fn drop_discards_an_open_writer() { + let tempdir = TempDir::new().unwrap(); + let (handle, join) = spawn(); + + handle.open("t-1", tempdir.path().to_path_buf()); + handle.append("t-1", 1, br#"{"frame":0}"#.to_vec()); + handle.drop_trace("t-1"); + // After a drop the writer is gone, so a later finish finalises nothing. + let bytes = handle.finish("t-1").await.expect("finish"); + assert_eq!(bytes, 0, "a dropped trace has no writer left to finalise"); + shutdown(handle, join); + } + + #[tokio::test] + async fn concurrent_traces_do_not_interleave() { + // Two traces share the one FIFO thread; each must retain only its own + // entries, in order, in its own file — no cross-trace corruption. + let tempdir = TempDir::new().unwrap(); + let dir_one = tempdir.path().join("one"); + let dir_two = tempdir.path().join("two"); + let (handle, join) = spawn(); + + handle.open("one", dir_one.clone()); + handle.open("two", dir_two.clone()); + handle.append("one", 1, br#"{"trace":"one","i":0}"#.to_vec()); + handle.append("two", 1, br#"{"trace":"two","i":0}"#.to_vec()); + handle.append("one", 2, br#"{"trace":"one","i":1}"#.to_vec()); + handle.append("two", 2, br#"{"trace":"two","i":1}"#.to_vec()); + + handle.finish("one").await.expect("finish one"); + handle.finish("two").await.expect("finish two"); + + assert_eq!( + read_back(&dir_one), + json!([{"trace":"one","i":0},{"trace":"one","i":1}]) + ); + assert_eq!( + read_back(&dir_two), + json!([{"trace":"two","i":0},{"trace":"two","i":1}]) + ); + shutdown(handle, join); + } +} diff --git a/rust/data_daemon/src/pipeline/trace_actor.rs b/rust/data_daemon/src/pipeline/trace_actor.rs index aae5474c..d39dca05 100644 --- a/rust/data_daemon/src/pipeline/trace_actor.rs +++ b/rust/data_daemon/src/pipeline/trace_actor.rs @@ -1256,4 +1256,102 @@ mod tests { assert_eq!(trace.write_status, TraceWriteStatus::Written); assert!(trace.total_bytes > 0); } + + #[tokio::test] + async fn frames_over_storage_budget_are_dropped_not_written() { + // Under storage pressure the actor must refuse frames (counting the + // drops) and keep writing the ones that fit, rather than blow past the + // cap or wedge — a silent-data-loss path with no other guard. + let tempdir = TempDir::new().unwrap(); + let store = SqliteStateStore::open(&tempdir.path().join("state.db")) + .await + .expect("open store"); + let store_arc = Arc::new(store.clone()); + // A tight 25-byte cap: the first 20-byte frame fits; later ones don't. + let policy = StoragePolicy { + storage_limit_bytes: Some(25), + min_free_disk_bytes: 0, + refresh_interval: Duration::from_secs(60), + }; + let recordings_root = tempdir.path().join("recordings"); + let budget = Arc::new(StorageBudget::new(&recordings_root, policy)); + let (trace_writer, _writer_owner) = + crate::state::trace_event_database_writer::spawn(store_arc.clone()); + let (json_writer, _json_owner) = crate::pipeline::json_writer::spawn(); + let context = Arc::new(TraceActorContext::new( + recordings_root, + budget, + VideoEncoder::new(), + trace_writer, + json_writer, + )); + + let mut state = ActorState::new(identity(1, "trace-1", "joints")); + state.send_create(&context); + for _ in 0..3 { + state.handle_data(&context, 0, None, vec![0u8; 20]).await; + } + state.finalise_trace(&context).await; + context.trace_writer.flush().await; + + assert_eq!(state.frame_count, 1, "only the first frame fits the budget"); + assert_eq!( + state.dropped_over_budget, 2, + "the two over-budget frames are counted as dropped" + ); + + let trace_dir = + TracePath::new("1", "joints", "trace-1").directory(context.recordings_root.as_path()); + let bytes = std::fs::read(trace_dir.join("trace.json")).unwrap(); + let parsed: Value = serde_json::from_slice(&bytes).unwrap(); + assert_eq!( + parsed.as_array().unwrap().len(), + 1, + "only the accepted frame lands on disk" + ); + } + + #[tokio::test] + async fn cancel_discards_writer_and_releases_budget_without_finalising() { + // A cancel mid-trace must drop the open writer (release its file handle + // without finalising) and give back the budget it reserved — never + // marking the trace Written. + let tempdir = TempDir::new().unwrap(); + let store = SqliteStateStore::open(&tempdir.path().join("state.db")) + .await + .expect("open store"); + let store_arc = Arc::new(store.clone()); + let context = test_context(&tempdir.path().join("recordings"), store_arc.clone()); + + let mut state = ActorState::new(identity(1, "trace-1", "joints")); + state.send_create(&context); + state + .handle_data( + &context, + 0, + None, + serde_json::to_vec(&json!({"i": 0})).unwrap(), + ) + .await; + assert!(state.bytes_on_disk > 0, "the frame was accounted on disk"); + + state.handle_cancel(&context).await; + context.trace_writer.flush().await; + + assert!( + matches!(state.writer, TraceWriterKind::Pending), + "cancel discards the open writer" + ); + assert_eq!( + state.bytes_on_disk, 0, + "cancel releases the byte accounting" + ); + + let trace = store.get_trace("trace-1").await.unwrap().unwrap(); + assert_ne!( + trace.write_status, + TraceWriteStatus::Written, + "a cancelled trace is never finalised as Written" + ); + } } diff --git a/rust/data_daemon_producer/Cargo.toml b/rust/data_daemon_producer/Cargo.toml index 5b224278..afafcac1 100644 --- a/rust/data_daemon_producer/Cargo.toml +++ b/rust/data_daemon_producer/Cargo.toml @@ -15,6 +15,7 @@ path = "src/lib.rs" data_daemon_shared = { path = "../data_daemon_shared" } iceoryx2.workspace = true libc.workspace = true +miniz_oxide = "0.8" pyo3.workspace = true # Used by the batched joint-data fast path to format each per-item # `{"timestamp":...,"value":...}` payload. We deliberately do NOT hand-format diff --git a/rust/data_daemon_producer/src/nut_writer.rs b/rust/data_daemon_producer/src/nut_writer.rs index c7dd8940..15ccb60b 100644 --- a/rust/data_daemon_producer/src/nut_writer.rs +++ b/rust/data_daemon_producer/src/nut_writer.rs @@ -1,13 +1,17 @@ -//! Minimal NUT-container muxer for a single raw-RGB24 video stream. +//! Minimal NUT-container muxer for a single video stream of lossless PNG +//! frames. //! -//! The video trace actor spools captured frames into a `raw.nut` file with -//! this writer; the file is then handed off to an `ffmpeg` transcoder. +//! The video trace actor spools captured frames into a `.nut` file with this +//! writer; the file is then handed off to an `ffmpeg` transcoder. The caller +//! always supplies packed RGB24; the writer PNG-encodes each frame (see +//! [`PngScratch::encode`]) so the on-disk spool — and the daemon's transcode +//! read-back of it — is a small fraction of the raw video bandwidth. //! //! The output is intentionally the bare minimum NUT spec elements needed for -//! `ffprobe` to report the stream geometry: file id string, main header, -//! stream header, one syncpoint, and one frame packet per captured RGB -//! buffer. We deliberately skip the optional index packet so the file stays -//! crash-safe — a truncated tail still demuxes up to the last complete frame. +//! `ffprobe`/`ffmpeg` to demux the PNG stream: file id string, main header, +//! stream header, one syncpoint, and one frame packet per captured frame. We +//! deliberately skip the optional index packet so the file stays crash-safe — a +//! truncated tail still demuxes up to the last complete frame. //! //! See `https://ffmpeg.org/~michael/nut.txt` for the authoritative spec. The //! bit-level layout is non-obvious in several places (frame-code table @@ -21,6 +25,10 @@ use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::OnceLock; +use miniz_oxide::deflate::core::{ + compress, create_comp_flags_from_zip_params, CompressorOxide, TDEFLFlush, TDEFLStatus, +}; + /// Fixed 25-byte file identifier mandated by the NUT spec. The trailing NUL /// is part of the signature. const FILE_ID_STRING: &[u8] = b"nut/multimedia container\0"; @@ -70,8 +78,32 @@ const FLAG_INVALID: u64 = 1 << 13; // bit 13 — entry is unusable /// table, this means every frame carries its own flags inline. const FRAME_CODE_ALL_EXPLICIT: u8 = 0xFF; +/// zlib level for the PNG IDAT stream. Level 1 (fastest) is deliberate: the +/// encode runs on the writer thread that drains the frame queue, so a slow +/// encode backs the queue up into producer backpressure — speed matters more +/// than ratio. And with the Up filter the flat frames compress to almost +/// nothing at level 1 anyway. Raise only with a measured CPU/ratio trade. +const PNG_COMPRESSION_LEVEL: u8 = 1; + +/// `window_bits` for [`create_comp_flags_from_zip_params`]: a positive value +/// asks miniz_oxide to wrap the deflate stream in a zlib header — what PNG IDAT +/// requires (and what `compress_to_vec_zlib` passed). +const ZLIB_WINDOW_BITS: i32 = 1; + +/// `strategy` for [`create_comp_flags_from_zip_params`]: 0 is the default +/// (no forced Huffman-only/RLE strategy). +const DEFLATE_STRATEGY_DEFAULT: i32 = 0; + +/// 8-byte PNG file signature. +const PNG_SIGNATURE: [u8; 8] = [0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A]; + +/// PNG row-filter tag for the "Up" predictor (residual vs the pixel directly +/// above). Prefixes every scanline; see [`PngScratch::encode`] for why Up. +const PNG_FILTER_UP: u8 = 2; + /// Configuration captured at writer-creation time. The writer is single -/// stream and assumes packed RGB24 (3 bytes per pixel, no padding). +/// stream; the caller always supplies packed RGB24 (3 bytes per pixel, no +/// padding), which the writer stores as per-frame lossless PNG. #[derive(Debug, Clone, Copy)] pub struct NutVideoConfig { /// Frame width in pixels. Must be non-zero. @@ -144,6 +176,14 @@ pub struct NutWriter { /// Number of bytes a well-formed RGB24 frame must occupy. Cached so the /// per-frame size check stays in a single `usize`. expected_frame_bytes: usize, + /// Decoded-equivalent bytes seen so far: `expected_frame_bytes` summed over + /// every appended frame. The chunk-roll threshold keys off this rather than + /// the compressed [`bytes_written`], so a chunk holds the same frame count + /// — and thus the same daemon per-chunk transcode granularity — regardless + /// of how well the PNG frames happened to compress. + /// + /// [`bytes_written`]: Self::bytes_written + logical_bytes: u64, /// File offset of the most recently written syncpoint packet. Drives the /// bytes-since-last-syncpoint check that triggers the next periodic /// syncpoint. @@ -151,6 +191,9 @@ pub struct NutWriter { /// File offset up to which an async writeback hint has been issued. The /// next hint covers `[last_writeback_hint, bytes_written)`. last_writeback_hint: u64, + /// Reused PNG-encode scratch space (buffers + compressor); see + /// [`PngScratch`]. + png: PngScratch, } /// Emit a new syncpoint when the bytes-since-last-syncpoint would exceed @@ -237,8 +280,10 @@ impl NutWriter { config, bytes_written: 0, expected_frame_bytes, + logical_bytes: 0, last_syncpoint_offset: 0, last_writeback_hint: 0, + png: PngScratch::new(), }; writer.write_headers()?; Ok(writer) @@ -257,9 +302,17 @@ impl NutWriter { self.bytes_written } - /// Append one raw-RGB24 frame at the supplied PTS (frame index in - /// time-base ticks). The supplied slice must be exactly - /// `width * height * 3` bytes long. + /// Decoded-equivalent bytes appended so far — `width * height * 3` per + /// frame, not the compressed on-disk size. See the + /// [`logical_bytes`](Self::logical_bytes) field for why chunk rolls key off + /// this rather than [`bytes_written`](Self::bytes_written). + pub fn logical_bytes(&self) -> u64 { + self.logical_bytes + } + + /// Append one frame at the supplied PTS (frame index in time-base ticks). + /// The frame is supplied as packed RGB24 — exactly `width * height * 3` + /// bytes — and stored as a per-frame lossless PNG. pub fn write_frame(&mut self, pts: u64, rgb_bytes: &[u8]) -> Result<(), NutError> { if rgb_bytes.len() != self.expected_frame_bytes { return Err(NutError::FrameSize { @@ -268,6 +321,14 @@ impl NutWriter { }); } + // The caller always supplies packed RGB24; we compress it to a per-frame + // PNG here, into the writer's reused scratch. `payload_len` is the length + // of the assembled PNG now sitting in `self.png.output` — the bytes that + // land in the NUT frame packet below. + let payload_len = self + .png + .encode(self.config.width, self.config.height, rgb_bytes); + // Emit a periodic syncpoint before appending the frame so the gap // between consecutive syncpoints stays within the demuxer's reach. // Without this, files containing more than a few frames of >16 KiB @@ -275,7 +336,7 @@ impl NutWriter { let bytes_since_last = self .bytes_written .saturating_sub(self.last_syncpoint_offset); - let projected_frame_bytes = rgb_bytes.len() as u64 + 32; + let projected_frame_bytes = payload_len as u64 + 32; if bytes_since_last.saturating_add(projected_frame_bytes) > SYNCPOINT_INTERVAL_BYTES { self.write_syncpoint(pts)?; } @@ -312,8 +373,8 @@ impl NutWriter { vencode(&mut header, coded_pts); // data_size = data_size_lsb (0) + data_size_msb * data_size_mul (1) - // = data_size_msb, so we encode the raw frame length here. - vencode(&mut header, rgb_bytes.len() as u64); + // = data_size_msb, so we encode the on-disk frame length here. + vencode(&mut header, payload_len as u64); // Frame header checksum: CRC32/MPEG-2 over framecode + all header // bytes up to (but not including) the checksum itself. @@ -321,7 +382,13 @@ impl NutWriter { header.extend_from_slice(&checksum.to_be_bytes()); self.write_all(&header)?; - self.write_all(rgb_bytes)?; + self.write_png_payload(payload_len)?; + + // Track decoded-equivalent volume so chunk rolls track frame count, not + // the (much smaller, content-dependent) compressed PNG size. + self.logical_bytes = self + .logical_bytes + .saturating_add(self.expected_frame_bytes as u64); // Once enough has accumulated, kick off async writeback for it so dirty // pages drain continuously instead of piling up to the throttle's hard @@ -439,6 +506,21 @@ impl NutWriter { Ok(()) } + /// Write the first `len` bytes of the reused `png.output` (the assembled PNG + /// payload). A dedicated method rather than `write_all(&self.png.output)` so + /// the borrow stays a disjoint `&mut self.writer` + `&self.png.output` rather + /// than the whole-`self` borrow a method argument would force. + fn write_png_payload(&mut self, len: usize) -> Result<(), NutError> { + self.writer + .write_all(&self.png.output[..len]) + .map_err(|source| NutError::Write { + path: self.path.clone(), + source, + })?; + self.bytes_written = self.bytes_written.saturating_add(len as u64); + Ok(()) + } + fn flush(&mut self) -> Result<(), NutError> { self.writer.flush().map_err(|source| NutError::Write { path: self.path.clone(), @@ -504,17 +586,17 @@ fn build_stream_header_payload(config: NutVideoConfig) -> Vec { vencode(&mut payload, 0); // stream_id vencode(&mut payload, 0); // stream_class — 0 = video - // fourcc as a `vb`: length-prefixed bytes. "RGB\x18" advertises packed - // RGB24 (8 bits per channel, 24 bpp). FFmpeg's libavformat maps this - // fourcc to `AV_CODEC_ID_RAWVIDEO` with pix_fmt = `AV_PIX_FMT_RGB24`. - let fourcc: &[u8] = b"RGB\x18"; + // fourcc as a `vb`: length-prefixed bytes. "MPNG" tells ffmpeg's NUT + // demuxer the frame packets are PNG (`AV_CODEC_ID_PNG`) — the tag ffmpeg + // itself writes for png-in-NUT, verified via `ffprobe` codec_tag_string. + let fourcc: &[u8] = b"MPNG"; vencode(&mut payload, fourcc.len() as u64); payload.extend_from_slice(fourcc); vencode(&mut payload, 0); // time_base_id vencode(&mut payload, MSB_PTS_SHIFT); vencode(&mut payload, 1); // max_pts_distance — we always include FLAG_CHECKSUM anyway - vencode(&mut payload, 0); // decode_delay — no B-frames in raw video + vencode(&mut payload, 0); // decode_delay — PNG is all intra, no B-frames // stream_flags = 0. We deliberately do *not* set FLAG_FIXED_FPS: our // time_base is microsecond ticks (1/1_000_000), and FLAG_FIXED_FPS would // tell downstream demuxers the stream runs at exactly 1/time_base fps @@ -691,6 +773,199 @@ fn crc32_nut(bytes: &[u8]) -> u32 { crc } +/// Per-frame PNG-encode scratch, owned by the [`NutWriter`] and reused across +/// frames so a steady-state frame allocates nothing. The three buffers and the +/// compressor are bundled (rather than loose fields) because [`encode`] always +/// drives them as a unit. +/// +/// [`encode`]: Self::encode +struct PngScratch { + /// One frame's filtered PNG scanlines (each row: a filter tag + the row's + /// filtered bytes) — the input to the zlib stream that becomes IDAT. + filter: Vec, + /// The zlib-compressed IDAT payload. Grown once to fit and then held; see + /// [`compress_zlib_into`]. + idat: Vec, + /// The assembled PNG bitstream (signature + IHDR + IDAT + IEND) that becomes + /// one NUT frame packet. After [`encode`](Self::encode) the frame is + /// `output[..len]`. + output: Vec, + /// Persistent zlib compressor, [`reset`](CompressorOxide::reset) between + /// frames. Holding one across frames keeps its internal Huffman/dictionary + /// allocations off the per-frame path — `compress_to_vec_zlib` would build a + /// fresh `CompressorOxide` (two heap allocations) every call. + compressor: CompressorOxide, +} + +impl PngScratch { + fn new() -> Self { + Self { + filter: Vec::new(), + idat: Vec::new(), + output: Vec::new(), + compressor: CompressorOxide::new(create_comp_flags_from_zip_params( + PNG_COMPRESSION_LEVEL as i32, + ZLIB_WINDOW_BITS, + DEFLATE_STRATEGY_DEFAULT, + )), + } + } + + /// Encode one packed RGB24 frame as a standalone PNG (truecolour, 8-bit, + /// "Up" filter) into `self.output`, returning the byte length written (the + /// PNG is `self.output[..len]`). + /// + /// We hand-roll the container — signature + IHDR + IDAT + IEND — rather than + /// pull in an image crate; the IDAT payload is a miniz_oxide zlib stream and + /// the chunk checksums use a standard CRC-32 ([`png_crc32`]). + fn encode(&mut self, width: u32, height: u32, rgb: &[u8]) -> usize { + let row_bytes = width as usize * 3; + + // Filtered scanlines: each row is a one-byte filter tag followed by the + // row's filtered bytes — the input to the zlib stream that becomes IDAT. + // + // We use the "Up" filter (residual vs the pixel directly above) on every + // row. It is the cheapest predictor (a flat byte subtraction, no + // per-pixel bookkeeping) yet collapses the flat/slowly-varying regions + // typical of camera frames into runs of zeros, which deflate then + // crushes far faster and smaller than the raw bytes — measured ~3x + // faster and ~4x smaller than filter "None" on the near-uniform + // integration frames. + self.filter.clear(); + self.filter.reserve((row_bytes + 1) * height as usize); + for row in 0..height as usize { + let start = row * row_bytes; + let current = &rgb[start..start + row_bytes]; + self.filter.push(PNG_FILTER_UP); + if row == 0 { + // PNG defines the scanline above row 0 as all-zero, so Up leaves + // the first row's bytes unchanged. + self.filter.extend_from_slice(current); + } else { + let above = &rgb[start - row_bytes..start]; + self.filter.extend( + current + .iter() + .zip(above) + .map(|(value, prior)| value.wrapping_sub(*prior)), + ); + } + } + + let idat_len = compress_zlib_into(&mut self.compressor, &self.filter, &mut self.idat); + let idat = &self.idat[..idat_len]; + + let mut ihdr = [0u8; 13]; + ihdr[0..4].copy_from_slice(&width.to_be_bytes()); + ihdr[4..8].copy_from_slice(&height.to_be_bytes()); + ihdr[8] = 8; // bit depth + ihdr[9] = 2; // colour type: truecolour (RGB) + ihdr[10] = 0; // compression method: zlib/deflate + ihdr[11] = 0; // filter method: adaptive (per-row tag; all Up here) + ihdr[12] = 0; // interlace: none + + self.output.clear(); + self.output + .reserve(PNG_SIGNATURE.len() + 12 + 13 + 12 + idat.len() + 12); + self.output.extend_from_slice(&PNG_SIGNATURE); + write_png_chunk(&mut self.output, b"IHDR", &ihdr); + write_png_chunk(&mut self.output, b"IDAT", idat); + write_png_chunk(&mut self.output, b"IEND", &[]); + self.output.len() + } +} + +/// Compress `input` as a zlib stream into `output`, reusing both `compressor` +/// and `output`'s allocation across calls; returns the compressed length (the +/// IDAT payload is `output[..len]`). +/// +/// This is the buffer-reusing equivalent of miniz_oxide's `compress_to_vec_zlib` +/// (which allocates a fresh output `Vec` *and* a fresh `CompressorOxide` every +/// call). Every frame in a trace is the same size, so `output` grows to fit +/// within the first frame or two and then never resizes again. +fn compress_zlib_into( + compressor: &mut CompressorOxide, + input: &[u8], + output: &mut Vec, +) -> usize { + compressor.reset(); + + // `compress` writes into the slice's *length*, not its spare capacity, so + // the buffer must be sized (not merely reserved). Seed a small floor on the + // first frame; later frames keep whatever length earlier ones grew it to. + const MIN_OUTPUT: usize = 1024; + if output.len() < MIN_OUTPUT { + output.resize(MIN_OUTPUT, 0); + } + + let mut remaining = input; + let mut written = 0; + loop { + let (status, consumed, produced) = compress( + compressor, + remaining, + &mut output[written..], + TDEFLFlush::Finish, + ); + written += produced; + match status { + TDEFLStatus::Done => return written, + // Ran out of output room before finishing: advance past the consumed + // input (the compressor itself holds no input cursor), grow, repeat. + TDEFLStatus::Okay => { + remaining = &remaining[consumed..]; + let headroom = output.len() - written; + if headroom < 64 { + output.resize((output.len() * 2).max(written + 64), 0); + } + } + // `compress` only ever returns Done/Okay for a valid compressor and + // a real output buffer (BadParam/PutBufFailed need a null compressor + // or the callback-func variant, neither of which we use). + other => unreachable!("zlib compress returned {other:?}"), + } + } +} + +/// Append one PNG chunk — length (u32 BE) + type + data + CRC-32 (u32 BE) over +/// type+data — per the PNG spec. +fn write_png_chunk(out: &mut Vec, kind: &[u8; 4], data: &[u8]) { + out.extend_from_slice(&(data.len() as u32).to_be_bytes()); + let checksum_start = out.len(); + out.extend_from_slice(kind); + out.extend_from_slice(data); + out.extend_from_slice(&png_crc32(&out[checksum_start..]).to_be_bytes()); +} + +/// Standard CRC-32 (ISO-HDLC: reflected, polynomial 0xEDB88320, init/final XOR +/// 0xFFFFFFFF) — the variant PNG chunk checksums use. This is a *different* +/// variant from [`crc32_nut`] (NUT uses CRC-32/MPEG-2), so the two are kept +/// separate rather than parameterised. +fn png_crc32(bytes: &[u8]) -> u32 { + static TABLE: OnceLock<[u32; 256]> = OnceLock::new(); + let table = TABLE.get_or_init(|| { + let mut table = [0u32; 256]; + for (index, slot) in table.iter_mut().enumerate() { + let mut crc = index as u32; + for _ in 0..8 { + crc = if crc & 1 != 0 { + 0xEDB8_8320 ^ (crc >> 1) + } else { + crc >> 1 + }; + } + *slot = crc; + } + table + }); + + let mut crc = 0xFFFF_FFFFu32; + for &byte in bytes { + crc = table[((crc ^ byte as u32) & 0xFF) as usize] ^ (crc >> 8); + } + crc ^ 0xFFFF_FFFF +} + #[cfg(test)] mod tests { use super::*; @@ -820,15 +1095,13 @@ mod tests { )); } - /// Locate `ffprobe`. Returns `None` (with a logged note) if it is not on - /// PATH so the test can skip cleanly rather than fail in sandboxes that - /// lack the FFmpeg suite. - fn locate_ffprobe() -> Option { - // `which ffprobe` is the most portable check across the Linux - // distributions we run CI on. Falling back to a literal "ffprobe" - // string lets the eventual `Command::new` produce a clear error if - // the binary disappears between this lookup and execution. - let output = Command::new("which").arg("ffprobe").output().ok()?; + /// Locate `binary` on PATH. Returns `None` (so the caller can skip cleanly) + /// when it is absent — sandboxes without the FFmpeg suite skip these tests + /// rather than failing. + fn locate_on_path(binary: &str) -> Option { + // `which` is the most portable check across the Linux distributions we + // run CI on. + let output = Command::new("which").arg(binary).output().ok()?; if !output.status.success() { return None; } @@ -843,7 +1116,7 @@ mod tests { #[test] fn ffprobe_recognises_stream_metadata() { - let ffprobe = match locate_ffprobe() { + let ffprobe = match locate_on_path("ffprobe") { Some(path) => path, None => { eprintln!( @@ -926,4 +1199,92 @@ mod tests { ); } } + + #[test] + fn png_chunk_round_trips_losslessly() { + let ffprobe = match locate_on_path("ffprobe") { + Some(path) => path, + None => { + eprintln!("ffprobe not on PATH — skipping PNG round-trip test."); + return; + } + }; + let ffmpeg = match locate_on_path("ffmpeg") { + Some(path) => path, + None => { + eprintln!("ffmpeg not on PATH — skipping PNG round-trip test."); + return; + } + }; + + let (width, height) = (32u32, 24u32); + let tempdir = TempDir::new().unwrap(); + let path = tempdir.path().join("frames.nut"); + let config = NutVideoConfig { + width, + height, + time_base_num: 1, + time_base_den: 1_000_000, + }; + let mut writer = NutWriter::create(&path, config).unwrap(); + + // Distinct, high-entropy-ish pixels per frame so a regression that + // dropped/duplicated pixels (or silently lost the alpha/channel order) + // is caught by the bit-exact comparison below. + let frame_count = 5usize; + let frame_bytes = (width * height * 3) as usize; + let mut expected = Vec::with_capacity(frame_bytes * frame_count); + for index in 0..frame_count { + let mut buffer = vec![0u8; frame_bytes]; + for (pixel_index, channels) in buffer.chunks_mut(3).enumerate() { + channels[0] = ((pixel_index * 7 + index * 13) & 0xFF) as u8; + channels[1] = ((pixel_index * 5 + index * 29) & 0xFF) as u8; + channels[2] = ((pixel_index * 3 + index * 31) & 0xFF) as u8; + } + writer.write_frame(index as u64, &buffer).unwrap(); + expected.extend_from_slice(&buffer); + } + writer.finish().unwrap(); + + // The NUT must advertise the PNG codec so the daemon's codec-agnostic + // transcode picks the right decoder off the stream header. + let probe = Command::new(&ffprobe) + .args([ + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + "stream=codec_name", + "-of", + "default=nokey=1:noprint_wrappers=1", + ]) + .arg(&path) + .output() + .expect("spawn ffprobe"); + assert!(probe.status.success(), "ffprobe failed: {probe:?}"); + assert_eq!(String::from_utf8_lossy(&probe.stdout).trim(), "png"); + + // Decode the whole NUT back to packed RGB24 and assert it is bit-exact + // to what we wrote — PNG is lossless, so the round-trip must be exact. + let decoded = Command::new(&ffmpeg) + .args(["-v", "error", "-vsync", "passthrough"]) + .arg("-i") + .arg(&path) + .args(["-f", "rawvideo", "-pix_fmt", "rgb24", "-"]) + .output() + .expect("spawn ffmpeg"); + assert!( + decoded.status.success(), + "ffmpeg decode failed: stderr={}", + String::from_utf8_lossy(&decoded.stderr) + ); + assert_eq!( + decoded.stdout, + expected, + "PNG round-trip was not bit-exact (len got {}, want {})", + decoded.stdout.len(), + expected.len() + ); + } } diff --git a/rust/data_daemon_producer/src/writer.rs b/rust/data_daemon_producer/src/writer.rs index 9dae6b54..a2accce6 100644 --- a/rust/data_daemon_producer/src/writer.rs +++ b/rust/data_daemon_producer/src/writer.rs @@ -62,14 +62,33 @@ use crate::publisher::{now_ns, publisher_tx, ProducerError, PublishMsg}; /// first (see [`should_flush_chunk`]). Small frames never reach 256 MiB /// mid-recording, so the frame cap is what bounds the chunk's announcement /// envelope to one commands slice. +/// +/// One chunk's worth is also the in-RAM writer-queue ceiling +/// ([`WRITER_QUEUE_MAX_BYTES`]): changing this size moves both the chunk +/// granularity *and* the producer's backpressure headroom. const CHUNK_FLUSH_BYTES: u64 = 256 * 1024 * 1024; -/// Backpressure cap for the writer's frame queue. A transient disk stall is -/// absorbed by buffering frames up to this many bytes before `log_frame` -/// blocks; only a *sustained* overload (the writer genuinely can't keep up) -/// propagates backpressure to the caller. 64 MiB holds ~a second of a -/// multi-camera 256×256@30 workload while staying small next to a worker's RSS. -const WRITER_QUEUE_MAX_BYTES: usize = 64 * 1024 * 1024; +/// Backpressure cap for the writer's frame queue. `log_frame` copies a frame in +/// and returns; the background writer thread drains the queue to disk, so the +/// caller blocks only once the queue is *full*. The cap is therefore sized by +/// the writer-side stall it lets the caller ride out before `log_rgb` slows. +/// +/// Those stalls are the kernel's system-wide `balance_dirty_pages` throttle — +/// hundreds of ms once dirty pages cross `vm.dirty_ratio`, driven by *any* +/// process on the host (the daemon's ffmpeg transcodes, other tenants) and not +/// preventable by the producer. So size by drain time: at the heaviest workload +/// the suite runs — 1080p RGB @ 60 fps ≈ 356 MiB/s per camera — one chunk's +/// 256 MiB buys ≈ 0.7 s, enough to absorb the ~0.6-0.8 s stalls seen in +/// practice. The old 64 MiB bought only ~0.17 s and was overrun. +/// +/// The queue holds *raw* RGB ([`FrameJob::data`]) — PNG-encoding happens after +/// dequeue — so PNG shrinks what hits the disk but not the queue-fill rate +/// during a stall; the raw sizing stands. One chunk is also the deliberate +/// ceiling: this anonymous RAM competes with the page cache, so a larger queue +/// would shrink the dirty headroom and worsen the very throttle it absorbs. +/// Sustained overload is bounded by the on-disk spool cap (`spool_max`, default +/// 2 GiB). +const WRITER_QUEUE_MAX_BYTES: usize = CHUNK_FLUSH_BYTES as usize; /// How often the writer rescans its spool inbox to refresh the on-disk backlog /// estimate and release frame-admission backpressure. Also bounds how long a @@ -646,15 +665,19 @@ fn current_thread_id() -> i64 { /// Whether the in-progress chunk should be sealed now, checked after each /// appended frame. A chunk is rolled at the **lower** of two bounds: /// -/// * [`CHUNK_FLUSH_BYTES`] — keeps the daemon's per-chunk encode cost amortised. +/// * [`CHUNK_FLUSH_BYTES`] — measured against *logical* (decoded-equivalent) +/// bytes, not the compressed on-disk size, so chunk granularity (and the +/// daemon's per-chunk transcode cost) stays stable however well the PNG +/// frames compress. Keying off the on-disk size would pack thousands of +/// tiny PNG frames into one chunk and balloon that transcode unit. /// * [`MAX_VIDEO_CHUNK_FRAMES`] — keeps the chunk's `VideoChunkReady` /// announcement within one `COMMANDS_MAX_PAYLOAD_BYTES` sample. Small frames /// never reach the byte threshold mid-recording, so without the frame cap a /// long recording accumulates one ever-growing chunk whose per-frame /// timestamp vectors eventually overflow the commands slice — the /// announcement then fails to publish and the recording's video is lost. -fn should_flush_chunk(chunk_bytes: u64, frame_count: u32) -> bool { - chunk_bytes >= CHUNK_FLUSH_BYTES || frame_count >= MAX_VIDEO_CHUNK_FRAMES +fn should_flush_chunk(logical_chunk_bytes: u64, frame_count: u32) -> bool { + logical_chunk_bytes >= CHUNK_FLUSH_BYTES || frame_count >= MAX_VIDEO_CHUNK_FRAMES } /// Append one frame to the `(source, sensor)` in-progress NUT chunk, opening @@ -836,7 +859,9 @@ fn append_frame_locked( } } - let bytes_after_write = { + // Roll on decoded-equivalent volume, not the on-disk byte count — see + // [`should_flush_chunk`] for why. + let logical_bytes_after_write = { let writer = state.nut_writer.as_mut().expect("opened immediately above"); if let Err(error) = writer.write_frame(pts, payload) { tracing::warn!( @@ -846,14 +871,14 @@ fn append_frame_locked( ); return announcements; } - writer.bytes_written() + writer.logical_bytes() }; state.last_pts_us = Some(pts); state.frame_count = state.frame_count.saturating_add(1); state.frame_timestamps_ns.push(timestamp_ns); state.frame_timestamps_s.push(timestamp_s); - if should_flush_chunk(bytes_after_write, state.frame_count) { + if should_flush_chunk(logical_bytes_after_write, state.frame_count) { if let Some(envelope) = flush_chunk_locked(robot_id, robot_instance, data_type, sensor_name, state) { @@ -1058,6 +1083,113 @@ mod tests { assert_eq!(state.frame_count, 1, "the new chunk holds the 4x4 frame"); } + #[test] + fn flush_seals_chunk_with_populated_announcement_and_resets_state() { + // The normal seal path the daemon routes on (size/frame-cap roll or the + // stop barrier). The announcement must carry every frame's identity in + // order, and the state must reset so the next frame opens a fresh chunk + // rather than re-announcing these frames. + let dir = tempfile::tempdir().unwrap(); + let mut state = fresh_state(dir.path().to_path_buf(), 2, 2); + let frame = vec![0u8; 2 * 2 * 3]; + + for (timestamp_ns, timestamp_s) in [(1_000, 0.0), (2_000, 0.001), (3_000, 0.002)] { + let announcements = append_frame_locked( + &mut state, + "r", + 0, + "RGB", + "cam", + 2, + 2, + &frame, + timestamp_ns, + timestamp_s, + ); + assert!( + announcements.is_empty(), + "frames below both bounds accumulate without sealing" + ); + } + assert_eq!(state.frame_count, 3); + + let envelope = flush_chunk_locked("r", 0, "RGB", "cam", &mut state) + .expect("an open chunk seals into an announcement"); + match envelope { + Envelope::VideoChunkReady { + sensor_name, + width, + height, + frame_count, + byte_count, + frame_timestamps_ns, + frame_timestamps_s, + .. + } => { + assert_eq!(sensor_name.as_deref(), Some("cam")); + assert_eq!((width, height), (2, 2)); + assert_eq!(frame_count, 3); + assert!(byte_count > 0, "a sealed chunk has a non-zero NUT file"); + assert_eq!(frame_timestamps_ns, vec![1_000, 2_000, 3_000]); + assert_eq!(frame_timestamps_s, vec![0.0, 0.001, 0.002]); + } + other => panic!("expected VideoChunkReady, got {other:?}"), + } + + // The seal takes the writer and clears the counters / per-frame vectors, + // so the next frame opens a brand-new chunk. + assert!(state.nut_writer.is_none()); + assert_eq!(state.frame_count, 0); + assert!(state.frame_timestamps_ns.is_empty()); + assert!(state.frame_timestamps_s.is_empty()); + } + + #[test] + fn flush_without_an_open_chunk_announces_nothing() { + // No frames written since the last seal → nothing to announce. Keeps the + // stop barrier's empty-source case from emitting a bogus zero-frame + // chunk. + let dir = tempfile::tempdir().unwrap(); + let mut state = fresh_state(dir.path().to_path_buf(), 2, 2); + assert!(flush_chunk_locked("r", 0, "RGB", "cam", &mut state).is_none()); + } + + #[test] + fn non_monotonic_timestamps_do_not_drop_frames() { + // Capture timestamps can repeat or go backwards (clock coalescing, + // batched logging). Every frame must still be recorded: the writer bumps + // each PTS to stay strictly increasing rather than dropping the + // colliding frames. + let dir = tempfile::tempdir().unwrap(); + let mut state = fresh_state(dir.path().to_path_buf(), 2, 2); + let frame = vec![0u8; 2 * 2 * 3]; + + // Three frames at the same instant, then one that goes backwards. + for timestamp_ns in [5_000, 5_000, 5_000, 4_000] { + let _ = append_frame_locked( + &mut state, + "r", + 0, + "RGB", + "cam", + 2, + 2, + &frame, + timestamp_ns, + 0.0, + ); + } + assert_eq!( + state.frame_count, 4, + "every frame is written despite non-monotonic capture timestamps" + ); + + match flush_chunk_locked("r", 0, "RGB", "cam", &mut state).expect("seal") { + Envelope::VideoChunkReady { frame_count, .. } => assert_eq!(frame_count, 4), + other => panic!("expected VideoChunkReady, got {other:?}"), + } + } + /// A minimal video-frame message carrying `bytes` of payload. fn frame_msg(bytes: usize) -> WriterMsg { WriterMsg::Frame(FrameJob { diff --git a/rust/data_daemon_shared/src/config/env.rs b/rust/data_daemon_shared/src/config/env.rs index 9c2da19a..4788d3e5 100644 --- a/rust/data_daemon_shared/src/config/env.rs +++ b/rust/data_daemon_shared/src/config/env.rs @@ -218,9 +218,9 @@ impl RuntimeEnv { db_path: db_path(), recordings_root: recordings_root_path(), profile: active_profile_name(), - // Mirrors `helpers.py::is_debug_mode`. debug: env_var("NDD_DEBUG") - .map(|value| value.to_lowercase() == "true") + .as_deref() + .map(is_truthy) .unwrap_or(false), api_url: env_var("NEURACORE_API_URL").unwrap_or_else(|| DEFAULT_API_URL.to_string()), } @@ -251,4 +251,131 @@ mod tests { assert!(parse_bytes("12tb").is_err()); assert!(parse_bytes("1 g").is_err()); } + + #[test] + fn is_truthy_recognises_only_yes_values() { + for truthy in ["1", "true", "TRUE", "yes", "Yes", "y", "Y"] { + assert!(is_truthy(truthy), "{truthy:?} should be truthy"); + } + for falsy in ["0", "false", "no", "n", "", "enabled", "2"] { + assert!(!is_truthy(falsy), "{falsy:?} should be falsy"); + } + } + + #[test] + fn deserialize_optional_bytes_accepts_int_unit_or_null() { + #[derive(Deserialize)] + struct Holder { + #[serde(default, deserialize_with = "deserialize_optional_bytes")] + limit: Option, + } + let limit_of = |json: &str| serde_json::from_str::(json).map(|holder| holder.limit); + + assert_eq!(limit_of(r#"{"limit": 1024}"#).unwrap(), Some(1024)); + assert_eq!( + limit_of(r#"{"limit": "1G"}"#).unwrap(), + Some(1024 * 1024 * 1024) + ); + assert_eq!(limit_of(r#"{"limit": null}"#).unwrap(), None); + assert_eq!(limit_of(r#"{}"#).unwrap(), None); + assert!( + limit_of(r#"{"limit": "not-a-size"}"#).is_err(), + "a malformed unit string surfaces as a deserialization error" + ); + } + + /// Every variable the env layer reads. Listed so the matrix test can + /// save/restore them all (and so a future field is added here too). + const MANAGED_VARS: &[&str] = &[ + "NCD_STORAGE_LIMIT", + "NCD_BANDWIDTH_LIMIT", + "NCD_SPOOL_LIMIT", + "NCD_PATH_TO_STORE_RECORD", + "NCD_NUM_THREADS", + "NCD_KEEP_WAKELOCK_WHILE_UPLOAD", + "NCD_OFFLINE", + "NCD_API_KEY", + "NCD_CURRENT_ORG_ID", + "NEURACORE_DAEMON_PROFILE", + ]; + + #[test] + fn env_config_overrides_reads_ncd_vars() { + // Mutates process-wide env, so — per this crate's convention (see + // `paths.rs::resolution_precedence`) — a single test drives the whole + // matrix and saves/restores every variable it touches. + let saved: Vec<(&str, Option)> = MANAGED_VARS + .iter() + .map(|name| (*name, std::env::var_os(name))) + .collect(); + + // 1) Every override present and well-formed → every field populated. + std::env::set_var("NCD_STORAGE_LIMIT", "2G"); + std::env::set_var("NCD_BANDWIDTH_LIMIT", "10mb"); + std::env::set_var("NCD_SPOOL_LIMIT", "512"); + std::env::set_var("NCD_PATH_TO_STORE_RECORD", "/srv/recordings"); + std::env::set_var("NCD_NUM_THREADS", "4"); + std::env::set_var("NCD_KEEP_WAKELOCK_WHILE_UPLOAD", "yes"); + std::env::set_var("NCD_OFFLINE", "1"); + std::env::set_var("NCD_API_KEY", "secret-key"); + std::env::set_var("NCD_CURRENT_ORG_ID", "org-42"); + std::env::set_var("NEURACORE_DAEMON_PROFILE", "lab"); + + let config = env_config_overrides(); + assert_eq!(config.storage_limit, Some(2 * 1024 * 1024 * 1024)); + assert_eq!(config.bandwidth_limit, Some(10 * 1024 * 1024)); + assert_eq!(config.spool_limit, Some(512)); + assert_eq!( + config.path_to_store_record.as_deref(), + Some("/srv/recordings") + ); + assert_eq!(config.num_threads, Some(4)); + assert_eq!(config.keep_wakelock_while_upload, Some(true)); + assert_eq!(config.offline, Some(true)); + assert_eq!(config.api_key.as_deref(), Some("secret-key")); + assert_eq!(config.current_org_id.as_deref(), Some("org-42")); + assert_eq!(active_profile_name().as_deref(), Some("lab")); + + // 2) An empty string is treated as unset; an unparseable numeric is + // skipped (not fatal) and leaves its field unset. + std::env::set_var("NCD_API_KEY", ""); + std::env::set_var("NCD_OFFLINE", ""); + std::env::set_var("NCD_STORAGE_LIMIT", "not-a-size"); + std::env::set_var("NCD_NUM_THREADS", "twelve"); + let config = env_config_overrides(); + assert_eq!(config.api_key, None, "empty string is treated as unset"); + assert_eq!(config.offline, None, "empty string is treated as unset"); + assert_eq!( + config.storage_limit, None, + "an unparseable size is skipped, not fatal" + ); + assert_eq!( + config.num_threads, None, + "an unparseable thread count is skipped" + ); + + // 3) Nothing set → an all-default (empty) override layer. + for name in MANAGED_VARS { + std::env::remove_var(name); + } + let config = env_config_overrides(); + assert_eq!(config.storage_limit, None); + assert_eq!(config.bandwidth_limit, None); + assert_eq!(config.spool_limit, None); + assert_eq!(config.path_to_store_record, None); + assert_eq!(config.num_threads, None); + assert_eq!(config.keep_wakelock_while_upload, None); + assert_eq!(config.offline, None); + assert_eq!(config.api_key, None); + assert_eq!(config.current_org_id, None); + assert_eq!(active_profile_name(), None); + + // Restore the pre-test environment for other tests in this binary. + for (name, value) in saved { + match value { + Some(value) => std::env::set_var(name, value), + None => std::env::remove_var(name), + } + } + } }