Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1418,9 +1418,6 @@ The following settings are available:
`seqera.executor.autoLabels`
: When `true`, automatically adds workflow metadata labels to the session with the `nextflow.io/` prefix (default: `false`). The following labels are added: `projectName`, `userName`, `runName`, `sessionId`, `resume`, `revision`, `commitId`, `repository`, `manifestName`, `runtimeVersion`. A `seqera.io/runId` label is also added, computed as a SipHash of the session ID and run name.

`seqera.executor.labels`
: Custom labels to apply to AWS resources for cost tracking and resource organization. Labels are propagated to ECS tasks, capacity providers, and EC2 instances. When used together with `autoLabels`, user-defined labels take precedence over auto-generated labels.

`seqera.executor.machineRequirement.arch`
: The CPU architecture for task execution, e.g. `'x86_64'` or `'arm64'`.

Expand Down
1 change: 1 addition & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,7 @@ Resource labels are currently supported by the following executors:
- {ref}`azurebatch-executor`
- {ref}`google-batch-executor`
- {ref}`k8s-executor`
- {ref}`seqera-executor`

:::{note}
The limits and the syntax of the corresponding executor should be taken into consideration when using resource labels.
Expand Down
2 changes: 1 addition & 1 deletion plugins/nf-seqera/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ dependencies {
compileOnly project(':nextflow')
compileOnly 'org.slf4j:slf4j-api:2.0.17'
compileOnly 'org.pf4j:pf4j:3.14.1'
api 'io.seqera:sched-client:0.49.0'
api 'io.seqera:sched-client:0.52.0'

testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.31"
Expand Down
13 changes: 0 additions & 13 deletions plugins/nf-seqera/src/main/io/seqera/config/ExecutorOpts.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ class ExecutorOpts implements ConfigScope {
""")
final MachineRequirementOpts machineRequirement

@ConfigOption
@Description("""
Custom labels to apply to AWS resources for cost tracking and resource organization.
Labels are propagated to ECS tasks, capacity providers, and EC2 instances.
""")
final Map<String, String> labels

@ConfigOption
@Description("""
When `true`, automatically adds workflow metadata labels (e.g. project name,
Expand Down Expand Up @@ -126,8 +119,6 @@ class ExecutorOpts implements ConfigScope {
: Duration.of('1 sec')
// machine requirement settings
this.machineRequirement = new MachineRequirementOpts(opts.machineRequirement as Map ?: Map.of())
// labels for cost tracking
this.labels = opts.labels as Map<String, String>
this.autoLabels = opts.autoLabels as boolean ?: false
// prediction model
this.predictionModel = opts.predictionModel as String ?: null
Expand Down Expand Up @@ -165,10 +156,6 @@ class ExecutorOpts implements ConfigScope {
return machineRequirement
}

Map<String, String> getLabels() {
return labels
}

boolean getAutoLabels() {
return autoLabels
}
Expand Down
48 changes: 44 additions & 4 deletions plugins/nf-seqera/src/main/io/seqera/executor/Labels.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ class Labels {
}

/**
* Add user-configured labels. These take precedence over implicit labels.
* Add config-level {@code process.resourceLabels}. Values are coerced to
* string via {@link String#valueOf} to satisfy the scheduler API typing.
*/
Labels withUserLabels(Map<String,String> labels) {
if( labels )
entries.putAll(labels)
Labels withProcessResourceLabels(Map<String,?> map) {
if( !map ) return this
for( Map.Entry<String,?> entry : map.entrySet() )
entries.put(entry.key.toString(), String.valueOf(entry.value))
return this
}

Expand All @@ -106,4 +108,42 @@ class Labels {
.hash()
.toString()
}

/**
* Coerce arbitrary map values to strings via {@link String#valueOf}.
* Returns an empty map for null/empty input. Throws
* {@link IllegalArgumentException} when the value is not a {@link Map},
* to surface a clear error when {@code process.resourceLabels} is
* misconfigured (e.g. as a list).
*/
static Map<String,String> toStringMap(Object value) {
if( value == null )
return Collections.<String,String>emptyMap()
if( value !instanceof Map )
throw new IllegalArgumentException("Invalid value for 'resourceLabels' directive - expected a map of key/value pairs, got '${value.getClass().getName()}'")
final map = (Map<?,?>) value
if( map.isEmpty() )
return Collections.<String,String>emptyMap()
final result = new LinkedHashMap<String,String>(map.size())
for( Map.Entry<?,?> entry : map.entrySet() )
result.put(entry.key.toString(), String.valueOf(entry.value))
return result
}

/**
* Return the entries of {@code task} that are missing from {@code run}
* or have a different value. Returns {@code null} if the resulting
* map would be empty (so callers can omit the field).
*/
static Map<String,String> delta(Map<String,String> task, Map<String,String> run) {
if( !task ) return null
final result = new LinkedHashMap<String,String>()
for( Map.Entry<String,String> entry : task.entrySet() ) {
final k = entry.key
final v = entry.value
if( run == null || !run.containsKey(k) || run.get(k) != v )
result.put(k, v)
}
return result.isEmpty() ? null : result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.seqera.executor

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.config.SeqeraConfig
import io.seqera.config.ExecutorOpts
Expand Down Expand Up @@ -64,6 +65,8 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {

private volatile String runId

private volatile Map<String,String> runResourceLabels = Collections.<String,String>emptyMap()

private SeqeraBatchSubmitter batchSubmitter

@Override
Expand Down Expand Up @@ -114,10 +117,11 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
final workspaceId = PlatformHelper.getWorkspaceId(towerConfig, SysEnv.get()) as Long
final computeEnvId = PlatformHelper.getComputeEnvId(towerConfig, SysEnv.get()) ?: seqeraConfig.computeEnvId

computeRunResourceLabels()
final labels = new Labels()
if( seqeraConfig.autoLabels )
labels.withWorkflowMetadata(session.workflowMetadata)
labels.withUserLabels(seqeraConfig.labels)
labels.withProcessResourceLabels(runResourceLabels)
final predictionModel = seqeraConfig.predictionModel ? PredictionModel.fromValue(seqeraConfig.predictionModel) : null
final pipeline = new PipelineSpec()
.workflowId(workflowId)
Expand Down Expand Up @@ -203,6 +207,16 @@ class SeqeraExecutor extends Executor implements ExtensionPoint {
return runId
}

Map<String,String> getRunResourceLabels() {
return Collections.unmodifiableMap(runResourceLabels)
}

@PackageScope
void computeRunResourceLabels() {
final processMap = session.config.process as Map
this.runResourceLabels = Labels.toStringMap(processMap?.get('resourceLabels'))
}

SeqeraBatchSubmitter getBatchSubmitter() {
return batchSubmitter
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.executor.Labels
import io.seqera.sched.api.schema.v1a1.AcceleratorType
import io.seqera.sched.api.schema.v1a1.GetTaskLogsResponse
import io.seqera.sched.api.schema.v1a1.NextflowTask
Expand Down Expand Up @@ -138,6 +139,11 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask {
.taskId(task.id?.intValue())
.hash(task.hash?.toString())
.workDir(task.getWorkDirStr()))
// attach per-task resource labels delta (over run-level baseline)
final taskLabels = Labels.toStringMap(task.config.getResourceLabels())
final delta = Labels.delta(taskLabels, executor.runResourceLabels)
if( delta )
schedTask.labels(delta)
log.debug "[SEQERA] Enqueueing task for batch submission: ${schedTask}"
// Enqueue for batch submission - status will be set by setBatchTaskId callback
executor.getBatchSubmitter().submit(this, schedTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,42 +128,6 @@ class ExecutorOptsTest extends Specification {
config.machineRequirement.provisioning == 'spot'
}

def 'should create config with labels' () {
when:
def config = new ExecutorOpts([
endpoint: 'https://sched.example.com',
labels: [
project: 'genomics',
team: 'research',
costCenter: 'CC-1234'
]
])

then:
config.labels == [project: 'genomics', team: 'research', costCenter: 'CC-1234']
}

def 'should handle null labels' () {
when:
def config = new ExecutorOpts([
endpoint: 'https://sched.example.com'
])

then:
config.labels == null
}

def 'should handle empty labels' () {
when:
def config = new ExecutorOpts([
endpoint: 'https://sched.example.com',
labels: [:]
])

then:
config.labels == [:]
}

def 'should enable auto labels' () {
when:
def config = new ExecutorOpts([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ class SeqeraConfigTest extends Specification {
machineRequirement: [
arch: 'arm64',
provisioning: 'spot'
],
labels: [
project: 'genomics',
team: 'research'
]
]
])
Expand All @@ -76,7 +72,6 @@ class SeqeraConfigTest extends Specification {
config.executor.batchFlushInterval == Duration.of('2 sec')
config.executor.machineRequirement.arch == 'arm64'
config.executor.machineRequirement.provisioning == 'spot'
config.executor.labels == [project: 'genomics', team: 'research']
}

def 'should throw error when executor endpoint is missing' () {
Expand Down
83 changes: 64 additions & 19 deletions plugins/nf-seqera/src/test/io/seqera/executor/LabelsTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class LabelsTest extends Specification {
!labels.entries.containsKey('seqera:sched:clusterId')
}

def 'should allow user labels to override implicit labels'() {
def 'should include platform workflowId when available'() {
given:
def workflow = Mock(WorkflowMetadata) {
getProjectName() >> 'hello'
Expand All @@ -133,23 +133,18 @@ class LabelsTest extends Specification {
getSessionId() >> UUID.randomUUID()
isResume() >> false
getManifest() >> new Manifest([:])
getPlatform() >> new PlatformMetadata('wf-abc123')
}

when:
def labels = new Labels()
.withWorkflowMetadata(workflow)
.withUserLabels([
'nextflow.io/runName': 'custom_name',
'team': 'research'
])

then:
labels.entries['nextflow.io/runName'] == 'custom_name'
labels.entries['team'] == 'research'
labels.entries['nextflow.io/projectName'] == 'hello'
labels.entries['seqera.io/platform/workflowId'] == 'wf-abc123'
}

def 'should include platform workflowId when available'() {
def 'should omit platform workflowId when not set'() {
given:
def workflow = Mock(WorkflowMetadata) {
getProjectName() >> 'hello'
Expand All @@ -158,43 +153,93 @@ class LabelsTest extends Specification {
getSessionId() >> UUID.randomUUID()
isResume() >> false
getManifest() >> new Manifest([:])
getPlatform() >> new PlatformMetadata('wf-abc123')
getPlatform() >> new PlatformMetadata()
}

when:
def labels = new Labels()
.withWorkflowMetadata(workflow)

then:
labels.entries['seqera.io/platform/workflowId'] == 'wf-abc123'
!labels.entries.containsKey('seqera.io/platform/workflowId')
}

def 'should omit platform workflowId when not set'() {
def 'should add process resource labels coercing values to string'() {
when:
def labels = new Labels()
.withProcessResourceLabels([team: 'genomics', priority: 7, retain: true])

then:
labels.entries['team'] == 'genomics'
labels.entries['priority'] == '7'
labels.entries['retain'] == 'true'
}

def 'should ignore null or empty process resource labels'() {
when:
def a = new Labels().withProcessResourceLabels(null)
def b = new Labels().withProcessResourceLabels([:])

then:
a.entries.isEmpty()
b.entries.isEmpty()
}

def 'should let process resource labels override workflow metadata on key collision'() {
given:
def workflow = Mock(WorkflowMetadata) {
getProjectName() >> 'hello'
getUserName() >> 'user1'
getRunName() >> 'happy_turing'
getSessionId() >> UUID.randomUUID()
isResume() >> false
getManifest() >> new Manifest([:])
getPlatform() >> new PlatformMetadata()
}

when:
def labels = new Labels()
.withWorkflowMetadata(workflow)
.withProcessResourceLabels(['nextflow.io/runName': 'custom', team: 'a'])

then:
!labels.entries.containsKey('seqera.io/platform/workflowId')
labels.entries['nextflow.io/runName'] == 'custom'
labels.entries['team'] == 'a'
labels.entries['nextflow.io/projectName'] == 'hello'
}

def 'should handle null user labels'() {
def 'should coerce map values to strings'() {
expect:
Labels.toStringMap(null) == [:]
Labels.toStringMap([:]) == [:]
Labels.toStringMap([a: 1, b: 'x', c: true]) == [a: '1', b: 'x', c: 'true']
}

def 'should reject non-map resourceLabels with a clear error'() {
when:
def labels = new Labels()
.withUserLabels(null)
Labels.toStringMap(['foo', 'bar'])

then:
labels.entries.isEmpty()
def err = thrown(IllegalArgumentException)
err.message.contains("'resourceLabels'")
err.message.contains('map of key/value pairs')
err.message.contains('java.util.ArrayList')
}

def 'should compute null delta when task labels are empty'() {
expect:
Labels.delta(null, [team: 'a']) == null
Labels.delta([:], [team: 'a']) == null
}

def 'should return full task labels when run labels are empty'() {
expect:
Labels.delta([team: 'a', region: 'us'], null) == [team: 'a', region: 'us']
Labels.delta([team: 'a', region: 'us'], [:]) == [team: 'a', region: 'us']
}

def 'should keep only differing or missing keys in delta'() {
expect:
Labels.delta([team: 'a', region: 'us'], [team: 'a']) == [region: 'us']
Labels.delta([team: 'b'], [team: 'a']) == [team: 'b']
Labels.delta([team: 'a', region: 'us'], [team: 'a', region: 'us']) == null
}
}
Loading
Loading