diff --git a/common/api/core-bentley.api.md b/common/api/core-bentley.api.md index daa63c49cc7..efda934e352 100644 --- a/common/api/core-bentley.api.md +++ b/common/api/core-bentley.api.md @@ -1831,6 +1831,9 @@ export function using(resources: T | T[], func: // @public export function utf8ToString(utf8: Uint8Array): string | undefined; +// @beta +export function wrapTimerCallback(timerPromises: Set>, callback: () => Promise): Promise; + // @public export class YieldManager { constructor(options?: YieldManagerOptions); diff --git a/common/api/summary/core-bentley.exports.csv b/common/api/summary/core-bentley.exports.csv index fbd763df1a8..b6b42dd4d50 100644 --- a/common/api/summary/core-bentley.exports.csv +++ b/common/api/summary/core-bentley.exports.csv @@ -164,5 +164,6 @@ public;class;UnexpectedErrors public;function;using deprecated;function;using public;function;utf8ToString +beta;function;wrapTimerCallback public;class;YieldManager public;interface;YieldManagerOptions \ No newline at end of file diff --git a/common/changes/@itwin/core-backend/tcobbs-cloudsqlite-job-race-fix_2026-04-16-20-40.json b/common/changes/@itwin/core-backend/tcobbs-cloudsqlite-job-race-fix_2026-04-16-20-40.json new file mode 100644 index 00000000000..99b35bb89b6 --- /dev/null +++ b/common/changes/@itwin/core-backend/tcobbs-cloudsqlite-job-race-fix_2026-04-16-20-40.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@itwin/core-backend", + "comment": "", + "type": "none" + } + ], + "packageName": "@itwin/core-backend" +} \ No newline at end of file diff --git a/common/changes/@itwin/core-bentley/tcobbs-cloudsqlite-job-race-fix_2026-04-17-16-25.json b/common/changes/@itwin/core-bentley/tcobbs-cloudsqlite-job-race-fix_2026-04-17-16-25.json new file mode 100644 index 00000000000..cc4dc5538e2 --- /dev/null +++ b/common/changes/@itwin/core-bentley/tcobbs-cloudsqlite-job-race-fix_2026-04-17-16-25.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@itwin/core-bentley", + 
"comment": "Added wrapTimerCallback utility function.", + "type": "none" + } + ], + "packageName": "@itwin/core-bentley" +} \ No newline at end of file diff --git a/core/backend/src/CloudSqlite.ts b/core/backend/src/CloudSqlite.ts index 4fc5e817c9d..675f1f35039 100644 --- a/core/backend/src/CloudSqlite.ts +++ b/core/backend/src/CloudSqlite.ts @@ -11,7 +11,7 @@ import { mkdirSync, unlinkSync } from "fs"; import { dirname, join } from "path"; import { NativeLibrary } from "@bentley/imodeljs-native"; import { - AccessToken, BeDuration, BriefcaseStatus, Constructor, GuidString, Logger, LogLevel, OpenMode, Optional, PickAsyncMethods, PickMethods, StopWatch, + AccessToken, BeDuration, BriefcaseStatus, Constructor, GuidString, ITwinError, Logger, LogLevel, OpenMode, Optional, PickAsyncMethods, PickMethods, StopWatch, wrapTimerCallback, } from "@itwin/core-bentley"; import { CloudSqliteError, LocalDirName, LocalFileName } from "@itwin/core-common"; import { BlobContainer } from "./BlobContainerService"; @@ -496,7 +496,7 @@ export namespace CloudSqlite { * Notes: * - all methods and accessors of this interface (other than `initializeContainer`) require that the `connect` method be successfully called first. * Otherwise they will throw an exception or return meaningless values. - * - before a SQLiteDb in a container may be opened for write access, the container's write lock must be held (see [[acquireWriteLock]].) + * - before a SQLiteDb in a container may be opened for write access, the container's write lock must be held (see [[acquireWriteLock]]). * - a single CloudContainer may hold more than one SQLiteDb, but often they are 1:1. * - the write lock is per-Container, not per-SQLiteDb (which is the reason they are often 1:1) * - the accessToken (a SAS key) member provides time limited, restricted, access to the container. It must be refreshed before it expires. 
@@ -693,6 +693,16 @@ export namespace CloudSqlite { promise: Promise; } + /** + * Determine if error is a "transfer already completed" error. + * @param err Any thrown error + * @returns true if the error is a "transfer already completed" error, or false otherwise. + * @internal + */ + function isTransferAlreadyCompletedError(err: any): boolean { + return ITwinError.isError(err, "imodel-native", "BadArg") && err.message === "transfer already completed"; + } + /** * Clean any unused deleted blocks from cloud storage. Unused deleted blocks can accumulate in cloud storage in a couple of ways: * 1) When a database is updated, a subset of its blocks are replaced by new versions, sometimes leaving the originals unused. @@ -704,19 +714,35 @@ export namespace CloudSqlite { */ export async function cleanDeletedBlocks(container: CloudContainer, options: CleanDeletedBlocksOptions): Promise { let timer: NodeJS.Timeout | undefined; + const intervalPromises = new Set>(); try { const cleanJob = new NativeLibrary.nativeLib.CancellableCloudSqliteJob("cleanup", container, options); let total = 0; const onProgress = options?.onProgress; if (onProgress) { - timer = setInterval(async () => { // set an interval timer to show progress every 250ms - const progress = cleanJob.getProgress(); - total = progress.total; - const result = await onProgress(progress.loaded, progress.total); - if (result === 1) - cleanJob.stopAndSaveProgress(); - else if (result !== 0) - cleanJob.cancelTransfer(); + timer = setInterval(() => { // set an interval timer to show progress every 250ms + void wrapTimerCallback(intervalPromises, async () => { + try { + const progress = cleanJob.getProgress(); + total = progress.total; + const result = await onProgress(progress.loaded, progress.total); + if (result === 1) + cleanJob.stopAndSaveProgress(); + else if (result !== 0) + cleanJob.cancelTransfer(); + } catch (err: any) { + if (timer) { + clearInterval(timer); + timer = undefined; + } + // A race condition exists 
where cleanJob has completed but the timer has not yet been cleared, or it + // completes while we are waiting on the onProgress callback. If this happens, we will get an error from any + // function call made to cleanJob after the job is done. In that case, just ignore the error. + if (isTransferAlreadyCompletedError(err)) + return; + throw err; + } + }); }, 250); } await cleanJob.promise; @@ -731,6 +757,12 @@ export namespace CloudSqlite { if (timer) clearInterval(timer); } + // Note: if an error is thrown before we get here then we don't care about any possible errors from the interval + // callbacks, so we don't await the promises in that case. If we do get here, then we want to await any remaining + // promises to ensure all callbacks have completed before this function returns. + if (intervalPromises.size > 0) { + await Promise.all(intervalPromises); + } } /** @internal */ @@ -739,16 +771,32 @@ export namespace CloudSqlite { mkdirSync(dirname(props.localFileName), { recursive: true }); // make sure the directory exists before starting download let timer: NodeJS.Timeout | undefined; + const intervalPromises = new Set>(); try { const transfer = new NativeLibrary.nativeLib.CancellableCloudSqliteJob(direction, container, props); let total = 0; const onProgress = props.onProgress; if (onProgress) { - timer = setInterval(async () => { // set an interval timer to show progress every 250ms - const progress = transfer.getProgress(); - total = progress.total; - if (onProgress(progress.loaded, progress.total)) - transfer.cancelTransfer(); + timer = setInterval(() => { // set an interval timer to show progress every 250ms + void wrapTimerCallback(intervalPromises, async () => { + try { + const progress = transfer.getProgress(); + total = progress.total; + if (onProgress(progress.loaded, progress.total)) + transfer.cancelTransfer(); + } catch (err: any) { + if (timer) { + clearInterval(timer); + timer = undefined; + } + // A race condition exists where transfer has 
completed but the timer has not yet been cleared. If this + // happens, we will get an error from any function call made to transfer after the job is done. In that + // case, just ignore the error. + if (isTransferAlreadyCompletedError(err)) + return; + throw err; + } + }); }, 250); } await transfer.promise; @@ -761,7 +809,12 @@ export namespace CloudSqlite { } finally { if (timer) clearInterval(timer); - + } + // Note: if an error is thrown before we get here then we don't care about any possible errors from the interval + // callbacks, so we don't await the promises in that case. If we do get here, then we want to await any remaining + // promises to ensure all callbacks have completed before this function returns. + if (intervalPromises.size > 0) { + await Promise.all(intervalPromises); } } diff --git a/core/bentley/src/UtilityFunctions.ts b/core/bentley/src/UtilityFunctions.ts new file mode 100644 index 00000000000..cb151521843 --- /dev/null +++ b/core/bentley/src/UtilityFunctions.ts @@ -0,0 +1,53 @@ +/*--------------------------------------------------------------------------------------------- +* Copyright (c) Bentley Systems, Incorporated. All rights reserved. +* See LICENSE.md in the project root for license terms and full copyright notice. +*--------------------------------------------------------------------------------------------*/ +/** @packageDocumentation + * @module Utils + */ + +/** + * Wrapper function designed to be used for callbacks called by setInterval or setTimeout in order to propagate any + * exceptions thrown in the callback to the main promise chain. It does this by creating a new promise for the callback + * invocation and adding it to the timerPromises set. The main promise chain can then await + * Promise.all(timerPromises) to catch any exceptions thrown in any of the callbacks. Note that if the callback + * completes successfully, the promise is resolved and removed from the set. 
If it throws an exception, the promise is
+ * rejected but not removed from the set, so that the main promise chain can detect that an error occurred and handle
+ * it appropriately.
+ * @param timerPromises A set of promises representing the currently active timer callbacks.
+ * @param callback The async callback to be executed within the timer.
+ * @beta
+ */
+export async function wrapTimerCallback(timerPromises: Set<Promise<void>>, callback: () => Promise<void>) {
+  let resolvePromise: (() => void) | undefined;
+  let rejectPromise: ((reason?: any) => void) | undefined;
+
+  // The callback of the Promise constructor does not have access to the promise itself, so all we do there is
+  // capture the resolve and reject functions for use in the async callback that would have otherwise been
+  // placed in the setInterval or setTimeout callback.
+  const timerPromise = new Promise<void>((resolve, reject) => {
+    resolvePromise = resolve;
+    rejectPromise = reject;
+  });
+  // Note: when we get here, resolvePromise and rejectPromise will always be defined, but there is no way to
+  // convince TS of that fact without extra unnecessary checks, so we use ?. when accessing them.
+
+  // Prevent unhandled rejection warnings. The rejection is still observable
+  // when the consumer awaits Promise.all(promises).
+  timerPromise.catch(() => {});
+
+  timerPromises.add(timerPromise);
+
+  const cleanupAndResolve = () => {
+    resolvePromise?.();
+    // No need to keep track of this promise anymore since it's resolved.
+ timerPromises.delete(timerPromise); + }; + + try { + await callback(); + cleanupAndResolve(); + } catch (err) { + rejectPromise?.(err); + } +} diff --git a/core/bentley/src/core-bentley.ts b/core/bentley/src/core-bentley.ts index e1cc2801a34..c0fb31edcd0 100644 --- a/core/bentley/src/core-bentley.ts +++ b/core/bentley/src/core-bentley.ts @@ -36,6 +36,7 @@ export * from "./Tracing"; export * from "./TupleKeyedMap"; export * from "./TypedArrayBuilder"; export * from "./UnexpectedErrors"; +export * from "./UtilityFunctions"; export * from "./UtilityTypes"; export * from "./YieldManager"; diff --git a/core/bentley/src/test/UtilityFunctions.test.ts b/core/bentley/src/test/UtilityFunctions.test.ts new file mode 100644 index 00000000000..9ca66c39092 --- /dev/null +++ b/core/bentley/src/test/UtilityFunctions.test.ts @@ -0,0 +1,60 @@ +/*--------------------------------------------------------------------------------------------- +* Copyright (c) Bentley Systems, Incorporated. All rights reserved. +* See LICENSE.md in the project root for license terms and full copyright notice. 
+*--------------------------------------------------------------------------------------------*/ +import { describe, expect, it } from "vitest"; +import { wrapTimerCallback } from "../UtilityFunctions"; + +describe("wrapTimerCallback", () => { + it("resolves and removes promise from set on successful callback", async () => { + const promises = new Set>(); + await wrapTimerCallback(promises, async () => {}); + expect(promises.size).to.equal(0); + }); + + it("rejects and keeps promise in set on failed callback", async () => { + const promises = new Set>(); + const error = new Error("test error"); + + await wrapTimerCallback(promises, async () => { throw error; }); + + expect(promises.size).to.equal(1); + try { + await Promise.all(promises); + expect.fail("should have thrown"); + } catch (err) { + expect(err).to.equal(error); + } + }); + + it("adds promise to set before callback executes", async () => { + const promises = new Set>(); + let sizeInsideCallback = 0; + + await wrapTimerCallback(promises, async () => { + sizeInsideCallback = promises.size; + }); + + expect(sizeInsideCallback).to.equal(1); + expect(promises.size).to.equal(0); + }); + + it("handles multiple successful wrappers sequentially", async () => { + const promises = new Set>(); + + await wrapTimerCallback(promises, async () => {}); + await wrapTimerCallback(promises, async () => {}); + + expect(promises.size).to.equal(0); + }); + + it("handles mix of successful and failed callbacks", async () => { + const promises = new Set>(); + + await wrapTimerCallback(promises, async () => {}); + await wrapTimerCallback(promises, async () => { throw new Error("fail"); }); + await wrapTimerCallback(promises, async () => {}); + + expect(promises.size).to.equal(1); + }); +}); \ No newline at end of file diff --git a/full-stack-tests/backend/src/integration/CloudSqlite.test.ts b/full-stack-tests/backend/src/integration/CloudSqlite.test.ts index 0186fcd7d11..6a6270514c0 100644 --- 
a/full-stack-tests/backend/src/integration/CloudSqlite.test.ts +++ b/full-stack-tests/backend/src/integration/CloudSqlite.test.ts @@ -565,7 +565,7 @@ describe("CloudSqlite", () => { newCache2.destroy(); }); - it("should be able to interrupt cleanDeletedBlocks operation", async () => { + const prepareDbForCleaningBlocks = async () => { const cache = azSqlite.makeCache("clean-blocks-cache"); const container = testContainers[0]; @@ -600,6 +600,11 @@ describe("CloudSqlite", () => { expect(container.garbageBlocks).to.be.greaterThan(0); const garbageBlocksPrev = container.garbageBlocks; + return { container, garbageBlocksPrev }; + }; + + it("should be able to interrupt cleanDeletedBlocks operation", async () => { + const { container, garbageBlocksPrev } = await prepareDbForCleaningBlocks(); // cleanDeletedBlocks defaults to an nSeconds of 3600, so we expect to keep our garbage blocks, because they are less than 3600 seconds old. await CloudSqlite.cleanDeletedBlocks(container, {}); @@ -653,6 +658,94 @@ describe("CloudSqlite", () => { container.disconnect({ detach: true }); }); + it("slow cleanDeletedBlocks onProgress should not cause race condition", async () => { + const { container } = await prepareDbForCleaningBlocks(); + + let progressCalled = false; + let progressWaited = false; + const onProgress = async () => { + if (!progressCalled) { + progressCalled = true; + await BeDuration.wait(2000); // simulate a long onProgress callback that takes 2000ms to complete. + // await new Promise((resolve) => clock.setTimeout(resolve, 2000)); // simulate a long onProgress callback that takes 2000ms to complete. + progressWaited = true; // We have to wait for onProgress to finish before we can be sure that cleanDeletedBlocks + // didn't throw an error after our return. + return 2; // return a number greater than 1 to abort the cleanDeletedBlocks operation. + } + return 0; // return 0 to not abort the cleanDeletedBlocks operation. 
+ }; + + // In the past, cleanDeletedBlocks would occasionally throw an error inside a setInterval handler. This is because a + // race condition could cause the handler to still be called after cleanDeletedBlocks had already resolved and + // cleaned up its resources, including the container it was operating on. If the handler was called after + // cleanDeletedBlocks had resolved, or if onProgress took a long time and returned non-zero, then it would try to + // access the container that was already cleaned up, which would cause an error to be thrown. This race condition + // could be forced by having a slow onProgress handler return non-zero. This test makes sure that even if onProgress + // is slow, we don't have unhandled rejections or uncaught exceptions. The race condition was fixed, but we don't + // want to accidentally regress it, so this test will stay. + // + // Note: we would like to add a similar test for transferDb, but the progress callback for transferDb is not async, + // so there is no way to simulate a long onProgress callback that would cause the race condition. + const unhandledRejections: Array = []; + const rejectionHandler = (reason: any) => { + unhandledRejections.push(reason); + }; + process.on("unhandledRejection", rejectionHandler); + + // Capture uncaught exceptions + const uncaughtExceptions: Error[] = []; + const exceptionHandler = (error: Error) => { + uncaughtExceptions.push(error); + }; + process.on("uncaughtException", exceptionHandler); + + // Faking the interval setup in cleanDeletedBlocks. 
+ const clock = sinon.useFakeTimers({ toFake: ["setInterval"], shouldAdvanceTime: true, advanceTimeDelta: 1 }); + + try { + let resolved = false; + let cleanDeletedBlocksError: any; + CloudSqlite.cleanDeletedBlocks(container, { nSeconds: 0, findOrphanedBlocks: true, onProgress }).then(() => { + resolved = true; + }).catch((err) => { + resolved = true; + cleanDeletedBlocksError = err; + }); + + while (!resolved || !progressWaited) { + await clock.tickAsync(250); + await new Promise((resolve) => clock.setTimeout(resolve, 1)); + } + // Give a bit more time for any async errors to surface + await clock.tickAsync(100); + await new Promise((resolve) => setImmediate(resolve)); + clock.reset(); + clock.restore(); + + if (cleanDeletedBlocksError) { + throw cleanDeletedBlocksError; // cleanDeletedBlocks should not throw an error, even if onProgress is slow and returns non-zero. + } + + // Check for unhandled errors + if (unhandledRejections.length > 0) { + throw new Error(`Unhandled rejection detected: ${unhandledRejections[0]}`); + } + if (uncaughtExceptions.length > 0) { + throw new Error(`Uncaught exception detected: ${uncaughtExceptions[0].message}`); + } + + container.checkForChanges(); + expect(container.garbageBlocks).to.be.equal(0); // we should have successfully cleaned our garbage blocks, because of slow onProgress. + + } finally { + clock.restore(); + process.off("unhandledRejection", rejectionHandler); + process.off("uncaughtException", exceptionHandler); + container.releaseWriteLock(); + container.disconnect({ detach: true }); + } + }); + /** make sure that the auto-refresh for container tokens happens every hour */ it("Auto refresh container tokens", async () => { const contain1 = testContainers[0];