Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,9 @@ public void run() {
FileUtils.deleteDirectoryQuietly(tabletDir);

// Also delete corresponding KV tablet directory if it exists
Tuple2<PhysicalTablePath, TableBucket> pathAndBucket =
FlussPaths.parseTabletDir(tabletDir);
try {
Tuple2<PhysicalTablePath, TableBucket> pathAndBucket =
FlussPaths.parseTabletDir(tabletDir);
File kvTabletDir =
FlussPaths.kvTabletDir(
dataDir, pathAndBucket.f0, pathAndBucket.f1);
Expand All @@ -561,6 +561,34 @@ public void run() {
tabletDir,
kvDeleteException.getMessage());
}

// delete empty parent directories. For partitioned tables the
// parent is the partition dir and the grandparent is
// the table dir — both should be removed when empty.
// For non-partitioned tables the parent is the table
// dir (safe to remove) but the grandparent is the
// database dir — must NOT remove. Safe under parallel
// execution: the last job to finish finds the dir
// empty and removes it; deleteDirectoryQuietly
// tolerates races.
try {
boolean isPartitioned = pathAndBucket.f0.getPartitionName() != null;
File parentDir = tabletDir.getParentFile();
if (parentDir != null && FileUtils.isDirectoryEmpty(parentDir)) {
FileUtils.deleteDirectoryQuietly(parentDir);
if (isPartitioned) {
File tableDir = parentDir.getParentFile();
if (tableDir != null && FileUtils.isDirectoryEmpty(tableDir)) {
FileUtils.deleteDirectoryQuietly(tableDir);
}
}
}
} catch (Exception cleanupException) {
LOG.warn(
"Failed to clean up residual directories for {}: {}",
tabletDir,
cleanupException.getMessage());
}
return;
}
throw new FlussRuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.exception.InvalidColumnProjectionException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidRequiredAcksException;
import org.apache.fluss.exception.KvStorageException;
import org.apache.fluss.exception.LogOffsetOutOfRangeException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.exception.NotLeaderOrFollowerException;
Expand Down Expand Up @@ -978,7 +979,20 @@ public void stopReplicas(
TableBucket tb = data.getTableBucket();
HostedReplica hostedReplica = getReplica(tb);
if (hostedReplica instanceof NoneReplica) {
// do nothing fort this case.
if (data.isDeleteLocal()) {
try {
sweepOrphanTabletDirs(tb, deletedTableIds, deletedPartitionIds);
} catch (Exception e) {
LOG.error(
"Failed to sweep orphan tablet directories for {}",
tb,
e);
result.add(
new StopReplicaResultForBucket(
tb, ApiError.fromThrowable(e)));
continue;
}
}
result.add(new StopReplicaResultForBucket(tb));
} else if (hostedReplica instanceof OfflineReplica) {
LOG.warn(
Expand Down Expand Up @@ -1927,6 +1941,57 @@ private StopReplicaResultForBucket stopReplica(
return new StopReplicaResultForBucket(tb);
}

/**
* Remove on-disk tablet directories for a bucket that the in-memory ReplicaManager does not
* know about. This handles the case where a stopReplica(delete=true) arrives after the
* TabletServer was restarted during a delete — LogManager loaded the log at startup but no
* NotifyLeaderAndIsr ever ran, so allReplicas is empty.
*/
private void sweepOrphanTabletDirs(
TableBucket tb, Map<Long, Path> deletedTableIds, Map<Long, Path> deletedPartitionIds) {
Optional<LogTablet> orphanLog = logManager.getLog(tb);
if (!orphanLog.isPresent()) {
return;
}

LogTablet logTablet = orphanLog.get();
File dataDir = logTablet.getDataDir();
PhysicalTablePath physicalTablePath = logTablet.getPhysicalTablePath();
Path tabletParentDir = logManager.getTabletParentDir(dataDir, physicalTablePath, tb);

logManager.dropLog(tb);

boolean isKvTable = false;
if (kvManager.getKv(tb).isPresent()) {
kvManager.dropKv(tb);
isKvTable = true;
} else {
File kvTabletDir = FlussPaths.kvTabletDir(dataDir, physicalTablePath, tb);
if (kvTabletDir.exists()) {
isKvTable = true;
try {
FileUtils.deleteDirectory(kvTabletDir);
} catch (IOException e) {
throw new KvStorageException(
String.format(
"Failed to delete orphan KV tablet directory %s", kvTabletDir),
e);
}
}
}

localDiskManager.recordReplicaDelete(dataDir, isKvTable);

if (tb.getPartitionId() != null) {
deletedPartitionIds.put(tb.getPartitionId(), tabletParentDir);
deletedTableIds.put(tb.getTableId(), tabletParentDir.getParent());
} else {
deletedTableIds.put(tb.getTableId(), tabletParentDir);
}

LOG.info("Swept orphan tablet directories for bucket {}", tb);
}

private void truncateToHighWatermark(List<Replica> replicas) {
for (Replica replica : replicas) {
long highWatermark = replica.getLogTablet().getHighWatermark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,44 @@

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.replica.ReplicaManager;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.testutils.RpcMessageTestUtils;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeStopBucketReplica;
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -132,6 +145,146 @@ void testStopReplica(boolean isPkTable) throws Exception {
retryUtilReplicaNotExist(tb, isr2, tableDirs2);
}

@Test
void testDropTableCleansOrphanDirsOnTabletServerRestart() throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();

TablePath tablePath = TablePath.of("test_db_stop_replica", "test_orphan_table");
long tableId =
RpcMessageTestUtils.createTable(
FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);

List<Integer> isr = waitAndGetIsr(tb);

// Write data so that actual segment files exist on every replica.
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateway =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
leaderGateway.produceLog(newProduceLogRequest(tableId, 0, 1, records)).get();

int offlineServerId = isr.get(0);
ReplicaManager replicaManager =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(offlineServerId).getReplicaManager();
Replica replica = replicaManager.getReplicaOrException(tb);
Path offlineTsTableDir = replica.getTabletParentDir();
File offlineTsLogDir = replica.getLogTablet().getLogDir();
assertThat(offlineTsTableDir).exists();
assertThat(offlineTsLogDir).exists();
// Verify actual data files (.log, .index) exist under the log directory.
assertThat(offlineTsLogDir.listFiles()).isNotEmpty();

FLUSS_CLUSTER_EXTENSION.stopTabletServer(offlineServerId);
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2);

coordinatorGateway
.dropTable(
RpcMessageTestUtils.newDropTableRequest(
tablePath.getDatabaseName(), tablePath.getTableName(), false))
.get();
assertThat(zkClient.tableExist(tablePath)).isFalse();

FLUSS_CLUSTER_EXTENSION.startTabletServer(offlineServerId);
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);

// Both the log directory (with data files) and the table parent directory
// should be cleaned at startup via the SchemaNotExistException handler.
retry(
Duration.ofMinutes(1),
() -> {
assertThat(offlineTsLogDir).doesNotExist();
assertThat(offlineTsTableDir).doesNotExist();
});
}

@Test
void testDropPartitionCleansOrphanDirsOnStopReplica() throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata();

TablePath tablePath = TablePath.of("test_db_stop_replica", "test_orphan_partition");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.build())
.distributedBy(1)
.partitionedBy("b")
.property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
.build();
long tableId =
RpcMessageTestUtils.createTable(
FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);

String partitionName = "p1";
long partitionId =
RpcMessageTestUtils.createPartition(
FLUSS_CLUSTER_EXTENSION,
tablePath,
new PartitionSpec(Collections.singletonMap("b", partitionName)),
false);
TableBucket tb = new TableBucket(tableId, partitionId, 0);
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);

