Skip to content
Open
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
011ccb3
Implement NetworkIngressAssignor
harryteng9527 Feb 27, 2023
f77a8c7
Add a test for checking greedyAssign
harryteng9527 Feb 27, 2023
34f8492
Add round-robin assign when the cost equals zero
harryteng9527 Feb 27, 2023
17fa82c
spotless
harryteng9527 Feb 27, 2023
2f6c7e7
Merge branch 'main' into impl-assignor
harryteng9527 Feb 27, 2023
ed52d24
add throwing exception when there is no mbeanObjects
harryteng9527 Mar 2, 2023
a9938e8
Merge branch 'main' into impl-assignor
harryteng9527 Mar 2, 2023
9a1f57e
spotless
harryteng9527 Mar 2, 2023
152e5bb
add more condition to verify whether there are sufficient metrics or not
harryteng9527 Mar 2, 2023
1e5cb45
Merge branch 'main' into impl-assignor
harryteng9527 Mar 4, 2023
a3e99b8
Add a parameter to set the waiting time that wait for fetch beanObject
harryteng9527 Mar 4, 2023
18a98ce
mask ClusterInfo with subscribed topics
harryteng9527 Mar 4, 2023
748259e
Add calculating the traffic interval to the score of cost
harryteng9527 Mar 5, 2023
03afbb6
Merge branch 'main' into impl-assignor
harryteng9527 Mar 5, 2023
25f84c8
tweak and add comment
harryteng9527 Mar 6, 2023
29c569c
rename and add comment
harryteng9527 Mar 7, 2023
e92a050
rename and add more comment
harryteng9527 Mar 8, 2023
9d85e8e
Move Kafka configuration to ours
harryteng9527 Mar 8, 2023
c2a8293
Add test
harryteng9527 Mar 8, 2023
9c514d3
Merge branch 'main' into impl-assignor
harryteng9527 Mar 8, 2023
5f389f1
Add ClusterInfo masked
harryteng9527 Mar 8, 2023
25ea0ee
Merge branch 'main' into impl-assignor
harryteng9527 Mar 9, 2023
b1ae99a
remove masked
harryteng9527 Mar 10, 2023
1e0b6bd
add new assign methods
harryteng9527 Mar 11, 2023
f1ec882
add comment and modify greedyAssign
harryteng9527 Mar 11, 2023
3690aa9
Merge branch 'main' into impl-assignor
harryteng9527 Mar 11, 2023
84b6ed4
spotless
harryteng9527 Mar 11, 2023
3e4146d
Add throw exception and change field type
harryteng9527 Mar 12, 2023
e0ccc02
Move the fields to sub-class
harryteng9527 Mar 12, 2023
b1e0acc
Merge branch 'main' into impl-assignor
harryteng9527 Mar 12, 2023
9316267
modify retry mechanism
harryteng9527 Mar 15, 2023
574abf3
Fix coding style
harryteng9527 Mar 15, 2023
349da0b
Merge branch 'main' into impl-assignor
harryteng9527 Apr 11, 2023
897d4e8
Merge branch 'main' into impl-assignor
harryteng9527 Apr 14, 2023
c2d73b5
Reference feedback to assign partitions
harryteng9527 Apr 15, 2023
4d4f0b4
Add test for greedyAssign
harryteng9527 Apr 15, 2023
1407b6d
Pass config into NetworkIngressCost
harryteng9527 Apr 15, 2023
52553a5
Replace flatMap to map
harryteng9527 Apr 16, 2023
5f2eb0d
Revise lambda to avoid creating unnecessary object
harryteng9527 Apr 16, 2023
e594ce8
Add comment for greedyAssign
harryteng9527 Apr 16, 2023
38a7d83
Merge branch 'main' into impl-assignor
harryteng9527 Apr 17, 2023
e5aaa8a
Modify object name
harryteng9527 Apr 17, 2023
279128e
Separate assign and check incompatibility
harryteng9527 Apr 23, 2023
489b071
Merge branch 'main' into impl-assignor
harryteng9527 Apr 23, 2023
03517c7
Fix style
harryteng9527 Apr 23, 2023
8406bf6
Add Assign interface to move greedy impl to it
harryteng9527 Apr 23, 2023
534d7cf
Add the interface to reassign based on incompatible
harryteng9527 Apr 23, 2023
cc6582f
Change name
harryteng9527 Apr 23, 2023
10a87de
Fix test
harryteng9527 Apr 23, 2023
8e925cc
Rename interfaces
harryteng9527 Apr 25, 2023
9db68d2
revise shuffle
harryteng9527 May 1, 2023
7462966
Reduce the complexity of shuffle
harryteng9527 May 2, 2023
2fade96
Add test
harryteng9527 May 2, 2023
a6b9b3b
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
74f24cf
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
af35031
Spotless
harryteng9527 May 2, 2023
3e4b57c
Add wait
harryteng9527 May 2, 2023
f842f00
Revise wait
harryteng9527 May 2, 2023
3b0258a
Remove retry and test
harryteng9527 May 2, 2023
353eca7
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
80219e4
Merge branch 'main' into impl-assignor
harryteng9527 May 23, 2023
0d5820e
Use new shuffler
harryteng9527 May 23, 2023
7e5aac6
Add filter to avoid Null pointer and make skewCostLimiter more strict
harryteng9527 May 23, 2023
98be5ee
Modify randomShuffler signature, replace config to shuffleTime
harryteng9527 May 23, 2023
9fb2b11
Add GeneratorTest and modify Hint
harryteng9527 May 23, 2023
43a25f9
Fix Hint
harryteng9527 May 23, 2023
d4f55a8
Merge branch 'main' into impl-assignor
harryteng9527 May 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions common/src/main/java/org/astraea/common/assignor/Assignor.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.cost.NetworkIngressCost;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricStore;
Expand All @@ -53,7 +53,6 @@ public abstract class Assignor implements ConsumerPartitionAssignor, Configurabl
throw new NoSuchElementException("must define either broker.x.jmx.port or jmx.port");
};
HasPartitionCost costFunction = HasPartitionCost.EMPTY;
Comment thread
harryteng9527 marked this conversation as resolved.
// TODO: metric collector may be configured by user in the future.
// TODO: need to track the performance when using the assignor in large scale consumers, see
// https://github.com/skiptests/astraea/pull/1162#discussion_r1036285677
protected MetricStore metricStore = null;
Expand All @@ -69,8 +68,6 @@ public abstract class Assignor implements ConsumerPartitionAssignor, Configurabl
*/
protected abstract Map<String, List<TopicPartition>> assign(
Map<String, SubscriptionInfo> subscriptions, ClusterInfo clusterInfo);
// TODO: replace the topicPartitions by ClusterInfo after Assignor is able to handle Admin
// https://github.com/skiptests/astraea/issues/1409

