Skip to content

Commit 29c5567

Browse files
committed
refactor(node-sdk): throttle flags cache refreshes
1 parent db2325e commit 29c5567

5 files changed

Lines changed: 294 additions & 67 deletions

File tree

packages/node-sdk/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,8 @@ Reflag maintains a cached set of flag definitions in the memory of your worker w
461461

462462
The SDK caches flag definitions in memory for fast performance. The first request to a new worker instance fetches definitions from Reflag's servers, while subsequent requests use the cache. When the cache expires, it's updated in the background. `ctx.waitUntil(reflag.flush())` ensures completion of the background work, so response times are not affected. This background work may increase wall-clock time for your worker, but it will not measurably increase billable CPU time on platforms like Cloudflare.
463463

464+
`EdgeClient` uses `flagsSyncMode: "in-request"`. Refresh fetch starts are throttled to at most once per second, and Cloudflare Workers cannot rely on delayed timer callbacks to run follow-up refreshes later. That means `refreshFlags()` calls made during the throttle window only mark a refresh as pending, so the call itself may resolve before the fetch runs. The queued refresh runs on the next request/access or `refreshFlags()` call after the throttle window expires.
465+
464466
## Error Handling
465467

466468
The SDK is designed to fail gracefully and never throw exceptions to the caller. Instead, it logs errors and provides

packages/node-sdk/src/client.ts

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -503,35 +503,54 @@ export class ReflagClient {
503503
this._config.apiBaseUrl += "/";
504504
}
505505

506-
this.flagsCache = new FlagsCache(async (waitForVersion?: number) => {
507-
const path =
508-
waitForVersion === undefined
509-
? "features"
510-
: `features?waitForVersion=${encodeURIComponent(String(waitForVersion))}`;
511-
512-
const res = await this.get<FlagsAPIResponse>(
513-
path,
514-
this._config.flagsFetchRetries,
515-
);
516-
if (
517-
!isObject(res) ||
518-
!Array.isArray(res?.features) ||
519-
!Number.isInteger(res?.flagStateVersion) ||
520-
res.flagStateVersion < 0
521-
) {
522-
const fallbackDefinitions = await this.loadFlagsFallbackDefinitions();
523-
return fallbackDefinitions
524-
? { definitions: fallbackDefinitions }
525-
: undefined;
526-
}
506+
this.flagsCache = new FlagsCache(
507+
async (waitForVersion?: number) => {
508+
const path =
509+
waitForVersion === undefined
510+
? "features"
511+
: `features?waitForVersion=${encodeURIComponent(String(waitForVersion))}`;
512+
513+
const res = await this.get<FlagsAPIResponse>(
514+
path,
515+
this._config.flagsFetchRetries,
516+
);
517+
if (
518+
!isObject(res) ||
519+
!Array.isArray(res?.features) ||
520+
!Number.isInteger(res?.flagStateVersion) ||
521+
res.flagStateVersion < 0
522+
) {
523+
const fallbackDefinitions = await this.loadFlagsFallbackDefinitions();
524+
return fallbackDefinitions
525+
? { definitions: fallbackDefinitions }
526+
: undefined;
527+
}
527528

528-
void this.saveFlagsFallbackDefinitions(res.features);
529-
this.canLoadFlagsFallbackProvider = false;
530-
return {
531-
definitions: compileFlagDefinitions(res.features),
532-
flagStateVersion: res.flagStateVersion,
533-
};
534-
}, this.logger);
529+
void this.saveFlagsFallbackDefinitions(res.features);
530+
this.canLoadFlagsFallbackProvider = false;
531+
return {
532+
definitions: compileFlagDefinitions(res.features),
533+
flagStateVersion: res.flagStateVersion,
534+
};
535+
},
536+
{
537+
logger: this.logger,
538+
// Edge runtimes use `in-request` mode and cannot rely on delayed timer
539+
// callbacks to perform trailing refresh work after the current request.
540+
scheduleTrailingRefresh:
541+
flagsSyncMode === "in-request"
542+
? undefined
543+
: (delayMs, callback) => {
544+
const timer = setTimeout(callback, delayMs);
545+
timer.unref?.();
546+
return {
547+
cancel: () => {
548+
clearTimeout(timer);
549+
},
550+
};
551+
},
552+
},
553+
);
535554

