Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public final class arrow/resilience/SagaBuilder : arrow/resilience/SagaScope {
public fun bind (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun invoke (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun totalCompensation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final synthetic fun totalCompensation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface annotation class arrow/resilience/SagaDSLMarker : java/lang/annotation/Annotation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public final class arrow/resilience/SagaBuilder : arrow/resilience/SagaScope {
public fun bind (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun invoke (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun saga (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun totalCompensation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final synthetic fun totalCompensation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface annotation class arrow/resilience/SagaDSLMarker : java/lang/annotation/Annotation {
Expand Down
3 changes: 1 addition & 2 deletions arrow-libs/resilience/arrow-resilience/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowCore)
implementation(libs.coroutines.core)
api(projects.arrowFxCoroutines)
Comment thread
kyay10 marked this conversation as resolved.
Outdated
}
}
commonTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package arrow.resilience
import arrow.atomic.Atomic
import arrow.atomic.update
import arrow.core.nonFatalOrThrow
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
import kotlin.coroutines.cancellation.CancellationException
import arrow.core.prependTo
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ResourceScope
import arrow.fx.coroutines.resourceScope


/**
Expand Down Expand Up @@ -98,14 +99,8 @@ public fun <A> saga(
* fails then all compensating actions are guaranteed to run. When a compensating action failed it
* will be ignored, and the other compensating actions will continue to be run.
*/
public suspend fun <A> Saga<A>.transact(): A {
val builder = SagaBuilder()
return guaranteeCase({ invoke(builder) }) { res ->
when (res) {
null -> builder.totalCompensation()
else -> Unit
}
}
public suspend fun <A> Saga<A>.transact(): A = resourceScope {
invoke(SagaResourceScope(this))
}

/** DSL Marker for the SagaEffect DSL */
Expand All @@ -114,10 +109,19 @@ public suspend fun <A> Saga<A>.transact(): A {
/**
* Marker object to protect [SagaScope.saga] from calling [SagaScope.bind] in its `action` step.
*/
@SagaDSLMarker
public object SagaActionStep

// Internal implementation of the `saga { }` builder.
private class SagaResourceScope(private val scope: ResourceScope) : SagaScope {
override suspend fun <A> saga(
action: suspend SagaActionStep.() -> A,
compensation: suspend (A) -> Unit
): A = action(SagaActionStep).also { a ->
scope.onRelease { if (it !is ExitCase.Completed) compensation(a) }
}
}

@Deprecated("Binary compatibility", level = DeprecationLevel.HIDDEN)
@PublishedApi
internal class SagaBuilder(
private val stack: Atomic<List<suspend () -> Unit>> = Atomic(emptyList())
Expand All @@ -127,19 +131,11 @@ internal class SagaBuilder(
override suspend fun <A> saga(
action: suspend SagaActionStep.() -> A,
compensation: suspend (A) -> Unit
): A =
guaranteeCase({ action(SagaActionStep) }) { res ->
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic of this change is:

  • the finalizer lambda doesn't call any coroutines code, so NonCancellable is pointless
  • when res is null, finalizer does nothing, so guaranteeCase simply catches and rethrows, which is pointless
  • when res is not null, guaranteeCase simply runs the action, and then the finalizer here (and again, NonCancellable is irrelevant)

Hence, guaranteeCase's code can be simplified to the following (assumptions: finalizer returns immediately when res is null, and finalizer doesn't suspend):

private suspend fun <A> guaranteeCase(
  fa: suspend () -> A,
  finalizer: suspend (value: A?) -> Unit
): A {
  val res = fa()
  finalizer(res)
  return res
}

which is just also!
There is a nasty bug in that implementation as well, which is that compensation doesn't get scheduled for null values! This fixes that as well.
Obviously, this is code left for binary compatibility, so I can roll this back, but it felt nicer just so that I can remove guaranteeCase and runReleaseAndRethrow

// This action failed, so we have no compensate to push on the stack
// the compensation stack will run in the `transact` stage, this is just the builder
when (res) {
null -> Unit
else -> stack.update(
function = { listOf(suspend { compensation(res) }) + it },
transform = { _, new -> new }
)
}
}
): A = action(SagaActionStep).also { res ->
stack.update(suspend { compensation(res) }::prependTo)
}

@Deprecated("Binary compatibility", level = DeprecationLevel.HIDDEN)
@PublishedApi
internal suspend fun totalCompensation() {
stack
Expand All @@ -156,28 +152,3 @@ internal class SagaBuilder(
?.let { throw it }
}
}

private suspend fun <A> guaranteeCase(
fa: suspend () -> A,
finalizer: suspend (value: A?) -> Unit
): A {
val res =
try {
fa()
} catch (e: CancellationException) {
runReleaseAndRethrow(e) { finalizer(null) }
} catch (t: Throwable) {
runReleaseAndRethrow(t) { finalizer(null) }
}
withContext(NonCancellable) { finalizer(res) }
return res
}

private suspend fun runReleaseAndRethrow(original: Throwable, f: suspend () -> Unit): Nothing {
try {
withContext(NonCancellable) { f() }
} catch (e: Throwable) {
original.addSuppressed(e.nonFatalOrThrow())
}
throw original
}
Loading