Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ static class BalancerPostRequest implements Request {
"org.astraea.common.cost.ReplicaLeaderCost",
"org.astraea.common.cost.RecordSizeCost",
"org.astraea.common.cost.ReplicaNumberCost",
"org.astraea.common.cost.ReplicaLeaderSizeCost");
"org.astraea.common.cost.ReplicaLeaderSizeCost",
"org.astraea.common.cost.BrokerDiskSpaceCost");

HasClusterCost clusterCost() {
if (clusterCosts.isEmpty())
Expand Down
124 changes: 124 additions & 0 deletions common/src/main/java/org/astraea/common/cost/BrokerDiskSpaceCost.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.metrics.ClusterBean;

public class BrokerDiskSpaceCost implements HasMoveCost {

public static final String BROKER_COST_LIMIT_KEY = "max.broker.total.disk.space";
public static final String BROKER_PATH_COST_LIMIT_KEY = "max.broker.path.disk.space";
private final Map<Integer, DataSize> brokerMoveCostLimit;
private final Map<BrokerPath, DataSize> diskMoveCostLimit;

public BrokerDiskSpaceCost(Configuration configuration) {
this.diskMoveCostLimit = diskMoveCostLimit(configuration);
this.brokerMoveCostLimit = brokerMoveCostLimit(configuration);
}

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
if (CostUtils.brokerDiskUsageSizeOverflow(before, after, brokerMoveCostLimit))
return () -> true;
if (CostUtils.brokerPathDiskUsageSizeOverflow(before, after, diskMoveCostLimit))
return () -> true;
return () -> false;
}

private Map<BrokerPath, DataSize> diskMoveCostLimit(Configuration configuration) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
return configuration
.string(BROKER_PATH_COST_LIMIT_KEY)
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
.map(
s ->
Arrays.stream(s.split(","))
.map(
idAndPath -> {
var brokerPathAndLimit = idAndPath.split(":");
var brokerPath = brokerPathAndLimit[0].split("-");
return Map.entry(
BrokerPath.of(
Integer.parseInt(brokerPath[0]),
IntStream.range(1, brokerPath.length)
.boxed()
.map(x -> brokerPath[x])
.collect(Collectors.joining("-"))),
DataSize.of(brokerPathAndLimit[1]));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of());
}

private Map<Integer, DataSize> brokerMoveCostLimit(Configuration configuration) {
return configuration
.string(BROKER_COST_LIMIT_KEY)
.map(
s ->
Arrays.stream(s.split(","))
.map(
idAndPath -> {
var brokerAndLimit = idAndPath.split(":");
return Map.entry(
Integer.parseInt(brokerAndLimit[0]), DataSize.of(brokerAndLimit[1]));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of());
}

static class BrokerPath {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated

private final int broker;
private final String path;

public static BrokerPath of(int broker, String path) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
return new BrokerPath(broker, path);
}

public BrokerPath(int broker, String path) {
this.broker = broker;
this.path = path;
}

public int broker() {
return broker;
}

public String path() {
return path;
}

@Override
public boolean equals(Object o) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BrokerPath that = (BrokerPath) o;
return broker == that.broker && Objects.equals(path, that.path);
}

@Override
public int hashCode() {
return Objects.hash(broker, path);
}
}
}
121 changes: 121 additions & 0 deletions common/src/main/java/org/astraea/common/cost/CostUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;

final class CostUtils {

private CostUtils() {}

static boolean brokerDiskUsageSizeOverflow(
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
ClusterInfo before, ClusterInfo after, Map<Integer, DataSize> brokerMoveCostLimit) {
for (var id :
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.parallel()
.collect(Collectors.toSet())) {

var beforeSize = (Long) before.replicaStream(id).map(Replica::size).mapToLong(y -> y).sum();
var addedSize =
(Long)
after
.replicaStream(id)
.filter(r -> before.replicaStream(id).noneMatch(r::equals))
.map(Replica::size)
.mapToLong(y -> y)
.sum();
if ((beforeSize + addedSize)
> brokerMoveCostLimit.getOrDefault(id, DataSize.Byte.of(Long.MAX_VALUE)).bytes())
return true;
}
return false;
}

static boolean brokerPathDiskUsageSizeOverflow(
ClusterInfo before,
ClusterInfo after,
Map<BrokerDiskSpaceCost.BrokerPath, DataSize> diskMoveCostLimit) {
for (var brokerPaths :
Stream.concat(
before.brokerFolders().entrySet().stream(),
after.brokerFolders().entrySet().stream())
.collect(Collectors.toSet())) {
for (var path : brokerPaths.getValue()) {
var brokerPath = BrokerDiskSpaceCost.BrokerPath.of(brokerPaths.getKey(), path);
var beforeSize =
before
.replicaStream(brokerPaths.getKey())
.filter(r -> r.path().equals(path))
.mapToLong(Replica::size)
.sum();
var addedSize =
(Long)
after
.replicaStream(brokerPaths.getKey())
.filter(r -> before.replicaStream(brokerPaths.getKey()).noneMatch(r::equals))
.map(Replica::size)
.mapToLong(y -> y)
.sum();
if ((beforeSize + addedSize)
> diskMoveCostLimit.getOrDefault(brokerPath, DataSize.Byte.of(Long.MAX_VALUE)).bytes())
return true;
}
}
return false;
}

static boolean changedRecordSizeOverflow(
ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate, long limit) {
var totalRemovedSize = 0L;
var totalAddedSize = 0L;
for (var id :
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.parallel()
.collect(Collectors.toSet())) {
var removed =
(int)
before
.replicaStream(id)
.filter(predicate)
.filter(r -> !after.replicas(r.topicPartition()).contains(r))
.mapToLong(Replica::size)
.sum();
var added =
(int)
after
.replicaStream(id)
.filter(predicate)
.filter(r -> !before.replicas(r.topicPartition()).contains(r))
.mapToLong(Replica::size)
.sum();
totalRemovedSize = totalRemovedSize + removed;
totalAddedSize = totalAddedSize + added;
// if migrate cost overflow, leave early and return true
if (totalRemovedSize > limit || totalAddedSize > limit) return true;
}
return Math.max(totalRemovedSize, totalAddedSize) > limit;
}
}
Loading