diff --git a/libs/db/src/queries/emailQueue.ts b/libs/db/src/queries/emailQueue.ts index b7386bb..1b96a28 100644 --- a/libs/db/src/queries/emailQueue.ts +++ b/libs/db/src/queries/emailQueue.ts @@ -83,6 +83,40 @@ export const updateQueueStatus = async ( return results[0] || null; }; +/** + * Reschedule a pending queue entry for a future send time. Used by the queue + * processor when an account is over its daily quota — without bumping + * `scheduledFor`, the entry would be picked again on the next batch + * iteration and immediately re-skipped. + * + * Always keeps the entry in `pending` status (only the time and optional + * error message change). `attemptCount` is intentionally NOT incremented + * because no send was actually attempted. + */ +export const rescheduleQueueEntry = async ( + id: string, + newScheduledFor: Date, + errorMessage?: string | null +): Promise => { + const updateData: Record = { + status: 'pending' as const, + scheduledFor: newScheduledFor, + updatedAt: new Date(), + }; + + if (errorMessage !== undefined) { + updateData.errorMessage = errorMessage; + } + + const results = await db + .update(emailQueue) + .set(updateData) + .where(eq(emailQueue.id, id)) + .returning(); + + return results[0] || null; +}; + /** * Increment attempt count for a queue entry */ diff --git a/src/lib/emailQueueProcessor.ts b/src/lib/emailQueueProcessor.ts index 7a8bcbe..87e9285 100644 --- a/src/lib/emailQueueProcessor.ts +++ b/src/lib/emailQueueProcessor.ts @@ -1,6 +1,7 @@ import { getNextPendingEmails, updateQueueStatus, + rescheduleQueueEntry, incrementAttemptCount, createEmailEvent, incrementCampaignStat, @@ -8,6 +9,7 @@ import { isEmailUnsubscribed, } from '@coldflow/db'; import { sendEmail, hasAvailableQuota } from './gmailService'; +import { computeQuotaRescheduleAt } from './queueScheduling'; import { nanoid } from 'nanoid'; /** @@ -84,20 +86,24 @@ export async function processEmailQueue(batchSize: number = 10): Promise now) { - // Reschedule for quota reset time - await updateQueueStatus( - queueEntry.id, - 'pending', // Keep as pending - null, - 'Quota exceeded - rescheduled' - ); - result.skipped++; - console.log(`Email ${queueEntry.id} skipped - quota exceeded, rescheduled for ${quotaResetAt}`); - continue; - } + const quotaResetAt = account?.quotaResetAt + ? new Date(account.quotaResetAt) + : null; + + // Always push the entry's `scheduledFor` forward — without that, + // `getNextPendingEmails` (which filters `scheduledFor <= now`) + // would re-pick this entry on every batch and we'd spin. + const nextAttempt = computeQuotaRescheduleAt(quotaResetAt, now); + await rescheduleQueueEntry( + queueEntry.id, + nextAttempt, + 'Quota exceeded - rescheduled' + ); + result.skipped++; + console.log( + `Email ${queueEntry.id} skipped - quota exceeded, rescheduled for ${nextAttempt.toISOString()}` + ); + continue; } // Update status to processing diff --git a/src/lib/queueScheduling.ts b/src/lib/queueScheduling.ts new file mode 100644 index 0000000..ee9dde6 --- /dev/null +++ b/src/lib/queueScheduling.ts @@ -0,0 +1,45 @@ +/** + * Pure scheduling helpers for the email queue processor. + * + * Kept here (and not in `emailQueueProcessor.ts`) so the decision logic can + * be unit-tested without the DB / Gmail dependency graph. The processor + * imports from this module instead of inlining the math. + */ + +const ONE_MINUTE_MS = 60 * 1000; + +/** + * Decide when to next try sending a queue entry whose account is over its + * daily quota. + * + * Inputs: + * - `quotaResetAt` — the account's stored quota reset timestamp (may be + * null if the account record has never been initialized, may be in + * the past if a reset hasn't been written back yet). + * - `now` — wall-clock time of the current processing pass. + * - `minDelayMs` (optional, defaults to 1 minute) — floor on how soon we + * will retry. Prevents a tight loop when `quotaResetAt` is the same + * tick as `now` or already in the past. + * + * Returns a Date strictly in the future relative to `now`. Callers should + * write that Date back to the queue entry's `scheduledFor` so + * `getNextPendingEmails` (which filters by `scheduledFor <= now`) skips it + * until the reset time arrives. + */ +export function computeQuotaRescheduleAt( + quotaResetAt: Date | null | undefined, + now: Date = new Date(), + minDelayMs: number = ONE_MINUTE_MS +): Date { + const floor = new Date(now.getTime() + Math.max(minDelayMs, 0)); + + if (!quotaResetAt) { + return floor; + } + + if (quotaResetAt.getTime() > floor.getTime()) { + return new Date(quotaResetAt.getTime()); + } + + return floor; +} diff --git a/tests/int/queueScheduling.int.spec.ts b/tests/int/queueScheduling.int.spec.ts new file mode 100644 index 0000000..6838751 --- /dev/null +++ b/tests/int/queueScheduling.int.spec.ts @@ -0,0 +1,91 @@ +import { describe, expect, it } from 'vitest'; +import { computeQuotaRescheduleAt } from '@/lib/queueScheduling'; + +describe('computeQuotaRescheduleAt', () => { + const now = new Date('2026-05-03T12:00:00Z'); + + it('uses quotaResetAt when it is far in the future', () => { + const reset = new Date('2026-05-04T00:00:00Z'); + expect(computeQuotaRescheduleAt(reset, now).toISOString()).toBe( + reset.toISOString() + ); + }); + + it('falls back to now+minDelay when quotaResetAt is null', () => { + const out = computeQuotaRescheduleAt(null, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('falls back to now+minDelay when quotaResetAt is undefined', () => { + const out = computeQuotaRescheduleAt(undefined, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('floors at now+minDelay when quotaResetAt is already in the past', () => { + const past = new Date('2026-05-03T11:00:00Z'); + const out = computeQuotaRescheduleAt(past, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('floors at now+minDelay when quotaResetAt equals now', () => { + const out = computeQuotaRescheduleAt(now, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('floors at now+minDelay when quotaResetAt is sooner than the floor', () => { + const reset = new Date(now.getTime() + 30_000); // 30s ahead of now + const out = computeQuotaRescheduleAt(reset, now); + expect(out.getTime()).toBe(now.getTime() + 60_000); + }); + + it('uses quotaResetAt when it is just past the floor', () => { + const reset = new Date(now.getTime() + 90_000); // 90s ahead + const out = computeQuotaRescheduleAt(reset, now); + expect(out.toISOString()).toBe(reset.toISOString()); + }); + + it('honors a custom minDelayMs', () => { + const out = computeQuotaRescheduleAt(null, now, 5 * 60_000); + expect(out.getTime()).toBe(now.getTime() + 5 * 60_000); + }); + + it('treats negative minDelayMs as zero (never schedules earlier than now)', () => { + const out = computeQuotaRescheduleAt(null, now, -1000); + expect(out.getTime()).toBe(now.getTime()); + // Strictly: >= now. The processor still won't pick it again until the + // next pass, so this is intentional rather than buggy. + expect(out.getTime()).toBeGreaterThanOrEqual(now.getTime()); + }); + + it('returns a fresh Date instance (not the input)', () => { + const reset = new Date('2026-05-04T00:00:00Z'); + const out = computeQuotaRescheduleAt(reset, now); + expect(out).not.toBe(reset); + expect(out.getTime()).toBe(reset.getTime()); + }); + + it('uses the system clock when `now` is omitted', () => { + const before = Date.now(); + const out = computeQuotaRescheduleAt(null); + const after = Date.now(); + // Output should be in [before+60_000, after+60_000]. + expect(out.getTime()).toBeGreaterThanOrEqual(before + 60_000); + expect(out.getTime()).toBeLessThanOrEqual(after + 60_000); + }); + + it('never returns a Date earlier than `now` for any reset value', () => { + const cases: Array = [ + null, + undefined, + new Date('1970-01-01T00:00:00Z'), + new Date('2026-05-03T11:59:59Z'), + new Date('2026-05-03T12:00:00Z'), + new Date('2026-05-03T12:00:01Z'), + new Date('2099-01-01T00:00:00Z'), + ]; + for (const reset of cases) { + const out = computeQuotaRescheduleAt(reset, now); + expect(out.getTime()).toBeGreaterThanOrEqual(now.getTime()); + } + }); +});