Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ def apply_random_seed_per_iteration():
_DEBUG_LEVEL_ENV_VAR = "DEBUG_LEVEL"

stress_cmd = "./db_stress"
# Entries in the DB dir matching this prefix are treated as abandoned
# remote-compaction outputs by cleanup_stale_remote_compaction_outputs().
_REMOTE_COMPACTION_OUTPUT_PREFIX = "tmp_output_"
# Hidden directory (inside the DB dir) where abandoned outputs are parked
# for postmortem triage instead of being deleted immediately.
_ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR = ".abandoned_remote_compaction_outputs"
# Each cleanup pass archives into its own "run_<zero-padded timestamp>" subdir.
_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX = "run_"
# Only this many newest archive runs are retained, bounding disk usage
# during long crash-test runs.
_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUNS_TO_KEEP = 3


def is_release_mode():
Expand Down Expand Up @@ -2052,6 +2056,62 @@ def print_and_cleanup_fault_injection_log(pid):
pass


def remove_path_ignore_errors(path):
    """Best-effort removal of *path*, whether it is a file or a directory tree.

    All OS-level failures (missing path, permissions, etc.) are swallowed on
    purpose: cleanup must never abort a crash-test run.
    """
    try:
        if not os.path.isdir(path):
            os.remove(path)
        else:
            shutil.rmtree(path, ignore_errors=True)
    except OSError:
        # Deliberate best-effort: leftover paths are only a disk-space nuisance.
        pass


def cleanup_stale_remote_compaction_outputs(dbname):
    """Archive (and eventually purge) orphaned remote-compaction outputs.

    ``tmp_output_*`` entries left behind in the DB dir by a killed db_stress
    run are relocated into a per-pass archive directory under
    ``.abandoned_remote_compaction_outputs`` so a few survive for postmortem
    triage.  Anything that cannot be archived is deleted outright, and only
    the newest ``_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUNS_TO_KEEP`` archive
    runs are retained so long crash tests do not fill the disk.
    """
    # No-op for remote DBs, unset names, and paths that are not directories.
    if is_remote_db or dbname is None or dbname == "" or not os.path.isdir(dbname):
        return

    archive_root = os.path.join(dbname, _ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR)

    leftovers = [
        name
        for name in os.listdir(dbname)
        if name.startswith(_REMOTE_COMPACTION_OUTPUT_PREFIX)
    ]
    if leftovers:
        # Zero-padded nanosecond timestamp keeps lexicographic ordering equal
        # to chronological ordering for the retention pass below.
        run_dir = os.path.join(
            archive_root,
            f"{_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX}{time.time_ns():020d}",
        )
        try:
            os.makedirs(run_dir, exist_ok=False)
        except OSError:
            # Archiving unavailable; fall back to plain deletion below.
            run_dir = None

        for name in leftovers:
            src = os.path.join(dbname, name)
            if run_dir is not None:
                try:
                    os.replace(src, os.path.join(run_dir, name))
                    continue
                except OSError:
                    # e.g. cross-filesystem rename failure -> delete instead.
                    pass
            remove_path_ignore_errors(src)

    if not os.path.isdir(archive_root):
        return

    # Prune all but the newest N archived runs (names sort chronologically).
    archived = sorted(
        name
        for name in os.listdir(archive_root)
        if name.startswith(_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX)
    )
    for stale_run in archived[:-_ABANDONED_REMOTE_COMPACTION_OUTPUT_RUNS_TO_KEEP]:
        remove_path_ignore_errors(os.path.join(archive_root, stale_run))


# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
Expand Down Expand Up @@ -2090,13 +2150,15 @@ def blackbox_crash_main(args, unknown_args):
sys.exit(2)

print_run_output_and_exit_on_error(args, finalized_params, outs, errs)
cleanup_stale_remote_compaction_outputs(dbname)

time.sleep(1) # time to stabilize before the next run

time.sleep(1) # time to stabilize before the next run

# We should run the test one more time with VerifyOnly setup and no-timeout
# Only do this if the tests are not failed for total-duration
cleanup_stale_remote_compaction_outputs(dbname)
print("Running final time for verification")
cmd_params.update({"verification_only": 1})
cmd_params.update({"skip_verifydb": 0})
Expand Down Expand Up @@ -2291,6 +2353,7 @@ def whitebox_crash_main(args, unknown_args):
cmd_params["destroy_db_initially"] = 1
check_mode = (check_mode + 1) % total_check_mode

cleanup_stale_remote_compaction_outputs(dbname)
time.sleep(1) # time to stabilize after a kill

# If successfully finished or timed out (we currently treat timed out test as passing)
Expand Down
104 changes: 104 additions & 0 deletions tools/db_crashtest_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ def build_params(self, base_params, overrides=None):
params.update(overrides)
return params

def create_remote_compaction_output(self, parent_dir, output_name, marker):
    """Create a fake remote-compaction output dir holding one marker SST file.

    Returns the path of the created directory.
    """
    target = os.path.join(parent_dir, output_name)
    os.makedirs(target)
    marker_path = os.path.join(target, "orphan.sst")
    with open(marker_path, "w") as handle:
        handle.write(marker)
    return target

