Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import nextflow.container.ShifterConfig
import nextflow.container.SingularityConfig
import nextflow.dag.DAG
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.exception.AbortSignalException
import nextflow.exception.IllegalConfigException
import nextflow.exception.MissingLibraryException
Expand Down Expand Up @@ -1103,6 +1104,11 @@ class Session implements ISession {
try {
action.accept(observer)
}
catch (AbortRunException e) {
// AbortRunException is rethrown so that it produces an error in the execution
log.error("Abort exception produced when notifying an event - $e.message")
throw e
}
catch ( Throwable e ) {
log.debug(e.getMessage(), e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException
import io.seqera.util.trace.TraceUtils
import nextflow.BuildInfo
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.util.Duration
import nextflow.util.TestOnly
/**
Expand Down Expand Up @@ -122,7 +122,7 @@ class TowerClient {
}

Map traceBegin(Map req, String workspaceId, String workflowId){
return sendAndProcessRequest( getUrlTraceBegin(workspaceId, workflowId), req, 'POST')
return sendAndProcessRequest( getUrlTraceBegin(workspaceId, workflowId), req, 'PUT')
}

void traceComplete(Map req, String workspaceId, String workflowId) {
Expand All @@ -140,19 +140,27 @@ class TowerClient {
void traceProgress(Map req, String workspaceId, String workflowId) {
final url = getUrlTraceProgress( workspaceId, workflowId )
final resp = sendHttpMessage(url, req, 'PUT')
logHttpResponse(url, resp)
if( resp.error ) {
final message = """\
Unexpected HTTP response
- endpoint : $url
- status code : $resp.code
- response msg: $resp.message
""".stripIndent(true)
throw new AbortRunException(message)
}
}

protected Map sendAndProcessRequest(String url, Map req, String method){
final resp = sendHttpMessage(url, req, method)
if( resp.error ) {
log.debug """\
final message = """\
Unexpected HTTP response
- endpoint : $url
- status code : $resp.code
- response msg: $resp.cause
- response msg: $resp.message
""".stripIndent(true)
throw new AbortOperationException(resp.message)
throw new AbortRunException(message)
}
return parseTowerResponse(resp)
}
Expand Down Expand Up @@ -241,7 +249,7 @@ class TowerClient {

String getAccessToken() {
if( !accessToken )
throw new AbortOperationException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
throw new AbortRunException("Missing Seqera Platform access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
return accessToken
}

Expand Down Expand Up @@ -377,7 +385,7 @@ class TowerClient {
""".stripIndent(true)
// append separately otherwise formatting get broken
msg += "- error cause : ${cause ?: '-'}"
throw new Exception(msg)
throw new AbortRunException(msg)
}

protected String parseCause(String cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.container.resolver.ContainerMeta
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.processor.TaskHandler
import nextflow.processor.TaskId
import nextflow.processor.TaskProcessor
Expand Down Expand Up @@ -156,7 +156,7 @@ class TowerObserver implements TraceObserverV2 {
final ret = client.traceCreate(makeCreateReq(session), workspaceId)
this.workflowId = ret.workflowId
if( !workflowId )
throw new AbortOperationException("Invalid Seqera Platform API response - Missing workflow Id")
throw new AbortRunException("Invalid Seqera Platform API response - Missing workflow Id")
log.debug "Platform workflow id: $workflowId; workflow url: ${ret.watchUrl}"
session.workflowMetadata.platform.workflowId = workflowId
// note: `watchUrl` in the create response requires Platform 26.01 or later
Expand Down Expand Up @@ -530,46 +530,53 @@ class TowerObserver implements TraceObserverV2 {


protected void sendTasks0(dummy) {
final tasks = new HashMap<TaskId, TraceRecord>(TASKS_PER_REQUEST)
boolean complete = false
long previous = System.currentTimeMillis()
final long period = requestInterval.millis
final long delay = period / 10 as long

while( !complete ) {
final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS)
// reconcile task events ie. send out only the last event
if( ev ) {
log.trace "Tower event=$ev"
if( ev.trace )
tasks[ev.trace.taskId] = ev.trace
if( ev.completed )
complete = true
}

// check if there's something to send
final now = System.currentTimeMillis()
final delta = now -previous
try {
final tasks = new HashMap<TaskId, TraceRecord>(TASKS_PER_REQUEST)
boolean complete = false
long previous = System.currentTimeMillis()
final long period = requestInterval.millis
final long delay = period / 10 as long

while( !complete ) {
final ProcessEvent ev = events.poll(delay, TimeUnit.MILLISECONDS)
// reconcile task events ie. send out only the last event
if( ev ) {
log.trace "Tower event=$ev"
if( ev.trace )
tasks[ev.trace.taskId] = ev.trace
if( ev.completed )
complete = true
}

if( !tasks ) {
if( delta > aliveInterval.millis ) {
final req = makeHeartbeatReq()
client.traceHeartbeat(req, workspaceId, workflowId)
previous = now
// check if there's something to send
final now = System.currentTimeMillis()
final delta = now - previous

if( !tasks ) {
if( delta > aliveInterval.millis ) {
final req = makeHeartbeatReq()
client.traceHeartbeat(req, workspaceId, workflowId)
previous = now
}
continue
}
continue
}

if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) {
// send
final req = makeTasksReq(tasks.values())
client.traceProgress(req, workspaceId, workflowId)
if( delta > period || tasks.size() >= TASKS_PER_REQUEST || complete ) {
// send
final req = makeTasksReq(tasks.values())
client.traceProgress(req, workspaceId, workflowId)

// clean up for next iteration
previous = now
tasks.clear()
// clean up for next iteration
previous = now
tasks.clear()
}
}
}
catch( Exception e ) {
this.sender = null
log.error("Aborting session due to Seqera Platform telemetry exception - $e.message", e)
session.abort(e)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.time.Instant
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import io.seqera.http.HxClient
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.util.Duration
import spock.lang.Specification
/**
Expand Down Expand Up @@ -113,7 +113,7 @@ class TowerClientTest extends Specification {
def c = new TowerClient()
c.getAccessToken()
then:
thrown(AbortOperationException)
thrown(AbortRunException)
}

def 'should set the auth token' () {
Expand Down Expand Up @@ -276,4 +276,35 @@ class TowerClientTest extends Specification {
url.contains('name=test+workflow')
}

/**
 * Checks which {@code TowerClient} trace calls raise {@code AbortRunException}
 * when {@code sendHttpMessage} is stubbed to return a {@code TowerClient.Response(401)}.
 *
 * Expected to throw: {@code traceCreate}, {@code traceBegin}, {@code traceProgress}.
 * Expected not to throw: {@code traceComplete}, {@code traceHeartbeat}.
 */
def 'should send AbortRunException in selected client calls'() {
given:
// spy on a client built with a dummy access token; only `sendHttpMessage` is stubbed,
// so every call below receives the same `Response(401)` object
def client = Spy(new TowerClient(new TowerConfig([:], [TOWER_ACCESS_TOKEN: 'token']))){
sendHttpMessage(_,_,_) >> new TowerClient.Response(401)
}

// workflow creation must abort the run
when:
client.traceCreate([:], '1234')
then:
thrown(AbortRunException)

// workflow begin must abort the run
when:
client.traceBegin([:], '1234', '5678')
then:
thrown(AbortRunException)

// task progress must abort the run
when:
client.traceProgress([:], '1234', '5678')
then:
thrown(AbortRunException)

// workflow completion must NOT raise AbortRunException for the same response
when:
client.traceComplete([:], '1234', '5678')
then:
notThrown(AbortRunException)

// heartbeat must NOT raise AbortRunException for the same response
when:
client.traceHeartbeat([:], '1234', '5678')
then:
notThrown(AbortRunException)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.seqera.tower.plugin

import nextflow.exception.AbortOperationException

import static com.github.tomakehurst.wiremock.client.WireMock.*

import java.time.Instant
Expand All @@ -31,6 +29,7 @@ import io.seqera.tower.plugin.exception.UnauthorizedException
import nextflow.Global
import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.AbortRunException
import nextflow.script.WorkflowMetadata
import nextflow.serde.gson.InstantAdapter
import spock.lang.Shared
Expand Down Expand Up @@ -258,7 +257,7 @@ class TowerFusionEnvTest extends Specification {
def provider = new TowerFusionToken()

then: 'the access token has the expected value'
def e = thrown(AbortOperationException)
def e = thrown(AbortRunException)
e.message.contains("Missing Seqera Platform access token")

cleanup:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,24 @@

package io.seqera.tower.plugin

import nextflow.script.PlatformMetadata

import java.net.http.HttpResponse
import java.nio.file.Files
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneId

import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import io.seqera.http.HxClient
import nextflow.Session
import nextflow.SysEnv
import nextflow.cloud.types.CloudMachineInfo
import nextflow.cloud.types.PriceModel
import nextflow.container.DockerConfig
import nextflow.container.resolver.ContainerMeta
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortRunException
import nextflow.script.PlatformMetadata
import nextflow.script.ScriptBinding
import nextflow.script.WorkflowMetadata
import nextflow.trace.TraceRecord
import nextflow.trace.WorkflowStats
import nextflow.trace.WorkflowStatsObserver
import nextflow.util.Duration
import nextflow.util.ProcessHelper
import spock.lang.Specification
/**
Expand Down Expand Up @@ -577,5 +570,22 @@ class TowerObserverTest extends Specification {
req.tasks[0].gpuMetrics.peak == 100
}

/**
 * Verifies that {@code TowerObserver.onFlowCreate} throws {@code AbortRunException}
 * when the map returned by {@code client.traceCreate} contains no workflow id.
 */
def 'should throw AbortRunException if workflow id is not found'() {
given:
// mocked session exposing only a random unique id and a mocked workflow metadata
def session = Mock(Session){
getUniqueId() >> UUID.randomUUID()
getWorkflowMetadata() >> Mock(WorkflowMetadata)
}
// the client answers the create request with an empty map, i.e. no `workflowId` entry
def client = Mock(TowerClient){
traceCreate(_,_) >> [:]
}
def observer = new TowerObserver(session, client, null, [:])

when:
observer.onFlowCreate(session)
then:
thrown(AbortRunException)
}


}
Loading