diff --git a/docs/reference/env-vars.md b/docs/reference/env-vars.md index 91cbb0373e..908566b754 100644 --- a/docs/reference/env-vars.md +++ b/docs/reference/env-vars.md @@ -123,6 +123,11 @@ The following environment variables control the configuration of the Nextflow ru : The file storage path against which relative file paths are resolved. : For example, with `NXF_FILE_ROOT=/some/root/path`, the use of `file('hello')` will be resolved to the absolute path `/some/root/path/hello`. A remote root path can be specified using the usual protocol prefix, e.g. `NXF_FILE_ROOT=s3://my-bucket/data`. Files defined using an absolute path are not affected by this setting. +`NXF_FUSION_TRACE` +: :::{versionadded} 26.04.0 + ::: +: When set to `true`, collect task resource metrics (CPU, memory, I/O) from the Fusion trace file (`.fusion/trace.json`) produced in the task work directory, replacing the metrics collected by the default bash command-trace wrapper (default: `true`). Set it to `false` to keep collecting these metrics with the bash command-trace wrapper. Requires Fusion to be enabled; it has no effect otherwise. GPU metrics from Fusion are always collected regardless of this setting. + `NXF_HOME` : Nextflow home directory (default: `$HOME/.nextflow`). 
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index 3005364cdf..3c77b09714 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -26,6 +26,7 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.util.logging.Slf4j +import nextflow.Global import nextflow.SysEnv import nextflow.container.ContainerBuilder import nextflow.container.ContainerHelper @@ -582,7 +583,9 @@ class BashWrapperBuilder { } protected boolean isTraceRequired() { - statsEnabled || fixOwnership() + if( fusionEnabled && Global.isFusionTraceEnabled() ) + return fixOwnership() + return statsEnabled || fixOwnership() } protected String shellPath() { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy index 1458dee615..a5ab89c351 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy @@ -73,6 +73,8 @@ class TaskBean implements Serializable, Cloneable { boolean statsEnabled + boolean fusionEnabled + List outputEnvNames Map outputEvals @@ -164,6 +166,7 @@ class TaskBean implements Serializable, Cloneable { this.outputEnvNames = task.getOutputEnvNames() this.outputEvals = task.getOutputEvals() this.statsEnabled = task.getProcessor().getSession().statsEnabled + this.fusionEnabled = task.getProcessor().isFusionEnabled() this.inputFiles = task.getInputFilesMap() this.outputFiles = task.getOutputFilesNames() diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 7406359acb..394e2177a6 100644 --- 
a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -17,6 +17,8 @@ package nextflow.processor import static nextflow.processor.TaskStatus.* +import static nextflow.trace.TraceRecord.toLong +import static nextflow.trace.TraceRecord.toFloat import java.nio.file.NoSuchFileException import java.util.concurrent.atomic.AtomicBoolean @@ -24,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.Global import nextflow.trace.TraceRecord import nextflow.util.TestOnly /** @@ -241,15 +244,21 @@ abstract class TaskHandler { } } - final file = task.workDir?.resolve(TaskRun.CMD_TRACE) - try { - if(file) record.parseTraceFile(file) - } - catch( NoSuchFileException e ) { - // ignore it - } - catch( IOException e ) { - log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}" + // When Fusion trace is enabled, task metrics are collected by the Fusion client + // and reported in .fusion/trace.json — skip parsing the .command.trace file + // generated by the bash trace wrapper, as it won't exist + final fusionTraceEnabled = task.processor.executor.isFusionEnabled() && Global.isFusionTraceEnabled() + if( !fusionTraceEnabled ) { + final file = task.workDir?.resolve(TaskRun.CMD_TRACE) + try { + if(file) record.parseTraceFile(file) + } + catch( NoSuchFileException e ) { + log.trace "Unable to find trace file: $file" + } + catch( IOException e ) { + log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}" + } } parseFusionTrace(record) @@ -261,22 +270,82 @@ abstract class TaskHandler { protected void parseFusionTrace(TraceRecord record) { if( !task.processor.executor.isFusionEnabled() ) return + // Resolve the Fusion trace file path (.fusion/trace.json) in the task work directory. 
+ // This file is produced by the Fusion client and contains process, cgroup, and GPU metrics. final fusionTrace = task.workDir?.resolve(TaskRun.FUSION_TRACE) try { - if( fusionTrace ) { - final gpu = TraceRecord.parseFusionTraceFile(fusionTrace) - if( gpu ) - record.gpuMetrics = gpu - } + if( !fusionTrace ) + return + // Parse the full Fusion trace JSON — sections: 'proc', 'cgroup', 'gpu' + final json = TraceRecord.parseFusionTraceFile(fusionTrace) + // GPU metrics are always extracted when available, regardless of the + // NXF_FUSION_TRACE setting — this preserves backward compatibility + final gpu = (Map) json.get('gpu') + if( gpu ) + record.gpuMetrics = gpu + // When NXF_FUSION_TRACE is disabled, stop here — only GPU metrics are collected. + // When enabled, also map the 'proc' and 'cgroup' sections into TraceRecord fields, + // replacing the metrics that would normally come from the bash trace wrapper + if( !Global.isFusionTraceEnabled() ) + return + applyFusionMetrics(record, json) } catch( NoSuchFileException e ) { - // ignore it + log.trace "Unable to find Fusion trace file: $fusionTrace" } catch( Exception e ) { log.debug "[WARN] Cannot read Fusion trace file: $fusionTrace -- Cause: ${e.message}" } } + protected void applyFusionMetrics(TraceRecord record, Map json) { + final proc = (Map) json.get('proc') + final cgroup = (Map) json.get('cgroup') + + if( proc ) { + // CPU and timing metrics + record.store.put('realtime', toLong(proc.get('realtime'))) + record.store.put('%cpu', toFloat(proc.get('pct_cpu')) / 10.0f as float) + record.store.put('cpu_model', proc.get('cpu_name')?.toString()) + // I/O metrics + record.store.put('rchar', toLong(proc.get('rchar'))) + record.store.put('wchar', toLong(proc.get('wchar'))) + record.store.put('syscr', toLong(proc.get('syscr'))) + record.store.put('syscw', toLong(proc.get('syscw'))) + record.store.put('read_bytes', toLong(proc.get('read_bytes'))) + record.store.put('write_bytes', toLong(proc.get('write_bytes'))) + // 
context switches + record.store.put('vol_ctxt', toLong(proc.get('vol_ctxt'))) + record.store.put('inv_ctxt', toLong(proc.get('inv_ctxt'))) + } + + // Prefer cgroup memory metrics (more accurate for containerized tasks) + if( cgroup ) { + record.store.put('vmem', toLong(cgroup.get('memory_current'))) + record.store.put('rss', toLong(cgroup.get('memory_rss'))) + record.store.put('peak_vmem', toLong(cgroup.get('memory_peak'))) + record.store.put('peak_rss', toLong(cgroup.get('memory_peak_rss'))) + // Compute %mem as peak RSS against the cgroup memory limit — i.e. how close the + // task got to its allocated memory. This overrides proc.pct_mem, which is RSS against + // total host memory (ps %MEM semantics) and underestimates utilization for containerized + // tasks whose limit is smaller than the host. Using peak (not current) RSS because Fusion + // overwrites memory_rss post-exit, making the last sample unrepresentative. + final memLimit = toLong(cgroup.get('memory_limit')) + final memPeakRss = toLong(cgroup.get('memory_peak_rss')) + if( memLimit > 0 ) { + record.store.put('%mem', (memPeakRss / (float)memLimit * 100.0f) as float) + } + } + else if( proc ) { + // Fallback to proc memory metrics when cgroup is not available + record.store.put('vmem', toLong(proc.get('vmem'))) + record.store.put('rss', toLong(proc.get('rss'))) + record.store.put('peak_vmem', toLong(proc.get('peak_vmem'))) + record.store.put('peak_rss', toLong(proc.get('peak_rss'))) + record.store.put('%mem', toFloat(proc.get('pct_mem')) / 10.0f as float) + } + } + /** * Determine if a process can be forked i.e. can launch * a parallel task execution. 
This is only enforced when diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index de324786ac..15cb435680 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -331,6 +331,8 @@ class TaskProcessor { */ Executor getExecutor() { executor } + boolean isFusionEnabled() { executor?.isFusionEnabled() ?: false } + /** * @return The {@code DataflowOperator} underlying this process */ diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy index 6459d45270..810200e9ed 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy @@ -524,9 +524,47 @@ class TraceRecord implements Serializable { return this } + /** + * Parse the Fusion trace file (.fusion/trace.json) produced by the Fusion client. + * The file contains task metrics in the following format: + * <pre>
+     * {
+     *   "proc": {
+     *     "realtime": 660541, "pct_cpu": 1045, "cpu_name": "Intel Xeon ...",
+     *     "rchar": 14112539262, "wchar": 12668821375, "syscr": 1823378, "syscw": 169293,
+     *     "read_bytes": 8011776, "write_bytes": 102400,
+     *     "vmem": 39015152, "rss": 14826068, "peak_vmem": 39047920, "peak_rss": 15775480,
+     *     "pct_mem": 56, "vol_ctxt": 413015, "inv_ctxt": 1540
+     *   },
+     *   "gpu": {
+     *     "name": "Tesla T4", "mem": 15360, "driver": "580.126.09",
+     *     "pct": 75, "peak": 100, "pct_mem": 40.1, "peak_mem": 74.1
+     *   },
+     *   "cgroup": {
+     *     "version": "v2", "memory_current": 25469927424, "memory_peak": 41178980352,
+     *     "memory_rss": 67919872, "memory_peak_rss": 14783070208,
+     *     "memory_limit": 77309411328, "cpu_usage_usec": 785302059
+     *   }
+     * }
+     * </pre>
+ * + * @param file Path to the .fusion/trace.json file + * @return The parsed JSON as a map with keys: {@code proc}, {@code gpu}, {@code cgroup} + */ static Map parseFusionTraceFile(Path file) { - final json = (Map) new JsonSlurper().parse(file) - return (Map) json.get('gpu') + return (Map) new JsonSlurper().parse(file) + } + + static long toLong(Object value) { + if( value instanceof Number ) + return value.longValue() + return value != null ? value.toString().toLong() : 0L + } + + static float toFloat(Object value) { + if( value instanceof Number ) + return value.floatValue() + return value != null ? value.toString().toFloat() : 0.0f } private long parseInt( String str, Path file, String row ) { diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy index 0a34b825df..b00d95e6cf 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy @@ -1447,6 +1447,42 @@ class BashWrapperBuilderTest extends Specification { binding.kill_cmd == 'podman stop $NXF_BOXID' } + def 'should skip trace wrapper when NXF_FUSION_TRACE is enabled and Fusion is active'() { + given: + SysEnv.push([NXF_FUSION_TRACE: 'true']) + def bean = new TaskBean() + bean.statsEnabled = true + bean.fusionEnabled = true + bean.workDir = Path.of('/work/xx/yy') + bean.script = 'echo hello' + + def builder = new BashWrapperBuilder(bean) + + expect: + !builder.isTraceRequired() + + cleanup: + SysEnv.pop() + } + + def 'should keep trace wrapper when NXF_FUSION_TRACE is disabled even with Fusion'() { + given: + SysEnv.push([NXF_FUSION_TRACE: 'false']) + def bean = new TaskBean() + bean.statsEnabled = true + bean.fusionEnabled = true + bean.workDir = Path.of('/work/xx/yy') + bean.script = 'echo hello' + + def builder = new BashWrapperBuilder(bean) + + expect: + 
builder.isTraceRequired() + + cleanup: + SysEnv.pop() + } + @Unroll def 'should check retryable errors' () { expect: diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index d64ed2d2e2..b5f913c97c 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -19,6 +19,7 @@ package nextflow.processor import java.util.concurrent.atomic.LongAdder import nextflow.Session +import nextflow.SysEnv import nextflow.executor.Executor import nextflow.trace.TraceRecord import nextflow.util.Duration @@ -345,6 +346,114 @@ class TaskHandlerTest extends Specification { record.gpuMetrics == null } + def 'should parse full Fusion trace metrics when NXF_FUSION_TRACE is enabled'() { + given: + SysEnv.push([NXF_FUSION_TRACE: 'true']) + def folder = TestHelper.createInMemTempDir() + folder.resolve('.fusion').mkdir() + folder.resolve(TaskRun.FUSION_TRACE).text = '{"proc":{"realtime":660541,"pct_cpu":1045,"cpu_name":"Intel CPU","rchar":100,"wchar":200,"syscr":10,"syscw":20,"read_bytes":300,"write_bytes":400,"pct_mem":56,"vmem":1000,"rss":500,"peak_vmem":1100,"peak_rss":600,"vol_ctxt":100,"inv_ctxt":50},"gpu":{"name":"Tesla T4","pct":75,"peak":100},"cgroup":{"version":"v2","memory_current":25469927424,"memory_peak":41178980352,"memory_rss":67919872,"memory_peak_rss":14783070208,"memory_limit":77309411328}}' + + def task = new TaskRun(workDir: folder) + task.processor = Mock(TaskProcessor) + task.processor.getExecutor() >> Mock(Executor) { isFusionEnabled() >> true } + + def handler = Spy(TaskHandler) + handler.task = task + + def record = new TraceRecord() + + when: + handler.parseFusionTrace(record) + + then: + // cgroup memory metrics + record.store.get('vmem') == 25469927424L + record.store.get('rss') == 67919872L + record.store.get('peak_vmem') == 41178980352L + 
record.store.get('peak_rss') == 14783070208L + // proc metrics + record.store.get('realtime') == 660541L + record.store.get('%cpu') == 104.5f + record.store.get('cpu_model') == 'Intel CPU' + // gpu metrics + record.gpuMetrics.name == 'Tesla T4' + record.gpuMetrics.pct == 75 + + cleanup: + SysEnv.pop() + } + + def 'should only parse GPU metrics when NXF_FUSION_TRACE is disabled'() { + given: + SysEnv.push([NXF_FUSION_TRACE: 'false']) + def folder = TestHelper.createInMemTempDir() + folder.resolve('.fusion').mkdir() + folder.resolve(TaskRun.FUSION_TRACE).text = '{"proc":{"realtime":660541,"pct_cpu":1045},"gpu":{"name":"Tesla T4","pct":75,"peak":100},"cgroup":{"version":"v2","memory_current":25469927424}}' + + def task = new TaskRun(workDir: folder) + task.processor = Mock(TaskProcessor) + task.processor.getExecutor() >> Mock(Executor) { isFusionEnabled() >> true } + + def handler = Spy(TaskHandler) + handler.task = task + + def record = new TraceRecord() + + when: + handler.parseFusionTrace(record) + + then: + // only GPU metrics populated (existing behavior) + record.gpuMetrics.name == 'Tesla T4' + // no proc/cgroup metrics populated + record.store.get('realtime') == null + record.store.get('vmem') == null + + cleanup: + SysEnv.pop() + } + + def 'should skip command trace file when NXF_FUSION_TRACE is enabled'() { + given: + SysEnv.push([NXF_FUSION_TRACE: 'true']) + def folder = TestHelper.createInMemTempDir() + // Create a .command.trace with different values to prove it's NOT read + folder.resolve(TaskRun.CMD_TRACE).text = '''\ + nextflow.trace/v2 + realtime=99999 + %cpu=100 + '''.stripIndent().leftTrim() + // Create fusion trace + folder.resolve('.fusion').mkdir() + folder.resolve(TaskRun.FUSION_TRACE).text = '{"proc":{"realtime":1000,"pct_cpu":500,"cpu_name":"CPU","rchar":0,"wchar":0,"syscr":0,"syscw":0,"read_bytes":0,"write_bytes":0,"pct_mem":0,"vmem":0,"rss":0,"peak_vmem":0,"peak_rss":0,"vol_ctxt":0,"inv_ctxt":0}}' + + def task = new TaskRun(workDir: folder) + 
task.processor = Mock(TaskProcessor) + task.processor.getSession() >> new Session() + task.processor.getExecutor() >> Mock(Executor) { isFusionEnabled() >> true } + task.processor.getName() >> 'test' + task.processor.getProcessEnvironment() >> [:] + task.config = new TaskConfig() + task.context = new TaskContext(Mock(Script), [:], 'none') + + def handler = Spy(TaskHandler) + handler.task = task + handler.status = TaskStatus.COMPLETED + handler.submitTimeMillis = 500L + handler.startTimeMillis = 1000L + handler.completeTimeMillis = 2000L + + when: + def record = handler.getTraceRecord() + + then: + // Should use Fusion realtime (1000), NOT .command.trace realtime (99999) + record.store.get('realtime') == 1000L + + cleanup: + SysEnv.pop() + } + @Unroll def 'should set isChildArray flag'() { given: diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/TraceRecordTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/TraceRecordTest.groovy index d379d03184..2d361d45fa 100644 --- a/modules/nextflow/src/test/groovy/nextflow/trace/TraceRecordTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/trace/TraceRecordTest.groovy @@ -394,7 +394,7 @@ class TraceRecordTest extends Specification { rec2.getGpuMetrics() == null } - def 'should parse Fusion trace file and extract gpu block'() { + def 'should parse Fusion trace file'() { given: def folder = TestHelper.createInMemTempDir() def file = folder.resolve('.fusion/trace.json') @@ -402,30 +402,30 @@ class TraceRecordTest extends Specification { file.text = '{"proc":{"realtime":100},"gpu":{"name":"Tesla T4","mem":15360,"driver":"580.126.09","active_time":651030,"pct":75,"peak":100,"pct_mem":40.1,"peak_mem":74.1,"avg_mem":6161,"peak_mem_used":11388,"avg_mem_bw_util":43,"peak_mem_bw_util":83},"cgroup":{"version":"v2"}}' when: - def gpu = TraceRecord.parseFusionTraceFile(file) + def json = TraceRecord.parseFusionTraceFile(file) then: - gpu.name == 'Tesla T4' - gpu.mem == 15360 - gpu.driver == '580.126.09' - 
gpu.active_time == 651030 - gpu.pct == 75 - gpu.peak == 100 - gpu.avg_mem_bw_util == 43 - gpu.peak_mem_bw_util == 83 + json.proc.realtime == 100 + json.gpu.name == 'Tesla T4' + json.gpu.mem == 15360 + json.gpu.driver == '580.126.09' + json.gpu.pct == 75 + json.gpu.peak == 100 + json.cgroup.version == 'v2' } - def 'should return null when Fusion trace file has no gpu block'() { + def 'should parse Fusion trace file without gpu block'() { given: def folder = TestHelper.createInMemTempDir() def file = folder.resolve('trace.json') file.text = '{"proc":{"realtime":100},"cgroup":{"version":"v2"}}' when: - def gpu = TraceRecord.parseFusionTraceFile(file) + def json = TraceRecord.parseFusionTraceFile(file) then: - gpu == null + json.proc.realtime == 100 + json.gpu == null } def 'should throw exception when Fusion trace file has malformed JSON'() { diff --git a/modules/nf-commons/src/main/nextflow/Global.groovy b/modules/nf-commons/src/main/nextflow/Global.groovy index 78b4e2f665..df8c13e036 100644 --- a/modules/nf-commons/src/main/nextflow/Global.groovy +++ b/modules/nf-commons/src/main/nextflow/Global.groovy @@ -34,6 +34,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils @Slf4j class Global { + /** + * When {@code true}, Fusion trace metrics replace the bash command-trace wrapper + */ + static boolean isFusionTraceEnabled() { + return SysEnv.get('NXF_FUSION_TRACE', 'true') == 'true' + } + /** * The pipeline session instance */