diff --git a/src/sync/impl/__tests__/markAsSynced.test.js b/src/sync/impl/__tests__/markAsSynced.test.js index db2787641..08e18ae1f 100644 --- a/src/sync/impl/__tests__/markAsSynced.test.js +++ b/src/sync/impl/__tests__/markAsSynced.test.js @@ -131,7 +131,7 @@ describe('markLocalChangesAsSynced', () => { }) // test that second push will mark all as synced - await markLocalChangesAsSynced(database, localChanges2) + await markLocalChangesAsSynced(database, localChanges2, false) expect(destroyDeletedRecordsSpy).toHaveBeenCalledTimes(2) expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) @@ -146,7 +146,7 @@ describe('markLocalChangesAsSynced', () => { const localChanges = await fetchLocalChanges(database) // mark as synced - await markLocalChangesAsSynced(database, localChanges, { + await markLocalChangesAsSynced(database, localChanges, false, { mock_projects: ['pCreated1', 'pUpdated'], mock_comments: ['cDeleted'], }) @@ -161,6 +161,30 @@ describe('markLocalChangesAsSynced', () => { ) expect(await allDeletedRecords([comments])).toEqual(['cDeleted']) }) + it(`marks only acceptedIds as synced`, async () => { + const { database, comments } = makeDatabase() + + const { pCreated1, pUpdated } = await makeLocalChanges(database) + const localChanges = await fetchLocalChanges(database) + + // mark as synced + await markLocalChangesAsSynced(database, localChanges, true, {}, { + // probably better solution exists (we essentially list all but expected in verify) + mock_projects: ['pCreated2', 'pDeleted'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tCreated', 'tUpdated', 'tDeleted'], + }) + + // verify + const localChanges2 = await fetchLocalChanges(database) + expect(localChanges2.changes).toEqual( + makeChangeSet({ + mock_projects: { created: [pCreated1._raw], updated: [pUpdated._raw] }, + mock_comments: { deleted: ['cDeleted'] }, + }), + ) + expect(await allDeletedRecords([comments])).toEqual(['cDeleted']) + }) it(`can mark records as synced when ids are per-table not globally unique`, async () => { const { database, projects, tasks, comments } = makeDatabase() diff --git a/src/sync/impl/__tests__/synchronize-partialRejections.test.js b/src/sync/impl/__tests__/synchronize-partialRejections.test.js index c45f85a80..74700a78d 100644 --- a/src/sync/impl/__tests__/synchronize-partialRejections.test.js +++ b/src/sync/impl/__tests__/synchronize-partialRejections.test.js @@ -62,4 +62,71 @@ describe('synchronize - partial push rejections', () => { }), ) }) + it(`can partially accept a push`, async () => { + const { database } = makeDatabase() + + const { tCreated, tUpdated } = await makeLocalChanges(database) + + const acceptedIds = Object.freeze({ + // probably better solution exists (we essentially list all but expected in expect below) + mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tDeleted'], + }) + const rejectedIds = Object.freeze({ + mock_tasks: ['tCreated', 'tUpdated'], + mock_comments: ['cDeleted'], + }) + const log = {} + await synchronize({ + database, + pullChanges: jest.fn(emptyPull()), + pushChanges: jest.fn(() => ({ experimentalAcceptedIds: acceptedIds })), + pushShouldConfirmOnlyAccepted: true, + log, + }) + expect((await fetchLocalChanges(database)).changes).toEqual( + makeChangeSet({ + mock_tasks: { created: [tCreated._raw], updated: [tUpdated._raw] }, + mock_comments: { deleted: ['cDeleted'] }, + }), + ) + expect(log.rejectedIds).toStrictEqual(rejectedIds) + }) + it(`can partially accept a push and make changes during push`, async () => { + const { database, comments } = makeDatabase() + + const { pCreated1, tUpdated } = await makeLocalChanges(database) + const pCreated1Raw = { ...pCreated1._raw } + let newComment + await synchronize({ + database, + pullChanges: jest.fn(emptyPull()), + pushChanges: jest.fn(async () => { + await database.write(async () => { + await pCreated1.update((p) => { + p.name = 'updated!' + }) + newComment = await comments.create((c) => { + c.body = 'bazinga' + }) + }) + return { + experimentalAcceptedIds: { + mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tCreated', 'tDeleted'], + }, + } + }), + pushShouldConfirmOnlyAccepted: true, + }) + expect((await fetchLocalChanges(database)).changes).toEqual( + makeChangeSet({ + mock_projects: { created: [{ ...pCreated1Raw, _changed: 'name', name: 'updated!' }] }, + mock_tasks: { updated: [tUpdated._raw] }, + mock_comments: { created: [newComment._raw], deleted: ['cDeleted'] }, + }), + ) + }) }) diff --git a/src/sync/impl/helpers.js b/src/sync/impl/helpers.js index df9d32d19..03985a93f 100644 --- a/src/sync/impl/helpers.js +++ b/src/sync/impl/helpers.js @@ -4,9 +4,14 @@ import { values } from '../../utils/fp' import areRecordsEqual from '../../utils/fp/areRecordsEqual' import { invariant } from '../../utils/common' -import type { Model, Collection, Database } from '../..' +import type { Model, Collection, Database, TableName, RecordId } from '../..' import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../../RawRecord' -import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index' +import type { + SyncIds, + SyncLog, + SyncDatabaseChangeSet, + SyncConflictResolver, +} from '../index' // Returns raw record with naive solution to a conflict based on local `_changed` field // This is a per-column resolution algorithm. All columns that were changed locally win @@ -148,3 +153,46 @@ export const changeSetCount: (SyncDatabaseChangeSet) => number = (changeset) => ({ created, updated, deleted }) => created.length + updated.length + deleted.length, ), ) + +const extractChangeSetIds: (SyncDatabaseChangeSet) => { [TableName]: RecordId[] } = (changeset) => + Object.keys(changeset).reduce((acc: { [TableName]: RecordId[] }, key: string) => { + // $FlowFixMe + const { created, updated, deleted } = changeset[key] + // $FlowFixMe + acc[key] = [ + ...created.map(it => it.id), + ...updated.map(it => it.id), + ...deleted, + ] + return acc + }, {}) + +// Returns all rejected ids and is used when accepted ids are used +export const findRejectedIds: + (?SyncIds, ?SyncIds, SyncDatabaseChangeSet) => SyncIds = + (experimentalRejectedIds, experimentalAcceptedIds, changeset) => { + const localIds = extractChangeSetIds(changeset) + + const acceptedIdsSets = Object.keys(changeset).reduce( + (acc: { [TableName]: Set }, key: string) => { + // $FlowFixMe + acc[key] = new Set(experimentalAcceptedIds[key]) + return acc + }, {}) + + return Object.keys(changeset).reduce((acc: { [TableName]: RecordId[] }, key: string) => { + const rejectedIds = [ + // $FlowFixMe + ...(experimentalRejectedIds ? experimentalRejectedIds[key] || [] : []), + // $FlowFixMe + ...(localIds[key] || []), + // $FlowFixMe + ].filter(it => !acceptedIdsSets[key].has(it)) + + if (rejectedIds.length > 0) { + // $FlowFixMe + acc[key] = rejectedIds + } + return acc + }, {}) + } \ No newline at end of file diff --git a/src/sync/impl/markAsSynced.d.ts b/src/sync/impl/markAsSynced.d.ts index 242ec0356..e41621e0d 100644 --- a/src/sync/impl/markAsSynced.d.ts +++ b/src/sync/impl/markAsSynced.d.ts @@ -1,9 +1,11 @@ import type { Database, Model, TableName } from '../..' -import type { SyncLocalChanges, SyncRejectedIds } from '../index' +import type { SyncLocalChanges, SyncIds } from '../index' export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, - rejectedIds?: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + rejectedIds?: SyncIds, + allAcceptedIds?: SyncIds, ): Promise diff --git a/src/sync/impl/markAsSynced.js b/src/sync/impl/markAsSynced.js index 8a3b6c13e..7e8e2d012 100644 --- a/src/sync/impl/markAsSynced.js +++ b/src/sync/impl/markAsSynced.js @@ -5,11 +5,13 @@ import { logError } from '../../utils/common' import type { Database, Model, TableName } from '../..' import { prepareMarkAsSynced } from './helpers' -import type { SyncLocalChanges, SyncRejectedIds } from '../index' +import type { SyncLocalChanges, SyncIds } from '../index' const recordsToMarkAsSynced = ( { changes, affectedRecords }: SyncLocalChanges, - allRejectedIds: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + allRejectedIds: SyncIds, + allAcceptedIds: SyncIds, ): Model[] => { const syncedRecords = [] @@ -17,6 +19,7 @@ const recordsToMarkAsSynced = ( const { created, updated } = changes[(table: any)] const raws = created.concat(updated) const rejectedIds = new Set(allRejectedIds[(table: any)]) + const acceptedIds = new Set(allAcceptedIds[(table: any)] || []) raws.forEach((raw) => { const { id } = raw @@ -27,7 +30,8 @@ const recordsToMarkAsSynced = ( ) return } - if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id)) { + const isAccepted = !allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id) + if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id) && isAccepted) { syncedRecords.push(record) } }) @@ -38,27 +42,35 @@ const recordsToMarkAsSynced = ( const destroyDeletedRecords = ( db: Database, { changes }: SyncLocalChanges, - allRejectedIds: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + allRejectedIds: SyncIds, + allAcceptedIds: SyncIds, ): Promise[] => Object.keys(changes).map((_tableName) => { const tableName: TableName = (_tableName: any) const rejectedIds = new Set(allRejectedIds[tableName]) - const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id)) + const acceptedIds = new Set(allAcceptedIds[tableName] || []) + const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id) && + (!allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id))) return deleted.length ? db.adapter.destroyDeletedRecords(tableName, deleted) : Promise.resolve() }) export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, - rejectedIds?: ?SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + rejectedIds?: ?SyncIds, + allAcceptedIds?: ?SyncIds, ): Promise { return db.write(async () => { // update and destroy records concurrently await Promise.all([ db.batch( - recordsToMarkAsSynced(syncedLocalChanges, rejectedIds || {}).map(prepareMarkAsSynced), + recordsToMarkAsSynced(syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, + allAcceptedIds || {}).map(prepareMarkAsSynced), ), - ...destroyDeletedRecords(db, syncedLocalChanges, rejectedIds || {}), + ...destroyDeletedRecords(db, syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, + allAcceptedIds || {}), ]) }, 'sync-markLocalChangesAsSynced') } diff --git a/src/sync/impl/synchronize.d.ts b/src/sync/impl/synchronize.d.ts index f19c96957..ea9557b5c 100644 --- a/src/sync/impl/synchronize.d.ts +++ b/src/sync/impl/synchronize.d.ts @@ -9,6 +9,7 @@ export default function synchronize({ migrationsEnabledAtVersion, log, conflictResolver, + pushShouldConfirmOnlyAccepted, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index eef1f6e81..712a5acee 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -11,7 +11,7 @@ import { setLastPulledSchemaVersion, getMigrationInfo, } from './index' -import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers' +import { ensureSameDatabase, isChangeSetEmpty, changeSetCount, findRejectedIds } from './helpers' import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index' export default async function synchronize({ @@ -24,6 +24,7 @@ export default async function synchronize({ migrationsEnabledAtVersion, log, conflictResolver, + pushShouldConfirmOnlyAccepted, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise { @@ -134,9 +135,14 @@ export default async function synchronize({ (await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {} log && (log.phase = 'pushed') log && (log.rejectedIds = pushResult.experimentalRejectedIds) + if (log && pushShouldConfirmOnlyAccepted) { + log.rejectedIds = findRejectedIds(pushResult.experimentalRejectedIds, + pushResult.experimentalAcceptedIds, localChanges.changes) + } ensureSameDatabase(database, resetCount) - await markLocalChangesAsSynced(database, localChanges, pushResult.experimentalRejectedIds) + await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false, + pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds) log && (log.phase = 'marked local changes as synced') } } else { diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index b4449d31f..16e5eb552 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -27,11 +27,16 @@ export type SyncPullResult = | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> -export type SyncRejectedIds = { [tableName: TableName]: RecordId[] } +export type SyncIds = { [tableName: TableName]: RecordId[] } + +export type SyncRejectedIds = SyncIds export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> -export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }> +export type SyncPushResult = $Exact<{ + experimentalRejectedIds?: SyncIds, + experimentalAcceptedIds?: SyncIds, +}> type SyncConflict = $Exact<{ local: DirtyRaw; remote: DirtyRaw; resolved: DirtyRaw }> export type SyncLog = { @@ -41,7 +46,7 @@ export type SyncLog = { migration?: MigrationSyncChanges; newLastPulledAt?: number; resolvedConflicts?: SyncConflict[]; - rejectedIds?: SyncRejectedIds; + rejectedIds?: SyncIds; finishedAt?: Date; remoteChangeCount?: number; localChangeCount?: number; @@ -70,6 +75,11 @@ export type SyncArgs = $Exact<{ // If you don't want to change default behavior for a given record, return `resolved` as is // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. conflictResolver?: SyncConflictResolver; + // experimental customization that will cause to only set records as synced if we return id. + // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to + // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that + // unpredicted error will cause data loss (when failed data push isn't re-pushed) + pushShouldConfirmOnlyAccepted?: boolean; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean; // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. diff --git a/src/sync/index.js b/src/sync/index.js index fcce9b80d..1a8d02e20 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -53,11 +53,16 @@ export type SyncPullResult = | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> -export type SyncRejectedIds = { [TableName]: RecordId[] } +export type SyncIds = { [TableName]: RecordId[] } + +export type SyncRejectedIds = SyncIds export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }> -export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }> +export type SyncPushResult = $Exact<{ + experimentalRejectedIds?: SyncIds, + experimentalAcceptedIds?: SyncIds, +}> type SyncConflict = $Exact<{ local: DirtyRaw, remote: DirtyRaw, resolved: DirtyRaw }> export type SyncLog = { @@ -67,7 +72,7 @@ export type SyncLog = { migration?: ?MigrationSyncChanges, newLastPulledAt?: number, resolvedConflicts?: SyncConflict[], - rejectedIds?: SyncRejectedIds, + rejectedIds?: SyncIds, finishedAt?: Date, remoteChangeCount?: number, localChangeCount?: number, @@ -97,6 +102,11 @@ export type SyncArgs = $Exact<{ // If you don't want to change default behavior for a given record, return `resolved` as is // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. conflictResolver?: SyncConflictResolver, + // experimental customization that will cause to only set records as synced if we return id. + // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to + // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that + // unpredicted error will cause data loss (when failed data push isn't re-pushed) + pushShouldConfirmOnlyAccepted?: boolean; // commits changes in multiple batches, and not one - temporary workaround for memory issue _unsafeBatchPerCollection?: boolean, // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code.