Skip to content

Commit 5610099

Browse files
committed
feat(mollifier): track org→envs in the buffer for clean org-level fairness
Previously the drainer cached envId→orgId from popped entries and used a sentinel pseudo-org for envs it hadn't seen yet. The sentinel polluted the bucket map with fake org IDs and was a foreseeable source of bugs.

This commit moves org membership into the buffer's atomic Lua scripts. New Redis keys, both maintained transactionally alongside per-env queues:
- mollifier:orgs — orgs with at least one queued env
- mollifier:org-envs:${orgId} — envs of that org with queued entries

acceptMollifierEntry SADDs into all three sets (envs + orgs + org-envs). popAndMarkDraining cleans up envs+orgs+org-envs together when the queue empties in the success branch (we know orgId from the popped entry). The no-runId branch can't read orgId so it only cleans envs — stale org-envs entries are bounded by env count and recovered on the next accept. requeueMollifierEntry re-SADDs all three since the env may have just been pruned.

The drainer now walks listOrgs() → listEnvsForOrg(org) → pop(env) with two cursors: orgCursor across all active orgs and a per-org envCursor for round-robin within each org. No client-side cache, no sentinel, deterministic from the first tick.

Tests updated:
- multi-org-round-robin (was multi-env-round-robin): two orgs with one and two envs respectively, asserts org_B drains its only env each tick while org_A rotates through its two.
- concurrency-cap test spreads 12 envs across 12 orgs (otherwise one org → one pop per tick).
- "heavy org doesn't dominate vs light org" gets explicit listOrgs / listEnvsForOrg from the test's env→org map; assertion tightened to 0.7–1.5 ratio over 20 ticks.
- "within an org envs rotated round-robin" gets explicit listEnvsForOrg.
- "envCursor resets" → "rotation cursors reset"; cache is gone, only orgCursor and perOrgEnvCursors reset on start().
- makeStubBuffer auto-derives listOrgs/listEnvsForOrg from listEnvs (each env as its own org) so tests that don't care about org grouping don't need to provide them explicitly.
24/24 drainer tests pass, 35/35 buffer tests pass (some redis-container flakes under full-suite load; all green in isolation). Webapp typecheck clean.
1 parent 2348bf2 commit 5610099

3 files changed

Lines changed: 272 additions & 160 deletions

File tree

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,30 +53,36 @@ export class MollifierBuffer {
5353
const entryKey = `mollifier:entries:${input.runId}`;
5454
const queueKey = `mollifier:queue:${input.envId}`;
5555
const envsKey = "mollifier:envs";
56+
const orgsKey = "mollifier:orgs";
5657
const createdAt = new Date().toISOString();
5758
const result = await this.redis.acceptMollifierEntry(
5859
entryKey,
5960
queueKey,
6061
envsKey,
62+
orgsKey,
6163
input.runId,
6264
input.envId,
6365
input.orgId,
6466
input.payload,
6567
createdAt,
6668
String(this.entryTtlSeconds),
69+
"mollifier:org-envs:",
6770
);
6871
return result === 1;
6972
}
7073

