diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java index 235a0ac91b..efc985acd5 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java @@ -23,6 +23,7 @@ public enum TaskType { DYNAMIC, FORK_JOIN, FORK_JOIN_DYNAMIC, + PERMISSIVE, DECISION, SWITCH, JOIN, @@ -70,6 +71,7 @@ public enum TaskType { public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM"; public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE"; public static final String TASK_TYPE_FORK = "FORK"; + public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE"; public static final String TASK_TYPE_NOOP = "NOOP"; private static final Set BUILT_IN_TASKS = new HashSet<>(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index c6bf486baa..52afc52c21 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -44,6 +44,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE; import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED; import static com.netflix.conductor.model.TaskModel.Status.*; @@ -207,7 +208,11 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get()); executedTaskRefNames.remove(retryTask.get().getReferenceTaskName()); outcome.tasksToBeUpdated.add(pendingTask); - } else { + } else if (!(pendingTask.getWorkflowTask() != null + && TaskType.PERMISSIVE + .name() + .equals(pendingTask.getWorkflowTask().getType()) + && !pendingTask.getWorkflowTask().isOptional())) { pendingTask.setStatus(COMPLETED_WITH_ERRORS); } } @@ -254,6 +259,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List preS if (hasSuccessfulTerminateTask || (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) { LOGGER.debug("Marking workflow: {} as complete.", workflow); + List permissiveTasksTerminalNonSuccessful = + workflow.getTasks().stream() + .filter(t -> t.getWorkflowTask() != null) + .filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType())) + .filter(t -> !t.getWorkflowTask().isOptional()) + .collect( + Collectors.toMap( + TaskModel::getReferenceTaskName, + t -> t, + (t1, t2) -> + t1.getRetryCount() > t2.getRetryCount() + ? t1 + : t2)) + .values() + .stream() + .filter( + t -> + t.getStatus().isTerminal() + && !t.getStatus().isSuccessful()) + .toList(); + if (!permissiveTasksTerminalNonSuccessful.isEmpty()) { + final String errMsg = + permissiveTasksTerminalNonSuccessful.stream() + .map( + t -> + String.format( + "Task %s failed with status: %s and reason: '%s'", + t.getTaskId(), + t.getStatus(), + t.getReasonForIncompletion())) + .collect(Collectors.joining(". ")); + throw new TerminateWorkflowException(errMsg); + } outcome.isComplete = true; } @@ -437,11 +475,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow) if (status == null || !status.isTerminal()) { return false; } - // if we reach here, the task has been completed. - // Was the task successful in completion? - if (!status.isSuccessful()) { - return false; - } } boolean noPendingSchedule = @@ -529,7 +562,9 @@ Optional retry( if (!task.getStatus().isRetriable() || TaskType.isBuiltIn(task.getTaskType()) || expectedRetryCount <= retryCount) { - if (workflowTask != null && workflowTask.isOptional()) { + if (workflowTask != null + && (workflowTask.isOptional() + || TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) { return Optional.empty(); } WorkflowModel.Status status; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 6070741a29..37bc358bb1 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) { for (TaskModel task : workflow.getTasks()) { switch (task.getStatus()) { case FAILED: + if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString()) + || task.getTaskType() + .equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) { + @SuppressWarnings("unchecked") + List joinOn = (List) task.getInputData().get("joinOn"); + boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow); + if (joinOnFailedPermissive) { + task.setStatus(IN_PROGRESS); + addTaskToQueue(task); + break; + } + } case FAILED_WITH_TERMINAL_ERROR: case TIMED_OUT: retriableMap.put(task.getReferenceTaskName(), task); @@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) { LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE); } + + private static boolean isJoinOnFailedPermissive(List joinOn, WorkflowModel workflow) { + return joinOn.stream() + .map(workflow::getTaskByRefName) + .anyMatch( + t -> + TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType()) + && !t.getWorkflowTask().isOptional() + && t.getStatus().equals(FAILED)); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java new file mode 100644 index 0000000000..124a9b2e03 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java @@ -0,0 +1,102 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.mapper; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.utils.ParametersUtils; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +/** + * An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link + * TaskType#PERMISSIVE} to a {@link TaskModel} with status {@link TaskModel.Status#SCHEDULED}. + */ +@Component +public class PermissiveTaskMapper implements TaskMapper { + + public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class); + private final ParametersUtils parametersUtils; + + public PermissiveTaskMapper(ParametersUtils parametersUtils) { + this.parametersUtils = parametersUtils; + } + + @Override + public String getTaskType() { + return TaskType.PERMISSIVE.name(); + } + + /** + * This method maps a {@link WorkflowTask} of type {@link TaskType#PERMISSIVE} to a {@link + * TaskModel} + * + * @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link + * WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId + * @return a List with just one exclusive task + * @throws TerminateWorkflowException In case if the task definition does not exist + */ + @Override + public List getMappedTasks(TaskMapperContext taskMapperContext) + throws TerminateWorkflowException { + + LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext); + + WorkflowTask workflowTask = taskMapperContext.getWorkflowTask(); + WorkflowModel workflowModel = taskMapperContext.getWorkflowModel(); + int retryCount = taskMapperContext.getRetryCount(); + String retriedTaskId = taskMapperContext.getRetryTaskId(); + + TaskDef taskDefinition = + Optional.ofNullable(workflowTask.getTaskDefinition()) + .orElseThrow( + () -> { + String reason = + String.format( + "Invalid task. Task %s does not have a definition", + workflowTask.getName()); + return new TerminateWorkflowException(reason); + }); + + Map input = + parametersUtils.getTaskInput( + workflowTask.getInputParameters(), + workflowModel, + taskDefinition, + taskMapperContext.getTaskId()); + TaskModel permissiveTask = taskMapperContext.createTaskModel(); + permissiveTask.setTaskType(workflowTask.getName()); + permissiveTask.setStartDelayInSeconds(workflowTask.getStartDelay()); + permissiveTask.setInputData(input); + permissiveTask.setStatus(TaskModel.Status.SCHEDULED); + permissiveTask.setRetryCount(retryCount); + permissiveTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); + permissiveTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds()); + permissiveTask.setRetriedTaskId(retriedTaskId); + permissiveTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency()); + permissiveTask.setRateLimitFrequencyInSeconds( + taskDefinition.getRateLimitFrequencyInSeconds()); + return List.of(permissiveTask); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java index e2bf0ac0bf..62a02c3c95 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java @@ -24,6 +24,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN; @Component(TASK_TYPE_EXCLUSIVE_JOIN) @@ -65,9 +66,20 @@ public boolean execute( } taskStatus = exclusiveTask.getStatus(); foundExlusiveJoinOnTask = taskStatus.isTerminal(); - hasFailures = !taskStatus.isSuccessful(); + hasFailures = + !taskStatus.isSuccessful() + && (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType()) + || joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t.getStatus().isTerminal())); if (hasFailures) { - failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" "); + final String failureReasons = + joinOn.stream() + .map(workflow::getTaskByRefName) + .filter(t -> !t.getStatus().isSuccessful()) + .map(TaskModel::getReasonForIncompletion) + .collect(Collectors.joining(" ")); + failureReason.append(failureReasons); } break; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index e0f7be9faa..14fdba6f13 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -23,6 +23,7 @@ import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE; import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN; @Component(TASK_TYPE_JOIN) @@ -57,9 +58,21 @@ public boolean execute( break; } TaskModel.Status taskStatus = forkedTask.getStatus(); - hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional(); + hasFailures = + !taskStatus.isSuccessful() + && !forkedTask.getWorkflowTask().isOptional() + && (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType()) + || joinOn.stream() + .map(workflow::getTaskByRefName) + .allMatch(t -> t.getStatus().isTerminal())); if (hasFailures) { - failureReason.append(forkedTask.getReasonForIncompletion()).append(" "); + final String failureReasons = + joinOn.stream() + .map(workflow::getTaskByRefName) + .filter(t -> !t.getStatus().isSuccessful()) + .map(TaskModel::getReasonForIncompletion) + .collect(Collectors.joining(" ")); + failureReason.append(failureReasons); } // Only add to task output if it's not empty if (!forkedTask.getOutputData().isEmpty()) { diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index dbf0277f6e..ca06c8ea86 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -440,6 +440,80 @@ public void testOptional() { outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); } + /** Similar to {@link #testOptional} */ + @Test + public void testPermissive() { + WorkflowDef def = new WorkflowDef(); + def.setName("test-permissive"); + + WorkflowTask task1 = new WorkflowTask(); + task1.setName("task0"); + task1.setType("PERMISSIVE"); + task1.setTaskReferenceName("t0"); + task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}"); + task1.setTaskDefinition(new TaskDef("task0")); + + WorkflowTask task2 = new WorkflowTask(); + task2.setName("task1"); + task2.setType("PERMISSIVE"); + task2.setTaskReferenceName("t1"); + task2.setTaskDefinition(new TaskDef("task1")); + + def.getTasks().add(task1); + def.getTasks().add(task2); + def.setSchemaVersion(2); + + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(def); + workflow.setCreateTime(System.currentTimeMillis()); + DeciderOutcome outcome = deciderService.decide(workflow); + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeScheduled.size()); + assertEquals( + task1.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + + for (int i = 0; i < 3; i++) { + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); + assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId")); + + workflow.getTasks().clear(); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED); + + outcome = deciderService.decide(workflow); + + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); + + assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus()); + assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); + assertEquals( + task1.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount()); + } + + String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId(); + + workflow.getTasks().clear(); + workflow.getTasks().addAll(outcome.tasksToBeScheduled); + workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED); + + outcome = deciderService.decide(workflow); + + assertNotNull(outcome); + assertEquals(1, outcome.tasksToBeUpdated.size()); + assertEquals(1, outcome.tasksToBeScheduled.size()); + + assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus()); + assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId()); + assertEquals( + task2.getTaskReferenceName(), + outcome.tasksToBeScheduled.get(0).getReferenceTaskName()); + } + @Test public void testOptionalWithDynamicFork() { WorkflowDef def = new WorkflowDef(); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java new file mode 100644 index 0000000000..97920a3f02 --- /dev/null +++ b/core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.execution.mapper; + +import java.util.HashMap; +import java.util.List; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.utils.IDGenerator; +import com.netflix.conductor.core.utils.ParametersUtils; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.model.WorkflowModel; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.*; + +public class PermissiveTaskMapperTest { + + private PermissiveTaskMapper permissiveTaskMapper; + + private IDGenerator idGenerator = new IDGenerator(); + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() { + ParametersUtils parametersUtils = mock(ParametersUtils.class); + permissiveTaskMapper = new PermissiveTaskMapper(parametersUtils); + } + + @Test + public void getMappedTasks() { + + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("permissive_task"); + workflowTask.setTaskDefinition(new TaskDef("permissive_task")); + + String taskId = idGenerator.generate(); + String retriedTaskId = idGenerator.generate(); + + WorkflowDef workflowDef = new WorkflowDef(); + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withRetryTaskId(retriedTaskId) + .withTaskId(taskId) + .build(); + + List mappedTasks = permissiveTaskMapper.getMappedTasks(taskMapperContext); + assertNotNull(mappedTasks); + assertEquals(1, mappedTasks.size()); + } + + @Test + public void getMappedTasksException() { + + // Given + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("permissive_task"); + String taskId = idGenerator.generate(); + String retriedTaskId = idGenerator.generate(); + + WorkflowDef workflowDef = new WorkflowDef(); + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowDefinition(workflowDef); + + TaskMapperContext taskMapperContext = + TaskMapperContext.newBuilder() + .withWorkflowModel(workflow) + .withTaskDefinition(new TaskDef()) + .withWorkflowTask(workflowTask) + .withTaskInput(new HashMap<>()) + .withRetryCount(0) + .withRetryTaskId(retriedTaskId) + .withTaskId(taskId) + .build(); + + // then + expectedException.expect(TerminateWorkflowException.class); + expectedException.expectMessage( + String.format( + "Invalid task. Task %s does not have a definition", + workflowTask.getName())); + + // when + permissiveTaskMapper.getMappedTasks(taskMapperContext); + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java new file mode 100644 index 0000000000..f767f8ea5b --- /dev/null +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/PermissiveTask.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.sdk.workflow.def.tasks; + +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; + +/* Workflow permissive task executed by a worker */ +public class PermissiveTask extends Task { + + private TaskDef taskDef; + + public PermissiveTask(String taskDefName, String taskReferenceName) { + super(taskReferenceName, TaskType.PERMISSIVE); + super.name(taskDefName); + } + + PermissiveTask(WorkflowTask workflowTask) { + super(workflowTask); + this.taskDef = workflowTask.getTaskDefinition(); + } + + public TaskDef getTaskDef() { + return taskDef; + } + + public PermissiveTask setTaskDef(TaskDef taskDef) { + this.taskDef = taskDef; + return this; + } + + @Override + protected void updateWorkflowTask(WorkflowTask workflowTask) { + workflowTask.setTaskDefinition(taskDef); + } +} diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index ae75b6c839..6601f1a23c 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -72,6 +72,7 @@ public static void initTaskImplementations() { TaskRegistry.register(TaskType.DYNAMIC.name(), Dynamic.class); TaskRegistry.register(TaskType.FORK_JOIN_DYNAMIC.name(), DynamicFork.class); TaskRegistry.register(TaskType.FORK_JOIN.name(), ForkJoin.class); + TaskRegistry.register(TaskType.PERMISSIVE.name(), PermissiveTask.class); TaskRegistry.register(TaskType.HTTP.name(), Http.class); TaskRegistry.register(TaskType.INLINE.name(), Javascript.class); TaskRegistry.register(TaskType.JOIN.name(), Join.class); diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 437f4e1ccc..b6540b160c 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -16,11 +16,11 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef +import com.netflix.conductor.common.metadata.tasks.TaskType import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow -import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared @@ -45,18 +45,23 @@ class ForkJoinSpec extends AbstractSpecification { @Shared def FORK_JOIN_SUB_WORKFLOW = 'integration_test_fork_join_sw' + @Shared + def FORK_JOIN_PERMISSIVE_WF = 'FanInOutPermissiveTest' + @Autowired SubWorkflow subWorkflowTask def setup() { workflowTestUtil.registerWorkflows('fork_join_integration_test.json', 'fork_join_with_no_task_retry_integration_test.json', + 'fork_join_with_no_permissive_task_retry_integration_test.json', 'nested_fork_join_integration_test.json', 'simple_workflow_1_integration_test.json', 'nested_fork_join_with_sub_workflow_integration_test.json', 'simple_one_task_sub_workflow_integration_test.json', 'fork_join_with_optional_sub_workflow_forks_integration_test.json', - 'fork_join_sub_workflow.json' + 'fork_join_sub_workflow.json', + 'fork_join_permissive_integration_test.json', ) } @@ -251,6 +256,110 @@ class ForkJoinSpec extends AbstractSpecification { metadataService.updateTaskDef(persistedIntegrationTask2Definition) } + /** + * start + * | + * fork + * / \ + * p_task1 p_task2 + * | / + * \ / + * \ / + * join + * | + * s_task3 + * | + * End + */ + def "Test a simple workflow with fork join permissive failure flow"() { + setup: "Ensure that 'integration_task_1' has a retry count of 0" + def persistedIntegrationTask1Definition = workflowTestUtil.getPersistedTaskDefinition('integration_task_1').get() + def modifiedIntegrationTask1Definition = new TaskDef(persistedIntegrationTask1Definition.name, + persistedIntegrationTask1Definition.description, persistedIntegrationTask1Definition.ownerEmail, 0, + 0, persistedIntegrationTask1Definition.responseTimeoutSeconds) + metadataService.updateTaskDef(modifiedIntegrationTask1Definition) + + when: "A fork join workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF, 1, + 'fanoutTest', [:], + null) + + then: "verify that the workflow has started and the starting nodes of the each fork are in scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_task_1' + tasks[2].workflowTask.type == TaskType.PERMISSIVE.name() + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() + def polledAndAckTask1Try1 = workflowTestUtil.pollAndFailTask('integration_task_1', 'task1.worker', 'Failed...') + + then: "verify that the 'integration_task_1' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1) + + and: "The workflow has been updated and has all the required tasks in the right status to move forward" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The other node of the fork is completed by completing 'integration_task_2'" + def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2','task1.worker') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_task_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) + + and: "the workflow is in the failed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "JOIN task executed by the async executor" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "The workflow has been updated with the task status and task list" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.FAILED + tasks[3].taskType == 'JOIN' + } + + cleanup: "Restore the task definitions that were modified as part of this feature testing" + metadataService.updateTaskDef(persistedIntegrationTask1Definition) + } + def "Test retrying a failed fork join workflow"() { when: "A fork join workflow is started" @@ -384,6 +493,166 @@ class ForkJoinSpec extends AbstractSpecification { } } + def "Test retrying a failed permissive fork join workflow"() { + + when: "A fork join permissive workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF + '_2', 1, + 'fanoutTest', [:], + null) + + then: "verify that the workflow has started and the starting nodes of the each fork are in scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() + def polledAndAckTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_1', 'task1.worker') + + then: "verify that the 'integration_task_p_0_RT_1' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1) + + and: "The workflow has been updated and has all the required tasks in the right status to move forward" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_2'" + def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_p_task_0_RT_2', + 'task1.worker', 'Failed....') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_0_RT_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) + + and: "the workflow is not in the failed state, until the completion of the permissive forked tasks" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_3'" + def polledAndAckTask3Try1 = workflowTestUtil.pollAndFailTask('integration_p_task_0_RT_3', + 'task1.worker', 'Failed....') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_0_RT_3' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try1) + + and: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + and: "the workflow is in the failed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.FAILED + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.FAILED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The workflow is retried" + workflowExecutor.retry(workflowInstanceId, false) + + then: "verify that all the workflow is retried and new tasks are added in place of the failed tasks" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 7 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.FAILED + tasks[4].taskType == 'integration_p_task_0_RT_3' + tasks[5].status == Task.Status.SCHEDULED + tasks[5].taskType == 'integration_p_task_0_RT_2' + tasks[6].status == Task.Status.SCHEDULED + tasks[6].taskType == 'integration_p_task_0_RT_3' + } + + when: "The 'integration_p_task_0_RT_3' is polled and completed" + def polledAndAckTask3Try2 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_3', 'task1.worker') + + then: "verify that the 'integration_p_task_3' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try2) + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_2'" + def polledAndAckTask2Try2 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_2', 'task1.worker') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2) + + when: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + and: "The last task of the workflow is then polled and completed integration_p_task_0_RT_4'" + def polledAndAckTask4Try1 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_4', 'task1.worker') + + then: "verify that the 'integration_p_task_0_RT_4' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask4Try1) + + then: "Then verify that the workflow is completed and the task list of execution is as expected" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 8 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.COMPLETED + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.FAILED + tasks[4].taskType == 'integration_p_task_0_RT_3' + tasks[5].status == Task.Status.COMPLETED + tasks[5].taskType == 'integration_p_task_0_RT_2' + tasks[6].status == Task.Status.COMPLETED + tasks[6].taskType == 'integration_p_task_0_RT_3' + tasks[7].status == Task.Status.COMPLETED + tasks[7].taskType == 'integration_p_task_0_RT_4' + } + } + def "Test nested fork join workflow success flow"() { given: "Input for the nested fork join workflow" Map input = new HashMap() diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy index 0fbf0ec0b3..53d5312df5 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy @@ -45,6 +45,12 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { @Shared def WORKFLOW_WITH_OPTIONAL_TASK = 'optional_task_wf' + @Shared + def WORKFLOW_WITH_PERMISSIVE_TASK = 'permissive_task_wf' + + @Shared + def WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK = 'permissive_optional_task_wf' + @Shared def TEST_WORKFLOW = 'integration_test_wf3' @@ -52,12 +58,14 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { def WAIT_TIME_OUT_WORKFLOW = 'test_wait_timeout' def setup() { - //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK + //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK, WORKFLOW_WITH_PERMISSIVE_TASK, WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK workflowTestUtil.registerWorkflows( 'simple_workflow_1_integration_test.json', 'simple_workflow_1_input_template_integration_test.json', 'simple_workflow_3_integration_test.json', 'simple_workflow_with_optional_task_integration_test.json', + 'simple_workflow_with_permissive_task_integration_test.json', + 'simple_workflow_with_permissive_optional_task_integration_test.json', 'simple_wait_task_workflow_integration_test.json') } @@ -133,6 +141,155 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification { } } + def "Test simple workflow which has a permissive task"() { + + given: "A input parameters for a workflow with a permissive task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_permissive' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry2 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry2) + + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'task_permissive' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + reasonForIncompletion == "Task ${tasks[1].taskId} failed with status: FAILED and reason: 'NETWORK ERROR'" + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + + def "Test simple workflow which has a permissive optional task"() { + + given: "A input parameters for a workflow with a permissive optional task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive optional task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive optional task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_optional' + } + + when: "The first permissive optional task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_optional', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_optional was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive optional task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_optional' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_optional' + } + + when: "Poll the permissive optional task again and do not complete it and run decide" + workflowExecutionService.poll('task_optional', 'task1.integration.worker') + Thread.sleep(5000) + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS + tasks[1].taskType == 'task_optional' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + def "test workflow with input template parsing"() { given: "Input parameters for a workflow with input template" def correlationId = 'integration_test' + UUID.randomUUID().toString() diff --git a/test-harness/src/test/resources/fork_join_permissive_integration_test.json b/test-harness/src/test/resources/fork_join_permissive_integration_test.json new file mode 100644 index 0000000000..6f65d4e118 --- /dev/null +++ b/test-harness/src/test/resources/fork_join_permissive_integration_test.json @@ -0,0 +1,109 @@ +{ + "name": "FanInOutPermissiveTest", + "description": "FanInOutPermissiveTest", + "version": 1, + "tasks": [ + { + "name": "fork", + "taskReferenceName": "fanouttask", + "inputParameters": {}, + "type": "FORK_JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [ + [ + { + "name": "integration_task_1", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + [ + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ] + ], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "join", + "taskReferenceName": "fanouttask_join", + "inputParameters": {}, + "type": "JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [ + "t1", + "t2" + ], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json new file mode 100644 index 0000000000..8c1ebef77e --- /dev/null +++ b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json @@ -0,0 +1,190 @@ +{ + "name": "FanInOutPermissiveTest_2", + "description": "FanInOutPermissiveTest_2", + "version": 1, + "tasks": [ + { + "name": "fork", + "taskReferenceName": "fanouttask", + "inputParameters": {}, + "type": "FORK_JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [ + [ + { + "name": "integration_p_task_0_RT_1", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_1", + "description": "integration_p_task_0_RT_1", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + }, + { + "name": "integration_p_task_0_RT_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_3", + "description": "integration_p_task_0_RT_3", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ], + [ + { + "name": "integration_p_task_0_RT_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_2", + "description": "integration_p_task_0_RT_2", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ] + ], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "join", + "taskReferenceName": "fanouttask_join", + "inputParameters": {}, + "type": "JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [ + "t3", + "t2" + ], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_p_task_0_RT_4", + "taskReferenceName": "t4", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_4", + "description": "integration_p_task_0_RT_4", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json new file mode 100644 index 0000000000..84c6910abf --- /dev/null +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json @@ -0,0 +1,58 @@ +{ + "name": "permissive_optional_task_wf", + "description": "permissive_optional_task_wf", + "version": 1, + "tasks": [ + { + "name": "task_optional", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "${workflow.input.param1}", + "p2": "${workflow.input.param2}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": true, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": { + "o1": "${workflow.input.param1}", + "o2": "${t2.output.uuid}", + "o3": "${t1.output.op}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json new file mode 100644 index 0000000000..2b8f4dbfe0 --- /dev/null +++ b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json @@ -0,0 +1,92 @@ +{ + "name": "permissive_task_wf", + "description": "permissive_task_wf", + "version": 1, + "tasks": [ + { + "name": "task_permissive", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "${workflow.input.param1}", + "p2": "${workflow.input.param2}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "retryCount": 1, + "taskDefinition": { + "createdBy": "integration_app", + "name": "task_permissive", + "description": "task_permissive", + "retryCount": 1, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + }, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "${workflow.input.param1}", + "tp2": "${t1.output.op}" + }, + "type": "PERMISSIVE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "retryCount": 1, + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_task_2", + "description": "integration_task_2", + "retryCount": 1, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + }, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": { + "o1": "${workflow.input.param1}", + "o2": "${t2.output.uuid}", + "o3": "${t1.output.op}" + }, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file