Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.DataSize;
import org.astraea.common.cost.BrokerDiskSpaceCost.BrokerPath;

public interface ClusterInfo {
static ClusterInfo empty() {
Expand Down Expand Up @@ -91,6 +92,61 @@ private static Map<Integer, DataSize> changedRecordSize(
Collectors.toMap(Function.identity(), n -> DataSize.Byte.of(cost.getOrDefault(n, 0L))));
}

static boolean brokerDiskUsageSizeOverflow(
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
ClusterInfo before, ClusterInfo after, Map<Integer, DataSize> brokerMoveCostLimit) {
for (var id :
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.parallel()
.collect(Collectors.toSet())) {

var beforeSize = (Long) before.replicaStream(id).map(Replica::size).mapToLong(y -> y).sum();
var addedSize =
(Long)
after
.replicaStream(id)
.filter(r -> before.replicaStream(id).noneMatch(r::equals))
.map(Replica::size)
.mapToLong(y -> y)
.sum();
if ((beforeSize + addedSize)
> brokerMoveCostLimit.getOrDefault(id, DataSize.Byte.of(Long.MAX_VALUE)).bytes())
return true;
}
return false;
}

static boolean brokerPathDiskUsageSizeOverflow(
ClusterInfo before, ClusterInfo after, Map<BrokerPath, DataSize> diskMoveCostLimit) {
for (var brokerPaths :
Stream.concat(
before.brokerFolders().entrySet().stream(),
after.brokerFolders().entrySet().stream())
.collect(Collectors.toSet())) {
for (var path : brokerPaths.getValue()) {
var brokerPath = BrokerPath.of(brokerPaths.getKey(), path);
var beforeSize =
before
.replicaStream(brokerPaths.getKey())
.filter(r -> r.path().equals(path))
.mapToLong(Replica::size)
.sum();
var addedSize =
(Long)
after
.replicaStream(brokerPaths.getKey())
.filter(r -> before.replicaStream(brokerPaths.getKey()).noneMatch(r::equals))
.map(Replica::size)
.mapToLong(y -> y)
.sum();
if ((beforeSize + addedSize)
> diskMoveCostLimit.getOrDefault(brokerPath, DataSize.Byte.of(Long.MAX_VALUE)).bytes())
return true;
}
}
return false;
}

static boolean changedRecordSizeOverflow(
ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate, long limit) {
var totalRemovedSize = 0L;
Expand Down
118 changes: 118 additions & 0 deletions common/src/main/java/org/astraea/common/cost/BrokerDiskSpaceCost.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;

public class BrokerDiskSpaceCost implements HasMoveCost {

public static final String BROKER_COST_LIMIT_KEY = "max.broker.total.disk.space";
public static final String BROKER_PATH_COST_LIMIT_KEY = "max.broker.path.disk.space";
private final Map<Integer, DataSize> brokerMoveCostLimit;
private final Map<BrokerPath, DataSize> diskMoveCostLimit;

public BrokerDiskSpaceCost(Configuration configuration) {
this.diskMoveCostLimit = diskMoveCostLimit(configuration);
this.brokerMoveCostLimit = brokerMoveCostLimit(configuration);
}

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
if (ClusterInfo.brokerDiskUsageSizeOverflow(before, after, brokerMoveCostLimit))
return () -> true;
if (ClusterInfo.brokerPathDiskUsageSizeOverflow(before, after, diskMoveCostLimit))
return () -> true;
return () -> false;
}

private Map<BrokerPath, DataSize> diskMoveCostLimit(Configuration configuration) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
return configuration
.string(BROKER_PATH_COST_LIMIT_KEY)
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
.map(
s ->
Arrays.stream(s.split(","))
.map(
idAndPath -> {
var brokerPathAndLimit = idAndPath.split(":");
var brokerPath = brokerPathAndLimit[0].split("-");
return Map.entry(
BrokerPath.of(Integer.parseInt(brokerPath[0]), brokerPath[1]),
DataSize.of(brokerPathAndLimit[1]));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of());
}

private Map<Integer, DataSize> brokerMoveCostLimit(Configuration configuration) {
return configuration
.string(BROKER_COST_LIMIT_KEY)
.map(
s ->
Arrays.stream(s.split(","))
.map(
idAndPath -> {
var brokerAndLimit = idAndPath.split(":");
return Map.entry(
Integer.parseInt(brokerAndLimit[0]), DataSize.of(brokerAndLimit[1]));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of());
}

public static class BrokerPath {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated

private final int broker;
private final String path;

public static BrokerPath of(int broker, String path) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
return new BrokerPath(broker, path);
}

public BrokerPath(int broker, String path) {
this.broker = broker;
this.path = path;
}

public int broker() {
return broker;
}

public String path() {
return path;
}

@Override
public boolean equals(Object o) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BrokerPath that = (BrokerPath) o;
return broker == that.broker && Objects.equals(path, that.path);
}

@Override
public int hashCode() {
return Objects.hash(broker, path);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class BrokerDiskSpaceCostTest {

@Test
void testMoveCosts() {
var dataSize = DataSize.of("500MB");
/*
replica distribution:
p0: 0,1 -> 2,1
p1: 0,1 -> 0,2
p2: 0,2 -> 0,2
replicas during migrated per broker:
0: p0,p1,p2
1: p0,p1
2: p0,p1,p2
*/
var before =
List.of(
Replica.builder()
.topic("topic1")
.partition(0)
.nodeInfo(NodeInfo.of(0, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(0)
.nodeInfo(NodeInfo.of(1, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(1)
.nodeInfo(NodeInfo.of(0, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(1)
.nodeInfo(NodeInfo.of(1, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(2)
.nodeInfo(NodeInfo.of(0, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(2)
.nodeInfo(NodeInfo.of(2, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build());
var after =
List.of(
Replica.builder()
.topic("topic1")
.partition(0)
.nodeInfo(NodeInfo.of(2, "broker0", 1111))
.size(dataSize.bytes())
.path("/path1")
.build(),
Replica.builder()
.topic("topic1")
.partition(0)
.nodeInfo(NodeInfo.of(1, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(1)
.nodeInfo(NodeInfo.of(0, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(1)
.nodeInfo(NodeInfo.of(2, "broker0", 1111))
.size(dataSize.bytes())
.path("/path1")
.build(),
Replica.builder()
.topic("topic1")
.partition(2)
.nodeInfo(NodeInfo.of(0, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build(),
Replica.builder()
.topic("topic1")
.partition(2)
.nodeInfo(NodeInfo.of(2, "broker0", 1111))
.size(dataSize.bytes())
.path("/path0")
.build());
var beforeClusterInfo = of(before);
var afterClusterInfo = of(after);
var brokerConfig =
Configuration.of(
Map.of(BrokerDiskSpaceCost.BROKER_COST_LIMIT_KEY, "0:1500MB,1:1000MB,2:1500MB"));
var brokerOverflowConfig =
Configuration.of(
Map.of(BrokerDiskSpaceCost.BROKER_COST_LIMIT_KEY, "0:1300MB,1:1000MB,2:1500MB"));
var pathConfig =
Configuration.of(
Map.of(
BrokerDiskSpaceCost.BROKER_PATH_COST_LIMIT_KEY,
"0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:1000MB"));
var pathOverflowConfig =
Configuration.of(
Map.of(
BrokerDiskSpaceCost.BROKER_PATH_COST_LIMIT_KEY,
"0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB"));
// set broker limit no overflow
var cf0 = new BrokerDiskSpaceCost(brokerConfig);
var moveCost0 = cf0.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
// set broker limit and overflow
var cf1 = new BrokerDiskSpaceCost(brokerOverflowConfig);
var moveCost1 = cf1.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
// set path limit no overflow
var cf2 = new BrokerDiskSpaceCost(pathConfig);
var moveCost2 = cf2.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);
// set path limit and overflow
var cf3 = new BrokerDiskSpaceCost(pathOverflowConfig);
var moveCost3 = cf3.moveCost(beforeClusterInfo, afterClusterInfo, ClusterBean.EMPTY);

Assertions.assertFalse(moveCost0.overflow());
Assertions.assertTrue(moveCost1.overflow());
Assertions.assertFalse(moveCost2.overflow());
Assertions.assertTrue(moveCost3.overflow());
}

public static ClusterInfo of(List<Replica> replicas) {
var dataPath =
Map.of(
0,
Map.of("/path0", new DescribeLogDirsResponse.LogDirInfo(null, Map.of())),
1,
Map.of("/path0", new DescribeLogDirsResponse.LogDirInfo(null, Map.of())),
2,
Map.of(
"/path0",
new DescribeLogDirsResponse.LogDirInfo(null, Map.of()),
"/path1",
new DescribeLogDirsResponse.LogDirInfo(null, Map.of())));
return ClusterInfo.of(
"fake",
replicas.stream()
.map(Replica::nodeInfo)
.distinct()
.map(
nodeInfo ->
Broker.of(
false,
new Node(nodeInfo.id(), "", nodeInfo.port()),
Map.of(),
dataPath.get(nodeInfo.id()),
List.of()))
.collect(Collectors.toList()),
Map.of(),
replicas);
}
}
Loading