diff --git a/neuracore-dictionary.txt b/neuracore-dictionary.txt index bd433f9e3..5d39d87ae 100644 --- a/neuracore-dictionary.txt +++ b/neuracore-dictionary.txt @@ -242,6 +242,7 @@ openarm openarm_description openpi OPENPI +overfit optim osmesa outc diff --git a/tests/integration/ml/shared/constants.py b/tests/integration/ml/shared/constants.py deleted file mode 100644 index 12f79e33b..000000000 --- a/tests/integration/ml/shared/constants.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Shared robot / embodiment / hardware constants for ML integration tests. - -Single source of truth for the bimanual-VX300s embodiment and the GPU/frequency -settings reused across the ml integration suite (training flow, inference, -resume, back-to-back, sync failure). Kept here so the tests share one definition -rather than each re-declaring it. -""" - -import os -import sys - -from neuracore_types import DataType, EmbodimentDescription - -_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) -_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "..", "examples") -if _EXAMPLES_DIR not in sys.path: - sys.path.append(_EXAMPLES_DIR) - -# ruff: noqa: E402 -from common.base_env import BimanualViperXTask - -NC_CAM_NAME = "rgb_angle" -MJ_CAM_NAME = "angle" -JOINT_NAMES = ( - BimanualViperXTask.LEFT_ARM_JOINT_NAMES + BimanualViperXTask.RIGHT_ARM_JOINT_NAMES -) -GRIPPER_NAMES = ["left_gripper", "right_gripper"] -POSE_SENSOR_NAME = "tcp" -LANGUAGE_LABEL = "instruction" - - -def _indexed_names(names: list[str] | tuple[str, ...]) -> dict[int, str]: - return {index: name for index, name in enumerate(names)} - - -# Training/Inference robot (VX300s) embodiment descriptions -INPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { - DataType.RGB_IMAGES: {0: NC_CAM_NAME}, - DataType.JOINT_POSITIONS: _indexed_names(names=JOINT_NAMES), - DataType.LANGUAGE: {0: LANGUAGE_LABEL}, - DataType.PARALLEL_GRIPPER_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), -} -OUTPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { - DataType.JOINT_TARGET_POSITIONS: _indexed_names(names=JOINT_NAMES), - DataType.PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), -} - -INPUT_DATA_TYPES = list(INPUT_EMBODIMENT_DESCRIPTION.keys()) -OUTPUT_DATA_TYPES = list(OUTPUT_EMBODIMENT_DESCRIPTION.keys()) - -ROBOT_NAME = "integration_test_robot" -GPU_TYPE = "NVIDIA_TESLA_V100" -NUM_GPUS = 1 -FREQUENCY = 20 diff --git a/tests/integration/ml/shared/dataset.py b/tests/integration/ml/shared/dataset.py new file mode 100644 index 000000000..64765da63 --- /dev/null +++ b/tests/integration/ml/shared/dataset.py @@ -0,0 +1,408 @@ +"""Dataset collection, mutation, and verification helpers for ML integration tests.""" + +import hashlib +import json +import logging +import os +import sys +import time + +import numpy as np +from neuracore_types import Dataset as DatasetModel +from neuracore_types import DataType + +import neuracore as nc +from neuracore.core.auth import get_auth +from neuracore.core.const import API_URL +from neuracore.core.data.dataset import SYNC_PROGRESS_POLL_INTERVAL_S, Dataset +from neuracore.core.data.recording import Recording +from neuracore.core.data.synced_dataset import SynchronizedDataset +from neuracore.core.utils.embodiment_description_utils import ( + merge_cross_embodiment_description, +) +from neuracore.core.utils.http_session import thread_local_session + +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "..", "examples") +if _EXAMPLES_DIR not in sys.path: + sys.path.append(_EXAMPLES_DIR) + +# ruff: noqa: E402 +from common.rollout_utils import rollout_policy +from common.transfer_cube import BIMANUAL_VIPERX_URDF_PATH + +from tests.integration.platform.data_daemon.shared.assertions import ( + assert_exactly_one_daemon_pid, +) +from tests.integration.platform.data_daemon.shared.db_helpers import ( + wait_for_dataset_ready, + wait_for_recordings_finalized, +) +from tests.integration.platform.data_daemon.shared.runners import online_daemon_running + +logger = logging.getLogger(__name__) + +RECORDING_POLL_INTERVAL_S = 5 + + +def wait_for_dataset_recording_count( + dataset_name: str, + expected_recordings: int, + timeout_seconds: int = 120, + poll_seconds: int = 5, +) -> Dataset: + deadline = time.time() + timeout_seconds + last_count = None + last_error = None + + while time.time() < deadline: + try: + dataset = nc.get_dataset(name=dataset_name) + last_count = len(dataset) + if last_count == expected_recordings: + return dataset + last_error = None + except Exception as e: + last_error = e + + time.sleep(poll_seconds) + + if last_error is not None: + raise AssertionError( + f"Dataset {dataset_name!r} did not become queryable within " + f"{timeout_seconds} seconds; last error: {last_error}" + ) + raise AssertionError( + f"Dataset {dataset_name!r} had {last_count} recordings after " + f"{timeout_seconds} seconds; expected {expected_recordings}" + ) + + +def collect_demo_data( + robot_name: str, + dataset_name: str, + *, + joint_names: tuple[str, ...] | list[str], + gripper_names: list[str], + language_label: str, + nc_cam_name: str, + pose_sensor_name: str, + num_episodes: int = 3, + instance_id: int = 0, + episode_length_multiplier: int = 1, + num_cameras: int = 1, + frequency: float = 20, + timestamp_jitter_frac: float = 0.05, +) -> Dataset: + """Collect scripted demonstrations and log them to neuracore. + + Use different instances for different tests since they are run in parallel. + Increase episode_length_multiplier to inflate episode length by repeating + the rollout trajectory steps. + Increase num_cameras to log multiple RGB streams per timestep. + """ + assert ( + episode_length_multiplier >= 1 + ), f"episode_length_multiplier must be >= 1, got {episode_length_multiplier}" + assert num_cameras >= 1, f"num_cameras must be >= 1, got {num_cameras}" + + with online_daemon_running(): + assert_exactly_one_daemon_pid() + nc.connect_robot( + robot_name=robot_name, + instance=instance_id, + urdf_path=str(BIMANUAL_VIPERX_URDF_PATH), + overwrite=False, + ) + dataset = nc.create_dataset(name=dataset_name) + for ep_idx in range(num_episodes): + logger.info(f"Collecting episode {ep_idx + 1}/{num_episodes}") + action_traj = rollout_policy() + expanded_action_traj = [ + action_dict + for action_dict in action_traj + for _ in range(episode_length_multiplier) + ] + nc.start_recording(robot_name=robot_name, instance=instance_id) + t = time.time() + timestamp_rng = np.random.default_rng(ep_idx) + for frame_idx, action_dict in enumerate(expanded_action_traj): + dt = 1.0 / frequency + t += dt * float( + timestamp_rng.uniform( + 1.0 - timestamp_jitter_frac, 1.0 + timestamp_jitter_frac + ) + ) + joint_positions = { + k: v for k, v in action_dict.items() if "gripper" not in k + } + joint_torques = { + name: float(0.01 * ((index + frame_idx) % 5)) + for index, name in enumerate(joint_names) + } + joint_velocities = { + name: float(0.05 * ((index + frame_idx) % 7)) + for index, name in enumerate(joint_names) + } + gripper_open_amounts = { + name: float(0.25 + 0.5 * ((frame_idx % 2) == 0)) + for name in gripper_names + } + pose = np.array([0.1 + frame_idx * 0.001, 0.2, 0.3, 0.0, 0.0, 0.0, 1.0]) + img = np.zeros((84, 84, 3), dtype=np.uint8) + img.fill(50 + frame_idx % 200) + + nc.log_joint_positions( + positions=joint_positions, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_joint_target_positions( + target_positions=joint_positions, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_joint_velocities( + velocities=joint_velocities, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_joint_torques( + torques=joint_torques, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_parallel_gripper_open_amounts( + values=gripper_open_amounts, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_parallel_gripper_target_open_amounts( + values=gripper_open_amounts, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_pose( + name=pose_sensor_name, + pose=pose, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_language( + name=language_label, + language="pick and place", + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_rgb( + name=nc_cam_name, + rgb=img, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.stop_recording(wait=True, robot_name=robot_name, instance=instance_id) + wait_for_dataset_ready( + dataset_name, + expected_recording_count=ep_idx + 1, + timeout_s=500, + poll_interval_s=5, + ) + logger.info( + f"Episode {ep_idx + 1} recorded ({len(expanded_action_traj)} frames)" + ) + return dataset + + +def delete_recording_from_dataset(dataset: Dataset, recording: Recording) -> None: + """Remove a recording from a dataset via the platform API.""" + session = thread_local_session() + response = session.delete( + f"{API_URL}/org/{dataset.org_id}/datasets/{dataset.id}/recording/{recording.id}", + headers=get_auth().get_headers(), + ) + response.raise_for_status() + + +def fetch_dataset_model(dataset: Dataset) -> DatasetModel: + """Fetch full dataset metadata including num_demonstrations and data types.""" + session = thread_local_session() + response = session.get( + f"{API_URL}/org/{dataset.org_id}/datasets/{dataset.id}", + headers=get_auth().get_headers(), + ) + response.raise_for_status() + return DatasetModel.model_validate(response.json()) + + +def assert_active_recordings( + dataset: Dataset, + *, + expected_count: int, + expected_types: set[DataType], + tracked_ids: set[str] | None = None, +) -> set[str]: + """Assert recording count, datatype presence, and optional ID set equality.""" + assert ( + len(dataset) == expected_count + ), f"Expected {expected_count} recordings, got {len(dataset)}" + active_ids: set[str] = set() + for recording in dataset: + active_ids.add(str(recording.id)) + missing = expected_types - recording.data_types + assert not missing, ( + f"Recording {recording.id} missing datatypes " + f"{sorted(dt.value for dt in missing)}; " + f"has {sorted(dt.value for dt in recording.data_types)}" + ) + if tracked_ids is not None: + assert active_ids == tracked_ids, ( + f"Active recording IDs mismatch.\n" + f"Expected: {sorted(tracked_ids)}\n" + f"Got: {sorted(active_ids)}" + ) + return active_ids + + +def assert_dataset_metadata( + dataset: Dataset, + *, + expected_count: int, + expected_common_types: set[DataType], +) -> DatasetModel: + """Assert dataset metadata reports the expected recording count and types.""" + model = fetch_dataset_model(dataset) + assert model.num_demonstrations == expected_count, ( + f"Metadata num_demonstrations={model.num_demonstrations}, " + f"expected {expected_count}" + ) + common = set(model.common_data_types.keys()) + missing = expected_common_types - common + assert not missing, ( + f"common_data_types missing {sorted(dt.value for dt in missing)}; " + f"has {sorted(dt.value for dt in common)}" + ) + return model + + +def statistics_fingerprint(stats) -> str: + """Stable hash of synchronized dataset statistics for refresh checks.""" + payload = stats.model_dump(mode="json") + encoded = json.dumps(payload, sort_keys=True, default=str) + return hashlib.sha256(encoded.encode()).hexdigest() + + +def assert_synced_statistics( + dataset: Dataset, + input_desc: dict, + output_desc: dict, + *, + expected_count: int, + frequency: int = 20, + finalize_timeout_seconds: float = 300, + sync_timeout_seconds: float = 600, + log_prefix: str | None = None, +) -> tuple[SynchronizedDataset, str]: + """Synchronize dataset, calculate statistics, and assert consistency.""" + prefix = log_prefix or f"[{dataset.name}]" + recording_ids = {str(recording.id) for recording in dataset} + logger.info( + f"{prefix} Waiting for {len(recording_ids)} recordings to finalize " + f"(timeout={finalize_timeout_seconds}s)" + ) + wait_for_recordings_finalized( + dataset.name, + recording_ids=recording_ids, + timeout_s=finalize_timeout_seconds, + poll_interval_s=RECORDING_POLL_INTERVAL_S, + ) + logger.info(f"{prefix} All recordings finalized") + + cross_embodiment_union = merge_cross_embodiment_description(input_desc, output_desc) + logger.info(f"{prefix} Starting dataset synchronization at {frequency} Hz") + synced_model = dataset._synchronize( + frequency=frequency, + cross_embodiment_union=cross_embodiment_union, + ) + total = synced_model.num_demonstrations + deadline = time.time() + sync_timeout_seconds + progress = dataset._get_synchronization_progress(synced_model.id) + processed = progress.num_synchronized_demonstrations + logger.info( + f"{prefix} Synchronization progress: {processed}/{total} " + f"(timeout={sync_timeout_seconds}s)" + ) + + while processed < total: + if time.time() >= deadline: + raise AssertionError( + f"Synchronization timed out after {sync_timeout_seconds}s for " + f"dataset {dataset.name!r}: {processed}/{total} recordings synced" + ) + time.sleep(SYNC_PROGRESS_POLL_INTERVAL_S) + progress = dataset._get_synchronization_progress(synced_model.id) + new_processed = progress.num_synchronized_demonstrations + if new_processed != processed: + processed = new_processed + logger.info(f"{prefix} Synchronization progress: {processed}/{total}") + + assert ( + not progress.has_failures + ), f"Synchronization failures: {progress.failed_recording_ids}" + assert progress.num_synchronized_demonstrations == expected_count, ( + f"Synchronized {progress.num_synchronized_demonstrations} recordings, " + f"expected {expected_count}" + ) + + logger.info(f"{prefix} Synchronization complete ({processed}/{total})") + + synced = SynchronizedDataset( + id=synced_model.id, + dataset=dataset, + frequency=frequency, + cross_embodiment_union=cross_embodiment_union, + ) + assert len(synced) == expected_count, ( + f"Synchronized dataset has {len(synced)} recordings, " + f"expected {expected_count}" + ) + + logger.info(f"{prefix} Calculating dataset statistics") + stats = synced.calculate_statistics( + input_cross_embodiment_description=input_desc, + output_cross_embodiment_description=output_desc, + ) + assert stats.dataset_statistics, "Expected non-empty dataset_statistics" + + role_keys = {"input", "output"} + actual_keys = set(stats.dataset_statistics.keys()) + assert role_keys <= actual_keys, ( + f"Expected dataset_statistics keys {sorted(role_keys)}, " + f"got {sorted(actual_keys)}" + ) + + expected_input_types = { + data_type for robot_desc in input_desc.values() for data_type in robot_desc + } + expected_output_types = { + data_type for robot_desc in output_desc.values() for data_type in robot_desc + } + + input_stats = stats.dataset_statistics["input"] + output_stats = stats.dataset_statistics["output"] + + for data_type in expected_input_types: + assert data_type in input_stats, f"Input stats missing {data_type.value}" + for data_type in expected_output_types: + assert data_type in output_stats, f"Output stats missing {data_type.value}" + + logger.info(f"{prefix} Dataset statistics calculated and validated") + return synced, statistics_fingerprint(stats) diff --git a/tests/integration/ml/shared/training.py b/tests/integration/ml/shared/training.py index 60b61468c..7df862646 100644 --- a/tests/integration/ml/shared/training.py +++ b/tests/integration/ml/shared/training.py @@ -2,11 +2,20 @@ import logging import time +from typing import TYPE_CHECKING -from neuracore_types import DataType +from neuracore_types import DataType, Metrics import neuracore as nc +from neuracore.core.auth import get_auth +from neuracore.core.config.get_current_org import get_current_org +from neuracore.core.const import API_URL from neuracore.core.data.dataset import Dataset +from neuracore.core.utils.http_session import thread_local_session + +if TYPE_CHECKING: + from neuracore.core.data.synced_dataset import SynchronizedDataset + from neuracore.core.endpoint import Policy logger = logging.getLogger(__name__) @@ -53,6 +62,56 @@ def wait_for_training( time.sleep(poll_seconds) +def wait_for_training_running_duration( + job_id: str, + running_minutes: float, + timeout_minutes: int = 120, + poll_seconds: int = 20, +) -> str: + """Block until *job_id* has been RUNNING for at least *running_minutes*. + + Returns early if the job reaches a terminal state. This is used to give a + training job enough time to snapshot/prepare its dataset before a later + test step mutates the underlying dataset, so the in-flight run is not + affected by the mutation. Returns the last observed status. + """ + deadline = time.time() + timeout_minutes * 60 + running_since: float | None = None + while True: + status = nc.get_training_job_status(job_id=job_id) + now = time.time() + if status in TERMINAL_STATES: + logger.info( + f"Training job {job_id} reached terminal status {status} before " + f"running for {running_minutes} minutes" + ) + return status + if status == "RUNNING": + if running_since is None: + running_since = now + logger.info( + f"Training job {job_id} is RUNNING; waiting {running_minutes} " + f"minutes before proceeding" + ) + elapsed_running_minutes = (now - running_since) / 60 + if elapsed_running_minutes >= running_minutes: + logger.info( + f"Training job {job_id} has been RUNNING for " + f"{elapsed_running_minutes:.1f} minutes" + ) + return status + else: + running_since = None + logger.info(f"Training job {job_id}: status={status} (waiting for RUNNING)") + if now >= deadline: + cancel_incomplete_training_jobs([job_id]) + assert False, ( + f"Training job {job_id} did not run for {running_minutes} minutes " + f"within {timeout_minutes} minutes" + ) + time.sleep(poll_seconds) + + def wait_for_all_training( job_ids: list[str], timeout_minutes: int = 120, @@ -125,6 +184,101 @@ def assert_no_training_log_errors( ) +def get_training_job_metrics(job_id: str) -> Metrics: + """Fetch training metrics for a completed or in-progress job.""" + org_id = get_current_org() + session = thread_local_session() + response = session.get( + f"{API_URL}/org/{org_id}/training/jobs/{job_id}/metrics", + headers=get_auth().get_headers(), + ) + response.raise_for_status() + return Metrics.model_validate(response.json()) + + +def get_final_metric_value(job_id: str, metric_key: str) -> float | None: + """Return the value at the highest logged step for *metric_key*, if present.""" + metrics = get_training_job_metrics(job_id) + metric_data = metrics.metrics.get(metric_key) + if metric_data is None or not metric_data.data: + return None + final_step = max(metric_data.data.keys()) + return float(metric_data.data[final_step]) + + +def assert_training_loss_below( + job_id: str, + metric_key: str, + threshold: float, + *, + context: str = "training loss", +) -> float: + """Assert the final epoch loss metric is below *threshold*.""" + final_loss = get_final_metric_value(job_id, metric_key) + assert ( + final_loss is not None + ), f"{context}: metric {metric_key!r} not found for job {job_id}" + assert ( + final_loss < threshold + ), f"{context}: {metric_key}={final_loss:.6g} is not below {threshold:.6g}" + return final_loss + + +def evaluate_training_mse( + policy: "Policy", + synced_dataset: "SynchronizedDataset", + ground_truth_by_recording: dict[str, list[float]], + target_joint_name: str, + *, + excluded_recording_ids: set[str] | None = None, + included_recording_ids: set[str] | None = None, +) -> float: + """Compute mean MSE between predictions and stored ground truth. + + When *included_recording_ids* is provided, only those recordings are + evaluated and any others present in the synced dataset are skipped. This is + useful after the dataset has been mutated, when a policy should only be + scored against the recordings it was actually trained on. + + Returns the mean MSE across all evaluated frames. + """ + excluded_recording_ids = excluded_recording_ids or set() + total_squared_error = 0.0 + total_frames = 0 + + for recording in synced_dataset: + recording_id = str(recording.id) + assert ( + recording_id not in excluded_recording_ids + ), f"Deleted recording {recording_id} appeared in synced dataset" + if ( + included_recording_ids is not None + and recording_id not in included_recording_ids + ): + continue + expected_targets = ground_truth_by_recording[recording_id] + for frame_idx, sync_point in enumerate(recording): + predictions = policy.predict(sync_point=sync_point, timeout=60) + joint_targets = predictions[DataType.JOINT_TARGET_POSITIONS] + predicted = float(joint_targets[target_joint_name].value[0, 0, 0].item()) + expected = expected_targets[frame_idx] + squared_error = (predicted - expected) ** 2 + logger.info( + f"Predicted: {predicted}, Expected: {expected}, " + f"Squared Error: {squared_error}" + ) + total_squared_error += squared_error + total_frames += 1 + + assert total_frames > 0, "No frames evaluated" + mean_mse = total_squared_error / total_frames + logger.info( + f"Evaluated {total_frames} frames across " + f"{len(synced_dataset)} recordings; mean MSE={mean_mse:.6g}" + ) + return mean_mse + + def build_cross_embodiment_descriptions( dataset: Dataset, input_types: list[DataType], diff --git a/tests/integration/ml/test_back_to_back_training.py b/tests/integration/ml/test_back_to_back_training.py index a16b12aa3..f22d1841e 100644 --- a/tests/integration/ml/test_back_to_back_training.py +++ b/tests/integration/ml/test_back_to_back_training.py @@ -1,19 +1,14 @@ """Integration test for launching multiple training jobs back-to-back.""" import logging +import os +import sys from neuracore_types import DataType import neuracore as nc from neuracore.core.data.dataset import Dataset -from tests.integration.ml.shared.constants import ( - GRIPPER_NAMES, - JOINT_NAMES, - LANGUAGE_LABEL, - NC_CAM_NAME, - POSE_SENSOR_NAME, -) -from tests.integration.ml.shared.data_collection import ( +from tests.integration.ml.shared.dataset import ( collect_demo_data, wait_for_dataset_recording_count, ) @@ -23,11 +18,27 @@ ) from tests.integration.ml.shared.utils import unique_name +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "examples") +if _EXAMPLES_DIR not in sys.path: + sys.path.append(_EXAMPLES_DIR) + +# ruff: noqa: E402 +from common.base_env import BimanualViperXTask + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) +NC_CAM_NAME = "rgb_angle" +JOINT_NAMES = ( + BimanualViperXTask.LEFT_ARM_JOINT_NAMES + BimanualViperXTask.RIGHT_ARM_JOINT_NAMES +) +GRIPPER_NAMES = ["left_gripper", "right_gripper"] +POSE_SENSOR_NAME = "tcp" +LANGUAGE_LABEL = "instruction" + ROBOT_NAME = "integration_test_robot" GPU_TYPE = "NVIDIA_TESLA_V100" NUM_GPUS = 1 diff --git a/tests/integration/ml/test_dataset_datatype_validation.py b/tests/integration/ml/test_dataset_datatype_validation.py index 5a7d903b4..eed4196b8 100644 --- a/tests/integration/ml/test_dataset_datatype_validation.py +++ b/tests/integration/ml/test_dataset_datatype_validation.py @@ -109,7 +109,7 @@ def _record_episode(data_types: set[DataType]) -> None: instance=ROBOT_INSTANCE, timestamp=t, ) - nc.stop_recording(wait=False, robot_name=ROBOT_NAME, instance=ROBOT_INSTANCE) + nc.stop_recording(wait=True, robot_name=ROBOT_NAME, instance=ROBOT_INSTANCE) def _missing_recordings_from_error(error_msg: str) -> dict[str, set[str]]: diff --git a/tests/integration/ml/test_dataset_mutation_training.py b/tests/integration/ml/test_dataset_mutation_training.py new file mode 100644 index 000000000..ebe6666cb --- /dev/null +++ b/tests/integration/ml/test_dataset_mutation_training.py @@ -0,0 +1,758 @@ +"""Integration test: dataset mutation, statistics, datatype handling, training. + +Validates synthetic multimodal datasets where the label is: + + target = joint_positions[0] + velocity[0] + torque[0] + +encoded as the first joint's JOINT_TARGET_POSITIONS. Covers initial collection, +statistics verification, baseline overfitting, dataset mutation, +retraining, and corrupted ablation proving all modalities are used. + +Steps test_step1 .. test_step10 run sequentially in a single pytest process: + + test_step1 .. test_step2 — collect data and verify initial statistics. + test_step3 .. test_step4 — start baseline and corrupted-inputs training + (parallel, on separate datasets). + test_step5 — wait for baseline to finish, mutate the main dataset, then + verify updated statistics and start retrain (test_step6 .. test_step7). + test_step8 .. test_step10 — evaluate baseline, retrain, and corrupt runs. +""" + +import logging +import os +import sys +import time + +import numpy as np +from neuracore_types import DataType, EmbodimentDescription + +import neuracore as nc +from neuracore.core.data.dataset import Dataset +from tests.integration.ml.shared.dataset import ( + assert_active_recordings, + assert_dataset_metadata, + assert_synced_statistics, + delete_recording_from_dataset, + wait_for_dataset_recording_count, +) +from tests.integration.ml.shared.training import ( + assert_training_loss_below, + build_cross_embodiment_descriptions, + evaluate_training_mse, + get_final_metric_value, + wait_for_training, +) +from tests.integration.ml.shared.utils import unique_name +from tests.integration.platform.data_daemon.shared.assertions import ( + assert_exactly_one_daemon_pid, +) +from tests.integration.platform.data_daemon.shared.db_helpers import ( + wait_for_dataset_ready, + wait_for_recordings_finalized, +) +from tests.integration.platform.data_daemon.shared.runners import online_daemon_running + +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "examples") +if _EXAMPLES_DIR not in sys.path: + sys.path.append(_EXAMPLES_DIR) + +# ruff: noqa: E402 +from common.transfer_cube import BIMANUAL_VIPERX_URDF_PATH + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +ROBOT_NAME = "integration_test_robot" +ROBOT_INSTANCE = 5 +GPU_TYPE = "NVIDIA_TESLA_V100" +NUM_GPUS = 1 + +JOINT_NAMES = ( + "vx300s_left/waist", + "vx300s_left/shoulder", +) +TARGET_JOINT = JOINT_NAMES[0] + +FREQUENCY = 20 +EPISODE_STEPS = 400 +RECORDING_STOP_TIMEOUT_SECONDS = 30 +RECORDING_POLL_SECONDS = 5 + +INITIAL_RECORDINGS = 10 +MUTATION_ADD = 6 +MUTATION_DELETE = 6 + +INPUT_DATA_TYPES = [ + DataType.JOINT_POSITIONS, + DataType.JOINT_VELOCITIES, + DataType.JOINT_TORQUES, +] +OUTPUT_DATA_TYPES = [DataType.JOINT_TARGET_POSITIONS] + +EXPECTED_RECORDING_TYPES = { + DataType.JOINT_POSITIONS, + DataType.JOINT_VELOCITIES, + DataType.JOINT_TORQUES, + DataType.JOINT_TARGET_POSITIONS, +} +EXPECTED_COMMON_TYPES = EXPECTED_RECORDING_TYPES + +OVERFIT_CNNMLP_CONFIG = { + "batch_size": 64, + "epochs": 400, + "output_prediction_horizon": 1, + "lr": 1e-4, +} + +L1_LOSS_THRESHOLD = 0.05 +L1_LOSS_METRIC_KEY = "train/epoch/loss/l1_loss" +MSE_THRESHOLD = 2e-4 +CORRUPT_MSE_FACTOR = 10 + + +def _compute_target(joint_pos_0: float, velocity_0: float, torque_0: float) -> float: + """Scalar label: first joint position + first velocity + first joint torque.""" + return joint_pos_0 + velocity_0 + torque_0 + + +def _record_episode( + episode_idx: int, + *, + robot_name: str, + instance_id: int, + corrupt_inputs: bool = False, +) -> list[float]: + """Record one episode and return per-frame ground-truth targets.""" + frame_targets: list[float] = [] + t = time.time() + rng = np.random.default_rng(episode_idx) + nc.start_recording(robot_name=robot_name, instance=instance_id) + + for frame_idx in range(EPISODE_STEPS): + t += 1.0 / FREQUENCY + joint_positions = { + name: float(value) + for name, value in zip( + JOINT_NAMES, rng.uniform(0.0, 1.0, size=len(JOINT_NAMES)) + ) + } + joint_velocities = { + name: float(value) + for name, value in zip( + JOINT_NAMES, rng.uniform(0.0, 1.0, size=len(JOINT_NAMES)) + ) + } + joint_torques = { + name: float(value) + for name, value in zip( + JOINT_NAMES, rng.uniform(0.0, 1.0, size=len(JOINT_NAMES)) + ) + } + target = _compute_target( + joint_positions[TARGET_JOINT], + joint_velocities[TARGET_JOINT], + joint_torques[TARGET_JOINT], + ) + frame_targets.append(target) + + if corrupt_inputs: + joint_torques = {name: 0.0 for name in JOINT_NAMES} + + nc.log_joint_positions( + positions=joint_positions, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_joint_velocities( + velocities=joint_velocities, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_joint_torques( + torques=joint_torques, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + nc.log_joint_target_positions( + target_positions={TARGET_JOINT: target}, + timestamp=t, + robot_name=robot_name, + instance=instance_id, + ) + + nc.stop_recording(wait=True, robot_name=robot_name, instance=instance_id) + return frame_targets + + +def _collect_recordings( + dataset_name: str, + robot_name: str, + instance_id: int, + num_episodes: int, + *, + start_episode_idx: int = 0, + corrupt_inputs: bool = False, + ground_truth: dict[str, list[float]] | None = None, + known_recording_ids: set[str] | None = None, +) -> tuple[Dataset, dict[str, list[float]], set[str]]: + """Collect episodes into *dataset_name* and track ground-truth targets.""" + ground_truth = ground_truth if ground_truth is not None else {} + known_recording_ids = ( + known_recording_ids if known_recording_ids is not None else set() + ) + new_recording_ids: set[str] = set() + + with online_daemon_running(): + assert_exactly_one_daemon_pid() + nc.connect_robot( + robot_name=robot_name, + instance=instance_id, + urdf_path=str(BIMANUAL_VIPERX_URDF_PATH), + overwrite=False, + ) + nc.create_dataset(name=dataset_name) + expected_count = len(known_recording_ids) + + for offset in range(num_episodes): + episode_idx = start_episode_idx + offset + logger.info( + f"Recording episode {offset + 1}/{num_episodes} " + f"(episode_idx={episode_idx})" + ) + frame_targets = _record_episode( + episode_idx, + robot_name=robot_name, + instance_id=instance_id, + corrupt_inputs=corrupt_inputs, + ) + expected_count += 1 + wait_for_dataset_ready( + dataset_name, + expected_recording_count=expected_count, + timeout_s=RECORDING_STOP_TIMEOUT_SECONDS, + poll_interval_s=RECORDING_POLL_SECONDS, + ) + current_ids = {str(r.id) for r in nc.get_dataset(name=dataset_name)} + added = current_ids - known_recording_ids - new_recording_ids + assert len(added) == 1, ( + f"Expected exactly one new recording after episode " + f"{offset + 1}, got {sorted(added)}" + ) + recording_id = added.pop() + new_recording_ids.add(recording_id) + ground_truth[recording_id] = frame_targets + + dataset = nc.get_dataset(name=dataset_name) + return dataset, ground_truth, new_recording_ids + + +def _build_training_descriptions(dataset: Dataset) -> tuple[dict, dict]: + assert ( + len(dataset.robot_ids) == 1 + ), f"Expected single-robot dataset, got {len(dataset.robot_ids)} robots" + return build_cross_embodiment_descriptions( + dataset=dataset, + input_types=INPUT_DATA_TYPES, + output_types=OUTPUT_DATA_TYPES, + ) + + +def _input_output_embodiment_for_policy( + input_desc: dict, output_desc: dict +) -> tuple[EmbodimentDescription, EmbodimentDescription]: + robot_id = next(iter(input_desc)) + return input_desc[robot_id], output_desc[robot_id] + + +class TestDatasetMutationTraining: + """End-to-end dataset mutation, statistics, and training correctness.""" + + track_step_teardown = True + all_steps_passed: bool = True + + dataset_name: str + corrupt_dataset_name: str + baseline_training_name: str + retrain_training_name: str + corrupt_training_name: str + + dataset: Dataset | None = None + corrupt_dataset: Dataset | None = None + ground_truth: dict[str, list[float]] + corrupt_ground_truth: dict[str, list[float]] + + active_recording_ids: set[str] + initial_recording_ids: set[str] + deleted_recording_ids: set[str] + added_recording_ids: set[str] + + input_desc: dict | None = None + output_desc: dict | None = None + stats_fingerprint_initial: str | None = None + stats_fingerprint_mutated: str | None = None + + baseline_job_id: str | None = None + retrain_job_id: str | None = None + corrupt_job_id: str | None = None + baseline_mse: float | None = None + baseline_l1_loss: float | None = None + + @classmethod + def setup_class(cls) -> None: + cls.all_steps_passed = True + cls.dataset_name = unique_name(prefix="dataset_mutation") + cls.corrupt_dataset_name = unique_name(prefix="dataset_corrupt") + cls.baseline_training_name = unique_name(prefix="ml_dataset_mutation_baseline") + cls.retrain_training_name = unique_name(prefix="ml_dataset_mutation_retrain") + cls.corrupt_training_name = unique_name(prefix="ml_dataset_mutation_corrupt") + cls.dataset = None + cls.corrupt_dataset = None + cls.ground_truth = {} + cls.corrupt_ground_truth = {} + cls.active_recording_ids = set() + cls.initial_recording_ids = set() + cls.deleted_recording_ids = set() + cls.added_recording_ids = set() + cls.input_desc = None + cls.output_desc = None + cls.stats_fingerprint_initial = None + cls.stats_fingerprint_mutated = None + cls.baseline_job_id = None + cls.retrain_job_id = None + cls.corrupt_job_id = None + cls.baseline_mse = None + cls.baseline_l1_loss = None + nc.login() + + @classmethod + def teardown_class(cls) -> None: + if not cls.all_steps_passed: + logger.warning( + "Skipping TestDatasetMutationTraining teardown: " + "one or more steps failed" + ) + return + for job_id in (cls.baseline_job_id, cls.retrain_job_id, cls.corrupt_job_id): + if job_id is None: + continue + try: + nc.delete_training_job(job_id) + except Exception: + logger.warning(f"Failed to delete training job {job_id}", exc_info=True) + for dataset in (cls.dataset, cls.corrupt_dataset): + if dataset is None: + continue + try: + dataset.delete() + except Exception: + logger.warning( + f"Failed to delete dataset {dataset.name}", exc_info=True + ) + + def test_step1_create_initial_dataset(self) -> None: + dataset, ground_truth, new_ids = _collect_recordings( + dataset_name=self.dataset_name, + robot_name=ROBOT_NAME, + instance_id=ROBOT_INSTANCE, + num_episodes=INITIAL_RECORDINGS, + start_episode_idx=0, + ground_truth=self.ground_truth, + ) + self.__class__.dataset = wait_for_dataset_recording_count( + dataset_name=self.dataset_name, + expected_recordings=INITIAL_RECORDINGS, + ) + self.__class__.ground_truth = ground_truth + self.__class__.initial_recording_ids = set(new_ids) + self.__class__.active_recording_ids = set(new_ids) + assert len(self.dataset) == INITIAL_RECORDINGS + logger.info( + f"[STEP 1] [PASSED] Collected {len(self.dataset)} recordings " + f"into '{self.dataset_name}'" + ) + + def test_step2_verify_initial_statistics(self) -> None: + assert self.dataset is not None, "[STEP 1] Did Not Complete" + assert_dataset_metadata( + self.dataset, + expected_count=INITIAL_RECORDINGS, + expected_common_types=EXPECTED_COMMON_TYPES, + ) + assert_active_recordings( + self.dataset, + expected_count=INITIAL_RECORDINGS, + expected_types=EXPECTED_RECORDING_TYPES, + tracked_ids=self.initial_recording_ids, + ) + input_desc, output_desc = _build_training_descriptions(self.dataset) + self.__class__.input_desc = input_desc + self.__class__.output_desc = output_desc + _, fingerprint = assert_synced_statistics( + self.dataset, + input_desc, + output_desc, + expected_count=INITIAL_RECORDINGS, + frequency=FREQUENCY, + ) + self.__class__.stats_fingerprint_initial = fingerprint + logger.info("[STEP 2] [PASSED] Initial dataset statistics verified") + + def test_step3_start_baseline_training(self) -> None: + assert self.dataset is not None, "[STEP 1] Did Not Complete" + assert self.input_desc is not None, "[STEP 2] Did Not Complete" + assert self.output_desc is not None, "[STEP 2] Did Not Complete" + + job_data = nc.start_training_run( + name=self.baseline_training_name, + dataset_name=self.dataset_name, + algorithm_name="CNNMLP", + algorithm_config=OVERFIT_CNNMLP_CONFIG, + gpu_type=GPU_TYPE, + num_gpus=NUM_GPUS, + frequency=FREQUENCY, + input_cross_embodiment_description=self.input_desc, + output_cross_embodiment_description=self.output_desc, + ) + self.__class__.baseline_job_id = job_data["id"] + logger.info( + f"[STEP 3] [PASSED] Baseline training started on the initial dataset: " + f"job_id={self.baseline_job_id}" + ) + + def test_step4_start_corrupt_training(self) -> None: + # Started before the dataset mutation so the corrupted-inputs run trains + # in parallel with the baseline (it uses a separate dataset, so it does + # not block the later mutation of the main dataset). + _, corrupt_ground_truth, _ = _collect_recordings( + dataset_name=self.corrupt_dataset_name, + robot_name=ROBOT_NAME, + instance_id=ROBOT_INSTANCE, + num_episodes=INITIAL_RECORDINGS, + start_episode_idx=0, + corrupt_inputs=True, + ground_truth={}, + ) + self.__class__.corrupt_dataset = wait_for_dataset_recording_count( + dataset_name=self.corrupt_dataset_name, + expected_recordings=INITIAL_RECORDINGS, + ) + self.__class__.corrupt_ground_truth = corrupt_ground_truth + + corrupt_input_desc, corrupt_output_desc = _build_training_descriptions( + self.corrupt_dataset + ) + job_data = nc.start_training_run( + name=self.corrupt_training_name, + dataset_name=self.corrupt_dataset_name, + algorithm_name="CNNMLP", + algorithm_config=OVERFIT_CNNMLP_CONFIG, + gpu_type=GPU_TYPE, + num_gpus=NUM_GPUS, + frequency=FREQUENCY, + input_cross_embodiment_description=corrupt_input_desc, + output_cross_embodiment_description=corrupt_output_desc, + ) + self.__class__.corrupt_job_id = job_data["id"] + logger.info( + f"[STEP 4] [PASSED] Corrupted-inputs training started (runs in " + f"parallel with the baseline): job_id={self.corrupt_job_id}" + ) + + def test_step5_mutate_dataset(self) -> None: + assert self.dataset is not None, "[STEP 1] Did Not Complete" + assert self.baseline_job_id is not None, "[STEP 3] Did Not Complete" + + # The platform blocks recording deletion while a dataset still has an + # active training job, so wait for the baseline run to finish before + # mutating the dataset it trained on. + logger.info( + f"[STEP 5] Waiting for baseline training {self.baseline_job_id} to " + f"complete before mutating the dataset" + ) + final_status = wait_for_training(job_id=self.baseline_job_id) + assert final_status == "COMPLETED", ( + f"Baseline training ended with non-COMPLETED status before the " + f"dataset could be mutated: {final_status}" + ) + + dataset = nc.get_dataset(name=self.dataset_name) + assert len(dataset) == INITIAL_RECORDINGS + + recordings_to_delete = [dataset[index] for index in range(MUTATION_DELETE)] + self.__class__.deleted_recording_ids = { + str(recording.id) for recording in recordings_to_delete + } + for recording in recordings_to_delete: + logger.info(f"Deleting recording {recording.id!r}") + delete_recording_from_dataset(dataset=dataset, recording=recording) + + remaining = INITIAL_RECORDINGS - MUTATION_DELETE + self.__class__.dataset = wait_for_dataset_recording_count( + dataset_name=self.dataset_name, + expected_recordings=remaining, + ) + surviving_ids = {str(r.id) for r in self.dataset} + assert not self.deleted_recording_ids & surviving_ids + + known_ids = surviving_ids + _, ground_truth, new_ids = _collect_recordings( + dataset_name=self.dataset_name, + robot_name=ROBOT_NAME, + instance_id=ROBOT_INSTANCE, + num_episodes=MUTATION_ADD, + start_episode_idx=INITIAL_RECORDINGS, + ground_truth=self.ground_truth, + known_recording_ids=known_ids, + ) + self.__class__.ground_truth = ground_truth + self.__class__.added_recording_ids = new_ids + self.__class__.dataset = wait_for_dataset_recording_count( + dataset_name=self.dataset_name, + expected_recordings=INITIAL_RECORDINGS, + ) + self.__class__.active_recording_ids = surviving_ids | new_ids + assert len(self.added_recording_ids) == MUTATION_ADD + wait_for_recordings_finalized( + self.dataset_name, + recording_ids=self.active_recording_ids, + timeout_s=RECORDING_STOP_TIMEOUT_SECONDS, + poll_interval_s=RECORDING_POLL_SECONDS, + ) + logger.info( + f"[STEP 5] [PASSED] Mutated dataset: deleted {MUTATION_DELETE}, " + f"added {MUTATION_ADD}, total {len(self.dataset)}" + ) + + def test_step6_verify_updated_statistics(self) -> None: + assert self.dataset is not None, "[STEP 5] Did Not Complete" + assert self.input_desc is not None, "[STEP 2] Did Not Complete" + assert self.output_desc is not None, "[STEP 2] Did Not Complete" + + logger.info("[STEP 6] Starting updated statistics verification") + logger.info("[STEP 6] Checking dataset metadata") + assert_dataset_metadata( + self.dataset, + expected_count=INITIAL_RECORDINGS, + expected_common_types=EXPECTED_COMMON_TYPES, + ) + logger.info("[STEP 6] Checking active recordings") + active_ids = assert_active_recordings( + self.dataset, + expected_count=INITIAL_RECORDINGS, + expected_types=EXPECTED_RECORDING_TYPES, + tracked_ids=self.active_recording_ids, + ) + logger.info("[STEP 6] Checking mutation recording IDs") + assert self.added_recording_ids <= active_ids + assert not self.deleted_recording_ids & active_ids + + logger.info("[STEP 6] Syncing dataset and checking statistics") + _, fingerprint = assert_synced_statistics( + self.dataset, + self.input_desc, + self.output_desc, + expected_count=INITIAL_RECORDINGS, + frequency=FREQUENCY, + log_prefix="[STEP 6]", + ) + self.__class__.stats_fingerprint_mutated = fingerprint + logger.info("[STEP 6] Checking statistics fingerprint changed after mutation") + assert ( + fingerprint != self.stats_fingerprint_initial + ), "Expected dataset statistics to change after mutation" + logger.info("[STEP 6] [PASSED] Updated dataset statistics verified") + + def test_step7_start_retrain(self) -> None: + assert self.dataset is not None, "[STEP 5] Did Not Complete" + assert self.input_desc is not None, "[STEP 2] Did Not Complete" + assert self.output_desc is not None, "[STEP 2] Did Not Complete" + + job_data = nc.start_training_run( + name=self.retrain_training_name, + dataset_name=self.dataset_name, + algorithm_name="CNNMLP", + algorithm_config=OVERFIT_CNNMLP_CONFIG, + gpu_type=GPU_TYPE, + num_gpus=NUM_GPUS, + frequency=FREQUENCY, + input_cross_embodiment_description=self.input_desc, + output_cross_embodiment_description=self.output_desc, + name_auto_increment=True, + ) + self.__class__.retrain_job_id = job_data["id"] + logger.info( + f"[STEP 7] [PASSED] Retrain started on the mutated dataset: " + f"job_id={self.retrain_job_id}" + ) + + def test_step8_evaluate_baseline(self) -> None: + """Wait for the baseline run and verify overfitting on surviving originals. + + The dataset has been mutated since training started, so the baseline + policy is only scored against the original recordings that survive the + mutation (the ones it was actually trained on). + """ + assert self.baseline_job_id is not None, "[STEP 3] Did Not Complete" + assert self.input_desc is not None and self.output_desc is not None + + final_status = wait_for_training(job_id=self.baseline_job_id) + assert ( + final_status == "COMPLETED" + ), f"Baseline training ended with non-COMPLETED status: {final_status}" + self.__class__.baseline_l1_loss = assert_training_loss_below( + self.baseline_job_id, + L1_LOSS_METRIC_KEY, + L1_LOSS_THRESHOLD, + context="Step 8 (baseline L1 loss)", + ) + + synced, _ = assert_synced_statistics( + self.dataset, + self.input_desc, + self.output_desc, + expected_count=INITIAL_RECORDINGS, + frequency=FREQUENCY, + ) + surviving_original_ids = self.initial_recording_ids - self.deleted_recording_ids + input_emb, output_emb = _input_output_embodiment_for_policy( + self.input_desc, self.output_desc + ) + policy = nc.policy( + input_embodiment_description=input_emb, + output_embodiment_description=output_emb, + train_run_name=self.baseline_training_name, + ) + try: + baseline_mse = evaluate_training_mse( + policy=policy, + synced_dataset=synced, + ground_truth_by_recording=self.ground_truth, + target_joint_name=TARGET_JOINT, + included_recording_ids=surviving_original_ids, + ) + finally: + policy.disconnect() + assert ( + baseline_mse < MSE_THRESHOLD + ), f"Baseline mean MSE={baseline_mse:.6g} exceeds {MSE_THRESHOLD:.6g}" + self.__class__.baseline_mse = baseline_mse + + logger.info( + f"[STEP 8] [PASSED] Baseline training overfit: " + f"L1={self.baseline_l1_loss:.6g}, MSE={self.baseline_mse:.6g}" + ) + + def test_step9_evaluate_retrain(self) -> None: + """Wait for the retrain run and verify overfitting.""" + assert self.retrain_job_id is not None, "[STEP 7] Did Not Complete" + assert self.input_desc is not None and self.output_desc is not None + + final_status = wait_for_training(job_id=self.retrain_job_id) + assert ( + final_status == "COMPLETED" + ), f"Retrain ended with non-COMPLETED status: {final_status}" + assert_training_loss_below( + self.retrain_job_id, + L1_LOSS_METRIC_KEY, + L1_LOSS_THRESHOLD, + context="Step 9 (retrain L1 loss)", + ) + + synced, _ = assert_synced_statistics( + self.dataset, + self.input_desc, + self.output_desc, + expected_count=INITIAL_RECORDINGS, + frequency=FREQUENCY, + ) + synced_ids = {str(recording.id) for recording in synced.dataset} + assert not self.deleted_recording_ids & synced_ids + + input_emb, output_emb = _input_output_embodiment_for_policy( + self.input_desc, self.output_desc + ) + policy = nc.policy( + input_embodiment_description=input_emb, + output_embodiment_description=output_emb, + train_run_name=self.retrain_training_name, + ) + try: + retrain_mse = evaluate_training_mse( + policy=policy, + synced_dataset=synced, + ground_truth_by_recording=self.ground_truth, + target_joint_name=TARGET_JOINT, + excluded_recording_ids=self.deleted_recording_ids, + ) + finally: + policy.disconnect() + assert ( + retrain_mse < MSE_THRESHOLD + ), f"Retrain mean MSE={retrain_mse:.6g} exceeds {MSE_THRESHOLD:.6g}" + + logger.info("[STEP 9] [PASSED] Retrain overfit on mutated dataset") + + def test_step10_evaluate_corrupt(self) -> None: + """Wait for the corrupted-inputs run and verify it fails to overfit.""" + assert self.corrupt_job_id is not None, "[STEP 4] Did Not Complete" + assert self.corrupt_dataset is not None, "[STEP 4] Did Not Complete" + + final_status = wait_for_training(job_id=self.corrupt_job_id) + assert ( + final_status == "COMPLETED" + ), f"Corrupt training ended with non-COMPLETED status: {final_status}" + + corrupt_l1 = get_final_metric_value(self.corrupt_job_id, L1_LOSS_METRIC_KEY) + assert corrupt_l1 is not None, "Corrupt run missing L1 loss metric" + if self.baseline_l1_loss is not None: + assert corrupt_l1 >= self.baseline_l1_loss, ( + f"Corrupt L1 loss {corrupt_l1:.6g} should be >= baseline " + f"{self.baseline_l1_loss:.6g}" + ) + + corrupt_input_desc, corrupt_output_desc = _build_training_descriptions( + self.corrupt_dataset + ) + synced, _ = assert_synced_statistics( + self.corrupt_dataset, + corrupt_input_desc, + corrupt_output_desc, + expected_count=INITIAL_RECORDINGS, + frequency=FREQUENCY, + ) + input_emb, output_emb = _input_output_embodiment_for_policy( + corrupt_input_desc, corrupt_output_desc + ) + policy = nc.policy( + input_embodiment_description=input_emb, + output_embodiment_description=output_emb, + train_run_name=self.corrupt_training_name, + ) + try: + corrupt_mse = evaluate_training_mse( + policy=policy, + synced_dataset=synced, + ground_truth_by_recording=self.corrupt_ground_truth, + target_joint_name=TARGET_JOINT, + ) + finally: + policy.disconnect() + + assert ( + self.baseline_mse is not None + ), "[STEP 8] baseline MSE missing — run test_step8_evaluate_baseline first" + assert corrupt_mse >= self.baseline_mse * CORRUPT_MSE_FACTOR, ( + f"Corrupt MSE {corrupt_mse:.6g} should be much worse than baseline " + f"{self.baseline_mse:.6g}" + ) + logger.info( + f"[STEP 10] [PASSED] Corrupted inputs ablation: " + f"baseline MSE={self.baseline_mse:.6g}, corrupt MSE={corrupt_mse:.6g}, " + f"baseline L1={self.baseline_l1_loss:.6g}, corrupt L1={corrupt_l1:.6g}" + ) diff --git a/tests/integration/ml/test_failure_reporting.py b/tests/integration/ml/test_failure_reporting.py index bc9e40ce3..bc25b4798 100644 --- a/tests/integration/ml/test_failure_reporting.py +++ b/tests/integration/ml/test_failure_reporting.py @@ -1,34 +1,66 @@ """Integration test for training-script failure reporting.""" import logging +import os +import sys + +from neuracore_types import DataType, EmbodimentDescription import neuracore as nc from neuracore.core.data.dataset import Dataset -from tests.integration.ml.shared.constants import ( - FREQUENCY, - GPU_TYPE, - GRIPPER_NAMES, - INPUT_DATA_TYPES, - JOINT_NAMES, - LANGUAGE_LABEL, - NC_CAM_NAME, - NUM_GPUS, - OUTPUT_DATA_TYPES, - POSE_SENSOR_NAME, - ROBOT_NAME, -) -from tests.integration.ml.shared.data_collection import collect_demo_data +from tests.integration.ml.shared.dataset import collect_demo_data from tests.integration.ml.shared.training import ( build_cross_embodiment_descriptions, wait_for_training, ) from tests.integration.ml.shared.utils import unique_name +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "examples") +if _EXAMPLES_DIR not in sys.path: + sys.path.append(_EXAMPLES_DIR) + +# ruff: noqa: E402 +from common.base_env import BimanualViperXTask + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) +NC_CAM_NAME = "rgb_angle" +MJ_CAM_NAME = "angle" +JOINT_NAMES = ( + BimanualViperXTask.LEFT_ARM_JOINT_NAMES + BimanualViperXTask.RIGHT_ARM_JOINT_NAMES +) +GRIPPER_NAMES = ["left_gripper", "right_gripper"] +POSE_SENSOR_NAME = "tcp" +LANGUAGE_LABEL = "instruction" + + +def _indexed_names(names: list[str] | tuple[str, ...]) -> dict[int, str]: + return {index: name for index, name in enumerate(names)} + + +INPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { + DataType.RGB_IMAGES: {0: NC_CAM_NAME}, + DataType.JOINT_POSITIONS: _indexed_names(names=JOINT_NAMES), + DataType.LANGUAGE: {0: LANGUAGE_LABEL}, + DataType.PARALLEL_GRIPPER_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), +} +OUTPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { + DataType.JOINT_TARGET_POSITIONS: _indexed_names(names=JOINT_NAMES), + DataType.PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), +} + +INPUT_DATA_TYPES = list(INPUT_EMBODIMENT_DESCRIPTION.keys()) +OUTPUT_DATA_TYPES = list(OUTPUT_EMBODIMENT_DESCRIPTION.keys()) + +ROBOT_NAME = "integration_test_robot" +GPU_TYPE = "NVIDIA_TESLA_V100" +NUM_GPUS = 1 +FREQUENCY = 20 + # A batch_size value that is not "auto" and not parseable as an integer. # It passes client-side validation (which only checks data types / algorithm # compatibility, not the batch_size value itself) but causes a ValueError in diff --git a/tests/integration/ml/test_inference.py b/tests/integration/ml/test_inference.py index 10205979b..f88b00263 100644 --- a/tests/integration/ml/test_inference.py +++ b/tests/integration/ml/test_inference.py @@ -29,8 +29,15 @@ import neuracore as nc from neuracore.core.endpoint import Policy -from tests.integration.ml.shared.constants import ( +from tests.integration.ml.shared.utils import ( + SelectedRun, + prune_training_runs_except, + resolve_latest_completed_run, + unique_name, +) +from tests.integration.ml.test_training_flow import ( GRIPPER_NAMES, + INFERENCE_MODEL_TRAIN_RUN_PREFIX, INPUT_EMBODIMENT_DESCRIPTION, JOINT_NAMES, LANGUAGE_LABEL, @@ -39,13 +46,6 @@ OUTPUT_DATA_TYPES, OUTPUT_EMBODIMENT_DESCRIPTION, ) -from tests.integration.ml.shared.utils import ( - SelectedRun, - prune_training_runs_except, - resolve_latest_completed_run, - unique_name, -) -from tests.integration.ml.test_training_flow import INFERENCE_MODEL_TRAIN_RUN_PREFIX _THIS_DIR = os.path.dirname(os.path.abspath(__file__)) _EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "examples") diff --git a/tests/integration/ml/test_resume_training.py b/tests/integration/ml/test_resume_training.py index a73fbc535..783d374aa 100644 --- a/tests/integration/ml/test_resume_training.py +++ b/tests/integration/ml/test_resume_training.py @@ -19,22 +19,10 @@ import logging import neuracore as nc -from neuracore.core.auth import get_auth -from neuracore.core.const import API_URL from neuracore.core.data.dataset import Dataset -from neuracore.core.data.recording import Recording -from neuracore.core.utils.http_session import thread_local_session -from tests.integration.ml.shared.constants import ( - GRIPPER_NAMES, - INPUT_DATA_TYPES, - JOINT_NAMES, - LANGUAGE_LABEL, - NC_CAM_NAME, - OUTPUT_DATA_TYPES, - POSE_SENSOR_NAME, -) -from tests.integration.ml.shared.data_collection import ( +from tests.integration.ml.shared.dataset import ( collect_demo_data, + delete_recording_from_dataset, wait_for_dataset_recording_count, ) from tests.integration.ml.shared.training import ( @@ -51,7 +39,14 @@ from tests.integration.ml.test_training_flow import ( FLOW_COLLECTED_DATASET_PREFIX, FLOW_MERGED_DATASET_PREFIX, + GRIPPER_NAMES, INFERENCE_MODEL_TRAIN_RUN_PREFIX, + INPUT_DATA_TYPES, + JOINT_NAMES, + LANGUAGE_LABEL, + NC_CAM_NAME, + OUTPUT_DATA_TYPES, + POSE_SENSOR_NAME, ) logging.basicConfig( @@ -161,16 +156,6 @@ def test_step2_resume_training(self) -> None: ) -def delete_recording_from_dataset(dataset: Dataset, recording: Recording) -> None: - """Remove a recording from a dataset via the platform API.""" - session = thread_local_session() - response = session.delete( - f"{API_URL}/org/{dataset.org_id}/datasets/{dataset.id}/recording/{recording.id}", - headers=get_auth().get_headers(), - ) - response.raise_for_status() - - class TestResumeTrainingAfterDataDeletion: """Train on a dataset, delete some recordings, then relaunch training.""" diff --git a/tests/integration/ml/test_training_flow.py b/tests/integration/ml/test_training_flow.py index e8f4330d1..79a1d0a7e 100644 --- a/tests/integration/ml/test_training_flow.py +++ b/tests/integration/ml/test_training_flow.py @@ -12,25 +12,16 @@ """ import logging +import os import pprint +import sys import time +from neuracore_types import DataType, EmbodimentDescription + import neuracore as nc from neuracore.core.data.dataset import Dataset -from tests.integration.ml.shared.constants import ( - FREQUENCY, - GPU_TYPE, - GRIPPER_NAMES, - INPUT_DATA_TYPES, - JOINT_NAMES, - LANGUAGE_LABEL, - NC_CAM_NAME, - NUM_GPUS, - OUTPUT_DATA_TYPES, - POSE_SENSOR_NAME, - ROBOT_NAME, -) -from tests.integration.ml.shared.data_collection import ( +from tests.integration.ml.shared.dataset import ( collect_demo_data, wait_for_dataset_recording_count, ) @@ -42,11 +33,48 @@ ) from tests.integration.ml.shared.utils import unique_name +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "examples") +if _EXAMPLES_DIR not in sys.path: + sys.path.append(_EXAMPLES_DIR) + +# ruff: noqa: E402 +from common.base_env import BimanualViperXTask + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) +NC_CAM_NAME = "rgb_angle" +MJ_CAM_NAME = "angle" +JOINT_NAMES = ( + BimanualViperXTask.LEFT_ARM_JOINT_NAMES + BimanualViperXTask.RIGHT_ARM_JOINT_NAMES +) +GRIPPER_NAMES = ["left_gripper", "right_gripper"] +POSE_SENSOR_NAME = "tcp" +LANGUAGE_LABEL = "instruction" + + +def _indexed_names(names: list[str] | tuple[str, ...]) -> dict[int, str]: + return {index: name for index, name in enumerate(names)} + + +# Training/Inference robot (VX300s) embodiment descriptions +INPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { + DataType.RGB_IMAGES: {0: NC_CAM_NAME}, + DataType.JOINT_POSITIONS: _indexed_names(names=JOINT_NAMES), + DataType.LANGUAGE: {0: LANGUAGE_LABEL}, + DataType.PARALLEL_GRIPPER_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), +} +OUTPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { + DataType.JOINT_TARGET_POSITIONS: _indexed_names(names=JOINT_NAMES), + DataType.PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), +} + +INPUT_DATA_TYPES = list(INPUT_EMBODIMENT_DESCRIPTION.keys()) +OUTPUT_DATA_TYPES = list(OUTPUT_EMBODIMENT_DESCRIPTION.keys()) + # Predefined name prefix for the training run produced by this test. The # inference and resume tests scan for the latest COMPLETED run under this prefix INFERENCE_MODEL_TRAIN_RUN_PREFIX = "ml_integration_flow" @@ -59,6 +87,10 @@ JOB_STATE_POLL_SECONDS = 30 RUNNING_STATE_TIMEOUT_MINUTES = 10 LOGS_AVAILABILITY_TIMEOUT_MINUTES = 10 +ROBOT_NAME = "integration_test_robot" +GPU_TYPE = "NVIDIA_TESLA_V100" +NUM_GPUS = 1 +FREQUENCY = 20 SHARED_DATASET_NAME = "NYU ROT" COLLECTED_DEMO_EPISODES = 3 CNNMLP_CONFIG = { diff --git a/tests/integration/ml/test_training_sync_failure.py b/tests/integration/ml/test_training_sync_failure.py index 40510f825..61a02f5a5 100644 --- a/tests/integration/ml/test_training_sync_failure.py +++ b/tests/integration/ml/test_training_sync_failure.py @@ -16,35 +16,66 @@ """ import logging +import os +import sys from dataclasses import dataclass +from neuracore_types import DataType, EmbodimentDescription + import neuracore as nc from neuracore.core.data.dataset import Dataset -from tests.integration.ml.shared.constants import ( - FREQUENCY, - GPU_TYPE, - GRIPPER_NAMES, - INPUT_DATA_TYPES, - JOINT_NAMES, - LANGUAGE_LABEL, - NC_CAM_NAME, - NUM_GPUS, - OUTPUT_DATA_TYPES, - POSE_SENSOR_NAME, - ROBOT_NAME, -) -from tests.integration.ml.shared.data_collection import collect_demo_data +from tests.integration.ml.shared.dataset import collect_demo_data from tests.integration.ml.shared.training import ( build_cross_embodiment_descriptions, wait_for_all_training, ) from tests.integration.ml.shared.utils import unique_name +_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) +_EXAMPLES_DIR = os.path.join(_THIS_DIR, "..", "..", "..", "examples") +if _EXAMPLES_DIR not in sys.path: + sys.path.append(_EXAMPLES_DIR) + +# ruff: noqa: E402 +from common.base_env import BimanualViperXTask + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) +NC_CAM_NAME = "rgb_angle" +JOINT_NAMES = ( + BimanualViperXTask.LEFT_ARM_JOINT_NAMES + BimanualViperXTask.RIGHT_ARM_JOINT_NAMES +) +GRIPPER_NAMES = ["left_gripper", "right_gripper"] +POSE_SENSOR_NAME = "tcp" +LANGUAGE_LABEL = "instruction" + + +def _indexed_names(names: list[str] | tuple[str, ...]) -> dict[int, str]: + return {index: name for index, name in enumerate(names)} + + +INPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { + DataType.RGB_IMAGES: {0: NC_CAM_NAME}, + DataType.JOINT_POSITIONS: _indexed_names(names=JOINT_NAMES), + DataType.LANGUAGE: {0: LANGUAGE_LABEL}, + DataType.PARALLEL_GRIPPER_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), +} +OUTPUT_EMBODIMENT_DESCRIPTION: EmbodimentDescription = { + DataType.JOINT_TARGET_POSITIONS: _indexed_names(names=JOINT_NAMES), + DataType.PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS: _indexed_names(names=GRIPPER_NAMES), +} + +INPUT_DATA_TYPES = list(INPUT_EMBODIMENT_DESCRIPTION.keys()) +OUTPUT_DATA_TYPES = list(OUTPUT_EMBODIMENT_DESCRIPTION.keys()) + +ROBOT_NAME = "integration_test_robot" +GPU_TYPE = "NVIDIA_TESLA_V100" +NUM_GPUS = 1 +FREQUENCY = 20 + # A frequency far above the data rate. Reference timestamps then fall between the # real data points, which both starves the no-duplicates case (too sparse) and # exposes the tiny-max-delay case (no point within delta). Shared by both cases diff --git a/tests/integration/platform/test_dataset_synchronization.py b/tests/integration/platform/test_dataset_synchronization.py index 56ca506c9..cdceb14ba 100644 --- a/tests/integration/platform/test_dataset_synchronization.py +++ b/tests/integration/platform/test_dataset_synchronization.py @@ -1,13 +1,16 @@ """Integration tests: dataset synchronization success / param-error / missing-data. -Three independent tests drive a freshly collected dataset through -synchronization to exercise the backend's permanent-failure handling: +Four independent tests drive a freshly collected dataset through +synchronization to exercise the backend's permanent-failure handling and +post-mutation re-sync: * :func:`test_dataset_synchronization_success` — synchronization **succeeds**, * :func:`test_dataset_synchronization_param_error` — **fails on bad parameters** (``SynchronizationParameterError``), and * :func:`test_dataset_synchronization_missing_data` — **fails on missing data** - (``SynchronizationMissingDataError``). + (``SynchronizationMissingDataError``), and +* :func:`test_dataset_synchronization_after_mutation` — **re-synchronizes** after + removing and replacing half the recordings. Each scenario drives the high-level ``Dataset.synchronize()``: success returns a ``SynchronizedDataset``, while the two failure classes raise a ``DatasetError`` @@ -48,6 +51,7 @@ from common.rollout_utils import rollout_policy from common.transfer_cube import BIMANUAL_VIPERX_URDF_PATH +from tests.integration.ml.shared.dataset import delete_recording_from_dataset from tests.integration.platform.data_daemon.shared.assertions import ( assert_exactly_one_daemon_pid, ) @@ -69,6 +73,8 @@ FREQUENCY = 20 # Hz the demo data is logged at. MAX_FRAMES = 30 # Truncate the rollout to keep recordings short. RECORDINGS_PER_DATASET = 10 # Recordings collected for each dataset. +RECORDINGS_TO_REMOVE = 5 # Recordings removed during the mutation scenario. +RECORDINGS_TO_ADD = 5 # Replacement recordings added after removal. NC_CAM_NAME = "rgb_angle" MJ_CAM_NAME = "angle" LANGUAGE_LABEL = "instruction" @@ -148,7 +154,7 @@ def _record_one(robot_name: str, instance: int) -> None: robot_name=robot_name, instance=instance, ) - nc.stop_recording(wait=False, robot_name=robot_name, instance=instance) + nc.stop_recording(wait=True, robot_name=robot_name, instance=instance) def _collect_dataset(robot_name: str, dataset_name: str, instance: int) -> Dataset: @@ -183,6 +189,42 @@ def _collect_dataset(robot_name: str, dataset_name: str, instance: int) -> Datas return dataset +def _add_recordings( + robot_name: str, + dataset_name: str, + instance: int, + count: int, + known_recording_ids: set[str], +) -> set[str]: + """Append ``count`` scripted recordings to an existing dataset.""" + new_ids: set[str] = set() + expected_count = len(known_recording_ids) + + for _ in range(count): + _record_one(robot_name=robot_name, instance=instance) + expected_count += 1 + wait_for_dataset_ready( + dataset_name, + expected_recording_count=expected_count, + timeout_s=RECORDING_STOP_TIMEOUT_SECONDS, + poll_interval_s=PROGRESS_POLL_SECONDS, + ) + current_ids = {str(recording.id) for recording in nc.get_dataset(dataset_name)} + added = current_ids - known_recording_ids - new_ids + assert ( + len(added) == 1 + ), f"Expected exactly one new recording, got {sorted(added)}" + new_ids.add(added.pop()) + + wait_for_recordings_finalized( + dataset_name, + recording_ids=new_ids, + timeout_s=RECORDING_FINALIZE_TIMEOUT_SECONDS, + poll_interval_s=PROGRESS_POLL_SECONDS, + ) + return new_ids + + def _assert_failure_with_reason( excinfo: pytest.ExceptionInfo[DatasetError], marker: str, @@ -278,3 +320,123 @@ def test_dataset_synchronization_missing_data() -> None: cross_embodiment_union={"": {MISSING_DATA_TYPE: [MISSING_SENSOR_NAME]}}, ) _assert_failure_with_reason(excinfo, "No sensors found for data type") + + +def test_dataset_synchronization_after_mutation() -> None: + """Synchronize, remove half the recordings, add replacements, re-synchronize.""" + nc.login() + run_id = uuid.uuid4().hex[:8] + robot_name = f"sync_it_robot_{run_id}" + dataset_name = _unique_name("sync_it_mutation") + dataset: Dataset | None = None + sync_kwargs = { + "frequency": 10, + "allow_duplicates": True, + "max_delay_s": 1.0, + "cross_embodiment_union": None, + } + try: + with online_daemon_running(): + assert_exactly_one_daemon_pid() + + logger.info( + "[STEP 1] Collecting %d recordings into %r", + RECORDINGS_PER_DATASET, + dataset_name, + ) + dataset = _collect_dataset( + robot_name=robot_name, + dataset_name=dataset_name, + instance=0, + ) + logger.info( + "[STEP 1] [PASSED] Collected %d recordings into %r", + len(dataset), + dataset_name, + ) + + logger.info("[STEP 2] Synchronizing dataset (initial)") + synced_initial = dataset.synchronize(**sync_kwargs) + assert len(synced_initial) == RECORDINGS_PER_DATASET + logger.info( + "[STEP 2] [PASSED] Initial synchronization complete: " + "%d recordings synced", + len(synced_initial), + ) + + logger.info( + "[STEP 3] Removing %d recordings from %r", + RECORDINGS_TO_REMOVE, + dataset_name, + ) + recordings_to_delete = [ + dataset[index] for index in range(RECORDINGS_TO_REMOVE) + ] + deleted_ids = {str(recording.id) for recording in recordings_to_delete} + for recording in recordings_to_delete: + logger.info("[STEP 3] Deleting recording %s", recording.id) + delete_recording_from_dataset(dataset=dataset, recording=recording) + + remaining = RECORDINGS_PER_DATASET - RECORDINGS_TO_REMOVE + wait_for_dataset_ready( + dataset_name, + expected_recording_count=remaining, + timeout_s=RECORDING_STOP_TIMEOUT_SECONDS, + poll_interval_s=PROGRESS_POLL_SECONDS, + ) + dataset = nc.get_dataset(dataset_name) + surviving_ids = {str(recording.id) for recording in dataset} + assert deleted_ids.isdisjoint(surviving_ids) + assert len(surviving_ids) == remaining + logger.info( + "[STEP 3] [PASSED] Removed %d recordings; %d remaining", + RECORDINGS_TO_REMOVE, + len(surviving_ids), + ) + + logger.info( + "[STEP 4] Adding %d recordings to %r", + RECORDINGS_TO_ADD, + dataset_name, + ) + new_ids = _add_recordings( + robot_name=robot_name, + dataset_name=dataset_name, + instance=0, + count=RECORDINGS_TO_ADD, + known_recording_ids=surviving_ids, + ) + active_ids = surviving_ids | new_ids + wait_for_dataset_ready( + dataset_name, + expected_recording_count=RECORDINGS_PER_DATASET, + timeout_s=RECORDING_STOP_TIMEOUT_SECONDS, + poll_interval_s=PROGRESS_POLL_SECONDS, + ) + dataset = nc.get_dataset(dataset_name) + assert len(active_ids) == RECORDINGS_PER_DATASET + assert deleted_ids.isdisjoint(active_ids) + assert new_ids <= active_ids + logger.info( + "[STEP 4] [PASSED] Added %d recordings; dataset has %d total", + len(new_ids), + len(dataset), + ) + + logger.info("[STEP 5] Re-synchronizing dataset after mutation") + synced_after = dataset.synchronize(**sync_kwargs) + assert len(synced_after) == RECORDINGS_PER_DATASET + assert { + str(recording.id) for recording in synced_after.dataset + } == active_ids + logger.info( + "[STEP 5] [PASSED] Re-synchronization complete: " + "%d recordings synced", + len(synced_after), + ) + finally: + if dataset is not None: + try: + dataset.delete() + except Exception: # noqa: BLE001 + logger.warning("Failed to clean up dataset %s", dataset.id)