def create_archived_remote_compaction_run(
    self, db_crashtest, dbname, run_suffix, output_name, marker
):
    """Pre-populate one archived remote-compaction run under *dbname*.

    The run directory is named with the module's run prefix plus a
    zero-padded *run_suffix* so it sorts like a real timestamped run.
    Returns the path of the run directory.
    """
    run_name = (
        f"{db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX}"
        f"{run_suffix:020d}"
    )
    run_dir = os.path.join(
        dbname,
        db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR,
        run_name,
    )
    self.create_remote_compaction_output(run_dir, output_name, marker)
    return run_dir

def test_setup_expected_values_dir_preserves_existing_contents(self):
os.makedirs(self.expected_dir)
marker = os.path.join(self.expected_dir, "marker")
Expand Down Expand Up @@ -122,6 +143,89 @@ def test_finalize_disables_test_batches_snapshots_for_blob_direct_write(self):
self.assertEqual(1, finalized["disable_wal"])
self.assertEqual(0, finalized["test_batches_snapshots"])

def test_cleanup_stale_remote_compaction_outputs_archives_only_tmp_output_dirs(
    self,
):
    """Only tmp_output_* entries are archived; other DB files stay untouched."""
    db_crashtest = self.load_db_crashtest()
    dbname = os.path.join(self.test_tmpdir, "rocksdb_crashtest_blackbox")
    os.makedirs(dbname)

    stale_dir = self.create_remote_compaction_output(
        dbname, "tmp_output_stale", "old remote compaction output"
    )
    # Non-matching entries that must survive the cleanup.
    backup_dir = os.path.join(dbname, ".backup0")
    os.makedirs(backup_dir)
    live_sst = os.path.join(dbname, "000123.sst")
    with open(live_sst, "w") as f:
        f.write("keep")

    db_crashtest.cleanup_stale_remote_compaction_outputs(dbname)

    # Stale output moved out of the DB dir; live files untouched.
    self.assertFalse(os.path.exists(stale_dir))
    self.assertTrue(os.path.isdir(backup_dir))
    self.assertTrue(os.path.isfile(live_sst))

    # Exactly one archive run was created, containing the stale output.
    archive_root = os.path.join(
        dbname, db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR
    )
    runs = sorted(os.listdir(archive_root))
    self.assertEqual(1, len(runs))
    archived_copy = os.path.join(archive_root, runs[0], "tmp_output_stale")
    self.assertTrue(os.path.isdir(archived_copy))

def test_cleanup_stale_remote_compaction_outputs_keeps_last_three_runs(self):
    """Archiving a new run evicts the oldest so only three runs remain."""
    db_crashtest = self.load_db_crashtest()
    dbname = os.path.join(self.test_tmpdir, "rocksdb_crashtest_whitebox")
    os.makedirs(dbname)

    # Three pre-existing archive runs (suffixes sort chronologically) plus
    # one fresh stale output waiting to be archived.
    prior_runs = [
        self.create_archived_remote_compaction_run(
            db_crashtest, dbname, suffix, output_name, marker
        )
        for suffix, output_name, marker in (
            (1, "tmp_output_oldest", "oldest"),
            (2, "tmp_output_old_2", "old_2"),
            (3, "tmp_output_old_3", "old_3"),
        )
    ]
    current_output = self.create_remote_compaction_output(
        dbname, "tmp_output_current", "current"
    )

    db_crashtest.cleanup_stale_remote_compaction_outputs(dbname)

    # The fresh output was archived and the oldest run evicted.
    self.assertFalse(os.path.exists(current_output))
    self.assertFalse(os.path.exists(prior_runs[0]))
    self.assertTrue(os.path.isdir(prior_runs[1]))
    self.assertTrue(os.path.isdir(prior_runs[2]))

    archive_root = os.path.join(
        dbname, db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUTS_DIR
    )
    remaining = sorted(os.listdir(archive_root))
    self.assertEqual(3, len(remaining))
    run_prefix = db_crashtest._ABANDONED_REMOTE_COMPACTION_OUTPUT_RUN_PREFIX
    self.assertEqual(f"{run_prefix}{2:020d}", remaining[0])
    self.assertEqual(f"{run_prefix}{3:020d}", remaining[1])
    # The newest (timestamp-named) run holds the just-archived output.
    self.assertTrue(
        os.path.isdir(
            os.path.join(archive_root, remaining[2], "tmp_output_current")
        )
    )

def test_cleanup_stale_remote_compaction_outputs_ignores_missing_db_dir(self):
    """Cleanup on a nonexistent DB path must neither raise nor create it."""
    db_crashtest = self.load_db_crashtest()
    absent_path = os.path.join(self.test_tmpdir, "missing_db")

    db_crashtest.cleanup_stale_remote_compaction_outputs(absent_path)

    # The cleanup must not create the directory as a side effect.
    self.assertFalse(os.path.exists(absent_path))

def test_strip_expected_sigterm_stderr_suppresses_only_known_lines(self):
db_crashtest = self.load_db_crashtest()
stdout = "Received signal 15 (Terminated)\n"
Expand Down
Loading