diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 720a79a6e0..07792c8787 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -47,6 +47,7 @@ import nextflow.container.ShifterConfig import nextflow.container.SingularityConfig import nextflow.dag.DAG import nextflow.exception.AbortOperationException +import nextflow.exception.AbortRunException import nextflow.exception.AbortSignalException import nextflow.exception.IllegalConfigException import nextflow.exception.MissingLibraryException @@ -1103,6 +1104,11 @@ class Session implements ISession { try { action.accept(observer) } + catch (AbortRunException e) { + // AbortRunException are forwarded to produce an error in the execution + log.error("Abort exception produced when notifying an event - $e.message") + throw e + } catch ( Throwable e ) { log.debug(e.getMessage(), e) } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy index fc97b5d50f..64d76fb929 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy @@ -32,7 +32,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException import io.seqera.util.trace.TraceUtils import nextflow.BuildInfo import nextflow.SysEnv -import nextflow.exception.AbortOperationException +import nextflow.exception.AbortRunException import nextflow.util.Duration import nextflow.util.TestOnly /** @@ -122,7 +122,7 @@ class TowerClient { } Map traceBegin(Map req, String workspaceId, String workflowId){ - return sendAndProcessRequest( getUrlTraceBegin(workspaceId, workflowId), req, 'POST') + return sendAndProcessRequest( getUrlTraceBegin(workspaceId, workflowId), req, 'PUT') } void traceComplete(Map req, String workspaceId, String workflowId) { @@ -140,19 +140,27 @@ class TowerClient { void traceProgress(Map req, String workspaceId, String workflowId) { final url = getUrlTraceProgress( workspaceId, workflowId ) final resp = sendHttpMessage(url, req, 'PUT') - logHttpResponse(url, resp) + if( resp.error ) { + final message = """\ + Unexpected HTTP response + - endpoint : $url + - status code : $resp.code + - response msg: $resp.message + """.stripIndent(true) + throw new AbortRunException(message) + } } protected Map sendAndProcessRequest(String url, Map req, String method){ final resp = sendHttpMessage(url, req, method) if( resp.error ) { - log.debug """\ + final message = """\ Unexpected HTTP response - endpoint : $url - status code : $resp.code - - response msg: $resp.cause + - response msg: $resp.message """.stripIndent(true) - throw new AbortOperationException(resp.message) + throw new AbortRunException(message) } return parseTowerResponse(resp) } @@ -241,7 +249,7 @@ class TowerClient { String getAccessToken() { if( !accessToken ) - throw new AbortOperationException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment") + throw new AbortRunException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment") return accessToken } @@ -377,7 +385,7 @@ class TowerClient { """.stripIndent(true) // append separately otherwise formatting get broken msg += "- error cause : ${cause ?: '-'}" - throw new Exception(msg) + throw new AbortRunException(msg) } protected String parseCause(String cause) { diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerObserver.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerObserver.groovy index 93d9f40cfd..93a1e9c7b2 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerObserver.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerObserver.groovy @@ -21,7 +21,7 @@ import groovy.transform.ToString import groovy.util.logging.Slf4j import nextflow.Session import nextflow.container.resolver.ContainerMeta -import nextflow.exception.AbortOperationException +import nextflow.exception.AbortRunException import nextflow.processor.TaskHandler import nextflow.processor.TaskId import nextflow.processor.TaskProcessor @@ -156,7 +156,7 @@ class TowerObserver implements TraceObserverV2 { final ret = client.traceCreate(makeCreateReq(session), workspaceId) this.workflowId = ret.workflowId if( !workflowId ) - throw new AbortOperationException("Invalid Seqera Platform API response - Missing workflow Id") + throw new AbortRunException("Invalid Seqera Platform API response - Missing workflow Id") log.debug "Platform workflow id: $workflowId; workflow url: ${ret.watchUrl}" session.workflowMetadata.platform.workflowId = workflowId // note: `watchUrl` in the create response requires Platform 26.01 or later @@ -530,46 +530,53 @@ class TowerObserver implements TraceObserverV2 { protected void sendTasks0(dummy) { - final tasks = new HashMap(TASKS_PER_REQUEST) - boolean complete = false - long previous = System.currentTimeMillis() - final long period = requestInterval.millis - final long delay = period / 10 as long - - while( !complete ) { - final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS) - // reconcile task events ie. send out only the last event - if( ev ) { - log.trace "Tower event=$ev" - if( ev.trace ) - tasks[ev.trace.taskId] = ev.trace - if( ev.completed ) - complete = true - } - - // check if there's something to send - final now = System.currentTimeMillis() - final delta = now -previous + try { + final tasks = new HashMap(TASKS_PER_REQUEST) + boolean complete = false + long previous = System.currentTimeMillis() + final long period = requestInterval.millis + final long delay = period / 10 as long + + while( !complete ) { + final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS) + // reconcile task events ie. send out only the last event + if( ev ) { + log.trace "Tower event=$ev" + if( ev.trace ) + tasks[ev.trace.taskId] = ev.trace + if( ev.completed ) + complete = true + } - if( !tasks ) { - if( delta > aliveInterval.millis ) { - final req = makeHeartbeatReq() - client.traceHeartbeat(req, workspaceId, workflowId) - previous = now + // check if there's something to send + final now = System.currentTimeMillis() + final delta = now - previous + + if( !tasks ) { + if( delta > aliveInterval.millis ) { + final req = makeHeartbeatReq() + client.traceHeartbeat(req, workspaceId, workflowId) + previous = now + } + continue } - continue - } - if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) { - // send - final req = makeTasksReq(tasks.values()) - client.traceProgress(req, workspaceId, workflowId) + if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) { + // send + final req = makeTasksReq(tasks.values()) + client.traceProgress(req, workspaceId, workflowId) - // clean up for next iteration - previous = now - tasks.clear() + // clean up for next iteration + previous = now + tasks.clear() + } } } + catch( Exception e ) { + this.sender = null + log.error("Aborting session due to Seqera Platform telemetry exception - $e.message", e) + session.abort(e) + } } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy index 8c3d13f84e..9d8fcc4a74 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy @@ -22,7 +22,7 @@ import java.time.Instant import com.github.tomakehurst.wiremock.WireMockServer import com.github.tomakehurst.wiremock.client.WireMock import io.seqera.http.HxClient -import nextflow.exception.AbortOperationException +import nextflow.exception.AbortRunException import nextflow.util.Duration import spock.lang.Specification /** @@ -113,7 +113,7 @@ class TowerClientTest extends Specification { def c = new TowerClient() c.getAccessToken() then: - thrown(AbortOperationException) + thrown(AbortRunException) } def 'should set the auth token' () { @@ -276,4 +276,35 @@ class TowerClientTest extends Specification { url.contains('name=test+workflow') } + def 'should send AbortRunException in selected client calls'() { + given: + def client = Spy(new TowerClient(new TowerConfig([:], [TOWER_ACCESS_TOKEN: 'token']))){ + sendHttpMessage(_,_,_) >> new TowerClient.Response(401) + } + + when: + client.traceCreate([:], '1234') + then: + thrown(AbortRunException) + + when: + client.traceBegin([:], '1234', '5678') + then: + thrown(AbortRunException) + + when: + client.traceProgress([:], '1234', '5678') + then: + thrown(AbortRunException) + + when: + client.traceComplete([:], '1234', '5678') + then: + notThrown(AbortRunException) + + when: + client.traceHeartbeat([:], '1234', '5678') + then: + notThrown(AbortRunException) + } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy index 9887cb2553..7404afae2f 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy @@ -16,8 +16,6 @@ package io.seqera.tower.plugin -import nextflow.exception.AbortOperationException - import static com.github.tomakehurst.wiremock.client.WireMock.* import java.time.Instant @@ -31,6 +29,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException import nextflow.Global import nextflow.Session import nextflow.SysEnv +import nextflow.exception.AbortRunException import nextflow.script.WorkflowMetadata import nextflow.serde.gson.InstantAdapter import spock.lang.Shared @@ -258,7 +257,7 @@ class TowerFusionEnvTest extends Specification { def provider = new TowerFusionToken() then: 'the access token has the expected value' - def e = thrown(AbortOperationException) + def e = thrown(AbortRunException) e.message.contains("Missing Seqera Platform access token") cleanup: diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerObserverTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerObserverTest.groovy index 87f5c0f4af..a313d6c6cd 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerObserverTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerObserverTest.groovy @@ -16,31 +16,24 @@ package io.seqera.tower.plugin -import nextflow.script.PlatformMetadata - -import java.net.http.HttpResponse import java.nio.file.Files import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneId -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock -import io.seqera.http.HxClient import nextflow.Session import nextflow.SysEnv import nextflow.cloud.types.CloudMachineInfo import nextflow.cloud.types.PriceModel import nextflow.container.DockerConfig import nextflow.container.resolver.ContainerMeta -import nextflow.exception.AbortOperationException +import nextflow.exception.AbortRunException import nextflow.script.PlatformMetadata import nextflow.script.ScriptBinding import nextflow.script.WorkflowMetadata import nextflow.trace.TraceRecord import nextflow.trace.WorkflowStats import nextflow.trace.WorkflowStatsObserver -import nextflow.util.Duration import nextflow.util.ProcessHelper import spock.lang.Specification /** @@ -577,5 +570,22 @@ class TowerObserverTest extends Specification { req.tasks[0].gpuMetrics.peak == 100 } + def 'should throw AbortRunException if workflow id is not found'() { + given: + def session = Mock(Session){ + getUniqueId() >> UUID.randomUUID() + getWorkflowMetadata() >> Mock(WorkflowMetadata) + } + def client = Mock(TowerClient){ + traceCreate(_,_) >> [:] + } + def observer = new TowerObserver(session, client, null, [:]) + + when: + observer.onFlowCreate(session) + then: + thrown(AbortRunException) + } + }