Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/engine/src/__generated__/openapi.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/engine/src/__generated__/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion apps/engine/src/lib/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,22 @@ async function* withLoggedStream<T>(
): AsyncIterable<T> {
const startedAt = Date.now()
let itemCount = 0
let errorCount = 0
logger.info(context, `${label} started`)
try {
for await (const item of iter) {
itemCount++
const msg = item as { type?: string; trace?: { trace_type?: string; error?: unknown } }
if (msg.type === 'trace' && msg.trace?.trace_type === 'error') {
errorCount++
logger.error({ ...context, traceError: msg.trace.error }, `${label} stream error`)
}
yield item
}
logger.info({ ...context, itemCount, durationMs: Date.now() - startedAt }, `${label} completed`)
logger.info(
{ ...context, itemCount, errorCount, durationMs: Date.now() - startedAt },
`${label} completed`
)
} catch (error) {
logger.error(
{ ...context, itemCount, durationMs: Date.now() - startedAt, err: error },
Expand Down
2 changes: 1 addition & 1 deletion apps/service/src/__generated__/openapi.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion apps/service/src/__generated__/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions packages/source-stripe/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from '@stripe/sync-openapi'
import { stripeEventSchema, type StripeEvent } from './spec.js'
import { fetchWithProxy, parsePositiveInteger, type TransportEnv } from './transport.js'
import { hashApiKey } from './utils/hashApiKey.js'

export type StripeClientConfig = {
api_key: string
Expand All @@ -33,6 +34,13 @@ export class StripeRequestError extends Error {

export type StripeClient = ReturnType<typeof makeClient>

export function describeApiKey(apiKey: string) {
return {
apiKeyPrefix: apiKey.slice(0, 7),
apiKeyFingerprint: hashApiKey(apiKey).slice(0, 12),
}
}

export function makeClient(config: StripeClientConfig, env: TransportEnv = process.env) {
const baseUrl = (config.base_url ?? DEFAULT_STRIPE_API_BASE).replace(/\/$/, '')
const timeoutMs = parsePositiveInteger(
Expand All @@ -42,6 +50,13 @@ export function makeClient(config: StripeClientConfig, env: TransportEnv = proce
)
const logRequests = env.STRIPE_LOG_REQUESTS === '1'

console.error({
msg: 'Stripe client initialized',
baseUrl,
apiVersion: config.api_version,
...describeApiKey(config.api_key),
})

const headers: Record<string, string> = {
Authorization: `Bearer ${config.api_key}`,
'Content-Type': 'application/x-www-form-urlencoded',
Expand Down
36 changes: 18 additions & 18 deletions packages/source-stripe/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import type { StripeEvent } from './spec.js'
import { buildResourceRegistry } from './resourceRegistry.js'
import { catalogFromRegistry, catalogFromOpenApi } from './catalog.js'
import {
BUNDLED_API_VERSION,
resolveOpenApiSpec,
SpecParser,
OPENAPI_RESOURCE_TABLE_ALIASES,
Expand All @@ -29,7 +28,7 @@ import { pollEvents } from './src-events-api.js'
import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js'
import { createStripeWebSocketClient } from './src-websocket.js'
import type { ResourceConfig } from './types.js'
import { makeClient } from './client.js'
import { describeApiKey, makeClient } from './client.js'
import type { RateLimiter } from './rate-limiter.js'
import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js'
import { fetchWithProxy } from './transport.js'
Expand Down Expand Up @@ -96,10 +95,7 @@ export function createStripeSource(

async *check({ config }): AsyncGenerator<CheckOutput> {
try {
const client = makeClient({
...config,
api_version: config.api_version ?? BUNDLED_API_VERSION,
})
const client = makeClient(config)
await client.getAccount()
yield {
type: 'connection_status' as const,
Expand All @@ -122,7 +118,7 @@ export function createStripeSource(
// TODO: Custom objects (not yet supported) would require a more specific cache
// since they aren't discoverable from the OpenAPI spec alone.
async *discover({ config }): AsyncGenerator<DiscoverOutput> {
const apiVersion = config.api_version ?? BUNDLED_API_VERSION
const apiVersion = config.api_version
const cached = discoverCache.get(apiVersion)
if (cached) {
yield { type: 'catalog' as const, catalog: cached }
Expand Down Expand Up @@ -151,11 +147,13 @@ export function createStripeSource(
},

async *setup({ config, catalog }): AsyncGenerator<SetupOutput> {
const updates: Partial<Config> = {}
const client = makeClient({
...config,
api_version: config.api_version ?? BUNDLED_API_VERSION,
console.error({
msg: 'source-stripe setup() called',
baseUrl: config.base_url,
...describeApiKey(config.api_key),
})
const updates: Partial<Config> = {}
const client = makeClient(config)

// Resolve account_id if not already set
if (!config.account_id) {
Expand Down Expand Up @@ -208,10 +206,7 @@ export function createStripeSource(

async *teardown({ config }): AsyncGenerator<TeardownOutput> {
if (config.webhook_url) {
const client = makeClient({
...config,
api_version: config.api_version ?? BUNDLED_API_VERSION,
})
const client = makeClient(config)
const existing = await client.listWebhookEndpoints({ limit: 100 })
// Only delete the endpoint matching THIS pipeline's URL — not all managed endpoints.
// Other pipelines on the same account may share the managed_by tag with different URLs.
Expand All @@ -225,11 +220,16 @@ export function createStripeSource(
},

async *read({ config, catalog, state }, $stdin?) {
const apiVersion = config.api_version ?? BUNDLED_API_VERSION
console.error({
msg: 'source-stripe read() called',
baseUrl: config.base_url,
streams: catalog.streams.length,
...describeApiKey(config.api_key),
})
const rateLimiter =
externalRateLimiter ?? createInMemoryRateLimiter(config.rate_limit ?? DEFAULT_MAX_RPS)
const client = makeClient({ ...config, api_version: apiVersion })
const resolved = await resolveOpenApiSpec({ apiVersion }, apiFetch)
const client = makeClient(config)
const resolved = await resolveOpenApiSpec({ apiVersion: config.api_version }, apiFetch)
const registry = buildResourceRegistry(
resolved.spec,
config.api_key,
Expand Down
4 changes: 2 additions & 2 deletions packages/source-stripe/src/spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ export const configSchema = z.object({
livemode: z.boolean().optional().describe('Whether this is a live mode sync'),
api_version: z
.enum(SUPPORTED_API_VERSIONS)
.optional()
.describe(`Stripe API version (default: ${BUNDLED_API_VERSION})`),
.default(BUNDLED_API_VERSION)
.describe('Stripe API version'),
base_url: z
.string()
.url()
Expand Down
12 changes: 7 additions & 5 deletions packages/source-stripe/src/src-list-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ export async function* listApiBackfill(opts: {
} = opts

let accountCreated: number | null = null
const failedStreams: string[] = []

for (const configuredStream of catalog.streams) {
const stream = configuredStream.stream
Expand Down Expand Up @@ -494,11 +495,6 @@ export async function* listApiBackfill(opts: {
} satisfies TraceMessage
continue
}
console.error({
msg: 'Stripe list page failed',
stream: stream.name,
error: err instanceof Error ? err.message : String(err),
})
const isRateLimit = err instanceof Error && err.message.includes('Rate limit')
yield {
type: 'trace',
Expand All @@ -512,6 +508,12 @@ export async function* listApiBackfill(opts: {
},
},
} satisfies TraceMessage
failedStreams.push(stream.name)
}
}

if (failedStreams.length > 0) {
const msg = `${failedStreams.length} stream(s) failed: ${failedStreams.join(', ')}`
console.error({ msg, failedStreams })
}
}
Loading