diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index f66df671116..baec385c5fb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -88,6 +88,15 @@ class DiskInfo(
   lazy val shuffleAllocations = new util.HashMap[String, Integer]()
   lazy val applicationAllocations = new util.HashMap[String, Integer]()
 
+  /**
+   * True only when `status` is [[DiskStatus.HEALTHY]] and `actualUsableSpace` is strictly
+   * positive, so a disk that reports HEALTHY but has no usable space left is not treated as
+   * healthy.
+   */
+  def isHealthy: Boolean = {
+    DiskStatus.HEALTHY.equals(status) && actualUsableSpace > 0
+  }
+
   def setStorageType(storageType: StorageInfo.Type) = {
     this.storageType = storageType
   }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index cde13f29a91..9a2d4a8a740 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -115,7 +115,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   def healthyLocalWorkingDirs(): List[File] =
-    localDisksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
+    localDisksSnapshot().filter(_.isHealthy).flatMap(_.dirs)
 
   private val diskOperators: ConcurrentHashMap[String, ThreadPoolExecutor] = {
     val cleaners = JavaUtils.newConcurrentHashMap[String, ThreadPoolExecutor]()
@@ -1143,11 +1143,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     while (retryCount < conf.workerCreateWriterMaxAttempts) {
       val diskInfo = diskInfos.get(suggestedMountPoint)
       val dirs =
-        if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
+        if (diskInfo != null && diskInfo.isHealthy) {
           diskInfo.dirs
         } else {
           if (suggestedMountPoint.isEmpty) {
             logDebug(s"Location suggestedMountPoint is not set, return all healthy working dirs.")
+          } else if (diskInfo == null) {
+            logInfo(s"Disk info not found for suggestedMountPoint $suggestedMountPoint, return all healthy " +
+              s"working dirs.")
           } else {
             logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" +
               s" working dirs.")
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index fb1d3e2d253..6107faf986a 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -17,14 +17,19 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
+import java.{lang, util}
+import java.io.IOException
+
 import org.mockito.{Mockito, MockitoSugar}
 import org.mockito.ArgumentMatchersSugar.any
 import org.mockito.stubbing.Stubber
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
-import org.apache.celeborn.common.meta.DiskInfo
+import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, WORKER_STORAGE_DIRS}
+import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.meta.{DiskInfo, DiskStatus}
+import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType, StorageInfo}
 import org.apache.celeborn.common.util.Utils
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 
@@ -108,4 +113,55 @@ class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
     spyStorageManager.updateDiskInfos()
     assert(diskInfo.actualUsableSpace == 0L)
   }
+
+  test("[CELEBORN-2310] Ensure createFile rejected with disks are full, but status is HEALTHY") {
+    val conf = new CelebornConf()
+      .set(WORKER_DISK_RESERVE_SIZE, Utils.byteStringAsBytes("5g"))
+      .set(WORKER_STORAGE_DIRS, Seq("/"))
+    val storageManager = new StorageManager(conf, new WorkerSource(conf))
+    val spyStorageManager = spy(storageManager)
+    val diskInfo = new DiskInfo("/", List.empty, null, conf)
+    diskInfo.setUsableSpace(-1L)
+    // Should fail even if the status is HEALTHY
+    diskInfo.setStatus(DiskStatus.HEALTHY)
+    doReturn(List(diskInfo)).when(spyStorageManager).localDisksSnapshot()
+
+    val partitionLocation = genPartitionLocation(0, Array(0L))
+    val suggestedMountPoint = partitionLocation.getStorageInfo.getMountPoint
+
+    // Call through the spy: a stub installed on the spy is never seen by the wrapped instance.
+    // intercept fails the test on its own when no IOException is thrown, so no catch-all
+    // Throwable branch is needed (such a branch would also catch what fail() raises).
+    val e = intercept[IOException] {
+      spyStorageManager.createDiskFile(
+        partitionLocation,
+        "myAppId",
+        0,
+        "myFile",
+        new UserIdentifier("t1", "u1"),
+        PartitionType.REDUCE,
+        partitionSplitEnabled = false)
+    }
+    assert(e.getMessage == s"No available disks! suggested mountPoint $suggestedMountPoint")
+  }
+
+  /**
+   * Test fixture: builds a PRIMARY-mode PartitionLocation for `epoch` on "localhost" and
+   * attaches a StorageInfo built with type HDD, "/" and "filePath". The last element of
+   * `offsets` is forwarded to the StorageInfo constructor, so `offsets` must be non-empty.
+   */
+  private def genPartitionLocation(epoch: Int, offsets: Array[Long]): PartitionLocation = {
+    val location: PartitionLocation =
+      new PartitionLocation(0, epoch, "localhost", 0, 0, 0, 0, PartitionLocation.Mode.PRIMARY)
+    val storageInfo: StorageInfo = new StorageInfo(
+      StorageInfo.Type.HDD,
+      "/",
+      false,
+      "filePath",
+      StorageInfo.ALL_TYPES_AVAILABLE_MASK,
+      offsets.last,
+      new util.ArrayList[lang.Long]())
+    location.setStorageInfo(storageInfo)
+    location
+  }
 }