/**
* Parse config to get JMX port and cost function type.
Expand Down Expand Up @@ -110,9 +107,6 @@ public final GroupAssignment assign(Cluster metadata, GroupSubscription groupSub
// convert Kafka's data structure to ours
var subscriptionsPerMember = GroupSubscriptionInfo.from(groupSubscription).groupSubscription();

// TODO: Detected if consumers subscribed to the same topics.
// For now, assume that the consumers only subscribed to identical topics

return new GroupAssignment(
assign(subscriptionsPerMember, clusterInfo).entrySet().stream()
.collect(
Expand Down Expand Up @@ -146,7 +140,7 @@ public final void configure(Map<String, ?> configs) {
var defaultJMXPort = config.integer(JMX_PORT);
this.costFunction =
costFunctions.isEmpty()
? HasPartitionCost.of(Map.of(new ReplicaLeaderSizeCost(), 1D))
? HasPartitionCost.of(Map.of(new NetworkIngressCost(config), 1D))
: HasPartitionCost.of(costFunctions);
this.jmxPortGetter =
id ->
Expand Down
68 changes: 68 additions & 0 deletions common/src/main/java/org/astraea/common/assignor/Combinator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.assignor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.common.admin.TopicPartition;

@FunctionalInterface
public interface Combinator {
Map<String, List<TopicPartition>> combine(
Map<String, SubscriptionInfo> subscriptions, Map<TopicPartition, Double> costs);

/**
* Using a greedy strategy to assign partitions to consumers, selecting the consumer with the
* lowest cost each time to assign.
*
* @return the assignment by greedy strategy
*/
static Combinator greedy() {
Comment thread
harryteng9527 marked this conversation as resolved.
Outdated
return (subscriptions, costs) -> {
var tmpConsumerCost =
subscriptions.keySet().stream()
.collect(Collectors.toMap(Function.identity(), ignore -> 0.0D));

var lowestCostConsumer =
Comment thread
harryteng9527 marked this conversation as resolved.
Outdated
(Function<TopicPartition, String>)
(tp) ->
tmpConsumerCost.entrySet().stream()
.filter(e -> subscriptions.get(e.getKey()).topics().contains(tp.topic()))
.min(Map.Entry.comparingByValue())
.get()
.getKey();

var result =
costs.entrySet().stream()
.map(
e -> {
var consumer = lowestCostConsumer.apply(e.getKey());
tmpConsumerCost.compute(
consumer, (ignore, totalCost) -> totalCost + e.getValue());
return Map.entry(consumer, e.getKey());
})
.collect(
Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

return result;
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.assignor;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.TopicPartition;

/**
* This assignor scores the partitions by cost function(s) that user given. Each cost function
* evaluate the partitions' cost in each node by metrics depend on which cost function user use. The
* default cost function ranks partitions that are in the same node by NetworkIngressCost{@link
* org.astraea.common.cost.NetworkIngressCost}
*
* <p>The important configs are JMX port. Most cost function need the JMX metrics to score
* partitions. Normally, all brokers use the same JMX port, so you could just define the
* `jmx.port=12345`. If one of brokers uses different JMX client port, you can define
* `broker.1001.jmx.port=3456` (`1001` is the broker id) to replace the value of `jmx.port`. If the
* jmx port is undefined, only local mbean client is created for each cost function.
*/
public class CostAwareAssignor extends Assignor {
Comment thread
harryteng9527 marked this conversation as resolved.
protected static final String MAX_RETRY_TIME = "max.retry.time";
protected static final String SHUFFLE_TIME = "shuffle.time";
Duration maxRetryTime = Duration.ofSeconds(30);
Duration shuffleTime = Duration.ofSeconds(5);

@Override
protected Map<String, List<TopicPartition>> assign(
Comment thread
harryteng9527 marked this conversation as resolved.
Map<String, SubscriptionInfo> subscriptions, ClusterInfo clusterInfo) {
var subscribedTopics =
subscriptions.values().stream()
.map(SubscriptionInfo::topics)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());

metricStore.wait(
(clusterBean) ->
costFunction.partitionCost(clusterInfo, clusterBean).value().values().stream()
.noneMatch(v -> Double.isNaN(v)),
shuffleTime);

var clusterBean = metricStore.clusterBean();
var partitionCost = costFunction.partitionCost(clusterInfo, clusterBean);
var cost =
partitionCost.value().entrySet().stream()
.filter(e -> subscribedTopics.contains(e.getKey().topic()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
var incompatiblePartition = partitionCost.incompatibility();

var assignment = Combinator.greedy().combine(subscriptions, cost);
return Shuffler.incompatible(shuffleTime)
.shuffle(subscriptions, assignment, incompatiblePartition, cost);
}

@Override
protected void configure(Configuration config) {
config.duration(MAX_RETRY_TIME).ifPresent(v -> this.maxRetryTime = v);
config.duration(SHUFFLE_TIME).ifPresent(v -> this.shuffleTime = v);
}

@Override
public String name() {
return "costAware";
}
}
141 changes: 141 additions & 0 deletions common/src/main/java/org/astraea/common/assignor/Shuffler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.assignor;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.common.admin.TopicPartition;

@FunctionalInterface
public interface Shuffler {
/**
* Try to avoid putting incompatible partitions on the same consumer.
*
* @param subscriptions the subscription of consumers
* @param assignment assignment
* @param incompatible incompatible partition calculated by cost function
* @param costs partition cost
* @return assignment that filter out most incompatible partitions
*/
Map<String, List<TopicPartition>> shuffle(
Map<String, SubscriptionInfo> subscriptions,
Map<String, List<TopicPartition>> assignment,
Map<TopicPartition, Set<TopicPartition>> incompatible,
Map<TopicPartition, Double> costs);

static Shuffler incompatible(Duration maxTime) {
Comment thread
harryteng9527 marked this conversation as resolved.
Outdated
return (subscriptions, assignment, incompatible, costs) -> {
if (incompatible.isEmpty()) return assignment;
// get the incompatible partitions of each consumer from consumer assignment
var unsuitable =
assignment.entrySet().stream()
.map(
e ->
Map.entry(
e.getKey(),
e.getValue().stream()
.filter(incompatible::containsKey)
.flatMap(tp -> incompatible.get(tp).stream())
.collect(Collectors.toUnmodifiableSet())))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
if (assignment.entrySet().stream()
.noneMatch(
e -> e.getValue().stream().anyMatch(tp -> unsuitable.get(e.getKey()).contains(tp))))
return assignment;

var possibleAssignments = new HashSet<Map<String, List<TopicPartition>>>();
var randomAssign =
(Function<TopicPartition, String>)
(tp) -> {
var subsConsumer =
subscriptions.entrySet().stream()
.filter(e -> e.getValue().topics().contains(tp.topic()))
.toList();
return subsConsumer
.get(ThreadLocalRandom.current().nextInt(subsConsumer.size()))
.getKey();
};

var start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < maxTime.toMillis()) {
possibleAssignments.add(
costs.keySet().stream()
.map(tp -> Map.entry(randomAssign.apply(tp), tp))
.collect(
Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toUnmodifiableList()))));
}

var standardSigma =
(Function<Map<String, List<TopicPartition>>, Double>)
(r) -> {
var totalCost =
r.entrySet().stream()
.map(
e ->
Map.entry(
e.getKey(),
e.getValue().stream().mapToDouble(costs::get).sum()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
var avg = totalCost.values().stream().mapToDouble(d -> d).average().getAsDouble();

return Math.sqrt(
totalCost.values().stream().mapToDouble(c -> Math.pow(avg - c, 2)).sum()
/ totalCost.size());
};
var numberOfIncompatibility =
(Function<Map<String, List<TopicPartition>>, Integer>)
(possibleAssignment) -> {
var unsuit =
possibleAssignment.entrySet().stream()
.map(
e ->
Map.entry(
e.getKey(),
e.getValue().stream()
.flatMap(tp -> incompatible.get(tp).stream())
.collect(Collectors.toUnmodifiableSet())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return possibleAssignment.entrySet().stream()
.mapToInt(
e ->
(int)
e.getValue().stream()
.filter(tp -> unsuit.get(e.getKey()).contains(tp))
.count())
.sum();
};

return possibleAssignments.stream()
.map(e -> Map.entry(e, standardSigma.apply(e)))
.sorted(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.limit((int) Math.floor((double) possibleAssignments.size() / 10))
.min(Comparator.comparingLong(numberOfIncompatibility::apply))
.get();
};
}
}
Loading