diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java
index bdde8c7f6c..1dc62648ab 100644
--- a/common/src/main/java/org/astraea/common/Configuration.java
+++ b/common/src/main/java/org/astraea/common/Configuration.java
@@ -44,7 +44,7 @@ public Optional<String> string(String key) {
 
       @Override
       public List<String> list(String key, String separator) {
-        return Arrays.asList(requireString(key).split(separator));
+        return string(key).map(s -> Arrays.asList(s.split(separator))).orElseGet(List::of);
       }
     };
   }
diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java b/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java
index b3d9e92a22..7cb4771841 100644
--- a/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java
+++ b/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java
@@ -41,7 +41,8 @@ public class BalancerProblemFormat {
           "org.astraea.common.cost.ReplicaLeaderCost",
           "org.astraea.common.cost.RecordSizeCost",
           "org.astraea.common.cost.ReplicaNumberCost",
-          "org.astraea.common.cost.ReplicaLeaderSizeCost");
+          "org.astraea.common.cost.ReplicaLeaderSizeCost",
+          "org.astraea.common.cost.BrokerDiskSpaceCost");
 
   public AlgorithmConfig parse() {
     return AlgorithmConfig.builder()
diff --git a/common/src/main/java/org/astraea/common/cost/BrokerDiskSpaceCost.java b/common/src/main/java/org/astraea/common/cost/BrokerDiskSpaceCost.java
new file mode 100644
index 0000000000..e524e1a31f
--- /dev/null
+++ b/common/src/main/java/org/astraea/common/cost/BrokerDiskSpaceCost.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.astraea.common.Configuration;
+import org.astraea.common.DataSize;
+import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.admin.NodeInfo;
+import org.astraea.common.admin.Replica;
+import org.astraea.common.metrics.ClusterBean;
+
+/**
+ * A move cost that reports overflow when a rebalance plan would push the disk usage of a broker,
+ * or of a single data folder on a broker, above a configured limit. Usage is computed as the size
+ * of every replica hosted before the plan plus the size of every replica the plan adds.
+ *
+ * <p>Limits are comma-separated entries: {@code id:size} under {@link #BROKER_COST_LIMIT_KEY} (for
+ * example {@code 0:1500MB,1:1000MB}) and {@code id-path:size} under {@link
+ * #BROKER_PATH_COST_LIMIT_KEY} (for example {@code 0-/path0:1500MB}). A broker or folder without
+ * an entry is treated as unlimited.
+ */
+public class BrokerDiskSpaceCost implements HasMoveCost {
+
+  public static final String BROKER_COST_LIMIT_KEY = "max.broker.total.disk.space";
+  public static final String BROKER_PATH_COST_LIMIT_KEY = "max.broker.path.disk.space";
+  private final Map<Integer, DataSize> brokerMoveCostLimit;
+  private final Map<BrokerPath, DataSize> diskMoveCostLimit;
+
+  public BrokerDiskSpaceCost(Configuration configuration) {
+    this.diskMoveCostLimit = diskMoveCostLimit(configuration);
+    this.brokerMoveCostLimit = brokerMoveCostLimit(configuration);
+  }
+
+  /**
+   * @return a cost whose {@code overflow()} is true when either the per-broker or the per-folder
+   *     limit would be exceeded by moving from {@code before} to {@code after}
+   */
+  @Override
+  public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
+    var overflow =
+        brokerDiskUsageSizeOverflow(before, after, brokerMoveCostLimit)
+            || brokerPathDiskUsageSizeOverflow(before, after, diskMoveCostLimit);
+    return () -> overflow;
+  }
+
+  private static Map<BrokerPath, DataSize> diskMoveCostLimit(Configuration configuration) {
+    return parseLimits(
+        configuration,
+        BROKER_PATH_COST_LIMIT_KEY,
+        idAndPath -> {
+          // only the first '-' separates the broker id, so folder names may contain '-' too
+          var dash = idAndPath.indexOf('-');
+          if (dash <= 0)
+            throw new IllegalArgumentException(
+                "Illegal broker path \"" + idAndPath + "\", expected <brokerId>-<path>");
+          return new BrokerPath(
+              Integer.parseInt(idAndPath.substring(0, dash)), idAndPath.substring(dash + 1));
+        });
+  }
+
+  private static Map<Integer, DataSize> brokerMoveCostLimit(Configuration configuration) {
+    return parseLimits(configuration, BROKER_COST_LIMIT_KEY, Integer::parseInt);
+  }
+
+  /**
+   * Parses a comma-separated list of {@code key:size} entries; blank entries are ignored.
+   *
+   * @param configKey the configuration key to read; a missing key yields an empty map
+   * @param keyParser converts the text before the last ':' of an entry into the map key
+   * @throws IllegalArgumentException if an entry lacks a key or a size
+   */
+  private static <K> Map<K, DataSize> parseLimits(
+      Configuration configuration, String configKey, Function<String, K> keyParser) {
+    return configuration.list(configKey, ",").stream()
+        .map(String::strip)
+        .filter(entry -> !entry.isEmpty())
+        .collect(
+            Collectors.toMap(
+                entry -> keyParser.apply(entry.substring(0, lastColon(configKey, entry))),
+                entry -> DataSize.of(entry.substring(lastColon(configKey, entry) + 1))));
+  }
+
+  /** Locates the ':' between key and size, rejecting entries where either side is empty. */
+  private static int lastColon(String configKey, String entry) {
+    var colon = entry.lastIndexOf(':');
+    if (colon <= 0 || colon == entry.length() - 1)
+      throw new IllegalArgumentException(
+          "Illegal entry \"" + entry + "\" of " + configKey + ", expected <key>:<size>");
+    return colon;
+  }
+
+  /**
+   * @return true if, for any broker with a configured limit, the replicas it hosts in {@code
+   *     before} plus the replicas newly assigned to it in {@code after} exceed that limit
+   */
+  static boolean brokerDiskUsageSizeOverflow(
+      ClusterInfo before, ClusterInfo after, Map<Integer, DataSize> brokerMoveCostLimit) {
+    // nothing is limited, so there is no need to walk the replicas at all
+    if (brokerMoveCostLimit.isEmpty()) return false;
+    var ids =
+        Stream.concat(before.nodes().stream(), after.nodes().stream())
+            .map(NodeInfo::id)
+            .collect(Collectors.toSet());
+    for (var id : ids) {
+      var limit = brokerMoveCostLimit.get(id);
+      if (limit == null) continue;
+      var existing = before.replicaStream(id).collect(Collectors.toList());
+      var beforeSize = existing.stream().mapToLong(Replica::size).sum();
+      var addedSize =
+          after.replicaStream(id).filter(r -> !existing.contains(r)).mapToLong(Replica::size).sum();
+      if (beforeSize + addedSize > limit.bytes()) return true;
+    }
+    return false;
+  }
+
+  /**
+   * @return true if, for any broker folder with a configured limit, the replicas stored in that
+   *     folder in {@code before} plus the replicas newly placed in that folder in {@code after}
+   *     exceed that limit
+   */
+  static boolean brokerPathDiskUsageSizeOverflow(
+      ClusterInfo before, ClusterInfo after, Map<BrokerPath, DataSize> diskMoveCostLimit) {
+    if (diskMoveCostLimit.isEmpty()) return false;
+    var brokerFolders =
+        Stream.concat(
+                before.brokerFolders().entrySet().stream(),
+                after.brokerFolders().entrySet().stream())
+            .collect(Collectors.toSet());
+    for (var brokerPaths : brokerFolders) {
+      var id = brokerPaths.getKey();
+      var existing = before.replicaStream(id).collect(Collectors.toList());
+      for (var path : brokerPaths.getValue()) {
+        var limit = diskMoveCostLimit.get(new BrokerPath(id, path));
+        if (limit == null) continue;
+        var beforeSize =
+            existing.stream().filter(r -> path.equals(r.path())).mapToLong(Replica::size).sum();
+        // count only replicas landing in this folder; data added to a sibling folder of the
+        // same broker must not be charged against this folder's limit
+        var addedSize =
+            after
+                .replicaStream(id)
+                .filter(r -> path.equals(r.path()))
+                .filter(r -> !existing.contains(r))
+                .mapToLong(Replica::size)
+                .sum();
+        if (beforeSize + addedSize > limit.bytes()) return true;
+      }
+    }
+    return false;
+  }
+
+  /** Key of a per-folder limit: a broker id plus a data folder on that broker. */
+  record BrokerPath(int broker, String path) {}
+}
diff --git a/common/src/main/java/org/astraea/common/cost/CostUtils.java b/common/src/main/java/org/astraea/common/cost/CostUtils.java
new file mode 100644
index 0000000000..b23985d657
--- /dev/null
+++ b/common/src/main/java/org/astraea/common/cost/CostUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.admin.NodeInfo;
+import org.astraea.common.admin.Replica;
+
+/** Helpers shared by the cost functions of this package. */
+final class CostUtils {
+
+  private CostUtils() {}
+
+  /**
+   * Checks whether the replica data moved between two cluster states exceeds a limit.
+   *
+   * @param predicate selects the replicas that take part in the calculation
+   * @param limit the largest tolerated number of bytes, applied separately to the data removed
+   *     and to the data added
+   * @return true if the total size of the selected replicas that exist only in {@code before},
+   *     or of those that exist only in {@code after}, is larger than {@code limit}
+   */
+  static boolean changedRecordSizeOverflow(
+      ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate, long limit) {
+    var totalRemovedSize = 0L;
+    var totalAddedSize = 0L;
+    var ids =
+        Stream.concat(before.nodes().stream(), after.nodes().stream())
+            .map(NodeInfo::id)
+            .collect(Collectors.toSet());
+    for (var id : ids) {
+      // keep the per-broker sums as long: narrowing them to int wraps once a broker moves 2 GiB
+      totalRemovedSize +=
+          before
+              .replicaStream(id)
+              .filter(predicate)
+              .filter(r -> !after.replicas(r.topicPartition()).contains(r))
+              .mapToLong(Replica::size)
+              .sum();
+      totalAddedSize +=
+          after
+              .replicaStream(id)
+              .filter(predicate)
+              .filter(r -> !before.replicas(r.topicPartition()).contains(r))
+              .mapToLong(Replica::size)
+              .sum();
+      // if migrate cost overflow, leave early and return true
+      if (totalRemovedSize > limit || totalAddedSize > limit) return true;
+    }
+    return Math.max(totalRemovedSize, totalAddedSize) > limit;
+  }
+}
diff --git a/common/src/main/java/org/astraea/common/cost/MigrationCost.java b/common/src/main/java/org/astraea/common/cost/MigrationCost.java
index b3cdafddec..f580d6ee71 100644
--- a/common/src/main/java/org/astraea/common/cost/MigrationCost.java
+++ b/common/src/main/java/org/astraea/common/cost/MigrationCost.java
@@ -117,39 +117,6 @@ private static Map migratedChanged(
         .collect(Collectors.toMap(Function.identity(), n -> cost.getOrDefault(n, 0L)));
   }
 
