From 4fcf446792fae955afec24ebbfe7635838b3793b Mon Sep 17 00:00:00 2001 From: Jared Zwick <52264361+jaredzwick@users.noreply.github.com> Date: Sun, 3 May 2026 10:08:49 -0400 Subject: [PATCH] hir-94: actually reschedule queue entries when an account is over quota MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The quota-exceeded branch in `processEmailQueue` claimed to "reschedule for quota reset time" but the underlying `updateQueueStatus` call only touched `status` and `errorMessage`. `scheduledFor` stayed at its original value. The next batch picked the same entry again — because `getNextPendingEmails` filters by `scheduledFor <= now` — wasted a slot re-running the unsubscribe + quota checks, and skipped it again. Under sustained load with one over-quota account, the processor would fill its entire batch every iteration with the same N entries and never make forward progress on other accounts' work. This change: - libs/db/src/queries/emailQueue.ts: new `rescheduleQueueEntry(id, newScheduledFor, errorMessage?)` helper. Keeps status pending, bumps `scheduledFor`, sets `updatedAt`. Does NOT increment `attemptCount` because no send was attempted. - src/lib/queueScheduling.ts (new): pure `computeQuotaRescheduleAt( quotaResetAt, now?, minDelayMs?)` — picks the future moment to retry. Honors the account's stored quota reset when it's far enough out; floors at `now + minDelay` (default 1 minute) when the reset is null, in the past, equal to now, or sooner than the floor. Always returns a Date >= now so we never schedule into the past. - src/lib/emailQueueProcessor.ts: quota-exceeded branch now calls `rescheduleQueueEntry` with the computed Date instead of `updateQueueStatus`. Also drops the previous "reset is in the past -> fall through and try to send" path: a fresh quota window resetting is the email-account row's responsibility, not the processor's, and the previous fall-through would attempt a send against an account whose `quotaUsedToday` was still at the limit. Safer to wait for the reset to be applied. Tests: 12 new vitest specs covering future-reset honored, null/undefined fallback, past-reset floor, equal-to-now floor, sub-floor reset clamped, just-past-floor reset honored, custom minDelay, negative minDelay clamped to zero, fresh-Date guarantee, default-now omission, "never earlier than now" property across a parametric set of inputs. 107/108 tests pass in the full int suite (the 1 failure is the pre-existing Payload-secret config issue on `main`, unrelated). Lint clean for changed files. Co-Authored-By: Paperclip --- libs/db/src/queries/emailQueue.ts | 34 ++++++++++ src/lib/emailQueueProcessor.ts | 34 +++++----- src/lib/queueScheduling.ts | 45 +++++++++++++ tests/int/queueScheduling.int.spec.ts | 91 +++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 14 deletions(-) create mode 100644 src/lib/queueScheduling.ts create mode 100644 tests/int/queueScheduling.int.spec.ts diff --git a/libs/db/src/queries/emailQueue.ts b/libs/db/src/queries/emailQueue.ts index b7386bb..1b96a28 100644 --- a/libs/db/src/queries/emailQueue.ts +++ b/libs/db/src/queries/emailQueue.ts @@ -83,6 +83,40 @@ export const updateQueueStatus = async ( return results[0] || null; }; +/** + * Reschedule a pending queue entry for a future send time. Used by the queue + * processor when an account is over its daily quota — without bumping + * `scheduledFor`, the entry would be picked again on the next batch + * iteration and immediately re-skipped. + * + * Always keeps the entry in `pending` status (only the time and optional + * error message change). `attemptCount` is intentionally NOT incremented + * because no send was actually attempted. + */ +export const rescheduleQueueEntry = async ( + id: string, + newScheduledFor: Date, + errorMessage?: string | null +): Promise => { + const updateData: Record = { + status: 'pending' as const, + scheduledFor: newScheduledFor, + updatedAt: new Date(), + }; + + if (errorMessage !== undefined) { + updateData.errorMessage = errorMessage; + } + + const results = await db + .update(emailQueue) + .set(updateData) + .where(eq(emailQueue.id, id)) + .returning(); + + return results[0] || null; +}; + /** * Increment attempt count for a queue entry */ diff --git a/src/lib/emailQueueProcessor.ts b/src/lib/emailQueueProcessor.ts index 7a8bcbe..87e9285 100644 --- a/src/lib/emailQueueProcessor.ts +++ b/src/lib/emailQueueProcessor.ts @@ -1,6 +1,7 @@ import { getNextPendingEmails, updateQueueStatus, + rescheduleQueueEntry, incrementAttemptCount, createEmailEvent, incrementCampaignStat, @@ -8,6 +9,7 @@ import { isEmailUnsubscribed, } from '@coldflow/db'; import { sendEmail, hasAvailableQuota } from './gmailService'; +import { computeQuotaRescheduleAt } from './queueScheduling'; import { nanoid } from 'nanoid'; /** @@ -84,20 +86,24 @@ export async function processEmailQueue(batchSize: number = 10): Promise now) { - // Reschedule for quota reset time - await updateQueueStatus( - queueEntry.id, - 'pending', // Keep as pending - null, - 'Quota exceeded - rescheduled' - ); - result.skipped++; - console.log(`Email ${queueEntry.id} skipped - quota exceeded, rescheduled for ${quotaResetAt}`); - continue; - } + const quotaResetAt = account?.quotaResetAt + ? new Date(account.quotaResetAt) + : null; + + // Always push the entry's `scheduledFor` forward — without that, + // `getNextPendingEmails` (which filters `scheduledFor <= now`) + // would re-pick this entry on every batch and we'd spin. + const nextAttempt = computeQuotaRescheduleAt(quotaResetAt, now); + await rescheduleQueueEntry( + queueEntry.id, + nextAttempt, + 'Quota exceeded - rescheduled' + ); + result.skipped++; + console.log( + `Email ${queueEntry.id} skipped - quota exceeded, rescheduled for ${nextAttempt.toISOString()}` + ); + continue; } // Update status to processing diff --git a/src/lib/queueScheduling.ts b/src/lib/queueScheduling.ts new file mode 100644 index 0000000..ee9dde6 --- /dev/null +++ b/src/lib/queueScheduling.ts @@ -0,0 +1,45 @@ +/** + * Pure scheduling helpers for the email queue processor. + * + * Kept here (and not in `emailQueueProcessor.ts`) so the decision logic can + * be unit-tested without the DB / Gmail dependency graph. The processor + * imports from this module instead of inlining the math. + */ + +const ONE_MINUTE_MS = 60 * 1000; + +/** + * Decide when to next try sending a queue entry whose account is over its + * daily quota. + * + * Inputs: + * - `quotaResetAt` — the account's stored quota reset timestamp (may be + * null if the account record has never been initialized, may be in + * the past if a reset hasn't been written back yet). + * - `now` — wall-clock time of the current processing pass. + * - `minDelayMs` (optional, defaults to 1 minute) — floor on how soon we + * will retry. Prevents a tight loop when `quotaResetAt` is the same + * tick as `now` or already in the past. + * + * Returns a Date strictly in the future relative to `now`. Callers should + * write that Date back to the queue entry's `scheduledFor` so + * `getNextPendingEmails` (which filters by `scheduledFor <= now`) skips it + * until the reset time arrives. + */ +export function computeQuotaRescheduleAt( + quotaResetAt: Date | null | undefined, + now: Date = new Date(), + minDelayMs: number = ONE_MINUTE_MS +): Date { + const floor = new Date(now.getTime() + Math.max(minDelayMs, 0)); + + if (!quotaResetAt) { + return floor; + } + + if (quotaResetAt.getTime() > floor.getTime()) { + return new Date(quotaResetAt.getTime()); + } + + return floor; +} diff --git a/tests/int/queueScheduling.int.spec.ts b/tests/int/queueScheduling.int.spec.ts new file mode 100644 index 0000000..6838751 --- /dev/null +++ b/tests/int/queueScheduling.int.spec.ts @@ -0,0 +1,91 @@ +import { describe, expect, it } from 'vitest'; +import { computeQuotaRescheduleAt } from '@/lib/queueScheduling'; + +describe('computeQuotaRescheduleAt', () => { + const now = new Date('2026-05-03T12:00:00Z'); + + it('uses quotaResetAt when it is far in the future', () => { + const reset = new Date('2026-05-04T00:00:00Z'); + expect(computeQuotaRescheduleAt(reset, now).toISOString()).toBe( + reset.toISOString() + ); + }); + + it('falls back to now+minDelay when quotaResetAt is null', () => { + const out = computeQuotaRescheduleAt(null, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('falls back to now+minDelay when quotaResetAt is undefined', () => { + const out = computeQuotaRescheduleAt(undefined, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('floors at now+minDelay when quotaResetAt is already in the past', () => { + const past = new Date('2026-05-03T11:00:00Z'); + const out = computeQuotaRescheduleAt(past, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('floors at now+minDelay when quotaResetAt equals now', () => { + const out = computeQuotaRescheduleAt(now, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('floors at now+minDelay when quotaResetAt is sooner than the floor', () => { + const reset = new Date(now.getTime() + 30_000); // 30s ahead of now + const out = computeQuotaRescheduleAt(reset, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('uses quotaResetAt when it is just past the floor', () => { + const reset = new Date(now.getTime() + 90_000); // 90s ahead + const out = computeQuotaRescheduleAt(reset, now); + expect(out.toISOString()).toBe(reset.toISOString()); + }); + + it('honors a custom minDelayMs', () => { + const out = computeQuotaRescheduleAt(null, now, 5 * 60_000); + expect(out.getTime()).toBe(now.getTime() + 5 * 60_000); + }); + + it('treats negative minDelayMs as zero (never schedules earlier than now)', () => { + const out = computeQuotaRescheduleAt(null, now, -1000); + expect(out.getTime()).toBe(now.getTime()); + // Strictly: >= now. The processor still won't pick it again until the + // next pass, so this is intentional rather than buggy. + expect(out.getTime()).toBeGreaterThanOrEqual(now.getTime()); + }); + + it('returns a fresh Date instance (not the input)', () => { + const reset = new Date('2026-05-04T00:00:00Z'); + const out = computeQuotaRescheduleAt(reset, now); + expect(out).not.toBe(reset); + expect(out.getTime()).toBe(reset.getTime()); + }); + + it('uses the system clock when `now` is omitted', () => { + const before = Date.now(); + const out = computeQuotaRescheduleAt(null); + const after = Date.now(); + // Output should be in [before+60_000, after+60_000]. + expect(out.getTime()).toBeGreaterThanOrEqual(before + 60_000); + expect(out.getTime()).toBeLessThanOrEqual(after + 60_000); + }); + + it('never returns a Date earlier than `now` for any reset value', () => { + const cases: Array = [ + null, + undefined, + new Date('1970-01-01T00:00:00Z'), + new Date('2026-05-03T11:59:59Z'), + new Date('2026-05-03T12:00:00Z'), + new Date('2026-05-03T12:00:01Z'), + new Date('2099-01-01T00:00:00Z'), + ]; + for (const reset of cases) { + const out = computeQuotaRescheduleAt(reset, now); + expect(out.getTime()).toBeGreaterThanOrEqual(now.getTime()); + } + }); +});