Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/rust_data_daemon_development.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ flowchart LR
DISP --> ACT["per-trace actors"]
ACT -->|fire-and-forget| TW["trace_writer<br/>batched DB write-behind"]
ACT -->|fire-and-forget| JW["json_writer (IO thread)"]
ACT -->|spawn_blocking| FF["ffmpeg chunk encode"]
ACT -->|async subprocess| FF["ffmpeg chunk encode"]
TW --> DB[("SQLite WAL")]
LIS -->|recording-id queries| DB
subgraph CLOUD["cloud coordinators"]
Expand Down Expand Up @@ -99,7 +99,7 @@ CI uses `stable` (via `dtolnay/rust-toolchain@stable`), so any recent stable too

### System dependencies

- **ffmpeg + ffprobe** — required by the video-encoder subprocess and the `encoding::video_encoder` / `encoding::nut_writer` test suites (tests that need ffmpeg self-skip if it's missing, but the daemon itself depends on it at runtime):
- **ffmpeg + ffprobe** — required by the video-encoder subprocess, the daemon's `encoding::video_encoder` test suite, and the producer's `data_daemon_producer::nut_writer` test suite (tests that need ffmpeg self-skip if it's missing, but the daemon itself depends on it at runtime):

```bash
sudo apt-get update && sudo apt-get install -y ffmpeg
Expand Down Expand Up @@ -144,7 +144,7 @@ cargo test -p data_daemon_shared

# A specific module or test name (partial match)
cargo test -p data-daemon pipeline::dispatcher
cargo test -p data-daemon encoding::metadata::fixture_matches_python_video_trace_output
cargo test -p data-daemon encoding::metadata::fixture_matches_expected_video_trace_output
```

Tests that shell out to `ffmpeg` / `ffprobe` self-skip on hosts without those binaries — install them (see above) to exercise the full encoding suite.
Expand Down
13 changes: 13 additions & 0 deletions neuracore-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ embs
Emika
ENOENT
enoexec
ENOTDIR
EPERM
EPIPE
eprintln
erfinv
errno
ESRCH
Expand Down Expand Up @@ -189,12 +191,16 @@ gptj
GPTJ
groot
Groot
hdlc
hookwrapper
hparams
hstack
huggingface
hyperparameters
iceoryx
idat
iend
ihdr
iiwa
imageio
imagenet
Expand Down Expand Up @@ -262,14 +268,17 @@ metadatas
metafunc
metas
mimsave
miniz
Mish
mjcf
MJFC
mline
mocap
moov
movflags
mpng
mpsa
mpsc
Mujoco
mujocoinclude
multihead
Expand Down Expand Up @@ -394,6 +403,8 @@ rtype
rustls
rustup
Safetensors
scanline
scanlines
schematypens
SCTP
sdecode
Expand Down Expand Up @@ -427,6 +438,7 @@ synchronizable
syncpoint
syncpoints
targetbody
tdefl
TDVB
teleop
temb
Expand All @@ -450,6 +462,7 @@ torq
tqdm
traj
triu
truecolour
trunc
tryfirst
TTYNTK
Expand Down
16 changes: 16 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions rust/data_daemon/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,71 @@ mod tests {
assert_eq!(calls.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn exhausts_retry_budget_then_surfaces_status() {
    // Every attempt is answered with a retryable 503. The client must give up
    // once its retry budget (`max_retries` attempts in total) is spent and
    // hand the status back to the caller instead of looping; `expect(3)` on
    // the mock pins the number of requests that may be made.
    let mock_server = MockServer::start().await;
    let always_unavailable = Mock::given(method("POST"))
        .and(path("/org/org-1/recording/traces/batch-register"))
        .respond_with(ResponseTemplate::new(503))
        .expect(3);
    always_unavailable.mount(&mock_server).await;

    let api = client(&mock_server);
    let outcome = api.batch_register("org-1", &[]).await;

    // The call has to fail, and specifically with the HTTP status variant.
    match outcome.unwrap_err() {
        ApiClientError::Status { status, .. } => {
            assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}

#[tokio::test]
async fn reloads_auth_at_most_once_on_repeated_401() {
    // Test-only auth provider: always hands out the same bearer token and
    // counts how many times `reload` is invoked.
    struct CountingProvider {
        reload_count: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl AuthProvider for CountingProvider {
        async fn bearer_token(&self) -> Result<String, AuthError> {
            Ok(String::from("token"))
        }

        async fn reload(&self) -> Result<(), AuthError> {
            self.reload_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    // The endpoint answers 401 to every request, so a client that reloaded
    // and retried on each 401 would never return.
    let mock_server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/org/org-1/recording/traces/batch-register"))
        .respond_with(ResponseTemplate::new(401))
        .mount(&mock_server)
        .await;

    // Keep one handle to the counter here; the provider owns the other.
    let reload_count = Arc::new(AtomicUsize::new(0));
    let provider = Arc::new(CountingProvider {
        reload_count: Arc::clone(&reload_count),
    });
    let api = ApiClient::new(options(mock_server.uri()), provider).unwrap();

    let error = api.batch_register("org-1", &[]).await.unwrap_err();

    assert!(
        matches!(error, ApiClientError::Status { status, .. } if status == StatusCode::UNAUTHORIZED),
        "a repeated 401 surfaces as an error, not an infinite reload loop"
    );
    assert_eq!(
        reload_count.load(Ordering::SeqCst),
        1,
        "auth is reloaded exactly once, not on every 401"
    );
}

#[tokio::test]
async fn non_retryable_status_surfaces_error() {
let server = MockServer::start().await;
Expand Down
171 changes: 171 additions & 0 deletions rust/data_daemon/src/cloud/coordinators/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,175 @@ mod tests {
ProgressReportStatus::Reported
));
}

#[tokio::test]
async fn sweep_skips_when_org_id_unset() {
    // The sweep runs with `org_rx(None)`, i.e. no current org (not logged in /
    // no org selected). Both backend endpoints are mounted with `expect(0)`,
    // so any request at all fails the test; the recording must stay pending.
    let mock_server = MockServer::start().await;
    let forbidden_routes = [
        ("PUT", "/org/org-1/recording/rec-1/expected-trace-count"),
        ("POST", "/org/org-1/recording/rec-1/traces-metadata"),
    ];
    for (verb, route) in forbidden_routes {
        Mock::given(method(verb))
            .and(path(route))
            .respond_with(ResponseTemplate::new(200))
            .expect(0)
            .mount(&mock_server)
            .await;
    }

    // One stopped recording holding a single fully written trace.
    let (store, _dir) = open_store().await;
    let rec_idx = seed_recording(&store, "rec-1").await;
    store
        .create_trace(rec_idx, "t-1", Some("JOINT_POSITIONS"), None)
        .await
        .unwrap();
    let written = TraceUpdate {
        write_status: Some(TraceWriteStatus::Written),
        total_bytes: Some(5),
        ..TraceUpdate::default()
    };
    store.update_trace("t-1", written).await.unwrap();
    store.mark_recording_stopped(rec_idx, 0).await.unwrap();

    let api = client(&mock_server);
    let no_org = org_rx(None);
    sweep_once(&Arc::new(store.clone()), &api, &no_org).await;

    let fetched = store.get_recording(rec_idx).await.unwrap();
    let recording = fetched.unwrap();
    assert_eq!(
        recording.expected_trace_count_reported, 0,
        "no org → nothing reported"
    );
    assert!(matches!(
        recording.progress_reported,
        ProgressReportStatus::Pending
    ));
}

#[tokio::test]
async fn progress_post_failure_rolls_back_to_pending() {
    // Split outcome: the expected-trace-count PUT is accepted (and must be hit
    // exactly once), while the traces-metadata POST always answers 500. After
    // the sweep the progress status has to be back at Pending, not left in the
    // transient Reporting state, so a later tick can retry.
    let mock_server = MockServer::start().await;
    let count_put_ok = Mock::given(method("PUT"))
        .and(path("/org/org-1/recording/rec-1/expected-trace-count"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1);
    count_put_ok.mount(&mock_server).await;
    let metadata_post_fails = Mock::given(method("POST"))
        .and(path("/org/org-1/recording/rec-1/traces-metadata"))
        .respond_with(ResponseTemplate::new(500));
    metadata_post_fails.mount(&mock_server).await;

    // One stopped recording with two fully written traces.
    let (store, _dir) = open_store().await;
    let rec_idx = seed_recording(&store, "rec-1").await;
    for (id, size) in [("t-1", 10), ("t-2", 20)] {
        store
            .create_trace(rec_idx, id, Some("JOINT_POSITIONS"), None)
            .await
            .unwrap();
        let written = TraceUpdate {
            write_status: Some(TraceWriteStatus::Written),
            total_bytes: Some(size),
            ..TraceUpdate::default()
        };
        store.update_trace(id, written).await.unwrap();
    }
    store.mark_recording_stopped(rec_idx, 0).await.unwrap();

    let api = client(&mock_server);
    let current_org = org_rx(Some("org-1"));
    sweep_once(&Arc::new(store.clone()), &api, &current_org).await;

    let fetched = store.get_recording(rec_idx).await.unwrap();
    let recording = fetched.unwrap();
    assert_eq!(
        recording.expected_trace_count_reported, 2,
        "the expected-count PUT succeeded"
    );
    assert!(
        matches!(recording.progress_reported, ProgressReportStatus::Pending),
        "a failed progress POST rolls Reporting back to Pending for retry"
    );
}

#[tokio::test]
async fn expected_count_put_failure_leaves_it_unreported() {
    // Here the expected-trace-count PUT always answers 500 while the
    // traces-metadata POST would succeed. The locally stored count must still
    // be present after the sweep, but it must not be flagged as reported, so
    // the next tick sends it again.
    let mock_server = MockServer::start().await;
    let count_put_fails = Mock::given(method("PUT"))
        .and(path("/org/org-1/recording/rec-1/expected-trace-count"))
        .respond_with(ResponseTemplate::new(500));
    count_put_fails.mount(&mock_server).await;
    let metadata_post_ok = Mock::given(method("POST"))
        .and(path("/org/org-1/recording/rec-1/traces-metadata"))
        .respond_with(ResponseTemplate::new(200));
    metadata_post_ok.mount(&mock_server).await;

    // One stopped recording holding a single fully written trace.
    let (store, _dir) = open_store().await;
    let rec_idx = seed_recording(&store, "rec-1").await;
    store
        .create_trace(rec_idx, "t-1", Some("JOINT_POSITIONS"), None)
        .await
        .unwrap();
    let written = TraceUpdate {
        write_status: Some(TraceWriteStatus::Written),
        total_bytes: Some(7),
        ..TraceUpdate::default()
    };
    store.update_trace("t-1", written).await.unwrap();
    store.mark_recording_stopped(rec_idx, 0).await.unwrap();

    let api = client(&mock_server);
    let current_org = org_rx(Some("org-1"));
    sweep_once(&Arc::new(store.clone()), &api, &current_org).await;

    let fetched = store.get_recording(rec_idx).await.unwrap();
    let recording = fetched.unwrap();
    assert_eq!(
        recording.expected_trace_count,
        Some(1),
        "the count is persisted locally first"
    );
    assert_eq!(
        recording.expected_trace_count_reported, 0,
        "a failed PUT does not mark the count reported"
    );
}
}
Loading
Loading