Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,11 @@ private String reconfigDeletingServiceWorkers(String value) {
}

private String reconfigReplicationStreamsLimit(String value) {
int poolSize = Integer.parseInt(value);
getDatanodeStateMachine().getContainer().getReplicationServer()
.setPoolSize(Integer.parseInt(value));
.setPoolSize(poolSize);
getDatanodeStateMachine().getSupervisor()
.setReplicationMaxStreams(poolSize);
return value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,21 +346,31 @@ public int getMaxQueueSize() {

public void nodeStateUpdated(HddsProtos.NodeOperationalState newState) {
if (state.getAndSet(newState) != newState) {
int threadCount = replicationConfig.getReplicationMaxStreams();
int newMaxQueueSize = datanodeConfig.getCommandQueueLimit();
resize(newState);
}
}

if (isMaintenance(newState) || isDecommission(newState)) {
threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount);
newMaxQueueSize =
replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize);
}
public void setReplicationMaxStreams(int replicationMaxStreams) {
replicationConfig.setReplicationMaxStreams(replicationMaxStreams);
resize(state.get());
}

LOG.info("Node state updated to {}, scaling executor pool size to {}",
newState, threadCount);
private void resize(HddsProtos.NodeOperationalState nodeState) {
int threadCount = replicationConfig.getReplicationMaxStreams();
int newMaxQueueSize = datanodeConfig.getCommandQueueLimit();

maxQueueSize = newMaxQueueSize;
executorThreadUpdater.accept(threadCount);
if (isMaintenance(nodeState) || isDecommission(nodeState)) {
threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount);
newMaxQueueSize =
replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize);
}

LOG.info("Scaling replication supervisor for node state {} to executor " +
"pool size {} and queue size {}", nodeState, threadCount,
newMaxQueueSize);

maxQueueSize = newMaxQueueSize;
executorThreadUpdater.accept(threadCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
Expand Down Expand Up @@ -1139,6 +1140,37 @@ public void poolSizeCanBeDecreased() {
}
}

@ContainerLayoutTestInfo.ContainerTest
public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() {
final int replicationMaxStreams = 5;
ReplicationServer.ReplicationConfig repConf =
new ReplicationServer.ReplicationConfig();
repConf.setReplicationMaxStreams(replicationMaxStreams);

AtomicInteger threadPoolSize = new AtomicInteger();

ReplicationSupervisor rs = ReplicationSupervisor.newBuilder()
.executor(new DiscardingExecutorService())
.executorThreadUpdater(threadPoolSize::set)
.replicationConfig(repConf)
.build();

rs.nodeStateUpdated(IN_SERVICE);
assertEquals(replicationMaxStreams, threadPoolSize.get());

rs.setReplicationMaxStreams(7);
assertEquals(7, threadPoolSize.get());

rs.nodeStateUpdated(DECOMMISSIONING);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test assumes OUTOFSERVICE_FACTOR_DEFAULT_VALUE=2, but since its configurable its better use config value in calculating expected value in assert.

assertEquals(14, threadPoolSize.get());

rs.setReplicationMaxStreams(3);
assertEquals(6, threadPoolSize.get());

rs.nodeStateUpdated(IN_SERVICE);
assertEquals(3, threadPoolSize.get());
}

@ContainerLayoutTestInfo.ContainerTest
public void testMaxQueueSize() {
List<DatanodeDetails> datanodes = new ArrayList<>();
Expand Down