7174
async pop(envId: string): Promise<BufferEntry | null> {
7275
const queueKey = `mollifier:queue:${envId}`;
7376
const envsKey = "mollifier:envs";
77+
const orgsKey = "mollifier:orgs";
7478
const entryPrefix = "mollifier:entries:";
7579
const encoded = (await this.redis.popAndMarkDraining(
7680
queueKey,
7781
envsKey,
82+
orgsKey,
7883
entryPrefix,
7984
envId,
85+
"mollifier:org-envs:",
8086
)) as string | null;
8187
if (!encoded) return null;
8288

@@ -114,10 +120,24 @@ export class MollifierBuffer {
114120
return parsed.data;
115121
}
116122

123+
// Flat list of envs with active entries. Kept for inspection and the
124+
// org-walk fallback; the drainer walks orgs → envs-for-org instead.
117125
// Returns every member of the flat "mollifier:envs" Redis set (SMEMBERS).
async listEnvs(): Promise<string[]> {
118126
return this.redis.smembers("mollifier:envs");
119127
}
120128

129+
// Drainer walks these two methods to schedule pops with org-level
130+
// fairness: one env per org per tick. The Lua scripts maintain both
131+
// sets atomically with the per-env queues, so an env appears here
132+
// whenever its queue has at least one entry (stale members can linger after an empty-queue pop, which has no orgId to prune with).
133+
// Returns every member of the "mollifier:orgs" Redis set (SMEMBERS).
// NOTE(review): SMEMBERS makes no ordering guarantee — confirm the caller
// sorts or otherwise stabilises the list before applying a rotation cursor.
async listOrgs(): Promise<string[]> {
134+
return this.redis.smembers("mollifier:orgs");
135+
}
136+
137+
// Returns every member of the per-org set "mollifier:org-envs:<orgId>"
// (SMEMBERS). The key is built by appending orgId to the same prefix that is
// passed to the Lua scripts as orgEnvsPrefix.
// NOTE(review): SMEMBERS makes no ordering guarantee — confirm the caller
// stabilises the list before applying a per-org rotation cursor.
async listEnvsForOrg(orgId: string): Promise<string[]> {
138+
return this.redis.smembers(`mollifier:org-envs:${orgId}`);
139+
}
140+
121141
// Acknowledges a run by deleting its entry key ("mollifier:entries:<runId>").
// Only that single key is removed here; no queue or membership set is touched.
async ack(runId: string): Promise<void> {
122142
await this.redis.del(`mollifier:entries:${runId}`);
123143
}
@@ -126,8 +146,10 @@ export class MollifierBuffer {
126146
await this.redis.requeueMollifierEntry(
127147
`mollifier:entries:${runId}`,
128148
"mollifier:envs",
149+
"mollifier:orgs",
129150
"mollifier:queue:",
130151
runId,
152+
"mollifier:org-envs:",
131153
);
132154
}
133155

@@ -169,17 +191,19 @@ export class MollifierBuffer {
169191

170192
#registerCommands() {
171193
this.redis.defineCommand("acceptMollifierEntry", {
172-
numberOfKeys: 3,
194+
numberOfKeys: 4,
173195
lua: `
174196
local entryKey = KEYS[1]
175197
local queueKey = KEYS[2]
176198
local envsKey = KEYS[3]
199+
local orgsKey = KEYS[4]
177200
local runId = ARGV[1]
178201
local envId = ARGV[2]
179202
local orgId = ARGV[3]
180203
local payload = ARGV[4]
181204
local createdAt = ARGV[5]
182205
local ttlSeconds = tonumber(ARGV[6])
206+
local orgEnvsPrefix = ARGV[7]
183207
184208
-- Idempotent: refuse if an entry for this runId already exists in any
185209
-- state. Caller-side dedup is also enforced via API idempotency keys,
@@ -199,19 +223,28 @@ export class MollifierBuffer {
199223
redis.call('EXPIRE', entryKey, ttlSeconds)
200224
redis.call('LPUSH', queueKey, runId)
201225
redis.call('SADD', envsKey, envId)
226+
-- Org-level membership: maintained atomically with the per-env
227+
-- queue/SET so the drainer can walk orgs → envs-for-org and
228+
-- schedule one env per org per tick. SADDs are idempotent if the
229+
-- org/env are already tracked.
230+
redis.call('SADD', orgsKey, orgId)
231+
redis.call('SADD', orgEnvsPrefix .. orgId, envId)
202232
return 1
203233
`,
204234
});
205235

206236
this.redis.defineCommand("requeueMollifierEntry", {
207-
numberOfKeys: 2,
237+
numberOfKeys: 3,
208238
lua: `
209239
local entryKey = KEYS[1]
210240
local envsKey = KEYS[2]
241+
local orgsKey = KEYS[3]
211242
local queuePrefix = ARGV[1]
212243
local runId = ARGV[2]
244+
local orgEnvsPrefix = ARGV[3]
213245
214246
local envId = redis.call('HGET', entryKey, 'envId')
247+
local orgId = redis.call('HGET', entryKey, 'orgId')
215248
if not envId then
216249
return 0
217250
end
@@ -221,20 +254,41 @@ export class MollifierBuffer {
221254
222255
redis.call('HSET', entryKey, 'status', 'QUEUED', 'attempts', tostring(nextAttempts))
223256
redis.call('LPUSH', queuePrefix .. envId, runId)
224-
-- Re-track the env: pop may have SREM'd it when the queue last
225-
-- emptied. SADD is idempotent if the env is still present.
257+
-- Re-track the env/org: pop may have SREM'd them when the queue
258+
-- last emptied. SADDs are idempotent if the values are still
259+
-- present.
226260
redis.call('SADD', envsKey, envId)
261+
if orgId then
262+
redis.call('SADD', orgsKey, orgId)
263+
redis.call('SADD', orgEnvsPrefix .. orgId, envId)
264+
end
227265
return 1
228266
`,
229267
});
230268

231269
this.redis.defineCommand("popAndMarkDraining", {
232-
numberOfKeys: 2,
270+
numberOfKeys: 3,
233271
lua: `
234272
local queueKey = KEYS[1]
235273
local envsKey = KEYS[2]
274+
local orgsKey = KEYS[3]
236275
local entryPrefix = ARGV[1]
237276
local envId = ARGV[2]
277+
local orgEnvsPrefix = ARGV[3]
278+
279+
-- Helper: prune org-level membership when an env's queue empties.
280+
-- Called only from the success branch where we know orgId from the
281+
-- popped entry. The no-runId branch below can't reach this because
282+
-- it has no entry to read orgId from — we tolerate any stale org-envs
283+
-- entries that result (bounded by env count, recovered next accept).
284+
-- Removes envId (script ARGV) from the org's env set and, if that set is
-- now empty, removes the org from orgsKey. No-op when orgId is nil/false.
local function pruneOrgMembership(orgId)
285+
-- The popped entry hash carried no orgId field: nothing to prune.
if not orgId then return end
286+
local orgEnvsKey = orgEnvsPrefix .. orgId
287+
redis.call('SREM', orgEnvsKey, envId)
288+
-- That was the org's last tracked env, so drop the org as well.
if redis.call('SCARD', orgEnvsKey) == 0 then
289+
redis.call('SREM', orgsKey, orgId)
290+
end
291+
end
238292
239293
-- Loop to skip orphan queue references — runIds whose entry hash has
240294
-- expired (TTL hit). HSET on a missing key would CREATE a partial
@@ -245,6 +299,9 @@ export class MollifierBuffer {
245299
if not runId then
246300
-- Queue is empty; opportunistically prune envs set. SREM is safe
247301
-- under concurrent LPUSH: accept SADDs the env back atomically.
302+
-- Org-level cleanup is skipped here because we don't know orgId
303+
-- without an entry to read from. Stale org-envs entries are
304+
-- bounded by env count and recovered on the next accept.
248305
if redis.call('LLEN', queueKey) == 0 then
249306
redis.call('SREM', envsKey, envId)
250307
end
@@ -254,17 +311,18 @@ export class MollifierBuffer {
254311
local entryKey = entryPrefix .. runId
255312
if redis.call('EXISTS', entryKey) == 1 then
256313
redis.call('HSET', entryKey, 'status', 'DRAINING')
257-
-- Prune envs set if this pop drained the queue. Atomic with the
258-
-- RPOP above — a concurrent accept AFTER this script will SADD
259-
-- the env back along with its LPUSH.
260-
if redis.call('LLEN', queueKey) == 0 then
261-
redis.call('SREM', envsKey, envId)
262-
end
263314
local raw = redis.call('HGETALL', entryKey)
264315
local result = {}
265316
for i = 1, #raw, 2 do
266317
result[raw[i]] = raw[i + 1]
267318
end
319+
-- Prune envs/orgs/org-envs sets if this pop drained the queue.
320+
-- Atomic with the RPOP above — a concurrent accept AFTER this
321+
-- script will SADD all three back along with its LPUSH.
322+
if redis.call('LLEN', queueKey) == 0 then
323+
redis.call('SREM', envsKey, envId)
324+
pruneOrgMembership(result['orgId'])
325+
end
268326
return cjson.encode(result)
269327
end
270328
-- Orphan queue reference: entry TTL expired while runId was queued.
@@ -321,26 +379,32 @@ declare module "@internal/redis" {
321379
entryKey: string,
322380
queueKey: string,
323381
envsKey: string,
382+
orgsKey: string,
324383
runId: string,
325384
envId: string,
326385
orgId: string,
327386
payload: string,
328387
createdAt: string,
329388
ttlSeconds: string,
389+
orgEnvsPrefix: string,
330390
callback?: Callback<number>,
331391
): Result<number, Context>;
332392
// Typed signature for the popAndMarkDraining Lua command. The script is
// registered with numberOfKeys: 3, so queueKey/envsKey/orgsKey arrive as
// KEYS[1..3] and entryPrefix/envId/orgEnvsPrefix as ARGV[1..3]. On a
// successful pop the script returns the entry hash encoded with cjson.
popAndMarkDraining(
333393
queueKey: string,
334394
envsKey: string,
395+
orgsKey: string,
335396
entryPrefix: string,
336397
envId: string,
398+
orgEnvsPrefix: string,
337399
callback?: Callback<string | null>,
338400
): Result<string | null, Context>;
339401
// Typed signature for the requeueMollifierEntry Lua command. The script is
// registered with numberOfKeys: 3, so entryKey/envsKey/orgsKey arrive as
// KEYS[1..3] and queuePrefix/runId/orgEnvsPrefix as ARGV[1..3]. The script
// returns 0 when the entry hash has no envId field and 1 after requeueing.
requeueMollifierEntry(
340402
entryKey: string,
341403
envsKey: string,
404+
orgsKey: string,
342405
queuePrefix: string,
343406
runId: string,
407+
orgEnvsPrefix: string,
344408
callback?: Callback<number>,
345409
): Result<number, Context>;
346410
failMollifierEntry(

0 commit comments

Comments
 (0)