From f7e0fc0775ac7b0d6abee3ba1c9a331ca33ebc39 Mon Sep 17 00:00:00 2001 From: reisepass <1474408+reisepass@users.noreply.github.com> Date: Thu, 11 Jun 2026 17:55:12 +0200 Subject: [PATCH] Add data directory locking to NodeFS to prevent multi-process corruption Postgres assumes exclusive control of its data directory; two PGlite instances writing the same dataDir silently corrupt it. NodeFS now takes a sibling lock file (dataDir.lock) holding the owner PID: - Acquisition is atomic: exclusive create ('wx'), so two racing processes can never both succeed. A stale lock (holder PID is dead) is removed under a claim mutex (an atomic mkdir of dataDir.lock.claim) and re-validated under that mutex before it is unlinked and acquisition is retried, so a losing racer can never remove a winner's freshly created lock. - Another live process holds the lock: throw with the PID and guidance. Reclaiming only happens when the holder is dead, so PID reuse can at worst cause a conservative refusal, never steal a live lock. - Release verifies ownership: the lock file is only unlinked if it still holds this instance's token, and only after FS teardown. - Another instance in this same process holds the dataDir: throw at the creation site. Deleting the lock file is the explicit override. - Opt-in takeover for HMR-style dev servers, where module reloads create a fresh instance and the abandoned one can never be closed: new PGlite({ fs: new NodeFS(dataDir, { takeover: true }) }) closes the previous instance cleanly before claiming the directory. Tests live in a single file (tests/nodefs-lock.test.js). Existing tests that hold a dataDir now close their instances; the unclean-shutdown test deletes the stale lock before reopening. test:clean also removes the sibling .lock file. 
--- .changeset/add-nodefs-lock-file.md | 5 + packages/pglite/package.json | 2 +- packages/pglite/src/fs/nodefs.ts | 243 +++++++++++++++++++- packages/pglite/tests/drop-database.test.ts | 5 + packages/pglite/tests/instantiation.test.ts | 3 + packages/pglite/tests/nodefs-lock.test.js | 133 +++++++++++ packages/pglite/tests/user.test.ts | 6 + 7 files changed, 391 insertions(+), 6 deletions(-) create mode 100644 .changeset/add-nodefs-lock-file.md create mode 100644 packages/pglite/tests/nodefs-lock.test.js diff --git a/.changeset/add-nodefs-lock-file.md b/.changeset/add-nodefs-lock-file.md new file mode 100644 index 000000000..3f7d1a882 --- /dev/null +++ b/.changeset/add-nodefs-lock-file.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite': patch +--- + +Add data directory locking to NodeFS to prevent multi-process corruption diff --git a/packages/pglite/package.json b/packages/pglite/package.json index edc454a31..0063c728d 100644 --- a/packages/pglite/package.json +++ b/packages/pglite/package.json @@ -130,7 +130,7 @@ "test:node": "pnpm test:clean && pnpm vitest tests/targets/runtimes/node-*.test.js", "test:runtimes": "pnpm test:bun && pnpm test:node", "test:integration": "pnpm test:runtimes && pnpm test:web", - "test:clean": "rm -rf ./pgdata-test", + "test:clean": "rm -rf ./pgdata-test ./pgdata-test.lock", "build:js": "tsup && tsx scripts/bundle-wasm.ts", "build": "pnpm build:js", "dev": "concurrently \"tsup --watch\" \"sleep 1 && tsx scripts/bundle-wasm.ts\" \"pnpm dev-server\"", diff --git a/packages/pglite/src/fs/nodefs.ts b/packages/pglite/src/fs/nodefs.ts index 2dbc94d66..6c0998b3d 100644 --- a/packages/pglite/src/fs/nodefs.ts +++ b/packages/pglite/src/fs/nodefs.ts @@ -5,19 +5,41 @@ import type { PostgresMod } from '../postgresMod.js' import { PGlite } from '../pglite.js' import { PGDATA } from '../initdb.js' +const activeInstances = new Map() + +// Sequence number to make lock tokens unique within this process. 
+let lockSeq = 0 + +export interface NodeFSOptions { + /** + * When another PGlite instance in this same process already holds the data + * directory, close it cleanly and take over instead of throwing. Useful for + * HMR-style dev servers where a module reload creates a fresh instance and + * the abandoned old one can no longer be closed. Defaults to false. + */ + takeover?: boolean +} + export class NodeFS extends EmscriptenBuiltinFilesystem { protected rootDir: string + #lockFd: number | null = null + #lockToken: string | null = null + #takeover: boolean - constructor(dataDir: string) { + constructor(dataDir: string, options?: NodeFSOptions) { super(dataDir) this.rootDir = path.resolve(dataDir) - if (!fs.existsSync(path.join(this.rootDir))) { - fs.mkdirSync(this.rootDir) - } + this.#takeover = options?.takeover ?? false + // recursive also makes this atomic: no EEXIST when several processes + // race to create the same data directory. + fs.mkdirSync(this.rootDir, { recursive: true }) } async init(pg: PGlite, opts: Partial) { this.pg = pg + + await this.#acquireLock() + const options: Partial = { ...opts, preRun: [ @@ -32,7 +54,218 @@ export class NodeFS extends EmscriptenBuiltinFilesystem { return { emscriptenOpts: options } } + async #acquireLock() { + const lockPath = this.rootDir + '.lock' + + // Another instance in this same process holds the directory. + // Default: throw, so a double-open fails loudly at the creation site. + // A manually deleted lock file is treated as an explicit user override. + // With the takeover option, close the previous instance cleanly (flushes + // WAL, releases its lock) and claim the directory instead - Node is + // single threaded, so the close cannot interleave with a write. 
+ const existing = activeInstances.get(this.rootDir) + if (existing && !existing.closed && fs.existsSync(lockPath)) { + if (this.#takeover) { + console.warn( + `PGlite: taking over data directory "${this.rootDir}"; the previous instance in this process has been closed.`, + ) + try { + await existing.close() + } catch (e) { + throw new Error( + `PGlite data directory "${this.rootDir}" is already in use by another instance in this process ` + + `and it could not be closed automatically. Close the other instance or use a different data directory. ` + + `(close failed with: ${e instanceof Error ? e.message : e})`, + ) + } + } else { + throw new Error( + `PGlite data directory "${this.rootDir}" is already in use by another instance in this process. ` + + `Close the other instance or use a different data directory.`, + ) + } + } + + // Cross-process: acquire by exclusive create ('wx'), which is atomic, so + // two racing processes can never both succeed. A stale lock (holder PID + // is dead) is removed under a claim mutex before retrying; see + // #inspectLock for how live, stale and mid-acquisition locks are told + // apart. + const token = `${process.pid}\n${Date.now()}\n${lockSeq++}\n` + for (let attempt = 0; attempt < 20; attempt++) { + if (attempt > 0) { + // Backoff so contenders do not spin through their attempts while + // another process is still mid-claim or mid-acquisition. + await new Promise((r) => + setTimeout(r, 5 + attempt * 10 * Math.random()), + ) + } + + try { + this.#lockFd = fs.openSync(lockPath, 'wx') + fs.writeSync(this.#lockFd, token) + this.#lockToken = token + activeInstances.set(this.rootDir, this.pg!) + return + } catch (e) { + if ((e as NodeJS.ErrnoException)?.code !== 'EEXIST') throw e + } + + // The lock exists - classify it. + const seen = this.#inspectLock(lockPath) + if (seen.state === 'live') { + throw new Error( + `PGlite data directory "${this.rootDir}" may be in use by another process (PID ${seen.pid}). 
` + + `Close the other instance or use a different data directory. ` + + `If PID ${seen.pid} is no longer running or no longer needs pglite, remove or move the stale lock: mv ${lockPath} ${lockPath}.stale.${Date.now()}`, + ) + } + if (seen.state !== 'stale') { + continue // vanished or mid-acquisition; retry after backoff + } + + // Stale lock (dead holder, our own leftover, or old corrupt file). + // Removing it must not race a fresh lock written by a new owner, so + // the removal is guarded by a claim mutex: mkdir is atomic-exclusive, + // and while the lock file exists no 'wx' writer can replace it - + // therefore a re-validation under the mutex cannot be invalidated + // before the unlink. + const claimPath = lockPath + '.claim' + try { + fs.mkdirSync(claimPath) + } catch { + // Another process is reclaiming right now. If its mutex is a + // leftover from a crash mid-claim (a few syscalls wide), clear it + // once it is clearly old, then retry. + try { + if (Date.now() - fs.statSync(claimPath).mtimeMs > 5000) { + fs.rmdirSync(claimPath) + } + } catch { + // Already cleared by someone else. + } + continue + } + + let busyPid = 0 + try { + const current = this.#inspectLock(lockPath) + if (current.state === 'live') { + busyPid = current.pid // a live owner took it; do not touch + } else if (current.state === 'stale') { + try { + fs.unlinkSync(lockPath) + } catch { + // Already removed. + } + } + // 'gone' or 'pending': nothing to remove / not ours to remove. + } finally { + try { + fs.rmdirSync(claimPath) + } catch { + // Best effort. + } + } + if (busyPid) { + throw new Error( + `PGlite data directory "${this.rootDir}" may be in use by another process (PID ${busyPid}). ` + + `Close the other instance or use a different data directory.`, + ) + } + } + + throw new Error( + `PGlite could not acquire the lock for data directory "${this.rootDir}" after repeated attempts. 
` + + `Another process may be rapidly creating and releasing locks on it.`, + ) + } + + /** + * Classify the current lock file. + * - 'live': held by a running process (never touch) + * - 'stale': holder is dead, it is our own leftover, or the file is + * unparseable AND old (safe to reclaim) + * - 'pending': unparseable but fresh - a writer is between its exclusive + * create and its token write; treat as in use and retry + * - 'gone': no lock file + */ + #inspectLock(lockPath: string): { + state: 'live' | 'stale' | 'pending' | 'gone' + pid: number + } { + let content: string + let mtimeMs: number + try { + content = fs.readFileSync(lockPath, 'utf-8').trim() + mtimeMs = fs.statSync(lockPath).mtimeMs + } catch { + return { state: 'gone', pid: 0 } + } + const pid = parseInt(content.split('\n')[0], 10) + if (!pid || isNaN(pid)) { + // No parseable owner. A freshly created lock may simply not have its + // token written yet ('wx' open and the write are two steps). + return Date.now() - mtimeMs < 10000 + ? { state: 'pending', pid: 0 } + : { state: 'stale', pid: 0 } + } + if (pid !== process.pid && this.#isProcessAlive(pid)) { + return { state: 'live', pid } + } + return { state: 'stale', pid } + } + + // A lock left behind by a dead process is stale and safe to reclaim. + // Reclaiming only ever happens when the holder PID is gone, so PID reuse + // can at worst make us conservatively refuse a still-live PID - it can + // never cause us to steal a lock from a running PGlite instance. + #isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0) // signal 0 only checks for existence + return true + } catch (e) { + // EPERM means the process exists but is owned by another user. + return (e as NodeJS.ErrnoException)?.code === 'EPERM' + } + } + + #releaseLock() { + // Only deregister if we are still the registered owner; another instance + // may have legitimately taken over (manually deleted lock or takeover). 
+ if (activeInstances.get(this.rootDir) === this.pg) { + activeInstances.delete(this.rootDir) + } + + if (this.#lockFd !== null) { + try { + fs.closeSync(this.#lockFd) + } catch { + // The fd may already be invalid; nothing to release. + } + this.#lockFd = null + + // Only remove the lock file if it still holds our token, so we never + // delete a lock that has since been claimed by another owner. + const lockPath = this.rootDir + '.lock' + try { + if (fs.readFileSync(lockPath, 'utf-8') === this.#lockToken) { + fs.unlinkSync(lockPath) + } + } catch { + // The lock file is already gone or unreadable; nothing to remove. + } + this.#lockToken = null + } + } + async closeFs(): Promise { - this.pg!.Module.FS.quit() + // Release the lock only after the filesystem has fully shut down, so + // another process cannot acquire the directory mid-teardown. + try { + this.pg!.Module.FS.quit() + } finally { + this.#releaseLock() + } } } diff --git a/packages/pglite/tests/drop-database.test.ts b/packages/pglite/tests/drop-database.test.ts index 9f8ff3b4d..6b9e18130 100644 --- a/packages/pglite/tests/drop-database.test.ts +++ b/packages/pglite/tests/drop-database.test.ts @@ -74,6 +74,11 @@ describe('drop database', () => { // pause for a bit for GC... await new Promise((resolve) => setTimeout(resolve, 10)) + // The abandoned instance still holds the data dir lock; deleting the + // lock file is the explicit override that allows reuse after an + // unclean shutdown. 
+ await fs.rm('./pgdata-test-drop-db2.lock', { force: true }) + const pg2 = await PGlite.create('./pgdata-test-drop-db2', { database: 'postgres', }) diff --git a/packages/pglite/tests/instantiation.test.ts b/packages/pglite/tests/instantiation.test.ts index 075076d29..f2f92e5c5 100644 --- a/packages/pglite/tests/instantiation.test.ts +++ b/packages/pglite/tests/instantiation.test.ts @@ -25,12 +25,14 @@ function testInstatiationMethod( const pg = await instantiateDb() const res = await pg.query(`SELECT 1 as one;`) expect(res.rows[0]?.['one']).toBe(1) + await pg.close() }) it('should instantiate with data dir argument', async () => { const pg = await instantiateDb('./pgdata-test') const res = await pg.query(`SELECT 1 as one;`) expect(res.rows[0]?.['one']).toBe(1) + await pg.close() }) it('should instantiate with options', async () => { @@ -39,6 +41,7 @@ function testInstatiationMethod( }) const res = await pg.query(`SELECT 1 as one;`) expect(res.rows[0]?.['one']).toBe(1) + await pg.close() }) }) } diff --git a/packages/pglite/tests/nodefs-lock.test.js b/packages/pglite/tests/nodefs-lock.test.js new file mode 100644 index 000000000..bbf41fa2b --- /dev/null +++ b/packages/pglite/tests/nodefs-lock.test.js @@ -0,0 +1,133 @@ +import { describe, it, expect, afterAll } from 'vitest' +import { existsSync, writeFileSync, rmSync } from 'node:fs' +import { spawn } from 'node:child_process' + +const dataDir = `/tmp/pglite-lock-test-${Date.now()}` + +afterAll(async () => { + if (!process.env.RETAIN_DATA) { + for (const p of [dataDir, dataDir + '.lock']) { + if (existsSync(p)) rmSync(p, { recursive: true, force: true }) + } + } +}) + +describe('NodeFS data directory locking', () => { + it('should block a second instance from opening the same data directory', async () => { + const { PGlite } = await import('../dist/index.js') + + const db1 = new PGlite(dataDir) + await db1.waitReady + + // Lock file should exist while db1 is open + expect(existsSync(dataDir + '.lock')).toBe(true) + 
+ // Second instance on same dir must throw + let lockError = null + try { + const db2 = new PGlite(dataDir) + await db2.waitReady + await db2.close() + } catch (err) { + lockError = err + } + + expect(lockError).not.toBeNull() + expect(lockError.message).toContain('is already in use') + + // First instance should still work fine + const result = await db1.query('SELECT 1 as ok') + expect(result.rows[0].ok).toBe(1) + + await db1.close() + }, 30000) + + it('should let a second instance take over with the takeover option', async () => { + const { PGlite } = await import('../dist/index.js') + const { NodeFS } = await import('../dist/fs/nodefs.js') + + const db1 = new PGlite(dataDir) + await db1.waitReady + + // With takeover, the second instance cleanly closes the first and + // becomes the owner (Node is single threaded, so the close cannot + // interleave with a write). + const db2 = new PGlite({ fs: new NodeFS(dataDir, { takeover: true }) }) + await db2.waitReady + + expect(db1.closed).toBe(true) + + // The first instance now fails cleanly instead of corrupting files + let closedError = null + try { + await db1.query('SELECT 1') + } catch (err) { + closedError = err + } + expect(closedError).not.toBeNull() + + // The new owner works + const result = await db2.query('SELECT 1 as ok') + expect(result.rows[0].ok).toBe(1) + + await db2.close() + }, 30000) + + it('should allow reopening after the first instance is closed', async () => { + const { PGlite } = await import('../dist/index.js') + + // Lock file should be cleaned up after close + expect(existsSync(dataDir + '.lock')).toBe(false) + + const db = new PGlite(dataDir) + await db.waitReady + const result = await db.query('SELECT 1 as ok') + expect(result.rows[0].ok).toBe(1) + await db.close() + }, 30000) + + it('should block while another live process holds the lock', async () => { + const { PGlite } = await import('../dist/index.js') + + // Spawn a real child process that stays alive, and claim the lock in its 
name. + const child = spawn(process.execPath, ['-e', 'setTimeout(() => {}, 60000)']) + try { + await new Promise((resolve) => setTimeout(resolve, 100)) + writeFileSync(dataDir + '.lock', `${child.pid}\n0\n`) + + // The holder process is alive, so opening must be refused. + let lockError = null + try { + const db = new PGlite(dataDir) + await db.waitReady + await db.close() + } catch (err) { + lockError = err + } + + expect(lockError).not.toBeNull() + expect(lockError.message).toContain('may be in use') + expect(lockError.message).toContain(String(child.pid)) + } finally { + // Kill the holder and wait for it to actually exit, even if an + // assertion above failed. + child.kill('SIGKILL') + await new Promise((resolve) => child.on('exit', resolve)) + rmSync(dataDir + '.lock', { force: true }) + } + }, 30000) + + it('should reclaim a stale lock left by a dead process', async () => { + const { PGlite } = await import('../dist/index.js') + + // A PID that is not running: its lock is stale and must be reclaimed + // automatically, with no manual lock removal required. 
+ writeFileSync(dataDir + '.lock', '999999\n0\n') + + const db = new PGlite(dataDir) + await db.waitReady + const result = await db.query('SELECT 1 as ok') + expect(result.rows[0].ok).toBe(1) + await db.close() + }, 30000) +}) diff --git a/packages/pglite/tests/user.test.ts b/packages/pglite/tests/user.test.ts index 11bbd5de2..83879339a 100644 --- a/packages/pglite/tests/user.test.ts +++ b/packages/pglite/tests/user.test.ts @@ -61,6 +61,8 @@ describe('user', () => { await expectToThrowAsync(async () => { await db2.query('SET ROLE no_such_user;') }) + + await db2.close() }) it('switch to user created after initial run', async () => { @@ -118,6 +120,8 @@ describe('user', () => { await expectToThrowAsync(async () => { await db2.query('SET ROLE no_such_user;') }) + + await db2.close() }) it('create database and switch to it', async () => { @@ -141,5 +145,7 @@ describe('user', () => { expect(currentUsername.rows).toEqual([{ current_user: 'test_user' }]) const currentDatabase = await db2.query('SELECT current_database();') expect(currentDatabase.rows).toEqual([{ current_database: 'test_db' }]) + + await db2.close() }) })