Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/api/core-bentley.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,9 @@ export class IndexMap<T> {
toArray(): T[];
}

// @beta
export function intervalWrapper(intervalPromises: Set<Promise<void>>, callback: () => Promise<void>): Promise<void>;

// @public
export function isDisposable(obj: unknown): obj is Disposable;

Expand Down
1 change: 1 addition & 0 deletions common/api/summary/core-bentley.exports.csv
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public;enum;IModelHubStatus
public;enum;IModelStatus
public;class;IndexedValue
public;class;IndexMap
beta;function;intervalWrapper
public;function;isDisposable
public;function;isIDisposable
deprecated;function;isIDisposable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/core-backend",
"comment": "Fixed a race condition in CloudSqlite cleanDeletedBlocks/transferDb progress callbacks.",
"type": "none"
}
],
"packageName": "@itwin/core-backend"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/core-bentley",
"comment": "Added intervalWrapper utility function.",
"type": "none"
}
],
"packageName": "@itwin/core-bentley"
}
79 changes: 63 additions & 16 deletions core/backend/src/CloudSqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { mkdirSync, unlinkSync } from "fs";
import { dirname, join } from "path";
import { NativeLibrary } from "@bentley/imodeljs-native";
import {
AccessToken, BeDuration, BriefcaseStatus, Constructor, GuidString, Logger, LogLevel, OpenMode, Optional, PickAsyncMethods, PickMethods, StopWatch,
AccessToken, BeDuration, BriefcaseStatus, Constructor, GuidString, intervalWrapper, ITwinError, Logger, LogLevel, OpenMode, Optional, PickAsyncMethods, PickMethods, StopWatch,
} from "@itwin/core-bentley";
import { CloudSqliteError, LocalDirName, LocalFileName } from "@itwin/core-common";
import { BlobContainer } from "./BlobContainerService";
Expand Down Expand Up @@ -496,7 +496,7 @@ export namespace CloudSqlite {
* Notes:
* - all methods and accessors of this interface (other than `initializeContainer`) require that the `connect` method be successfully called first.
* Otherwise they will throw an exception or return meaningless values.
* - before a SQLiteDb in a container may be opened for write access, the container's write lock must be held (see [[acquireWriteLock]].)
* - before a SQLiteDb in a container may be opened for write access, the container's write lock must be held (see [[acquireWriteLock]]).
* - a single CloudContainer may hold more than one SQLiteDb, but often they are 1:1.
* - the write lock is per-Container, not per-SQLiteDb (which is the reason they are often 1:1)
* - the accessToken (a SAS key) member provides time limited, restricted, access to the container. It must be refreshed before it expires.
Expand Down Expand Up @@ -693,6 +693,16 @@ export namespace CloudSqlite {
promise: Promise<boolean>;
}

/**
* Determine if error is a "transfer already completed" error.
* @param err Any thrown error
* @returns true if the error is a "transfer already completed" error, or false otherwise.
* @internal
*/
function isTransferAlreadyCompletedError(err: any): boolean {
return ITwinError.isError(err, "imodel-native", "BadArg") && err.message === "transfer already completed";
Comment thread
tcobbs-bentley marked this conversation as resolved.
}

/**
* Clean any unused deleted blocks from cloud storage. Unused deleted blocks can accumulate in cloud storage in a couple of ways:
* 1) When a database is updated, a subset of its blocks are replaced by new versions, sometimes leaving the originals unused.
Expand All @@ -704,19 +714,35 @@ export namespace CloudSqlite {
*/
export async function cleanDeletedBlocks(container: CloudContainer, options: CleanDeletedBlocksOptions): Promise<void> {
let timer: NodeJS.Timeout | undefined;
const intervalPromises = new Set<Promise<void>>();
try {
const cleanJob = new NativeLibrary.nativeLib.CancellableCloudSqliteJob("cleanup", container, options);
let total = 0;
const onProgress = options?.onProgress;
if (onProgress) {
timer = setInterval(async () => { // set an interval timer to show progress every 250ms
const progress = cleanJob.getProgress();
total = progress.total;
const result = await onProgress(progress.loaded, progress.total);
if (result === 1)
cleanJob.stopAndSaveProgress();
else if (result !== 0)
cleanJob.cancelTransfer();
timer = setInterval(() => { // set an interval timer to show progress every 250ms
void intervalWrapper(intervalPromises, async () => {
try {
const progress = cleanJob.getProgress();
total = progress.total;
const result = await onProgress(progress.loaded, progress.total);
if (result === 1)
cleanJob.stopAndSaveProgress();
else if (result !== 0)
cleanJob.cancelTransfer();
} catch (err: any) {
if (timer) {
clearInterval(timer);
timer = undefined;
}
// A race condition exists where cleanJob has completed but the timer has not yet been cleared, or it
// completes while we are waiting on the onProgress callback. If this happens, we will get an error from any
// function call made to cleanJob after the job is done. In that case, just ignore the error.
if (isTransferAlreadyCompletedError(err))
return;
throw err;
}
});
}, 250);
}
await cleanJob.promise;
Expand All @@ -730,6 +756,9 @@ export namespace CloudSqlite {
} finally {
if (timer)
clearInterval(timer);
if (intervalPromises.size > 0) {
await Promise.all(intervalPromises);
}
Comment thread
tcobbs-bentley marked this conversation as resolved.
Outdated
}
}

Expand All @@ -739,16 +768,32 @@ export namespace CloudSqlite {
mkdirSync(dirname(props.localFileName), { recursive: true }); // make sure the directory exists before starting download

let timer: NodeJS.Timeout | undefined;
const intervalPromises = new Set<Promise<void>>();
try {
const transfer = new NativeLibrary.nativeLib.CancellableCloudSqliteJob(direction, container, props);
let total = 0;
const onProgress = props.onProgress;
if (onProgress) {
timer = setInterval(async () => { // set an interval timer to show progress every 250ms
const progress = transfer.getProgress();
total = progress.total;
if (onProgress(progress.loaded, progress.total))
transfer.cancelTransfer();
timer = setInterval(() => { // set an interval timer to show progress every 250ms
void intervalWrapper(intervalPromises, async () => {
try {
const progress = transfer.getProgress();
total = progress.total;
if (onProgress(progress.loaded, progress.total))
transfer.cancelTransfer();
} catch (err: any) {
if (timer) {
clearInterval(timer);
timer = undefined;
}
// A race condition exists where transfer has completed but the timer has not yet been cleared. If this
// happens, we will get an error from any function call made to transfer after the job is done. In that
// case, just ignore the error.
if (isTransferAlreadyCompletedError(err))
return;
Comment thread
tcobbs-bentley marked this conversation as resolved.
Outdated
throw err;
}
});
}, 250);
}
await transfer.promise;
Expand All @@ -761,7 +806,9 @@ export namespace CloudSqlite {
} finally {
if (timer)
clearInterval(timer);

if (intervalPromises.size > 0) {
await Promise.all(intervalPromises);
}
Comment thread
tcobbs-bentley marked this conversation as resolved.
Outdated
}
}

Expand Down
49 changes: 49 additions & 0 deletions core/bentley/src/UtilityFunctions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Bentley Systems, Incorporated. All rights reserved.
* See LICENSE.md in the project root for license terms and full copyright notice.
*--------------------------------------------------------------------------------------------*/
/** @packageDocumentation
* @module Utils
*/

/**
* Wrapper function designed to be used for callbacks called by setInterval in order to propagate any exceptions
* thrown in the callback to the main promise chain. It does this by creating a new promise for the callback
* invocation and adding it to the intervalPromises set. The main promise chain can then await
* Promise.all(intervalPromises) to catch any exceptions thrown in any of the callbacks. Note that if the callback
* completes successfully, the promise is resolved and removed from the set. If it throws an exception, the promise is
* rejected but not removed from the set, so that the main promise chain can detect that an error occurred and handle
* it appropriately.
* @param intervalPromises A set of promises representing the currently active interval callbacks.
* @param callback The async callback to be executed within the interval.
* @beta
*/
export async function intervalWrapper(intervalPromises: Set<Promise<void>>, callback: () => Promise<void>) {
Comment thread
ben-polinsky marked this conversation as resolved.
Outdated
let resolvePromise: (() => void) | undefined;
let rejectPromise: ((reason?: any) => void) | undefined;

// The callback of the Promise constructor does not have access to the promise itself, so all we do there is
// capture the resolve and reject functions for use in the async code below that would have otherwise been
// placed in the setInterval callback.
const intervalPromise = new Promise<void>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});
// Note: when we get here, resolvePromise and rejectPromise will always be defined, but there is no way to
// convince TS of that fact without extra unnecessary checks, so we use ?. when accessing them.

intervalPromises.add(intervalPromise);

const cleanupAndResolve = () => {
resolvePromise?.();
// No need to keep track of this promise anymore since it's resolved.
intervalPromises.delete(intervalPromise);
};

try {
await callback();
cleanupAndResolve();
} catch (err) {
rejectPromise?.(err);
}
}
1 change: 1 addition & 0 deletions core/bentley/src/core-bentley.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export * from "./Tracing";
export * from "./TupleKeyedMap";
export * from "./TypedArrayBuilder";
export * from "./UnexpectedErrors";
export * from "./UtilityFunctions";
export * from "./UtilityTypes";
export * from "./YieldManager";

Expand Down
95 changes: 94 additions & 1 deletion full-stack-tests/backend/src/integration/CloudSqlite.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ describe("CloudSqlite", () => {
newCache2.destroy();
});

it("should be able to interrupt cleanDeletedBlocks operation", async () => {
const prepareDbForCleaningBlocks = async () => {
const cache = azSqlite.makeCache("clean-blocks-cache");
const container = testContainers[0];

Expand Down Expand Up @@ -600,6 +600,11 @@ describe("CloudSqlite", () => {

expect(container.garbageBlocks).to.be.greaterThan(0);
const garbageBlocksPrev = container.garbageBlocks;
return { container, garbageBlocksPrev };
Comment thread
tcobbs-bentley marked this conversation as resolved.
};

it("should be able to interrupt cleanDeletedBlocks operation", async () => {
const { container, garbageBlocksPrev } = await prepareDbForCleaningBlocks();

// cleanDeletedBlocks defaults to an nSeconds of 3600, so we expect to keep our garbage blocks, because they are less than 3600 seconds old.
await CloudSqlite.cleanDeletedBlocks(container, {});
Expand Down Expand Up @@ -653,6 +658,94 @@ describe("CloudSqlite", () => {
container.disconnect({ detach: true });
});

it("slow cleanDeletedBlocks onProgress should not cause race condition", async () => {
const { container } = await prepareDbForCleaningBlocks();

// Flags used to coordinate the test with the async onProgress callback below.
let progressCalled = false;
let progressWaited = false;
const onProgress = async () => {
if (!progressCalled) {
progressCalled = true;
await BeDuration.wait(2000); // simulate a long onProgress callback that takes 2000ms to complete.
// await new Promise((resolve) => clock.setTimeout(resolve, 2000)); // simulate a long onProgress callback that takes 2000ms to complete.
progressWaited = true; // We have to wait for onProgress to finish before we can be sure that cleanDeletedBlocks
Comment thread
tcobbs-bentley marked this conversation as resolved.
// didn't throw an error after our return.
return 2; // return a number greater than 1 to abort the cleanDeletedBlocks operation.
}
return 0; // return 0 to not abort the cleanDeletedBlocks operation.
};

// In the past, cleanDeletedBlocks would occasionally throw an error inside a setInterval handler. This is because a
// race condition could cause the handler to still be called after cleanDeletedBlocks had already resolved and
// cleaned up its resources, including the container it was operating on. If the handler was called after
// cleanDeletedBlocks had resolved, or if onProgress took a long time and returned non-zero, then it would try to
// access the container that was already cleaned up, which would cause an error to be thrown. This race condition
// could be forced by having a slow onProgress handler return non-zero. This test makes sure that even if onProgress
// is slow, we don't have unhandled rejections or uncaught exceptions. The race condition was fixed, but we don't
// want to accidentally regress it, so this test will stay.
//
// Note: we would like to add a similar test for transferDb, but the progress callback for transferDb is not async,
// so there is no way to simulate a long onProgress callback that would cause the race condition.
// Capture unhandled promise rejections — the original bug manifested as one of these.
const unhandledRejections: Array<any> = [];
const rejectionHandler = (reason: any) => {
unhandledRejections.push(reason);
};
process.on("unhandledRejection", rejectionHandler);

// Capture uncaught exceptions
const uncaughtExceptions: Error[] = [];
const exceptionHandler = (error: Error) => {
uncaughtExceptions.push(error);
};
process.on("uncaughtException", exceptionHandler);

// Faking the interval setup in cleanDeletedBlocks.
// Only setInterval is faked; real time still advances so native async work can make progress.
const clock = sinon.useFakeTimers({ toFake: ["setInterval"], shouldAdvanceTime: true, advanceTimeDelta: 1 });

try {
let resolved = false;
let cleanDeletedBlocksError: any;
// Deliberately not awaited: the test drives the fake clock below while the operation runs.
CloudSqlite.cleanDeletedBlocks(container, { nSeconds: 0, findOrphanedBlocks: true, onProgress }).then(() => {
resolved = true;
}).catch((err) => {
resolved = true;
cleanDeletedBlocksError = err;
});

// Tick the fake 250ms progress interval until cleanDeletedBlocks settles AND the slow onProgress has
// finished; the 1ms fake timeout yields to the event loop so pending async work can run between ticks.
while (!resolved || !progressWaited) {
await clock.tickAsync(250);
await new Promise((resolve) => clock.setTimeout(resolve, 1));
}
// Give a bit more time for any async errors to surface
await clock.tickAsync(100);
await new Promise((resolve) => setImmediate(resolve));
clock.reset();
clock.restore();

if (cleanDeletedBlocksError) {
throw cleanDeletedBlocksError; // cleanDeletedBlocks should not throw an error, even if onProgress is slow and returns non-zero.
}

// Check for unhandled errors
if (unhandledRejections.length > 0) {
throw new Error(`Unhandled rejection detected: ${unhandledRejections[0]}`);
}
if (uncaughtExceptions.length > 0) {
throw new Error(`Uncaught exception detected: ${uncaughtExceptions[0].message}`);
}

container.checkForChanges();
expect(container.garbageBlocks).to.be.equal(0); // we should have successfully cleaned our garbage blocks, because of slow onProgress.

} finally {
// restore() is a safe no-op if already restored in the happy path above.
clock.restore();
process.off("unhandledRejection", rejectionHandler);
Comment thread
tcobbs-bentley marked this conversation as resolved.
process.off("uncaughtException", exceptionHandler);
container.releaseWriteLock();
container.disconnect({ detach: true });
}
});

/** make sure that the auto-refresh for container tokens happens every hour */
it("Auto refresh container tokens", async () => {
const contain1 = testContainers[0];
Expand Down
Loading