diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.ts b/packages/destination-google-sheets/__tests__/memory-sheets.ts index 0c5724c02..509791450 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.ts @@ -61,6 +61,28 @@ export function createMemorySheets() { return label || 'A' } + // Slice `values` to an A1 range. `'Name'` → whole tab; `'Name'!A2:C[100]` → bounded. + function sliceByRange(values: unknown[][], range: string): unknown[][] { + const bang = range.indexOf('!') + if (bang < 0) return values + const m = range.slice(bang + 1).match(/^([A-Z]+)(\d+)?(?::([A-Z]+)(\d+)?)?$/) + if (!m) return values + const colIdx = (s: string) => + [...s].reduce((v, ch) => v * 26 + (ch.charCodeAt(0) - 64), 0) - 1 + const startCol = colIdx(m[1]) + const startRow = m[2] ? Number(m[2]) - 1 : 0 + const endCol = m[3] !== undefined ? colIdx(m[3]) : Infinity + const endRow = m[4] !== undefined ? Number(m[4]) - 1 : values.length - 1 + const out: unknown[][] = [] + for (let r = startRow; r <= Math.min(endRow, values.length - 1); r++) { + const src = values[r] ?? [] + const slice: unknown[] = [] + for (let c = startCol; c <= Math.min(endCol, src.length - 1); c++) slice.push(src[c]) + out.push(slice) + } + return out + } + function getTab(spreadsheetId: string, range: string): SheetTab { const ss = getSpreadsheet(spreadsheetId) const name = parseSheetName(range) @@ -69,6 +91,22 @@ export function createMemorySheets() { return tab } + function getTabBySheetId(spreadsheetId: string, sheetId: number): SheetTab { + const ss = getSpreadsheet(spreadsheetId) + for (const tab of ss.sheets.values()) { + if (tab.sheetId === sheetId) return tab + } + throw Object.assign(new Error(`Sheet not found: ${sheetId}`), { code: 400 }) + } + + function rowDataToValues(rowData: unknown): string[] { + const values = (rowData as { values?: unknown[] })?.values ?? [] + return values.map((cell) => { + const uev = (cell as { userEnteredValue?: { stringValue?: string } })?.userEnteredValue + return uev?.stringValue ?? '' + }) + } + const sheets = { spreadsheets: { async create(params: { requestBody?: { properties?: { title?: string } }; fields?: string }) { @@ -83,7 +121,11 @@ export function createMemorySheets() { async get(params: { spreadsheetId: string; fields?: string }) { const ss = getSpreadsheet(params.spreadsheetId) const sheetsMeta = Array.from(ss.sheets.entries()).map(([name, tab]) => ({ - properties: { sheetId: tab.sheetId, title: name }, + properties: { + sheetId: tab.sheetId, + title: name, + gridProperties: { rowCount: 1000, columnCount: 26 }, + }, })) return { data: { sheets: sheetsMeta } } }, @@ -118,6 +160,59 @@ export function createMemorySheets() { } } replies.push({}) + } else if (req.appendCells) { + const ac = req.appendCells as { sheetId: number; rows?: unknown[] } + const tab = getTabBySheetId(params.spreadsheetId, ac.sheetId) + for (const row of ac.rows ?? []) tab.values.push(rowDataToValues(row)) + replies.push({}) + } else if (req.updateCells) { + const uc = req.updateCells as { + start?: { sheetId?: number; rowIndex?: number; columnIndex?: number } + rows?: unknown[] + } + const sheetId = uc.start?.sheetId + if (sheetId != null) { + const tab = getTabBySheetId(params.spreadsheetId, sheetId) + const rowIndex = uc.start?.rowIndex ?? 0 + const rows = (uc.rows ?? 
[]).map(rowDataToValues) + for (let i = 0; i < rows.length; i++) { + tab.values[rowIndex + i] = rows[i] + } + } + replies.push({}) + } else if (req.appendDimension) { + // No-op in the fake: the backing arrays grow dynamically, so the + // grid never actually constrains writes. Accept and reply empty + // so production code paths that call appendDimension succeed. + replies.push({}) + } else if (req.pasteData) { + // Parse a pasteData request and write its cells into the tab. The + // production code uses `\x1f` as column delimiter and `\n` as row + // delimiter (fixed by the API), with `PASTE_VALUES` semantics. + const pd = req.pasteData as { + coordinate?: { sheetId?: number; rowIndex?: number; columnIndex?: number } + data?: string + delimiter?: string + type?: string + } + const sheetId = pd.coordinate?.sheetId + if (sheetId != null) { + const tab = getTabBySheetId(params.spreadsheetId, sheetId) + const rowIndex = pd.coordinate?.rowIndex ?? 0 + const columnIndex = pd.coordinate?.columnIndex ?? 0 + const delimiter = pd.delimiter ?? '\t' + const raw = pd.data ?? '' + const rowLines = raw.length === 0 ? [] : raw.split('\n') + for (let i = 0; i < rowLines.length; i++) { + const cells = rowLines[i].split(delimiter) + const target: unknown[] = (tab.values[rowIndex + i] ?? []).slice() + for (let j = 0; j < cells.length; j++) { + target[columnIndex + j] = cells[j] + } + tab.values[rowIndex + i] = target + } + } + replies.push({}) } else { replies.push({}) } @@ -185,6 +280,19 @@ export function createMemorySheets() { const tab = getTab(params.spreadsheetId, params.range) return { data: { values: tab.values } } }, + + async batchGet(params: { spreadsheetId: string; ranges?: string[] }) { + const ranges = params.ranges ?? [] + const valueRanges = ranges.map((range) => { + try { + const tab = getTab(params.spreadsheetId, range) + return { range, values: sliceByRange(tab.values, range) } + } catch { + return { range, values: [] } + } + }) + return { data: { valueRanges } } + }, }, }, } as unknown as sheets_v4.Sheets diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index b27a3520b..89e594f9f 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -18,15 +18,19 @@ import { log } from './logger.js' import defaultSpec, { configSchema } from './spec.js' import type { Config } from './spec.js' import { - appendRows, - buildRowMap, + applyBatch, + batchReadSheets, + buildRowMapFromPkColumns, + buildRowMapFromRows, createIntroSheet, deleteSpreadsheet, ensureSheet, ensureSpreadsheet, + findSheetId, protectSheets, readHeaderRow, - updateRows, + type BatchReadRequest, + type StreamBatchOps, } from './writer.js' export { @@ -128,11 +132,10 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< const sheets = sheetsClient ?? makeSheetsClient(config) const spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) - // Create the Overview intro tab first (handles "Sheet1" rename if needed) - const streamNames = catalog.streams.map((s) => s.stream.name) - await createIntroSheet(sheets, spreadsheetId, streamNames) - - // Create a data tab for each stream with headers derived from its JSON schema + // Data tabs must exist before the Overview is written: its rows contain + // `=COUNTUNIQUE(''!A2:A)` formulas that Sheets parses with + // USER_ENTERED. 
If the referenced sheet doesn't exist yet the API + // rejects the update with `Unable to parse range: !A2:A`. const sheetIds: number[] = [] for (const { stream } of catalog.streams) { const properties = stream.json_schema?.['properties'] as Record | undefined @@ -141,10 +144,15 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< sheetIds.push(sheetId) } - // Protect all data tabs with a warning so users know edits may be overwritten + const streamNames = catalog.streams.map((s) => s.stream.name) + await createIntroSheet(sheets, spreadsheetId, streamNames) + await protectSheets(sheets, spreadsheetId, sheetIds) - yield msg.control({ control_type: 'destination_config', destination_config: { ...config, spreadsheet_id: spreadsheetId } }) + yield msg.control({ + control_type: 'destination_config', + destination_config: { ...config, spreadsheet_id: spreadsheetId }, + }) }, async *teardown({ config }) { @@ -176,7 +184,6 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< async *write({ config, catalog }, $stdin) { const sheets = sheetsClient ?? makeSheetsClient(config) - const batchSize = config.batch_size ?? 50 const primaryKeys = new Map( catalog.streams.map((configuredStream) => [ configuredStream.stream.name, @@ -190,14 +197,10 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< // Per-stream state: column headers plus buffered appends/updates. const streamHeaders = new Map() + const sheetIds = new Map() const appendBuffers = new Map>() const updateBuffers = new Map>() const rowAssignments: Record> = {} - // Row maps for native upsert: rowKey → 1-based row number per stream - const rowMaps = new Map>() - // Tracks whether we've refreshed the row map from the sheet for each stream - // (once per write() call, on first flush) - const rowMapRefreshed = new Set() // Pending append index: rowKey → index in appendBuffers for O(1) in-batch dedup const appendKeyIndex = new Map>() @@ -225,7 +228,11 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< const pkFields = pk?.map((path) => path[0]) ?? 
[] const rest = Object.keys(cleanData).filter((k) => !pkFields.includes(k)) headers = [...pkFields.filter((k) => k in cleanData), ...rest] - await ensureSheet(sheets, spreadsheetId, streamName, headers) + const sheetId = await ensureSheet(sheets, spreadsheetId, streamName, headers) + sheetIds.set(streamName, sheetId) + } else { + const sheetId = await findSheetId(sheets, spreadsheetId, streamName) + if (sheetId !== undefined) sheetIds.set(streamName, sheetId) } streamHeaders.set(streamName, headers) @@ -236,7 +243,8 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< const next = extendHeaders(headers, cleanData) if (next.changed) { - await ensureSheet(sheets, spreadsheetId, streamName, next.headers) + const sheetId = await ensureSheet(sheets, spreadsheetId, streamName, next.headers) + sheetIds.set(streamName, sheetId) streamHeaders.set(streamName, next.headers) headers = next.headers } @@ -244,115 +252,208 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< return headers } - const ensureRowMapForStream = async (streamName: string): Promise> => { - let map = rowMaps.get(streamName) - if (!map) { + const flushAll = async () => { + const flushStart = Date.now() + let totalBufferedAppends = 0 + let totalBufferedUpdates = 0 + for (const [, arr] of appendBuffers) totalBufferedAppends += arr.length + for (const [, arr] of updateBuffers) totalBufferedUpdates += arr.length + log.debug( + { + appends: totalBufferedAppends, + updates: totalBufferedUpdates, + streams: appendBuffers.size, + }, + 'flushAll start' + ) + + const opsByStream = new Map() + const streamNames = [...new Set([...appendBuffers.keys(), ...updateBuffers.keys()])] + + // Only streams with keyed appends need a read-before-flush pass for dedup. + type StreamPrep = { + streamName: string + sheetId: number + headers: string[] + primaryKey: string[][] | undefined + appends: Array<{ row: string[]; rowKey?: string }> + bufferedUpdates: Array<{ rowNumber: number; values: string[] }> + needsRead: boolean + } + const prepInputs: StreamPrep[] = [] + for (const streamName of streamNames) { + const bufferedAppends = appendBuffers.get(streamName) ?? [] + const bufferedUpdates = (updateBuffers.get(streamName) ?? []).slice() + if (bufferedAppends.length === 0 && bufferedUpdates.length === 0) continue + + const sheetId = sheetIds.get(streamName) + if (sheetId === undefined) continue + + const headers = streamHeaders.get(streamName) ?? 
[] const primaryKey = primaryKeys.get(streamName) - const headers = streamHeaders.get(streamName) - if (primaryKey && primaryKey.length > 0 && headers) { - try { - map = await buildRowMap(sheets, spreadsheetId, streamName, headers, primaryKey) - rowMapRefreshed.add(streamName) - } catch { - map = new Map() // sheet doesn't exist yet or is empty - } - } else { - map = new Map() // no primary key or no headers = append-only - } - rowMaps.set(streamName, map) + const needsRead = + !!primaryKey && + primaryKey.length > 0 && + headers.length > 0 && + bufferedAppends.some((e) => e.rowKey) + + prepInputs.push({ + streamName, + sheetId, + headers, + primaryKey, + appends: bufferedAppends.slice(), + bufferedUpdates, + needsRead, + }) } - return map - } - const flushStream = async (streamName: string) => { - const updates = updateBuffers.get(streamName) - if (updates && updates.length > 0) { - await updateRows(sheets, spreadsheetId, streamName, updates) - updateBuffers.set(streamName, []) + // One batchGet fetches all streams' existing rows at the cost of one + // read-quota unit, avoiding the 300 reads/min limit on wide catalogs. + // Narrow per-stream range when PK columns are the first N headers + // (guaranteed by setup); otherwise read the whole tab to locate PK. + const streamsToRead: BatchReadRequest[] = [] + const narrowByStream = new Map() + for (const prep of prepInputs) { + if (!prep.needsRead || !prep.primaryKey) continue + const pkFields = prep.primaryKey.map((p) => p[0]) + const pkIsFirstN = pkFields.every((field, i) => prep.headers[i] === field) + narrowByStream.set(prep.streamName, pkIsFirstN) + streamsToRead.push({ + name: prep.streamName, + ...(pkIsFirstN ? { columnCount: pkFields.length } : {}), + }) } - let appends = appendBuffers.get(streamName) - if (!appends || appends.length === 0) return - - // On the first flush per stream, refresh the row map from the sheet - // to catch rows written by previous write() calls or Temporal activity - // retries. Only done once per write() to avoid excessive API calls. - const primaryKey = primaryKeys.get(streamName) - const headers = streamHeaders.get(streamName) - if ( - !rowMapRefreshed.has(streamName) && - primaryKey && - primaryKey.length > 0 && - headers && - appends.some((e) => e.rowKey) - ) { - rowMapRefreshed.add(streamName) + let sheetRows = new Map() + if (streamsToRead.length > 0) { + const readStart = Date.now() try { - const freshMap = await buildRowMap( - sheets, - spreadsheetId, - streamName, - headers, - primaryKey + sheetRows = await batchReadSheets(sheets, spreadsheetId, streamsToRead) + let totalRows = 0 + for (const rows of sheetRows.values()) totalRows += rows.length + log.debug( + { + streams: streamsToRead.length, + narrow: streamsToRead.filter((r) => r.columnCount).length, + totalRows, + durationMs: Date.now() - readStart, + }, + 'batchReadSheets' ) - rowMaps.set(streamName, freshMap) - - const lateUpdates: Array<{ rowNumber: number; values: string[] }> = [] - const remaining: typeof appends = [] - for (const entry of appends) { - const existing = entry.rowKey ? 
freshMap.get(entry.rowKey) : undefined - if (existing !== undefined) { - lateUpdates.push({ rowNumber: existing, values: entry.row }) - } else { - remaining.push(entry) - } - } + } catch (err) { + log.warn( + { err, streams: streamsToRead.length, durationMs: Date.now() - readStart }, + 'batchReadSheets failed; proceeding without dedup' + ) + } + } - if (lateUpdates.length > 0) { - await updateRows(sheets, spreadsheetId, streamName, lateUpdates) + // Per-stream prep from pre-fetched rows. Stream order is preserved + // so row_assignments tracking matches the previous sequential impl. + for (const prep of prepInputs) { + const { streamName, sheetId, headers, primaryKey, bufferedUpdates, needsRead } = prep + let appends = prep.appends + let existingRowCount = 0 + + if (needsRead && primaryKey) { + const allRows = sheetRows.get(streamName) + if (allRows) { + const isNarrow = narrowByStream.get(streamName) === true + // Narrow reads skip the header row; add 1 so append startRow is correct. + existingRowCount = isNarrow ? allRows.length + 1 : allRows.length + const freshMap = isNarrow + ? buildRowMapFromPkColumns(allRows, primaryKey) + : buildRowMapFromRows(allRows, headers, primaryKey) + const remaining: typeof appends = [] + let converted = 0 + for (const entry of appends) { + const existing = entry.rowKey ? freshMap.get(entry.rowKey) : undefined + if (existing !== undefined) { + bufferedUpdates.push({ rowNumber: existing, values: entry.row }) + converted++ + } else { + remaining.push(entry) + } + } + appends = remaining + if (converted > 0) { + log.debug( + { + streamName, + existingRows: existingRowCount, + keys: freshMap.size, + converted, + }, + 'dedup: converted appends to updates' + ) + } } - appends = remaining - } catch { - // Sheet read failed — proceed with append (best effort) } + + opsByStream.set(streamName, { + sheetId, + updates: bufferedUpdates, + appends: appends.map((entry) => entry.row), + existingRowCount, + }) + // Stash deduped entries so row_assignments can be emitted after + // applyBatch returns per-stream start rows. + appendBuffers.set(streamName, appends) } - if (appends.length === 0) { - appendBuffers.set(streamName, []) - appendKeyIndex.get(streamName)?.clear() + if (opsByStream.size === 0) { + log.debug({ durationMs: Date.now() - flushStart }, 'flushAll: nothing to flush') return } - const range = await appendRows( - sheets, - spreadsheetId, - streamName, - appends.map((entry) => entry.row) + let totalAppends = 0 + let totalUpdates = 0 + for (const ops of opsByStream.values()) { + totalAppends += ops.appends.length + totalUpdates += ops.updates.length + } + log.debug( + { appends: totalAppends, updates: totalUpdates, streams: opsByStream.size }, + 'applyBatch start' ) - if (range) { - const map = rowMaps.get(streamName) + const applyStart = Date.now() + const results = await applyBatch(sheets, spreadsheetId, opsByStream) + log.debug({ durationMs: Date.now() - applyStart }, 'applyBatch done') + + for (const [streamName, { appendStartRow }] of results) { + const appends = appendBuffers.get(streamName) ?? 
[] for (let index = 0; index < appends.length; index++) { const rowKey = appends[index]?.rowKey if (!rowKey) continue - const rowNumber = range.startRow + index + const rowNumber = appendStartRow + index rowAssignments[streamName] ??= {} rowAssignments[streamName][rowKey] = rowNumber - map?.set(rowKey, rowNumber) } } - appendBuffers.set(streamName, []) - appendKeyIndex.get(streamName)?.clear() - } - const flushAll = async () => { - for (const streamName of new Set([...appendBuffers.keys(), ...updateBuffers.keys()])) { - await flushStream(streamName) + for (const streamName of opsByStream.keys()) { + appendBuffers.set(streamName, []) + appendKeyIndex.get(streamName)?.clear() + updateBuffers.set(streamName, []) } + + log.debug({ durationMs: Date.now() - flushStart }, 'flushAll done') } + const writeStart = Date.now() + let recordCount = 0 + let stateCount = 0 + let writeError: unknown = undefined + let cancelled = true + + // try/finally ensures flushAll runs even when the consumer closes us + // early via iterator.return() (e.g. takeLimits eof). Otherwise the + // buffered batch would be silently dropped. try { for await (const msg of $stdin) { if (msg.type === 'record') { + recordCount++ const { stream, data } = msg.record const cleanData = stripSystemFields(data) const headers = await ensureHeadersForRecord(stream, cleanData) @@ -371,58 +472,58 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< // 1. Explicit _row_number (backwards compat with service layer) updateBuffers.get(stream)!.push({ rowNumber, values: row }) } else if (rowKey) { - // 2. Native upsert: look up row key in the map - const map = await ensureRowMapForStream(stream) - const existingRow = map.get(rowKey) - if (existingRow !== undefined) { - updateBuffers.get(stream)!.push({ rowNumber: existingRow, values: row }) + // Upsert: buffer as append + in-batch dedup. flushAll splits + // into final appends vs updates after reading the sheet. + const buffer = appendBuffers.get(stream)! + const keyIdx = appendKeyIndex.get(stream)! + const pendingIdx = keyIdx.get(rowKey) + if (pendingIdx !== undefined) { + buffer[pendingIdx] = { row, rowKey } } else { - const buffer = appendBuffers.get(stream)! - const keyIdx = appendKeyIndex.get(stream)! - const pendingIdx = keyIdx.get(rowKey) - if (pendingIdx !== undefined) { - buffer[pendingIdx] = { row, rowKey } - } else { - keyIdx.set(rowKey, buffer.length) - buffer.push({ row, rowKey }) - } + keyIdx.set(rowKey, buffer.length) + buffer.push({ row, rowKey }) } } else { // 3. No key at all — pure append appendBuffers.get(stream)!.push({ row }) } - - const appendCount = appendBuffers.get(stream)?.length ?? 0 - const updateCount = updateBuffers.get(stream)?.length ?? 0 - if (appendCount + updateCount >= batchSize) { - await flushStream(stream) - } - yield msg - } else if (msg.type === 'source_state') { - // Flush the stream's pending rows, then re-emit the state checkpoint - if (msg.source_state.state_type === 'global') { - await flushAll() - } else { - await flushStream(msg.source_state.stream) - } yield msg } else { - // Pass through messages the destination doesn't handle + if (msg.type === 'source_state') stateCount++ + // Pass through non-record messages immediately; data is flushed at end. 
yield msg } } - // Flush any remaining rows - await flushAll() + cancelled = false + log.debug( + { durationMs: Date.now() - writeStart, recordCount, stateCount }, + '$stdin drained' + ) } catch (err: unknown) { - // Attempt to flush what we have before yielding the error + cancelled = false + writeError = err + log.error( + { err, durationMs: Date.now() - writeStart, recordCount, stateCount }, + 'write() error' + ) + } finally { + if (cancelled) { + log.warn( + { durationMs: Date.now() - writeStart, recordCount, stateCount }, + 'write() cancelled by consumer; flushing buffered data anyway' + ) + } try { await flushAll() - } catch { - // ignore flush errors during error handling + } catch (flushErr) { + log.error({ err: flushErr }, 'flushAll failed during teardown') + if (!writeError) writeError = flushErr } + } - const errMsg = err instanceof Error ? err.message : String(err) + if (writeError) { + const errMsg = writeError instanceof Error ? writeError.message : String(writeError) log.error(errMsg) yield { type: 'connection_status' as const, diff --git a/packages/destination-google-sheets/src/writer.ts b/packages/destination-google-sheets/src/writer.ts index bc199e0c3..4296a59b2 100644 --- a/packages/destination-google-sheets/src/writer.ts +++ b/packages/destination-google-sheets/src/writer.ts @@ -1,4 +1,5 @@ import type { drive_v3, sheets_v4 } from 'googleapis' +import { log } from './logger.js' import { serializeRowKey } from './metadata.js' /** @@ -12,22 +13,68 @@ const BACKOFF_BASE_MS = 1000 const BACKOFF_MAX_MS = 32000 const MAX_RETRIES = 5 -async function withRetry(fn: () => Promise): Promise { +async function withRetry(fn: () => Promise, label?: string): Promise { let delay = BACKOFF_BASE_MS + const overallStart = Date.now() + if (label) { + log.debug({ label }, 'withRetry start') + } for (let attempt = 0; ; attempt++) { + const attemptStart = Date.now() try { - return await fn() + const result = await fn() + if (label) { + const attemptMs = Date.now() - attemptStart + const totalMs = Date.now() - overallStart + if (attempt === 0) { + log.debug({ label, attemptMs }, 'withRetry OK first-try') + } else { + log.debug( + { label, attempts: attempt + 1, attemptMs, totalMs }, + 'withRetry OK after retries' + ) + } + } + return result } catch (err: unknown) { - const status = - err instanceof Error && 'code' in err ? (err as { code: number }).code : undefined + const attemptMs = Date.now() - attemptStart + const rawCode = + err instanceof Error && 'code' in err ? (err as { code?: number | string }).code : undefined + const status = typeof rawCode === 'number' ? rawCode : undefined const isRateLimit = status === 429 const isServerError = status !== undefined && status >= 500 + const retriable = isRateLimit || isServerError - if ((isRateLimit || isServerError) && attempt < MAX_RETRIES) { + if (retriable && attempt < MAX_RETRIES) { + if (label) { + log.warn( + { + err, + label, + attempt: attempt + 1, + maxRetries: MAX_RETRIES, + status, + attemptMs, + backingOffMs: delay, + }, + 'withRetry retry' + ) + } await new Promise((r) => setTimeout(r, delay)) delay = Math.min(delay * 2, BACKOFF_MAX_MS) continue } + + if (label) { + const totalMs = Date.now() - overallStart + const reason = retriable + ? `exhausted ${MAX_RETRIES} retries` + : `non-retriable (status=${rawCode ?? 'none'})` + log.error( + { err, label, reason, attempts: attempt + 1, attemptMs, totalMs }, + 'withRetry FAIL' + ) + } throw err } } @@ -149,6 +196,19 @@ export async function readHeaderRow( return Array.isArray(headerRow) ? 
headerRow.map((value) => String(value)) : [] } +/** Look up the numeric sheetId for a tab by name. Returns undefined if not found. */ +export async function findSheetId( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string +): Promise { + const meta = await withRetry(() => + sheets.spreadsheets.get({ spreadsheetId, fields: 'sheets.properties' }) + ) + const tab = meta.data.sheets?.find((s) => s.properties?.title === sheetName) + return tab?.properties?.sheetId ?? undefined +} + function parseUpdatedRows(updatedRange: string): { startRow: number; endRow: number } { const match = updatedRange.match(/![A-Z]+(\d+)(?::[A-Z]+(\d+))?$/i) if (!match) throw new Error(`Unable to parse updated range: ${updatedRange}`) @@ -337,23 +397,19 @@ export async function deleteSpreadsheet( } /** - * Build a map from serialized primary key → 1-based row number by reading - * existing sheet data and extracting only the primary key columns. - * - * `headers` must already be known (from `readHeaderRow` or first-record discovery). + * Pure: serialized primary key → 1-based sheet row number, from rows you've + * already fetched. `headers` must be known. Prefer this over `buildRowMap` + * when you also need the row data; avoids a second read. */ -export async function buildRowMap( - sheets: sheets_v4.Sheets, - spreadsheetId: string, - sheetName: string, +export function buildRowMapFromRows( + allRows: unknown[][], headers: string[], primaryKey: string[][] -): Promise> { +): Map { const pkFields = primaryKey.map((path) => path[0]) const pkIndices = pkFields.map((field) => headers.indexOf(field)) if (pkIndices.some((i) => i === -1)) return new Map() - const allRows = await readSheet(sheets, spreadsheetId, sheetName) // Skip header row (index 0), data starts at index 1 const map = new Map() for (let i = 1; i < allRows.length; i++) { @@ -369,6 +425,46 @@ export async function buildRowMap( return map } +/** + * Like `buildRowMapFromRows` but for a header-less PK-only slice + * (see `batchReadSheets` with `columnCount`). Row i → sheet row i + 2. + */ +export function buildRowMapFromPkColumns( + pkRows: unknown[][], + primaryKey: string[][] +): Map { + const pkFields = primaryKey.map((path) => path[0]) + const map = new Map() + for (let i = 0; i < pkRows.length; i++) { + const row = pkRows[i] as string[] + const data: Record = {} + for (let j = 0; j < pkFields.length; j++) { + data[pkFields[j]] = row[j] ?? '' + } + const rowKey = serializeRowKey(primaryKey, data) + if (rowKey === '[""]' || rowKey === '[null]') continue + map.set(rowKey, i + 2) + } + return map +} + +/** + * Build a map from serialized primary key → 1-based row number by reading + * existing sheet data and extracting only the primary key columns. + * + * `headers` must already be known (from `readHeaderRow` or first-record discovery). + */ +export async function buildRowMap( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string, + headers: string[], + primaryKey: string[][] +): Promise> { + const allRows = await readSheet(sheets, spreadsheetId, sheetName) + return buildRowMapFromRows(allRows, headers, primaryKey) +} + /** Read all values from a sheet tab. Used for verification in tests. */ export async function readSheet( sheets: sheets_v4.Sheets, @@ -383,3 +479,402 @@ export async function readSheet( ) return (res.data.values ?? 
[]) as unknown[][] } + +function columnLetter(index: number): string { + let value = index + 1 + let label = '' + while (value > 0) { + const remainder = (value - 1) % 26 + label = String.fromCharCode(65 + remainder) + label + value = Math.floor((value - 1) / 26) + } + return label +} + +export interface BatchReadRequest { + name: string + /** Read only the first N columns starting at row 2 (header skipped). */ + columnCount?: number +} + +/** + * Read multiple sheet tabs in one `values.batchGet` call. Replaces N + * parallel reads with 1 request and 1 read-quota unit — required for wide + * catalogs (otherwise blows the 300/min read limit). Missing tabs map to + * empty arrays so callers can always `.get()` safely. + * + * With `columnCount` set: response is PK-only, header-less — use with + * {@link buildRowMapFromPkColumns}. Without: whole tab — use with + * {@link buildRowMapFromRows}. + */ +export async function batchReadSheets( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + requests: Array +): Promise> { + const result = new Map() + if (requests.length === 0) return result + const normalized = requests.map((r) => (typeof r === 'string' ? { name: r } : r)) + const res = await withRetry(() => + sheets.spreadsheets.values.batchGet({ + spreadsheetId, + ranges: normalized.map((req) => + req.columnCount && req.columnCount > 0 + ? `'${req.name}'!A2:${columnLetter(req.columnCount - 1)}` + : `'${req.name}'` + ), + }) + ) + const valueRanges = res.data.valueRanges ?? [] + for (let i = 0; i < normalized.length; i++) { + const entry = valueRanges[i] + const values = (entry?.values ?? []) as unknown[][] + result.set(normalized[i].name, values) + } + return result +} + +export interface StreamBatchOps { + sheetId: number + updates: { rowNumber: number; values: string[] }[] + appends: string[][] + existingRowCount: number +} + +// `pasteData` column delimiter. Unit Separator (U+001F) — a control char +// that won't naturally appear in Stripe data. Row separator is always `\n` +// (not configurable), so any `\n`, `\r`, or U+001F inside cells must be +// sanitized or the paste parser misaligns columns. +export const PASTE_COL_DELIMITER = '\x1f' +const PASTE_SANITIZE_RE = /[\n\r\x1f]/g + +function sanitizeForPaste(value: string): string { + return value.replace(PASTE_SANITIZE_RE, ' ') +} + +export function rowsToTsv(rows: string[][]): string { + let out = '' + for (let r = 0; r < rows.length; r++) { + const row = rows[r] + for (let c = 0; c < row.length; c++) { + if (c > 0) out += PASTE_COL_DELIMITER + out += sanitizeForPaste(row[c]) + } + if (r < rows.length - 1) out += '\n' + } + return out +} + +/** + * Flush buffered updates + appends across all streams. + * + * Phase 1 — parallel reads: gridProperties + per-stream row counts. + * Phase 3a — one batchUpdate with appendDimension requests (only if grids + * need to grow). Must precede data writes. + * Phase 3b — one batchUpdate with all pasteData requests. PASTE_VALUES + + * TSV is the cheapest wire payload (no formula eval, no + * cell-level parsing server-side). + * + * Returns per-stream 1-based `appendStartRow` for row_assignments. + */ +export async function applyBatch( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + opsByStream: Map +): Promise> { + const applyStart = Date.now() + + // ── Phase 1 (parallel reads) ──────────────────────────────────── + // gridProperties for every sheet + per-stream column-A row counts when + // we don't already have them (streams that bypassed buildRowMap). 
+ type GridInfo = { rowCount: number; columnCount: number } + const gridInfo = new Map() + const probes: Array> = [] + + probes.push( + (async () => { + const metaStart = Date.now() + try { + const res = await withRetry( + () => + sheets.spreadsheets.get({ + spreadsheetId, + fields: 'sheets(properties(sheetId,gridProperties))', + }), + 'gridMetadata' + ) + for (const s of res.data.sheets ?? []) { + const id = s.properties?.sheetId + const gp = s.properties?.gridProperties + if (id != null && gp) { + gridInfo.set(id, { rowCount: gp.rowCount ?? 1000, columnCount: gp.columnCount ?? 26 }) + } + } + log.debug({ sheets: gridInfo.size, durationMs: Date.now() - metaStart }, 'gridMetadata') + } catch (err) { + log.warn({ err, durationMs: Date.now() - metaStart }, 'gridMetadata failed') + } + })() + ) + + for (const [streamName, ops] of opsByStream) { + if (ops.appends.length > 0 && ops.existingRowCount === 0) { + probes.push( + (async () => { + const probeStart = Date.now() + try { + const res = await withRetry( + () => + sheets.spreadsheets.values.get({ + spreadsheetId, + range: `'${streamName}'!A:A`, + majorDimension: 'ROWS', + }), + `rowCountProbe(${streamName})` + ) + ops.existingRowCount = (res.data.values ?? []).length + log.debug( + { + streamName, + rows: ops.existingRowCount, + durationMs: Date.now() - probeStart, + }, + 'rowCountProbe' + ) + } catch (err) { + log.warn( + { err, streamName, durationMs: Date.now() - probeStart }, + 'rowCountProbe failed' + ) + } + })() + ) + } + } + const phase1Start = Date.now() + await Promise.all(probes) + log.warn( + { parallelCalls: probes.length, durationMs: Date.now() - phase1Start }, + 'phase1 (reads) done' + ) + + // ── Phase 2 (build payloads) ──────────────────────────────────── + // `expansionRequests` run first (Phase 3a) — the grid must fit before + // pasteData writes. `dataRequests` are all dispatched in one batchUpdate + // (Phase 3b); each pasteData targets a distinct row range on its sheet. + const appendStartRows = new Map() + const expansionRequests: sheets_v4.Schema$Request[] = [] + const dataRequests: sheets_v4.Schema$Request[] = [] + const EXPAND_ROW_BUFFER = 1000 + + // 2a) appendDimension — only for grids that don't already fit. + const phase2aStart = Date.now() + for (const [, ops] of opsByStream) { + const maxUpdateRow = ops.updates.reduce((m, u) => Math.max(m, u.rowNumber), 0) + const maxAppendRow = ops.appends.length > 0 ? ops.existingRowCount + ops.appends.length : 0 + const neededRows = Math.max(maxUpdateRow, maxAppendRow) + + const maxUpdateCol = ops.updates.reduce((m, u) => Math.max(m, u.values.length), 0) + const maxAppendCol = ops.appends.reduce((m, row) => Math.max(m, row.length), 0) + const neededCols = Math.max(maxUpdateCol, maxAppendCol) + + const current = gridInfo.get(ops.sheetId) + if (!current) continue // metadata missing — best-effort; hope the grid fits + + if (neededRows > current.rowCount) { + expansionRequests.push({ + appendDimension: { + sheetId: ops.sheetId, + dimension: 'ROWS', + length: neededRows - current.rowCount + EXPAND_ROW_BUFFER, + }, + }) + } + if (neededCols > current.columnCount) { + expansionRequests.push({ + appendDimension: { + sheetId: ops.sheetId, + dimension: 'COLUMNS', + length: neededCols - current.columnCount, + }, + }) + } + } + const expansionCount = expansionRequests.length + log.debug( + { expansions: expansionCount, durationMs: Date.now() - phase2aStart }, + 'phase2a (expansions) planned' + ) + + // 2b) pasteData for contiguous update groups (one per group). 
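+  // For illustration (an assumed example, not taken from this change): buffered
+  // updates for rows 5, 6, 7, and 12 yield two pasteData requests here, one for
+  // the contiguous run 5-7 and one for row 12, because only consecutive
+  // rowNumbers are grouped into a single paste.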
+ const phase2bStart = Date.now() + let updateGroupCount = 0 + let updateRowCount = 0 + let updateCellCount = 0 + let updateBytesEstimate = 0 + for (const [, ops] of opsByStream) { + if (ops.updates.length === 0) continue + const sortedUpdates = [...ops.updates].sort((a, b) => a.rowNumber - b.rowNumber) + let groupStart = 0 + while (groupStart < sortedUpdates.length) { + let groupEnd = groupStart + while ( + groupEnd + 1 < sortedUpdates.length && + sortedUpdates[groupEnd + 1].rowNumber === sortedUpdates[groupEnd].rowNumber + 1 + ) { + groupEnd++ + } + const firstRow = sortedUpdates[groupStart].rowNumber + const groupRows = sortedUpdates.slice(groupStart, groupEnd + 1).map((u) => { + updateCellCount += u.values.length + for (const v of u.values) updateBytesEstimate += v.length + return u.values + }) + dataRequests.push({ + pasteData: { + coordinate: { sheetId: ops.sheetId, rowIndex: firstRow - 1, columnIndex: 0 }, + data: rowsToTsv(groupRows), + delimiter: PASTE_COL_DELIMITER, + type: 'PASTE_VALUES', + }, + }) + updateGroupCount++ + updateRowCount += groupEnd - groupStart + 1 + groupStart = groupEnd + 1 + } + } + log.debug( + { + groups: updateGroupCount, + rows: updateRowCount, + cells: updateCellCount, + bytes: updateBytesEstimate, + durationMs: Date.now() - phase2bStart, + }, + 'phase2b (updates) planned' + ) + + // 2c) pasteData for appends — one request per stream. + const phase2cStart = Date.now() + let appendRowCount = 0 + let appendCellCount = 0 + let appendBytesEstimate = 0 + for (const [streamName, ops] of opsByStream) { + if (ops.appends.length === 0) continue + const startRow = ops.existingRowCount + 1 + for (const row of ops.appends) { + appendCellCount += row.length + for (const v of row) appendBytesEstimate += v.length + } + dataRequests.push({ + pasteData: { + coordinate: { sheetId: ops.sheetId, rowIndex: startRow - 1, columnIndex: 0 }, + data: rowsToTsv(ops.appends), + delimiter: PASTE_COL_DELIMITER, + type: 'PASTE_VALUES', + }, + }) + appendStartRows.set(streamName, { appendStartRow: startRow }) + appendRowCount += ops.appends.length + } + log.warn( + { + streams: appendStartRows.size, + rows: appendRowCount, + cells: appendCellCount, + bytes: appendBytesEstimate, + durationMs: Date.now() - phase2cStart, + }, + 'phase2c (appends) planned' + ) + + if (expansionRequests.length === 0 && dataRequests.length === 0) return appendStartRows + + const totalCells = updateCellCount + appendCellCount + const totalBytesEstimate = updateBytesEstimate + appendBytesEstimate + + // ── Phase 3a (grid expansion — runs first, only if needed) ───── + if (expansionRequests.length > 0) { + const expandStart = Date.now() + try { + const res = await withRetry( + () => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { requests: expansionRequests }, + }), + 'gridExpansion' + ) + log.warn( + { + status: res.status, + requests: expansionRequests.length, + durationMs: Date.now() - expandStart, + }, + 'gridExpansion OK' + ) + } catch (err) { + log.error( + { err, requests: expansionRequests.length, durationMs: Date.now() - expandStart }, + 'gridExpansion FAILED' + ) + throw err + } + } + + // ── Phase 3b (single batchUpdate with all data writes) ────────── + if (dataRequests.length === 0) return appendStartRows + + log.warn( + { + streams: opsByStream.size, + totalRequests: dataRequests.length, + expansions: expansionCount, + updateRows: updateRowCount, + appendRows: appendRowCount, + cells: totalCells, + bytes: totalBytesEstimate, + }, + 'batchUpdate dispatching' + ) + + const 
httpStart = Date.now() + try { + const res = await withRetry( + () => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { requests: dataRequests }, + }), + 'batchUpdate' + ) + log.debug( + { + status: res.status, + requests: dataRequests.length, + cells: totalCells, + replies: res.data.replies?.length ?? 0, + wallClockMs: Date.now() - httpStart, + applyBatchTotalMs: Date.now() - applyStart, + }, + 'batchUpdate OK' + ) + } catch (err) { + log.error( + { + err, + totalRequests: dataRequests.length, + expansions: expansionCount, + updateRows: updateRowCount, + appendRows: appendRowCount, + cells: totalCells, + wallClockMs: Date.now() - httpStart, + applyBatchTotalMs: Date.now() - applyStart, + }, + 'batchUpdate FAILED' + ) + throw err + } + + return appendStartRows +}
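A minimal usage sketch of the new writer helpers, for reference only. The sample
values are invented, and the relative import path assumes a caller that lives next
to writer.ts in src/:

import { rowsToTsv, PASTE_COL_DELIMITER, buildRowMapFromPkColumns } from './writer.js'

// rowsToTsv joins cells with U+001F and rows with '\n', replacing any embedded
// '\n', '\r', or U+001F with a space so the pasteData parser keeps columns aligned.
const tsv = rowsToTsv([
  ['cus_1', 'Ada', 'line1\nline2'],
  ['cus_2', 'Grace', 'ok'],
])
// tsv === ['cus_1', 'Ada', 'line1 line2'].join(PASTE_COL_DELIMITER) + '\n' +
//         ['cus_2', 'Grace', 'ok'].join(PASTE_COL_DELIMITER)

// A PK-only slice (batchReadSheets with columnCount set) skips the header row,
// so slice index i maps to 1-based sheet row i + 2.
const rowMap = buildRowMapFromPkColumns([['cus_1'], ['cus_2']], [['id']])
// rowMap.size === 2; its values are sheet rows 2 and 3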