536555
this.flagsSyncController = createFlagsSyncController({
537556
mode: this._config.flagsSyncMode,
@@ -903,7 +922,18 @@ export class ReflagClient {
903922
*
904923
* Note: updated flag rules take a few seconds to propagate to all servers.
905924
*
906-
* Concurrent calls are deduplicated — multiple calls share the same in-flight request.
925+
* Fetch starts are throttled to at most once per second.
926+
* Outside `in-request` mode, throttling only delays when the next fetch
927+
* begins: `refreshFlags(99)` still waits until the cache has applied flag
928+
* definitions from version `99` or newer before the promise resolves.
929+
* Concurrent callers are deduplicated and may share the same in-flight or
930+
* scheduled follow-up refresh.
931+
*
932+
* In `in-request` mode, delayed follow-up refreshes are not scheduled. On edge
933+
* runtimes like Cloudflare Workers, a call during the throttle window only
934+
* records pending refresh work, and the promise may resolve before that fetch
935+
* runs. The pending refresh is executed on the next request/access or
936+
* `refreshFlags()` call after the throttle window expires.
907937
*
908938
* @param waitForVersion - Optional flag state version to wait for before returning updated definitions.
909939
*/
@@ -1817,6 +1847,14 @@ export class BoundReflagClient {
18171847
/**
18181848
* Refreshes the flag definitions from the server.
18191849
*
1850+
* Fetch starts are throttled to at most once per second. Outside
1851+
* `in-request` mode, a call to `refreshFlags(waitForVersion)` still waits
1852+
* until the cache has applied that version or newer.
1853+
*
1854+
* In `in-request` mode, a throttled call only records pending refresh work.
1855+
* The fetch then runs on the next request/access or `refreshFlags()` call
1856+
* after the throttle window expires.
1857+
*
18201858
* @param waitForVersion - Optional flag state version to wait for before returning updated definitions.
18211859
*/
18221860
public async refreshFlags(waitForVersion?: number) {

packages/node-sdk/src/edgeClient.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ export type EdgeClientOptions = Omit<
1010
* The EdgeClient is ReflagClient pre-configured to be used in edge runtimes, like
1111
* Cloudflare Workers.
1212
*
13+
* It always uses `flagsSyncMode: "in-request"`. Refresh fetch starts are
14+
* throttled to at most once per second. A `refreshFlags()` call made during
15+
* that throttle window only records pending refresh work, so the call may
16+
* resolve before the fetch runs. That pending refresh is executed on the next
17+
* request/access or `refreshFlags()` call after the window expires.
18+
*
1319
* @example
1420
* ```ts
1521
* // set the REFLAG_SECRET_KEY environment variable or pass the secret key in the constructor

packages/node-sdk/src/flags-cache.ts

Lines changed: 137 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,65 @@
11
import type { CachedFlagDefinition, Logger } from "./types";
22

3+
const DEFAULT_MIN_REFRESH_INTERVAL_MS = 1000;
4+
35
type FlagsCacheRefreshResult = {
46
definitions: CachedFlagDefinition[];
57
flagStateVersion?: number;
68
};
79

10+
type FlagsCacheScheduledRefresh = {
11+
cancel: () => void;
12+
};
13+
14+
type FlagsCacheOptions = {
15+
logger?: Logger;
16+
minRefreshIntervalMs?: number;
17+
scheduleTrailingRefresh?: (
18+
delayMs: number,
19+
callback: () => void,
20+
) => FlagsCacheScheduledRefresh;
21+
};
22+
823
/**
924
* Stores the latest compiled flag definitions and coordinates refresh work.
1025
*
11-
* A single instance is shared across all sync modes. We allow at most one
12-
* in-flight fetch plus one pending follow-up refresh. Response
26+
* A single instance is shared across all sync modes. Response
1327
* `flagStateVersion`s decide whether fetched definitions replace the current
1428
* cache, so correctness does not depend on request ordering.
29+
*
30+
* Refreshes are throttled to at most one fetch start per interval. When the
31+
* runtime supports delayed work we schedule one trailing refresh; otherwise we
32+
* keep the pending refresh queued until the next caller touches the cache.
1533
*/
1634
export class FlagsCache {
1735
private value: CachedFlagDefinition[] | undefined;
1836
private flagStateVersion: number | undefined;
1937
private refreshPromise: Promise<void> | undefined;
38+
private scheduledRefresh: FlagsCacheScheduledRefresh | undefined;
39+
private scheduledRefreshPromise: Promise<void> | undefined;
40+
private resolveScheduledRefreshPromise: (() => void) | undefined;
2041
private lastRefreshAt: number | undefined;
42+
private lastRefreshStartedAt: number | undefined;
2143
private destroyed = false;
2244

2345
private pendingFullRefresh = false;
2446
private pendingWaitForVersion: number | undefined;
2547

48+
private readonly logger?: Logger;
49+
private readonly minRefreshIntervalMs: number;
50+
private readonly scheduleTrailingRefresh?: FlagsCacheOptions["scheduleTrailingRefresh"];
51+
2652
constructor(
2753
private readonly fetchFlags: (
2854
waitForVersion?: number,
2955
) => Promise<FlagsCacheRefreshResult | undefined>,
30-
private readonly logger?: Logger,
31-
) {}
56+
options: FlagsCacheOptions = {},
57+
) {
58+
this.logger = options.logger;
59+
this.minRefreshIntervalMs =
60+
options.minRefreshIntervalMs ?? DEFAULT_MIN_REFRESH_INTERVAL_MS;
61+
this.scheduleTrailingRefresh = options.scheduleTrailingRefresh;
62+
}
3263

3364
public get() {
3465
return this.value;
@@ -40,30 +71,34 @@ export class FlagsCache {
4071
}
4172

4273
this.queueRefresh(waitForVersion);
43-
return await this.ensureRefreshRunning();
74+
this.ensureRefreshStartedOrScheduled();
75+
await this.waitForQueuedWork();
76+
return this.value;
4477
}
4578

4679
public async waitRefresh() {
47-
await this.refreshPromise;
80+
await this.waitForQueuedWork();
4881
}
4982

5083
public destroy() {
5184
this.destroyed = true;
5285
this.value = undefined;
5386
this.flagStateVersion = undefined;
5487
this.refreshPromise = undefined;
88+
this.cancelScheduledRefresh();
5589
this.pendingFullRefresh = false;
5690
this.pendingWaitForVersion = undefined;
5791
this.lastRefreshAt = undefined;
92+
this.lastRefreshStartedAt = undefined;
5893
}
5994

6095
public getLastRefreshAt() {
6196
return this.lastRefreshAt;
6297
}
6398

64-
// Remember the newest refresh request we still need to run after the current
65-
// fetch finishes. Versioned refreshes win over plain refreshes because they
66-
// carry a concrete target we can wait for.
99+
// Remember the newest refresh request we still need to run. Versioned
100+
// refreshes win over plain refreshes because they carry a concrete target we
101+
// can wait for.
67102
private queueRefresh(waitForVersion?: number) {
68103
if (waitForVersion !== undefined) {
69104
if (
@@ -88,6 +123,10 @@ export class FlagsCache {
88123
this.pendingFullRefresh = true;
89124
}
90125

126+
private hasPendingRefresh() {
127+
return this.pendingWaitForVersion !== undefined || this.pendingFullRefresh;
128+
}
129+
91130
private takeNextRefreshRequest() {
92131
if (this.pendingWaitForVersion !== undefined) {
93132
const waitForVersion = this.pendingWaitForVersion;
@@ -129,39 +168,103 @@ export class FlagsCache {
129168
}
130169
}
131170

132-
private async runQueuedRefreshes() {
133-
while (!this.destroyed) {
134-
const request = this.takeNextRefreshRequest();
135-
if (!request) {
136-
return;
137-
}
171+
private getNextRefreshDelayMs() {
172+
if (
173+
this.lastRefreshStartedAt === undefined ||
174+
this.minRefreshIntervalMs <= 0
175+
) {
176+
return 0;
177+
}
138178

139-
const result = await this.fetchFlags(request.waitForVersion);
140-
if (this.destroyed || !result) {
141-
continue;
142-
}
179+
return Math.max(
180+
0,
181+
this.lastRefreshStartedAt + this.minRefreshIntervalMs - Date.now(),
182+
);
183+
}
184+
185+
private settleScheduledRefresh() {
186+
this.scheduledRefresh = undefined;
187+
this.scheduledRefreshPromise = undefined;
188+
this.resolveScheduledRefreshPromise?.();
189+
this.resolveScheduledRefreshPromise = undefined;
190+
}
143191

144-
this.clearSatisfiedPendingVersion(result.flagStateVersion);
192+
private cancelScheduledRefresh() {
193+
this.scheduledRefresh?.cancel();
194+
this.settleScheduledRefresh();
195+
}
145196

146-
if (!this.shouldApplyRefreshResult(result.flagStateVersion)) {
147-
continue;
148-
}
197+
private ensureScheduledRefresh(delayMs: number) {
198+
if (!this.scheduleTrailingRefresh || this.scheduledRefresh) {
199+
return;
200+
}
201+
202+
this.scheduledRefreshPromise = new Promise<void>((resolve) => {
203+
this.resolveScheduledRefreshPromise = resolve;
204+
});
149205

150-
this.value = result.definitions;
151-
this.flagStateVersion = result.flagStateVersion;
152-
this.lastRefreshAt = Date.now();
153-
this.logger?.info("refreshed flag definitions");
206+
this.scheduledRefresh = this.scheduleTrailingRefresh(delayMs, () => {
207+
this.settleScheduledRefresh();
208+
this.ensureRefreshStartedOrScheduled();
209+
});
210+
}
211+
212+
private ensureRefreshStartedOrScheduled() {
213+
if (this.destroyed || this.refreshPromise || !this.hasPendingRefresh()) {
214+
return;
154215
}
216+
217+
const delayMs = this.getNextRefreshDelayMs();
218+
if (delayMs > 0) {
219+
this.ensureScheduledRefresh(delayMs);
220+
return;
221+
}
222+
223+
this.cancelScheduledRefresh();
224+
this.startNextRefresh();
155225
}
156226

157-
private async ensureRefreshRunning() {
158-
if (!this.refreshPromise) {
159-
this.refreshPromise = this.runQueuedRefreshes().finally(() => {
160-
this.refreshPromise = undefined;
161-
});
227+
private startNextRefresh() {
228+
const request = this.takeNextRefreshRequest();
229+
if (!request) {
230+
return;
162231
}
163232

164-
await this.refreshPromise;
165-
return this.value;
233+
this.lastRefreshStartedAt = Date.now();
234+
this.refreshPromise = this.fetchAndApplyRefresh(
235+
request.waitForVersion,
236+
).finally(() => {
237+
this.refreshPromise = undefined;
238+
this.ensureRefreshStartedOrScheduled();
239+
});
240+
}
241+
242+
private async fetchAndApplyRefresh(waitForVersion?: number) {
243+
const result = await this.fetchFlags(waitForVersion);
244+
if (this.destroyed || !result) {
245+
return;
246+
}
247+
248+
this.clearSatisfiedPendingVersion(result.flagStateVersion);
249+
250+
if (!this.shouldApplyRefreshResult(result.flagStateVersion)) {
251+
return;
252+
}
253+
254+
this.value = result.definitions;
255+
this.flagStateVersion = result.flagStateVersion;
256+
this.lastRefreshAt = Date.now();
257+
this.logger?.info("refreshed flag definitions");
258+
}
259+
260+
private async waitForQueuedWork() {
261+
while (!this.destroyed) {
262+
const workPromise = this.refreshPromise ?? this.scheduledRefreshPromise;
263+
if (!workPromise) {
264+
return;
265+
}
266+
267+
await workPromise;
268+
}
166269
}
167270
}

0 commit comments

Comments
 (0)