Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class DiskInfo(
lazy val shuffleAllocations = new util.HashMap[String, Integer]()
lazy val applicationAllocations = new util.HashMap[String, Integer]()

/**
 * Whether this disk is usable right now.
 *
 * @return `true` only when `status` is `DiskStatus.HEALTHY` and
 *         `actualUsableSpace` is strictly positive
 */
def isHealthy: Boolean = {
  val statusIsHealthy = status == DiskStatus.HEALTHY
  // A disk that reports HEALTHY but has no usable space left is not considered healthy.
  statusIsHealthy && actualUsableSpace > 0
}

/**
 * Replaces the storage type recorded on this disk.
 *
 * @param storageType the new storage type to store
 */
def setStorageType(storageType: StorageInfo.Type): Unit = {
  this.storageType = storageType
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}

/**
 * Collects the working directories of every disk in the current snapshot that
 * passes `DiskInfo.isHealthy`.
 *
 * `isHealthy` is used instead of comparing `status` with `DiskStatus.HEALTHY`
 * alone, so the same health check is applied here as in the suggested-mount-point
 * lookup of `createDiskFile`.
 *
 * @return the directories of all healthy disks; empty when no disk qualifies
 */
def healthyWorkingDirs(): List[File] =
  disksSnapshot().filter(_.isHealthy).flatMap(_.dirs)

private val diskOperators: ConcurrentHashMap[String, ThreadPoolExecutor] = {
val cleaners = JavaUtils.newConcurrentHashMap[String, ThreadPoolExecutor]()
Expand Down Expand Up @@ -1139,11 +1141,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
while (retryCount < conf.workerCreateWriterMaxAttempts) {
val diskInfo = diskInfos.get(suggestedMountPoint)
val dirs =
if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
if (diskInfo != null && diskInfo.isHealthy) {
diskInfo.dirs
} else {
if (suggestedMountPoint.isEmpty) {
Comment thread
zaynt4606 marked this conversation as resolved.
logDebug(s"Location suggestedMountPoint is not set, return all healthy working dirs.")
} else if (diskInfo == null) {
Comment thread
SteNicholas marked this conversation as resolved.
logInfo(s"Disk info not found for suggestedMountPoint $suggestedMountPoint, return all healthy " +
s"working dirs.")
} else {
logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" +
s" working dirs.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

package org.apache.celeborn.service.deploy.worker.storage

import java.{lang, util}
import java.io.IOException

import org.mockito.{Mockito, MockitoSugar}
import org.mockito.ArgumentMatchersSugar.any
import org.mockito.stubbing.Stubber

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
import org.apache.celeborn.common.meta.DiskInfo
import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, WORKER_STORAGE_DIRS}
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{DiskInfo, DiskStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType, StorageInfo}
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.worker.WorkerSource

Expand Down Expand Up @@ -108,4 +113,52 @@ class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
spyStorageManager.updateDiskInfos()
assert(diskInfo.actualUsableSpace == 0L)
}

// Verifies that createDiskFile fails with "No available disks!" when the only disk
// reported by disksSnapshot() has status HEALTHY but a negative usable space.
test("[CELEBORN-2310] Ensure createFile rejected with disks are full, but status is HEALTHY") {
  val conf = new CelebornConf()
    .set(WORKER_DISK_RESERVE_SIZE, Utils.byteStringAsBytes("5g"))
    .set(WORKER_STORAGE_DIRS, Seq("/"))
  val storageManager = new StorageManager(conf, new WorkerSource(conf))
  val spyStorageManager = spy(storageManager)

  val diskInfo = new DiskInfo("/", List.empty, null, conf)
  diskInfo.setUsableSpace(-1L)
  // Should fail even if the status is HEALTHY
  diskInfo.setStatus(DiskStatus.HEALTHY)
  doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot()

  val partitionLocation = genPartitionLocation(0, Array(0L))

  // Call through the spy: the disksSnapshot() stub above only exists on the spy,
  // so invoking the un-spied instance would silently bypass it.
  // intercept fails the test by itself when no exception, or an exception of a
  // different type, is raised, and it cannot swallow its own failure the way a
  // `case e: Throwable` handler around `fail(...)` does.
  val thrown = intercept[IOException] {
    spyStorageManager.createDiskFile(
      partitionLocation,
      "myAppId",
      0,
      "myFile",
      new UserIdentifier("t1", "u1"),
      PartitionType.REDUCE,
      partitionSplitEnabled = false)
  }
  assert(thrown.getMessage ==
    s"No available disks! suggested mountPoint ${partitionLocation.getStorageInfo.getMountPoint}")
}

/**
 * Builds a PRIMARY `PartitionLocation` on "localhost" for the given epoch and
 * attaches an HDD `StorageInfo` to it.
 *
 * The storage info is created with "/" and "filePath" as its string arguments
 * (presumably mount point and file path; confirm against the `StorageInfo`
 * constructor), the last element of `offsets`, and an empty list of boxed longs.
 *
 * @param epoch   epoch passed to the `PartitionLocation` constructor
 * @param offsets must be non-empty; only its last element is used
 * @return the location with its storage info set
 */
private def genPartitionLocation(epoch: Int, offsets: Array[Long]): PartitionLocation = {
  val primary = new PartitionLocation(
    0,
    epoch,
    "localhost",
    0,
    0,
    0,
    0,
    PartitionLocation.Mode.PRIMARY)

  // Index explicitly (rather than `.last`) so an empty array fails the same way as before.
  val lastOffset = offsets(offsets.length - 1)
  val emptyOffsets = new util.ArrayList[lang.Long]()

  primary.setStorageInfo(
    new StorageInfo(
      StorageInfo.Type.HDD,
      "/",
      false,
      "filePath",
      StorageInfo.ALL_TYPES_AVAILABLE_MASK,
      lastOffset,
      emptyOffsets))
  primary
}
}
Loading