From b839d6637c350146df1d9991075f59fd06635984 Mon Sep 17 00:00:00 2001 From: Amine Date: Thu, 21 May 2026 20:04:26 +0800 Subject: [PATCH] feat: enhance AttachmentQueue to release mutex per attachment and allow mid-batch interruption - Implemented incremental persistence of attachment states during sync. - Updated `stopSync` to abort in-flight syncs promptly. - Adjusted `processAttachments` to commit state changes immediately after each attachment's I/O operation, improving responsiveness for concurrent operations. --- .changeset/lovely-years-lay.md | 5 + .../common/src/attachments/AttachmentQueue.ts | 46 ++++- .../common/src/attachments/SyncingService.ts | 73 +++++--- packages/node/tests/attachments.test.ts | 164 ++++++++++++++++++ 4 files changed, 260 insertions(+), 28 deletions(-) create mode 100644 .changeset/lovely-years-lay.md diff --git a/.changeset/lovely-years-lay.md b/.changeset/lovely-years-lay.md new file mode 100644 index 000000000..b6f67261f --- /dev/null +++ b/.changeset/lovely-years-lay.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +AttachmentQueue: release the mutex per attachment, persist incrementally, and let `stopSync` interrupt mid-batch so foreground `saveFile` / `deleteFile` aren't blocked behind in-flight uploads or downloads. diff --git a/packages/common/src/attachments/AttachmentQueue.ts b/packages/common/src/attachments/AttachmentQueue.ts index 59fa299be..e86ae6a4c 100644 --- a/packages/common/src/attachments/AttachmentQueue.ts +++ b/packages/common/src/attachments/AttachmentQueue.ts @@ -2,6 +2,7 @@ import { AbstractPowerSyncDatabase } from '../client/AbstractPowerSyncDatabase.j import { DEFAULT_WATCH_THROTTLE_MS } from '../client/watched/WatchedQuery.js'; import { DifferentialWatchedQuery } from '../client/watched/processors/DifferentialQueryProcessor.js'; import { ILogger } from '../utils/Logger.js'; +import { Mutex } from '../utils/mutex.js'; import { Transaction } from '../db/DBAdapter.js'; import { AttachmentData, LocalStorageAdapter } from './LocalStorageAdapter.js'; import { RemoteStorageAdapter } from './RemoteStorageAdapter.js'; @@ -82,6 +83,21 @@ export class AttachmentQueue { private watchAttachmentsAbortController: AbortController; + /** + * Serializes concurrent `syncStorage()` triggers (periodic timer, watch-onDiff, + * status-changed). Held across the whole batch, but only contended by other + * sync triggers — foreground `saveFile` / `deleteFile` / watched-attachment + * processing don't take this lock and proceed in parallel via the + * `AttachmentService` mutex, which is acquired only briefly per row. + */ + private syncLoopMutex = new Mutex(); + + /** + * Aborted by `stopSync()` to interrupt an in-flight batch within one + * attachment's processing time. Polled between rows by `SyncingService`. + */ + private syncAbortController?: AbortController; + /** * Creates a new AttachmentQueue instance. * @@ -164,6 +180,8 @@ export class AttachmentQueue { async startSync(): Promise { await this.stopSync(); + this.syncAbortController = new AbortController(); + this.watchActiveAttachments = this.attachmentService.watchActiveAttachments({ throttleMs: this.syncThrottleDuration }); @@ -293,24 +311,42 @@ export class AttachmentQueue { * * This is called automatically at regular intervals when sync is started, * but can also be called manually to trigger an immediate sync. + * + * Concurrent invocations are serialized via `syncLoopMutex`. */ async syncStorage(): Promise { - await this.attachmentService.withContext(async (ctx) => { - const activeAttachments = await ctx.getActiveAttachments(); + await this.syncLoopMutex.runExclusive(async () => { + const signal = this.syncAbortController?.signal; + if (signal?.aborted) return; + + const activeAttachments = await this.attachmentService.withContext((ctx) => ctx.getActiveAttachments()); await this.localStorage.initialize(); - await this.syncingService.processAttachments(activeAttachments, ctx); - await this.syncingService.deleteArchivedAttachments(ctx); + + await this.syncingService.processAttachments(activeAttachments, { + withContext: (cb) => this.attachmentService.withContext(cb), + isActive: () => !signal?.aborted + }); + + if (signal?.aborted) return; + + await this.attachmentService.withContext((ctx) => this.syncingService.deleteArchivedAttachments(ctx)); }); } /** * Stops the attachment synchronization process. * - * Clears the periodic sync timer and closes all active attachment watchers. + * Clears the periodic sync timer, closes all active attachment watchers, and + * aborts any in-flight `syncStorage()` call so it exits within one + * attachment's processing time instead of running the batch to completion. */ async stopSync(): Promise { clearInterval(this.periodicSyncTimer); this.periodicSyncTimer = undefined; + if (this.syncAbortController) { + this.syncAbortController.abort(); + this.syncAbortController = undefined; + } if (this.watchActiveAttachments) await this.watchActiveAttachments.close(); if (this.watchAttachmentsAbortController) { this.watchAttachmentsAbortController.abort(); diff --git a/packages/common/src/attachments/SyncingService.ts b/packages/common/src/attachments/SyncingService.ts index 1da66c682..77a6bac49 100644 --- a/packages/common/src/attachments/SyncingService.ts +++ b/packages/common/src/attachments/SyncingService.ts @@ -35,35 +35,62 @@ export class SyncingService { /** * Processes attachments based on their state (upload, download, or delete). - * All updates are saved in a single batch after processing. + * + * Each attachment's I/O runs outside the attachment-service mutex, and the row's + * state transition is persisted immediately after it completes. This keeps the + * mutex available to concurrent `saveFile` / `deleteFile` / watched-attachment + * processing while a batch is in flight, and means consumer queries against the + * attachments queue see incremental progress instead of one atomic commit at the + * end of the batch. * * @param attachments - Array of attachment records to process - * @param context - Attachment context for database operations - * @returns Promise that resolves when all attachments have been processed and saved + * @param options.withContext - Briefly acquires the attachment-service mutex. + * Used to persist each row after its I/O completes + * and to run the delete-row transaction. + * @param options.isActive - Polled between attachments; when it returns `false` + * the loop exits early. Used by `stopSync` to interrupt + * a running batch within one attachment's processing + * time. */ - async processAttachments(attachments: AttachmentRecord[], context: AttachmentContext): Promise { - const updatedAttachments: AttachmentRecord[] = []; + async processAttachments( + attachments: AttachmentRecord[], + options: { + withContext: (callback: (context: AttachmentContext) => Promise) => Promise; + isActive?: () => boolean; + } + ): Promise { + const { withContext, isActive } = options; + this.logger.info(`Starting processAttachments with ${attachments.length} attachments`); + for (const attachment of attachments) { - switch (attachment.state) { - case AttachmentState.QUEUED_UPLOAD: - const uploaded = await this.uploadAttachment(attachment); - updatedAttachments.push(uploaded); - break; - case AttachmentState.QUEUED_DOWNLOAD: - const downloaded = await this.downloadAttachment(attachment); - updatedAttachments.push(downloaded); - break; - case AttachmentState.QUEUED_DELETE: - const deleted = await this.deleteAttachment(attachment, context); - updatedAttachments.push(deleted); - break; - - default: - break; + if (isActive && !isActive()) { + this.logger.info('Sync cancelled; stopping iteration early'); + return; } - } - await context.saveAttachments(updatedAttachments); + try { + let updated: AttachmentRecord; + switch (attachment.state) { + case AttachmentState.QUEUED_UPLOAD: + updated = await this.uploadAttachment(attachment); + break; + case AttachmentState.QUEUED_DOWNLOAD: + updated = await this.downloadAttachment(attachment); + break; + case AttachmentState.QUEUED_DELETE: + // `deleteAttachment` needs a context (it removes the row in a + // transaction); briefly re-acquire the mutex for just this row. + updated = await withContext((ctx) => this.deleteAttachment(attachment, ctx)); + break; + default: + continue; + } + + await withContext((ctx) => ctx.saveAttachments([updated])); + } catch (error) { + this.logger.warn(`Error during sync for ${attachment.id}`, error); + } + } } /** diff --git a/packages/node/tests/attachments.test.ts b/packages/node/tests/attachments.test.ts index 15766a7f0..bb8d1edf5 100644 --- a/packages/node/tests/attachments.test.ts +++ b/packages/node/tests/attachments.test.ts @@ -118,6 +118,16 @@ async function* watchAttachmentsTable(): AsyncGenerator { } } +function deferred() { + let resolve!: (v: T) => void; + let reject!: (e: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + async function waitForMatchCondition( iteratorGenerator: () => AsyncGenerator, predicate: (attachments: AttachmentRecord[]) => boolean, @@ -487,4 +497,158 @@ describe('attachment queue', () => { expect(mockErrorHandler).toHaveBeenCalledOnce(); expect(mockDownloadFile).toHaveBeenCalledTimes(2); }); + + it( + 'stopSync should interrupt an in-flight batch without waiting for the current network call', + { timeout: 10000 }, + async () => { + const firstDownloadStarted = deferred(); + const firstDownloadGate = deferred(); + const slowDownload = vi.fn().mockImplementation(() => { + firstDownloadStarted.resolve(); + return firstDownloadGate.promise; + }); + + const slowQueue = new AttachmentQueue({ + db, + watchAttachments, + remoteStorage: { downloadFile: slowDownload, uploadFile: mockUploadFile, deleteFile: mockDeleteFile }, + localStorage: mockLocalStorage, + syncIntervalMs: INTERVAL_MILLISECONDS, + archivedCacheLimit: 0 + }); + + await slowQueue.startSync(); + + // Queue three downloads. + for (let i = 0; i < 3; i++) { + await db.execute('INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), ?, ?, uuid())', [ + `user${i}`, + `user${i}@example.com` + ]); + } + + // Wait until the first download is actually in flight. + await firstDownloadStarted.promise; + + // stopSync must return promptly even though a download is stuck. + const stopStart = Date.now(); + await slowQueue.stopSync(); + expect(Date.now() - stopStart).toBeLessThan(500); + + // Release the stuck download so the worker can observe the abort and exit cleanly. + firstDownloadGate.resolve(createMockJpegBuffer()); + + // Give the loop a tick to notice the abort. No further downloads should be attempted. + await new Promise((r) => setImmediate(r)); + expect(slowDownload).toHaveBeenCalledTimes(1); + } + ); + + it( + 'attachments queue should commit per-attachment, not at end of batch', + { timeout: 10000 }, + async () => { + const gates: Array>> = [deferred(), deferred()]; + const downloadIds: string[] = []; + const slowDownload = vi.fn().mockImplementation((attachment: AttachmentRecord) => { + const idx = downloadIds.length; + downloadIds.push(attachment.id); + return gates[idx]?.promise ?? Promise.resolve(createMockJpegBuffer()); + }); + + const slowQueue = new AttachmentQueue({ + db, + watchAttachments, + remoteStorage: { downloadFile: slowDownload, uploadFile: mockUploadFile, deleteFile: mockDeleteFile }, + localStorage: mockLocalStorage, + syncIntervalMs: INTERVAL_MILLISECONDS, + archivedCacheLimit: 0 + }); + + await slowQueue.startSync(); + + // Queue two downloads. + for (let i = 0; i < 2; i++) { + await db.execute('INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), ?, ?, uuid())', [ + `user${i}`, + `user${i}@example.com` + ]); + } + + // Wait until the first download is in flight. + await waitForMatchCondition( + () => watchAttachmentsTable(), + () => slowDownload.mock.calls.length >= 1, + 5 + ); + + // Resolve only the first download. The second is still pending. + gates[0].resolve(createMockJpegBuffer()); + + // The first attachment must transition to SYNCED while the second is still + // QUEUED_DOWNLOAD — i.e. the commit happened mid-batch, not at the end. + await waitForMatchCondition( + () => watchAttachmentsTable(), + (rows) => { + const first = rows.find((r) => r.id === downloadIds[0]); + const others = rows.filter((r) => r.id !== downloadIds[0]); + return ( + first?.state === AttachmentState.SYNCED && + others.some((r) => r.state === AttachmentState.QUEUED_DOWNLOAD) + ); + }, + 5 + ); + + // Clean up: release the second so the worker drains and stopSync finishes. + gates[1].resolve(createMockJpegBuffer()); + await slowQueue.stopSync(); + } + ); + + it( + 'saveFile should not be blocked by an in-flight sync batch', + { timeout: 10000 }, + async () => { + const downloadStarted = deferred(); + const downloadGate = deferred(); + const slowDownload = vi.fn().mockImplementation(() => { + downloadStarted.resolve(); + return downloadGate.promise; + }); + + const slowQueue = new AttachmentQueue({ + db, + watchAttachments, + remoteStorage: { downloadFile: slowDownload, uploadFile: mockUploadFile, deleteFile: mockDeleteFile }, + localStorage: mockLocalStorage, + syncIntervalMs: INTERVAL_MILLISECONDS, + archivedCacheLimit: 0 + }); + + await slowQueue.startSync(); + + // Queue a download that will hang. + await db.execute('INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), ?, ?, uuid())', [ + 'downloader', + 'downloader@example.com' + ]); + + await downloadStarted.promise; + + // saveFile must complete promptly even while a download is in flight. + const saveStart = Date.now(); + const saved = await slowQueue.saveFile({ + data: new Uint8Array(10).fill(7).buffer, + fileExtension: 'jpg' + }); + expect(Date.now() - saveStart).toBeLessThan(500); + expect(saved.state).toBe(AttachmentState.QUEUED_UPLOAD); + + // Clean up. + downloadGate.resolve(createMockJpegBuffer()); + await slowQueue.stopSync(); + } + ); });