Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/api/core-bentley.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,9 @@ export function using<T extends IDisposable, TResult>(resources: T | T[], func:
// @public
export function utf8ToString(utf8: Uint8Array): string | undefined;

// @beta
export function wrapTimerCallback(timerPromises: Set<Promise<void>>, callback: () => Promise<void>): Promise<void>;

// @public
export class YieldManager {
constructor(options?: YieldManagerOptions);
Expand Down
1 change: 1 addition & 0 deletions common/api/summary/core-bentley.exports.csv
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,6 @@ public;class;UnexpectedErrors
public;function;using
deprecated;function;using
public;function;utf8ToString
beta;function;wrapTimerCallback
public;class;YieldManager
public;interface;YieldManagerOptions
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/core-backend",
"comment": "",
"type": "none"
}
],
"packageName": "@itwin/core-backend"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@itwin/core-bentley",
"comment": "Added wrapTimerCallback utility function.",
"type": "none"
}
],
"packageName": "@itwin/core-bentley"
}
85 changes: 69 additions & 16 deletions core/backend/src/CloudSqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { mkdirSync, unlinkSync } from "fs";
import { dirname, join } from "path";
import { NativeLibrary } from "@bentley/imodeljs-native";
import {
AccessToken, BeDuration, BriefcaseStatus, Constructor, GuidString, Logger, LogLevel, OpenMode, Optional, PickAsyncMethods, PickMethods, StopWatch,
AccessToken, BeDuration, BriefcaseStatus, Constructor, GuidString, ITwinError, Logger, LogLevel, OpenMode, Optional, PickAsyncMethods, PickMethods, StopWatch, wrapTimerCallback,
} from "@itwin/core-bentley";
import { CloudSqliteError, LocalDirName, LocalFileName } from "@itwin/core-common";
import { BlobContainer } from "./BlobContainerService";
Expand Down Expand Up @@ -496,7 +496,7 @@ export namespace CloudSqlite {
* Notes:
* - all methods and accessors of this interface (other than `initializeContainer`) require that the `connect` method be successfully called first.
* Otherwise they will throw an exception or return meaningless values.
* - before a SQLiteDb in a container may be opened for write access, the container's write lock must be held (see [[acquireWriteLock]].)
* - before a SQLiteDb in a container may be opened for write access, the container's write lock must be held (see [[acquireWriteLock]]).
* - a single CloudContainer may hold more than one SQLiteDb, but often they are 1:1.
* - the write lock is per-Container, not per-SQLiteDb (which is the reason they are often 1:1)
* - the accessToken (a SAS key) member provides time limited, restricted, access to the container. It must be refreshed before it expires.
Expand Down Expand Up @@ -693,6 +693,16 @@ export namespace CloudSqlite {
promise: Promise<boolean>;
}

/**
* Determine if error is a "transfer already completed" error.
* @param err Any thrown error
* @returns true if the error is a "transfer already completed" error, or false otherwise.
* @internal
*/
function isTransferAlreadyCompletedError(err: any): boolean {
return ITwinError.isError(err, "imodel-native", "BadArg") && err.message === "transfer already completed";
}

/**
* Clean any unused deleted blocks from cloud storage. Unused deleted blocks can accumulate in cloud storage in a couple of ways:
* 1) When a database is updated, a subset of its blocks are replaced by new versions, sometimes leaving the originals unused.
Expand All @@ -704,19 +714,35 @@ export namespace CloudSqlite {
*/
export async function cleanDeletedBlocks(container: CloudContainer, options: CleanDeletedBlocksOptions): Promise<void> {
let timer: NodeJS.Timeout | undefined;
const intervalPromises = new Set<Promise<void>>();
try {
const cleanJob = new NativeLibrary.nativeLib.CancellableCloudSqliteJob("cleanup", container, options);
let total = 0;
const onProgress = options?.onProgress;
if (onProgress) {
timer = setInterval(async () => { // set an interval timer to show progress every 250ms
const progress = cleanJob.getProgress();
total = progress.total;
const result = await onProgress(progress.loaded, progress.total);
if (result === 1)
cleanJob.stopAndSaveProgress();
else if (result !== 0)
cleanJob.cancelTransfer();
timer = setInterval(() => { // set an interval timer to show progress every 250ms
void wrapTimerCallback(intervalPromises, async () => {
try {
const progress = cleanJob.getProgress();
total = progress.total;
const result = await onProgress(progress.loaded, progress.total);
if (result === 1)
cleanJob.stopAndSaveProgress();
else if (result !== 0)
cleanJob.cancelTransfer();
} catch (err: any) {
if (timer) {
clearInterval(timer);
timer = undefined;
}
// A race condition exists where cleanJob has completed but the timer has not yet been cleared, or it
// completes while we are waiting on the onProgress callback. If this happens, we will get an error from any
// function call made to cleanJob after the job is done. In that case, just ignore the error.
if (isTransferAlreadyCompletedError(err))
return;
throw err;
}
});
}, 250);
}
await cleanJob.promise;
Expand All @@ -731,6 +757,12 @@ export namespace CloudSqlite {
if (timer)
clearInterval(timer);
}
// Note: if an error is thrown before we get here then we don't care about any possible errors from the interval
// callbacks, so we don't await the promises in that case. If we do get here, then we want to await any remaining
// promises to ensure all callbacks have completed before this function returns.
if (intervalPromises.size > 0) {
await Promise.all(intervalPromises);
}
}

/** @internal */
Expand All @@ -739,16 +771,32 @@ export namespace CloudSqlite {
mkdirSync(dirname(props.localFileName), { recursive: true }); // make sure the directory exists before starting download

let timer: NodeJS.Timeout | undefined;
const intervalPromises = new Set<Promise<void>>();
try {
const transfer = new NativeLibrary.nativeLib.CancellableCloudSqliteJob(direction, container, props);
let total = 0;
const onProgress = props.onProgress;
if (onProgress) {
timer = setInterval(async () => { // set an interval timer to show progress every 250ms
const progress = transfer.getProgress();
total = progress.total;
if (onProgress(progress.loaded, progress.total))
transfer.cancelTransfer();
timer = setInterval(() => { // set an interval timer to show progress every 250ms
void wrapTimerCallback(intervalPromises, async () => {
try {
const progress = transfer.getProgress();
total = progress.total;
if (onProgress(progress.loaded, progress.total))
transfer.cancelTransfer();
} catch (err: any) {
if (timer) {
clearInterval(timer);
timer = undefined;
}
// A race condition exists where transfer has completed but the timer has not yet been cleared. If this
// happens, we will get an error from any function call made to transfer after the job is done. In that
// case, just ignore the error.
if (isTransferAlreadyCompletedError(err))
return;
throw err;
}
});
}, 250);
}
await transfer.promise;
Expand All @@ -761,7 +809,12 @@ export namespace CloudSqlite {
} finally {
if (timer)
clearInterval(timer);

}
// Note: if an error is thrown before we get here then we don't care about any possible errors from the interval
// callbacks, so we don't await the promises in that case. If we do get here, then we want to await any remaining
// promises to ensure all callbacks have completed before this function returns.
if (intervalPromises.size > 0) {
await Promise.all(intervalPromises);
}
}

Expand Down
53 changes: 53 additions & 0 deletions core/bentley/src/UtilityFunctions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Bentley Systems, Incorporated. All rights reserved.
* See LICENSE.md in the project root for license terms and full copyright notice.
*--------------------------------------------------------------------------------------------*/
/** @packageDocumentation
* @module Utils
*/

/**
* Wrapper function designed to be used for callbacks called by setInterval or setTimeout in order to propagate any
* exceptions thrown in the callback to the main promise chain. It does this by creating a new promise for the callback
* invocation and adding it to the timerPromises set. The main promise chain can then await
* Promise.all(timerPromises) to catch any exceptions thrown in any of the callbacks. Note that if the callback
* completes successfully, the promise is resolved and removed from the set. If it throws an exception, the promise is
* rejected but not removed from the set, so that the main promise chain can detect that an error occurred and handle
* it appropriately.
* @param timerPromises A set of promises representing the currently active timer callbacks.
* @param callback The async callback to be executed within the timer.
* @beta
*/
export async function wrapTimerCallback(timerPromises: Set<Promise<void>>, callback: () => Promise<void>) {
let resolvePromise: (() => void) | undefined;
let rejectPromise: ((reason?: any) => void) | undefined;

// The callback of the Promise constructor does not have access to the promise itself, so all we do there is
// capture the resolve and reject functions for use in the async callback that would have otherwise been
// placed in the setInterval or setTimeout callback.
const timerPromise = new Promise<void>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});
// Note: when we get here, resolvePromise and rejectPromise will always be defined, but there is no way to
// convince TS of that fact without extra unnecessary checks, so we use ?. when accessing them.

// Prevent unhandled rejection warnings. The rejection is still observable
// when the consumer awaits Promise.all(promises).
timerPromise.catch(() => {});

timerPromises.add(timerPromise);

const cleanupAndResolve = () => {
resolvePromise?.();
// No need to keep track of this promise anymore since it's resolved.
timerPromises.delete(timerPromise);
};

try {
await callback();
cleanupAndResolve();
} catch (err) {
rejectPromise?.(err);
}
}
1 change: 1 addition & 0 deletions core/bentley/src/core-bentley.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export * from "./Tracing";
export * from "./TupleKeyedMap";
export * from "./TypedArrayBuilder";
export * from "./UnexpectedErrors";
export * from "./UtilityFunctions";
export * from "./UtilityTypes";
export * from "./YieldManager";

Expand Down
60 changes: 60 additions & 0 deletions core/bentley/src/test/UtilityFunctions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Bentley Systems, Incorporated. All rights reserved.
* See LICENSE.md in the project root for license terms and full copyright notice.
*--------------------------------------------------------------------------------------------*/
import { describe, expect, it } from "vitest";
import { wrapTimerCallback } from "../UtilityFunctions";

describe("wrapTimerCallback", () => {
it("resolves and removes promise from set on successful callback", async () => {
const promises = new Set<Promise<void>>();
await wrapTimerCallback(promises, async () => {});
expect(promises.size).to.equal(0);
});

it("rejects and keeps promise in set on failed callback", async () => {
const promises = new Set<Promise<void>>();
const error = new Error("test error");

await wrapTimerCallback(promises, async () => { throw error; });

expect(promises.size).to.equal(1);
try {
await Promise.all(promises);
expect.fail("should have thrown");
} catch (err) {
expect(err).to.equal(error);
}
});

it("adds promise to set before callback executes", async () => {
const promises = new Set<Promise<void>>();
let sizeInsideCallback = 0;

await wrapTimerCallback(promises, async () => {
sizeInsideCallback = promises.size;
});

expect(sizeInsideCallback).to.equal(1);
expect(promises.size).to.equal(0);
});

it("handles multiple successful wrappers sequentially", async () => {
const promises = new Set<Promise<void>>();

await wrapTimerCallback(promises, async () => {});
await wrapTimerCallback(promises, async () => {});

expect(promises.size).to.equal(0);
});

it("handles mix of successful and failed callbacks", async () => {
const promises = new Set<Promise<void>>();

await wrapTimerCallback(promises, async () => {});
await wrapTimerCallback(promises, async () => { throw new Error("fail"); });
await wrapTimerCallback(promises, async () => {});

expect(promises.size).to.equal(1);
});
});
Loading
Loading