diff --git a/scripts/reconcile-sigma-vs-postgres.ts b/scripts/reconcile-sigma-vs-postgres.ts index 12a9da69e..515c5eaf3 100755 --- a/scripts/reconcile-sigma-vs-postgres.ts +++ b/scripts/reconcile-sigma-vs-postgres.ts @@ -14,7 +14,9 @@ import { homedir } from 'node:os' const POLL_INTERVAL_MS = 3_000 const POLL_TIMEOUT_MS = 5 * 60 * 1_000 -const SIGMA_CONCURRENCY = 16 +const SIGMA_CONCURRENCY = Number(process.env.SIGMA_CONCURRENCY) || 4 +const SIGMA_POST_MAX_RETRIES = Number(process.env.SIGMA_POST_MAX_RETRIES) || 8 +const SIGMA_POST_RETRY_DELAY_MS = Number(process.env.SIGMA_POST_RETRY_DELAY_MS) || 15_000 class UsageError extends Error {} @@ -22,8 +24,19 @@ class UsageError extends Error {} // CLI // --------------------------------------------------------------------------- -function parseArgs(argv) { - const args = {} +type ReconcileArgs = { + pipelineId?: string + dataDir?: string + stripeApiKey?: string + dbUrl?: string + schema?: string + table?: string + output?: string + help?: boolean +} + +function parseArgs(argv: string[]): ReconcileArgs { + const args: ReconcileArgs = {} for (let i = 0; i < argv.length; i += 1) { const arg = argv[i] const next = argv[i + 1] @@ -42,6 +55,9 @@ function parseArgs(argv) { } else if (arg === '--schema') { args.schema = next i += 1 + } else if (arg === '--table') { + args.table = next + i += 1 } else if (arg === '--output') { args.output = next i += 1 @@ -81,6 +97,7 @@ function usage() { ' --stripe-api-key Required. Falls back to STRIPE_API_KEY env var.', ' --db-url Optional. Falls back to DATABASE_URL or POSTGRES_URL.', ' --schema Optional. Falls back to destination.postgres.schema or public.', + ' --table Optional. Reconcile only one Postgres table.', ' --output Optional. Report path (default: tmp/reconcile-.json).', ].join('\n') } @@ -210,27 +227,283 @@ async function stripeDownload(apiKey, url) { return res.text() } +// Tolerance (in seconds) on each side of the object's `created` timestamp. +// Sigma can occasionally drift by a second vs the API. +const LIST_API_TIMESTAMP_WINDOW_S = 1 +const LIST_API_PAGE_LIMIT = 100 +const LIST_API_MAX_PAGES = 50 +const LIST_API_RPS = Number(process.env.LIST_API_RPS) || 15 +const LIST_API_CONCURRENCY = Number(process.env.LIST_API_CONCURRENCY) || 15 + +// Leaky-bucket rate limiter at LIST_API_RPS requests/second across all +// concurrent workers. Each acquire reserves the next slot; waiters sleep +// until their slot opens. +const listApiIntervalMs = 1000 / LIST_API_RPS +let listApiNextAt = 0 +async function listApiAcquire(): Promise { + const now = Date.now() + const at = Math.max(now, listApiNextAt) + listApiNextAt = at + listApiIntervalMs + if (at > now) await sleep(at - now) +} + +/** + * Parse Sigma's `created` (unix seconds as number/string, or "YYYY-MM-DD HH:MM:SS" + * datetime strings in UTC) into unix seconds. Returns null when unparseable. + */ +function parseSigmaCreatedSeconds(raw: unknown): number | null { + if (raw == null || raw === '') return null + const n = Number(raw) + if (Number.isFinite(n) && n > 946_684_800 && n < 4_102_444_800) return n + const match = String(raw).match(/^(\d{4})-(\d{2})-(\d{2})[ T](\d{2}):(\d{2}):(\d{2})/) + if (match) { + const [, y, mo, d, h, mi, s] = match + return Math.floor( + Date.UTC(Number(y), Number(mo) - 1, Number(d), Number(h), Number(mi), Number(s)) / 1000 + ) + } + const fallback = new Date(String(raw)) + if (!Number.isNaN(fallback.getTime())) return Math.floor(fallback.getTime() / 1000) + return null +} + +/** + * Page through Stripe's list endpoint for `resource`, filtered to a tight + * window around `ts`, and return true if `id` is present. Returns false when + * the list endpoint doesn't surface the object (the sync engine wouldn't + * have synced it either). + */ +async function stripeListContainsAtTimestamp( + apiKey: string, + resource: string, + ts: number, + id: string +): Promise { + const basePath = STRIPE_API_OBJECT_ENDPOINTS[resource] ?? `/v1/${resource}` + const gte = Math.max(0, Math.floor(ts) - LIST_API_TIMESTAMP_WINDOW_S) + const lte = Math.ceil(ts) + LIST_API_TIMESTAMP_WINDOW_S + let startingAfter: string | undefined + for (let page = 0; page < LIST_API_MAX_PAGES; page += 1) { + const params = new URLSearchParams() + params.set('limit', String(LIST_API_PAGE_LIMIT)) + params.set('created[gte]', String(gte)) + params.set('created[lte]', String(lte)) + if (startingAfter) params.set('starting_after', startingAfter) + await listApiAcquire() + const body = await stripeGet(apiKey, `${basePath}?${params.toString()}`) + const data = Array.isArray(body?.data) ? body.data : [] + for (const obj of data) { + if (obj?.id === id) return true + } + if (!body?.has_more || data.length === 0) return false + startingAfter = data[data.length - 1]?.id + if (!startingAfter) return false + } + return false +} + +/** + * Fallback list check using `ids[]=id`. Used when no usable `created` + * timestamp is available for the missing object. Still a list call (not + * retrieve), so it respects the same scope the sync engine sees. + */ +async function stripeListContainsById( + apiKey: string, + resource: string, + id: string +): Promise { + const basePath = STRIPE_API_OBJECT_ENDPOINTS[resource] ?? `/v1/${resource}` + const params = new URLSearchParams() + params.set('limit', String(LIST_API_PAGE_LIMIT)) + params.append('ids[]', id) + await listApiAcquire() + const body = await stripeGet(apiKey, `${basePath}?${params.toString()}`) + const data = Array.isArray(body?.data) ? body.data : [] + return data.some((obj) => obj?.id === id) +} + +/** + * For each diff row, drop missing IDs that the list endpoint doesn't return + * within a tight window of the Sigma `created` timestamp. The sync engine + * only ingests list-endpoint results, so anything the list API hides is out + * of scope and shouldn't be counted as missing. + */ +type ComparisonRow = { + resource: string + status: string + missingRows: Array<{ id: string; created: unknown }> + postgresMissing: number | null +} + +async function filterMissingNotInListApi( + apiKey: string, + rows: ComparisonRow[] +): Promise> { + const removed: Array<{ resource: string; id: string; created: unknown; reason: string }> = [] + + type WorkItem = { row: ComparisonRow; item: { id: string; created: unknown } } + const queue: WorkItem[] = [] + const eligibleRows = new Set() + for (const row of rows) { + if (row.status !== 'diff') continue + if (!row.missingRows || row.missingRows.length === 0) continue + if (!(row.resource in STRIPE_API_OBJECT_ENDPOINTS)) continue + eligibleRows.add(row) + for (const item of row.missingRows) queue.push({ row, item }) + } + if (queue.length === 0) return removed + + console.error( + `Verifying ${queue.length} missing IDs via list endpoint ` + + `(rps=${LIST_API_RPS}, concurrency=${LIST_API_CONCURRENCY})...` + ) + + const keptIds = new Map>() + for (const row of eligibleRows) keptIds.set(row, new Set()) + let done = 0 + const total = queue.length + + async function processOne({ row, item }: WorkItem) { + const ts = parseSigmaCreatedSeconds(item.created) + try { + const found = + ts != null + ? await stripeListContainsAtTimestamp(apiKey, row.resource, ts, item.id) + : await stripeListContainsById(apiKey, row.resource, item.id) + if (found) { + keptIds.get(row)!.add(item.id) + } else { + removed.push({ + resource: row.resource, + id: item.id, + created: item.created, + reason: ts != null ? 'not_returned_by_list_api' : 'not_returned_by_list_api_ids', + }) + process.stderr.write( + `\n list-api filter: ${row.resource} ${item.id} ` + + `(created=${formatCreated(item.created)}) not returned by list endpoint — dropping from missing` + ) + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + process.stderr.write( + `\n list-api filter: ${row.resource} ${item.id} list check failed (${message}); keeping as missing` + ) + keptIds.get(row)!.add(item.id) + } finally { + done += 1 + process.stderr.write(`\r list-api filter: progress ${done}/${total}`) + } + } + + await Promise.all( + Array.from({ length: Math.min(LIST_API_CONCURRENCY, queue.length) }, async () => { + while (queue.length > 0) { + const work = queue.shift() + if (!work) break + await processOne(work) + } + }) + ) + process.stderr.write('\n') + + for (const row of eligibleRows) { + const kept = keptIds.get(row)! + row.missingRows = row.missingRows.filter((m) => kept.has(m.id)) + row.postgresMissing = row.missingRows.length + row.status = row.missingRows.length === 0 ? 'match' : 'diff' + } + + return removed +} + function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)) } +function inferLivemodeFromApiKey(apiKey: string) { + return !apiKey.toLowerCase().includes('test') +} + +function isSigmaConcurrencyLimitError(err) { + const msg = err?.stripeError?.message ?? err?.message ?? '' + return /too many concurrently running Sigma queries/i.test(msg) +} + /** Tables whose list endpoint filters to `active = true` by default. * Sigma retains inactive/archived objects that the list API doesn't surface, * so we filter to active-only when querying Sigma for these tables. */ const SIGMA_TABLES_ACTIVE_ONLY = new Set(['prices', 'tax_rates']) +const SIGMA_TABLES_WITH_LIVEMODE = new Set(['issuing_personalization_designs', 'terminal_readers']) + +const SIGMA_TABLES_WITHOUT_LIVEMODE = new Set([ + 'application_fees', + 'billing_credit_grants', + 'charges', + 'checkout_sessions', + 'connected_accounts', + 'connected_account_treasury_financial_accounts', + 'coupons', + 'credit_notes', + 'customers', + 'disputes', + 'early_fraud_warnings', + 'invoice_items', + 'invoice_payments', + 'invoices', + 'issuing_authorizations', + 'issuing_cardholders', + 'issuing_cards', + 'issuing_disputes', + 'issuing_transactions', + 'payment_intents', + 'payment_links', + 'plans', + 'prices', + 'products', + 'promotion_codes', + 'quotes', + 'refunds', + 'setup_intents', + 'subscription_schedules', + 'subscriptions', + 'tax_rates', + 'topups', + 'transfers', +]) + +function sigmaLivemodeForTable(table: string, livemode: boolean): boolean | null { + if (SIGMA_TABLES_WITH_LIVEMODE.has(table)) return livemode + if (SIGMA_TABLES_WITHOUT_LIVEMODE.has(table)) return null + return null +} + +type SigmaIdsQueryVariant = { + createdColumn: string | undefined + hasDeletedCol: boolean + activeOnly: boolean + livemode: boolean | null + extraConditions: string[] +} + /** * Build a Sigma query that returns (id[, created]) rows for the given table. * Tables in `tablesWithDeletedCol` get a WHERE clause that excludes deleted * rows so results match what Stripe's `list` endpoints return. * Tables in SIGMA_TABLES_ACTIVE_ONLY get an additional `active = true` filter. */ -function buildSigmaIdsSql(table, { withCreated, hasDeletedCol, activeOnly }) { +function buildSigmaIdsSql( + table: string, + { createdColumn, hasDeletedCol, activeOnly, livemode, extraConditions }: SigmaIdsQueryVariant +) { const conditions = [] if (hasDeletedCol) conditions.push('NOT COALESCE(deleted, false)') if (activeOnly) conditions.push('active = true') + if (livemode != null) conditions.push(`livemode = ${livemode ? 'true' : 'false'}`) + conditions.push(...extraConditions) const where = conditions.length > 0 ? ' WHERE ' + conditions.join(' AND ') : '' - const cols = withCreated ? 'id, created' : 'id' + const cols = createdColumn ? `id, ${createdColumn} AS created` : 'id' return `SELECT ${cols} FROM "${table}"${where}` } @@ -280,7 +553,21 @@ async function downloadSigmaResult(apiKey, completed) { } async function runIdsQuery(apiKey, sql, table) { - const queryRun = await stripePost(apiKey, '/v1/sigma/query_runs', { sql }) + let queryRun + for (let attempt = 0; ; attempt += 1) { + try { + queryRun = await stripePost(apiKey, '/v1/sigma/query_runs', { sql }) + break + } catch (err) { + if (!isSigmaConcurrencyLimitError(err) || attempt >= SIGMA_POST_MAX_RETRIES) throw err + const waitMs = SIGMA_POST_RETRY_DELAY_MS * (attempt + 1) + console.error( + ` Sigma concurrency limit for ${table}; retrying POST in ${Math.round(waitMs / 1000)}s ` + + `(attempt ${attempt + 1}/${SIGMA_POST_MAX_RETRIES})` + ) + await sleep(waitMs) + } + } const completed = await pollSigmaRun(apiKey, queryRun.id) const csv = await downloadSigmaResult(apiKey, completed) const rows = parseCsv(csv) @@ -303,37 +590,119 @@ async function runIdsQuery(apiKey, sql, table) { return { ids, createdById } } +async function runSigmaRowsQuery(apiKey, sql, table) { + let queryRun + for (let attempt = 0; ; attempt += 1) { + try { + queryRun = await stripePost(apiKey, '/v1/sigma/query_runs', { sql }) + break + } catch (err) { + if (!isSigmaConcurrencyLimitError(err) || attempt >= SIGMA_POST_MAX_RETRIES) throw err + const waitMs = SIGMA_POST_RETRY_DELAY_MS * (attempt + 1) + console.error( + ` Sigma concurrency limit for ${table}; retrying detail POST in ${Math.round(waitMs / 1000)}s ` + + `(attempt ${attempt + 1}/${SIGMA_POST_MAX_RETRIES})` + ) + await sleep(waitMs) + } + } + + const completed = await pollSigmaRun(apiKey, queryRun.id) + const csv = await downloadSigmaResult(apiKey, completed) + const rows = parseCsv(csv) + if (rows.length < 2) return [] + + const header = rows[0] + return rows + .slice(1) + .map((row) => Object.fromEntries(header.map((name, idx) => [name, row[idx] ?? null]))) +} + function isMissingColumnError(err) { const msg = err.errorMessage ?? err.message ?? '' return /column|invalid identifier/i.test(msg) } +function isRetryableSigmaQueryError(err) { + return isMissingColumnError(err) || err instanceof SigmaQueryFailedError +} + /** * Fetch IDs (with `created` where available) for a Sigma table. Retries * progressively stripping columns/filters when Sigma reports they don't * exist on that particular table. */ -async function fetchSigmaIds(apiKey, table, hasDeletedCol, activeOnly = false) { - const variants = [ - { withCreated: true, hasDeletedCol, activeOnly }, - { withCreated: false, hasDeletedCol, activeOnly }, - ] - if (hasDeletedCol) { - variants.push({ withCreated: true, hasDeletedCol: false, activeOnly }) - variants.push({ withCreated: false, hasDeletedCol: false, activeOnly }) - } - if (activeOnly) { - // Also try without the active filter in case the column doesn't exist - variants.push({ withCreated: true, hasDeletedCol: false, activeOnly: false }) - variants.push({ withCreated: false, hasDeletedCol: false, activeOnly: false }) +async function fetchSigmaIds( + apiKey: string, + table: string, + hasDeletedCol: boolean, + activeOnly = false, + livemode: boolean | null = null +) { + const createdColumns = SIGMA_CREATED_COLUMNS[table] ?? ['created', undefined] + const extraConditions = SIGMA_TABLE_CONDITIONS[table] ?? [] + + function buildVariants(mode: boolean | null): SigmaIdsQueryVariant[] { + const next = [ + ...createdColumns.map((createdColumn) => ({ + createdColumn, + hasDeletedCol, + activeOnly, + livemode: mode, + extraConditions, + })), + ] + if (hasDeletedCol) { + next.push( + ...createdColumns.map((createdColumn) => ({ + createdColumn, + hasDeletedCol: false, + activeOnly, + livemode: mode, + extraConditions, + })) + ) + } + if (activeOnly) { + // Try without active in case older Sigma schemas omit the column. + next.push( + ...createdColumns.map((createdColumn) => ({ + createdColumn, + hasDeletedCol: false, + activeOnly: false, + livemode: mode, + extraConditions, + })) + ) + } + if (extraConditions.length > 0) { + next.push( + ...createdColumns.map((createdColumn) => ({ + createdColumn, + hasDeletedCol: false, + activeOnly: false, + livemode: mode, + extraConditions: [], + })) + ) + } + return next } + + const variants = buildVariants(livemode) + let lastErr - for (const variant of variants) { + for (const [index, variant] of variants.entries()) { + const sql = buildSigmaIdsSql(table, variant) try { - return await runIdsQuery(apiKey, buildSigmaIdsSql(table, variant), table) + return await runIdsQuery(apiKey, sql, table) } catch (err) { - if (!isMissingColumnError(err)) throw err lastErr = err + if (!isRetryableSigmaQueryError(err)) throw err + if (index < variants.length - 1) { + const message = err instanceof Error ? err.message : String(err) + console.error(` retrying Sigma query for ${table} after error: ${message}; sql=${sql}`) + } } } throw lastErr @@ -349,53 +718,111 @@ function detectMissingTables(err) { * let us query `information_schema`, so we maintain this list by hand based on * Stripe's public data dictionary. Add entries as needed. */ const SIGMA_TABLES_WITH_DELETED = new Set([ - 'accounts', 'bank_accounts', 'cards', - 'coupons', 'customers', 'discounts', - 'invoice_line_items', - 'issuing_personalization_designs', - 'plans', - 'products', 'skus', 'subscription_items', - 'subscriptions', 'tax_ids', - 'terminal_readers', ]) +const SIGMA_CREATED_COLUMNS: Record> = { + invoice_items: ['date', undefined], +} + +const SIGMA_TABLE_CONDITIONS: Record = { + subscriptions: ["status != 'canceled'"], +} + +const STRIPE_API_OBJECT_ENDPOINTS = { + application_fees: '/v1/application_fees', + billing_credit_grants: '/v1/billing/credit_grants', + billing_meters: '/v1/billing/meters', + charges: '/v1/charges', + checkout_sessions: '/v1/checkout/sessions', + coupons: '/v1/coupons', + credit_notes: '/v1/credit_notes', + customers: '/v1/customers', + disputes: '/v1/disputes', + early_fraud_warnings: '/v1/radar/early_fraud_warnings', + invoice_payments: '/v1/invoice_payments', + invoiceitems: '/v1/invoiceitems', + invoices: '/v1/invoices', + issuing_authorizations: '/v1/issuing/authorizations', + issuing_cardholders: '/v1/issuing/cardholders', + issuing_cards: '/v1/issuing/cards', + issuing_disputes: '/v1/issuing/disputes', + issuing_personalization_designs: '/v1/issuing/personalization_designs', + issuing_transactions: '/v1/issuing/transactions', + payment_intents: '/v1/payment_intents', + payment_links: '/v1/payment_links', + plans: '/v1/plans', + prices: '/v1/prices', + products: '/v1/products', + promotion_codes: '/v1/promotion_codes', + quotes: '/v1/quotes', + refunds: '/v1/refunds', + setup_intents: '/v1/setup_intents', + subscription_schedules: '/v1/subscription_schedules', + subscriptions: '/v1/subscriptions', + tax_rates: '/v1/tax_rates', + terminal_readers: '/v1/terminal/readers', + topups: '/v1/topups', + transfers: '/v1/transfers', + treasury_financial_accounts: '/v1/treasury/financial_accounts', +} + /** Known Postgres → Sigma name aliases. Add entries as you discover more. */ -const SIGMA_ALIAS = { - invoiceitems: 'invoice_line_items', +const SIGMA_ALIAS: Record = { + accounts: 'connected_accounts', + invoiceitems: 'invoice_items', // NOTE: do NOT alias tax_ids → customer_tax_ids. The sync engine uses // /v1/tax_ids which returns account-level tax IDs, while Sigma's // customer_tax_ids table contains customer-scoped tax IDs (different dataset). billing_alerts: 'billing_meter_alerts', + treasury_financial_accounts: 'connected_account_treasury_financial_accounts', } /** Tables to skip from reconciliation entirely. These cannot be meaningfully * compared because the sync engine either excludes them or the top-level API * endpoint doesn't return the same scope of data as Sigma. */ const RECONCILE_SKIP = new Set([ + 'billing_alerts', + 'billing_portal_configurations', // Requires `customer` query param; explicitly excluded from sync engine. 'billing_credit_balance_transactions', + 'climate_orders', + 'climate_products', + 'files', + 'financial_connections_accounts', + 'identity_verification_sessions', + // Sigma includes historical/internal designs that List/Retrieve do not expose. + 'issuing_personalization_designs', // Top-level /v1/payment_methods only returns unattached/Treasury payment methods. // Sigma includes customer-attached pm_, src_, and card_ objects. 'payment_methods', + 'payouts', + 'reporting_report_runs', + 'reporting_report_types', + 'reviews', + 'scheduled_query_runs', + 'tax_ids', + 'test_helpers_test_clocks', + 'v2_core_accounts', + 'v2_core_event_destinations', ]) /** Per-table ID filters applied to Sigma results before comparison. * Sigma tables sometimes include object types that the sync engine fetches * via a different endpoint or that aren't available with the current API key mode. */ -const SIGMA_ID_FILTERS: Record boolean> = { +const SIGMA_ID_FILTERS: Record boolean> = { // Sigma's "transfers" table includes payouts (po_ prefix). The sync engine // fetches payouts via /v1/payouts, not /v1/transfers. transfers: (id) => !id.startsWith('po_'), // Sigma includes test-mode billing meters (mtr_test_ prefix) which a // live-mode API key does not return from /v1/billing/meters. - billing_meters: (id) => !id.startsWith('mtr_test_'), + billing_meters: (id, livemode) => !livemode || !id.startsWith('mtr_test_'), } /** @@ -403,8 +830,8 @@ const SIGMA_ID_FILTERS: Record boolean> = { * (missing table, opaque query error) to the offending table only so one * bad table doesn't tank the whole reconcile. */ -async function runSigmaForResources(apiKey, resources) { - const skipped = [] +async function runSigmaForResources(apiKey: string, resources: string[], livemode: boolean) { + const skipped: string[] = [] const dataByTable = new Map() // pgTable → { ids: Set, createdById: Map } let done = 0 @@ -422,21 +849,24 @@ async function runSigmaForResources(apiKey, resources) { } const queryable = work - const unexpectedErrors = [] + type SigmaResourceWork = { pgTable: string; sigmaTable: string } + const unexpectedErrors: string[] = [] - async function runOne({ pgTable, sigmaTable }) { + async function runOne({ pgTable, sigmaTable }: SigmaResourceWork) { try { const data = await fetchSigmaIds( apiKey, sigmaTable, SIGMA_TABLES_WITH_DELETED.has(sigmaTable), - SIGMA_TABLES_ACTIVE_ONLY.has(pgTable) + SIGMA_TABLES_ACTIVE_ONLY.has(pgTable), + sigmaLivemodeForTable(sigmaTable, livemode) ) dataByTable.set(pgTable, data) } catch (err) { const missing = detectMissingTables(err) if (!missing || !missing.includes(sigmaTable)) { - unexpectedErrors.push(`${pgTable}: ${err.message}`) + const message = err instanceof Error ? err.message : String(err) + unexpectedErrors.push(`${pgTable}: ${message}`) } skipped.push(pgTable) } finally { @@ -621,6 +1051,88 @@ function formatTable(rows) { return [fmt(headers), separator, ...stringRows.map(fmt)].join('\n') } +function safeJson(value) { + try { + return JSON.stringify(value, null, 2) + } catch (err) { + return JSON.stringify({ error: `Could not serialize value: ${errorMessage(err)}` }, null, 2) + } +} + +function errorMessage(err) { + return err instanceof Error ? err.message : String(err) +} + +function indentBlock(text, prefix) { + return text + .split('\n') + .map((line) => `${prefix}${line}`) + .join('\n') +} + +function formatSkippedTables(rows) { + if (rows.length === 0) return '' + + const lines = ['=== Skipped tables (no Sigma/source data) ==='] + for (const row of rows) { + lines.push(` - ${row.resource} (postgres=${row.postgresCount})`) + } + return lines.join('\n') +} + +async function fetchMissingSampleDetails(apiKey, rows, limit = 3) { + const details = [] + const diffRows = rows.filter((r) => r.status === 'diff' && r.missingRows.length > 0) + + for (const row of diffRows) { + const sigmaTable = SIGMA_ALIAS[row.resource] ?? row.resource + for (const item of row.missingRows.slice(0, limit)) { + const detail: { + resource: string + sigma_table: string + id: string + created: unknown + sigma: unknown + } = { + resource: row.resource, + sigma_table: sigmaTable, + id: item.id, + created: item.created, + sigma: null, + } + + try { + const sql = `SELECT * FROM "${sigmaTable}" WHERE id = '${escapeSqlLiteral(item.id)}' LIMIT 1` + const rows = await runSigmaRowsQuery(apiKey, sql, sigmaTable) + detail.sigma = { ok: true, row: rows[0] ?? null } + } catch (err) { + detail.sigma = { ok: false, error: errorMessage(err) } + } + + details.push(detail) + } + } + + return details +} + +function formatMissingSamples(details) { + if (details.length === 0) return '' + + const lines = ['=== Missing samples (source only, first 3 per resource) ==='] + let currentResource = null + for (const item of details) { + if (item.resource !== currentResource) { + currentResource = item.resource + lines.push(`${item.resource}:`) + } + lines.push(` - id=${item.id} created=${formatCreated(item.created)}`) + lines.push(' sigma:') + lines.push(indentBlock(safeJson(item.sigma), ' ')) + } + return lines.join('\n') +} + // --------------------------------------------------------------------------- // Main // --------------------------------------------------------------------------- @@ -683,9 +1195,18 @@ async function main() { if (!dbUrl) throw new UsageError('Provide --db-url or set DATABASE_URL / POSTGRES_URL') + const livemode = inferLivemodeFromApiKey(apiKey) + console.error(`Sigma livemode filter: livemode = ${livemode ? 'true' : 'false'}`) + // Step 1: discover tables from Postgres console.error(`Discovering tables from Postgres schema ${schema}...`) - const pgTables = discoverPostgresTables(dbUrl, schema) + let pgTables = discoverPostgresTables(dbUrl, schema) + if (args.table) { + if (!pgTables.includes(args.table)) { + throw new UsageError(`Table ${args.table} not found in Postgres schema ${schema}`) + } + pgTables = [args.table] + } console.error(` found ${pgTables.length} tables in ${schema}`) // Step 2: fetch IDs for every PG table (serial to avoid overloading psql) @@ -716,7 +1237,8 @@ async function main() { // Step 3: fetch IDs from Sigma for comparable tables const { dataByTable: sigmaDataByTable, skipped } = await runSigmaForResources( apiKey, - pgTablesToCompare + pgTablesToCompare, + livemode ) // Apply per-table ID filters to remove object types that the sync engine @@ -727,7 +1249,7 @@ async function main() { const filteredIds = new Set() const filteredCreatedById = new Map() for (const id of data.ids) { - if (filterFn(id)) { + if (filterFn(id, livemode)) { filteredIds.add(id) const created = data.createdById.get(id) if (created) filteredCreatedById.set(id, created) @@ -745,11 +1267,23 @@ async function main() { ...skipped, ...excludedTables, ]) + + // Drop missing IDs that the Stripe list endpoint doesn't return at their + // creation timestamp — those objects are out of scope for the sync engine. + const droppedByListApi = await filterMissingNotInListApi(apiKey, rows) + if (droppedByListApi.length > 0) { + console.error( + ` list-api filter: dropped ${droppedByListApi.length} missing ID(s) not returned by list endpoint` + ) + } + const matchCount = rows.filter((r) => r.status === 'match').length const diffCount = rows.filter((r) => r.status === 'diff').length const skippedCount = rows.filter((r) => r.status === 'skipped_in_sigma').length const skippedRows = rows.filter((r) => r.status === 'skipped_in_sigma') const diffRows = rows.filter((r) => r.status === 'diff') + const missingSampleDetails = + diffRows.length > 0 ? await fetchMissingSampleDetails(apiKey, diffRows) : [] // Write detailed report to file (defaults to tmp/reconcile-.json) const outputPath = @@ -761,6 +1295,7 @@ async function main() { timestamp: new Date().toISOString(), pipeline_id: pipelineId ?? null, schema, + livemode, summary: { tables: pgTables.length, compared: matchCount + diffCount, @@ -769,6 +1304,9 @@ async function main() { skipped: skippedCount, }, formatted: formatTable(rows.filter((r) => r.status !== 'skipped_in_sigma')), + skipped_tables: skippedRows, + missing_samples: missingSampleDetails, + dropped_by_list_api: droppedByListApi, tables: rows, } writeFileSync(outputPath, JSON.stringify(report, null, 2) + '\n') @@ -789,6 +1327,16 @@ async function main() { console.log('') console.log(formatTable(rows.filter((r) => r.status !== 'skipped_in_sigma'))) + const skippedTableOutput = formatSkippedTables(skippedRows) + if (skippedTableOutput) { + console.log('') + console.log(skippedTableOutput) + } + const missingSamples = formatMissingSamples(missingSampleDetails) + if (missingSamples) { + console.log('') + console.log(missingSamples) + } if (diffCount > 0) process.exit(1) }