diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index 8f0b0bb1c881..82e6e646e552 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -509,6 +509,10 @@ def apply_random_seed_per_iteration():
 
 _DEBUG_LEVEL_ENV_VAR = "DEBUG_LEVEL"
 stress_cmd = "./db_stress"
+_REMOTE_COMPACTION_OUTPUT_PREFIX = "tmp_output_"
+_ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR = ".abandoned_remote_compaction_outputs"
+_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX = "run_"
+_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUNS_TO_KEEP = 3
 
 
 def is_release_mode():
@@ -2052,6 +2056,62 @@ def print_and_cleanup_fault_injection_log(pid):
         pass
 
 
+def remove_path_ignore_errors(path):
+    try:
+        if os.path.isdir(path):
+            shutil.rmtree(path, True)
+        else:
+            os.remove(path)
+    except OSError:
+        pass
+
+
+def cleanup_stale_remote_compaction_outputs(dbname):
+    if is_remote_db or dbname is None or dbname == "" or not os.path.isdir(dbname):
+        return
+
+    # Preserve a small amount of abandoned remote compaction state for
+    # postmortem triage without letting long crash-test runs fill the DB path.
+    stale_entries = [
+        entry
+        for entry in os.listdir(dbname)
+        if entry.startswith(_REMOTE_COMPACTION_OUTPUT_PREFIX)
+    ]
+    if stale_entries:
+        archive_root = os.path.join(dbname, _ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR)
+        archive_run = os.path.join(
+            archive_root,
+            f"{_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX}{time.time_ns():020d}",
+        )
+        try:
+            os.makedirs(archive_run, exist_ok=False)
+        except OSError:
+            archive_run = None
+
+        for entry in stale_entries:
+            path = os.path.join(dbname, entry)
+            if archive_run is not None:
+                try:
+                    os.replace(path, os.path.join(archive_run, entry))
+                    continue
+                except OSError:
+                    pass
+
+            remove_path_ignore_errors(path)
+
+    archive_root = os.path.join(dbname, _ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR)
+    if not os.path.isdir(archive_root):
+        return
+
+    archived_runs = sorted(
+        entry
+        for entry in os.listdir(archive_root)
+        if entry.startswith(_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX)
+    )
+    for entry in archived_runs[:-_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUNS_TO_KEEP]:
+        remove_path_ignore_errors(os.path.join(archive_root, entry))
+
+
 # This script runs and kills db_stress multiple times. It checks consistency
 # in case of unsafe crashes in RocksDB.
 def blackbox_crash_main(args, unknown_args):
@@ -2090,6 +2150,7 @@ def blackbox_crash_main(args, unknown_args):
                 sys.exit(2)
 
         print_run_output_and_exit_on_error(args, finalized_params, outs, errs)
+        cleanup_stale_remote_compaction_outputs(dbname)
 
         time.sleep(1)  # time to stabilize before the next run
 
@@ -2097,6 +2158,7 @@
 
     # We should run the test one more time with VerifyOnly setup and no-timeout
    # Only do this if the tests are not failed for total-duration
+    cleanup_stale_remote_compaction_outputs(dbname)
    print("Running final time for verification")
    cmd_params.update({"verification_only": 1})
    cmd_params.update({"skip_verifydb": 0})
@@ -2291,6 +2353,7 @@ def whitebox_crash_main(args, unknown_args):
             cmd_params["destroy_db_initially"] = 1
             check_mode = (check_mode + 1) % total_check_mode
 
+        cleanup_stale_remote_compaction_outputs(dbname)
         time.sleep(1)  # time to stabilize after a kill
 
         # If successfully finished or timed out (we currently treat timed out test as passing)
diff --git a/tools/db_crashtest_test.py b/tools/db_crashtest_test.py
index dfb49db7578c..b86a79fc263f 100644
--- a/tools/db_crashtest_test.py
+++ b/tools/db_crashtest_test.py
@@ -66,6 +66,27 @@ def build_params(self, base_params, overrides=None):
         params.update(overrides)
         return params
 
