From 18daf6d307dc59ead910e2c8226eb772795f1c91 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Sat, 7 Mar 2026 23:38:57 +0530 Subject: [PATCH 1/2] Stream CSV exports to storage with multipart uploads --- .../(ee)/api/cron/export/commissions/route.ts | 28 ++-- .../(ee)/api/cron/export/customers/route.ts | 36 ++--- .../api/cron/export/events/partner/route.ts | 6 +- .../api/cron/export/events/workspace/route.ts | 6 +- .../app/(ee)/api/cron/export/links/route.ts | 25 ++- .../(ee)/api/cron/export/partners/route.ts | 24 ++- .../web/app/(ee)/api/partners/export/route.ts | 2 +- .../[programId]/applications/export/route.ts | 2 +- apps/web/lib/analytics/utils/index.ts | 2 +- .../utils => exports}/convert-to-csv.ts | 6 +- .../create-downloadable-export.ts | 8 +- apps/web/lib/exports/export-csv-to-storage.ts | 116 +++++++++++++ .../generate-export-filename.ts | 3 - apps/web/lib/exports/multipart-storage.ts | 152 ++++++++++++++++++ apps/web/lib/storage.ts | 8 +- .../ui/modals/export-commissions-modal.tsx | 2 +- apps/web/ui/modals/export-customers-modal.tsx | 2 +- apps/web/ui/modals/export-partners-modal.tsx | 2 +- 18 files changed, 332 insertions(+), 98 deletions(-) rename apps/web/lib/{analytics/utils => exports}/convert-to-csv.ts (64%) rename apps/web/lib/{api => exports}/create-downloadable-export.ts (83%) create mode 100644 apps/web/lib/exports/export-csv-to-storage.ts rename apps/web/lib/{api/utils => exports}/generate-export-filename.ts (60%) create mode 100644 apps/web/lib/exports/multipart-storage.ts diff --git a/apps/web/app/(ee)/api/cron/export/commissions/route.ts b/apps/web/app/(ee)/api/cron/export/commissions/route.ts index a2a30da1715..455fd858422 100644 --- a/apps/web/app/(ee)/api/cron/export/commissions/route.ts +++ b/apps/web/app/(ee)/api/cron/export/commissions/route.ts @@ -1,9 +1,8 @@ -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; import { formatCommissionsForExport } from "@/lib/api/commissions/format-commissions-for-export"; -import { createDownloadableExport } from "@/lib/api/create-downloadable-export"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; import { commissionsExportQuerySchema } from "@/lib/zod/schemas/commissions"; import { sendEmail } from "@dub/email"; @@ -65,26 +64,23 @@ export async function POST(req: Request) { ); } - // Fetch commissions in batches and build CSV - const allCommissions: any[] = []; const commissionsFilters = { ...filters, programId, }; - for await (const { commissions } of fetchCommissionsBatch( - commissionsFilters, - )) { - allCommissions.push(...formatCommissionsForExport(commissions, columns)); - } - - const csvData = convertToCSV(allCommissions); + const formattedBatches = async function* () { + for await (const { commissions } of fetchCommissionsBatch( + commissionsFilters, + )) { + yield formatCommissionsForExport(commissions, columns); + } + }; - const { downloadUrl } = await createDownloadableExport({ + const { downloadUrl, rowCount } = await exportCsvToStorage({ fileKey: `exports/commissions/${generateRandomString(16)}.csv`, fileName: generateExportFilename("commissions"), - body: csvData, - contentType: "text/csv", + batches: formattedBatches(), }); await sendEmail({ @@ -101,7 +97,7 @@ export async function POST(req: Request) { }); return logAndRespond( - `Export (${allCommissions.length} commissions) generated and email sent to user.`, + `Export (${rowCount} commissions) generated and email sent to user.`, ); } catch (error) { await log({ diff --git a/apps/web/app/(ee)/api/cron/export/customers/route.ts b/apps/web/app/(ee)/api/cron/export/customers/route.ts index f4004134cff..a957aed1c2a 100644 --- a/apps/web/app/(ee)/api/cron/export/customers/route.ts +++ b/apps/web/app/(ee)/api/cron/export/customers/route.ts @@ -1,18 +1,15 @@ -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; -import { createDownloadableExport } from "@/lib/api/create-downloadable-export"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; import { generateRandomString } from "@/lib/api/utils/generate-random-string"; import { withCron } from "@/lib/cron/with-cron"; import { fetchCustomersBatch } from "@/lib/customers/api/fetch-customers-batch"; import { formatCustomersForExport } from "@/lib/customers/api/format-customers-export"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { customersExportCronInputSchema } from "@/lib/zod/schemas/customers"; import { sendEmail } from "@dub/email"; import ExportReady from "@dub/email/templates/export-ready"; import { prisma } from "@dub/prisma"; import { logAndRespond } from "../../utils"; -const MAX_CUSTOMERS_EXPORT_LIMIT = 100_000; - export const dynamic = "force-dynamic"; // POST /api/cron/export/customers - QStash worker for processing large customer exports @@ -53,26 +50,16 @@ export const POST = withCron(async ({ rawBody }) => { return logAndRespond(`Workspace ${workspaceId} not found.`); } - const allRows: Record[] = []; - - for await (const { customers } of fetchCustomersBatch(parsedFilters)) { - const formatted = formatCustomersForExport(customers, columns); - const remaining = MAX_CUSTOMERS_EXPORT_LIMIT - allRows.length; - - if (remaining <= 0) { - break; + const formattedBatches = async function* () { + for await (const { customers } of fetchCustomersBatch(parsedFilters)) { + yield formatCustomersForExport(customers, columns); } + }; - allRows.push(...formatted.slice(0, remaining)); - } - - const csvData = convertToCSV(allRows); - - const { downloadUrl } = await createDownloadableExport({ + const { downloadUrl, rowCount } = await exportCsvToStorage({ fileKey: `exports/customers/${generateRandomString(16)}.csv`, fileName: generateExportFilename("customers"), - body: csvData, - contentType: "text/csv", + batches: formattedBatches(), }); await sendEmail({ @@ -88,12 +75,7 @@ export const POST = withCron(async ({ rawBody }) => { }), }); - const capped = - allRows.length >= MAX_CUSTOMERS_EXPORT_LIMIT - ? ` (capped at ${MAX_CUSTOMERS_EXPORT_LIMIT})` - : ""; - return logAndRespond( - `Export (${allRows.length} customers${capped}) generated and email sent to user.`, + `Export (${rowCount} customers) generated and email sent to user.`, ); }); diff --git a/apps/web/app/(ee)/api/cron/export/events/partner/route.ts b/apps/web/app/(ee)/api/cron/export/events/partner/route.ts index 6f38dcdbb41..c9a8beb3472 100644 --- a/apps/web/app/(ee)/api/cron/export/events/partner/route.ts +++ b/apps/web/app/(ee)/api/cron/export/events/partner/route.ts @@ -3,12 +3,12 @@ import { eventsExportColumnNames, } from "@/lib/analytics/events-export-helpers"; import { getFirstFilterValue } from "@/lib/analytics/filter-helpers"; -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; -import { createDownloadableExport } from "@/lib/api/create-downloadable-export"; +import { convertToCSV } from "@/lib/exports/convert-to-csv"; +import { createDownloadableExport } from "@/lib/exports/create-downloadable-export"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; import { obfuscateCustomerEmail } from "@/lib/api/partner-profile/obfuscate-customer-email"; import { getProgramEnrollmentOrThrow } from "@/lib/api/programs/get-program-enrollment-or-throw"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { generateRandomString } from "@/lib/api/utils/generate-random-string"; import { MAX_PARTNER_LINKS_FOR_LOCAL_FILTERING } from "@/lib/constants/partner-profile"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; diff --git a/apps/web/app/(ee)/api/cron/export/events/workspace/route.ts b/apps/web/app/(ee)/api/cron/export/events/workspace/route.ts index d497fe824b8..15c546072f5 100644 --- a/apps/web/app/(ee)/api/cron/export/events/workspace/route.ts +++ b/apps/web/app/(ee)/api/cron/export/events/workspace/route.ts @@ -2,10 +2,10 @@ import { eventsExportColumnAccessors, eventsExportColumnNames, } from "@/lib/analytics/events-export-helpers"; -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; -import { createDownloadableExport } from "@/lib/api/create-downloadable-export"; +import { convertToCSV } from "@/lib/exports/convert-to-csv"; +import { createDownloadableExport } from "@/lib/exports/create-downloadable-export"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { generateRandomString } from "@/lib/api/utils/generate-random-string"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; import { eventsQuerySchema } from "@/lib/zod/schemas/analytics"; diff --git a/apps/web/app/(ee)/api/cron/export/links/route.ts b/apps/web/app/(ee)/api/cron/export/links/route.ts index bd4d1850668..5511279af9d 100644 --- a/apps/web/app/(ee)/api/cron/export/links/route.ts +++ b/apps/web/app/(ee)/api/cron/export/links/route.ts @@ -1,11 +1,10 @@ -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; import { getStartEndDates } from "@/lib/analytics/utils/get-start-end-dates"; -import { createDownloadableExport } from "@/lib/api/create-downloadable-export"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; import { formatLinksForExport } from "@/lib/api/links/format-links-for-export"; import { validateLinksQueryFilters } from "@/lib/api/links/validate-links-query-filters"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { MEGA_WORKSPACE_LINKS_LIMIT } from "@/lib/constants/misc"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; import { PlanProps } from "@/lib/types"; @@ -97,9 +96,6 @@ export async function POST(req: Request) { end: end ? endOfDay(new Date(end)) : undefined, }); - // Fetch links in batches and build CSV - const allLinks: Record[] = []; - const linksFilters = { ...filters, ...(interval !== "all" && { @@ -116,17 +112,16 @@ export async function POST(req: Request) { folderIds, }; - for await (const { links } of fetchLinksBatch(linksFilters)) { - allLinks.push(...formatLinksForExport(links, columns)); - } - - const csvData = convertToCSV(allLinks); + const formattedBatches = async function* () { + for await (const { links } of fetchLinksBatch(linksFilters)) { + yield formatLinksForExport(links, columns); + } + }; - const { downloadUrl } = await createDownloadableExport({ + const { downloadUrl, rowCount } = await exportCsvToStorage({ fileKey: `exports/links/${generateRandomString(16)}.csv`, fileName: generateExportFilename("links"), - body: csvData, - contentType: "text/csv", + batches: formattedBatches(), }); await sendEmail({ @@ -143,7 +138,7 @@ export async function POST(req: Request) { }); return logAndRespond( - `Export (${allLinks.length} links) generated and email sent to user.`, + `Export (${rowCount} links) generated and email sent to user.`, ); } catch (error) { await log({ diff --git a/apps/web/app/(ee)/api/cron/export/partners/route.ts b/apps/web/app/(ee)/api/cron/export/partners/route.ts index 7774dba716b..c1c54e4b24a 100644 --- a/apps/web/app/(ee)/api/cron/export/partners/route.ts +++ b/apps/web/app/(ee)/api/cron/export/partners/route.ts @@ -1,9 +1,8 @@ -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; -import { createDownloadableExport } from "@/lib/api/create-downloadable-export"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; import { formatPartnersForExport } from "@/lib/api/partners/format-partners-for-export"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; import { partnersExportQuerySchema } from "@/lib/zod/schemas/partners"; import { sendEmail } from "@dub/email"; @@ -65,24 +64,21 @@ export async function POST(req: Request) { ); } - // Fetch partners in batches and build CSV - const allPartners: any[] = []; const partnersFilters = { ...filters, programId, }; - for await (const { partners } of fetchPartnersBatch(partnersFilters)) { - allPartners.push(...formatPartnersForExport(partners, columns)); - } - - const csvData = convertToCSV(allPartners); + const formattedBatches = async function* () { + for await (const { partners } of fetchPartnersBatch(partnersFilters)) { + yield formatPartnersForExport(partners, columns); + } + }; - const { downloadUrl } = await createDownloadableExport({ + const { downloadUrl, rowCount } = await exportCsvToStorage({ fileKey: `exports/partners/${generateRandomString(16)}.csv`, fileName: generateExportFilename("partners"), - body: csvData, - contentType: "text/csv", + batches: formattedBatches(), }); await sendEmail({ @@ -99,7 +95,7 @@ export async function POST(req: Request) { }); return logAndRespond( - `Export (${allPartners.length} partners) generated and email sent to user.`, + `Export (${rowCount} partners) generated and email sent to user.`, ); } catch (error) { await log({ diff --git a/apps/web/app/(ee)/api/partners/export/route.ts b/apps/web/app/(ee)/api/partners/export/route.ts index f96f8b9e55b..e1c8be186a9 100644 --- a/apps/web/app/(ee)/api/partners/export/route.ts +++ b/apps/web/app/(ee)/api/partners/export/route.ts @@ -1,4 +1,4 @@ -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; +import { convertToCSV } from "@/lib/exports/convert-to-csv"; import { formatPartnersForExport } from "@/lib/api/partners/format-partners-for-export"; import { getPartners } from "@/lib/api/partners/get-partners"; import { getPartnersCount } from "@/lib/api/partners/get-partners-count"; diff --git a/apps/web/app/(ee)/api/programs/[programId]/applications/export/route.ts b/apps/web/app/(ee)/api/programs/[programId]/applications/export/route.ts index b47a10c231a..5284d805568 100644 --- a/apps/web/app/(ee)/api/programs/[programId]/applications/export/route.ts +++ b/apps/web/app/(ee)/api/programs/[programId]/applications/export/route.ts @@ -1,4 +1,4 @@ -import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv"; +import { convertToCSV } from "@/lib/exports/convert-to-csv"; import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw"; import { withWorkspace } from "@/lib/auth"; import { diff --git a/apps/web/lib/analytics/utils/index.ts b/apps/web/lib/analytics/utils/index.ts index dde64e9f882..3e26f57f8d4 100644 --- a/apps/web/lib/analytics/utils/index.ts +++ b/apps/web/lib/analytics/utils/index.ts @@ -1,4 +1,4 @@ -export * from "./convert-to-csv"; +export * from "@/lib/exports/convert-to-csv"; export * from "./edit-query-string"; export * from "./get-interval-data"; export * from "./valid-date-range-for-plan"; diff --git a/apps/web/lib/analytics/utils/convert-to-csv.ts b/apps/web/lib/exports/convert-to-csv.ts similarity index 64% rename from apps/web/lib/analytics/utils/convert-to-csv.ts rename to apps/web/lib/exports/convert-to-csv.ts index 20ba9314e31..58e2ba2e118 100644 --- a/apps/web/lib/analytics/utils/convert-to-csv.ts +++ b/apps/web/lib/exports/convert-to-csv.ts @@ -1,7 +1,11 @@ import { json2csv } from "json-2-csv"; -export const convertToCSV = (data: object[]) => { +export const convertToCSV = ( + data: object[], + opts?: { prependHeader?: boolean }, +) => { return json2csv(data, { + prependHeader: opts?.prependHeader ?? true, parseValue(fieldValue, defaultParser) { if (fieldValue instanceof Date) { return fieldValue.toISOString(); diff --git a/apps/web/lib/api/create-downloadable-export.ts b/apps/web/lib/exports/create-downloadable-export.ts similarity index 83% rename from apps/web/lib/api/create-downloadable-export.ts rename to apps/web/lib/exports/create-downloadable-export.ts index 84507eb53fb..5bdbfbb2744 100644 --- a/apps/web/lib/api/create-downloadable-export.ts +++ b/apps/web/lib/exports/create-downloadable-export.ts @@ -1,4 +1,5 @@ import { storage } from "../storage"; +import { SIGNED_URL_EXPIRY } from "./export-csv-to-storage"; interface CreateDownloadableExportOptions { fileKey: string; @@ -7,9 +8,6 @@ interface CreateDownloadableExportOptions { contentType: string; } -const expiresIn = 7 * 24 * 3600; // 7 days - -// Upload the .csv file to R2 and return a signed link to download it export async function createDownloadableExport({ fileKey, fileName, @@ -18,7 +16,6 @@ export async function createDownloadableExport({ }: CreateDownloadableExportOptions) { const blob = new Blob([body], { type: contentType }); - // Upload const uploadResult = await storage.upload({ key: fileKey, body: blob, @@ -35,10 +32,9 @@ export async function createDownloadableExport({ throw new Error(`Failed to upload ${contentType} file.`); } - // Generate a signed download URL const downloadUrl = await storage.getSignedDownloadUrl({ key: fileKey, - expiresIn, + expiresIn: SIGNED_URL_EXPIRY, }); if (!downloadUrl) { diff --git a/apps/web/lib/exports/export-csv-to-storage.ts b/apps/web/lib/exports/export-csv-to-storage.ts new file mode 100644 index 00000000000..0d951af2f12 --- /dev/null +++ b/apps/web/lib/exports/export-csv-to-storage.ts @@ -0,0 +1,116 @@ +import { storage } from "../storage"; +import { convertToCSV } from "./convert-to-csv"; +import { multipartStorage } from "./multipart-storage"; + +const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MB — S3/R2 minimum for non-final parts +export const SIGNED_URL_EXPIRY = 7 * 24 * 3600; // 7 days + +interface ExportCsvToStorageOptions { + fileKey: string; + fileName: string; + batches: AsyncIterable; +} + +interface ExportCsvToStorageResult { + downloadUrl: string; + rowCount: number; +} + +export async function exportCsvToStorage({ + fileKey, + fileName, + batches, +}: ExportCsvToStorageOptions): Promise { + console.log(`[csv-export] Starting export to ${fileKey}`); + + const uploadId = await multipartStorage.initiateMultipartUpload({ + key: fileKey, + bucket: "private", + contentType: "text/csv", + contentDisposition: `attachment; filename="${fileName}"`, + }); + + const parts: { etag: string; partNumber: number }[] = []; + let buffer = ""; + let partNumber = 1; + let rowCount = 0; + let isFirstBatch = true; + + try { + for await (const rows of batches) { + if (rows.length === 0) continue; + + const csvChunk = convertToCSV(rows, { + prependHeader: isFirstBatch, + }); + + if (isFirstBatch) { + buffer = csvChunk; + isFirstBatch = false; + } else { + buffer += "\n" + csvChunk; + } + + rowCount += rows.length; + + while (buffer.length >= MIN_PART_SIZE) { + const chunk = buffer.slice(0, MIN_PART_SIZE); + buffer = buffer.slice(MIN_PART_SIZE); + + const result = await multipartStorage.uploadPart({ + key: fileKey, + uploadId, + partNumber, + body: chunk, + bucket: "private", + }); + + parts.push(result); + partNumber++; + } + } + + if (buffer.length > 0 || parts.length === 0) { + const result = await multipartStorage.uploadPart({ + key: fileKey, + uploadId, + partNumber, + body: buffer, + bucket: "private", + }); + parts.push(result); + } + + await multipartStorage.completeMultipartUpload({ + key: fileKey, + uploadId, + parts, + bucket: "private", + }); + + const downloadUrl = await storage.getSignedDownloadUrl({ + key: fileKey, + expiresIn: SIGNED_URL_EXPIRY, + }); + + if (!downloadUrl) { + throw new Error("Failed to generate signed download URL."); + } + + console.log( + `[csv-export] Export complete: ${rowCount} rows, ${parts.length} parts → ${fileKey}`, + ); + + return { downloadUrl, rowCount }; + } catch (error) { + console.error(`[csv-export] Export failed for ${fileKey}, aborting upload`); + + await multipartStorage.abortMultipartUpload({ + key: fileKey, + uploadId, + bucket: "private", + }); + + throw error; + } +} diff --git a/apps/web/lib/api/utils/generate-export-filename.ts b/apps/web/lib/exports/generate-export-filename.ts similarity index 60% rename from apps/web/lib/api/utils/generate-export-filename.ts rename to apps/web/lib/exports/generate-export-filename.ts index ad40b8d4bd6..14fc1c76c62 100644 --- a/apps/web/lib/api/utils/generate-export-filename.ts +++ b/apps/web/lib/exports/generate-export-filename.ts @@ -1,9 +1,6 @@ import { capitalize } from "@dub/utils"; -// Generates a sanitized filename for exports with a timestamp -// Example: "Dub Partners Export - 2025-10-27-15-49-12.csv" export function generateExportFilename(exportType: string): string { - // Sanitize timestamp: remove colons, replace T with hyphen, remove milliseconds and Z const sanitizedTimestamp = new Date() .toISOString() .replace(/:/g, "-") diff --git a/apps/web/lib/exports/multipart-storage.ts b/apps/web/lib/exports/multipart-storage.ts new file mode 100644 index 00000000000..686ac82ce8a --- /dev/null +++ b/apps/web/lib/exports/multipart-storage.ts @@ -0,0 +1,152 @@ +import { type BucketType, StorageClient } from "../storage"; + +class MultipartStorageClient extends StorageClient { + async initiateMultipartUpload({ + key, + bucket = "private", + contentType, + contentDisposition, + }: { + key: string; + bucket?: BucketType; + contentType?: string; + contentDisposition?: string; + }): Promise { + const url = `${process.env.STORAGE_ENDPOINT}/${this._getBucketName(bucket)}/${key}?uploads`; + + const headers: Record = {}; + if (contentType) headers["Content-Type"] = contentType; + if (contentDisposition) headers["Content-Disposition"] = contentDisposition; + + console.log(`[multipart-upload] Initiating upload for ${key}`); + + const response = await this.client.fetch(url, { + method: "POST", + headers, + }); + + if (!response.ok) { + throw new Error( + `Failed to initiate multipart upload: ${response.statusText}`, + ); + } + + const xml = await response.text(); + const uploadId = xml.match(/(.+?)<\/UploadId>/)?.[1]; + + if (!uploadId) { + throw new Error( + "Failed to parse UploadId from multipart upload response", + ); + } + + console.log( + `[multipart-upload] Upload initiated for ${key} (uploadId: ${uploadId})`, + ); + return uploadId; + } + + async uploadPart({ + key, + uploadId, + partNumber, + body, + bucket = "private", + }: { + key: string; + uploadId: string; + partNumber: number; + body: string | Blob; + bucket?: BucketType; + }): Promise<{ etag: string; partNumber: number }> { + const url = `${process.env.STORAGE_ENDPOINT}/${this._getBucketName(bucket)}/${key}?partNumber=${partNumber}&uploadId=${uploadId}`; + + const blob = + body instanceof Blob ? body : new Blob([body], { type: "text/csv" }); + + console.log( + `[multipart-upload] Uploading part ${partNumber} (${blob.size} bytes) for ${key}`, + ); + + const response = await this.client.fetch(url, { + method: "PUT", + headers: { + "Content-Length": blob.size.toString(), + }, + body: blob, + }); + + if (!response.ok) { + throw new Error( + `Failed to upload part ${partNumber}: ${response.statusText}`, + ); + } + + const etag = response.headers.get("etag"); + + if (!etag) { + throw new Error(`Missing ETag in response for part ${partNumber}`); + } + + return { etag, partNumber }; + } + + async completeMultipartUpload({ + key, + uploadId, + parts, + bucket = "private", + }: { + key: string; + uploadId: string; + parts: { etag: string; partNumber: number }[]; + bucket?: BucketType; + }): Promise { + const url = `${process.env.STORAGE_ENDPOINT}/${this._getBucketName(bucket)}/${key}?uploadId=${uploadId}`; + + const xml = `${parts.map((p) => `${p.partNumber}${p.etag}`).join("")}`; + + const response = await this.client.fetch(url, { + method: "POST", + headers: { "Content-Type": "application/xml" }, + body: xml, + }); + + if (!response.ok) { + throw new Error( + `Failed to complete multipart upload: ${response.statusText}`, + ); + } + + console.log( + `[multipart-upload] Completed upload for ${key} (${parts.length} parts)`, + ); + } + + async abortMultipartUpload({ + key, + uploadId, + bucket = "private", + }: { + key: string; + uploadId: string; + bucket?: BucketType; + }): Promise { + try { + const url = `${process.env.STORAGE_ENDPOINT}/${this._getBucketName(bucket)}/${key}?uploadId=${uploadId}`; + + await this.client.fetch(url, { + method: "DELETE", + }); + + console.log(`[multipart-upload] Aborted upload for ${key}`); + } catch (error) { + console.error( + `[multipart-upload] Failed to abort upload for ${key}:`, + error, + ); + } + } +} + +export const multipartStorage = new MultipartStorageClient(); diff --git a/apps/web/lib/storage.ts b/apps/web/lib/storage.ts index 21a111b2c07..ee2f606a9bc 100644 --- a/apps/web/lib/storage.ts +++ b/apps/web/lib/storage.ts @@ -8,10 +8,10 @@ interface imageOptions { headers?: Record; } -type BucketType = "public" | "private"; +export type BucketType = "public" | "private"; -class StorageClient { - private client: AwsClient; +export class StorageClient { + protected client: AwsClient; constructor() { this.client = new AwsClient({ @@ -223,7 +223,7 @@ class StorageClient { return blob; } - private _getBucketName(bucket: BucketType) { + protected _getBucketName(bucket: BucketType) { if (bucket === "public") { const bucketName = process.env.STORAGE_PUBLIC_BUCKET; diff --git a/apps/web/ui/modals/export-commissions-modal.tsx b/apps/web/ui/modals/export-commissions-modal.tsx index 0af58b466ee..1c45c7de513 100644 --- a/apps/web/ui/modals/export-commissions-modal.tsx +++ b/apps/web/ui/modals/export-commissions-modal.tsx @@ -1,6 +1,6 @@ "use client"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import useProgram from "@/lib/swr/use-program"; import useWorkspace from "@/lib/swr/use-workspace"; import { diff --git a/apps/web/ui/modals/export-customers-modal.tsx b/apps/web/ui/modals/export-customers-modal.tsx index 92531b43059..1890368a3aa 100644 --- a/apps/web/ui/modals/export-customers-modal.tsx +++ b/apps/web/ui/modals/export-customers-modal.tsx @@ -1,6 +1,6 @@ "use client"; -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import useProgram from "@/lib/swr/use-program"; import useWorkspace from "@/lib/swr/use-workspace"; import { diff --git a/apps/web/ui/modals/export-partners-modal.tsx b/apps/web/ui/modals/export-partners-modal.tsx index 1e4cd2271cd..b0085e0610a 100644 --- a/apps/web/ui/modals/export-partners-modal.tsx +++ b/apps/web/ui/modals/export-partners-modal.tsx @@ -1,4 +1,4 @@ -import { generateExportFilename } from "@/lib/api/utils/generate-export-filename"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import useProgram from "@/lib/swr/use-program"; import useWorkspace from "@/lib/swr/use-workspace"; import { From 90eadcc96d933ecdacc3496b960752d5a2517561 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Tue, 24 Mar 2026 19:50:58 +0530 Subject: [PATCH 2/2] Refactor CSV exports: cursor pagination, remove limits, reorganize into lib/exports/ - Switch links, commissions, and customers batch fetchers to cursor-based pagination - Remove MAX_CUSTOMERS_TO_EXPORT limit, always use async streaming export - Move multipart-storage.ts to lib/ alongside storage.ts - Consolidate export code into lib/exports/{entity}/ with fetch.ts, format.ts, export.ts - Cron route handlers now delegate to export.ts orchestrators --- .../app/(ee)/api/commissions/export/route.ts | 2 +- .../(ee)/api/cron/export/commissions/route.ts | 29 +++------- .../(ee)/api/cron/export/customers/route.ts | 19 ++----- .../app/(ee)/api/cron/export/links/route.ts | 49 ++++++---------- .../(ee)/api/cron/export/partners/route.ts | 27 +++------ .../app/(ee)/api/customers/export/route.ts | 57 ++++--------------- .../web/app/(ee)/api/partners/export/route.ts | 2 +- apps/web/app/api/links/export/route.ts | 2 +- apps/web/lib/exports/commissions/export.ts | 32 +++++++++++ .../exports/commissions/fetch.ts} | 10 ++-- .../commissions/format.ts} | 2 +- apps/web/lib/exports/customers/export.ts | 27 +++++++++ .../customers/fetch.ts} | 8 ++- .../customers/format.ts} | 0 apps/web/lib/exports/export-csv-to-storage.ts | 2 +- apps/web/lib/exports/links/export.ts | 29 ++++++++++ .../exports/links/fetch.ts} | 13 +++-- .../links/format.ts} | 2 +- apps/web/lib/exports/partners/export.ts | 29 ++++++++++ .../exports/partners/fetch.ts} | 0 .../partners/format.ts} | 0 .../lib/{exports => }/multipart-storage.ts | 2 +- 22 files changed, 190 insertions(+), 153 deletions(-) create mode 100644 apps/web/lib/exports/commissions/export.ts rename apps/web/{app/(ee)/api/cron/export/commissions/fetch-commissions-batch.ts => lib/exports/commissions/fetch.ts} (75%) rename apps/web/lib/{api/commissions/format-commissions-for-export.ts => exports/commissions/format.ts} (96%) create mode 100644 apps/web/lib/exports/customers/export.ts rename apps/web/lib/{customers/api/fetch-customers-batch.ts => exports/customers/fetch.ts} (82%) rename apps/web/lib/{customers/api/format-customers-export.ts => exports/customers/format.ts} (100%) create mode 100644 apps/web/lib/exports/links/export.ts rename apps/web/{app/(ee)/api/cron/export/links/fetch-links-batch.ts => lib/exports/links/fetch.ts} (63%) rename apps/web/lib/{api/links/format-links-for-export.ts => exports/links/format.ts} (96%) create mode 100644 apps/web/lib/exports/partners/export.ts rename apps/web/{app/(ee)/api/cron/export/partners/fetch-partners-batch.ts => lib/exports/partners/fetch.ts} (100%) rename apps/web/lib/{api/partners/format-partners-for-export.ts => exports/partners/format.ts} (100%) rename apps/web/lib/{exports => }/multipart-storage.ts (98%) diff --git a/apps/web/app/(ee)/api/commissions/export/route.ts b/apps/web/app/(ee)/api/commissions/export/route.ts index f899530520e..4c9ae240a0d 100644 --- a/apps/web/app/(ee)/api/commissions/export/route.ts +++ b/apps/web/app/(ee)/api/commissions/export/route.ts @@ -1,5 +1,5 @@ import { convertToCSV } from "@/lib/analytics/utils"; -import { formatCommissionsForExport } from "@/lib/api/commissions/format-commissions-for-export"; +import { formatCommissionsForExport } from "@/lib/exports/commissions/format"; import { getCommissions } from "@/lib/api/commissions/get-commissions"; import { getCommissionsCount } from "@/lib/api/commissions/get-commissions-count"; import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw"; diff --git a/apps/web/app/(ee)/api/cron/export/commissions/route.ts b/apps/web/app/(ee)/api/cron/export/commissions/route.ts index 455fd858422..2c3e747bf86 100644 --- a/apps/web/app/(ee)/api/cron/export/commissions/route.ts +++ b/apps/web/app/(ee)/api/cron/export/commissions/route.ts @@ -1,9 +1,6 @@ -import { formatCommissionsForExport } from "@/lib/api/commissions/format-commissions-for-export"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { generateRandomString } from "@/lib/api/utils/generate-random-string"; -import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; -import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { exportCommissions } from "@/lib/exports/commissions/export"; import { commissionsExportQuerySchema } from "@/lib/zod/schemas/commissions"; import { sendEmail } from "@dub/email"; import ExportReady from "@dub/email/templates/export-ready"; @@ -11,7 +8,6 @@ import { prisma } from "@dub/prisma"; import { log } from "@dub/utils"; import * as z from "zod/v4"; import { logAndRespond } from "../../utils"; -import { fetchCommissionsBatch } from "./fetch-commissions-batch"; const payloadSchema = commissionsExportQuerySchema.extend({ programId: z.string(), @@ -64,23 +60,12 @@ export async function POST(req: Request) { ); } - const commissionsFilters = { - ...filters, - programId, - }; - - const formattedBatches = async function* () { - for await (const { commissions } of fetchCommissionsBatch( - commissionsFilters, - )) { - yield formatCommissionsForExport(commissions, columns); - } - }; - - const { downloadUrl, rowCount } = await exportCsvToStorage({ - fileKey: `exports/commissions/${generateRandomString(16)}.csv`, - fileName: generateExportFilename("commissions"), - batches: formattedBatches(), + const { downloadUrl, rowCount } = await exportCommissions({ + filters: { + ...filters, + programId, + }, + columns, }); await sendEmail({ diff --git a/apps/web/app/(ee)/api/cron/export/customers/route.ts b/apps/web/app/(ee)/api/cron/export/customers/route.ts index a957aed1c2a..e69b22c286d 100644 --- a/apps/web/app/(ee)/api/cron/export/customers/route.ts +++ b/apps/web/app/(ee)/api/cron/export/customers/route.ts @@ -1,9 +1,5 @@ -import { generateRandomString } from "@/lib/api/utils/generate-random-string"; import { withCron } from "@/lib/cron/with-cron"; -import { fetchCustomersBatch } from "@/lib/customers/api/fetch-customers-batch"; -import { formatCustomersForExport } from "@/lib/customers/api/format-customers-export"; -import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; -import { generateExportFilename } from "@/lib/exports/generate-export-filename"; +import { exportCustomers } from "@/lib/exports/customers/export"; import { customersExportCronInputSchema } from "@/lib/zod/schemas/customers"; import { sendEmail } from "@dub/email"; import ExportReady from "@dub/email/templates/export-ready"; @@ -50,16 +46,9 @@ export const POST = withCron(async ({ rawBody }) => { return logAndRespond(`Workspace ${workspaceId} not found.`); } - const formattedBatches = async function* () { - for await (const { customers } of fetchCustomersBatch(parsedFilters)) { - yield formatCustomersForExport(customers, columns); - } - }; - - const { downloadUrl, rowCount } = await exportCsvToStorage({ - fileKey: `exports/customers/${generateRandomString(16)}.csv`, - fileName: generateExportFilename("customers"), - batches: formattedBatches(), + const { downloadUrl, rowCount } = await exportCustomers({ + filters: parsedFilters, + columns, }); await sendEmail({ diff --git a/apps/web/app/(ee)/api/cron/export/links/route.ts b/apps/web/app/(ee)/api/cron/export/links/route.ts index 5511279af9d..df5bccb9fb4 100644 --- a/apps/web/app/(ee)/api/cron/export/links/route.ts +++ b/apps/web/app/(ee)/api/cron/export/links/route.ts @@ -1,12 +1,9 @@ import { getStartEndDates } from "@/lib/analytics/utils/get-start-end-dates"; import { handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { formatLinksForExport } from "@/lib/api/links/format-links-for-export"; import { validateLinksQueryFilters } from "@/lib/api/links/validate-links-query-filters"; -import { generateRandomString } from "@/lib/api/utils/generate-random-string"; -import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; -import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { MEGA_WORKSPACE_LINKS_LIMIT } from "@/lib/constants/misc"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { exportLinks } from "@/lib/exports/links/export"; import { PlanProps } from "@/lib/types"; import { linksExportQuerySchema } from "@/lib/zod/schemas/links"; import { sendEmail } from "@dub/email"; @@ -16,7 +13,6 @@ import { log } from "@dub/utils"; import { endOfDay, startOfDay } from "date-fns"; import * as z from "zod/v4"; import { logAndRespond } from "../../utils"; -import { fetchLinksBatch } from "./fetch-links-batch"; const payloadSchema = linksExportQuerySchema.extend({ workspaceId: z.string(), @@ -96,32 +92,23 @@ export async function POST(req: Request) { end: end ? endOfDay(new Date(end)) : undefined, }); - const linksFilters = { - ...filters, - ...(interval !== "all" && { - startDate, - endDate, - }), - searchMode: (workspace.totalLinks > MEGA_WORKSPACE_LINKS_LIMIT - ? "exact" - : "fuzzy") as "exact" | "fuzzy", - includeDashboard: false, - includeUser: false, - includeWebhooks: false, - workspaceId, - folderIds, - }; - - const formattedBatches = async function* () { - for await (const { links } of fetchLinksBatch(linksFilters)) { - yield formatLinksForExport(links, columns); - } - }; - - const { downloadUrl, rowCount } = await exportCsvToStorage({ - fileKey: `exports/links/${generateRandomString(16)}.csv`, - fileName: generateExportFilename("links"), - batches: formattedBatches(), + const { downloadUrl, rowCount } = await exportLinks({ + filters: { + ...filters, + ...(interval !== "all" && { + startDate, + endDate, + }), + searchMode: (workspace.totalLinks > MEGA_WORKSPACE_LINKS_LIMIT + ? "exact" + : "fuzzy") as "exact" | "fuzzy", + includeDashboard: false, + includeUser: false, + includeWebhooks: false, + workspaceId, + folderIds, + }, + columns, }); await sendEmail({ diff --git a/apps/web/app/(ee)/api/cron/export/partners/route.ts b/apps/web/app/(ee)/api/cron/export/partners/route.ts index c1c54e4b24a..40a08dfa864 100644 --- a/apps/web/app/(ee)/api/cron/export/partners/route.ts +++ b/apps/web/app/(ee)/api/cron/export/partners/route.ts @@ -1,9 +1,6 @@ import { handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { formatPartnersForExport } from "@/lib/api/partners/format-partners-for-export"; -import { generateRandomString } from "@/lib/api/utils/generate-random-string"; -import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; -import { generateExportFilename } from "@/lib/exports/generate-export-filename"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { exportPartners } from "@/lib/exports/partners/export"; import { partnersExportQuerySchema } from "@/lib/zod/schemas/partners"; import { sendEmail } from "@dub/email"; import ExportReady from "@dub/email/templates/export-ready"; @@ -11,7 +8,6 @@ import { prisma } from "@dub/prisma"; import { log } from "@dub/utils"; import * as z from "zod/v4"; import { logAndRespond } from "../../utils"; -import { fetchPartnersBatch } from "./fetch-partners-batch"; const payloadSchema = partnersExportQuerySchema.extend({ programId: z.string(), @@ -64,21 +60,12 @@ export async function POST(req: Request) { ); } - const partnersFilters = { - ...filters, - programId, - }; - - const formattedBatches = async function* () { - for await (const { partners } of fetchPartnersBatch(partnersFilters)) { - yield formatPartnersForExport(partners, columns); - } - }; - - const { downloadUrl, rowCount } = await exportCsvToStorage({ - fileKey: `exports/partners/${generateRandomString(16)}.csv`, - fileName: generateExportFilename("partners"), - batches: formattedBatches(), + const { downloadUrl, rowCount } = await exportPartners({ + filters: { + ...filters, + programId, + }, + columns, }); await sendEmail({ diff --git a/apps/web/app/(ee)/api/customers/export/route.ts b/apps/web/app/(ee)/api/customers/export/route.ts index 73123bb784c..ffb17a013eb 100644 --- a/apps/web/app/(ee)/api/customers/export/route.ts +++ b/apps/web/app/(ee)/api/customers/export/route.ts @@ -1,17 +1,10 @@ -import { convertToCSV } from "@/lib/analytics/utils"; import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw"; import { withWorkspace } from "@/lib/auth"; import { qstash } from "@/lib/cron"; -import { buildCustomerCountWhere } from "@/lib/customers/api/customer-count-where"; -import { formatCustomersForExport } from "@/lib/customers/api/format-customers-export"; -import { getCustomers } from "@/lib/customers/api/get-customers"; import { customersExportQuerySchema } from "@/lib/zod/schemas/customers"; -import { prisma } from "@dub/prisma"; import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; import { NextResponse } from "next/server"; -const MAX_CUSTOMERS_TO_EXPORT = 1000; - // GET /api/customers/export – export customers to CSV export const GET = withWorkspace( async ({ searchParams, workspace, session }) => { @@ -23,48 +16,18 @@ export const GET = withWorkspace( programId = getDefaultProgramIdOrThrow(workspace); } - const where = buildCustomerCountWhere({ - ...filters, - workspaceId: workspace.id, - programId, - }); - - const count = await prisma.customer.count({ - where, - }); - - if (count > MAX_CUSTOMERS_TO_EXPORT) { - await qstash.publishJSON({ - url: `${APP_DOMAIN_WITH_NGROK}/api/cron/export/customers`, - body: { - ...filters, - workspaceId: workspace.id, - programId, - userId: session.user.id, - columns: columns.join(","), - }, - }); - - return NextResponse.json({}, { status: 202 }); - } - - const customers = await getCustomers({ - ...filters, - workspaceId: workspace.id, - programId, - page: 1, - pageSize: MAX_CUSTOMERS_TO_EXPORT, - includeExpandedFields: true, - }); - - const rows = formatCustomersForExport(customers, columns); - - return new Response(convertToCSV(rows), { - headers: { - "Content-Type": "text/csv", - "Content-Disposition": "attachment", + await qstash.publishJSON({ + url: `${APP_DOMAIN_WITH_NGROK}/api/cron/export/customers`, + body: { + ...filters, + workspaceId: workspace.id, + programId, + userId: session.user.id, + columns: columns.join(","), }, }); + + return NextResponse.json({}, { status: 202 }); }, { requiredPlan: [ diff --git a/apps/web/app/(ee)/api/partners/export/route.ts b/apps/web/app/(ee)/api/partners/export/route.ts index e1c8be186a9..e7a302652fe 100644 --- a/apps/web/app/(ee)/api/partners/export/route.ts +++ b/apps/web/app/(ee)/api/partners/export/route.ts @@ -1,5 +1,5 @@ import { convertToCSV } from "@/lib/exports/convert-to-csv"; -import { formatPartnersForExport } from "@/lib/api/partners/format-partners-for-export"; +import { formatPartnersForExport } from "@/lib/exports/partners/format"; import { getPartners } from "@/lib/api/partners/get-partners"; import { getPartnersCount } from "@/lib/api/partners/get-partners-count"; import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw"; diff --git a/apps/web/app/api/links/export/route.ts b/apps/web/app/api/links/export/route.ts index 0c774f68934..253a20d7a4d 100644 --- a/apps/web/app/api/links/export/route.ts +++ b/apps/web/app/api/links/export/route.ts @@ -1,7 +1,7 @@ import { convertToCSV } from "@/lib/analytics/utils"; import { getStartEndDates } from "@/lib/analytics/utils/get-start-end-dates"; import { getLinksCount } from "@/lib/api/links"; -import { formatLinksForExport } from "@/lib/api/links/format-links-for-export"; +import { formatLinksForExport } from "@/lib/exports/links/format"; import { getLinksForWorkspace } from "@/lib/api/links/get-links-for-workspace"; import { throwIfClicksUsageExceeded } from "@/lib/api/links/usage-checks"; import { validateLinksQueryFilters } from "@/lib/api/links/validate-links-query-filters"; diff --git a/apps/web/lib/exports/commissions/export.ts b/apps/web/lib/exports/commissions/export.ts new file mode 100644 index 00000000000..b11163aacfc --- /dev/null +++ b/apps/web/lib/exports/commissions/export.ts @@ -0,0 +1,32 @@ +import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; +import { getCommissionsQuerySchema } from "@/lib/zod/schemas/commissions"; +import * as z from "zod/v4"; +import { fetchCommissionsBatch } from "./fetch"; +import { formatCommissionsForExport } from "./format"; + +export async function exportCommissions({ + filters, + columns, +}: { + filters: Omit< + z.infer, + "page" | "pageSize" | "startingAfter" | "endingBefore" + > & { + programId: string; + }; + columns: string[]; +}) { + const formattedBatches = async function* () { + for await (const { commissions } of fetchCommissionsBatch(filters)) { + yield formatCommissionsForExport(commissions, columns); + } + }; + + return exportCsvToStorage({ + fileKey: `exports/commissions/${generateRandomString(16)}.csv`, + fileName: generateExportFilename("commissions"), + batches: formattedBatches(), + }); +} diff --git a/apps/web/app/(ee)/api/cron/export/commissions/fetch-commissions-batch.ts b/apps/web/lib/exports/commissions/fetch.ts similarity index 75% rename from apps/web/app/(ee)/api/cron/export/commissions/fetch-commissions-batch.ts rename to apps/web/lib/exports/commissions/fetch.ts index 5b95327b472..29a249d4773 100644 --- a/apps/web/app/(ee)/api/cron/export/commissions/fetch-commissions-batch.ts +++ b/apps/web/lib/exports/commissions/fetch.ts @@ -4,7 +4,7 @@ import * as z from "zod/v4"; type CommissionFilters = Omit< z.infer, - "page" | "pageSize" + "page" | "pageSize" | "startingAfter" | "endingBefore" > & { programId: string; }; @@ -13,19 +13,21 @@ export async function* fetchCommissionsBatch( filters: CommissionFilters, pageSize: number = 1000, ) { - let page = 1; + let cursor: string | undefined; let hasMore = true; while (hasMore) { const commissions = await getCommissions({ ...filters, - page, pageSize, + ...(cursor + ? { startingAfter: cursor } + : { page: 1 }), }); if (commissions.length > 0) { yield { commissions }; - page++; + cursor = commissions[commissions.length - 1].id; hasMore = commissions.length === pageSize; } else { hasMore = false; diff --git a/apps/web/lib/api/commissions/format-commissions-for-export.ts b/apps/web/lib/exports/commissions/format.ts similarity index 96% rename from apps/web/lib/api/commissions/format-commissions-for-export.ts rename to apps/web/lib/exports/commissions/format.ts index a7cf11d7b4b..dc9ea0518af 100644 --- a/apps/web/lib/api/commissions/format-commissions-for-export.ts +++ b/apps/web/lib/exports/commissions/format.ts @@ -1,6 +1,6 @@ +import { getCommissions } from "@/lib/api/commissions/get-commissions"; import { COMMISSION_EXPORT_COLUMNS } from "@/lib/zod/schemas/commissions"; import * as z from "zod/v4"; -import { getCommissions } from "./get-commissions"; const COLUMN_LOOKUP: Map< string, diff --git a/apps/web/lib/exports/customers/export.ts b/apps/web/lib/exports/customers/export.ts new file mode 100644 index 00000000000..5b950ccb069 --- /dev/null +++ b/apps/web/lib/exports/customers/export.ts @@ -0,0 +1,27 @@ +import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; +import { customersExportCronInputSchema } from "@/lib/zod/schemas/customers"; +import * as z from "zod/v4"; +import { fetchCustomersBatch } from "./fetch"; +import { formatCustomersForExport } from "./format"; + +export async function exportCustomers({ + filters, + columns, +}: { + filters: z.infer; + columns: string[]; +}) { + const formattedBatches = async function* () { + for await (const { customers } of fetchCustomersBatch(filters)) { + yield formatCustomersForExport(customers, columns); + } + }; + + return exportCsvToStorage({ + fileKey: `exports/customers/${generateRandomString(16)}.csv`, + fileName: generateExportFilename("customers"), + batches: formattedBatches(), + }); +} diff --git a/apps/web/lib/customers/api/fetch-customers-batch.ts b/apps/web/lib/exports/customers/fetch.ts similarity index 82% rename from apps/web/lib/customers/api/fetch-customers-batch.ts rename to apps/web/lib/exports/customers/fetch.ts index 9b48384435a..62e47b1f466 100644 --- a/apps/web/lib/customers/api/fetch-customers-batch.ts +++ b/apps/web/lib/exports/customers/fetch.ts @@ -10,20 +10,22 @@ export async function* fetchCustomersBatch( ) { const { columns: _columns, ...filtersRest } = filters; - let page = 1; + let cursor: string | undefined; let hasMore = true; while (hasMore) { const customers = await getCustomers({ ...filtersRest, - page, pageSize, includeExpandedFields: true, + ...(cursor + ? { startingAfter: cursor } + : { page: 1 }), }); if (customers.length > 0) { yield { customers }; - page++; + cursor = customers[customers.length - 1].id; hasMore = customers.length === pageSize; } else { hasMore = false; diff --git a/apps/web/lib/customers/api/format-customers-export.ts b/apps/web/lib/exports/customers/format.ts similarity index 100% rename from apps/web/lib/customers/api/format-customers-export.ts rename to apps/web/lib/exports/customers/format.ts diff --git a/apps/web/lib/exports/export-csv-to-storage.ts b/apps/web/lib/exports/export-csv-to-storage.ts index 0d951af2f12..dd6ad4548c1 100644 --- a/apps/web/lib/exports/export-csv-to-storage.ts +++ b/apps/web/lib/exports/export-csv-to-storage.ts @@ -1,6 +1,6 @@ import { storage } from "../storage"; import { convertToCSV } from "./convert-to-csv"; -import { multipartStorage } from "./multipart-storage"; +import { multipartStorage } from "../multipart-storage"; const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MB — S3/R2 minimum for non-final parts export const SIGNED_URL_EXPIRY = 7 * 24 * 3600; // 7 days diff --git a/apps/web/lib/exports/links/export.ts b/apps/web/lib/exports/links/export.ts new file mode 100644 index 00000000000..a56c3382c49 --- /dev/null +++ b/apps/web/lib/exports/links/export.ts @@ -0,0 +1,29 @@ +import { GetLinksForWorkspaceProps } from "@/lib/api/links/get-links-for-workspace"; +import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; +import { fetchLinksBatch } from "./fetch"; +import { formatLinksForExport } from "./format"; + +export async function exportLinks({ + filters, + columns, +}: { + filters: Omit< + GetLinksForWorkspaceProps, + "page" | "pageSize" | "startingAfter" | "endingBefore" + >; + columns: string[]; +}) { + const formattedBatches = async function* () { + for await (const { links } of fetchLinksBatch(filters)) { + yield formatLinksForExport(links, columns); + } + }; + + return exportCsvToStorage({ + fileKey: `exports/links/${generateRandomString(16)}.csv`, + fileName: generateExportFilename("links"), + batches: formattedBatches(), + }); +} diff --git a/apps/web/app/(ee)/api/cron/export/links/fetch-links-batch.ts b/apps/web/lib/exports/links/fetch.ts similarity index 63% rename from apps/web/app/(ee)/api/cron/export/links/fetch-links-batch.ts rename to apps/web/lib/exports/links/fetch.ts index 65d1ae1ae5d..c233af73b04 100644 --- a/apps/web/app/(ee)/api/cron/export/links/fetch-links-batch.ts +++ b/apps/web/lib/exports/links/fetch.ts @@ -4,22 +4,27 @@ import { } from "@/lib/api/links/get-links-for-workspace"; export async function* fetchLinksBatch( - filters: Omit, + filters: Omit< + GetLinksForWorkspaceProps, + "page" | "pageSize" | "startingAfter" | "endingBefore" + >, pageSize: number = 1000, ) { - let page = 1; + let cursor: string | undefined; let hasMore = true; while (hasMore) { const links = await getLinksForWorkspace({ ...filters, - page, pageSize, + ...(cursor + ? { startingAfter: cursor } + : { page: 1 }), }); if (links.length > 0) { yield { links }; - page++; + cursor = links[links.length - 1].id; hasMore = links.length === pageSize; } else { hasMore = false; diff --git a/apps/web/lib/api/links/format-links-for-export.ts b/apps/web/lib/exports/links/format.ts similarity index 96% rename from apps/web/lib/api/links/format-links-for-export.ts rename to apps/web/lib/exports/links/format.ts index 986b8a3c69e..82de6ab4444 100644 --- a/apps/web/lib/api/links/format-links-for-export.ts +++ b/apps/web/lib/exports/links/format.ts @@ -1,6 +1,6 @@ +import { transformLink } from "@/lib/api/links/utils"; import { exportLinksColumns } from "@/lib/zod/schemas/links"; import { linkConstructor } from "@dub/utils"; -import { transformLink } from "./utils"; const columnMap = exportLinksColumns.reduce( (acc, column, index) => { diff --git a/apps/web/lib/exports/partners/export.ts b/apps/web/lib/exports/partners/export.ts new file mode 100644 index 00000000000..e3ee9aca8fd --- /dev/null +++ b/apps/web/lib/exports/partners/export.ts @@ -0,0 +1,29 @@ +import { generateRandomString } from "@/lib/api/utils/generate-random-string"; +import { exportCsvToStorage } from "@/lib/exports/export-csv-to-storage"; +import { generateExportFilename } from "@/lib/exports/generate-export-filename"; +import { partnersExportQuerySchema } from "@/lib/zod/schemas/partners"; +import * as z from "zod/v4"; +import { fetchPartnersBatch } from "./fetch"; +import { formatPartnersForExport } from "./format"; + +export async function exportPartners({ + filters, + columns, +}: { + filters: Omit, "columns"> & { + programId: string; + }; + columns: string[]; +}) { + const formattedBatches = async function* () { + for await (const { partners } of fetchPartnersBatch(filters)) { + yield formatPartnersForExport(partners, columns); + } + }; + + return exportCsvToStorage({ + fileKey: `exports/partners/${generateRandomString(16)}.csv`, + fileName: generateExportFilename("partners"), + batches: formattedBatches(), + }); +} diff --git a/apps/web/app/(ee)/api/cron/export/partners/fetch-partners-batch.ts b/apps/web/lib/exports/partners/fetch.ts similarity index 100% rename from apps/web/app/(ee)/api/cron/export/partners/fetch-partners-batch.ts rename to apps/web/lib/exports/partners/fetch.ts diff --git a/apps/web/lib/api/partners/format-partners-for-export.ts b/apps/web/lib/exports/partners/format.ts similarity index 100% rename from apps/web/lib/api/partners/format-partners-for-export.ts rename to apps/web/lib/exports/partners/format.ts diff --git a/apps/web/lib/exports/multipart-storage.ts b/apps/web/lib/multipart-storage.ts similarity index 98% rename from apps/web/lib/exports/multipart-storage.ts rename to apps/web/lib/multipart-storage.ts index 686ac82ce8a..47f1b3dbb6f 100644 --- a/apps/web/lib/exports/multipart-storage.ts +++ b/apps/web/lib/multipart-storage.ts @@ -1,4 +1,4 @@ -import { type BucketType, StorageClient } from "../storage"; +import { type BucketType, StorageClient } from "./storage"; class MultipartStorageClient extends StorageClient { async initiateMultipartUpload({