Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
5c881c5
docs: add design for Seqera executor resourceLabels support
pditommaso Apr 17, 2026
c37151d
docs: add implementation plan for Seqera executor resourceLabels
pditommaso Apr 17, 2026
21bef93
build(nf-seqera): bump sched-client to 0.51.0 via includeBuild
pditommaso Apr 17, 2026
1c79e67
build: keep sched includeBuild commented for CI safety
pditommaso Apr 17, 2026
1d7f81e
feat(nf-seqera): add Labels.withProcessResourceLabels
pditommaso Apr 17, 2026
3117e9e
feat(nf-seqera): add Labels.toStringMap and Labels.delta helpers
pditommaso Apr 17, 2026
d53dfef
refactor(nf-seqera)!: remove seqera.executor.labels in favour of proc…
pditommaso Apr 17, 2026
eebebf1
feat(nf-seqera): attach process.resourceLabels to Sched run labels
pditommaso Apr 17, 2026
e569cc9
feat(nf-seqera): send per-task resourceLabels delta on Sched task
pditommaso Apr 17, 2026
03658b6
docs(nf-seqera): document resourceLabels support and bump to 0.18.0
pditommaso Apr 17, 2026
a7d3a1c
test(nf-seqera): cover createRun labels and tighten submit-test mocks
pditommaso Apr 17, 2026
6273bd2
build(nf-seqera): bump sched-client to 0.52.0-SNAPSHOT
pditommaso Apr 17, 2026
f496f01
Minor change [ci fast]
pditommaso Apr 19, 2026
0129976
Minor change [ci skip]
pditommaso Apr 19, 2026
29a4340
refactor(nf-seqera): defensive check and immutable run labels [ci fast]
pditommaso Apr 19, 2026
171e140
feat(nf-seqera): filter autoLabels to selected workflow-metadata fiel…
pditommaso Apr 19, 2026
6b3d007
Merge branch 'master' into feat/seqera-auto-labels-filter [ci fast]
pditommaso Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,7 @@ Resource labels are currently supported by the following executors:
- {ref}`azurebatch-executor`
- {ref}`google-batch-executor`
- {ref}`k8s-executor`
- {ref}`seqera-executor`

