Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,20 @@ l0::gc::epoch_source::max_gc_eligible_epoch(seastar::abort_source* as) {
co_return std::unexpected(partitions.error());
}
if (partitions.value().partitions.empty()) {
co_return std::nullopt;
// No cloud topic partitions currently exist, so no partition can
// hold back the collectible epoch. Use the snapshot revision as
// the watermark so that we can still collect stranded L0 objects
// from previously deleted topics.
auto result = partitions.value().snap_revision;
vlog(
cd_log.debug,
"Empty partition snapshot, max GC eligible epoch is snapshot "
"epoch {}",
result);
if (probe_) {
probe_->set_min_partition_gc_epoch(result);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is inherently contradictory, but it's an interesting point.

}
co_return result;
Comment thread
oleiman marked this conversation as resolved.
Comment thread
oleiman marked this conversation as resolved.
Comment thread
oleiman marked this conversation as resolved.
}

/*
Expand Down
33 changes: 28 additions & 5 deletions src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,20 @@ class LevelZeroGCMaxEpochTest : public testing::Test {
};

TEST_F(LevelZeroGCMaxEpochTest, EmptySnapshot) {
// no error
// An empty snapshot means no cloud topic partitions currently exist.
// The watermark falls through to `snap_revision`: this is the
// zero-iteration case of the min-reduce. Stranded L0 objects left over
// from previously deleted topics remain epoch-eligible.
snapshot().snap_revision = cloud_topics::cluster_epoch(42);
ASSERT_TRUE(max_gc().has_value());
// no result
ASSERT_FALSE(max_gc().value().has_value());
ASSERT_TRUE(max_gc().value().has_value());
ASSERT_EQ(max_gc().value().value(), cloud_topics::cluster_epoch(42));
}

namespace {
model::topic_namespace tpns0(model::ns("ns0"), model::topic("t0"));
}
model::topic_namespace tpns1(model::ns("ns1"), model::topic("t1"));
} // namespace

TEST_F(LevelZeroGCMaxEpochTest, EmptyGcEpochReport) {
// here we have a non-empty snapshot, but no reported epochs from individual
Expand Down Expand Up @@ -572,10 +577,28 @@ TEST_F(LevelZeroGCMaxEpochTest, MinReduce) {
}

TEST_F(LevelZeroGCMaxEpochTest, EmptySnapshotNonEmptyReport) {
// With an empty snapshot, no partition can hold the watermark back:
// the topic table is the source of truth for liveness, so report
// entries without a matching snapshot entry are ignored regardless
// of value. This handles two races between the health report and
// snapshot collection points:
// (1) topic created after the snapshot - reported epoch is
// > snap_revision, so its L0 data is out of range for this
// pass anyway.
// (2) topic deleted before the snapshot - reported epoch is
// <= snap_revision; ignoring it prevents regressing the
// watermark below the snapshot and preserving the objects
// this pass is meant to reap.
snapshot().snap_revision = cloud_topics::cluster_epoch(100);
// case (1): report entry from a topic created after the snapshot
get_partitions_max_gc_epoch_value[tpns0][model::partition_id(0)]
= cloud_topics::cluster_epoch(200);
// case (2): stale report entry from a topic deleted before the snapshot
get_partitions_max_gc_epoch_value[tpns1][model::partition_id(0)]
= cloud_topics::cluster_epoch(50);
ASSERT_TRUE(max_gc().has_value());
ASSERT_FALSE(max_gc().value().has_value());
ASSERT_TRUE(max_gc().value().has_value());
ASSERT_EQ(max_gc().value().value(), cloud_topics::cluster_epoch(100));
}

class LevelZeroGCScaleOutTest : public LevelZeroGCTest {
Expand Down
81 changes: 81 additions & 0 deletions tests/rptest/tests/cloud_topics/l0_gc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,3 +1195,84 @@ def _blocked_rounds_stable():
backoff_sec=2,
retry_on_exc=True,
)


class CloudTopicsL0GCAllTopicsDeletedTest(CloudTopicsL0GCAdminBase):
"""
Integration: when every cloud topic is deleted while L0 objects still
exist in the bucket, GC must still clean them up.

Regression test for: `l0::gc::epoch_source::max_gc_eligible_epoch`
returned `std::nullopt` when the partition snapshot was empty (no
cloud topic partitions existed). `try_to_collect` then listed L0
objects but refused to delete any of them, returning
`no_collectible_epoch`. The worker loop backed off and retried
forever without making progress, so objects remained in the bucket
indefinitely.
"""

L0_PREFIX = "level_zero/data/"

def _count_l0_objects(self) -> int:
objects = self.redpanda.get_objects_from_si()
keys = [o.key for o in objects if o.key.startswith(self.L0_PREFIX)]
self.logger.debug(f"Found {len(keys)} L0 objects")
return len(keys)
Comment thread
oleiman marked this conversation as resolved.

@cluster(num_nodes=4)
@matrix(
cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
)
def test_gc_completes_after_all_topics_deleted(
self, cloud_storage_type: CloudStorageType
):
topic = TopicSpec(partition_count=2, replication_factor=3)
self.topics = [topic]
self.create_topics(self.topics)

# Pause GC cluster-wide so L0 objects accumulate in the bucket
# while we produce.
self.gc_pause()
self.wait_for_status(status=GcStatus.L0_GC_STATUS_PAUSED)

# Produce long enough that reconciliation advances the epoch and
# L0 objects exist to be collected.
self.produce_some(topics=[topic.name])
Comment thread
oleiman marked this conversation as resolved.

wait_until(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be direct assert, right? If produce succeeded then the objects must be already there.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah fair. get_objects_from_si should make a direct call to the object store. test isn't trying to make any claims about the produce path though, so I'm inclined to use wait_until rather than assume this synchronous chain between produce -> upload -> s3 <- LIST, even if it's technically expected.

lambda: self._count_l0_objects() > 0,
timeout_sec=60,
backoff_sec=2,
err_msg="Expected L0 objects to exist in the bucket after produce",
)
l0_before = self._count_l0_objects()
self.logger.info(f"Accumulated {l0_before} L0 objects with GC paused")

# Delete the only cloud topic and wait for the deletion to be
# applied cluster-wide.
rpk = RpkTool(self.redpanda)
self.logger.info(f"Deleting topic {topic.name}")
rpk.delete_topic(topic.name)
wait_until(
lambda: topic.name not in rpk.list_topics(),
timeout_sec=30,
backoff_sec=1,
err_msg=f"Topic {topic.name} still visible after delete",
)

# Resume GC and allow plenty of time for multiple GC rounds to finish.
# NOTE: Grace period is 10s.
# (see cloud_topics_short_term_gc_minimum_object_age in the base class).
self.gc_start()
Comment thread
oleiman marked this conversation as resolved.
self.wait_all_running()

wait_until(
lambda: self._count_l0_objects() == 0,
timeout_sec=120,
backoff_sec=5,
err_msg=lambda: (
f"L0 objects not cleaned up after deleting all cloud topics: "
f"{self._count_l0_objects()} remain (started with {l0_before})"
),
)
self.logger.info("All L0 objects cleaned up after topic deletion")
Comment thread
oleiman marked this conversation as resolved.
Loading