Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions arrow-libs/resilience/arrow-resilience/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowCore)
implementation(libs.coroutines.core)
implementation(projects.arrowExceptionUtils)
implementation(projects.arrowFxCoroutines)
}
}
commonTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import arrow.atomic.update
import arrow.atomic.updateAndGet
import arrow.core.Either
import arrow.core.raise.catch
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.guaranteeCase
import arrow.resilience.CircuitBreaker.State.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
Expand Down Expand Up @@ -263,19 +262,22 @@ private constructor(
contract {
callsInPlace(task, InvocationKind.EXACTLY_ONCE)
}
return try {
return guaranteeCase({
onHalfOpen.invoke()
task.invoke()
} catch (e: CancellationException) {
// We need to return to Open state, otherwise we get stuck in Half-Open (see https://github.com/monix/monix/issues/1080 )
state.set(Open(state.get().openingStrategy, lastStartedAt, resetTimeout, awaitClose))
onOpenAndThrow(e)
} catch (e: Throwable) {
// Failed reset, which means we go back in the Open state with new expiry val nextTimeout
val value: Duration = (resetTimeout * exponentialBackoffFactor)
val nextTimeout = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value
state.set(Open(state.get().openingStrategy, timeSource.markNow(), nextTimeout, awaitClose))
onOpenAndThrow(e)
}) {
val (value, timeout) = when (it) {
is ExitCase.Completed -> return@guaranteeCase
// We need to return to Open state, otherwise we get stuck in Half-Open (see https://github.com/monix/monix/issues/1080 )
is ExitCase.Cancelled -> lastStartedAt to resetTimeout
is ExitCase.Failure -> {
val value = resetTimeout * exponentialBackoffFactor
val nextTimeout = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value
timeSource.markNow() to nextTimeout
}
}
state.set(Open(state.get().openingStrategy, value, timeout, awaitClose))
onOpen.invoke()
}.also {
// While in HalfOpen only a reset attempt is allowed to update the state, so setting this directly is safe
state.set(Closed(state.get().openingStrategy.resetFailuresCount()))
Expand All @@ -284,13 +286,6 @@ private constructor(
}
}

private suspend fun onOpenAndThrow(original: Throwable): Nothing {
runCatching {
withContext(NonCancellable) { onOpen.invoke() }
}.exceptionOrNull()?.let { original.addSuppressed(it) }
throw original
}

/** Returns a new circuit breaker that wraps the state of the source
* and that upon a task being rejected will execute the given [callback].
*
Expand Down
Loading