-
Notifications
You must be signed in to change notification settings - Fork 785
Expand file tree
/
Copy pathSeqeraExecutor.groovy
More file actions
233 lines (204 loc) · 8.38 KB
/
SeqeraExecutor.groovy
File metadata and controls
233 lines (204 loc) · 8.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.config.SeqeraConfig
import io.seqera.config.ExecutorOpts
import io.seqera.util.SchemaMapperUtil
import io.seqera.sched.api.schema.v1a1.CreateRunRequest
import io.seqera.sched.api.schema.v1a1.PipelineSpec
import io.seqera.sched.api.schema.v1a1.PredictionModel
import io.seqera.sched.client.SchedClient
import io.seqera.sched.api.schema.v1a1.TerminateRunRequest
import io.seqera.sched.client.SchedClientConfig
import nextflow.exception.AbortOperationException
import nextflow.executor.Executor
import nextflow.fusion.FusionHelper
import nextflow.platform.PlatformHelper
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskRun
import nextflow.SysEnv
import nextflow.util.Duration
import nextflow.util.ServiceName
import org.pf4j.ExtensionPoint
/**
* Nextflow executor that delegates task execution to the Seqera scheduler API.
*
* <p>This executor creates a run on the Seqera scheduler, submits tasks in batches
* via {@link SeqeraBatchSubmitter}, and monitors their lifecycle through the scheduler API.
* It requires Fusion file system to be enabled and all processes to specify a container image.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@ServiceName(SEQERA)
@CompileStatic
class SeqeraExecutor extends Executor implements ExtensionPoint {
public static final String SEQERA = 'seqera'
private static final String DEFAULT_FUSION_VERSION = '2.6'
private ExecutorOpts seqeraConfig
private SchedClient client
private volatile String runId
private volatile Map<String,String> runResourceLabels = Collections.<String,String>emptyMap()
private SeqeraBatchSubmitter batchSubmitter
@Override
protected void register() {
applyFusionDefaults()
createClient()
}
protected void applyFusionDefaults() {
final fusionConfig = session.config.fusion as Map
if( fusionConfig!=null && !fusionConfig.containerConfigUrl ) {
fusionConfig.put('targetVersion', DEFAULT_FUSION_VERSION)
}
}
@Override
void shutdown() {
// Flush any pending batch jobs before terminating run
session.error
batchSubmitter?.shutdown()
terminateRun()
}
protected void createClient() {
final seqera = new SeqeraConfig(session.config.seqera as Map ?: Collections.<String,Object>emptyMap())
this.seqeraConfig = seqera.executor
if (!seqeraConfig)
throw new IllegalArgumentException("Missing Seqera executor configuration - make sure to specify 'seqera.executor' settings")
// Get access token and refresh token from tower config (shares authentication with Platform)
def towerConfig = session.config.tower as Map ?: Collections.emptyMap()
def accessToken = PlatformHelper.getAccessToken(towerConfig, SysEnv.get())
def refreshToken = PlatformHelper.getRefreshToken(towerConfig, SysEnv.get())
def platformUrl = PlatformHelper.getEndpoint(towerConfig, SysEnv.get())
def clientConfig = SchedClientConfig.builder()
.endpoint(seqeraConfig.endpoint)
.platformUrl(platformUrl)
.accessToken(accessToken)
.refreshToken(refreshToken)
.retryConfig(seqeraConfig.retryOpts())
.build()
this.client = new SchedClient(clientConfig)
}
protected void createRun() {
final towerConfig = session.config.tower as Map ?: Collections.emptyMap()
final workflowId = session.workflowMetadata?.platform?.workflowId
final workflowUrl = session.workflowMetadata?.platform?.workflowUrl
final workspaceId = PlatformHelper.getWorkspaceId(towerConfig, SysEnv.get()) as Long
final computeEnvId = PlatformHelper.getComputeEnvId(towerConfig, SysEnv.get()) ?: seqeraConfig.computeEnvId
computeRunResourceLabels()
final labels = new Labels()
if( seqeraConfig.autoLabels )
labels.withWorkflowMetadata(session.workflowMetadata)
labels.withProcessResourceLabels(runResourceLabels)
final predictionModel = seqeraConfig.predictionModel ? PredictionModel.fromValue(seqeraConfig.predictionModel) : null
final pipeline = new PipelineSpec()
.workflowId(workflowId)
.workflowUrl(workflowUrl)
.workDir(session.workDir?.toUriString())
final request = new CreateRunRequest()
.provider(seqeraConfig.provider)
.region(seqeraConfig.region)
.name(session.runName)
.machineRequirement(SchemaMapperUtil.toMachineRequirement(seqeraConfig.machineRequirement))
.labels(labels.entries)
.workspaceId(workspaceId)
.pipeline(pipeline)
.predictionModel(predictionModel)
.computeEnvId(computeEnvId)
log.debug "[SEQERA] Creating run: ${request}"
final response = client.createRun(request)
this.runId = response.getRunId()
log.debug "[SEQERA] Run created id: ${runId}; workflowId: '${workflowId}'; workflowUrl: '${workflowUrl}'"
// Initialize and start batch submitter with error callback to abort on fatal errors
this.batchSubmitter = new SeqeraBatchSubmitter(
client,
runId,
seqeraConfig.batchFlushInterval,
SeqeraBatchSubmitter.KEEP_ALIVE_INTERVAL,
{ Throwable t -> session.abort(t) }
)
this.batchSubmitter.start()
}
protected void terminateRun() {
if (!runId) {
return
}
final stopReason = truncate(session.fault?.report, 10_000)
log.debug "[SEQERA] Terminating run: ${runId}; stopReason: ${stopReason}"
client.terminateRun(runId, new TerminateRunRequest().stopReason(stopReason))
log.debug "[SEQERA] Run terminated"
}
@Override
protected TaskMonitor createTaskMonitor() {
TaskPollingMonitor.create(session, config, name, 1000, Duration.of('10 sec'))
}
@Override
TaskHandler createTaskHandler(TaskRun task) {
return new SeqeraTaskHandler(task, this)
}
/**
* @return {@code true} whenever the containerization is managed by the executor itself
*/
boolean isContainerNative() {
return true
}
@Override
boolean isFusionEnabled() {
final enabled = FusionHelper.isFusionEnabled(session)
if (!enabled)
throw new AbortOperationException("Seqera executor requires the use of Fusion file system")
return true
}
/**
* Lazily creates the run on first access, ensuring workflowId and labels
* are available (they are set by TowerClient.onFlowCreate before tasks are submitted).
*/
void ensureRunCreated() {
if (runId) return
synchronized (this) {
if (runId) return
createRun()
}
}
SchedClient getClient() {
return client
}
String getRunId() {
return runId
}
Map<String,String> getRunResourceLabels() {
return Collections.unmodifiableMap(runResourceLabels)
}
@PackageScope
void computeRunResourceLabels() {
final processMap = session.config.process as Map
this.runResourceLabels = Labels.toStringMap(processMap?.get('resourceLabels'))
}
SeqeraBatchSubmitter getBatchSubmitter() {
return batchSubmitter
}
ExecutorOpts getSeqeraConfig() {
return seqeraConfig
}
protected static String truncate(String value, int maxLen) {
if (!value || value.length() <= maxLen)
return value
return value.take(maxLen) + '\n.. [TRUNCATED]'
}
}