diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index ae04328202..aefbef2fe5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -543,9 +543,9 @@ public void run() { FileUtils.deleteDirectoryQuietly(tabletDir); // Also delete corresponding KV tablet directory if it exists + Tuple2 pathAndBucket = + FlussPaths.parseTabletDir(tabletDir); try { - Tuple2 pathAndBucket = - FlussPaths.parseTabletDir(tabletDir); File kvTabletDir = FlussPaths.kvTabletDir( dataDir, pathAndBucket.f0, pathAndBucket.f1); @@ -561,6 +561,34 @@ public void run() { tabletDir, kvDeleteException.getMessage()); } + + // delete empty parent directories. For partitioned tables the + // parent is the partition dir and the grandparent is + // the table dir — both should be removed when empty. + // For non-partitioned tables the parent is the table + // dir (safe to remove) but the grandparent is the + // database dir — must NOT remove. Safe under parallel + // execution: the last job to finish finds the dir + // empty and removes it; deleteDirectoryQuietly + // tolerates races. + try { + boolean isPartitioned = pathAndBucket.f0.getPartitionName() != null; + File parentDir = tabletDir.getParentFile(); + if (parentDir != null && FileUtils.isDirectoryEmpty(parentDir)) { + FileUtils.deleteDirectoryQuietly(parentDir); + if (isPartitioned) { + File tableDir = parentDir.getParentFile(); + if (tableDir != null && FileUtils.isDirectoryEmpty(tableDir)) { + FileUtils.deleteDirectoryQuietly(tableDir); + } + } + } + } catch (Exception cleanupException) { + LOG.warn( + "Failed to clean up residual directories for {}: {}", + tabletDir, + cleanupException.getMessage()); + } return; } throw new FlussRuntimeException(e); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 0aaea033a4..5bf9b25dd4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidColumnProjectionException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidRequiredAcksException; +import org.apache.fluss.exception.KvStorageException; import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -978,7 +979,20 @@ public void stopReplicas( TableBucket tb = data.getTableBucket(); HostedReplica hostedReplica = getReplica(tb); if (hostedReplica instanceof NoneReplica) { - // do nothing fort this case. + if (data.isDeleteLocal()) { + try { + sweepOrphanTabletDirs(tb, deletedTableIds, deletedPartitionIds); + } catch (Exception e) { + LOG.error( + "Failed to sweep orphan tablet directories for {}", + tb, + e); + result.add( + new StopReplicaResultForBucket( + tb, ApiError.fromThrowable(e))); + continue; + } + } result.add(new StopReplicaResultForBucket(tb)); } else if (hostedReplica instanceof OfflineReplica) { LOG.warn( @@ -1927,6 +1941,57 @@ private StopReplicaResultForBucket stopReplica( return new StopReplicaResultForBucket(tb); } + /** + * Remove on-disk tablet directories for a bucket that the in-memory ReplicaManager does not + * know about. This handles the case where a stopReplica(delete=true) arrives after the + * TabletServer was restarted during a delete — LogManager loaded the log at startup but no + * NotifyLeaderAndIsr ever ran, so allReplicas is empty. + */ + private void sweepOrphanTabletDirs( + TableBucket tb, Map deletedTableIds, Map deletedPartitionIds) { + Optional orphanLog = logManager.getLog(tb); + if (!orphanLog.isPresent()) { + return; + } + + LogTablet logTablet = orphanLog.get(); + File dataDir = logTablet.getDataDir(); + PhysicalTablePath physicalTablePath = logTablet.getPhysicalTablePath(); + Path tabletParentDir = logManager.getTabletParentDir(dataDir, physicalTablePath, tb); + + logManager.dropLog(tb); + + boolean isKvTable = false; + if (kvManager.getKv(tb).isPresent()) { + kvManager.dropKv(tb); + isKvTable = true; + } else { + File kvTabletDir = FlussPaths.kvTabletDir(dataDir, physicalTablePath, tb); + if (kvTabletDir.exists()) { + isKvTable = true; + try { + FileUtils.deleteDirectory(kvTabletDir); + } catch (IOException e) { + throw new KvStorageException( + String.format( + "Failed to delete orphan KV tablet directory %s", kvTabletDir), + e); + } + } + } + + localDiskManager.recordReplicaDelete(dataDir, isKvTable); + + if (tb.getPartitionId() != null) { + deletedPartitionIds.put(tb.getPartitionId(), tabletParentDir); + deletedTableIds.put(tb.getTableId(), tabletParentDir.getParent()); + } else { + deletedTableIds.put(tb.getTableId(), tabletParentDir); + } + + LOG.info("Swept orphan tablet directories for bucket {}", tb); + } + private void truncateToHighWatermark(List replicas) { for (Replica replica : replicas) { long highWatermark = replica.getLogTablet().getHighWatermark(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java index 2abec7264b..ff2022e9fb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/StopReplicaITCase.java @@ -19,31 +19,44 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.server.testutils.RpcMessageTestUtils; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.File; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeStopBucketReplica; +import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue; import static org.assertj.core.api.Assertions.assertThat; @@ -132,6 +145,146 @@ void testStopReplica(boolean isPkTable) throws Exception { retryUtilReplicaNotExist(tb, isr2, tableDirs2); } + @Test + void testDropTableCleansOrphanDirsOnTabletServerRestart() throws Exception { + FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata(); + + TablePath tablePath = TablePath.of("test_db_stop_replica", "test_orphan_table"); + long tableId = + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + List isr = waitAndGetIsr(tb); + + // Write data so that actual segment files exist on every replica. + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); + leaderGateway.produceLog(newProduceLogRequest(tableId, 0, 1, records)).get(); + + int offlineServerId = isr.get(0); + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(offlineServerId).getReplicaManager(); + Replica replica = replicaManager.getReplicaOrException(tb); + Path offlineTsTableDir = replica.getTabletParentDir(); + File offlineTsLogDir = replica.getLogTablet().getLogDir(); + assertThat(offlineTsTableDir).exists(); + assertThat(offlineTsLogDir).exists(); + // Verify actual data files (.log, .index) exist under the log directory. + assertThat(offlineTsLogDir.listFiles()).isNotEmpty(); + + FLUSS_CLUSTER_EXTENSION.stopTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2); + + coordinatorGateway + .dropTable( + RpcMessageTestUtils.newDropTableRequest( + tablePath.getDatabaseName(), tablePath.getTableName(), false)) + .get(); + assertThat(zkClient.tableExist(tablePath)).isFalse(); + + FLUSS_CLUSTER_EXTENSION.startTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); + + // Both the log directory (with data files) and the table parent directory + // should be cleaned at startup via the SchemaNotExistException handler. + retry( + Duration.ofMinutes(1), + () -> { + assertThat(offlineTsLogDir).doesNotExist(); + assertThat(offlineTsTableDir).doesNotExist(); + }); + } + + @Test + void testDropPartitionCleansOrphanDirsOnStopReplica() throws Exception { + FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata(); + + TablePath tablePath = TablePath.of("test_db_stop_replica", "test_orphan_partition"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build()) + .distributedBy(1) + .partitionedBy("b") + .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3) + .build(); + long tableId = + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + String partitionName = "p1"; + long partitionId = + RpcMessageTestUtils.createPartition( + FLUSS_CLUSTER_EXTENSION, + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false); + TableBucket tb = new TableBucket(tableId, partitionId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + List isr = waitAndGetIsr(tb); + + // Write data so that actual segment files exist on every replica. + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); + leaderGateway.produceLog(newProduceLogRequest(tableId, 0, 1, records)).get(); + + int offlineServerId = isr.get(0); + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(offlineServerId).getReplicaManager(); + Replica replica = replicaManager.getReplicaOrException(tb); + Path offlineTsPartitionDir = replica.getTabletParentDir(); + File offlineTsLogDir = replica.getLogTablet().getLogDir(); + assertThat(offlineTsPartitionDir).exists(); + assertThat(offlineTsLogDir).exists(); + assertThat(offlineTsLogDir.listFiles()).isNotEmpty(); + + FLUSS_CLUSTER_EXTENSION.stopTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2); + + coordinatorGateway + .dropPartition( + RpcMessageTestUtils.newDropPartitionRequest( + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false)) + .get(); + + FLUSS_CLUSTER_EXTENSION.startTabletServer(offlineServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); + + // Manually send stopReplica(delete=true) to trigger sweepOrphanTabletDirs. + int coordinatorEpoch = + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorEventProcessor() + .getCoordinatorEpoch(); + TabletServerGateway tsGateway = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(offlineServerId); + tsGateway + .stopReplica( + new StopReplicaRequest() + .setCoordinatorEpoch(coordinatorEpoch) + .addAllStopReplicasReqs( + Collections.singleton( + makeStopBucketReplica(tb, true, false, 0)))) + .get(); + + // Both the log directory (with data files) and the partition parent + // directory should be cleaned by sweepOrphanTabletDirs. + assertThat(offlineTsLogDir).doesNotExist(); + assertThat(offlineTsPartitionDir).doesNotExist(); + } + private List waitAndGetIsr(TableBucket tb) { LeaderAndIsr leaderAndIsr = waitValue(