Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,39 +91,6 @@ private static Map<Integer, DataSize> changedRecordSize(
Collectors.toMap(Function.identity(), n -> DataSize.Byte.of(cost.getOrDefault(n, 0L))));
}

static boolean changedRecordSizeOverflow(
ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate, long limit) {
var totalRemovedSize = 0L;
var totalAddedSize = 0L;
for (var id :
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.parallel()
.collect(Collectors.toSet())) {
var removed =
(int)
before
.replicaStream(id)
.filter(predicate)
.filter(r -> !after.replicas(r.topicPartition()).contains(r))
.mapToLong(Replica::size)
.sum();
var added =
(int)
after
.replicaStream(id)
.filter(predicate)
.filter(r -> !before.replicas(r.topicPartition()).contains(r))
.mapToLong(Replica::size)
.sum();
totalRemovedSize = totalRemovedSize + removed;
totalAddedSize = totalAddedSize + added;
// if migrate cost overflow, leave early and return true
if (totalRemovedSize > limit || totalAddedSize > limit) return true;
}
return Math.max(totalRemovedSize, totalAddedSize) > limit;
}

static Map<Integer, Integer> changedReplicaNumber(
ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate) {
return Stream.concat(before.nodes().stream(), after.nodes().stream())
Expand Down
118 changes: 118 additions & 0 deletions common/src/main/java/org/astraea/common/cost/BrokerDiskSpaceCost.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;

public class BrokerDiskSpaceCost implements HasMoveCost {

public static final String BROKER_COST_LIMIT_KEY = "max.broker.total.disk.space";
public static final String BROKER_PATH_COST_LIMIT_KEY = "max.broker.path.disk.space";
private final Map<Integer, DataSize> brokerMoveCostLimit;
private final Map<BrokerPath, DataSize> diskMoveCostLimit;

public BrokerDiskSpaceCost(Configuration configuration) {
this.diskMoveCostLimit = diskMoveCostLimit(configuration);
this.brokerMoveCostLimit = brokerMoveCostLimit(configuration);
}

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
if (CostUtils.brokerDiskUsageSizeOverflow(before, after, brokerMoveCostLimit))
return () -> true;
if (CostUtils.brokerPathDiskUsageSizeOverflow(before, after, diskMoveCostLimit))
return () -> true;
return () -> false;
}

private Map<BrokerPath, DataSize> diskMoveCostLimit(Configuration configuration) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
return configuration
.string(BROKER_PATH_COST_LIMIT_KEY)
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
.map(
s ->
Arrays.stream(s.split(","))
.map(
idAndPath -> {
var brokerPathAndLimit = idAndPath.split(":");
var brokerPath = brokerPathAndLimit[0].split("-");
return Map.entry(
BrokerPath.of(Integer.parseInt(brokerPath[0]), brokerPath[1]),
DataSize.of(brokerPathAndLimit[1]));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of());
}

private Map<Integer, DataSize> brokerMoveCostLimit(Configuration configuration) {
return configuration
.string(BROKER_COST_LIMIT_KEY)
.map(
s ->
Arrays.stream(s.split(","))
.map(
idAndPath -> {
var brokerAndLimit = idAndPath.split(":");
return Map.entry(
Integer.parseInt(brokerAndLimit[0]), DataSize.of(brokerAndLimit[1]));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.orElse(Map.of());
}

static class BrokerPath {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated

private final int broker;
private final String path;

public static BrokerPath of(int broker, String path) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
return new BrokerPath(broker, path);
}

public BrokerPath(int broker, String path) {
this.broker = broker;
this.path = path;
}

public int broker() {
return broker;
}

public String path() {
return path;
}

@Override
public boolean equals(Object o) {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BrokerPath that = (BrokerPath) o;
return broker == that.broker && Objects.equals(path, that.path);
}

@Override
public int hashCode() {
return Objects.hash(broker, path);
}
}
}
119 changes: 119 additions & 0 deletions common/src/main/java/org/astraea/common/cost/CostUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.cost;

import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.astraea.common.DataSize;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;

class CostUtils {
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
Comment thread
qoo332001 marked this conversation as resolved.
Outdated

static boolean brokerDiskUsageSizeOverflow(
Comment thread
qoo332001 marked this conversation as resolved.
Outdated
ClusterInfo before, ClusterInfo after, Map<Integer, DataSize> brokerMoveCostLimit) {
for (var id :
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.parallel()
.collect(Collectors.toSet())) {

var beforeSize = (Long) before.replicaStream(id).map(Replica::size).mapToLong(y -> y).sum();
var addedSize =
(Long)
after
.replicaStream(id)
.filter(r -> before.replicaStream(id).noneMatch(r::equals))
.map(Replica::size)
.mapToLong(y -> y)
.sum();
if ((beforeSize + addedSize)
> brokerMoveCostLimit.getOrDefault(id, DataSize.Byte.of(Long.MAX_VALUE)).bytes())
return true;
}
return false;
}

static boolean brokerPathDiskUsageSizeOverflow(
ClusterInfo before,
ClusterInfo after,
Map<BrokerDiskSpaceCost.BrokerPath, DataSize> diskMoveCostLimit) {
for (var brokerPaths :
Stream.concat(
before.brokerFolders().entrySet().stream(),
after.brokerFolders().entrySet().stream())
.collect(Collectors.toSet())) {
for (var path : brokerPaths.getValue()) {
var brokerPath = BrokerDiskSpaceCost.BrokerPath.of(brokerPaths.getKey(), path);
var beforeSize =
before
.replicaStream(brokerPaths.getKey())
.filter(r -> r.path().equals(path))
.mapToLong(Replica::size)
.sum();
var addedSize =
(Long)
after
.replicaStream(brokerPaths.getKey())
.filter(r -> before.replicaStream(brokerPaths.getKey()).noneMatch(r::equals))
.map(Replica::size)
.mapToLong(y -> y)
.sum();
if ((beforeSize + addedSize)
> diskMoveCostLimit.getOrDefault(brokerPath, DataSize.Byte.of(Long.MAX_VALUE)).bytes())
return true;
}
}
return false;
}

static boolean changedRecordSizeOverflow(
ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate, long limit) {
var totalRemovedSize = 0L;
var totalAddedSize = 0L;
for (var id :
Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.parallel()
.collect(Collectors.toSet())) {
var removed =
(int)
before
.replicaStream(id)
.filter(predicate)
.filter(r -> !after.replicas(r.topicPartition()).contains(r))
.mapToLong(Replica::size)
.sum();
var added =
(int)
after
.replicaStream(id)
.filter(predicate)
.filter(r -> !before.replicas(r.topicPartition()).contains(r))
.mapToLong(Replica::size)
.sum();
totalRemovedSize = totalRemovedSize + removed;
totalAddedSize = totalAddedSize + added;
// if migrate cost overflow, leave early and return true
if (totalRemovedSize > limit || totalAddedSize > limit) return true;
}
return Math.max(totalRemovedSize, totalAddedSize) > limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
.map(DataSize::bytes)
.orElse(Long.MAX_VALUE);
var overflow =
ClusterInfo.changedRecordSizeOverflow(before, after, ignored -> true, maxMigratedSize);
CostUtils.changedRecordSizeOverflow(before, after, ignored -> true, maxMigratedSize);
return () -> overflow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus
var maxMigratedLeaderSize =
config.string(COST_LIMIT_KEY).map(DataSize::of).map(DataSize::bytes).orElse(Long.MAX_VALUE);
var overflow =
ClusterInfo.changedRecordSizeOverflow(
CostUtils.changedRecordSizeOverflow(
before, after, Replica::isLeader, maxMigratedLeaderSize);
return () -> overflow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,7 @@ void testChangedReplicaNumber() {
}

@Test
void testChangedRecordSizeOverflow() {
var limit = 1600;
void testChangedRecordSize() {
var moveInResult = ClusterInfo.recordSizeToSync(beforeClusterInfo(), afterClusterInfo());
Assertions.assertEquals(3, moveInResult.size());
Assertions.assertEquals(0, moveInResult.get(0).bytes());
Expand All @@ -375,15 +374,6 @@ void testChangedRecordSizeOverflow() {
Assertions.assertEquals(100 + 500, moveOutResult.get(0).bytes());
Assertions.assertEquals(0, moveOutResult.get(1).bytes());
Assertions.assertEquals(1000, moveOutResult.get(2).bytes());

var totalResult =
ClusterInfo.changedRecordSizeOverflow(
beforeClusterInfo(), afterClusterInfo(), ignored -> true, limit);
var overflowResult =
ClusterInfo.changedRecordSizeOverflow(
beforeClusterInfo(), afterClusterInfo(), ignored -> true, limit - 100);
Assertions.assertFalse(totalResult);
Assertions.assertTrue(overflowResult);
}

/*
Expand Down
Loading