diff --git a/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc b/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc index 8390959fb646f..c7ca07329d776 100644 --- a/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc +++ b/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc @@ -355,7 +355,20 @@ l0::gc::epoch_source::max_gc_eligible_epoch(seastar::abort_source* as) { co_return std::unexpected(partitions.error()); } if (partitions.value().partitions.empty()) { - co_return std::nullopt; + // No cloud topic partitions currently exist, so no partition can + // hold back the collectible epoch. Use the snapshot revision as + // the watermark so that we can still collect stranded L0 objects + // from previously deleted topics. + auto result = partitions.value().snap_revision; + vlog( + cd_log.debug, + "Empty partition snapshot, max GC eligible epoch is snapshot " + "epoch {}", + result); + if (probe_) { + probe_->set_min_partition_gc_epoch(result); + } + co_return result; } /* diff --git a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc index 85a92def2c0f3..da08a51b79102 100644 --- a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc +++ b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc @@ -527,15 +527,20 @@ class LevelZeroGCMaxEpochTest : public testing::Test { }; TEST_F(LevelZeroGCMaxEpochTest, EmptySnapshot) { - // no error + // An empty snapshot means no cloud topic partitions currently exist. + // The watermark falls through to `snap_revision`: this is the + // zero-iteration case of the min-reduce. Stranded L0 objects left over + // from previously deleted topics remain epoch-eligible. + snapshot().snap_revision = cloud_topics::cluster_epoch(42); ASSERT_TRUE(max_gc().has_value()); - // no result - ASSERT_FALSE(max_gc().value().has_value()); + ASSERT_TRUE(max_gc().value().has_value()); + ASSERT_EQ(max_gc().value().value(), cloud_topics::cluster_epoch(42)); } namespace { model::topic_namespace tpns0(model::ns("ns0"), model::topic("t0")); -} +model::topic_namespace tpns1(model::ns("ns1"), model::topic("t1")); +} // namespace TEST_F(LevelZeroGCMaxEpochTest, EmptyGcEpochReport) { // here we have a non-empty snapshot, but no reported epochs from individual @@ -572,10 +577,28 @@ TEST_F(LevelZeroGCMaxEpochTest, MinReduce) { } TEST_F(LevelZeroGCMaxEpochTest, EmptySnapshotNonEmptyReport) { + // With an empty snapshot, no partition can hold the watermark back: + // the topic table is the source of truth for liveness, so report + // entries without a matching snapshot entry are ignored regardless + // of value. This handles two races between the health report and + // snapshot collection points: + // (1) topic created after the snapshot - reported epoch is + // > snap_revision, so its L0 data is out of range for this + // pass anyway. + // (2) topic deleted before the snapshot - reported epoch is + // <= snap_revision; ignoring it prevents regressing the + // watermark below the snapshot and preserving the objects + // this pass is meant to reap. + snapshot().snap_revision = cloud_topics::cluster_epoch(100); + // case (1): report entry from a topic created after the snapshot get_partitions_max_gc_epoch_value[tpns0][model::partition_id(0)] = cloud_topics::cluster_epoch(200); + // case (2): stale report entry from a topic deleted before the snapshot + get_partitions_max_gc_epoch_value[tpns1][model::partition_id(0)] + = cloud_topics::cluster_epoch(50); ASSERT_TRUE(max_gc().has_value()); - ASSERT_FALSE(max_gc().value().has_value()); + ASSERT_TRUE(max_gc().value().has_value()); + ASSERT_EQ(max_gc().value().value(), cloud_topics::cluster_epoch(100)); } class LevelZeroGCScaleOutTest : public LevelZeroGCTest { diff --git a/tests/rptest/tests/cloud_topics/l0_gc_test.py b/tests/rptest/tests/cloud_topics/l0_gc_test.py index 89d48f9a5475f..3ea170b11c89a 100644 --- a/tests/rptest/tests/cloud_topics/l0_gc_test.py +++ b/tests/rptest/tests/cloud_topics/l0_gc_test.py @@ -1195,3 +1195,84 @@ def _blocked_rounds_stable(): backoff_sec=2, retry_on_exc=True, ) + + +class CloudTopicsL0GCAllTopicsDeletedTest(CloudTopicsL0GCAdminBase): + """ + Integration: when every cloud topic is deleted while L0 objects still + exist in the bucket, GC must still clean them up. + + Regression test for: `l0::gc::epoch_source::max_gc_eligible_epoch` + returned `std::nullopt` when the partition snapshot was empty (no + cloud topic partitions existed). `try_to_collect` then listed L0 + objects but refused to delete any of them, returning + `no_collectible_epoch`. The worker loop backed off and retried + forever without making progress, so objects remained in the bucket + indefinitely. + """ + + L0_PREFIX = "level_zero/data/" + + def _count_l0_objects(self) -> int: + objects = self.redpanda.get_objects_from_si() + keys = [o.key for o in objects if o.key.startswith(self.L0_PREFIX)] + self.logger.debug(f"Found {len(keys)} L0 objects") + return len(keys) + + @cluster(num_nodes=4) + @matrix( + cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3]) + ) + def test_gc_completes_after_all_topics_deleted( + self, cloud_storage_type: CloudStorageType + ): + topic = TopicSpec(partition_count=2, replication_factor=3) + self.topics = [topic] + self.create_topics(self.topics) + + # Pause GC cluster-wide so L0 objects accumulate in the bucket + # while we produce. + self.gc_pause() + self.wait_for_status(status=GcStatus.L0_GC_STATUS_PAUSED) + + # Produce long enough that reconciliation advances the epoch and + # L0 objects exist to be collected. + self.produce_some(topics=[topic.name]) + + wait_until( + lambda: self._count_l0_objects() > 0, + timeout_sec=60, + backoff_sec=2, + err_msg="Expected L0 objects to exist in the bucket after produce", + ) + l0_before = self._count_l0_objects() + self.logger.info(f"Accumulated {l0_before} L0 objects with GC paused") + + # Delete the only cloud topic and wait for the deletion to be + # applied cluster-wide. + rpk = RpkTool(self.redpanda) + self.logger.info(f"Deleting topic {topic.name}") + rpk.delete_topic(topic.name) + wait_until( + lambda: topic.name not in rpk.list_topics(), + timeout_sec=30, + backoff_sec=1, + err_msg=f"Topic {topic.name} still visible after delete", + ) + + # Resume GC and allow plenty of time for multiple GC rounds to finish. + # NOTE: Grace period is 10s. + # (see cloud_topics_short_term_gc_minimum_object_age in the base class). + self.gc_start() + self.wait_all_running() + + wait_until( + lambda: self._count_l0_objects() == 0, + timeout_sec=120, + backoff_sec=5, + err_msg=lambda: ( + f"L0 objects not cleaned up after deleting all cloud topics: " + f"{self._count_l0_objects()} remain (started with {l0_before})" + ), + ) + self.logger.info("All L0 objects cleaned up after topic deletion")