:::{note}
The limits and the syntax of the corresponding executor should be taken into consideration when using resource labels.
Expand Down
2 changes: 1 addition & 1 deletion plugins/nf-seqera/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ dependencies {
compileOnly project(':nextflow')
compileOnly 'org.slf4j:slf4j-api:2.0.17'
compileOnly 'org.pf4j:pf4j:3.14.1'
api 'io.seqera:sched-client:0.49.0'
api 'io.seqera:sched-client:0.52.0'

testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.31"
Expand Down
52 changes: 35 additions & 17 deletions plugins/nf-seqera/src/main/io/seqera/config/ExecutorOpts.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import nextflow.util.Duration
@CompileStatic
class ExecutorOpts implements ConfigScope {

static final Set<String> VALID_AUTO_LABELS = Collections.unmodifiableSet(new LinkedHashSet<>([
'projectName', 'userName', 'runName', 'sessionId', 'resume',
'revision', 'commitId', 'repository', 'manifestName',
'runtimeVersion', 'workflowId'
]))

final RetryOpts retryPolicy

@ConfigOption
Expand Down Expand Up @@ -73,17 +79,17 @@ class ExecutorOpts implements ConfigScope {

@ConfigOption
@Description("""
Custom labels to apply to AWS resources for cost tracking and resource organization.
Labels are propagated to ECS tasks, capacity providers, and EC2 instances.
Automatically attach workflow metadata labels (with the `nextflow.io/` and
`seqera.io/platform/` prefixes) to the session. Accepts:
- `true`: include all available metadata labels
- `false` (default): disable
- a list or comma-separated string of short names: e.g.
`['runName', 'projectName']` or `'runName,projectName'`
Valid names: `projectName`, `userName`, `runName`, `sessionId`, `resume`,
`revision`, `commitId`, `repository`, `manifestName`, `runtimeVersion`,
`workflowId`.
""")
final Map<String, String> labels

@ConfigOption
@Description("""
When `true`, automatically adds workflow metadata labels (e.g. project name,
run name, session ID) with the `nextflow.io/` prefix to the session (default: `false`).
""")
final boolean autoLabels
final Set<String> autoLabels

@ConfigOption
@Description("""
Expand Down Expand Up @@ -126,9 +132,7 @@ class ExecutorOpts implements ConfigScope {
: Duration.of('1 sec')
// machine requirement settings
this.machineRequirement = new MachineRequirementOpts(opts.machineRequirement as Map ?: Map.of())
// labels for cost tracking
this.labels = opts.labels as Map<String, String>
this.autoLabels = opts.autoLabels as boolean ?: false
this.autoLabels = parseAutoLabels(opts.get('autoLabels'))
// prediction model
this.predictionModel = opts.predictionModel as String ?: null
// custom task environment variables
Expand Down Expand Up @@ -165,12 +169,26 @@ class ExecutorOpts implements ConfigScope {
return machineRequirement
}

Map<String, String> getLabels() {
return labels
Set<String> getAutoLabels() {
return autoLabels
}

boolean getAutoLabels() {
return autoLabels
protected static Set<String> parseAutoLabels(Object value) {
if( value == null || value == false )
return Collections.<String>emptySet()
if( value == true )
return VALID_AUTO_LABELS
List<String> raw
if( value instanceof CharSequence )
raw = value.toString().tokenize(',').collect { String s -> s.trim() }.findAll { String s -> s }
else if( value instanceof List )
raw = ((List) value).collect { it?.toString()?.trim() }.findAll { String s -> s } as List<String>
else
throw new IllegalArgumentException("Invalid 'seqera.executor.autoLabels' value '${value}' - expected true, false, a list, or a comma-separated string")
final invalid = raw.findAll { String s -> !(s in VALID_AUTO_LABELS) }
if( invalid )
throw new IllegalArgumentException("Invalid 'seqera.executor.autoLabels' name(s) ${invalid} - valid names are: ${VALID_AUTO_LABELS.join(', ')}")
return Collections.unmodifiableSet(new LinkedHashSet<>(raw))
}

String getPredictionModel() {
Expand Down
90 changes: 74 additions & 16 deletions plugins/nf-seqera/src/main/io/seqera/executor/Labels.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,50 @@ import nextflow.script.WorkflowMetadata
@CompileStatic
class Labels {

static final Set<String> ALL_AUTO_LABELS = Collections.unmodifiableSet(new LinkedHashSet<>([
'projectName', 'userName', 'runName', 'sessionId', 'resume',
'revision', 'commitId', 'repository', 'manifestName',
'runtimeVersion', 'workflowId'
]))

private final Map<String,String> entries = new LinkedHashMap<>(20)

/**
* Add {@code nextflow.io/*} labels from workflow metadata
* Add all {@code nextflow.io/*} and {@code seqera.io/platform/*} labels
* derived from workflow metadata.
*/
Labels withWorkflowMetadata(WorkflowMetadata workflow) {
if( workflow.projectName )
return withWorkflowMetadata(workflow, ALL_AUTO_LABELS)
}

/**
* Add workflow metadata labels filtered by the {@code include} set of
* short names (e.g. {@code 'runName'}). Unknown names are ignored; the
* caller is expected to validate membership upstream.
*/
Labels withWorkflowMetadata(WorkflowMetadata workflow, Set<String> include) {
if( !include ) return this
if( include.contains('projectName') && workflow.projectName )
entries.put('nextflow.io/projectName', workflow.projectName)
if( workflow.userName )
if( include.contains('userName') && workflow.userName )
entries.put('nextflow.io/userName', workflow.userName)
if( workflow.runName )
if( include.contains('runName') && workflow.runName )
entries.put('nextflow.io/runName', workflow.runName)
if( workflow.sessionId )
if( include.contains('sessionId') && workflow.sessionId )
entries.put('nextflow.io/sessionId', workflow.sessionId.toString())
entries.put('nextflow.io/resume', String.valueOf(workflow.resume))
if( workflow.revision )
if( include.contains('resume') )
entries.put('nextflow.io/resume', String.valueOf(workflow.resume))
if( include.contains('revision') && workflow.revision )
entries.put('nextflow.io/revision', workflow.revision)
if( workflow.commitId )
if( include.contains('commitId') && workflow.commitId )
entries.put('nextflow.io/commitId', workflow.commitId)
if( workflow.repository )
if( include.contains('repository') && workflow.repository )
entries.put('nextflow.io/repository', workflow.repository)
if( workflow.manifest?.name )
if( include.contains('manifestName') && workflow.manifest?.name )
entries.put('nextflow.io/manifestName', workflow.manifest.name)
if( NextflowMeta.instance.version )
if( include.contains('runtimeVersion') && NextflowMeta.instance.version )
entries.put('nextflow.io/runtimeVersion', NextflowMeta.instance.version.toString())
if( workflow.platform?.workflowId )
if( include.contains('workflowId') && workflow.platform?.workflowId )
entries.put('seqera.io/platform/workflowId', workflow.platform.workflowId)
return this
}
Expand All @@ -79,11 +97,13 @@ class Labels {
}

/**
* Add user-configured labels. These take precedence over implicit labels.
* Add config-level {@code process.resourceLabels}. Values are coerced to
* string via {@link String#valueOf} to satisfy the scheduler API typing.
*/
Labels withUserLabels(Map<String,String> labels) {
if( labels )
entries.putAll(labels)
Labels withProcessResourceLabels(Map<String,?> map) {
if( !map ) return this
for( Map.Entry<String,?> entry : map.entrySet() )
entries.put(entry.key.toString(), String.valueOf(entry.value))
return this
}

Expand All @@ -106,4 +126,42 @@ class Labels {
.hash()
.toString()
}

/**
* Coerce arbitrary map values to strings via {@link String#valueOf}.
* Returns an empty map for null/empty input. Throws
* {@link IllegalArgumentException} when the value is not a {@link Map},
* to surface a clear error when {@code process.resourceLabels} is
* misconfigured (e.g. as a list).
*/
static Map<String,String> toStringMap(Object value) {
if( value == null )
return Collections.<String,String>emptyMap()
if( value !instanceof Map )
throw new IllegalArgumentException("Invalid value for 'resourceLabels' directive - expected a map of key/value pairs, got '${value.getClass().getName()}'")
final map = (Map<?,?>) value
if( map.isEmpty() )
return Collections.<String,String>emptyMap()
final result = new LinkedHashMap<String,String>(map.size())
for( Map.Entry<?,?> entry : map.entrySet() )
result.put(entry.key.toString(), String.valueOf(entry.value))
return result
}

/**
* Return the entries of {@code task} that are missing from {@code run}
* or have a different value. Returns {@code null} if the resulting
* map would be empty (so callers can omit the field).
*/
static Map<String,String> delta(Map<String,String> task, Map<String,String> run) {
if( !task ) return null
final result = new LinkedHashMap<String,String>()
for( Map.Entry<String,String> entry : task.entrySet() ) {
final k = entry.key
final v = entry.value
if( run == null || !run.containsKey(k) || run.get(k) != v )
result.put(k, v)
}
return result.isEmpty() ? null : result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.seqera.executor

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.config.SeqeraConfig
import io.seqera.config.ExecutorOpts
Expand Down Expand Up @@ -64,6 +65,8 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {

private volatile String runId

private volatile Map<String,String> runResourceLabels = Collections.<String,String>emptyMap()

private SeqeraBatchSubmitter batchSubmitter

@Override
Expand Down Expand Up @@ -114,10 +117,11 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
final workspaceId = PlatformHelper.getWorkspaceId(towerConfig, SysEnv.get()) as Long
final computeEnvId = PlatformHelper.getComputeEnvId(towerConfig, SysEnv.get()) ?: seqeraConfig.computeEnvId

computeRunResourceLabels()
final labels = new Labels()
if( seqeraConfig.autoLabels )
labels.withWorkflowMetadata(session.workflowMetadata)
labels.withUserLabels(seqeraConfig.labels)
labels.withWorkflowMetadata(session.workflowMetadata, seqeraConfig.autoLabels)
labels.withProcessResourceLabels(runResourceLabels)
final predictionModel = seqeraConfig.predictionModel ? PredictionModel.fromValue(seqeraConfig.predictionModel) : null
final pipeline = new PipelineSpec()
.workflowId(workflowId)
Expand Down Expand Up @@ -203,6 +207,16 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
return runId
}

Map<String,String> getRunResourceLabels() {
return Collections.unmodifiableMap(runResourceLabels)
}

@PackageScope
void computeRunResourceLabels() {
final processMap = session.config.process as Map
this.runResourceLabels = Labels.toStringMap(processMap?.get('resourceLabels'))
}

SeqeraBatchSubmitter getBatchSubmitter() {
return batchSubmitter
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.executor.Labels
import io.seqera.sched.api.schema.v1a1.AcceleratorType
import io.seqera.sched.api.schema.v1a1.GetTaskLogsResponse
import io.seqera.sched.api.schema.v1a1.NextflowTask
Expand Down Expand Up @@ -138,6 +139,11 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask {
.taskId(task.id?.intValue())
.hash(task.hash?.toString())
.workDir(task.getWorkDirStr()))
// attach per-task resource labels delta (over run-level baseline)
final taskLabels = Labels.toStringMap(task.config.getResourceLabels())
final delta = Labels.delta(taskLabels, executor.runResourceLabels)
if( delta )
schedTask.labels(delta)
log.debug "[SEQERA] Enqueueing task for batch submission: ${schedTask}"
// Enqueue for batch submission - status will be set by setBatchTaskId callback
executor.getBatchSubmitter().submit(this, schedTask)
Expand Down
Loading
Loading