From 48c7c8b7faae6c1000d577822fc8b845fbae5db0 Mon Sep 17 00:00:00 2001
From: longtn
Date: Sat, 30 May 2026 23:50:59 +0700
Subject: [PATCH] feat: stream-based database export for large databases

Replace in-memory dump with streaming approach:
- TransformStream for chunked response delivery
- Paginated queries (LIMIT/OFFSET, 500 rows/batch)
- Breathing intervals (50ms) between batches to prevent DO lockup
- Proper value escaping including blobs and nulls
- Table names quoted with embedded backticks escaped
- Mid-stream failures abort the response instead of ending it cleanly
- Backward compatible API (same endpoint, same headers)

Fixes #59
---
 src/export/dump.test.ts | 133 +++++++++++++++++++++-----------
 src/export/dump.ts      | 165 ++++++++++++++++++++++++++++++----------
 2 files changed, 211 insertions(+), 87 deletions(-)

diff --git a/src/export/dump.test.ts b/src/export/dump.test.ts
index ca65b43..d11b33c 100644
--- a/src/export/dump.test.ts
+++ b/src/export/dump.test.ts
@@ -23,6 +23,7 @@ let mockDataSource: DataSource
 let mockConfig: StarbaseDBConfiguration
 
 beforeEach(() => {
+    vi.restoreAllMocks()
     vi.clearAllMocks()
 
     mockDataSource = {
@@ -36,26 +37,63 @@ beforeEach(() => {
         role: 'admin',
         features: { allowlist: true, rls: true, rest: true },
     }
+
+    // Re-mock after restore
+    vi.mocked(executeOperation).mockReset()
+    vi.mocked(createResponse).mockImplementation(
+        (data: any, message: any, status: any) =>
+            new Response(JSON.stringify({ result: data, error: message }), {
+                status,
+                headers: { 'Content-Type': 'application/json' },
+            })
+    )
 })
 
+async function collectResponseText(response: Response): Promise<string> {
+    const reader = response.body?.getReader()
+    if (!reader) return ''
+
+    const chunks: Uint8Array[] = []
+    while (true) {
+        const { done, value } = await reader.read()
+        if (done) break
+        chunks.push(value)
+    }
+
+    const totalLength = chunks.reduce((sum, c) => sum + c.length, 0)
+    const result = new Uint8Array(totalLength)
+    let offset = 0
+    for (const chunk of chunks) {
+        result.set(chunk, offset)
+        offset += chunk.length
+    }
+
+    return new TextDecoder().decode(result)
+}
+
 describe('Database Dump Module', () => {
-    it('should return a database dump when tables exist', async () => {
-        vi.mocked(executeOperation)
-            .mockResolvedValueOnce([{ name: 'users' }, { name: 'orders' }])
-            .mockResolvedValueOnce([
-                { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' },
-            ])
-            .mockResolvedValueOnce([
-                { id: 1, name: 'Alice' },
-                { id: 2, name: 'Bob' },
-            ])
-            .mockResolvedValueOnce([
-                { sql: 'CREATE TABLE orders (id INTEGER, total REAL);' },
-            ])
-            .mockResolvedValueOnce([
-                { id: 1, total: 99.99 },
-                { id: 2, total: 49.5 },
-            ])
+    it('should return a streaming database dump when tables exist', async () => {
+        const mock = vi.mocked(executeOperation)
+        // 1: tables list
+        mock.mockResolvedValueOnce([{ name: 'users' }, { name: 'orders' }])
+        // 2: users schema
+        mock.mockResolvedValueOnce([
+            { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' },
+        ])
+        // 3: users data (2 rows < BATCH_SIZE=500, generator exits after this)
+        mock.mockResolvedValueOnce([
+            { id: 1, name: 'Alice' },
+            { id: 2, name: 'Bob' },
+        ])
+        // 4: orders schema
+        mock.mockResolvedValueOnce([
+            { sql: 'CREATE TABLE orders (id INTEGER, total REAL);' },
+        ])
+        // 5: orders data (2 rows < BATCH_SIZE, generator exits)
+        mock.mockResolvedValueOnce([
+            { id: 1, total: 99.99 },
+            { id: 2, total: 49.5 },
+        ])
 
         const response = await dumpDatabaseRoute(mockDataSource, mockConfig)
 
@@ -67,17 +105,17 @@ describe('Database Dump Module', () => {
             'attachment; filename="database_dump.sql"'
         )
 
-        const dumpText = await response.text()
+        const dumpText = await collectResponseText(response)
         expect(dumpText).toContain(
             'CREATE TABLE users (id INTEGER, name TEXT);'
         )
-        expect(dumpText).toContain("INSERT INTO users VALUES (1, 'Alice');")
-        expect(dumpText).toContain("INSERT INTO users VALUES (2, 'Bob');")
+        expect(dumpText).toContain("INSERT INTO `users` VALUES (1, 'Alice');")
+        expect(dumpText).toContain("INSERT INTO `users` VALUES (2, 'Bob');")
         expect(dumpText).toContain(
             'CREATE TABLE orders (id INTEGER, total REAL);'
         )
-        expect(dumpText).toContain('INSERT INTO orders VALUES (1, 99.99);')
-        expect(dumpText).toContain('INSERT INTO orders VALUES (2, 49.5);')
+        expect(dumpText).toContain('INSERT INTO `orders` VALUES (1, 99.99);')
+        expect(dumpText).toContain('INSERT INTO `orders` VALUES (2, 49.5);')
     })
 
     it('should handle empty databases (no tables)', async () => {
@@ -89,52 +127,53 @@ describe('Database Dump Module', () => {
         expect(response.headers.get('Content-Type')).toBe(
             'application/x-sqlite3'
         )
-        const dumpText = await response.text()
-        expect(dumpText).toBe('SQLite format 3\0')
+        const dumpText = await collectResponseText(response)
+        expect(dumpText).toContain('SQLite format 3')
+        expect(dumpText).not.toContain('INSERT INTO')
     })
 
     it('should handle databases with tables but no data', async () => {
-        vi.mocked(executeOperation)
-            .mockResolvedValueOnce([{ name: 'users' }])
-            .mockResolvedValueOnce([
-                { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' },
-            ])
-            .mockResolvedValueOnce([])
+        const mock = vi.mocked(executeOperation)
+        mock.mockResolvedValueOnce([{ name: 'users' }])
+        mock.mockResolvedValueOnce([
+            { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' },
+        ])
+        // Empty data (first batch returns nothing)
+        mock.mockResolvedValueOnce([])
 
         const response = await dumpDatabaseRoute(mockDataSource, mockConfig)
 
         expect(response).toBeInstanceOf(Response)
-        const dumpText = await response.text()
+        const dumpText = await collectResponseText(response)
         expect(dumpText).toContain(
             'CREATE TABLE users (id INTEGER, name TEXT);'
         )
-        expect(dumpText).not.toContain('INSERT INTO users VALUES')
+        expect(dumpText).not.toContain('INSERT INTO `users` VALUES')
     })
 
     it('should escape single quotes properly in string values', async () => {
-        vi.mocked(executeOperation)
-            .mockResolvedValueOnce([{ name: 'users' }])
-            .mockResolvedValueOnce([
-                { sql: 'CREATE TABLE users (id INTEGER, bio TEXT);' },
-            ])
-            .mockResolvedValueOnce([{ id: 1, bio: "Alice's adventure" }])
+        const mock = vi.mocked(executeOperation)
+        mock.mockResolvedValueOnce([{ name: 'users' }])
+        mock.mockResolvedValueOnce([
+            { sql: 'CREATE TABLE users (id INTEGER, bio TEXT);' },
+        ])
+        mock.mockResolvedValueOnce([{ id: 1, bio: "Alice's adventure" }])
 
         const response = await dumpDatabaseRoute(mockDataSource, mockConfig)
 
         expect(response).toBeInstanceOf(Response)
-        const dumpText = await response.text()
+        const dumpText = await collectResponseText(response)
         expect(dumpText).toContain(
-            "INSERT INTO users VALUES (1, 'Alice''s adventure');"
+            "INSERT INTO `users` VALUES (1, 'Alice''s adventure');"
         )
     })
 
-    it('should return a 500 response when an error occurs', async () => {
-        const consoleErrorMock = vi
-            .spyOn(console, 'error')
-            .mockImplementation(() => {})
-        vi.mocked(executeOperation).mockRejectedValue(
-            new Error('Database Error')
-        )
+    it('should return a 500 response when initial table list fails', async () => {
+        vi.spyOn(console, 'error').mockImplementation(() => {})
+        // First call (table list) throws synchronously
+        vi.mocked(executeOperation).mockImplementationOnce(() => {
+            throw new Error('Database Error')
+        })
 
         const response = await dumpDatabaseRoute(mockDataSource, mockConfig)
 
diff --git a/src/export/dump.ts b/src/export/dump.ts
index 91a2e89..1e30d72 100644
--- a/src/export/dump.ts
+++ b/src/export/dump.ts
@@ -3,12 +3,85 @@ import { StarbaseDBConfiguration } from '../handler'
 import { DataSource } from '../types'
 import { createResponse } from '../utils'
 
+const BATCH_SIZE = 500
+const BREATHING_INTERVAL_MS = 50
+
+// Quote a table name as a backtick identifier, doubling embedded backticks
+function quoteIdentifier(name: string): string {
+    return '`' + name.replace(/`/g, '``') + '`'
+}
+
+// Render one column value as a SQL literal for an INSERT statement
+function escapeValue(value: unknown): string {
+    if (value === null || value === undefined) return 'NULL'
+    if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'`
+    if (typeof value === 'number') {
+        // String(NaN) / String(Infinity) are not valid SQL literals
+        if (Number.isNaN(value)) return 'NULL'
+        if (!Number.isFinite(value)) return value > 0 ? '1e999' : '-1e999'
+        return String(value)
+    }
+    if (typeof value === 'bigint') return String(value)
+    if (value instanceof ArrayBuffer || value instanceof Uint8Array) {
+        const hex = Array.from(new Uint8Array(value))
+            .map((b) => b.toString(16).padStart(2, '0'))
+            .join('')
+        return `X'${hex}'`
+    }
+    return `'${String(value).replace(/'/g, "''")}'`
+}
+
+function sleep(ms: number): Promise<void> {
+    return new Promise((resolve) => setTimeout(resolve, ms))
+}
+
+// Yield one INSERT statement per row, reading the table in BATCH_SIZE pages
+async function* streamTableData(
+    tableName: string,
+    dataSource: DataSource,
+    config: StarbaseDBConfiguration
+): AsyncGenerator<string> {
+    const quotedTable = quoteIdentifier(tableName)
+    let offset = 0
+    let hasMore = true
+
+    while (hasMore) {
+        // NOTE(review): no ORDER BY, so paging relies on a stable scan order;
+        // writes between batches may skip or repeat rows
+        const rows = await executeOperation(
+            [
+                {
+                    sql: `SELECT * FROM ${quotedTable} LIMIT ? OFFSET ?;`,
+                    params: [BATCH_SIZE, offset],
+                },
+            ],
+            dataSource,
+            config
+        )
+
+        if (!rows || rows.length === 0) break
+
+        for (const row of rows) {
+            const values = Object.values(row).map(escapeValue)
+            yield `INSERT INTO ${quotedTable} VALUES (${values.join(', ')});\n`
+        }
+
+        offset += rows.length
+        hasMore = rows.length === BATCH_SIZE
+
+        // Breathing interval between batches
+        if (hasMore) {
+            await sleep(BREATHING_INTERVAL_MS)
+        }
+    }
+}
+
 export async function dumpDatabaseRoute(
     dataSource: DataSource,
     config: StarbaseDBConfiguration
 ): Promise<Response> {
     try {
-        // Get all table names
+        // Get all table names up front (fail fast if DB is broken)
         const tablesResult = await executeOperation(
             [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
             dataSource,
@@ -16,54 +89,66 @@ export async function dumpDatabaseRoute(
         )
 
         const tables = tablesResult.map((row: any) => row.name)
-        let dumpContent = 'SQLite format 3\0' // SQLite file header
-
-        // Iterate through all tables
-        for (const table of tables) {
-            // Get table schema
-            const schemaResult = await executeOperation(
-                [
-                    {
-                        sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`,
-                    },
-                ],
-                dataSource,
-                config
-            )
-
-            if (schemaResult.length) {
-                const schema = schemaResult[0].sql
-                dumpContent += `\n-- Table: ${table}\n${schema};\n\n`
-            }
 
-            // Get table data
-            const dataResult = await executeOperation(
-                [{ sql: `SELECT * FROM ${table};` }],
-                dataSource,
-                config
-            )
-
-            for (const row of dataResult) {
-                const values = Object.values(row).map((value) =>
-                    typeof value === 'string'
-                        ? `'${value.replace(/'/g, "''")}'`
-                        : value
-                )
-                dumpContent += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
-            }
+        const { readable, writable } = new TransformStream()
+        const writer = writable.getWriter()
+        const encoder = new TextEncoder()
 
-            dumpContent += '\n'
-        }
+        // Process tables in background (streaming)
+        void (async () => {
+            try {
+                // Write SQLite header
+                await writer.write(encoder.encode('SQLite format 3\0'))
+
+                for (const table of tables) {
+                    // Get table schema
+                    const schemaResult = await executeOperation(
+                        [
+                            {
+                                sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`,
+                                params: [table],
+                            },
+                        ],
+                        dataSource,
+                        config
+                    )
 
-        // Create a Blob from the dump content
-        const blob = new Blob([dumpContent], { type: 'application/x-sqlite3' })
+                    if (schemaResult.length) {
+                        const schema = schemaResult[0].sql
+                        await writer.write(
+                            encoder.encode(
+                                `\n-- Table: ${table}\n${schema};\n\n`
+                            )
+                        )
+                    }
+
+                    // Stream table data in batches with breathing intervals
+                    for await (const insertStmt of streamTableData(
+                        table,
+                        dataSource,
+                        config
+                    )) {
+                        await writer.write(encoder.encode(insertStmt))
+                    }
+
+                    await writer.write(encoder.encode('\n'))
+                }
+
+                await writer.close()
+            } catch (error) {
+                console.error('Database Dump Stream Error:', error)
+                // Error the stream so the client sees a failed transfer instead
+                // of a truncated dump that looks complete.
+                await writer.abort(error).catch(() => {})
+            }
+        })()
 
         const headers = new Headers({
             'Content-Type': 'application/x-sqlite3',
             'Content-Disposition': 'attachment; filename="database_dump.sql"',
         })
 
-        return new Response(blob, { headers })
+        return new Response(readable, { headers })
     } catch (error: any) {
         console.error('Database Dump Error:', error)
         return createResponse(undefined, 'Failed to create database dump', 500)