Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .claude/rules/push-pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ The push pipeline lives in `packages/shared/src/push/`. It is invoked by `apps/w

Push pipeline (`packages/shared/src/push/`):
- `push.logic.ts` — `buildPushJobs()` (pure, paginated), `dispatchJob()` (circuit-breaker)
- `push.repository.ts` — `getAllCandidates`, `getVerifiedChannelsBulk`, `upsertHistoryBatch`, `stampLastPushDate`, `incrementChannelFailures`, `resetChannelFailuresForUsers`, `recordPushRun`
- `channels/` — `interface.ts`, `telegram.ts`, `line.ts`, `email.ts`, `email-template.tsx`, `registry.ts` (`createChannelRegistry` factory)
- `push.repository.ts` — `getAllCandidates`, `getVerifiedChannelsBulk`, `upsertHistoryBatch`, `stampLastPushDate`, `incrementChannelFailures`, `resetChannelFailures`, `recordPushRun`
- `channels/` — `email-template.tsx`, `registry.ts` (defines `NotificationChannel` as a function type and `createChannelRegistry` that builds closures over `sendTelegramMessage` / `sendLineMessage` / `sendEmailMessage`)

Web cron endpoint (`apps/web/`):
- `app/api/cron/push/route.ts` — POST handler; auth; overlap guard; invokes shared pipeline
Expand Down
4 changes: 2 additions & 2 deletions .claude/rules/shared-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ paths:

## Structure

- `src/channels/` — `sendTelegramMessage`, `sendLineMessage`, `sendEmailMessage` return `SendResult` with `shouldRetry`. Push channel classes delegate here; admin `forceNotifyAll` calls directly.
- `src/push/` — Full push pipeline: `buildPushJobs()`, `dispatchJob()`, `recordPushRun()`, repository functions, channel classes (`TelegramChannel`, `LineChannel`, `EmailChannel`), `createChannelRegistry()` factory. Invoked from `apps/web/app/api/cron/push/route.ts` (the hourly cron target).
- `src/channels/` — `sendTelegramMessage`, `sendLineMessage`, `sendEmailMessage` return `SendResult` with `shouldRetry`. Shared by the push pipeline (via `createChannelRegistry`) and admin `forceNotifyAll`.
- `src/push/` — Full push pipeline: `buildPushJobs()`, `dispatchJob()`, `recordPushRun()`, repository functions. `createChannelRegistry()` returns a `Record<string, NotificationChannel>` where each `NotificationChannel` is a function `(identifier, msg) => Promise<SendResult>` that wraps one of the raw channel senders. Invoked from `apps/web/app/api/cron/push/route.ts` (the hourly cron target).
- `src/services/problem-selector.ts` — `selectProblemForUser()` single source of truth for cron push and admin force-notify.
- `src/services/badge-checker.ts` — `evaluateBadgeCondition()` evaluates badge requirement JSONB against user context.
- `src/utils/notification-formatters.ts` — `formatTelegramMessage`, `buildFlexBubble`, `formatEmailSubject`, `buildTelegramReplyMarkup`.
Expand Down
7 changes: 4 additions & 3 deletions apps/web/__tests__/api/cron/push/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ vi.mock('@caffecode/shared', async (importOriginal) => {
...actual,
buildPushJobs: (...args: unknown[]) => mockBuildPushJobs(...args),
recordPushRun: (...args: unknown[]) => mockRecordPushRun(...args),
TelegramChannel: vi.fn(),
LineChannel: vi.fn(),
EmailChannel: vi.fn(),
createChannelRegistry: vi.fn().mockReturnValue({
telegram: vi.fn(),
line: vi.fn(),
}),
}
})

Expand Down
72 changes: 0 additions & 72 deletions packages/shared/src/push/__tests__/email.test.ts

This file was deleted.

60 changes: 0 additions & 60 deletions packages/shared/src/push/__tests__/line-channel.test.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/shared/src/push/__tests__/line.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { describe, it, expect } from 'vitest'
import { buildFlexBubble } from '../../utils/notification-formatters.js'
import { LineChannel } from '../channels/line.js'
import type { PushMessage } from '../../types/push.js'

type FlexBubble = {
Expand Down
60 changes: 26 additions & 34 deletions packages/shared/src/push/__tests__/push.pipeline.paused.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import type { SupabaseClient } from '@supabase/supabase-js'
import type { LimitFunction } from 'p-limit'
import type { NotificationChannel } from '../channels/interface.js'
import type { NotificationChannel } from '../channels/registry.js'
import type { SendResult } from '../../types/push.js'

// Mock selectProblemForUser to return a problem for each candidate
Expand Down Expand Up @@ -99,16 +99,14 @@ describe('buildPushJobs — channel pausing', () => {

// First call: permanent failure. Second call: should be skipped (paused)
let callCount = 0
const channel: NotificationChannel = {
send: vi.fn().mockImplementation(async () => {
callCount++
if (callCount === 1) {
return { success: false, shouldRetry: false, error: '403 Forbidden' } as SendResult
}
// This should not be reached if pausing works
return { success: true } as SendResult
}),
}
const channel: NotificationChannel = vi.fn().mockImplementation(async () => {
callCount++
if (callCount === 1) {
return { success: false, shouldRetry: false, error: '403 Forbidden' } as SendResult
}
// This should not be reached if pausing works
return { success: true } as SendResult
})

// Both users share the same channel ID by making the mock return same channel_id
const fromMock = vi.fn().mockImplementation((table: string) => {
Expand Down Expand Up @@ -161,20 +159,18 @@ describe('buildPushJobs — channel pausing', () => {
const users = [makeCandidate('user-1')]
const db = makeSupabaseMock(users)

const channel: NotificationChannel = {
send: vi.fn().mockResolvedValue({
success: false,
shouldRetry: true,
error: '500 Server Error',
} as SendResult),
}
const channel: NotificationChannel = vi.fn().mockResolvedValue({
success: false,
shouldRetry: true,
error: '500 Server Error',
} as SendResult)

const stats = await buildPushJobs(db, { telegram: channel }, noopLimit)

// Job fails but channel should NOT be paused (shouldRetry=true)
expect(stats.failed).toBe(1)
// channel.send should have been called (not skipped)
expect(channel.send).toHaveBeenCalled()
// channel should have been called (not skipped)
expect(channel).toHaveBeenCalled()
})

it('pausing one channel does not affect other channels for the same user', async () => {
Expand Down Expand Up @@ -214,19 +210,15 @@ describe('buildPushJobs — channel pausing', () => {

const db = { from: fromMock, rpc: rpcMock } as unknown as SupabaseClient

const telegramChannel: NotificationChannel = {
send: vi.fn().mockResolvedValue({
success: false,
shouldRetry: false,
error: '403 Forbidden',
} as SendResult),
}
const lineChannel: NotificationChannel = {
send: vi.fn().mockResolvedValue({
success: true,
shouldRetry: false,
} as SendResult),
}
const telegramChannel: NotificationChannel = vi.fn().mockResolvedValue({
success: false,
shouldRetry: false,
error: '403 Forbidden',
} as SendResult)
const lineChannel: NotificationChannel = vi.fn().mockResolvedValue({
success: true,
shouldRetry: false,
} as SendResult)

const stats = await buildPushJobs(
db,
Expand All @@ -235,7 +227,7 @@ describe('buildPushJobs — channel pausing', () => {
)

// Telegram failed permanently, but LINE should still succeed
expect(lineChannel.send).toHaveBeenCalled()
expect(lineChannel).toHaveBeenCalled()
expect(stats.succeeded).toBeGreaterThanOrEqual(1)
})
})
20 changes: 7 additions & 13 deletions packages/shared/src/push/__tests__/push.pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
import { selectProblemForUser } from '../../services/problem-selector.js'
import type { SupabaseClient } from '@supabase/supabase-js'
import type { SelectedProblem } from '../../types/push.js'
import type { NotificationChannel } from '../channels/interface.js'
import type { NotificationChannel } from '../channels/registry.js'

const mockGetAllCandidates = vi.mocked(getAllCandidates)
const mockGetChannels = vi.mocked(getVerifiedChannelsBulk)
Expand Down Expand Up @@ -72,9 +72,7 @@ function makeChannel(overrides: Partial<VerifiedChannel> = {}): VerifiedChannel

// Channel registry stub: dispatches succeed without real HTTP calls
function makeChannelRegistry(channelTypes = ['telegram', 'email', 'line']): Record<string, NotificationChannel> {
const channel: NotificationChannel = {
send: vi.fn().mockResolvedValue({ success: true }),
}
const channel: NotificationChannel = vi.fn().mockResolvedValue({ success: true })
return Object.fromEntries(channelTypes.map(t => [t, channel]))
}

Expand Down Expand Up @@ -227,12 +225,10 @@ describe('buildPushJobs — pipeline orchestration', () => {
])

const registry: Record<string, NotificationChannel> = {
telegram: {
send: vi.fn().mockImplementation(async (identifier: string) => {
if (identifier === 'tg-u1') return { success: true }
return { success: false, shouldRetry: false, error: 'permanent failure' }
}),
},
telegram: vi.fn().mockImplementation(async (identifier: string) => {
if (identifier === 'tg-u1') return { success: true }
return { success: false, shouldRetry: false, error: 'permanent failure' }
}),
}

const stats = await buildPushJobs(db, registry, noopLimit)
Expand All @@ -251,9 +247,7 @@ describe('buildPushJobs — pipeline orchestration', () => {
mockGetChannels.mockResolvedValueOnce([makeChannel({ id: 'ch-1', user_id: 'u1' })])

const registry: Record<string, NotificationChannel> = {
telegram: {
send: vi.fn().mockResolvedValue({ success: false, shouldRetry: false, error: 'blocked' }),
},
telegram: vi.fn().mockResolvedValue({ success: false, shouldRetry: false, error: 'blocked' }),
}

const stats = await buildPushJobs(db, registry, noopLimit)
Expand Down
Loading
Loading