Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lovely-years-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

AttachmentQueue: release the mutex per attachment, persist incrementally, and let `stopSync` interrupt mid-batch so foreground `saveFile` / `deleteFile` aren't blocked behind in-flight uploads or downloads.
46 changes: 41 additions & 5 deletions packages/common/src/attachments/AttachmentQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AbstractPowerSyncDatabase } from '../client/AbstractPowerSyncDatabase.j
import { DEFAULT_WATCH_THROTTLE_MS } from '../client/watched/WatchedQuery.js';
import { DifferentialWatchedQuery } from '../client/watched/processors/DifferentialQueryProcessor.js';
import { ILogger } from '../utils/Logger.js';
import { Mutex } from '../utils/mutex.js';
import { Transaction } from '../db/DBAdapter.js';
import { AttachmentData, LocalStorageAdapter } from './LocalStorageAdapter.js';
import { RemoteStorageAdapter } from './RemoteStorageAdapter.js';
Expand Down Expand Up @@ -82,6 +83,21 @@ export class AttachmentQueue {

private watchAttachmentsAbortController: AbortController;

/**
* Serializes concurrent `syncStorage()` triggers (periodic timer, watch-onDiff,
* status-changed). Held across the whole batch, but only contended by other
* sync triggers — foreground `saveFile` / `deleteFile` / watched-attachment
* processing don't take this lock and proceed in parallel via the
* `AttachmentService` mutex, which is acquired only briefly per row.
*/
private syncLoopMutex = new Mutex();

/**
* Aborted by `stopSync()` to interrupt an in-flight batch within one
* attachment's processing time. Polled between rows by `SyncingService`.
*/
private syncAbortController?: AbortController;

/**
* Creates a new AttachmentQueue instance.
*
Expand Down Expand Up @@ -164,6 +180,8 @@ export class AttachmentQueue {
async startSync(): Promise<void> {
await this.stopSync();

this.syncAbortController = new AbortController();

this.watchActiveAttachments = this.attachmentService.watchActiveAttachments({
throttleMs: this.syncThrottleDuration
});
Expand Down Expand Up @@ -293,24 +311,42 @@ export class AttachmentQueue {
*
* This is called automatically at regular intervals when sync is started,
* but can also be called manually to trigger an immediate sync.
*
* Concurrent invocations are serialized via `syncLoopMutex`.
*/
async syncStorage(): Promise<void> {
await this.attachmentService.withContext(async (ctx) => {
const activeAttachments = await ctx.getActiveAttachments();
await this.syncLoopMutex.runExclusive(async () => {
const signal = this.syncAbortController?.signal;
if (signal?.aborted) return;

const activeAttachments = await this.attachmentService.withContext((ctx) => ctx.getActiveAttachments());
await this.localStorage.initialize();
await this.syncingService.processAttachments(activeAttachments, ctx);
await this.syncingService.deleteArchivedAttachments(ctx);

await this.syncingService.processAttachments(activeAttachments, {
withContext: (cb) => this.attachmentService.withContext(cb),
isActive: () => !signal?.aborted
});

if (signal?.aborted) return;

await this.attachmentService.withContext((ctx) => this.syncingService.deleteArchivedAttachments(ctx));
});
}

/**
* Stops the attachment synchronization process.
*
* Clears the periodic sync timer and closes all active attachment watchers.
* Clears the periodic sync timer, closes all active attachment watchers, and
* aborts any in-flight `syncStorage()` call so it exits within one
* attachment's processing time instead of running the batch to completion.
*/
async stopSync(): Promise<void> {
clearInterval(this.periodicSyncTimer);
this.periodicSyncTimer = undefined;
if (this.syncAbortController) {
this.syncAbortController.abort();
this.syncAbortController = undefined;
}
if (this.watchActiveAttachments) await this.watchActiveAttachments.close();
if (this.watchAttachmentsAbortController) {
this.watchAttachmentsAbortController.abort();
Expand Down
73 changes: 50 additions & 23 deletions packages/common/src/attachments/SyncingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,62 @@ export class SyncingService {

/**
* Processes attachments based on their state (upload, download, or delete).
* All updates are saved in a single batch after processing.
*
* Each attachment's I/O runs outside the attachment-service mutex, and the row's
* state transition is persisted immediately after it completes. This keeps the
* mutex available to concurrent `saveFile` / `deleteFile` / watched-attachment
* processing while a batch is in flight, and means consumer queries against the
* attachments queue see incremental progress instead of one atomic commit at the
* end of the batch.
*
* @param attachments - Array of attachment records to process
* @param context - Attachment context for database operations
* @returns Promise that resolves when all attachments have been processed and saved
* @param options.withContext - Briefly acquires the attachment-service mutex.
* Used to persist each row after its I/O completes
* and to run the delete-row transaction.
* @param options.isActive - Polled between attachments; when it returns `false`
* the loop exits early. Used by `stopSync` to interrupt
* a running batch within one attachment's processing
* time.
*/
async processAttachments(attachments: AttachmentRecord[], context: AttachmentContext): Promise<void> {
const updatedAttachments: AttachmentRecord[] = [];
async processAttachments(
attachments: AttachmentRecord[],
options: {
withContext: <T>(callback: (context: AttachmentContext) => Promise<T>) => Promise<T>;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since SyncingService already receives AttachmentService in its constructor, do we need to pass withContext into processAttachments as an option? It seems like SyncingService could call this.attachmentService.withContext(...) directly when it needs an AttachmentContext.

isActive?: () => boolean;
}
): Promise<void> {
const { withContext, isActive } = options;
this.logger.info(`Starting processAttachments with ${attachments.length} attachments`);

for (const attachment of attachments) {
switch (attachment.state) {
case AttachmentState.QUEUED_UPLOAD:
const uploaded = await this.uploadAttachment(attachment);
updatedAttachments.push(uploaded);
break;
case AttachmentState.QUEUED_DOWNLOAD:
const downloaded = await this.downloadAttachment(attachment);
updatedAttachments.push(downloaded);
break;
case AttachmentState.QUEUED_DELETE:
const deleted = await this.deleteAttachment(attachment, context);
updatedAttachments.push(deleted);
break;

default:
break;
if (isActive && !isActive()) {
this.logger.info('Sync cancelled; stopping iteration early');
return;
}
}

await context.saveAttachments(updatedAttachments);
try {
let updated: AttachmentRecord;
switch (attachment.state) {
case AttachmentState.QUEUED_UPLOAD:
updated = await this.uploadAttachment(attachment);
break;
case AttachmentState.QUEUED_DOWNLOAD:
updated = await this.downloadAttachment(attachment);
break;
case AttachmentState.QUEUED_DELETE:
// `deleteAttachment` needs a context (it removes the row in a
// transaction); briefly re-acquire the mutex for just this row.
updated = await withContext((ctx) => this.deleteAttachment(attachment, ctx));
break;
default:
continue;
}

await withContext((ctx) => ctx.saveAttachments([updated]));
} catch (error) {
this.logger.warn(`Error during sync for ${attachment.id}`, error);
}
}
}

/**
Expand Down
164 changes: 164 additions & 0 deletions packages/node/tests/attachments.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ async function* watchAttachmentsTable(): AsyncGenerator<AttachmentRecord[]> {
}
}

function deferred<T = void>() {
let resolve!: (v: T) => void;
let reject!: (e: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}

async function waitForMatchCondition(
iteratorGenerator: () => AsyncGenerator<AttachmentRecord[]>,
predicate: (attachments: AttachmentRecord[]) => boolean,
Expand Down Expand Up @@ -487,4 +497,158 @@ describe('attachment queue', () => {
expect(mockErrorHandler).toHaveBeenCalledOnce();
expect(mockDownloadFile).toHaveBeenCalledTimes(2);
});

it(
'stopSync should interrupt an in-flight batch without waiting for the current network call',
{ timeout: 10000 },
async () => {
const firstDownloadStarted = deferred();
const firstDownloadGate = deferred<ArrayBuffer>();
const slowDownload = vi.fn().mockImplementation(() => {
firstDownloadStarted.resolve();
return firstDownloadGate.promise;
});

const slowQueue = new AttachmentQueue({
db,
watchAttachments,
remoteStorage: { downloadFile: slowDownload, uploadFile: mockUploadFile, deleteFile: mockDeleteFile },
localStorage: mockLocalStorage,
syncIntervalMs: INTERVAL_MILLISECONDS,
archivedCacheLimit: 0
});

await slowQueue.startSync();

// Queue three downloads.
for (let i = 0; i < 3; i++) {
await db.execute('INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), ?, ?, uuid())', [
`user${i}`,
`user${i}@example.com`
]);
}

// Wait until the first download is actually in flight.
await firstDownloadStarted.promise;

// stopSync must return promptly even though a download is stuck.
const stopStart = Date.now();
await slowQueue.stopSync();
expect(Date.now() - stopStart).toBeLessThan(500);

// Release the stuck download so the worker can observe the abort and exit cleanly.
firstDownloadGate.resolve(createMockJpegBuffer());

// Give the loop a tick to notice the abort. No further downloads should be attempted.
await new Promise((r) => setImmediate(r));
expect(slowDownload).toHaveBeenCalledTimes(1);
}
);

it(
'attachments queue should commit per-attachment, not at end of batch',
{ timeout: 10000 },
async () => {
const gates: Array<ReturnType<typeof deferred<ArrayBuffer>>> = [deferred<ArrayBuffer>(), deferred<ArrayBuffer>()];
const downloadIds: string[] = [];
const slowDownload = vi.fn().mockImplementation((attachment: AttachmentRecord) => {
const idx = downloadIds.length;
downloadIds.push(attachment.id);
return gates[idx]?.promise ?? Promise.resolve(createMockJpegBuffer());
});

const slowQueue = new AttachmentQueue({
db,
watchAttachments,
remoteStorage: { downloadFile: slowDownload, uploadFile: mockUploadFile, deleteFile: mockDeleteFile },
localStorage: mockLocalStorage,
syncIntervalMs: INTERVAL_MILLISECONDS,
archivedCacheLimit: 0
});

await slowQueue.startSync();

// Queue two downloads.
for (let i = 0; i < 2; i++) {
await db.execute('INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), ?, ?, uuid())', [
`user${i}`,
`user${i}@example.com`
]);
}

// Wait until the first download is in flight.
await waitForMatchCondition(
() => watchAttachmentsTable(),
() => slowDownload.mock.calls.length >= 1,
5
);

// Resolve only the first download. The second is still pending.
gates[0].resolve(createMockJpegBuffer());

// The first attachment must transition to SYNCED while the second is still
// QUEUED_DOWNLOAD — i.e. the commit happened mid-batch, not at the end.
await waitForMatchCondition(
() => watchAttachmentsTable(),
(rows) => {
const first = rows.find((r) => r.id === downloadIds[0]);
const others = rows.filter((r) => r.id !== downloadIds[0]);
return (
first?.state === AttachmentState.SYNCED &&
others.some((r) => r.state === AttachmentState.QUEUED_DOWNLOAD)
);
},
5
);

// Clean up: release the second so the worker drains and stopSync finishes.
gates[1].resolve(createMockJpegBuffer());
await slowQueue.stopSync();
}
);

it(
'saveFile should not be blocked by an in-flight sync batch',
{ timeout: 10000 },
async () => {
const downloadStarted = deferred();
const downloadGate = deferred<ArrayBuffer>();
const slowDownload = vi.fn().mockImplementation(() => {
downloadStarted.resolve();
return downloadGate.promise;
});

const slowQueue = new AttachmentQueue({
db,
watchAttachments,
remoteStorage: { downloadFile: slowDownload, uploadFile: mockUploadFile, deleteFile: mockDeleteFile },
localStorage: mockLocalStorage,
syncIntervalMs: INTERVAL_MILLISECONDS,
archivedCacheLimit: 0
});

await slowQueue.startSync();

// Queue a download that will hang.
await db.execute('INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), ?, ?, uuid())', [
'downloader',
'downloader@example.com'
]);

await downloadStarted.promise;

// saveFile must complete promptly even while a download is in flight.
const saveStart = Date.now();
const saved = await slowQueue.saveFile({
data: new Uint8Array(10).fill(7).buffer,
fileExtension: 'jpg'
});
expect(Date.now() - saveStart).toBeLessThan(500);
expect(saved.state).toBe(AttachmentState.QUEUED_UPLOAD);

// Clean up.
downloadGate.resolve(createMockJpegBuffer());
await slowQueue.stopSync();
}
);
});
Loading