Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ npm workspaces:
| `@wxyc/flowsheet-etl` | `jobs/flowsheet-etl/` | Flowsheet ETL: sync from tubafrenzy |
| `@wxyc/rotation-etl` | `jobs/rotation-etl/` | Rotation ETL: sync from tubafrenzy |
| `@wxyc/artist-identity-etl` | `jobs/artist-identity-etl/` | Artist identity ETL: sync from LML's `entity.identity` |
| `@wxyc/flowsheet-dj-name-backfill` | `jobs/flowsheet-dj-name-backfill/` | One-shot backfill: populate `flowsheet.dj_name` on legacy track rows after migration 0053 |
| `@wxyc/flowsheet-dj-name-backfill` | `jobs/flowsheet-dj-name-backfill/` | One-shot backfill: populate `flowsheet.dj_name` on legacy track + marker rows (show_start, show_end, dj_join, dj_leave) after migration 0053 / #952 |
| `@wxyc/library-artist-name-backfill` | `jobs/library-artist-name-backfill/` | One-shot backfill: populate `library.artist_name` from the `artists` join after migration 0058 (Epic A.2) |
| `@wxyc/flowsheet-metadata-backfill` | `jobs/flowsheet-metadata-backfill/` | Recurring metadata drift-repair: enrich `flowsheet` track rows where LML metadata enrichment never ran (#631 / #638 / #641). Cron-registered via deploy-base; default schedule `0 6 * * *` UTC (02:00 ET) from `package.json` `cron-schedule`, overridable per-deploy via the `BACKFILL_CRON_SCHEDULE` GHA repository variable (BS#914). Orchestrator's cooperative pause (#735) defers when DJs are active. |
| `@wxyc/library-artwork-url-backfill` | `jobs/library-artwork-url-backfill/` | One-shot warm: populate `library.artwork_url` for Discogs-resolvable rows (joined to `artists.discogs_artist_id`) so search-time `enrichWithArtwork` short-circuits (#637). |
Expand Down
78 changes: 23 additions & 55 deletions apps/backend/services/flowsheet.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ export const startShow = async (dj_id: string, show_name?: string, specialty_id?
await db.insert(flowsheet).values({
show_id: new_show[0].id,
entry_type: 'show_start',
dj_name: dj_info.djName || dj_info.name || null,
play_order: await nextPlayOrder(new_show[0].id),
message: `Start of Show: DJ ${dj_info.djName || dj_info.name} joined the set at ${new Date().toLocaleString(
'en-US',
Expand Down Expand Up @@ -469,18 +470,19 @@ export const addDJToShow = async (dj_id: string, current_show: Show): Promise<Sh
};

const createJoinNotification = async (id: string, show_id: number): Promise<FSEntry> => {
let dj_name = 'A DJ';
const dj = (await db.select().from(user).where(eq(user.id, id)).limit(1))[0];

dj_name = dj?.djName || dj?.name || dj_name;
const persisted_dj_name = dj?.djName || dj?.name || null;
const display_dj_name = persisted_dj_name ?? 'A DJ';

const message = `${dj_name} joined the set!`;
const message = `${display_dj_name} joined the set!`;

const notification = await db
.insert(flowsheet)
.values({
show_id: show_id,
entry_type: 'dj_join',
dj_name: persisted_dj_name,
play_order: await nextPlayOrder(show_id),
message: message,
})
Expand Down Expand Up @@ -511,13 +513,15 @@ export const endShow = async (currentShow: Show): Promise<Show> => {
);

const dj_information = (await db.select().from(user).where(eq(user.id, primary_dj_id)).limit(1))[0];
const dj_name = dj_information?.djName || dj_information?.name || 'A DJ';
const persisted_dj_name = dj_information?.djName || dj_information?.name || null;
const display_dj_name = persisted_dj_name ?? 'A DJ';

await db.insert(flowsheet).values({
show_id: currentShow.id,
entry_type: 'show_end',
dj_name: persisted_dj_name,
play_order: await nextPlayOrder(currentShow.id),
message: `End of Show: ${dj_name} left the set at ${new Date().toLocaleString('en-US', {
message: `End of Show: ${display_dj_name} left the set at ${new Date().toLocaleString('en-US', {
timeZone: 'America/New_York',
})}`,
});
Expand Down Expand Up @@ -551,18 +555,19 @@ export const leaveShow = async (dj_id: string, currentShow: Show): Promise<ShowD
};

const createLeaveNotification = async (dj_id: string, show_id: number): Promise<FSEntry> => {
let dj_name = 'A DJ';
const dj = (await db.select().from(user).where(eq(user.id, dj_id)).limit(1))[0];

dj_name = dj?.djName || dj?.name || dj_name;
const persisted_dj_name = dj?.djName || dj?.name || null;
const display_dj_name = persisted_dj_name ?? 'A DJ';

const message = `${dj_name} left the set!`;
const message = `${display_dj_name} left the set!`;

const notification = await db
.insert(flowsheet)
.values({
show_id: show_id,
entry_type: 'dj_leave',
dj_name: persisted_dj_name,
play_order: await nextPlayOrder(show_id),
message: message,
})
Expand Down Expand Up @@ -754,11 +759,12 @@ export const transformToV2 = (entry: IFSEntry): Record<string, unknown> => {
entry_type: entry.entry_type,
};

// dj_name is intentionally not propagated here. It is denormalized onto the
// flowsheet row purely to let the search service skip the shows -> auth_user
// join (steps 5b.1-5b.3); V2 API consumers should keep deriving the display
// name from the show metadata so this denormalization stays an internal
// implementation detail of the search path.
// For marker entry types (show_start, show_end, dj_join, dj_leave), dj_name is
// surfaced directly from the flowsheet.dj_name column — see the v2 contract in
// apps/backend/app.yaml. Track entries do not include dj_name in the v2 payload
// (the artist_name / album_title / track_title fields carry the relevant
// attribution); flowsheet.dj_name on track rows exists solely for the search
// service's hot path (search.service.ts, originally steps 5b.1-5b.3).
switch (entry.entry_type) {
case 'track':
return {
Expand Down Expand Up @@ -789,58 +795,20 @@ export const transformToV2 = (entry: IFSEntry): Record<string, unknown> => {

case 'show_start':
case 'show_end': {
// Parse DJ name and timestamp from message
// Format: "Start of Show: DJ {name} joined the set at {timestamp}"
// Format: "End of Show: {name} left the set at {timestamp}"
const message = entry.message || '';
let dj_name = '';
let timestamp = '';

if (entry.entry_type === 'show_start') {
const match = message.match(/^Start of Show: DJ (.+) joined the set at (.+)$/);
if (match) {
dj_name = match[1];
timestamp = match[2];
}
} else {
const match = message.match(/^End of Show: (.+) left the set at (.+)$/);
if (match) {
dj_name = match[1];
timestamp = match[2];
}
}

const timestamp = entry.add_time ? entry.add_time.toLocaleString('en-US', { timeZone: 'America/New_York' }) : '';
return {
...baseFields,
dj_name,
dj_name: entry.dj_name ?? '',
timestamp,
};
}

case 'dj_join':
case 'dj_leave': {
// Parse DJ name from message
// Format: "{name} joined the set!" or "{name} left the set!"
const message = entry.message || '';
let dj_name = '';

if (entry.entry_type === 'dj_join') {
const match = message.match(/^(.+) joined the set!$/);
if (match) {
dj_name = match[1];
}
} else {
const match = message.match(/^(.+) left the set!$/);
if (match) {
dj_name = match[1];
}
}

case 'dj_leave':
return {
...baseFields,
dj_name,
dj_name: entry.dj_name ?? '',
};
}

case 'talkset':
case 'message':
Expand Down
26 changes: 16 additions & 10 deletions jobs/flowsheet-dj-name-backfill/job.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* One-shot backfill: populate flowsheet.dj_name on legacy track rows.
* One-shot backfill: populate flowsheet.dj_name on legacy track + marker rows.
*
* Splits the large UPDATE that was originally embedded in migration 0053 into
* batched, individually-committed UPDATEs so a long run cannot hold an
Expand All @@ -12,9 +12,15 @@
* insert path, so every row carries the same resolved name regardless of
* which subsystem populated it.
*
* Track-only filter: matches the precondition guard in migration 0054, which
* only requires dj_name on track rows. Non-track entries (talkset, breakpoint,
* show_start, etc.) keep dj_name NULL — search never reads them.
* Covered entry types (#952): track (search hot path) plus the four marker
* types — show_start, show_end, dj_join, dj_leave — which surface dj_name in
* the v2 /flowsheet API contract.
*
* Known limitation: dj_join / dj_leave rows for guest DJs join through
* shows.primary_dj_id, so the backfill writes the primary DJ's name, not the
* joining guest's. New rows from the live insert path are correct (they look
* up the joining DJ directly). Talkset / breakpoint / message rows still keep
* dj_name NULL — those aren't attributed to a specific DJ.
*
* Run procedure: see Backend-Service/CLAUDE.md and issue #511. Build via
* `Manual Build & Deploy` with `target=flowsheet-dj-name-backfill`, then SSH
Expand Down Expand Up @@ -61,11 +67,11 @@ const applyBatch = async (batchSize: number): Promise<number> => {
FROM "wxyc_schema"."shows" AS s
LEFT JOIN "auth_user" AS u ON u."id" = s."primary_dj_id"
WHERE f."show_id" = s."id"
AND f."entry_type" = 'track'
AND f."entry_type" IN ('track', 'show_start', 'show_end', 'dj_join', 'dj_leave')
AND f."dj_name" IS NULL
AND f."id" IN (
SELECT "id" FROM "wxyc_schema"."flowsheet"
WHERE "entry_type" = 'track'
WHERE "entry_type" IN ('track', 'show_start', 'show_end', 'dj_join', 'dj_leave')
AND "dj_name" IS NULL
ORDER BY "id"
LIMIT ${batchSize}
Expand All @@ -85,7 +91,7 @@ const formatDuration = (ms: number): string => {
};

const runBackfill = async () => {
console.log(`[${JOB_NAME}] Starting backfill of flowsheet.dj_name (track rows).`);
console.log(`[${JOB_NAME}] Starting backfill of flowsheet.dj_name (track + marker rows).`);
console.log(`[${JOB_NAME}] Batch size: ${BATCH_SIZE}.`);

const startedAt = Date.now();
Expand Down Expand Up @@ -134,16 +140,16 @@ const runBackfill = async () => {
*/
const verifyComplete = async () => {
const result = await db.execute(
sql`SELECT count(*)::int AS missing FROM "wxyc_schema"."flowsheet" WHERE entry_type = 'track' AND dj_name IS NULL`
sql`SELECT count(*)::int AS missing FROM "wxyc_schema"."flowsheet" WHERE entry_type IN ('track', 'show_start', 'show_end', 'dj_join', 'dj_leave') AND dj_name IS NULL`
);
const missing = Number((result as unknown as Array<{ missing: number }>)[0]?.missing ?? 0);
if (missing > 0) {
throw new Error(
`[${JOB_NAME}] Verification failed: ${missing} track row(s) still have dj_name IS NULL. ` +
`[${JOB_NAME}] Verification failed: ${missing} row(s) still have dj_name IS NULL. ` +
`Re-run the backfill — it is idempotent and will pick up the remaining rows.`
);
}
console.log(`[${JOB_NAME}] Verification passed: 0 track rows with dj_name IS NULL.`);
console.log(`[${JOB_NAME}] Verification passed: 0 track + marker rows with dj_name IS NULL.`);
};

const main = async () => {
Expand Down
31 changes: 25 additions & 6 deletions tests/unit/jobs/flowsheet-dj-name-backfill/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ describe('flowsheet-dj-name-backfill: applyBatch', () => {
expect(sqlText).toMatch(/COALESCE\(\s*u\."dj_name",\s*s\."legacy_dj_name",\s*u\."name"\s*\)/);
});

it('restricts the UPDATE to track rows with NULL dj_name within a bounded batch', async () => {
it('restricts the UPDATE to track + marker rows with NULL dj_name within a bounded batch', async () => {
// Three regression guards in one test:
// - entry_type = 'track' filter (matches 0054's precondition guard)
// - entry_type IN (...) covers track + four marker types (#952)
// - dj_name IS NULL (idempotency: re-run skips already-backfilled rows)
// - LIMIT in an inner SELECT (bounded batch — never an unbounded UPDATE)
(db.execute as jest.Mock).mockResolvedValueOnce({ count: 5000 });
Expand All @@ -70,7 +70,12 @@ describe('flowsheet-dj-name-backfill: applyBatch', () => {

const call = findExecuteCallMatching(/UPDATE[\s\S]*flowsheet[\s\S]*dj_name/i);
const sqlText = renderSql(call?.[0]);
expect(sqlText).toMatch(/entry_type"?\s*=\s*'track'/i);
expect(sqlText).toMatch(/entry_type"?\s+IN\s*\(/i);
expect(sqlText).toMatch(/'track'/);
expect(sqlText).toMatch(/'show_start'/);
expect(sqlText).toMatch(/'show_end'/);
expect(sqlText).toMatch(/'dj_join'/);
expect(sqlText).toMatch(/'dj_leave'/);
expect(sqlText).toMatch(/dj_name"?\s+IS\s+NULL/i);
expect(sqlText).toMatch(/LIMIT/i);
});
Expand Down Expand Up @@ -189,18 +194,32 @@ describe('flowsheet-dj-name-backfill: verifyComplete', () => {
jest.restoreAllMocks();
});

it('passes when zero track rows have NULL dj_name', async () => {
it('passes when zero track + marker rows have NULL dj_name', async () => {
(db.execute as jest.Mock).mockResolvedValueOnce([{ missing: 0 }]);

await expect(verifyComplete()).resolves.toBeUndefined();
});

it('throws with a remediation hint when track rows still have NULL dj_name', async () => {
it('checks all four marker types plus track in the verification query', async () => {
(db.execute as jest.Mock).mockResolvedValueOnce([{ missing: 0 }]);

await verifyComplete();

const sqlText = renderSql((db.execute as jest.Mock).mock.calls[0]?.[0]);
expect(sqlText).toMatch(/entry_type"?\s+IN\s*\(/i);
expect(sqlText).toMatch(/'track'/);
expect(sqlText).toMatch(/'show_start'/);
expect(sqlText).toMatch(/'show_end'/);
expect(sqlText).toMatch(/'dj_join'/);
expect(sqlText).toMatch(/'dj_leave'/);
});

it('throws with a remediation hint when rows still have NULL dj_name', async () => {
// mockResolvedValue (not Once) so both rejection assertions get the
// same response — the second await re-invokes verifyComplete.
(db.execute as jest.Mock).mockResolvedValue([{ missing: 137 }]);

await expect(verifyComplete()).rejects.toThrow(/137 track row\(s\)/);
await expect(verifyComplete()).rejects.toThrow(/137 row\(s\)/);
await expect(verifyComplete()).rejects.toThrow(/idempotent/i);
});
});
Expand Down
76 changes: 76 additions & 0 deletions tests/unit/services/flowsheet.endShow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { db, createMockQueryChain } from '../../mocks/database.mock';
import { endShow } from '../../../apps/backend/services/flowsheet.service';

/**
 * Builds a mock query chain that can itself be awaited.
 *
 * A `then` method is attached to the chain so that `await chain` settles with
 * `[{ max }]`; the tests below hand this chain to the select that feeds
 * nextPlayOrder.
 *
 * @param max - value exposed as the `max` field of the single resolved row
 * @returns the mock chain, now thenable
 */
const makeAwaitablePlayOrderChain = (max: number) => {
  type ThenableChain = { then: (resolve: (v: unknown) => void) => void };

  const awaitableChain = createMockQueryChain();
  const settleWithMax: ThenableChain['then'] = (resolve) => resolve([{ max }]);

  // The mock chain's declared type has no `then`, hence the cast before assigning.
  (awaitableChain as unknown as ThenableChain).then = settleWithMax;

  return awaitableChain;
};

/** Row shape the mocked primary-DJ user lookup resolves to. */
type MockEndShowUserRow = { djName: string | null; name: string | null };

/**
 * Queues the db mocks for one endShow call and returns the flowsheet insert
 * chain so the caller can inspect the values passed to it.
 *
 * Mocks are queued in this order: remaining-DJs select, primary-DJ user
 * lookup, nextPlayOrder select, flowsheet insert, shows update, and the
 * getLatestShow select.
 *
 * @param userRows - rows the primary-DJ lookup resolves to (empty = no user found)
 * @param maxPlayOrder - `max` value reported by the nextPlayOrder select
 */
const queueEndShowMocks = (userRows: MockEndShowUserRow[], maxPlayOrder: number) => {
  // remaining_djs select — no guests so the loop is a no-op
  const remainingDjsSelect = createMockQueryChain();
  remainingDjsSelect.where.mockResolvedValue([]);
  db.select.mockReturnValueOnce(remainingDjsSelect);

  // primary DJ user lookup
  const userSelect = createMockQueryChain();
  userSelect.limit.mockResolvedValue(userRows);
  db.select.mockReturnValueOnce(userSelect);

  // nextPlayOrder select for the flowsheet insert
  db.select.mockReturnValueOnce(makeAwaitablePlayOrderChain(maxPlayOrder));

  // flowsheet insert — inspection target
  const flowsheetInsert = createMockQueryChain([{ id: 999 }]);
  db.insert.mockReturnValueOnce(flowsheetInsert);

  // shows update (end_time)
  db.update.mockReturnValueOnce(createMockQueryChain([{}]));

  // getLatestShow() select chain — returns the show we just ended
  const latestShowSelect = createMockQueryChain();
  latestShowSelect.limit.mockResolvedValue([{ id: 42, end_time: new Date() }]);
  db.select.mockReturnValueOnce(latestShowSelect);

  return flowsheetInsert;
};

/**
 * Runs endShow against freshly queued mocks and returns the values object
 * handed to the flowsheet insert.
 */
const endShowAndCaptureInsert = async (
  userRows: MockEndShowUserRow[],
  maxPlayOrder: number
): Promise<Record<string, unknown>> => {
  const flowsheetInsert = queueEndShowMocks(userRows, maxPlayOrder);

  await endShow({ id: 42, primary_dj_id: 'user-1' } as unknown as Parameters<typeof endShow>[0]);

  return flowsheetInsert.values.mock.calls[0]?.[0] as Record<string, unknown>;
};

describe('endShow', () => {
  beforeEach(() => {
    jest.clearAllMocks();
  });

  it('persists dj_name (from user.djName) on the show_end flowsheet row', async () => {
    const flowsheetValues = await endShowAndCaptureInsert([{ djName: 'DJ Night Owl', name: 'Riley Owens' }], 7);

    expect(flowsheetValues).toEqual(
      expect.objectContaining({
        entry_type: 'show_end',
        dj_name: 'DJ Night Owl',
      })
    );
  });

  it('falls back to user.name when djName is null', async () => {
    const flowsheetValues = await endShowAndCaptureInsert([{ djName: null, name: 'Riley Owens' }], 0);

    expect(flowsheetValues.dj_name).toBe('Riley Owens');
  });

  it('persists a null dj_name but still renders "A DJ" in the message when no user row is found', async () => {
    // The 'A DJ' placeholder is display-only: it belongs in the message text
    // and must not be written to the dj_name column.
    const flowsheetValues = await endShowAndCaptureInsert([], 0);

    expect(flowsheetValues.entry_type).toBe('show_end');
    expect(flowsheetValues.dj_name).toBeNull();
    expect(flowsheetValues.message).toEqual(expect.stringContaining('End of Show: A DJ left the set at'));
  });
});
Loading
Loading