diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index 1f08dacc90eb..0e26a5dd4943 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -710,8 +710,11 @@ private String reconfigDeletingServiceWorkers(String value) { } private String reconfigReplicationStreamsLimit(String value) { + int poolSize = Integer.parseInt(value); getDatanodeStateMachine().getContainer().getReplicationServer() - .setPoolSize(Integer.parseInt(value)); + .setPoolSize(poolSize); + getDatanodeStateMachine().getSupervisor() + .setReplicationMaxStreams(poolSize); return value; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 8dee840db226..3184fb2ed2e0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -346,21 +346,31 @@ public int getMaxQueueSize() { public void nodeStateUpdated(HddsProtos.NodeOperationalState newState) { if (state.getAndSet(newState) != newState) { - int threadCount = replicationConfig.getReplicationMaxStreams(); - int newMaxQueueSize = datanodeConfig.getCommandQueueLimit(); + resize(newState); + } + } - if (isMaintenance(newState) || isDecommission(newState)) { - threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount); - newMaxQueueSize = - replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize); - } + public void setReplicationMaxStreams(int replicationMaxStreams) { + replicationConfig.setReplicationMaxStreams(replicationMaxStreams); + resize(state.get()); + } - LOG.info("Node state updated to {}, scaling executor pool size to {}", - newState, threadCount); + private void resize(HddsProtos.NodeOperationalState nodeState) { + int threadCount = replicationConfig.getReplicationMaxStreams(); + int newMaxQueueSize = datanodeConfig.getCommandQueueLimit(); - maxQueueSize = newMaxQueueSize; - executorThreadUpdater.accept(threadCount); + if (isMaintenance(nodeState) || isDecommission(nodeState)) { + threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount); + newMaxQueueSize = + replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize); } + + LOG.info("Scaling replication supervisor for node state {} to executor " + + "pool size {} and queue size {}", nodeState, threadCount, + newMaxQueueSize); + + maxQueueSize = newMaxQueueSize; + executorThreadUpdater.accept(threadCount); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index abfef6fbffdc..a8b590e671e8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -20,6 +20,7 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; @@ -1139,6 +1140,37 @@ public void poolSizeCanBeDecreased() { } } + @ContainerLayoutTestInfo.ContainerTest + public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() { + final int replicationMaxStreams = 5; + ReplicationServer.ReplicationConfig repConf = + new ReplicationServer.ReplicationConfig(); + repConf.setReplicationMaxStreams(replicationMaxStreams); + + AtomicInteger threadPoolSize = new AtomicInteger(); + + ReplicationSupervisor rs = ReplicationSupervisor.newBuilder() + .executor(new DiscardingExecutorService()) + .executorThreadUpdater(threadPoolSize::set) + .replicationConfig(repConf) + .build(); + + rs.nodeStateUpdated(IN_SERVICE); + assertEquals(replicationMaxStreams, threadPoolSize.get()); + + rs.setReplicationMaxStreams(7); + assertEquals(7, threadPoolSize.get()); + + rs.nodeStateUpdated(DECOMMISSIONING); + assertEquals(repConf.scaleOutOfServiceLimit(7), threadPoolSize.get()); + + rs.setReplicationMaxStreams(3); + assertEquals(repConf.scaleOutOfServiceLimit(3), threadPoolSize.get()); + + rs.nodeStateUpdated(IN_SERVICE); + assertEquals(3, threadPoolSize.get()); + } + @ContainerLayoutTestInfo.ContainerTest public void testMaxQueueSize() { List datanodes = new ArrayList<>();