Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import nextflow.container.ShifterConfig
import nextflow.container.SingularityConfig
import nextflow.dag.DAG
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.exception.AbortSignalException
import nextflow.exception.IllegalConfigException
import nextflow.exception.MissingLibraryException
Expand Down Expand Up @@ -1109,6 +1110,11 @@ class Session implements ISession {
try {
action.accept(observer)
}
catch (AbortRunException e) {
// AbortRunException are forwarded to produce an error in the execution
log.error("Abort exception produced when notifying an event - $e.message")
throw e
}
catch ( Throwable e ) {
log.debug(e.getMessage(), e)
}
Expand Down
121 changes: 82 additions & 39 deletions plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.BuildInfo
import nextflow.Session
import nextflow.container.resolver.ContainerMeta
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.processor.TaskHandler
import nextflow.processor.TaskId
import nextflow.processor.TaskProcessor
Expand Down Expand Up @@ -142,6 +143,8 @@ class TowerClient implements TraceObserverV2 {

private Map<String,Boolean> allContainers = new ConcurrentHashMap<>()

private boolean abortOnError = true

TowerClient(Session session, TowerConfig config) {
this.session = session
this.endpoint = checkUrl(config.endpoint)
Expand All @@ -151,13 +154,25 @@ class TowerClient implements TraceObserverV2 {
this.schema = loadSchema()
this.generator = TowerJsonGenerator.create(schema)
this.reports = new TowerReports(session)
setAbortOnError(this.env)
}

TowerClient withEnvironment(Map env) {
this.env = env
setAbortOnError(env)
return this
}

protected void setAbortOnError(Map env){
if( !env ) {
return
}
final abort = env.get("TOWER_ABORT_ON_ERROR") as String
if( abort ){
this.abortOnError = Boolean.parseBoolean(abort)
}
}

@TestOnly
protected TowerClient() {
this.generator = TowerJsonGenerator.create(Collections.EMPTY_MAP)
Expand Down Expand Up @@ -268,7 +283,7 @@ class TowerClient implements TraceObserverV2 {
*/
@Override
void onFlowCreate(Session session) {
log.debug "Creating Seqera Platform observer -- endpoint=$endpoint; requestInterval=$requestInterval; aliveInterval=$aliveInterval; maxRetries=$maxRetries; backOffBase=$backOffBase; backOffDelay=$backOffDelay"
log.debug "Creating Seqera Platform observer -- endpoint=$endpoint; requestInterval=$requestInterval; aliveInterval=$aliveInterval; maxRetries=$maxRetries; backOffBase=$backOffBase; backOffDelay=$backOffDelay; abortOnError=$abortOnError"

this.session = session
this.aggregator = new ResourcesAggregator(session)
Expand All @@ -286,12 +301,19 @@ class TowerClient implements TraceObserverV2 {
- status code : $resp.code
- response msg: $resp.cause
""".stripIndent(true)
if( abortOnError ) {
throw new AbortRunException(resp.message)
}
Comment thread
bentsherman marked this conversation as resolved.
Outdated
throw new AbortOperationException(resp.message)
}
final ret = parseTowerResponse(resp)
this.workflowId = ret.workflowId
if( !workflowId )
if( !workflowId ) {
if( abortOnError ) {
throw new AbortRunException("Invalid Seqera Platform API response - Missing workflow Id")
}
Comment thread
bentsherman marked this conversation as resolved.
Outdated
throw new AbortOperationException("Invalid Seqera Platform API response - Missing workflow Id")
}
if( ret.message )
log.warn(ret.message.toString())

Expand Down Expand Up @@ -378,7 +400,11 @@ class TowerClient implements TraceObserverV2 {
- status code : $resp.code
- response msg: $resp.cause
""".stripIndent(true)
throw new AbortOperationException(resp.message)
if( abortOnError ) {
throw new AbortRunException(resp.message)
} else {
throw new AbortOperationException(resp.message)
}
}

final payload = parseTowerResponse(resp)
Expand Down Expand Up @@ -727,6 +753,9 @@ class TowerClient implements TraceObserverV2 {
""".stripIndent(true)
// append separately otherwise formatting get broken
msg += "- error cause : ${cause ?: '-'}"
if( abortOnError ) {
throw new AbortRunException(msg)
}
log.warn(msg)
}
}
Expand All @@ -746,7 +775,11 @@ class TowerClient implements TraceObserverV2 {
""".stripIndent(true)
// append separately otherwise formatting get broken
msg += "- error cause : ${cause ?: '-'}"
throw new Exception(msg)
if( abortOnError ) {
throw new AbortRunException(msg)
} else {
throw new Exception(msg)
}
}

protected String parseCause(String cause) {
Expand Down Expand Up @@ -787,46 +820,56 @@ class TowerClient implements TraceObserverV2 {
}

protected void sendTasks0(dummy) {
final tasks = new HashMap<TaskId, TraceRecord>(TASKS_PER_REQUEST)
boolean complete = false
long previous = System.currentTimeMillis()
final long period = requestInterval.millis
final long delay = period / 10 as long

while( !complete ) {
final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS)
// reconcile task events ie. send out only the last event
if( ev ) {
log.trace "Tower event=$ev"
if( ev.trace )
tasks[ev.trace.taskId] = ev.trace
if( ev.completed )
complete = true
}
try {
final tasks = new HashMap<TaskId, TraceRecord>(TASKS_PER_REQUEST)
boolean complete = false
long previous = System.currentTimeMillis()
final long period = requestInterval.millis
final long delay = period / 10 as long

while( !complete ) {
final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS)
// reconcile task events ie. send out only the last event
if( ev ) {
log.trace "Tower event=$ev"
if( ev.trace )
tasks[ev.trace.taskId] = ev.trace
if( ev.completed )
complete = true
}

// check if there's something to send
final now = System.currentTimeMillis()
final delta = now - previous

if( !tasks ) {
if( delta > aliveInterval.millis ) {
final req = makeHeartbeatReq()
final resp = sendHttpMessage(urlTraceHeartbeat, req, 'PUT')
logHttpResponse(urlTraceHeartbeat, resp)
previous = now
}
continue
}

// check if there's something to send
final now = System.currentTimeMillis()
final delta = now -previous
if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) {
// send
final req = makeTasksReq(tasks.values())
final resp = sendHttpMessage(urlTraceProgress, req, 'PUT')
logHttpResponse(urlTraceProgress, resp)

if( !tasks ) {
if( delta > aliveInterval.millis ) {
final req = makeHeartbeatReq()
final resp = sendHttpMessage(urlTraceHeartbeat, req, 'PUT')
logHttpResponse(urlTraceHeartbeat, resp)
// clean up for next iteration
previous = now
tasks.clear()
}
continue
}

if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) {
// send
final req = makeTasksReq(tasks.values())
final resp = sendHttpMessage(urlTraceProgress, req, 'PUT')
logHttpResponse(urlTraceProgress, resp)

// clean up for next iteration
previous = now
tasks.clear()
} catch( Exception e ) {
this.sender = null
if( abortOnError ) {
log.error("Aborting session due to Seqera Platform telemetry exception - $e.message", e)
session.abort(e)
} else {
log.warn("Exception in Seqera Platform telemetry - $e.message")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import nextflow.cloud.types.PriceModel
import nextflow.container.DockerConfig
import nextflow.container.resolver.ContainerMeta
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.script.ScriptBinding
import nextflow.script.WorkflowMetadata
import nextflow.trace.TraceRecord
Expand Down Expand Up @@ -603,4 +604,51 @@ class TowerClientTest extends Specification {
req.tasks[0].acceleratorType == 'v100'
}

def 'should detect abort on error'(){
given:
def client = new TowerClient().withEnvironment(ENV)

expect:
client.abortOnError == ABORT_ON_ERROR

where:
ENV | ABORT_ON_ERROR
[:] | true
['TOWER_ABORT_ON_ERROR': 'true'] | true
['TOWER_ABORT_ON_ERROR': 'false'] | false
['TOWER_ABORT_ON_ERROR': ''] | true
}

def 'should throw abort session when abort on error is true'(){
given:
def session = Mock(Session)
def client = Spy(new TowerClient(session, new TowerConfig([:], [:])).withEnvironment(['TOWER_ABORT_ON_ERROR': 'true'])){
newHttpClient() >> Mock(HxClient)
makeCreateReq(_) >> [:]
makeBeginReq(_) >> [:]
makeCompleteReq(_) >> [:]
sendHttpMessage(_,_,_) >> new TowerClient.Response(401)
}

when:
client.logHttpResponse("endpoint", new TowerClient.Response(401))
then:
thrown(AbortRunException)

when:
client.parseTowerResponse( new TowerClient.Response(401))
then:
thrown(AbortRunException)

when:
client.onFlowCreate(session)
then:
thrown(AbortRunException)

when:
client.onFlowBegin()
then:
thrown(AbortRunException)

}
}
Loading