Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.SysEnv
import nextflow.container.ContainerBuilder
import nextflow.container.ContainerHelper
Expand Down Expand Up @@ -582,7 +583,9 @@ class BashWrapperBuilder {
}

/**
 * Tell whether the task launcher must include the bash trace wrapper.
 *
 * When Fusion is enabled for the task and Fusion tracing is switched on
 * (see {@code Global.isFusionTraceEnabled()}), the {@code statsEnabled} flag
 * is ignored and the wrapper is requested only if {@code fixOwnership()}
 * asks for it. Otherwise the wrapper is requested when either stats
 * collection or ownership fixing is needed.
 *
 * @return {@code true} when the trace wrapper has to be generated
 */
protected boolean isTraceRequired() {
    // Fusion tracing replaces the stats collection done by the wrapper,
    // so only the ownership fix can still require it
    if( fusionEnabled && Global.isFusionTraceEnabled() )
        return fixOwnership()
    return statsEnabled || fixOwnership()
}

protected String shellPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class TaskBean implements Serializable, Cloneable {

boolean statsEnabled

boolean fusionEnabled

List<String> outputEnvNames

Map<String,String> outputEvals
Expand Down Expand Up @@ -164,6 +166,7 @@ class TaskBean implements Serializable, Cloneable {
this.outputEnvNames = task.getOutputEnvNames()
this.outputEvals = task.getOutputEvals()
this.statsEnabled = task.getProcessor().getSession().statsEnabled
this.fusionEnabled = task.getProcessor().isFusionEnabled()

this.inputFiles = task.getInputFilesMap()
this.outputFiles = task.getOutputFilesNames()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package nextflow.processor

import static nextflow.processor.TaskStatus.*
import static nextflow.trace.TraceRecord.toLong
import static nextflow.trace.TraceRecord.toFloat

import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicBoolean

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.trace.TraceRecord
import nextflow.util.TestOnly
/**
Expand Down Expand Up @@ -241,15 +244,21 @@ abstract class TaskHandler {
}
}

final file = task.workDir?.resolve(TaskRun.CMD_TRACE)
try {
if(file) record.parseTraceFile(file)
}
catch( NoSuchFileException e ) {
// ignore it
}
catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
// When Fusion trace is enabled, task metrics are collected by the Fusion client
// and reported in .fusion/trace.json — skip parsing the .command.trace file
// generated by the bash trace wrapper, as it won't exist
final fusionTraceEnabled = task.processor.executor.isFusionEnabled() && Global.isFusionTraceEnabled()
if( !fusionTraceEnabled ) {
final file = task.workDir?.resolve(TaskRun.CMD_TRACE)
try {
if(file) record.parseTraceFile(file)
}
catch( NoSuchFileException e ) {
log.trace "Unable to find trace file: $file"
}
catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
}
}

parseFusionTrace(record)
Expand All @@ -261,22 +270,77 @@ abstract class TaskHandler {
/**
 * Read the Fusion trace file ({@code TaskRun.FUSION_TRACE}) from the task work
 * directory and copy its metrics into the given trace record.
 *
 * GPU metrics are stored whenever the {@code gpu} section is present. The
 * {@code proc} and {@code cgroup} sections are mapped into the record only when
 * {@code Global.isFusionTraceEnabled()} is true. Nothing is done when Fusion is
 * not enabled for the executor or the task has no work directory. A missing or
 * unreadable file is logged and otherwise ignored.
 *
 * @param record The trace record to enrich with the Fusion metrics
 */
protected void parseFusionTrace(TraceRecord record) {
    if( !task.processor.executor.isFusionEnabled() )
        return
    // Resolve the Fusion trace file path (.fusion/trace.json) in the task work directory.
    // This file is produced by the Fusion client and contains process, cgroup, and GPU metrics.
    final fusionTrace = task.workDir?.resolve(TaskRun.FUSION_TRACE)
    if( !fusionTrace )
        return
    try {
        // Parse the full Fusion trace JSON — sections: 'proc', 'cgroup', 'gpu'
        final json = TraceRecord.parseFusionTraceFile(fusionTrace)
        // An empty or null document carries no metrics at all
        if( !json )
            return
        // GPU metrics are always extracted when available, regardless of the
        // NXF_FUSION_TRACE setting — this preserves backward compatibility
        final gpu = (Map<String,Object>) json.get('gpu')
        if( gpu )
            record.gpuMetrics = gpu
        // When NXF_FUSION_TRACE is disabled, stop here — only GPU metrics are collected.
        // When enabled, also map the 'proc' and 'cgroup' sections into TraceRecord fields,
        // replacing the metrics that would normally come from the bash trace wrapper
        if( !Global.isFusionTraceEnabled() )
            return
        applyFusionMetrics(record, json)
    }
    catch( NoSuchFileException e ) {
        log.trace "Unable to find Fusion trace file: $fusionTrace"
    }
    catch( Exception e ) {
        log.debug "[WARN] Cannot read Fusion trace file: $fusionTrace -- Cause: ${e.message}"
    }
}

/**
 * Map the {@code proc} and {@code cgroup} sections of a parsed Fusion trace
 * document into the fields of the given trace record.
 *
 * CPU, I/O and context-switch metrics come from {@code proc}. Memory metrics
 * come from {@code cgroup} when that section is present, otherwise from
 * {@code proc}. Values missing from a section are stored as zero, because
 * {@code toLong}/{@code toFloat} return zero for {@code null}.
 *
 * @param record The trace record whose store receives the metrics
 * @param json The parsed Fusion trace document; a null or empty map is a no-op
 */
protected void applyFusionMetrics(TraceRecord record, Map<String,Object> json) {
    if( !json )
        return
    final proc = (Map<String,Object>) json.get('proc')
    final cgroup = (Map<String,Object>) json.get('cgroup')

    if( proc ) {
        // CPU and timing metrics ('pct_cpu' is divided by 10 to obtain the percentage)
        record.store.put('realtime', toLong(proc.get('realtime')))
        record.store.put('%cpu', toFloat(proc.get('pct_cpu')) / 10.0f as float)
        record.store.put('cpu_model', proc.get('cpu_name')?.toString())
        // I/O metrics
        record.store.put('rchar', toLong(proc.get('rchar')))
        record.store.put('wchar', toLong(proc.get('wchar')))
        record.store.put('syscr', toLong(proc.get('syscr')))
        record.store.put('syscw', toLong(proc.get('syscw')))
        record.store.put('read_bytes', toLong(proc.get('read_bytes')))
        record.store.put('write_bytes', toLong(proc.get('write_bytes')))
        // context switches
        record.store.put('vol_ctxt', toLong(proc.get('vol_ctxt')))
        record.store.put('inv_ctxt', toLong(proc.get('inv_ctxt')))
    }

    // Prefer cgroup memory metrics (more accurate for containerized tasks)
    if( cgroup ) {
        final memRss = toLong(cgroup.get('memory_rss'))
        final memLimit = toLong(cgroup.get('memory_limit'))
        record.store.put('vmem', toLong(cgroup.get('memory_current')))
        record.store.put('rss', memRss)
        record.store.put('peak_vmem', toLong(cgroup.get('memory_peak')))
        record.store.put('peak_rss', toLong(cgroup.get('memory_peak_rss')))
        if( memLimit > 0 ) {
            // memory usage as a percentage of the cgroup memory limit
            record.store.put('%mem', (memRss / (float)memLimit * 100.0f) as float)
        }
        else if( proc?.containsKey('pct_mem') ) {
            // No usable cgroup limit: fall back to the value reported in the
            // 'proc' section instead of leaving '%mem' unset
            record.store.put('%mem', toFloat(proc.get('pct_mem')) / 10.0f as float)
        }
    }
    else if( proc ) {
        // Fallback to proc memory metrics when cgroup is not available
        record.store.put('vmem', toLong(proc.get('vmem')))
        record.store.put('rss', toLong(proc.get('rss')))
        record.store.put('peak_vmem', toLong(proc.get('peak_vmem')))
        record.store.put('peak_rss', toLong(proc.get('peak_rss')))
        record.store.put('%mem', toFloat(proc.get('pct_mem')) / 10.0f as float)
    }
}

/**
* Determine if a process can be forked i.e. can launch
* a parallel task execution. This is only enforced when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ class TaskProcessor {
*/
Executor getExecutor() {
    return executor
}

/**
 * @return {@code true} when an executor is set and it reports Fusion as enabled,
 *      {@code false} otherwise (including when no executor is set)
 */
boolean isFusionEnabled() {
    if( executor == null )
        return false
    return executor.isFusionEnabled()
}

/**
* @return The {@code DataflowOperator} underlying this process
*/
Expand Down
42 changes: 40 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,47 @@ class TraceRecord implements Serializable {
return this
}

/**
* Parse the Fusion trace file (.fusion/trace.json) produced by the Fusion client.
* The file contains task metrics in the following format:
* <pre>
* {
* "proc": {
* "realtime": 660541, "pct_cpu": 1045, "cpu_name": "Intel Xeon ...",
* "rchar": 14112539262, "wchar": 12668821375, "syscr": 1823378, "syscw": 169293,
* "read_bytes": 8011776, "write_bytes": 102400,
* "vmem": 39015152, "rss": 14826068, "peak_vmem": 39047920, "peak_rss": 15775480,
* "pct_mem": 56, "vol_ctxt": 413015, "inv_ctxt": 1540
* },
* "gpu": {
* "name": "Tesla T4", "mem": 15360, "driver": "580.126.09",
* "pct": 75, "peak": 100, "pct_mem": 40.1, "peak_mem": 74.1
* },
* "cgroup": {
* "version": "v2", "memory_current": 25469927424, "memory_peak": 41178980352,
* "memory_rss": 67919872, "memory_peak_rss": 14783070208,
* "memory_limit": 77309411328, "cpu_usage_usec": 785302059
* }
* }
* </pre>
*
* @param file Path to the .fusion/trace.json file
* @return The parsed JSON as a map with keys: {@code proc}, {@code gpu}, {@code cgroup}
*/
static Map<String,Object> parseFusionTraceFile(Path file) {
    // Return the whole document rather than only its 'gpu' section:
    // callers select the 'proc', 'cgroup' and 'gpu' sections they need
    return (Map<String,Object>) new JsonSlurper().parse(file)
}

/**
 * Convert a loosely typed value to a {@code long}.
 *
 * @param value A {@link Number}, any object whose string form is an integer, or {@code null}
 * @return The numeric value, or {@code 0} when {@code value} is {@code null}
 */
static long toLong(Object value) {
    if( value == null )
        return 0L
    if( value instanceof Number )
        return ((Number) value).longValue()
    // anything else is converted through its string representation
    return value.toString().toLong()
}

/**
 * Convert a loosely typed value to a {@code float}.
 *
 * @param value A {@link Number}, any object whose string form is a decimal number, or {@code null}
 * @return The numeric value, or {@code 0.0} when {@code value} is {@code null}
 */
static float toFloat(Object value) {
    if( value == null )
        return 0.0f
    if( value instanceof Number )
        return ((Number) value).floatValue()
    // anything else is converted through its string representation
    return value.toString().toFloat()
}

private long parseInt( String str, Path file, String row ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,42 @@ class BashWrapperBuilderTest extends Specification {
binding.kill_cmd == 'podman stop $NXF_BOXID'
}

// Checks that isTraceRequired() is false for a Fusion-enabled task when
// NXF_FUSION_TRACE is 'true', even though stats collection is enabled.
def 'should skip trace wrapper when NXF_FUSION_TRACE is enabled and Fusion is active'() {
given:
// set NXF_FUSION_TRACE for this feature only; undone in the cleanup block
SysEnv.push([NXF_FUSION_TRACE: 'true'])
def bean = new TaskBean()
// statsEnabled alone would normally request the trace wrapper
bean.statsEnabled = true
bean.fusionEnabled = true
bean.workDir = Path.of('/work/xx/yy')
bean.script = 'echo hello'

def builder = new BashWrapperBuilder(bean)

expect:
!builder.isTraceRequired()

cleanup:
SysEnv.pop()
}

// Checks that isTraceRequired() stays true for a Fusion-enabled task with
// stats enabled when NXF_FUSION_TRACE is 'false'.
def 'should keep trace wrapper when NXF_FUSION_TRACE is disabled even with Fusion'() {
given:
// set NXF_FUSION_TRACE for this feature only; undone in the cleanup block
SysEnv.push([NXF_FUSION_TRACE: 'false'])
def bean = new TaskBean()
bean.statsEnabled = true
bean.fusionEnabled = true
bean.workDir = Path.of('/work/xx/yy')
bean.script = 'echo hello'

def builder = new BashWrapperBuilder(bean)

expect:
builder.isTraceRequired()

cleanup:
SysEnv.pop()
}

@Unroll
def 'should check retryable errors' () {
expect:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.processor
import java.util.concurrent.atomic.LongAdder

import nextflow.Session
import nextflow.SysEnv
import nextflow.executor.Executor
import nextflow.trace.TraceRecord
import nextflow.util.Duration
Expand Down Expand Up @@ -345,6 +346,114 @@ class TaskHandlerTest extends Specification {
record.gpuMetrics == null
}

// Checks that parseFusionTrace() maps the 'proc', 'cgroup' and 'gpu' sections of the
// Fusion trace file into the trace record when NXF_FUSION_TRACE is 'true'.
def 'should parse full Fusion trace metrics when NXF_FUSION_TRACE is enabled'() {
given:
SysEnv.push([NXF_FUSION_TRACE: 'true'])
def folder = TestHelper.createInMemTempDir()
folder.resolve('.fusion').mkdir()
// fixture holding all three sections; memory values differ between 'proc' and 'cgroup'
folder.resolve(TaskRun.FUSION_TRACE).text = '{"proc":{"realtime":660541,"pct_cpu":1045,"cpu_name":"Intel CPU","rchar":100,"wchar":200,"syscr":10,"syscw":20,"read_bytes":300,"write_bytes":400,"pct_mem":56,"vmem":1000,"rss":500,"peak_vmem":1100,"peak_rss":600,"vol_ctxt":100,"inv_ctxt":50},"gpu":{"name":"Tesla T4","pct":75,"peak":100},"cgroup":{"version":"v2","memory_current":25469927424,"memory_peak":41178980352,"memory_rss":67919872,"memory_peak_rss":14783070208,"memory_limit":77309411328}}'

def task = new TaskRun(workDir: folder)
task.processor = Mock(TaskProcessor)
// the executor mock reports Fusion as enabled
task.processor.getExecutor() >> Mock(Executor) { isFusionEnabled() >> true }

def handler = Spy(TaskHandler)
handler.task = task

def record = new TraceRecord()

when:
handler.parseFusionTrace(record)

then:
// cgroup memory metrics (the cgroup values are expected, not the 'proc' ones)
record.store.get('vmem') == 25469927424L
record.store.get('rss') == 67919872L
record.store.get('peak_vmem') == 41178980352L
record.store.get('peak_rss') == 14783070208L
// proc metrics ('pct_cpu' 1045 is expected as 104.5)
record.store.get('realtime') == 660541L
record.store.get('%cpu') == 104.5f
record.store.get('cpu_model') == 'Intel CPU'
// gpu metrics
record.gpuMetrics.name == 'Tesla T4'
record.gpuMetrics.pct == 75

cleanup:
SysEnv.pop()
}

// Checks that parseFusionTrace() stores only the GPU metrics when NXF_FUSION_TRACE
// is 'false', leaving the 'proc' and 'cgroup' values out of the record store.
def 'should only parse GPU metrics when NXF_FUSION_TRACE is disabled'() {
given:
SysEnv.push([NXF_FUSION_TRACE: 'false'])
def folder = TestHelper.createInMemTempDir()
folder.resolve('.fusion').mkdir()
// the fixture contains 'proc' and 'cgroup' data that must not reach the record
folder.resolve(TaskRun.FUSION_TRACE).text = '{"proc":{"realtime":660541,"pct_cpu":1045},"gpu":{"name":"Tesla T4","pct":75,"peak":100},"cgroup":{"version":"v2","memory_current":25469927424}}'

def task = new TaskRun(workDir: folder)
task.processor = Mock(TaskProcessor)
// the executor mock reports Fusion as enabled
task.processor.getExecutor() >> Mock(Executor) { isFusionEnabled() >> true }

def handler = Spy(TaskHandler)
handler.task = task

def record = new TraceRecord()

when:
handler.parseFusionTrace(record)

then:
// only GPU metrics populated (existing behavior)
record.gpuMetrics.name == 'Tesla T4'
// no proc/cgroup metrics populated
record.store.get('realtime') == null
record.store.get('vmem') == null

cleanup:
SysEnv.pop()
}

// Checks that getTraceRecord() takes 'realtime' from the Fusion trace file and not
// from the .command.trace file when Fusion is enabled and NXF_FUSION_TRACE is 'true'.
def 'should skip command trace file when NXF_FUSION_TRACE is enabled'() {
given:
SysEnv.push([NXF_FUSION_TRACE: 'true'])
def folder = TestHelper.createInMemTempDir()
// Create a .command.trace with different values to prove it's NOT read
folder.resolve(TaskRun.CMD_TRACE).text = '''\
nextflow.trace/v2
realtime=99999
%cpu=100
'''.stripIndent().leftTrim()
// Create fusion trace
folder.resolve('.fusion').mkdir()
folder.resolve(TaskRun.FUSION_TRACE).text = '{"proc":{"realtime":1000,"pct_cpu":500,"cpu_name":"CPU","rchar":0,"wchar":0,"syscr":0,"syscw":0,"read_bytes":0,"write_bytes":0,"pct_mem":0,"vmem":0,"rss":0,"peak_vmem":0,"peak_rss":0,"vol_ctxt":0,"inv_ctxt":0}}'

// stub the processor and task state used while building the trace record
def task = new TaskRun(workDir: folder)
task.processor = Mock(TaskProcessor)
task.processor.getSession() >> new Session()
task.processor.getExecutor() >> Mock(Executor) { isFusionEnabled() >> true }
task.processor.getName() >> 'test'
task.processor.getProcessEnvironment() >> [:]
task.config = new TaskConfig()
task.context = new TaskContext(Mock(Script), [:], 'none')

def handler = Spy(TaskHandler)
handler.task = task
handler.status = TaskStatus.COMPLETED
handler.submitTimeMillis = 500L
handler.startTimeMillis = 1000L
handler.completeTimeMillis = 2000L

when:
def record = handler.getTraceRecord()

then:
// Should use Fusion realtime (1000), NOT .command.trace realtime (99999)
record.store.get('realtime') == 1000L

cleanup:
SysEnv.pop()
}

@Unroll
def 'should set isChildArray flag'() {
given:
Expand Down
Loading
Loading