List<Integer> isr = waitAndGetIsr(tb);

// Write data so that actual segment files exist on every replica.
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
TabletServerGateway leaderGateway =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
leaderGateway.produceLog(newProduceLogRequest(tableId, 0, 1, records)).get();

int offlineServerId = isr.get(0);
ReplicaManager replicaManager =
FLUSS_CLUSTER_EXTENSION.getTabletServerById(offlineServerId).getReplicaManager();
Replica replica = replicaManager.getReplicaOrException(tb);
Path offlineTsPartitionDir = replica.getTabletParentDir();
File offlineTsLogDir = replica.getLogTablet().getLogDir();
assertThat(offlineTsPartitionDir).exists();
assertThat(offlineTsLogDir).exists();
assertThat(offlineTsLogDir.listFiles()).isNotEmpty();

FLUSS_CLUSTER_EXTENSION.stopTabletServer(offlineServerId);
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2);

coordinatorGateway
.dropPartition(
RpcMessageTestUtils.newDropPartitionRequest(
tablePath,
new PartitionSpec(Collections.singletonMap("b", partitionName)),
false))
.get();

FLUSS_CLUSTER_EXTENSION.startTabletServer(offlineServerId);
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);

// Manually send stopReplica(delete=true) to trigger sweepOrphanTabletDirs.
int coordinatorEpoch =
FLUSS_CLUSTER_EXTENSION
.getCoordinatorServer()
.getCoordinatorEventProcessor()
.getCoordinatorEpoch();
TabletServerGateway tsGateway =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(offlineServerId);
tsGateway
.stopReplica(
new StopReplicaRequest()
.setCoordinatorEpoch(coordinatorEpoch)
.addAllStopReplicasReqs(
Collections.singleton(
makeStopBucketReplica(tb, true, false, 0))))
.get();

// Both the log directory (with data files) and the partition parent
// directory should be cleaned by sweepOrphanTabletDirs.
assertThat(offlineTsLogDir).doesNotExist();
assertThat(offlineTsPartitionDir).doesNotExist();
}

private List<Integer> waitAndGetIsr(TableBucket tb) {
LeaderAndIsr leaderAndIsr =
waitValue(
Expand Down