+    def create_remote_compaction_output(self, parent_dir, output_name, marker):
+        output_dir = os.path.join(parent_dir, output_name)
+        os.makedirs(output_dir)
+        with open(os.path.join(output_dir, "orphan.sst"), "w") as f:
+            f.write(marker)
+        return output_dir
+
+    def create_archived_remote_compaction_run(
+        self, db_crashtest, dbname, run_suffix, output_name, marker
+    ):
+        archive_root = os.path.join(
+            dbname, db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR
+        )
+        run_dir = os.path.join(
+            archive_root,
+            f"{db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX}"
+            f"{run_suffix:020d}",
+        )
+        self.create_remote_compaction_output(run_dir, output_name, marker)
+        return run_dir
+
     def test_setup_expected_values_dir_preserves_existing_contents(self):
         os.makedirs(self.expected_dir)
         marker = os.path.join(self.expected_dir, "marker")
@@ -122,6 +143,89 @@ def test_finalize_disables_test_batches_snapshots_for_blob_direct_write(self):
         self.assertEqual(1, finalized["disable_wal"])
         self.assertEqual(0, finalized["test_batches_snapshots"])
 
+    def test_cleanup_stale_remote_compaction_outputs_archives_only_tmp_output_dirs(
+        self,
+    ):
+        db_crashtest = self.load_db_crashtest()
+        dbname = os.path.join(self.test_tmpdir, "rocksdb_crashtest_blackbox")
+        os.makedirs(dbname)
+
+        stale_dir = self.create_remote_compaction_output(
+            dbname, "tmp_output_stale", "old remote compaction output"
+        )
+
+        backup_dir = os.path.join(dbname, ".backup0")
+        os.makedirs(backup_dir)
+        live_sst = os.path.join(dbname, "000123.sst")
+        with open(live_sst, "w") as f:
+            f.write("keep")
+
+        db_crashtest.cleanup_stale_remote_compaction_outputs(dbname)
+
+        self.assertFalse(os.path.exists(stale_dir))
+        self.assertTrue(os.path.isdir(backup_dir))
+        self.assertTrue(os.path.isfile(live_sst))
+        archive_root = os.path.join(
+            dbname, db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR
+        )
+        archived_runs = sorted(os.listdir(archive_root))
+        self.assertEqual(1, len(archived_runs))
+        self.assertTrue(
+            os.path.isdir(os.path.join(archive_root, archived_runs[0], "tmp_output_stale"))
+        )
+
+    def test_cleanup_stale_remote_compaction_outputs_keeps_last_three_runs(self):
+        db_crashtest = self.load_db_crashtest()
+        dbname = os.path.join(self.test_tmpdir, "rocksdb_crashtest_whitebox")
+        os.makedirs(dbname)
+
+        oldest_run = self.create_archived_remote_compaction_run(
+            db_crashtest, dbname, 1, "tmp_output_oldest", "oldest"
+        )
+        second_oldest_run = self.create_archived_remote_compaction_run(
+            db_crashtest, dbname, 2, "tmp_output_old_2", "old_2"
+        )
+        newest_existing_run = self.create_archived_remote_compaction_run(
+            db_crashtest, dbname, 3, "tmp_output_old_3", "old_3"
+        )
+        current_output = self.create_remote_compaction_output(
+            dbname, "tmp_output_current", "current"
+        )
+
+        db_crashtest.cleanup_stale_remote_compaction_outputs(dbname)
+
+        self.assertFalse(os.path.exists(current_output))
+        self.assertFalse(os.path.exists(oldest_run))
+        self.assertTrue(os.path.isdir(second_oldest_run))
+        self.assertTrue(os.path.isdir(newest_existing_run))
+
+        archive_root = os.path.join(
+            dbname, db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR
+        )
+        archived_runs = sorted(os.listdir(archive_root))
+        self.assertEqual(3, len(archived_runs))
+        self.assertEqual(
+            f"{db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX}{2:020d}",
+            archived_runs[0],
+        )
+        self.assertEqual(
+            f"{db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX}{3:020d}",
+            archived_runs[1],
+        )
+        self.assertTrue(
+            os.path.isdir(
+                os.path.join(archive_root, archived_runs[2], "tmp_output_current")
+            )
+        )
+
+    def test_cleanup_stale_remote_compaction_outputs_ignores_missing_db_dir(self):
+        db_crashtest = self.load_db_crashtest()
+        missing_db = os.path.join(self.test_tmpdir, "missing_db")
+
+        db_crashtest.cleanup_stale_remote_compaction_outputs(missing_db)
+
+        self.assertFalse(os.path.exists(missing_db))
+
     def test_strip_expected_sigterm_stderr_suppresses_only_known_lines(self):
         db_crashtest = self.load_db_crashtest()
         stdout = "Received signal 15 (Terminated)\n"