Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -103,6 +104,14 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
MetadataRequest metadataRequest =
ClientRpcMessageUtils.makeMetadataRequest(
tablePaths, tablePartitions, tablePartitionIds);
// Collect the table paths for which partition metadata is being refreshed, so that stale
// partition entries can be removed during partial updates.
final Set<TablePath> refreshedPartitionTables = new HashSet<>();
if (tablePartitions != null) {
for (PhysicalTablePath ptp : tablePartitions) {
refreshedPartitionTables.add(ptp.getTablePath());
}
}
return gateway.metadata(metadataRequest)
.thenApply(
response -> {
Expand Down Expand Up @@ -135,6 +144,28 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
newPartitionIdByPath =
new HashMap<>(originCluster.getPartitionIdByPath());

// Remove stale partition entries for tables whose partition
// metadata was refreshed. The response only contains currently
// existing partitions, so any entry not in the response is stale
// (e.g., dropped partitions).
if (!refreshedPartitionTables.isEmpty()) {
newPartitionIdByPath
.keySet()
.removeIf(
path ->
refreshedPartitionTables.contains(
path.getTablePath()));
newBucketLocations
.keySet()
.removeIf(
path ->
path.getPartitionName() != null
&& refreshedPartitionTables
.contains(
path
.getTablePath()));
}

newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId);
newBucketLocations.putAll(newTableMetadata.bucketLocations);
newPartitionIdByPath.putAll(newTableMetadata.partitionIdByPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ public final class RecordAccumulator {
private final ConcurrentMap<PhysicalTablePath, BucketAndWriteBatches> writeBatches =
new CopyOnWriteMap<>();

/**
 * Physical table paths currently marked stale (e.g. because the partition was dropped).
 *
 * <p>Paths are added by {@link #markPathsAsStale} and removed again by {@link
 * #removeStalePathIfEmpty} once the matching {@link #writeBatches} entry is gone. While a path
 * is in this set, {@link #append} rejects it with an {@link IllegalStateException}.
 *
 * <p>This is a plain {@link HashSet}; all accesses must be guarded by {@link #staleLock}.
 */
private final Set<PhysicalTablePath> stalePaths = new HashSet<>();

/**
 * Monitor guarding {@link #stalePaths} and the check-then-remove sequence on {@link
 * #writeBatches} performed by {@link #removeStalePathIfEmpty}.
 *
 * <p>NOTE(review): {@link #append} holds this lock only for its stale-path check and releases
 * it before calling {@code computeIfAbsent} - confirm that this window is acceptable.
 */
private final Object staleLock = new Object();

private final IncompleteBatches incomplete;

private final Map<Integer, Integer> nodesDrainIndex;
Expand Down Expand Up @@ -185,6 +197,17 @@ public RecordAppendResult append(
// The metadata may return null for the partition id, but it is fine to pass null here,
// because we will fill the partitionId in bucketReady() before send the batch.
Optional<Long> partitionIdOpt = cluster.getPartitionId(physicalTablePath);

// Reject appends to paths that the Sender has marked as stale (e.g. dropped partitions).
// This check must happen before computeIfAbsent so that a concurrent markPathsAsStale +
// removeStalePathIfEmpty cannot race with a new append re-inserting the same path.
synchronized (staleLock) {
if (stalePaths.contains(physicalTablePath)) {
throw new IllegalStateException(
"Cannot append to a stale (dropped) partition: " + physicalTablePath);
}
}

BucketAndWriteBatches bucketAndWriteBatches =
writeBatches.computeIfAbsent(
physicalTablePath,
Expand Down Expand Up @@ -428,6 +451,50 @@ public Set<PhysicalTablePath> getPhysicalTablePathsInBatches() {
return writeBatches.keySet();
}

/**
 * Flags every path in {@code paths} as stale so that later {@link #append} calls targeting
 * one of them are rejected. Only the stale marker set is touched here; the Sender is expected
 * to call {@link #removeStalePathIfEmpty} once a path's deques have been fully drained.
 *
 * @param paths the physical table paths to flag as stale (e.g. dropped partitions)
 */
public void markPathsAsStale(Set<PhysicalTablePath> paths) {
    synchronized (staleLock) {
        for (PhysicalTablePath flagged : paths) {
            stalePaths.add(flagged);
        }
    }
}

/**
 * Removes {@code path} from {@link #writeBatches} if it is marked stale and every one of its
 * deques is empty, and clears its stale marker.
 *
 * <p>Paths that were never passed to {@link #markPathsAsStale} are left alone: {@link #append}
 * only rejects paths found in {@link #stalePaths}, so for an unmarked path nothing prevents a
 * writer from filling the entry right after the emptiness check.
 *
 * <p>NOTE(review): {@link #append} releases {@link #staleLock} before its {@code
 * computeIfAbsent}, so an append that passed the stale check before the path was marked may
 * still be in progress while this method runs - confirm whether that window needs closing.
 *
 * @param path the stale path to try to remove
 * @return {@code true} if the path was marked stale and is no longer tracked (removed now, or
 *     already absent from {@link #writeBatches}); {@code false} if it is not marked stale or
 *     still has pending batches
 */
public boolean removeStalePathIfEmpty(PhysicalTablePath path) {
    synchronized (staleLock) {
        // Only paths that append() already rejects may be dropped; for anything else the
        // emptiness check below would not be protected against new writers.
        if (!stalePaths.contains(path)) {
            return false;
        }

        BucketAndWriteBatches entry = writeBatches.get(path);
        if (entry != null) {
            for (Deque<WriteBatch> deque : entry.batches.values()) {
                // Each deque is inspected under its own monitor, as in the original code.
                synchronized (deque) {
                    if (!deque.isEmpty()) {
                        // Pending data: keep both the entry and the stale marker.
                        return false;
                    }
                }
            }
            writeBatches.remove(path);
        }

        // The entry is gone (or never existed), so the marker is no longer needed.
        stalePaths.remove(path);
        return true;
    }
}

private List<MemorySegment> allocateMemorySegments(
WriteRecord writeRecord, PhysicalTablePath physicalTablePath) throws IOException {
int pagesPerBatch =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ private void sendWriteData() throws Exception {
readyCheckResult.unknownLeaderTables);
}

// Clean up stale physical table path entries from the accumulator on every cycle.
// Dropped partitions will no longer appear in the cluster's bucket locations; any
// writeBatches entry whose deques are all empty can be safely removed.
cleanupStaleWriteBatches(metadataUpdater.getCluster());

Set<Integer> readyNodes = readyCheckResult.readyNodes;
if (readyNodes.isEmpty()) {
// TODO The method sendWriteData is in a busy loop. If there is no data continuously, it
Expand All @@ -256,6 +261,37 @@ private void sendWriteData() throws Exception {
}
}

/**
 * Marks every physical table path that is buffered in the accumulator but absent from the
 * cluster's bucket locations as stale, then tries to remove each of those paths from the
 * accumulator.
 *
 * <p>Marking makes the accumulator reject new appends for the path. Removal only succeeds once
 * all of the path's deques are empty, so a path with pending batches is retried on a later
 * call.
 *
 * <p>Removal is attempted only for the paths marked in this call. Re-reading the accumulator's
 * key set for the removal pass could pick up a path that was added after the marking pass and
 * was therefore never marked.
 *
 * <p>NOTE(review): nothing here clears the stale marker of a path that shows up in the cluster
 * again while it is still marked - verify whether that case can occur.
 *
 * @param cluster the cluster snapshot whose bucket locations define the live paths
 */
private void cleanupStaleWriteBatches(Cluster cluster) {
    Set<PhysicalTablePath> clusterPaths = cluster.getBucketLocationsByPath().keySet();

    Set<PhysicalTablePath> absentFromCluster = new HashSet<>();
    for (PhysicalTablePath path : accumulator.getPhysicalTablePathsInBatches()) {
        if (!clusterPaths.contains(path)) {
            absentFromCluster.add(path);
        }
    }
    if (absentFromCluster.isEmpty()) {
        return;
    }

    LOG.debug(
            "Marking {} stale physical table path(s) from write batches: {}",
            absentFromCluster.size(),
            absentFromCluster);
    accumulator.markPathsAsStale(absentFromCluster);

    // Drop every marked path that is already fully drained; the rest stay marked and are
    // picked up again on a later call.
    for (PhysicalTablePath path : absentFromCluster) {
        accumulator.removeStalePathIfEmpty(path);
    }
}

private void completeBatch(ReadyWriteBatch readyWriteBatch) {
if (idempotenceManager.idempotenceEnabled()) {
idempotenceManager.handleCompletedBatch(readyWriteBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,34 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
Collections.emptyMap());
}

/**
 * Builds a {@link Cluster} holding the given bucket locations under the DATA1 physical table
 * path plus one additional physical table path with a single bucket. The additional path's
 * table is registered under the DATA1 table id. Used by the stale-path cleanup tests.
 *
 * @param bucketLocations bucket locations registered for the DATA1 physical table path
 * @param extraPath the additional physical table path to register
 * @param extraBucket the only bucket registered for {@code extraPath}
 * @return a cluster with node1..node3 as tablet servers and a coordinator on localhost:89
 */
private Cluster updateClusterWithExtra(
        List<BucketLocation> bucketLocations,
        PhysicalTablePath extraPath,
        BucketLocation extraBucket) {
    Map<TablePath, Long> tableIds = new HashMap<>();
    tableIds.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
    tableIds.put(extraPath.getTablePath(), DATA1_TABLE_ID);

    Map<PhysicalTablePath, List<BucketLocation>> locationsByPath = new HashMap<>();
    locationsByPath.put(DATA1_PHYSICAL_TABLE_PATH, bucketLocations);
    locationsByPath.put(extraPath, Collections.singletonList(extraBucket));

    Map<Integer, ServerNode> tabletServers = new HashMap<>();
    for (ServerNode node : Arrays.asList(node1, node2, node3)) {
        tabletServers.put(node.id(), node);
    }

    ServerNode coordinator = new ServerNode(0, "localhost", 89, ServerType.COORDINATOR);
    return new Cluster(
            tabletServers, coordinator, locationsByPath, tableIds, Collections.emptyMap());
}

private void delayedInterrupt(final Thread thread, final long delayMs) {
Thread t =
new Thread(
Expand Down Expand Up @@ -602,6 +630,87 @@ private void verifyTableBucketInBatches(
assertThat(tableBucketsInBatch).containsExactlyInAnyOrder(tb);
}

@Test
void testMarkAndRemoveStalePathAfterDrain() throws Exception {
    IndexedRow firstRow = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
    // batchTimeoutMs=0: batches are ready immediately, so drain() can poll them out.
    RecordAccumulator accumulator = createTestRecordAccumulator(0, 4 * 1024, 256, 64 * 1024);

    // A partitioned path standing in for a partition that is dropped later on; its single
    // bucket is led by node1.
    PhysicalTablePath droppedPath =
            PhysicalTablePath.of(TablePath.of("test_db", "test_table"), "stale_partition");
    Cluster clusterWithDropped =
            updateClusterWithExtra(
                    Arrays.asList(bucket1, bucket2),
                    droppedPath,
                    new BucketLocation(
                            droppedPath, DATA1_TABLE_ID, 0, node1.id(), serverNodes));

    // Buffer one record for the regular table and one for the soon-to-be-stale path.
    accumulator.append(createRecord(firstRow), writeCallback, cluster, 0, false);
    accumulator.append(
            WriteRecord.forIndexedAppend(
                    DATA1_TABLE_INFO,
                    droppedPath,
                    indexedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}),
                    null),
            writeCallback,
            clusterWithDropped,
            0,
            false);
    assertThat(accumulator.getPhysicalTablePathsInBatches())
            .contains(DATA1_PHYSICAL_TABLE_PATH, droppedPath);

    // Once marked, any further append to the path has to fail.
    accumulator.markPathsAsStale(Collections.singleton(droppedPath));
    assertThatThrownBy(
                    () ->
                            accumulator.append(
                                    WriteRecord.forIndexedAppend(
                                            DATA1_TABLE_INFO,
                                            droppedPath,
                                            indexedRow(
                                                    DATA1_ROW_TYPE, new Object[] {3, "c"}),
                                            null),
                                    writeCallback,
                                    clusterWithDropped,
                                    0,
                                    false))
            .isInstanceOf(IllegalStateException.class)
            .hasMessageContaining("stale");

    // Drain node1 and hand the batches back, which empties the stale path's deque.
    for (List<ReadyWriteBatch> nodeBatches :
            accumulator
                    .drain(
                            clusterWithDropped,
                            Collections.singleton(node1.id()),
                            Integer.MAX_VALUE)
                    .values()) {
        for (ReadyWriteBatch ready : nodeBatches) {
            accumulator.deallocate(ready.writeBatch());
        }
    }

    // With an empty deque the stale path can be removed.
    assertThat(accumulator.removeStalePathIfEmpty(droppedPath)).isTrue();
    assertThat(accumulator.getPhysicalTablePathsInBatches()).doesNotContain(droppedPath);
    // The regular path was never passed to removeStalePathIfEmpty, so it is still tracked.
    assertThat(accumulator.getPhysicalTablePathsInBatches())
            .contains(DATA1_PHYSICAL_TABLE_PATH);
}

@Test
void testRemoveStalePathReturnsFalseWhenDequeNonEmpty() throws Exception {
    IndexedRow pendingRow = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
    RecordAccumulator accumulator = createTestRecordAccumulator(4 * 1024, 64 * 1024);

    // Buffer a single record and never drain it, so the path's deque stays non-empty.
    accumulator.append(createRecord(pendingRow), writeCallback, cluster, 0, false);
    assertThat(accumulator.getPhysicalTablePathsInBatches())
            .contains(DATA1_PHYSICAL_TABLE_PATH);

    // Marking the path stale must not allow its removal while data is still pending.
    accumulator.markPathsAsStale(Collections.singleton(DATA1_PHYSICAL_TABLE_PATH));
    assertThat(accumulator.removeStalePathIfEmpty(DATA1_PHYSICAL_TABLE_PATH)).isFalse();

    // The path is still tracked because its batch has not been drained.
    assertThat(accumulator.getPhysicalTablePathsInBatches())
            .contains(DATA1_PHYSICAL_TABLE_PATH);
}

@Test
void testDrainContinuesWhenBucketAtMaxInflight() throws Exception {
int batchSize = 1024;
Expand Down
Loading