From 3040b5463b08b0df6bd56a2b9d09c9523834ece4 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Thu, 2 Apr 2026 17:29:03 +0530 Subject: [PATCH 01/15] Improve disk full detection in PushHandler by avoiding relying on worker heartbeat cycle for usable space updation --- .../celeborn/common/meta/DiskFileInfo.java | 32 +++++++++++++++++-- .../apache/celeborn/common/meta/FileInfo.java | 5 +++ .../celeborn/common/meta/MemoryFileInfo.java | 6 ++++ .../celeborn/common/meta/DeviceInfo.scala | 16 ++++++++-- .../deploy/worker/PushDataHandler.scala | 3 +- .../worker/storage/StorageManager.scala | 23 +++++++------ 6 files changed, 70 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index d4571fa4bbe..e6342bca012 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -39,6 +39,19 @@ public class DiskFileInfo extends FileInfo { private static final Logger logger = LoggerFactory.getLogger(DiskFileInfo.class); private final String filePath; private final StorageInfo.Type storageType; + private final DiskInfo diskInfo; + + public DiskFileInfo(DiskInfo diskInfo, + UserIdentifier userIdentifier, + boolean partitionSplitEnabled, + FileMeta fileMeta, + String filePath, + StorageInfo.Type storageType) { + super(userIdentifier, partitionSplitEnabled, fileMeta); + this.diskInfo = diskInfo; + this.filePath = filePath; + this.storageType = storageType; + } public DiskFileInfo( UserIdentifier userIdentifier, @@ -46,9 +59,7 @@ public DiskFileInfo( FileMeta fileMeta, String filePath, StorageInfo.Type storageType) { - super(userIdentifier, partitionSplitEnabled, fileMeta); - this.filePath = filePath; - this.storageType = storageType; + this(null, userIdentifier, partitionSplitEnabled, fileMeta, filePath, storageType); } // only called when restore from pb or in UT @@ -60,6 +71,11 @@ public DiskFileInfo( StorageInfo.Type storageType, long bytesFlushed) { super(userIdentifier, partitionSplitEnabled, fileMeta); + + // TODO: Figure out a way to map the right diskInfo when restoring from pb, + // currently we just set it to null and skip the acquireBytesFlushed logic in DiskFileInfo#acquireBytesFlushed + // However during graceful shutdown, we likley have already hard split therefore no more writes to this file. + this.diskInfo = null; this.filePath = filePath; if (storageType != null) { this.storageType = storageType; @@ -83,6 +99,7 @@ public DiskFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta, String fil super(userIdentifier, true, fileMeta); this.filePath = filePath; this.storageType = StorageInfo.Type.HDD; + this.diskInfo = null; } public File getFile() { @@ -94,6 +111,15 @@ public String getFilePath() { return filePath; } + @Override + protected boolean acquireBytesFlushed(long bytes) { + if (diskInfo != null) { + return diskInfo.acquireBytesFlushed(bytes); + } else { + return true; + } + } + public String getSortedPath() { return Utils.getSortedFilePath(filePath); } diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index e8511f1bff1..6ccb43264ab 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -57,6 +57,10 @@ public long getFileLength() { } public synchronized void updateBytesFlushed(long bytes) { + if (!acquireBytesFlushed(bytes)) { + throw new IllegalStateException( + "Failed to acquire bytesFlushed for file: " + getFilePath() + ", current bytesFlushed: " + bytesFlushed + ", trying to add: " + bytes); + } bytesFlushed += bytes; if (isReduceFileMeta) { getReduceFileMeta().updateChunkOffset(bytesFlushed, false); @@ -112,4 +116,5 @@ public boolean isStreamsEmpty() { public boolean isReduceFileMeta() { return isReduceFileMeta; } + protected abstract boolean acquireBytesFlushed(long bytes); } diff --git a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java index 9b933ea6c00..cdb9230e0ad 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java @@ -85,4 +85,10 @@ public int releaseMemoryBuffers() { public String getFilePath() { return ""; } + + @Override + protected boolean acquireBytesFlushed(long bytes) { + // NO-OP + return true; + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index f66df671116..459e71f06ca 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -22,15 +22,15 @@ import java.util import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer - import org.slf4j.LoggerFactory - import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.StorageInfo import org.apache.celeborn.common.util.{JavaUtils, Utils} import org.apache.celeborn.common.util.Utils.runCommand +import java.util.concurrent.atomic.AtomicLong + class DiskInfo( val mountPoint: String, var actualUsableSpace: Long, @@ -88,6 +88,17 @@ class DiskInfo( lazy val shuffleAllocations = new util.HashMap[String, Integer]() lazy val applicationAllocations = new util.HashMap[String, Integer]() + @volatile + var transientAvailableBytes = new AtomicLong(actualUsableSpace) + + def getTransientAvailableBytes: Long = { + transientAvailableBytes.get() + } + + def acquireBytesFlushed(bytes: Long): Boolean = { + transientAvailableBytes.addAndGet(-bytes) >= 0 + } + def setStorageType(storageType: StorageInfo.Type) = { this.storageType = storageType } @@ -99,6 +110,7 @@ class DiskInfo( def setUsableSpace(usableSpace: Long): this.type = this.synchronized { this.actualUsableSpace = usableSpace + transientAvailableBytes = new AtomicLong(usableSpace) this } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 50f085bf85e..438d25820a0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1430,7 +1430,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler if (flusher.isInstanceOf[LocalFlusher]) { val mountPoint = flusher.asInstanceOf[LocalFlusher].mountPoint val diskInfo = workerInfo.diskInfos.get(mountPoint) - diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0 + diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0 || + diskInfo.getTransientAvailableBytes <= 0 } else { false } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 6d375932b82..e9b8361aeaf 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -110,6 +110,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } } + def healthyWorkingDirsWithDiskInfo(): List[(DiskInfo, File)] = + disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(diskInfo => + diskInfo.dirs.map(dir => (diskInfo, dir))) + def healthyWorkingDirs(): List[File] = disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs) @@ -1138,9 +1142,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) while (retryCount < conf.workerCreateWriterMaxAttempts) { val diskInfo = diskInfos.get(suggestedMountPoint) - val dirs = + val dirsWithDiskInfos: List[(DiskInfo, File)] = if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) { - diskInfo.dirs + diskInfo.dirs.map(dir => (diskInfo, dir)).toList } else { if (suggestedMountPoint.isEmpty) { logDebug(s"Location suggestedMountPoint is not set, return all healthy working dirs.") @@ -1148,9 +1152,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" + s" working dirs.") } - healthyWorkingDirs() + healthyWorkingDirsWithDiskInfo() } - if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) { + if (dirsWithDiskInfos.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) { throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } @@ -1205,10 +1209,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs fileName, ossFileInfo) return (ossFlusher.get, ossFileInfo, null) - } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) { - val dir = dirs(getNextIndex % dirs.size) - val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints) - val shuffleDir = new File(dir, s"$appId/$shuffleId") + } else if (dirsWithDiskInfos.nonEmpty && location.getStorageInfo.localDiskAvailable()) { + val dir = dirsWithDiskInfos(getNextIndex % dirsWithDiskInfos.size) + val mountPoint = DeviceInfo.getMountPoint(dir._2.getAbsolutePath, mountPoints) + val shuffleDir = new File(dir._2, s"$appId/$shuffleId") shuffleDir.mkdirs() val file = new File(shuffleDir, fileName) try { @@ -1226,6 +1230,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val fileMeta = getFileMeta(partitionType, mountPoint, conf.shuffleChunkSize) val storageType = diskInfos.get(mountPoint).storageType val diskFileInfo = new DiskFileInfo( + diskInfo, userIdentifier, partitionSplitEnabled, fileMeta, @@ -1238,7 +1243,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs return ( localFlushers.get(mountPoint), diskFileInfo, - dir) + dirWithDiskInfo._2) } catch { case fe: FileAlreadyExistsException => logError("Failed to create fileWriter because of existed file", fe) From 36224f7018c0e9ec0958ab0b52ff5a07d9c7efd3 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Thu, 2 Apr 2026 17:45:14 +0530 Subject: [PATCH 02/15] Add comment --- .../main/java/org/apache/celeborn/common/meta/DiskFileInfo.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index e6342bca012..da670550a39 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -99,6 +99,8 @@ public DiskFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta, String fil super(userIdentifier, true, fileMeta); this.filePath = filePath; this.storageType = StorageInfo.Type.HDD; + + // Only used by sorter, hence we know no diskInfo acquires needed this.diskInfo = null; } From f42f70105da6c1bc51fbbff75fe5d51d229b72de Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 11:11:23 +0530 Subject: [PATCH 03/15] Add config --- .../org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++ .../deploy/worker/storage/PartitionFilesSorter.java | 1 + .../deploy/worker/storage/StorageManager.scala | 3 ++- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 798099bcbd2..1b6613e67b6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1319,6 +1319,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT) def workerStorageBaseDirDiskType: String = get(WORKER_STORAGE_BASE_DIR_DISK_TYPE) def workerStorageExpireDirTimeout: Long = get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT) + def workerDiskStorageStrictReserveEnabled: Boolean = get(WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED) def creditStreamThreadsPerMountpoint: Int = get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT) def workerDirectMemoryRatioForReadBuffer: Double = get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER) def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN) @@ -3313,6 +3314,16 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1h") + val WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.disk.storage.strictReserve.enabled") + .categories("worker") + .version("0.6.0") + .doc("Whether to enable strict bookkeeping for worker's disk storage." + + "With this set to true, data wrtiers try to acquire storage space before each flush," + + "ensuring that disk full based HARD_SPLITs are accurately triggered."+) + .booleanConf + .createWithDefault(false) + val HDFS_DIR: OptionalConfigEntry[String] = buildConf("celeborn.storage.hdfs.dir") .categories("worker", "master", "client") diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index cdfb95c1e59..fcbe7f6bdef 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -863,6 +863,7 @@ private long transferBlock(long offset, long length) throws IOException { if (isDfs) { return transferStreamFully(dfsOriginInput, dfsSortedOutput, offset, length); } else { + originFileInfo.updateBytesFlushed(length); return transferChannelFully(originFileChannel, sortedFileChannel, offset, length); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index e9b8361aeaf..c26824b5eff 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -75,6 +75,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val remoteStorageDirs = conf.remoteStorageDirs val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout + val diskStorageStrictReserveEnabled = conf.workerDiskStorageStrictReserveEnabled val storagePolicy = new StoragePolicy(conf, this, workerSource) val diskReserveSize = conf.workerDiskReserveSize @@ -1230,7 +1231,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val fileMeta = getFileMeta(partitionType, mountPoint, conf.shuffleChunkSize) val storageType = diskInfos.get(mountPoint).storageType val diskFileInfo = new DiskFileInfo( - diskInfo, + if (diskStorageStrictReserveEnabled) diskInfo else null, userIdentifier, partitionSplitEnabled, fileMeta, From 816c4971918c693ed68e01df880ad01e87f01726 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 11:20:59 +0530 Subject: [PATCH 04/15] Lint --- .../celeborn/common/meta/DiskFileInfo.java | 19 +++++++++++-------- .../apache/celeborn/common/meta/FileInfo.java | 10 ++++++++-- .../apache/celeborn/common/CelebornConf.scala | 5 +++-- .../celeborn/common/meta/DeviceInfo.scala | 5 +++-- docs/configuration/worker.md | 1 + .../deploy/worker/PushDataHandler.scala | 2 +- 6 files changed, 27 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index da670550a39..4bf76940840 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -41,12 +41,13 @@ public class DiskFileInfo extends FileInfo { private final StorageInfo.Type storageType; private final DiskInfo diskInfo; - public DiskFileInfo(DiskInfo diskInfo, - UserIdentifier userIdentifier, - boolean partitionSplitEnabled, - FileMeta fileMeta, - String filePath, - StorageInfo.Type storageType) { + public DiskFileInfo( + DiskInfo diskInfo, + UserIdentifier userIdentifier, + boolean partitionSplitEnabled, + FileMeta fileMeta, + String filePath, + StorageInfo.Type storageType) { super(userIdentifier, partitionSplitEnabled, fileMeta); this.diskInfo = diskInfo; this.filePath = filePath; @@ -73,8 +74,10 @@ public DiskFileInfo( super(userIdentifier, partitionSplitEnabled, fileMeta); // TODO: Figure out a way to map the right diskInfo when restoring from pb, - // currently we just set it to null and skip the acquireBytesFlushed logic in DiskFileInfo#acquireBytesFlushed - // However during graceful shutdown, we likley have already hard split therefore no more writes to this file. + // currently we just set it to null and skip the acquireBytesFlushed logic in + // DiskFileInfo#acquireBytesFlushed + // However during graceful shutdown, we likley have already hard split therefore no more writes + // to this file. this.diskInfo = null; this.filePath = filePath; if (storageType != null) { diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index 6ccb43264ab..3a1ded7172f 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -58,8 +58,13 @@ public long getFileLength() { public synchronized void updateBytesFlushed(long bytes) { if (!acquireBytesFlushed(bytes)) { - throw new IllegalStateException( - "Failed to acquire bytesFlushed for file: " + getFilePath() + ", current bytesFlushed: " + bytesFlushed + ", trying to add: " + bytes); + throw new IllegalStateException( + "Failed to acquire bytesFlushed for file: " + + getFilePath() + + ", current bytesFlushed: " + + bytesFlushed + + ", trying to add: " + + bytes); } bytesFlushed += bytes; if (isReduceFileMeta) { @@ -116,5 +121,6 @@ public boolean isStreamsEmpty() { public boolean isReduceFileMeta() { return isReduceFileMeta; } + protected abstract boolean acquireBytesFlushed(long bytes); } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 1b6613e67b6..6ed49d8bf6f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1319,7 +1319,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT) def workerStorageBaseDirDiskType: String = get(WORKER_STORAGE_BASE_DIR_DISK_TYPE) def workerStorageExpireDirTimeout: Long = get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT) - def workerDiskStorageStrictReserveEnabled: Boolean = get(WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED) + def workerDiskStorageStrictReserveEnabled: Boolean = + get(WORKER_DISK_STORAGE_STRICT_RESERVE_ENABLED) def creditStreamThreadsPerMountpoint: Int = get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT) def workerDirectMemoryRatioForReadBuffer: Double = get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER) def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN) @@ -3320,7 +3321,7 @@ object CelebornConf extends Logging { .version("0.6.0") .doc("Whether to enable strict bookkeeping for worker's disk storage." + "With this set to true, data wrtiers try to acquire storage space before each flush," + - "ensuring that disk full based HARD_SPLITs are accurately triggered."+) + "ensuring that disk full based HARD_SPLITs are accurately triggered.") .booleanConf .createWithDefault(false) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index 459e71f06ca..e766d55c01a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -19,18 +19,19 @@ package org.apache.celeborn.common.meta import java.io.File import java.util +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer + import org.slf4j.LoggerFactory + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.StorageInfo import org.apache.celeborn.common.util.{JavaUtils, Utils} import org.apache.celeborn.common.util.Utils.runCommand -import java.util.concurrent.atomic.AtomicLong - class DiskInfo( val mountPoint: String, var actualUsableSpace: Long, diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index bb2cec89bc3..0e16a9fb550 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -85,6 +85,7 @@ license: | | celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | | | celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | | | celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | | +| celeborn.worker.disk.storage.strictReserve.enabled | false | false | Whether to enable strict bookkeeping for worker's disk storage.With this set to true, data wrtiers try to acquire storage space before each flush,ensuring that disk full based HARD_SPLITs are accurately triggered. | 0.6.0 | | | celeborn.worker.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | | | celeborn.worker.fetch.io.threads | <undefined> | false | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | | | celeborn.worker.fetch.port | 0 | false | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 438d25820a0..af899b35668 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1431,7 +1431,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler val mountPoint = flusher.asInstanceOf[LocalFlusher].mountPoint val diskInfo = workerInfo.diskInfos.get(mountPoint) diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <= 0 || - diskInfo.getTransientAvailableBytes <= 0 + diskInfo.getTransientAvailableBytes <= 0 } else { false } From b27e31990729ae7357a15c7411605f9833f93617 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 11:23:24 +0530 Subject: [PATCH 05/15] Release during sort --- .../service/deploy/worker/storage/PartitionFilesSorter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index fcbe7f6bdef..4957fa55289 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -877,6 +877,8 @@ public void deleteOriginFiles() throws IOException { } if (!deleteSuccess) { logger.warn("Clean origin file failed, origin file is : {}", originFilePath); + } else { + originFileInfo.updateBytesFlushed(-originFileLen); } } From fbdb7c8d12b6b8960275b9cd505bdc3f3e2c4cc6 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 11:33:46 +0530 Subject: [PATCH 06/15] Fix dir --- .../celeborn/service/deploy/worker/storage/StorageManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index c26824b5eff..4b99985f12d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -1244,7 +1244,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs return ( localFlushers.get(mountPoint), diskFileInfo, - dirWithDiskInfo._2) + dir) } catch { case fe: FileAlreadyExistsException => logError("Failed to create fileWriter because of existed file", fe) From 2ac1e7971621a8fff8b57782eb7d7ce41d9aab6e Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 11:37:22 +0530 Subject: [PATCH 07/15] Fix compilation --- .../celeborn/service/deploy/worker/storage/StorageManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 4b99985f12d..0464b4f8084 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -1244,7 +1244,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs return ( localFlushers.get(mountPoint), diskFileInfo, - dir) + dir._2) } catch { case fe: FileAlreadyExistsException => logError("Failed to create fileWriter because of existed file", fe) From 34a30e4f285cf79036d760198adb0a796ec90e03 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 12:40:34 +0530 Subject: [PATCH 08/15] Add fileInfo test --- .../celeborn/common/meta/DeviceInfo.scala | 13 ++- .../common/meta/DiskFileInfoTest.scala | 110 ++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index e766d55c01a..30bf1c45c2b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -97,7 +97,18 @@ class DiskInfo( } def acquireBytesFlushed(bytes: Long): Boolean = { - transientAvailableBytes.addAndGet(-bytes) >= 0 + // Update only if transientAvailableBytes is greater than or equal to bytes to acquire, otherwise return false. + var updated = false + transientAvailableBytes.getAndUpdate { current => + if (current >= bytes) { + updated = true + current - bytes + } else { + updated = false + current + } + } + updated } def setStorageType(storageType: StorageInfo.Type) = { diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala new file mode 100644 index 00000000000..229da367295 --- /dev/null +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala @@ -0,0 +1,110 @@ +package org.apache.celeborn.common.meta + +import java.nio.file.Files + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.identity.UserIdentifier +import org.apache.celeborn.common.protocol.StorageInfo + +class DiskFileInfoTest extends CelebornFunSuite { + + test("test diskFileInfoUsageAccounting positive + negative") { + val usableSpace = 1000L + val diskInfo = new DiskInfo( + "SSD", + usableSpace, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + StorageInfo.Type.SSD) + val tmpFilePath = Files.createTempFile("testDiskUsageAccounting", ".tmp") + val diskFileInfo = new DiskFileInfo( + diskInfo, + new UserIdentifier("tenant1", "user1"), + false, + new ReduceFileMeta(8192), + tmpFilePath.toString, + StorageInfo.Type.SSD) + + diskFileInfo.updateBytesFlushed(100) + assert(diskFileInfo.getFileLength == 100, "file length should be updated to flushed bytes") + assert( + diskInfo.getTransientAvailableBytes == (usableSpace - 100), + "available bytes should be reduced by flushed bytes") + + try { + diskFileInfo.updateBytesFlushed(901) + fail("should throw IllegalStateException when flush bytes exceed usable space") + } catch { + case IllegalStateException => + assert( + diskInfo.getTransientAvailableBytes == usableSpace - 100, + "available bytes should not be reduced when flush bytes exceed usable space") + } + // The failed acquire should not affect the available bytes, and the successful acquire should reduce the available bytes + assert( + diskInfo.getTransientAvailableBytes == usableSpace - 100, + "available bytes should be reduced by flushed bytes") + + // With null diskInfo, no exceptions during acquisition + val diskFileInfo2 = new DiskFileInfo( + null, + new UserIdentifier("tenant1", "user1"), + false, + new ReduceFileMeta(8192), + tmpFilePath.toString, + StorageInfo.Type.SSD) + diskFileInfo2.updateBytesFlushed(5000) + assert(diskFileInfo2.getFileLength == 5000, "file length should be updated to flushed bytes") + tmpFilePath.toFile.deleteOnExit() + } + + test("Multi threaded acquisition") { + val usableSpace = 1000L + val diskInfo = new DiskInfo( + "SSD", + usableSpace, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + StorageInfo.Type.SSD) + val tmpFilePath = Files.createTempFile("testDiskUsageAccountingMultiThreaded", ".tmp") + + val failures = new java.util.concurrent.atomic.AtomicInteger(0) + val totalSuccessfulAcquisition = new java.util.concurrent.atomic.AtomicInteger(0) + val perThreadFlushBytes = 101 + + val threads = (1 to 10).map { _ => + new Thread(new Runnable { + override def run(): Unit = { + try { + val diskFileInfo = new DiskFileInfo( + diskInfo, + new UserIdentifier("tenant1", "user1"), + false, + new ReduceFileMeta(8192), + tmpFilePath.toString, + StorageInfo.Type.SSD) + diskFileInfo.updateBytesFlushed(perThreadFlushBytes) + totalSuccessfulAcquisition.addAndGet(perThreadFlushBytes) + } catch { + case _: IllegalStateException => + failures.incrementAndGet() + // expected when flush bytes exceed usable space + } + } + }) + } + + threads.foreach(_.start()) + threads.foreach(_.join()) + + // Only the first 6 threads should succeed in flushing 150 bytes each (total 900 bytes), and the rest should fail due to insufficient space + assert( + diskInfo.getTransientAvailableBytes == (usableSpace - totalSuccessfulAcquisition.get()), + "available bytes should be reduced by flushed bytes") + assert(diskInfo.getTransientAvailableBytes > 0, "available bytes should not be negative") + assert(failures.get() == 1, "1 threads should fail due to insufficient space") + } + +} From 497e77b6cf7112a60da9c141bc662a1fe7fb8b7d Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 13:04:03 +0530 Subject: [PATCH 09/15] Add tests --- .../apache/celeborn/common/meta/FileInfo.java | 17 +++++++++++- .../common/meta/DeviceInfoSuite.scala | 27 +++++++++++++++++++ ...InfoTest.scala => DiskFileInfoSuite.scala} | 2 +- .../worker/storage/PartitionFilesSorter.java | 4 +-- .../worker/storage/StorageManager.scala | 6 ++--- 5 files changed, 49 insertions(+), 7 deletions(-) rename common/src/test/scala/org/apache/celeborn/common/meta/{DiskFileInfoTest.scala => DiskFileInfoSuite.scala} (98%) diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index 3a1ded7172f..987e6ebf6b4 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -30,6 +30,7 @@ public abstract class FileInfo { protected FileMeta fileMeta; protected final Set streams = ConcurrentHashMap.newKeySet(); protected volatile long bytesFlushed; + protected volatile long acquiredBytes; private boolean isReduceFileMeta; public FileInfo(UserIdentifier userIdentifier, boolean partitionSplitEnabled, FileMeta fileMeta) { @@ -53,10 +54,18 @@ public ReduceFileMeta getReduceFileMeta() { } public long getFileLength() { - return bytesFlushed; + return getFileLength(false); + } + + public long getFileLength(boolean includeAcquired) { + return bytesFlushed + (includeAcquired ? 0 : acquiredBytes); } public synchronized void updateBytesFlushed(long bytes) { + updateBytesFlushed(bytes, false); + } + + public synchronized void updateBytesFlushed(long bytes, boolean acquireOnly) { if (!acquireBytesFlushed(bytes)) { throw new IllegalStateException( "Failed to acquire bytesFlushed for file: " @@ -66,6 +75,12 @@ public synchronized void updateBytesFlushed(long bytes) { + ", trying to add: " + bytes); } + + if (acquireOnly) { + acquiredBytes += bytes; + return; + } + bytesFlushed += bytes; if (isReduceFileMeta) { getReduceFileMeta().updateChunkOffset(bytesFlushed, false); diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala index 250c20c3cbb..fddc6e63a2e 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.common.meta import java.util import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.protocol.StorageInfo class DeviceInfoSuite extends CelebornFunSuite { @@ -38,4 +39,30 @@ class DeviceInfoSuite extends CelebornFunSuite { assert(DeviceInfo.getMountPoint("/data/data", mountPoints) === "/data") assert(DeviceInfo.getMountPoint("/data1/data", mountPoints) === "/") } + + test("Test diskInfo usableSpace accounting") { + val usableSpace = 1000L + val diskInfo = new DiskInfo( + "SSD", + usableSpace, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + StorageInfo.Type.SSD) + + assert(diskInfo.actualUsableSpace === usableSpace) + assert(diskInfo.getTransientAvailableBytes === usableSpace) + assert(diskInfo.acquireBytesFlushed(100L), "Should be able to acquire 100 bytes") + assert(diskInfo.getTransientAvailableBytes === usableSpace - 100L) + + assert(!diskInfo.acquireBytesFlushed(900L), "Should not be able to acquire 900 bytes") + assert( + diskInfo.getTransientAvailableBytes === usableSpace - 100L, + "Usable space should not change after failed acquire") + + diskInfo.setUsableSpace(800L) + assert( + diskInfo.getTransientAvailableBytes === 800L, + "Usable space should reflect the new usable space") + } } diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala similarity index 98% rename from common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala rename to common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala index 229da367295..19e80a380a4 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala @@ -6,7 +6,7 @@ import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.protocol.StorageInfo -class DiskFileInfoTest extends CelebornFunSuite { +class DiskFileInfoSuite extends CelebornFunSuite { test("test diskFileInfoUsageAccounting positive + negative") { val usableSpace = 1000L diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 4957fa55289..39d4bd1febd 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -863,7 +863,7 @@ private long transferBlock(long offset, long length) throws IOException { if (isDfs) { return transferStreamFully(dfsOriginInput, dfsSortedOutput, offset, length); } else { - originFileInfo.updateBytesFlushed(length); + originFileInfo.updateBytesFlushed(length, true); return transferChannelFully(originFileChannel, sortedFileChannel, offset, length); } } @@ -878,7 +878,7 @@ public void deleteOriginFiles() throws IOException { if (!deleteSuccess) { logger.warn("Clean origin file failed, origin file is : {}", originFilePath); } else { - originFileInfo.updateBytesFlushed(-originFileLen); + originFileInfo.updateBytesFlushed(-originFileLen, true); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 0464b4f8084..38e11b0f586 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -960,7 +960,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val writers = workingDirWriters.get(dir) if (writers != null) { writers.synchronized { - writers.values.asScala.map(_.getDiskFileInfo.getFileLength).sum + writers.values.asScala.map(_.getDiskFileInfo.getFileLength(true)).sum } } else { 0 @@ -1033,7 +1033,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val diskFileInfos = fileInfos.filter(!_.isDFS) val hdfsFileInfos = fileInfos.filter(_.isHdfs) ResourceConsumption( - diskFileInfos.map(_.getFileLength).sum, + diskFileInfos.map(_.getFileLength(true)).sum, diskFileInfos.size, hdfsFileInfos.map(_.getFileLength).sum, hdfsFileInfos.size, @@ -1048,7 +1048,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } def getActiveShuffleSize: Long = { - diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength).sum).sum + diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength(true)).sum).sum } def getActiveShuffleFileCount: Long = { From e8b0dc454a9ef5984642e3e545b1daf0331dd7a7 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 13:37:01 +0530 Subject: [PATCH 10/15] Streamline acquisition --- .../celeborn/common/meta/DiskFileInfo.java | 2 +- .../apache/celeborn/common/meta/FileInfo.java | 33 ++++++++----------- .../celeborn/common/meta/MemoryFileInfo.java | 2 +- .../worker/storage/PartitionFilesSorter.java | 4 +-- .../worker/storage/StorageManager.scala | 6 ++-- 5 files changed, 21 insertions(+), 26 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java index 4bf76940840..86f8cb1353e 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java @@ -117,7 +117,7 @@ public String getFilePath() { } @Override - protected boolean acquireBytesFlushed(long bytes) { + protected boolean canAcquireBytes(long bytes) { if (diskInfo != null) { return diskInfo.acquireBytesFlushed(bytes); } else { diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index 987e6ebf6b4..ef9a75e5804 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -54,37 +54,32 @@ public ReduceFileMeta getReduceFileMeta() { } public long getFileLength() { - return getFileLength(false); + return bytesFlushed; } - public long getFileLength(boolean includeAcquired) { - return bytesFlushed + (includeAcquired ? 0 : acquiredBytes); + public long getAcquiredBytes() { + return acquiredBytes; } public synchronized void updateBytesFlushed(long bytes) { - updateBytesFlushed(bytes, false); + acquireBytes(bytes); + bytesFlushed += bytes; + if (isReduceFileMeta) { + getReduceFileMeta().updateChunkOffset(bytesFlushed, false); + } } - public synchronized void updateBytesFlushed(long bytes, boolean acquireOnly) { - if (!acquireBytesFlushed(bytes)) { + public synchronized void acquireBytes(long bytes) { + if (!canAcquireBytes(bytes)) { throw new IllegalStateException( - "Failed to acquire bytesFlushed for file: " + "Failed to acquire bytes for file: " + getFilePath() + ", current bytesFlushed: " + bytesFlushed - + ", trying to add: " + + ", trying to acquire: " + bytes); } - - if (acquireOnly) { - acquiredBytes += bytes; - return; - } - - bytesFlushed += bytes; - if (isReduceFileMeta) { - getReduceFileMeta().updateChunkOffset(bytesFlushed, false); - } + acquiredBytes += bytes; } public UserIdentifier getUserIdentifier() { @@ -137,5 +132,5 @@ public boolean isReduceFileMeta() { return isReduceFileMeta; } - protected abstract boolean acquireBytesFlushed(long bytes); + protected abstract boolean canAcquireBytes(long bytes); } diff --git a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java index cdb9230e0ad..017ec5d781e 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java @@ -87,7 +87,7 @@ public String getFilePath() { } @Override - protected boolean acquireBytesFlushed(long bytes) { + protected boolean canAcquireBytes(long bytes) { // NO-OP return true; } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 39d4bd1febd..24e29cfcef3 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -863,7 +863,7 @@ private long transferBlock(long offset, long length) throws IOException { if (isDfs) { return transferStreamFully(dfsOriginInput, dfsSortedOutput, offset, length); } else { - originFileInfo.updateBytesFlushed(length, true); + originFileInfo.acquireBytes(length); return transferChannelFully(originFileChannel, sortedFileChannel, offset, length); } } @@ -878,7 +878,7 @@ public void deleteOriginFiles() throws IOException { if (!deleteSuccess) { logger.warn("Clean origin file failed, origin file is : {}", originFilePath); } else { - originFileInfo.updateBytesFlushed(-originFileLen, true); + originFileInfo.acquireBytes(-originFileLen); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 38e11b0f586..df7aa503b0d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -960,7 +960,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val writers = workingDirWriters.get(dir) if (writers != null) { writers.synchronized { - writers.values.asScala.map(_.getDiskFileInfo.getFileLength(true)).sum + writers.values.asScala.map(_.getDiskFileInfo.getAcquiredBytes).sum } } else { 0 @@ -1033,7 +1033,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val diskFileInfos = fileInfos.filter(!_.isDFS) val hdfsFileInfos = fileInfos.filter(_.isHdfs) ResourceConsumption( - diskFileInfos.map(_.getFileLength(true)).sum, + diskFileInfos.map(_.getFileLength).sum, diskFileInfos.size, hdfsFileInfos.map(_.getFileLength).sum, hdfsFileInfos.size, @@ -1048,7 +1048,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } def getActiveShuffleSize: Long = { - diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength(true)).sum).sum + diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength).sum).sum } def getActiveShuffleFileCount: Long = { From 70bf450610a2947ddd285b47213b57178045e25b Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 13:39:17 +0530 Subject: [PATCH 11/15] Add license --- .../common/meta/DiskFileInfoSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala index 19e80a380a4..80e80713da7 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.celeborn.common.meta import java.nio.file.Files From 49c56c05cfd6288314ada12def7766de638a68a2 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 13:42:48 +0530 Subject: [PATCH 12/15] Fix test --- .../org/apache/celeborn/common/meta/DiskFileInfoSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala index 80e80713da7..79a7f34f46f 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DiskFileInfoSuite.scala @@ -53,7 +53,7 @@ class DiskFileInfoSuite extends CelebornFunSuite { diskFileInfo.updateBytesFlushed(901) fail("should throw IllegalStateException when flush bytes exceed usable space") } catch { - case IllegalStateException => + case _: IllegalStateException => assert( diskInfo.getTransientAvailableBytes == usableSpace - 100, "available bytes should not be reduced when flush bytes exceed usable space") From 46f0e8d36bdfe8ea06143b02d1e75a746c27d3a0 Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 13:55:07 +0530 Subject: [PATCH 13/15] Fix test --- .../scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala index fddc6e63a2e..943545cf1a8 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/DeviceInfoSuite.scala @@ -55,7 +55,7 @@ class DeviceInfoSuite extends CelebornFunSuite { assert(diskInfo.acquireBytesFlushed(100L), "Should be able to acquire 100 bytes") assert(diskInfo.getTransientAvailableBytes === usableSpace - 100L) - assert(!diskInfo.acquireBytesFlushed(900L), "Should not be able to acquire 900 bytes") + assert(!diskInfo.acquireBytesFlushed(901L), "Should not be able to acquire 900 bytes") assert( diskInfo.getTransientAvailableBytes === usableSpace - 100L, "Usable space should not change after failed acquire") From 27b04c39730f0ca20106e9f04394cc4a16d4ef9b Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 14:01:32 +0530 Subject: [PATCH 14/15] Fix spark2 compilation --- .../main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index 30bf1c45c2b..4e6bdfd714f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -99,7 +99,7 @@ class DiskInfo( def acquireBytesFlushed(bytes: Long): Boolean = { // Update only if transientAvailableBytes is greater than or equal to bytes to acquire, otherwise return false. var updated = false - transientAvailableBytes.getAndUpdate { current => + transientAvailableBytes.getAndUpdate { current: Long => if (current >= bytes) { updated = true current - bytes From 2f0bc53248c118f0b47b02401e6b26489c658ebd Mon Sep 17 00:00:00 2001 From: Saurabh Dubey Date: Mon, 13 Apr 2026 14:32:41 +0530 Subject: [PATCH 15/15] Fix compilation --- .../celeborn/common/meta/DeviceInfo.scala | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index 4e6bdfd714f..8b138ba980f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.common.meta import java.io.File import java.util import java.util.concurrent.atomic.AtomicLong +import java.util.function.LongUnaryOperator import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -30,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.StorageInfo import org.apache.celeborn.common.util.{JavaUtils, Utils} -import org.apache.celeborn.common.util.Utils.runCommand +import org.apache.celeborn.common.util.Utils.{runCommand, userPort} class DiskInfo( val mountPoint: String, @@ -99,15 +100,18 @@ class DiskInfo( def acquireBytesFlushed(bytes: Long): Boolean = { // Update only if transientAvailableBytes is greater than or equal to bytes to acquire, otherwise return false. var updated = false - transientAvailableBytes.getAndUpdate { current: Long => - if (current >= bytes) { - updated = true - current - bytes - } else { - updated = false - current + transientAvailableBytes.getAndUpdate(new LongUnaryOperator() { + override def applyAsLong(availableBytes: Long): Long = { + if (availableBytes >= bytes) { + updated = true + availableBytes - bytes + } else { + updated = false + availableBytes + } } - } + }) + updated }