diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index ed7013ed5fa..40d06617fea 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -679,6 +679,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se get(MASTER_SLOT_ASSIGN_LOADAWARE_FLUSHTIME_WEIGHT) def masterSlotAssignLoadAwareFetchTimeWeight: Double = get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT) + def masterSlotAssignLoadAwareActiveSlotsWeight: Double = + get(MASTER_SLOT_ASSIGN_LOADAWARE_ACTIVE_SLOTS_WEIGHT) def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS) def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS) def masterSlotAssignMinWorkers: Int = get(MASTER_SLOT_ASSIGN_MIN_WORKERS) @@ -3154,6 +3156,15 @@ object CelebornConf extends Logging { .doubleConf .createWithDefault(1) + val MASTER_SLOT_ASSIGN_LOADAWARE_ACTIVE_SLOTS_WEIGHT: ConfigEntry[Double] = + buildConf("celeborn.master.slot.assign.loadAware.activeSlotsWeight") + .categories("master") + .doc( + "Weight of active slots when calculating ordering in load-aware assignment strategy") + .version("0.7.0") + .doubleConf + .createWithDefault(0) + val MASTER_SLOT_ASSIGN_EXTRA_SLOTS: ConfigEntry[Int] = buildConf("celeborn.master.slot.assign.extraSlots") .withAlternative("celeborn.slots.assign.extraSlots") diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 9ae889fe0f0..2667682895a 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -77,6 +77,7 @@ license: | | celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. Provided enough workers are available. | 0.3.0 | celeborn.slots.assign.extraSlots | | celeborn.master.slot.assign.interruptionAware | false | false | If this is set to true, Celeborn master will prioritize partition placement on workers that are not in scope for maintenance soon. | 0.7.0 | | | celeborn.master.slot.assign.interruptionAware.threshold | 50 | false | This controls what percentage of hosts would be selected for slot selection in the first iteration of creating partitions. Default is 50%. | 0.7.0 | | +| celeborn.master.slot.assign.loadAware.activeSlotsWeight | 0.0 | false | Weight of active slots when calculating ordering in load-aware assignment strategy | 0.7.0 | | | celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient | | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight | | celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | false | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.flushTimeWeight | diff --git a/docs/developers/slotsallocation.md b/docs/developers/slotsallocation.md index 71060bba1e6..f1dcb040ed9 100644 --- a/docs/developers/slotsallocation.md +++ b/docs/developers/slotsallocation.md @@ -33,6 +33,7 @@ celeborn.master.slot.assign.loadAware.numDiskGroups 5 celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1 celeborn.master.slot.assign.loadAware.flushTimeWeight 0 celeborn.master.slot.assign.loadAware.fetchTimeWeight 1 +celeborn.master.slot.assign.loadAware.activeSlotsWeight 0 [spark.client.]celeborn.storage.availableTypes HDD,SSD ``` ### Detail @@ -44,7 +45,12 @@ Load-aware slots allocation will take following elements into consideration. - disk's used slot Slots allocator will find out all worker involved in this allocation and sort their disks by -`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight`. +`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight + disk's active slots * active slots weight`. +The average flush/fetch times are measured in nanoseconds, while active slots is a slot count, so +`activeSlotsWeight` is effectively a nanoseconds-per-slot conversion factor. For example, if the +average fetch time is around `100 ms` (`10^8` ns) and a disk has about `1000` active slots, +`activeSlotsWeight=10^5` makes the active-slot term contribute about `10^8`, comparable to the +fetch-time term. After getting the sorted disks list, Celeborn will split the disks into `celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` which means that a group's diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index 5580cd341a4..0260ac984df 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -124,6 +124,7 @@ static class UsableDiskInfo { double diskGroupGradient, double flushTimeWeight, double fetchTimeWeight, + double activeSlotsWeight, int availableStorageTypes, boolean interruptionAware, int interruptionAwareThreshold) { @@ -193,7 +194,8 @@ static class UsableDiskInfo { Map> slotsRestrictions = getSlotsRestrictionsByLoadAwareAlgorithm( - placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight), + placeDisksToGroups( + usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight, activeSlotsWeight), diskToWorkerMap, shouldReplicate ? partitionIds.size() * 2 : partitionIds.size()); return locateSlots( @@ -685,13 +687,17 @@ private static List> placeDisksToGroups( List usableDisks, int diskGroupCount, double flushTimeWeight, - double fetchTimeWeight) { + double fetchTimeWeight, + double activeSlotsWeight) { List> diskGroups = new ArrayList<>(); usableDisks.sort( (o1, o2) -> { double delta = (o1.avgFlushTime() * flushTimeWeight + o1.avgFetchTime() * fetchTimeWeight) - - (o2.avgFlushTime() * flushTimeWeight + o2.avgFetchTime() * fetchTimeWeight); + + o1.activeSlots() * activeSlotsWeight + - (o2.avgFlushTime() * flushTimeWeight + + o2.avgFetchTime() * fetchTimeWeight + + o2.activeSlots() * activeSlotsWeight); return delta < 0 ? -1 : (delta > 0 ? 1 : 0); }); int diskCount = usableDisks.size(); diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index c87f5373926..81e0e0076ca 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -212,6 +212,7 @@ private[celeborn] class Master( conf.masterSlotAssignLoadAwareDiskGroupGradient private val loadAwareFlushTimeWeight = conf.masterSlotAssignLoadAwareFlushTimeWeight private val loadAwareFetchTimeWeight = conf.masterSlotAssignLoadAwareFetchTimeWeight + private val loadAwareActiveSlotsWeight = conf.masterSlotAssignLoadAwareActiveSlotsWeight private val estimatedPartitionSizeUpdaterInitialDelay = conf.estimatedPartitionSizeUpdaterInitialDelay @@ -981,6 +982,7 @@ private[celeborn] class Master( slotsAssignLoadAwareDiskGroupGradient, loadAwareFlushTimeWeight, loadAwareFetchTimeWeight, + loadAwareActiveSlotsWeight, requestSlots.availableStorageTypes, slotsAssignInterruptionAware, slotsAssignInterruptionAwareThreshold) diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index b3b53f22173..f7c70d26e57 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -144,6 +144,41 @@ public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() { check(workers, partitionIds, shouldReplicate, true, true, false, 0); } + @Test + public void testLoadAwarePrefersLowerActiveSlotsWhenConfigured() { + final List workers = + basePrepareWorkers( + 2, + true, + ImmutableMap.of("/mnt/disk1", 100 * 1024 * 1024 * 1024L), + 64 * 1024 * 1024L, + 1, + false, + new Random(0)); + final DiskInfo overloadedDisk = workers.get(0).diskInfos().get("/mnt/disk1"); + final DiskInfo lightlyReservedDisk = workers.get(1).diskInfos().get("/mnt/disk1"); + overloadedDisk.activeSlots_$eq(1000); + lightlyReservedDisk.activeSlots_$eq(0); + + final Map, List>> slots = + SlotsAllocator.offerSlotsLoadAware( + workers, + Collections.singletonList(0), + false, + false, + 2, + 1, + 0, + 0, + 1, + StorageInfo.ALL_TYPES_AVAILABLE_MASK, + false, + 0); + + assertTrue(slots.containsKey(workers.get(1))); + assertFalse(slots.containsKey(workers.get(0))); + } + private void check( List workers, List partitionIds, @@ -186,6 +221,7 @@ private Map, List> conf.masterSlotAssignLoadAwareDiskGroupGradient(), conf.masterSlotAssignLoadAwareFlushTimeWeight(), conf.masterSlotAssignLoadAwareFetchTimeWeight(), + conf.masterSlotAssignLoadAwareActiveSlotsWeight(), StorageInfo.ALL_TYPES_AVAILABLE_MASK, interruptionAware, interruptionAwareThreshold); @@ -298,6 +334,7 @@ private void checkSlotsOnDFS( 0.1, 0, 1, + 0, StorageInfo.LOCAL_DISK_MASK | availableStorageTypes, false, 0);