-  static boolean changedRecordSizeOverflow(
-      ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate, long limit) {
-    var totalRemovedSize = 0L;
-    var totalAddedSize = 0L;
-    for (var id :
-        Stream.concat(before.nodes().stream(), 
after.nodes().stream())
-            .map(NodeInfo::id)
-            .parallel()
-            .collect(Collectors.toSet())) {
-      var removed =
-          (int)
-              before
-                  .replicaStream(id)
-                  .filter(predicate)
-                  .filter(r -> !after.replicas(r.topicPartition()).contains(r))
-                  .mapToLong(Replica::size)
-                  .sum();
-      var added =
-          (int)
-              after
-                  .replicaStream(id)
-                  .filter(predicate)
-                  .filter(r -> !before.replicas(r.topicPartition()).contains(r))
-                  .mapToLong(Replica::size)
-                  .sum();
-      totalRemovedSize = totalRemovedSize + removed;
-      totalAddedSize = totalAddedSize + added;
-      // if migrate cost overflow, leave early and return true
-      if (totalRemovedSize > limit || totalAddedSize > limit) return true;
-    }
-    return Math.max(totalRemovedSize, totalAddedSize) > limit;
-  }
-
   private static Map changedReplicaNumber(ClusterInfo before, ClusterInfo after) {
     return Stream.concat(before.nodes().stream(), after.nodes().stream())
         .map(NodeInfo::id)
diff --git a/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java b/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java
index 19f0f58ff6..9cbf965952 100644
--- a/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java
+++ b/common/src/main/java/org/astraea/common/cost/RecordSizeCost.java
@@ -16,7 +16,7 @@
  */
 package org.astraea.common.cost;
 
-import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow;
+import static org.astraea.common.cost.CostUtils.changedRecordSizeOverflow;
 
 import java.util.Map;
 import java.util.stream.Collectors;
diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java
index 79cdf070c7..d489834fa8 100644
--- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java
+++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderSizeCost.java
@@ -16,7 +16,7 @@
  */
 package org.astraea.common.cost;
 
-import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow;
+import static org.astraea.common.cost.CostUtils.changedRecordSizeOverflow;
 
 import java.util.Map;
 import java.util.stream.Collectors;
diff --git a/common/src/test/java/org/astraea/common/ConfigurationTest.java b/common/src/test/java/org/astraea/common/ConfigurationTest.java
index 05fff83cf6..f4bc920bbe 100644
--- a/common/src/test/java/org/astraea/common/ConfigurationTest.java
+++ b/common/src/test/java/org/astraea/common/ConfigurationTest.java
@@ -35,6 +35,7 @@ void testString() {
   void testList() {
     var config = Configuration.of(Map.of("key", "v0,v1"));
     Assertions.assertEquals(List.of("v0", "v1"), config.list("key", ","));
+    Assertions.assertEquals(List.of(), config.list("nonExistKey", ","));
   }
 
   @Test
diff --git a/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java b/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java
new file mode 100644
index 0000000000..0e4c998737
--- /dev/null
+++ b/common/src/test/java/org/astraea/common/cost/BrokerDiskSpaceCostTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import static org.astraea.common.cost.BrokerDiskSpaceCost.brokerDiskUsageSizeOverflow;
+import static org.astraea.common.cost.BrokerDiskSpaceCost.brokerPathDiskUsageSizeOverflow;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.astraea.common.Configuration;
+import org.astraea.common.DataSize;
+import org.astraea.common.admin.Broker;
+import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.admin.NodeInfo;
+import org.astraea.common.admin.Replica;
+import org.astraea.common.metrics.ClusterBean;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class BrokerDiskSpaceCostTest {
+
+  @Test
+  void testMoveCosts() {
+    var size = DataSize.of("500MB").bytes();
+    /*
+    replica distribution:
+        p0: 0,1 -> 2,1
+        p1: 0,1 -> 0,2
+        p2: 0,2 -> 0,2
+    replicas during migrated per broker:
+        0: p0,p1,p2
+        1: p0,p1
+        2: p0,p1,p2
+     */
+    var beforeClusterInfo =
+        of(
+            List.of(
+                replica(0, 0, size, "/path0"),
+                replica(0, 1, size, "/path0"),
+                replica(1, 0, size, "/path0"),
+                replica(1, 1, size, "/path0"),
+                replica(2, 0, size, "/path0"),
+                replica(2, 2, size, "/path0")));
+    var afterClusterInfo =
+        of(
+            List.of(
+                replica(0, 2, size, "/path1"),
+                replica(0, 1, size, "/path0"),
+                replica(1, 0, size, "/path0"),
+                replica(1, 2, size, "/path1"),
+                replica(2, 0, size, "/path0"),
+                replica(2, 2, size, "/path0")));
+    var brokerConfig =
+        Configuration.of(
+            Map.of(BrokerDiskSpaceCost.BROKER_COST_LIMIT_KEY, "0:1500MB,1:1000MB,2:1500MB"));
+    var brokerOverflowConfig =
+        Configuration.of(
+            Map.of(BrokerDiskSpaceCost.BROKER_COST_LIMIT_KEY, "0:1300MB,1:1000MB,2:1500MB"));
+    var pathConfig =
+        Configuration.of(
+            Map.of(
+                BrokerDiskSpaceCost.BROKER_PATH_COST_LIMIT_KEY,
+                "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:1000MB"));
+    var pathOverflowConfig =
+        Configuration.of(
+            Map.of(
+                BrokerDiskSpaceCost.BROKER_PATH_COST_LIMIT_KEY,
+                "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB"));
+    // set broker limit no overflow
+    var cf0 = new BrokerDiskSpaceCost(brokerConfig);
+    var moveCost0 = cf0.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
+    // set broker limit and overflow
+    var cf1 = new BrokerDiskSpaceCost(brokerOverflowConfig);
+    var moveCost1 = cf1.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
+    // set path limit no overflow
+    var cf2 = new BrokerDiskSpaceCost(pathConfig);
+    var moveCost2 = cf2.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
+    // set path limit and overflow
+    var cf3 = new BrokerDiskSpaceCost(pathOverflowConfig);
+    var moveCost3 = cf3.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
+
+    Assertions.assertFalse(moveCost0.overflow());
+    Assertions.assertTrue(moveCost1.overflow());
+    Assertions.assertFalse(moveCost2.overflow());
+    Assertions.assertTrue(moveCost3.overflow());
+  }
+
+  @Test
+  void testBrokerDiskUsageSizeOverflow() {
+    var limit =
+        Map.of(
+            0, DataSize.Byte.of(1600),
+            1, DataSize.Byte.of(1598),
+            2, DataSize.Byte.of(1600));
+    var overFlowLimit =
+        Map.of(
+            0, DataSize.Byte.of(1600),
+            1, DataSize.Byte.of(1598),
+            2, DataSize.Byte.of(1500));
+    var totalResult = brokerDiskUsageSizeOverflow(beforeClusterInfo(), afterClusterInfo(), limit);
+    var overflowResult =
+        brokerDiskUsageSizeOverflow(beforeClusterInfo(), afterClusterInfo(), overFlowLimit);
+    Assertions.assertFalse(totalResult);
+    Assertions.assertTrue(overflowResult);
+  }
+
+  @Test
+  void testBrokerPathDiskUsageSizeOverflow() {
+    var limit =
+        Map.of(
+            new BrokerDiskSpaceCost.BrokerPath(0, "/path0"),
+            DataSize.Byte.of(1600),
+            new BrokerDiskSpaceCost.BrokerPath(1, "/path0"),
+            DataSize.Byte.of(1598),
+            new BrokerDiskSpaceCost.BrokerPath(2, "/path0"),
+            DataSize.Byte.of(1600),
+            new BrokerDiskSpaceCost.BrokerPath(2, "/path1"),
+            DataSize.Byte.of(600));
+    var overFlowLimit =
+        Map.of(
+            new BrokerDiskSpaceCost.BrokerPath(0, "/path0"), DataSize.Byte.of(1600),
+            new BrokerDiskSpaceCost.BrokerPath(1, "/path0"), DataSize.Byte.of(1598),
+            new BrokerDiskSpaceCost.BrokerPath(2, "/path0"), DataSize.Byte.of(1600),
+            new BrokerDiskSpaceCost.BrokerPath(2, "/path1"), DataSize.Byte.of(500));
+    var totalResult =
+        brokerPathDiskUsageSizeOverflow(beforeClusterInfo(), afterClusterInfo(), limit);
+    var overflowResult =
+        brokerPathDiskUsageSizeOverflow(beforeClusterInfo(), afterClusterInfo(), overFlowLimit);
+    Assertions.assertFalse(totalResult);
+    Assertions.assertTrue(overflowResult);
+  }
+
+  /** Builds a cluster from the replicas; broker 0 and 1 own /path0, broker 2 owns both paths. */
+  public static ClusterInfo of(List<Replica> replicas) {
+    var dataPath =
+        Map.of(
+            0,
+            Map.of("/path0", new DescribeLogDirsResponse.LogDirInfo(null, Map.of())),
+            1,
+            Map.of("/path0", new DescribeLogDirsResponse.LogDirInfo(null, Map.of())),
+            2,
+            Map.of(
+                "/path0",
+                new DescribeLogDirsResponse.LogDirInfo(null, Map.of()),
+                "/path1",
+                new DescribeLogDirsResponse.LogDirInfo(null, Map.of())));
+    return ClusterInfo.of(
+        "fake",
+        replicas.stream()
+            .map(Replica::nodeInfo)
+            .distinct()
+            .map(
+                nodeInfo ->
+                    Broker.of(
+                        false,
+                        new Node(nodeInfo.id(), "", nodeInfo.port()),
+                        Map.of(),
+                        dataPath.get(nodeInfo.id()),
+                        List.of()))
+            .collect(Collectors.toList()),
+        Map.of(),
+        replicas);
+  }
+
+  /*
+  before distribution:
+      p0: 0,1
+      p1: 0,1
+      p2: 2,0
+  after distribution:
+      p0: 2,1
+      p1: 0,2
+      p2: 1,0
+  leader log size:
+      p0: 100
+      p1: 500
+      p2  1000
+   */
+  private static ClusterInfo beforeClusterInfo() {
+    return of(
+        List.of(
+            replica(0, 0, 100, true, "/path0"),
+            replica(0, 1, 99, false, "/path0"),
+            replica(1, 0, 500, true, "/path0"),
+            replica(1, 1, 499, false, "/path0"),
+            replica(2, 2, 1000, true, "/path0"),
+            replica(2, 0, 1000, false, "/path0")));
+  }
+
+  private static ClusterInfo afterClusterInfo() {
+    return of(
+        List.of(
+            replica(0, 2, 100, true, "/path1"),
+            replica(0, 1, 99, false, "/path0"),
+            replica(1, 0, 500, true, "/path0"),
+            replica(1, 2, 500, false, "/path1"),
+            replica(2, 1, 1000, true, "/path1"),
+            replica(2, 0, 1000, false, "/path0")));
+  }
+
+  /** Builds a replica of topic1 on the given broker, leaving the builder's leader flag unset. */
+  private static Replica replica(int partition, int broker, long size, String path) {
+    return Replica.builder()
+        .topic("topic1")
+        .partition(partition)
+        .nodeInfo(NodeInfo.of(broker, "broker0", 1111))
+        .size(size)
+        .path(path)
+        .build();
+  }
+
+  /** Builds a replica of topic1 on the given broker with an explicit leader flag. */
+  private static Replica replica(
+      int partition, int broker, long size, boolean leader, String path) {
+    return Replica.builder()
+        .topic("topic1")
+        .partition(partition)
+        .nodeInfo(NodeInfo.of(broker, "broker0", 1111))
+        .size(size)
+        .isLeader(leader)
+        .path(path)
+        .build();
+  }
+}
diff --git a/common/src/test/java/org/astraea/common/cost/CostUtilsTest.java b/common/src/test/java/org/astraea/common/cost/CostUtilsTest.java
new file mode 100644
index 0000000000..e086242e57
--- /dev/null
+++ b/common/src/test/java/org/astraea/common/cost/CostUtilsTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import static org.astraea.common.cost.CostUtils.changedRecordSizeOverflow;
+import static org.astraea.common.cost.MigrationCost.recordSizeToFetch;
+import static org.astraea.common.cost.MigrationCost.recordSizeToSync;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.astraea.common.admin.Broker;
+import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.admin.NodeInfo;
+import org.astraea.common.admin.Replica;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class CostUtilsTest {
+
+  @Test
+  void testChangedRecordSizeOverflow() {
+    var limit = 1600;
+    var moveInResult = recordSizeToSync(beforeClusterInfo(), afterClusterInfo());
+    Assertions.assertEquals(3, moveInResult.size());
+    Assertions.assertEquals(0, moveInResult.get(0));
+    Assertions.assertEquals(1000, moveInResult.get(1));
+    Assertions.assertEquals(100 + 500, moveInResult.get(2));
+
+    var moveOutResult = recordSizeToFetch(beforeClusterInfo(), afterClusterInfo());
+    Assertions.assertEquals(3, moveOutResult.size());
+    Assertions.assertEquals(100 + 500, moveOutResult.get(0));
+    Assertions.assertEquals(0, moveOutResult.get(1));
+    Assertions.assertEquals(1000, moveOutResult.get(2));
+
+    var totalResult =
+        changedRecordSizeOverflow(beforeClusterInfo(), afterClusterInfo(), ignored -> true, limit);
+    var overflowResult =
+        changedRecordSizeOverflow(
+            beforeClusterInfo(), afterClusterInfo(), ignored -> true, limit - 100);
+    Assertions.assertFalse(totalResult);
+    Assertions.assertTrue(overflowResult);
+  }
+
+  /*
+  before distribution:
+      p0: 0,1
+      p1: 0,1
+      p2: 2,0
+  after distribution:
+      p0: 2,1
+      p1: 0,2
+      p2: 1,0
+  leader log size:
+      p0: 100
+      p1: 500
+      p2  1000
+   */
+  private static ClusterInfo beforeClusterInfo() {
+    return clusterInfo(
+        List.of(
+            replica(0, 0, 100, true, "/path0"),
+            replica(0, 1, 99, false, "/path0"),
+            replica(1, 0, 500, true, "/path0"),
+            replica(1, 1, 499, false, "/path0"),
+            replica(2, 2, 1000, true, "/path0"),
+            replica(2, 0, 1000, false, "/path0")));
+  }
+
+  private static ClusterInfo afterClusterInfo() {
+    return clusterInfo(
+        List.of(
+            replica(0, 2, 100, true, "/path1"),
+            replica(0, 1, 99, false, "/path0"),
+            replica(1, 0, 500, true, "/path0"),
+            replica(1, 2, 500, false, "/path1"),
+            replica(2, 1, 1000, true, "/path1"),
+            replica(2, 0, 1000, false, "/path0")));
+  }
+
+  /** Builds a cluster from the replicas; broker 0 and 1 own /path0, broker 2 owns both paths. */
+  private static ClusterInfo clusterInfo(List<Replica> replicas) {
+    var dataPath =
+        Map.of(
+            0,
+            Map.of("/path0", new DescribeLogDirsResponse.LogDirInfo(null, Map.of())),
+            1,
+            Map.of("/path0", new DescribeLogDirsResponse.LogDirInfo(null, Map.of())),
+            2,
+            Map.of(
+                "/path0",
+                new DescribeLogDirsResponse.LogDirInfo(null, Map.of()),
+                "/path1",
+                new DescribeLogDirsResponse.LogDirInfo(null, Map.of())));
+    return ClusterInfo.of(
+        "fake",
+        replicas.stream()
+            .map(Replica::nodeInfo)
+            .distinct()
+            .map(
+                nodeInfo ->
+                    Broker.of(
+                        false,
+                        new Node(nodeInfo.id(), "", nodeInfo.port()),
+                        Map.of(),
+                        dataPath.get(nodeInfo.id()),
+                        List.of()))
+            .collect(Collectors.toList()),
+        Map.of(),
+        replicas);
+  }
+
+  /** Builds a replica of topic1 on the given broker with an explicit leader flag. */
+  private static Replica replica(
+      int partition, int broker, long size, boolean leader, String path) {
+    return Replica.builder()
+        .topic("topic1")
+        .partition(partition)
+        .nodeInfo(NodeInfo.of(broker, "broker0", 1111))
+        .size(size)
+        .isLeader(leader)
+        .path(path)
+        .build();
+  }
+}
diff --git a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java
index f30297ce7b..89ddd813bf 100644
--- a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java
+++ b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java
@@ -16,16 +16,11 @@
  */
 package org.astraea.common.cost;
 
-import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow;
-import static org.astraea.common.cost.MigrationCost.recordSizeToFetch;
-import static org.astraea.common.cost.MigrationCost.recordSizeToSync;
 import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd;
 import 
static org.astraea.common.cost.MigrationCost.replicaLeaderToRemove; import static org.astraea.common.cost.MigrationCost.replicaNumChanged; import java.util.List; -import java.util.Map; -import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.ClusterInfoTest; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; @@ -295,142 +290,4 @@ void testChangedReplicaNumber() { Assertions.assertEquals(-1, changedReplicaCount.get(1)); Assertions.assertEquals(2, changedReplicaCount.get(2)); } - - @Test - void testChangedRecordSizeOverflow() { - var limit = 1600; - var moveInResult = recordSizeToSync(beforeClusterInfo(), afterClusterInfo()); - Assertions.assertEquals(3, moveInResult.size()); - Assertions.assertEquals(0, moveInResult.get(0)); - Assertions.assertEquals(1000, moveInResult.get(1)); - Assertions.assertEquals(100 + 500, moveInResult.get(2)); - - var moveOutResult = recordSizeToFetch(beforeClusterInfo(), afterClusterInfo()); - Assertions.assertEquals(3, moveOutResult.size()); - Assertions.assertEquals(100 + 500, moveOutResult.get(0)); - Assertions.assertEquals(0, moveOutResult.get(1)); - Assertions.assertEquals(1000, moveOutResult.get(2)); - - var totalResult = - changedRecordSizeOverflow(beforeClusterInfo(), afterClusterInfo(), ignored -> true, limit); - var overflowResult = - changedRecordSizeOverflow( - beforeClusterInfo(), afterClusterInfo(), ignored -> true, limit - 100); - Assertions.assertFalse(totalResult); - Assertions.assertTrue(overflowResult); - } - - /* - before distribution: - p0: 0,1 - p1: 0,1 - p2: 2,0 - after distribution: - p0: 2,1 - p1: 0,2 - p2: 1,0 - leader log size: - p0: 100 - p1: 500 - p2 1000 - */ - private static ClusterInfo beforeClusterInfo() { - return ClusterInfo.of( - "fake", - List.of(NodeInfo.of(0, "aa", 22), NodeInfo.of(1, "aa", 22), NodeInfo.of(2, "aa", 22)), - Map.of(), - List.of( - Replica.builder() - .topic("topic1") - .partition(0) - .nodeInfo(NodeInfo.of(0, "broker0", 1111)) - 
.size(100) - .isLeader(true) - .build(), - Replica.builder() - .topic("topic1") - .partition(0) - .nodeInfo(NodeInfo.of(1, "broker0", 1111)) - .size(99) - .isLeader(false) - .build(), - Replica.builder() - .topic("topic1") - .partition(1) - .nodeInfo(NodeInfo.of(0, "broker0", 1111)) - .size(500) - .isLeader(true) - .build(), - Replica.builder() - .topic("topic1") - .partition(1) - .nodeInfo(NodeInfo.of(1, "broker0", 1111)) - .size(499) - .isLeader(false) - .build(), - Replica.builder() - .topic("topic1") - .partition(2) - .nodeInfo(NodeInfo.of(2, "broker0", 1111)) - .size(1000) - .isLeader(true) - .build(), - Replica.builder() - .topic("topic1") - .partition(2) - .nodeInfo(NodeInfo.of(0, "broker0", 1111)) - .size(1000) - .isLeader(false) - .build())); - } - - private static ClusterInfo afterClusterInfo() { - return ClusterInfo.of( - "fake", - List.of(NodeInfo.of(0, "aa", 22), NodeInfo.of(1, "aa", 22), NodeInfo.of(2, "aa", 22)), - Map.of(), - List.of( - Replica.builder() - .topic("topic1") - .partition(0) - .nodeInfo(NodeInfo.of(2, "broker0", 1111)) - .size(100) - .isLeader(true) - .build(), - Replica.builder() - .topic("topic1") - .partition(0) - .nodeInfo(NodeInfo.of(1, "broker0", 1111)) - .size(99) - .isLeader(false) - .build(), - Replica.builder() - .topic("topic1") - .partition(1) - .nodeInfo(NodeInfo.of(0, "broker0", 1111)) - .size(500) - .isLeader(true) - .build(), - Replica.builder() - .topic("topic1") - .partition(1) - .nodeInfo(NodeInfo.of(2, "broker0", 1111)) - .size(500) - .isLeader(false) - .build(), - Replica.builder() - .topic("topic1") - .partition(2) - .nodeInfo(NodeInfo.of(1, "broker0", 1111)) - .size(1000) - .isLeader(true) - .build(), - Replica.builder() - .topic("topic1") - .partition(2) - .nodeInfo(NodeInfo.of(0, "broker0", 1111)) - .size(1000) - .isLeader(false) - .build())); - } } diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index f458409c91..ba5568aa7a 100644 --- 
a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -33,12 +33,14 @@ POST /balancer costConfig: -| config key | config value | -|-----------------------------|--------------------| -| max.migrated.size | 設定最大可搬移的資料量 | -| max.migrated.leader.number | 設定最大可搬移的leader 數量 | -| max.migrated.replica.number | 設定最大可搬移的replica 數量 | -| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | +| config key | config value | value format | +| --------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | +| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex. 100KB, 500MB, 3GB | +| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 | +| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 | +| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex. 100KB, 500MB, 3GB | +| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size`" ex. "0:1500MB,1:1000MB,2:1500MB" | +| max.broker.path.disk.space | 設定搬移過程中broker上的data folder最大可以佔用的replica 資料量 | "`broker Id` + `-` + `data path` + `:` + `data size`" ex.
"0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB" | 目前支援的 Cost Function @@ -61,23 +63,28 @@ cURL 範例 curl -X POST http://localhost:8001/balancer \ -H "Content-Type: application/json" \ -d '{ - "timeout": "5s", - "balancer": "org.astraea.common.balancer.algorithms.GreedyBalancer", - "balancerConfig": { - "shuffle.tweaker.min.step": 1, - "shuffle.tweaker.max.step": 10 - }, - "clusterCosts": [ - { "cost": "org.astraea.common.cost.ReplicaLeaderCost", "weight": 1 } - ], - "moveCosts": [ - "org.astraea.common.cost.ReplicaLeaderCost", - "org.astraea.common.cost.RecordSizeCost" - ], - "costConfig": { - "max.migrated.size": "500MB", - "max.migrated.leader.number": 5 - } + "timeout": "5s", + "balancer": "org.astraea.common.balancer.algorithms.GreedyBalancer", + "balancerConfig": { + "shuffle.tweaker.min.step": "1", + "shuffle.tweaker.max.step": "10" + }, + "clusterCosts": [ + { + "cost": "org.astraea.common.cost.ReplicaLeaderCost", + "weight": 1 + } + ], + "moveCosts": [ + "org.astraea.common.cost.ReplicaLeaderCost", + "org.astraea.common.cost.RecordSizeCost" + ], + "costConfig": { + "max.migrated.size": "500MB", + "max.migrated.leader.number": 5, + "max.broker.total.disk.space": "0:1500MB,1:1000MB,2:1500MB", + "max.broker.path.disk.space": "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:1000MB" + } }' ```