Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(MASTER_SLOT_ASSIGN_LOADAWARE_FLUSHTIME_WEIGHT)
// Weight applied to a disk's average fetch time when ranking disks in the
// load-aware slot-assignment strategy.
def masterSlotAssignLoadAwareFetchTimeWeight: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
// Weight applied to a disk's active slot count when ranking disks in the
// load-aware slot-assignment strategy. Defaults to 0, i.e. the active-slots
// signal is disabled unless explicitly configured.
def masterSlotAssignLoadAwareActiveSlotsWeight: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_ACTIVE_SLOTS_WEIGHT)
// Extra slots requested beyond the strictly required number when the master
// assigns slots (applied when enough workers are available).
def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
// NOTE(review): presumably the upper bound on workers considered during slot
// assignment — confirm against the assignment code.
def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
// NOTE(review): presumably the lower bound on workers considered during slot
// assignment — confirm against the assignment code.
def masterSlotAssignMinWorkers: Int = get(MASTER_SLOT_ASSIGN_MIN_WORKERS)
Expand Down Expand Up @@ -3154,6 +3156,16 @@ object CelebornConf extends Logging {
.doubleConf
.createWithDefault(1)

/**
 * Weight applied to a disk's active slot count when computing the ordering
 * used by the load-aware slot-assignment strategy. The default of 0 keeps
 * the active-slots signal disabled unless explicitly enabled, preserving the
 * previous ordering based on flush/fetch time weights alone.
 */
val MASTER_SLOT_ASSIGN_LOADAWARE_ACTIVE_SLOTS_WEIGHT: ConfigEntry[Double] =
  buildConf("celeborn.master.slot.assign.loadAware.activeSlotsWeight")
    .withAlternative("celeborn.slots.assign.loadAware.activeSlotsWeight")
    .categories("master")
    .doc(
      "Weight of active slots when calculating ordering in load-aware assignment strategy")
    .version("0.7.0")
    .doubleConf
    .createWithDefault(0)

val MASTER_SLOT_ASSIGN_EXTRA_SLOTS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.extraSlots")
.withAlternative("celeborn.slots.assign.extraSlots")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ license: |
| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. Provided enough workers are available. | 0.3.0 | celeborn.slots.assign.extraSlots |
| celeborn.master.slot.assign.interruptionAware | false | false | If this is set to true, Celeborn master will prioritize partition placement on workers that are not in scope for maintenance soon. | 0.7.0 | |
| celeborn.master.slot.assign.interruptionAware.threshold | 50 | false | This controls what percentage of hosts would be selected for slot selection in the first iteration of creating partitions. Default is 50%. | 0.7.0 | |
| celeborn.master.slot.assign.loadAware.activeSlotsWeight | 0.0 | false | Weight of active slots when calculating ordering in load-aware assignment strategy | 0.7.0 | celeborn.slots.assign.loadAware.activeSlotsWeight |
| celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |
| celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | false | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.flushTimeWeight |
Expand Down
3 changes: 2 additions & 1 deletion docs/developers/slotsallocation.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ celeborn.master.slot.assign.loadAware.numDiskGroups 5
celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1
celeborn.master.slot.assign.loadAware.flushTimeWeight 0
celeborn.master.slot.assign.loadAware.fetchTimeWeight 1
celeborn.master.slot.assign.loadAware.activeSlotsWeight 0
[spark.client.]celeborn.storage.availableTypes HDD,SSD
```
### Detail
Expand All @@ -44,7 +45,7 @@ Load-aware slots allocation will take following elements into consideration.
- disk's used slot

Slots allocator will find out all worker involved in this allocation and sort their disks by
`disk's average flushtime * flush time weight + disk's average fetch time * fetch time weight`.
`disk's average flush time * flush time weight + disk's average fetch time * fetch time weight + disk's active slots * active slots weight`.
Comment thread
SteNicholas marked this conversation as resolved.
After getting the sorted disks list, Celeborn will split the disks into
`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The slots number to be placed into a disk group
is controlled by the `celeborn.master.slot.assign.loadAware.diskGroupGradient` which means that a group's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ static class UsableDiskInfo {
double diskGroupGradient,
double flushTimeWeight,
double fetchTimeWeight,
double activeSlotsWeight,
int availableStorageTypes,
boolean interruptionAware,
int interruptionAwareThreshold) {
Expand Down Expand Up @@ -193,7 +194,8 @@ static class UsableDiskInfo {

Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions =
getSlotsRestrictionsByLoadAwareAlgorithm(
placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight),
placeDisksToGroups(
usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight, activeSlotsWeight),
diskToWorkerMap,
shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
return locateSlots(
Expand Down Expand Up @@ -685,13 +687,17 @@ private static List<List<DiskInfo>> placeDisksToGroups(
List<DiskInfo> usableDisks,
int diskGroupCount,
double flushTimeWeight,
double fetchTimeWeight) {
double fetchTimeWeight,
double activeSlotsWeight) {
List<List<DiskInfo>> diskGroups = new ArrayList<>();
usableDisks.sort(
(o1, o2) -> {
double delta =
(o1.avgFlushTime() * flushTimeWeight + o1.avgFetchTime() * fetchTimeWeight)
- (o2.avgFlushTime() * flushTimeWeight + o2.avgFetchTime() * fetchTimeWeight);
+ o1.activeSlots() * activeSlotsWeight
- (o2.avgFlushTime() * flushTimeWeight
+ o2.avgFetchTime() * fetchTimeWeight
+ o2.activeSlots() * activeSlotsWeight);
return delta < 0 ? -1 : (delta > 0 ? 1 : 0);
});
int diskCount = usableDisks.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ private[celeborn] class Master(
conf.masterSlotAssignLoadAwareDiskGroupGradient
private val loadAwareFlushTimeWeight = conf.masterSlotAssignLoadAwareFlushTimeWeight
private val loadAwareFetchTimeWeight = conf.masterSlotAssignLoadAwareFetchTimeWeight
private val loadAwareActiveSlotsWeight = conf.masterSlotAssignLoadAwareActiveSlotsWeight

private val estimatedPartitionSizeUpdaterInitialDelay =
conf.estimatedPartitionSizeUpdaterInitialDelay
Expand Down Expand Up @@ -981,6 +982,7 @@ private[celeborn] class Master(
slotsAssignLoadAwareDiskGroupGradient,
loadAwareFlushTimeWeight,
loadAwareFetchTimeWeight,
loadAwareActiveSlotsWeight,
requestSlots.availableStorageTypes,
slotsAssignInterruptionAware,
slotsAssignInterruptionAwareThreshold)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,41 @@ public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() {
check(workers, partitionIds, shouldReplicate, true, true, false, 0);
}

/**
 * Verifies that when the active-slots weight is enabled (1) and both time
 * weights are 0, load-aware allocation places the single requested partition
 * on the worker whose disk has fewer active slots.
 */
@Test
public void testLoadAwarePrefersLowerActiveSlotsWhenConfigured() {
  // Two identical single-disk workers; only activeSlots differs between them,
  // so any placement preference must come from the active-slots weight.
  final List<WorkerInfo> workers =
      basePrepareWorkers(
          2,
          true,
          ImmutableMap.of("/mnt/disk1", 100 * 1024 * 1024 * 1024L),
          64 * 1024 * 1024L,
          1,
          false,
          new Random(0));
  final DiskInfo overloadedDisk = workers.get(0).diskInfos().get("/mnt/disk1");
  final DiskInfo lightlyReservedDisk = workers.get(1).diskInfos().get("/mnt/disk1");
  // activeSlots_$eq is the Scala-generated setter for DiskInfo.activeSlots.
  overloadedDisk.activeSlots_$eq(1000);
  lightlyReservedDisk.activeSlots_$eq(0);

  // NOTE(review): positional arguments assumed to line up with
  // offerSlotsLoadAware(..., diskGroupCount=2, diskGroupGradient=1,
  // flushTimeWeight=0, fetchTimeWeight=0, activeSlotsWeight=1, ...) — confirm
  // against SlotsAllocator's signature.
  final Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
      SlotsAllocator.offerSlotsLoadAware(
          workers,
          Collections.singletonList(0),
          false,
          false,
          2,
          1,
          0,
          0,
          1,
          StorageInfo.ALL_TYPES_AVAILABLE_MASK,
          false,
          0);

  // The single partition (no replication) must land on the lightly reserved
  // worker only.
  assertTrue(slots.containsKey(workers.get(1)));
  assertFalse(slots.containsKey(workers.get(0)));
}

private void check(
List<WorkerInfo> workers,
List<Integer> partitionIds,
Expand Down Expand Up @@ -186,6 +221,7 @@ private Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
conf.masterSlotAssignLoadAwareFetchTimeWeight(),
conf.masterSlotAssignLoadAwareActiveSlotsWeight(),
StorageInfo.ALL_TYPES_AVAILABLE_MASK,
interruptionAware,
interruptionAwareThreshold);
Expand Down Expand Up @@ -298,6 +334,7 @@ private void checkSlotsOnDFS(
0.1,
0,
1,
0,
StorageInfo.LOCAL_DISK_MASK | availableStorageTypes,
false,
0);
Expand Down
Loading