Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,9 @@ class ChangePartitionManager(
|| (unavailableWorkerRatio >= dynamicResourceUnavailableFactor)) {

// get new available workers for the request partition ids
val partitionIds = new util.ArrayList[Integer](
changePartitions.map(_.partitionId).map(Integer.valueOf).toList.asJava)
// The partition id value is not important here because we're just trying to get the workers to use
val requestSlotsRes =
lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId, partitionIds)
lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId, changePartitions.size)

requestSlotsRes.status match {
case StatusCode.REQUEST_FAILED =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,9 +776,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}

// First, request to get allocated slots from Primary
val ids = new util.ArrayList[Integer](numPartitions)
(0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
val res = requestMasterRequestSlotsWithRetry(shuffleId, ids)
val res = requestMasterRequestSlotsWithRetry(shuffleId, numPartitions)

res.status match {
case StatusCode.REQUEST_FAILED =>
Expand Down Expand Up @@ -1832,7 +1830,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends

def requestMasterRequestSlotsWithRetry(
shuffleId: Int,
ids: util.ArrayList[Integer]): RequestSlotsResponse = {
numPartitions: Int): RequestSlotsResponse = {
val excludedWorkerSet =
if (excludedWorkersFilter) {
workerStatusTracker.excludedWorkers.asScala.keys.toSet
Expand All @@ -1845,7 +1843,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
RequestSlots(
appUniqueId,
shuffleId,
ids,
numPartitions,
lifecycleHost,
pushReplicateEnabled,
pushRackAwareEnabled,
Expand Down
18 changes: 18 additions & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ enum MessageType {
READ_REDUCER_PARTITION_END = 94;
READ_REDUCER_PARTITION_END_RESPONSE = 95;
REGISTER_APPLICATION_INFO = 96;

REQUEST_SLOTS_V2 = 97;
}

enum StreamType {
Expand Down Expand Up @@ -325,6 +327,22 @@ message PbRequestSlots {
string tagsExpr = 14;
}

// Slot request payload sent with MessageType.REQUEST_SLOTS_V2.
// Unlike a partition-id list, this message carries only the number of
// partitions (numPartitions); the decoder forwards that count directly into
// the RequestSlots control message.
message PbRequestSlotsV2 {
string applicationId = 1;
int32 shuffleId = 2;
// Number of partitions slots are requested for.
int32 numPartitions = 3;
string hostname = 4;
bool shouldReplicate = 5;
string requestId = 6;
PbUserIdentifier userIdentifier = 7;
bool shouldRackAware = 8;
int32 maxWorkers = 9;
// NOTE(review): presumably a bitmask of storage types — confirm against StorageInfo.
int32 availableStorageTypes = 10;
// Workers the requester wants excluded; decoded into a set of WorkerInfo.
repeated PbWorkerInfo excludedWorkerSet = 11;
bool packed = 12;
string tagsExpr = 13;
}

message PbSlotInfo {
map<string, int32> slot = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ object ControlMessages extends Logging {
case class RequestSlots(
applicationId: String,
shuffleId: Int,
partitionIdList: util.ArrayList[Integer],
numPartitions: Int,
hostname: String,
shouldReplicate: Boolean,
shouldRackAware: Boolean,
Expand Down Expand Up @@ -650,7 +650,7 @@ object ControlMessages extends Logging {
case RequestSlots(
applicationId,
shuffleId,
partitionIdList,
numPartitions,
hostname,
shouldReplicate,
shouldRackAware,
Expand All @@ -661,10 +661,10 @@ object ControlMessages extends Logging {
packed,
tagsExpr,
requestId) =>
val payload = PbRequestSlots.newBuilder()
val payload = PbRequestSlotsV2.newBuilder()
.setApplicationId(applicationId)
.setShuffleId(shuffleId)
.addAllPartitionIdList(partitionIdList)
.setNumPartitions(numPartitions)
.setHostname(hostname)
.setShouldReplicate(shouldReplicate)
.setShouldRackAware(shouldRackAware)
Expand All @@ -677,7 +677,7 @@ object ControlMessages extends Logging {
.setPacked(packed)
.setTagsExpr(tagsExpr)
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
new TransportMessage(MessageType.REQUEST_SLOTS_V2, payload)

case RequestSlotsResponse(status, workerResource, packed) =>
val builder = PbRequestSlotsResponse.newBuilder()
Expand Down Expand Up @@ -1151,7 +1151,7 @@ object ControlMessages extends Logging {
RequestSlots(
pbRequestSlots.getApplicationId,
pbRequestSlots.getShuffleId,
new util.ArrayList[Integer](pbRequestSlots.getPartitionIdListList),
pbRequestSlots.getPartitionIdListList.size(),
pbRequestSlots.getHostname,
pbRequestSlots.getShouldReplicate,
pbRequestSlots.getShouldRackAware,
Expand All @@ -1163,6 +1163,26 @@ object ControlMessages extends Logging {
pbRequestSlots.getTagsExpr,
pbRequestSlots.getRequestId)

// Decode a V2 slots request. The payload is a PbRequestSlotsV2, which carries
// the partition count directly, so it maps one-to-one onto RequestSlots.
case REQUEST_SLOTS_V2_VALUE =>
val pb = PbRequestSlotsV2.parseFrom(message.getPayload)
// Convert the protobuf user identifier and excluded workers to their
// in-memory counterparts before building the control message.
val userIdentifier = PbSerDeUtils.fromPbUserIdentifier(pb.getUserIdentifier)
val excludedWorkerInfoSet =
pb.getExcludedWorkerSetList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toSet
RequestSlots(
pb.getApplicationId,
pb.getShuffleId,
// numPartitions is taken as-is from the message (no id list to count).
pb.getNumPartitions,
pb.getHostname,
pb.getShouldReplicate,
pb.getShouldRackAware,
userIdentifier,
pb.getMaxWorkers,
pb.getAvailableStorageTypes,
excludedWorkerInfoSet,
pb.getPacked,
pb.getTagsExpr,
pb.getRequestId)

case REQUEST_SLOTS_RESPONSE_VALUE =>
val pbRequestSlotsResponse = PbRequestSlotsResponse.parseFrom(message.getPayload)
val workerResource =
Expand Down
55 changes: 55 additions & 0 deletions master/benchmarks/SlotsAllocatorBenchmark-jdk17-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
================================================================================================
200 workers, 10K partitions, no replication
================================================================================================

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 15.4
Apple M2 Pro
200 workers, 10K partitions, no replication: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
offerSlotsRoundRobin 1 1 0 15.6 64.3 1.0X


================================================================================================
200 workers, 100K partitions, no replication
================================================================================================

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 15.4
Apple M2 Pro
200 workers, 100K partitions, no replication: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------
offerSlotsRoundRobin 6 7 0 15.8 63.4 1.0X


================================================================================================
500 workers, 100K partitions, with replication
================================================================================================

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 15.4
Apple M2 Pro
500 workers, 100K partitions, with replication: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------
offerSlotsRoundRobin 12 15 2 8.0 124.5 1.0X


================================================================================================
500 workers, 2M partitions, no replication
================================================================================================

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 15.4
Apple M2 Pro
500 workers, 2M partitions, no replication: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
offerSlotsRoundRobin 252 351 102 7.9 126.1 1.0X


================================================================================================
1000 workers, 500K partitions, with replication
================================================================================================

OpenJDK 64-Bit Server VM 17.0.17+10 on Mac OS X 15.4
Apple M2 Pro
1000 workers, 500K partitions, with replication: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
offerSlotsRoundRobin 77 159 46 6.5 154.7 1.0X


Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ private static List<Integer> roundRobin(
}
// workerInfo -> (diskIndexForPrimaryAndReplica)
Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
List<Integer> partitionIdList = new LinkedList<>(partitionIds);

final int primaryWorkersSize = primaryWorkers.size();
final int replicaWorkersSize = replicaWorkers.size();
Expand All @@ -533,19 +532,27 @@ private static List<Integer> roundRobin(
replicaIndex = -1;
}

ListIterator<Integer> iter = partitionIdList.listIterator(partitionIdList.size());
// Iterate from the end to preserve O(1) removal of processed partitions.
// This is important when we have a high number of concurrent apps that have a
// high number of partitions.
// Pre-compute usable slots per worker to avoid repeated stream operations O(N*W) -> O(W)
long[] primaryUsableSlots = null;
long[] replicaUsableSlots = null;
if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
primaryUsableSlots = computeUsableSlots(primaryWorkers, slotsRestrictions);
if (shouldReplicate) {
replicaUsableSlots = computeUsableSlots(replicaWorkers, slotsRestrictions);
}
}

// Walk partitionIds by index instead of copying it into a LinkedList and removing entries;
// allocatedCount marks how many leading partitions have been assigned so far.
int allocatedCount = 0;
outer:
while (iter.hasPrevious()) {
for (int pidIdx = 0; pidIdx < partitionIds.size(); pidIdx++) {
int nextPrimaryInd = primaryIndex;

int partitionId = iter.previous();
int partitionId = partitionIds.get(pidIdx);
StorageInfo storageInfo;
if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
if (primaryUsableSlots != null) {
// this means that we'll select a mount point
while (!haveUsableSlots(slotsRestrictions, primaryWorkers, nextPrimaryInd)) {
while (primaryUsableSlots[nextPrimaryInd] <= 0) {
nextPrimaryInd = primaryWorkersIncrementIndex.applyAsInt(nextPrimaryInd);
if (nextPrimaryInd == primaryIndex) {
break outer;
Expand All @@ -558,6 +565,7 @@ private static List<Integer> roundRobin(
slotsRestrictions,
workerDiskIndex,
availableStorageTypes);
primaryUsableSlots[nextPrimaryInd]--;
} else {
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
while (!primaryWorkers.get(nextPrimaryInd).haveDisk()) {
Expand All @@ -576,9 +584,9 @@ private static List<Integer> roundRobin(

if (shouldReplicate) {
int nextReplicaInd = replicaIndex;
if (slotsRestrictions != null) {
if (replicaUsableSlots != null) {
while ((nextReplicaInd == nextPrimaryInd && skipLocationsOnSameWorkerCheck)
|| !haveUsableSlots(slotsRestrictions, replicaWorkers, nextReplicaInd)
|| replicaUsableSlots[nextReplicaInd] <= 0
|| !satisfyRackAware(
shouldRackAware,
primaryWorkers,
Expand All @@ -597,6 +605,7 @@ private static List<Integer> roundRobin(
slotsRestrictions,
workerDiskIndex,
availableStorageTypes);
replicaUsableSlots[nextReplicaInd]--;
} else if (shouldRackAware) {
while ((nextReplicaInd == nextPrimaryInd && skipLocationsOnSameWorkerCheck)
|| !satisfyRackAware(
Expand Down Expand Up @@ -642,9 +651,26 @@ private static List<Integer> roundRobin(
v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
locations._1.add(primaryPartition);
primaryIndex = primaryWorkersIncrementIndex.applyAsInt(nextPrimaryInd);
iter.remove();
allocatedCount++;
}
if (allocatedCount == partitionIds.size()) {
return Collections.emptyList();
}
return partitionIdList;
return new ArrayList<>(partitionIds.subList(allocatedCount, partitionIds.size()));
}

/**
 * Builds a per-worker total of usable slots.
 *
 * <p>The returned array is parallel to {@code workers}: entry {@code i} is the sum of
 * {@code usableSlots} over every disk that {@code restrictions} lists for {@code workers.get(i)}.
 * A worker with no entry (or a null entry) in {@code restrictions} gets a total of 0.
 *
 * @param workers workers to total, in the order the result should follow
 * @param restrictions usable disks per worker
 * @return array of summed usable slots, one element per worker
 */
private static long[] computeUsableSlots(
    List<WorkerInfo> workers, Map<WorkerInfo, List<UsableDiskInfo>> restrictions) {
  final long[] totals = new long[workers.size()];
  int position = 0;
  for (WorkerInfo worker : workers) {
    final List<UsableDiskInfo> usableDisks = restrictions.get(worker);
    long sum = 0L;
    if (usableDisks != null) {
      for (UsableDiskInfo disk : usableDisks) {
        sum += disk.usableSlots;
      }
    }
    totals[position] = sum;
    position++;
  }
  return totals;
}

private static boolean haveUsableSlots(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ private[celeborn] class Master(
}

def handleRequestSlots(context: RpcCallContext, requestSlots: RequestSlots): Unit = {
val numReducers = requestSlots.partitionIdList.size()
val numReducers = requestSlots.numPartitions
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
Expand Down Expand Up @@ -966,14 +966,21 @@ private[celeborn] class Master(
0,
startIndex + numWorkers - numAvailableWorkers))
}
// Build partitionIds list locally from numPartitions
val partitionIds = new util.ArrayList[Integer](numReducers)
var i = 0
while (i < numReducers) {
partitionIds.add(Integer.valueOf(i))
i += 1
}
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
statusSystem.workersMap.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
SlotsAllocator.offerSlotsLoadAware(
selectedWorkers,
requestSlots.partitionIdList,
partitionIds,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
slotsAssignLoadAwareDiskGroupNum,
Expand All @@ -986,7 +993,7 @@ private[celeborn] class Master(
} else {
SlotsAllocator.offerSlotsRoundRobin(
selectedWorkers,
requestSlots.partitionIdList,
partitionIds,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
requestSlots.availableStorageTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class MasterSuite extends AnyFunSuite
val requestSlots = RequestSlots(
"app1",
0,
new util.ArrayList[Integer](),
0,
"localhost",
shouldReplicate = false,
shouldRackAware = false,
Expand Down
Loading
Loading