diff --git a/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/RefreshJob.java b/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/RefreshJob.java index 15344176e..c270138c1 100644 --- a/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/RefreshJob.java +++ b/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/RefreshJob.java @@ -5,8 +5,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.ref.WeakReference; import java.util.Optional; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; interface RefreshJob { @@ -19,46 +21,79 @@ interface RefreshJob { final class Poll implements RefreshJob { private final RefreshStrategy.Polling config; private final Supplier> action; - private final CompletableFuture output; + private final Runnable onChange; + private final CompletableFuture firstOutput; + private volatile T latestOutput = null; private ScheduledFuture poll; - Poll(RefreshStrategy.Polling config, Supplier> action) { + Poll(RefreshStrategy.Polling config, Supplier> action, Runnable onChange) { this.config = config; this.action = action; - this.output = new CompletableFuture<>(); + this.onChange = onChange; + this.firstOutput = new CompletableFuture<>(); } void start() { log.debug("Starting polling-refresh."); - poll = SEXEC.schedule( + WeakReference> weakSelf = new WeakReference<>(this); + AtomicReference> taskRef = new AtomicReference<>(); + + ScheduledFuture scheduled = SEXEC.scheduleAtFixedRate( () -> { - var o = RefreshJob.runRefreshWithTimeout(action, config.timeout); + Poll self = weakSelf.get(); + if (self == null) { + log.debug("Poll referent GC'd — self-cancelling polling task."); + ScheduledFuture t = taskRef.get(); + if (t != null) t.cancel(false); + return; + } + var o = RefreshJob.runRefreshWithTimeout(self.action, self.config.timeout); if (o != null) { - output.complete(o); + boolean changed = !o.equals(self.latestOutput); + self.latestOutput = o; + if (!self.firstOutput.isDone()) { + self.firstOutput.complete(o); + } + if (changed && self.onChange != null) { + try { + self.onChange.run(); + } catch (Exception e) { + log.error("onChange callback error: {}", e.getMessage()); + } + } else if (!changed) { + log.debug("Output unchanged, skipping onChange callback."); + } } }, + 0, config.interval, TimeUnit.MILLISECONDS ); + + taskRef.set(scheduled); + this.poll = scheduled; } @Override public Optional getOutput() { + if (latestOutput != null) { + return Optional.of(latestOutput); + } try { if (poll == null) { log.warn("Polling hasn't started but the output is being used."); - } else if (!poll.isCancelled() && !output.isDone()) { - return Optional.ofNullable(output.get(config.timeout, TimeUnit.MILLISECONDS)); + } else if (!firstOutput.isDone()) { + return Optional.ofNullable(firstOutput.get(config.timeout, TimeUnit.MILLISECONDS)); } } catch (Exception e) { log.warn("Attempted to await for poll output but an exception occurred: {}", e.toString()); } - return Optional.ofNullable(output.getNow(null)); + return Optional.ofNullable(latestOutput); } @Override public void shutdown() { - if (!poll.isCancelled()) { + if (poll != null && !poll.isCancelled()) { log.debug("Shutting down polling-refresh."); poll.cancel(false); } @@ -71,11 +106,13 @@ final class OnDemand implements RefreshJob { private T output = null; private final RefreshStrategy.OnDemand config; private final Supplier> action; + private final Runnable onChange; private boolean stopped = false; - OnDemand(RefreshStrategy.OnDemand config, Supplier> action) { + OnDemand(RefreshStrategy.OnDemand config, Supplier> action, Runnable onChange) { this.config = config; this.action = action; + this.onChange = onChange; } @Override @@ -85,8 +122,18 @@ public Optional getOutput() { log.debug("Running refresh as current output is stale."); var o = RefreshJob.runRefreshWithTimeout(action, config.timeout); if (o != null) { + boolean changed = !o.equals(output); output = o; lastUpdated = System.currentTimeMillis(); + if (changed && onChange != null) { + try { + onChange.run(); + } catch (Exception e) { + log.error("onChange callback error: {}", e.getMessage()); + } + } else if (!changed) { + log.debug("Output unchanged, skipping onChange callback."); + } } } else { log.debug("Current output is fresh, no refresh required."); @@ -114,10 +161,14 @@ private static T runRefreshWithTimeout(Supplier> action, } static RefreshJob create(RefreshStrategy config, Supplier> action) { + return create(config, action, null); + } + + static RefreshJob create(RefreshStrategy config, Supplier> action, Runnable onChange) { if (config instanceof RefreshStrategy.Polling) { - return new Poll<>((RefreshStrategy.Polling)config, action); + return new Poll<>((RefreshStrategy.Polling)config, action, onChange); } else if (config instanceof RefreshStrategy.OnDemand) { - return new OnDemand<>((RefreshStrategy.OnDemand)config, action); + return new OnDemand<>((RefreshStrategy.OnDemand)config, action, onChange); } throw new IllegalArgumentException("Invalid refresh-strategy: " + config); } diff --git a/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/SuperpositionOpenFeatureProvider.java b/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/SuperpositionOpenFeatureProvider.java index e4b05a6e7..2e76026d2 100644 --- a/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/SuperpositionOpenFeatureProvider.java +++ b/clients/java/openfeature-provider/src/main/java/io/juspay/superposition/openfeature/SuperpositionOpenFeatureProvider.java @@ -15,10 +15,15 @@ import uniffi.superposition_client.FfiExperiment; import uniffi.superposition_client.FfiExperimentGroup; import uniffi.superposition_client.OperationException; +import uniffi.superposition_client.ProviderCache; +import uniffi.superposition_types.MergeStrategy; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; /** @@ -65,11 +70,14 @@ public class SuperpositionOpenFeatureProvider implements FeatureProvider { private static final Gson gson = new Gson(); private final SuperpositionAsyncClient sdk; + private final ProviderCache cache; private final RefreshJob configRefresh; private final Optional>> expRefresh; private final Optional>> expGroupRefresh; private Optional defaultCtx; private final Optional fallbackArgs; + private final CompletableFuture cacheReady = new CompletableFuture<>(); + private final int configTimeout; public SuperpositionOpenFeatureProvider(@NonNull SuperpositionProviderOptions options) { if (options.fallbackConfig != null) { @@ -84,6 +92,9 @@ public SuperpositionOpenFeatureProvider(@NonNull SuperpositionProviderOptions op builder.transport(options.transport); } this.sdk = builder.build(); + this.cache = new ProviderCache(); + this.configTimeout = options.refreshStrategy.getTimeout(); + var getConfigInput = GetConfigInput.builder() .context(Map.of()) .orgId(options.orgId) @@ -91,7 +102,8 @@ public SuperpositionOpenFeatureProvider(@NonNull SuperpositionProviderOptions op .build(); this.configRefresh = RefreshJob.create( options.refreshStrategy, - () -> sdk.getConfig(getConfigInput).thenApply(EvaluationArgs::new) + () -> sdk.getConfig(getConfigInput).thenApply(EvaluationArgs::new), + this::reinitConfigCache ); if (options.experimentationOptions != null) { @@ -109,7 +121,8 @@ public SuperpositionOpenFeatureProvider(@NonNull SuperpositionProviderOptions op .stream() .map(EvaluationArgs.Helpers::toFfiExperiment) .toList() - )) + ), + this::reinitExperimentsCache) ); // New logic for experiment_groups @@ -126,7 +139,8 @@ public SuperpositionOpenFeatureProvider(@NonNull SuperpositionProviderOptions op .stream() .map(EvaluationArgs.Helpers::toFfiExperimentGroup) .toList() - )) + ), + this::reinitExperimentsCache) ); } else { this.expRefresh = Optional.empty(); @@ -162,6 +176,9 @@ public void initialize(EvaluationContext eCtx) { @Override public void shutdown() { configRefresh.shutdown(); + expRefresh.ifPresent(RefreshJob::shutdown); + expGroupRefresh.ifPresent(RefreshJob::shutdown); + cache.close(); } @SneakyThrows @@ -285,9 +302,19 @@ private EvaluationArgs getEvaluationArgs(EvaluationContext ctx) throws Exception } private Map evaluateConfigInternal(EvaluationContext ctx) throws Exception { - EvaluationArgs args = getEvaluationArgs(ctx); + // Block until cache.initConfig has been called (completed inside reinitConfigCache). + // This guarantees cache.evalConfig never runs on an uninitialized cache. + if (!cacheReady.isDone()) { + try { + cacheReady.get(configTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new Exception("Config cache not initialized within timeout (" + configTimeout + "ms)."); + } + } var ctx_ = defaultCtx.isPresent() ? ctx.merge(defaultCtx.get()) : ctx; - return args.evaluate(ctx_, getExperimentationArgs(ctx_)); + var queryData = EvaluationArgs.Companion.buildQueryData(ctx_); + String targetingKey = ctx_.getTargetingKey(); + return cache.evalConfig(queryData, MergeStrategy.MERGE, null, targetingKey); } private List getApplicableVariantsInternal(EvaluationContext ctx) throws Exception { @@ -314,4 +341,38 @@ private ExperimentationArgs getExperimentationArgs(EvaluationContext ctx) { } return null; } + + private void reinitConfigCache() { + var out = configRefresh.getOutput(); + if (out.isPresent()) { + var args = out.get(); + try { + cache.initConfig( + args.getDefaultConfig(), + args.getContexts(), + args.getOverrides(), + args.getDimensions() + ); + cacheReady.complete(null); + log.debug("Config cache re-initialized"); + } catch (Exception e) { + log.error("Failed to reinitialize config cache: {}", e.getMessage()); + } + } + } + + private void reinitExperimentsCache() { + if (expRefresh.isPresent() && expGroupRefresh.isPresent()) { + var exps = expRefresh.get().getOutput(); + var groups = expGroupRefresh.get().getOutput(); + if (exps.isPresent() && groups.isPresent()) { + try { + cache.initExperiments(exps.get(), groups.get()); + log.debug("Experiments cache re-initialized"); + } catch (Exception e) { + log.error("Failed to reinitialize experiments cache: {}", e.getMessage()); + } + } + } + } } diff --git a/clients/java/openfeature-provider/src/main/kotlin/io/juspay/superposition/openfeature/EvaluationArgs.kt b/clients/java/openfeature-provider/src/main/kotlin/io/juspay/superposition/openfeature/EvaluationArgs.kt index 9d7a723de..bc4e6779e 100644 --- a/clients/java/openfeature-provider/src/main/kotlin/io/juspay/superposition/openfeature/EvaluationArgs.kt +++ b/clients/java/openfeature-provider/src/main/kotlin/io/juspay/superposition/openfeature/EvaluationArgs.kt @@ -129,6 +129,23 @@ internal class EvaluationArgs { } } + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is EvaluationArgs) return false + return defaultConfig == other.defaultConfig && + contexts == other.contexts && + overrides == other.overrides && + dimensions == other.dimensions + } + + override fun hashCode(): Int { + var result = defaultConfig.hashCode() + result = 31 * result + contexts.hashCode() + result = 31 * result + overrides.hashCode() + result = 31 * result + dimensions.hashCode() + return result + } + companion object { private val gson = Gson() @@ -144,6 +161,11 @@ internal class EvaluationArgs { return m.mapValues { valueToJsonString(it.value) } } + @JvmStatic + fun buildQueryData(eContext: EvaluationContext): Map { + return toQueryData(eContext) + } + private fun serializeDocument(d: Document): String { return valueToJsonString(d.asObject()) }