Skip to content

Commit 63b6379

Browse files
committed
refactor(node-sdk): split flags cache from refreshers
1 parent 45a293d commit 63b6379

File tree

11 files changed

+405
-598
lines changed

11 files changed

+405
-598
lines changed

packages/node-sdk/examples/express/bucket.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const flagOverrides = (_: Context): FlagOverrides => {
3838
// like REFLAG_SECRET_KEY, REFLAG_FLAGS_ENABLED, REFLAG_FLAGS_DISABLED, etc.
3939
export default new ReflagClient({
4040
// Optional: Set a logger to log debug information, errors, etc.
41-
logger: console,
41+
// logger: console,
4242
flagOverrides, // Optional: Set flag overrides
43+
flagsSyncMode: "push",
4344
});

packages/node-sdk/src/client.ts

Lines changed: 46 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@ import {
1919
SDK_VERSION_HEADER_NAME,
2020
} from "./config";
2121
import fetchClient, { withRetry } from "./fetch-http-client";
22-
import {
23-
FlagUpdatesSSESubscription,
24-
openFlagUpdatesSSE,
25-
} from "./flag-updates-sse";
22+
import { FlagsCache } from "./flags-cache";
23+
import { createFlagsRefresher, FlagsRefresher } from "./flags-refresher";
2624
import { isFlagsFallbackSnapshot } from "./flagsFallbackProvider";
2725
import { subscribe as triggerOnExit } from "./flusher";
28-
import inRequestCache from "./inRequestCache";
29-
import periodicallyUpdatingCache from "./periodicallyUpdatingCache";
3026
import { newRateLimiter } from "./rate-limiter";
3127
import type {
3228
BootstrappedFlags,
@@ -47,7 +43,6 @@ import type {
4743
} from "./types";
4844
import {
4945
Attributes,
50-
Cache,
5146
ClientOptions,
5247
Context,
5348
ContextWithTracking,
@@ -180,8 +175,8 @@ function createFlagsFallbackSnapshot(
180175
};
181176
}
182177

183-
function createEnvFlagsUpdatedChannelName(envIdHash: string): string {
184-
return `flags_updated:${envIdHash}`;
178+
function createEnvFlagsStateChannelName(envIdHash: string): string {
179+
return `flags-state:${envIdHash}`;
185180
}
186181

187182
function formatFlagsFallbackAge(savedAt: string): string | undefined {
@@ -250,11 +245,10 @@ export class ReflagClient {
250245
fetchTimeoutMs: number;
251246
flagsSyncMode: FlagsSyncMode;
252247
flagsPushUrl: string;
253-
flagsPushChannel: string;
254248
};
255249
httpClient: HttpClient;
256250

257-
private flagsCache: Cache<CachedFlagDefinition[]>;
251+
private flagsCache: FlagsCache;
258252
private batchBuffer: BatchBuffer<BulkEvent>;
259253
private rateLimiter: ReturnType<typeof newRateLimiter>;
260254
private baseFlagOverrides: FlagOverridesFn = () => ({});
@@ -266,16 +260,14 @@ export class ReflagClient {
266260
*/
267261
public readonly logger: Logger;
268262

263+
private flagsRefresher: FlagsRefresher;
269264
private initializationFinished = false;
270265
private canLoadFlagsFallbackProvider = true;
271-
private pendingWaitForVersion: number | undefined;
272-
private pushRefreshPromise: Promise<void> | undefined;
273-
private flagsUpdatesSSESubscription: FlagUpdatesSSESubscription | undefined;
274266
private _initialize = once(async () => {
275267
const start = Date.now();
276268
if (!this._config.offline) {
269+
await this.flagsRefresher.start();
277270
await this.flagsCache.refresh();
278-
this.startFlagsUpdatesSSE();
279271
}
280272
this.logger.info(
281273
"Reflag initialized in " +
@@ -461,10 +453,22 @@ export class ReflagClient {
461453
const secretKeyHash = config.secretKey ? hashString(config.secretKey) : "";
462454

463455
ok(
464-
flagsSyncMode !== "push" || secretKeyHash.length > 0,
465-
"flagsSyncMode=\"push\" requires a valid secretKey",
456+
offline || flagsSyncMode !== "push" || secretKeyHash.length > 0,
457+
'flagsSyncMode="push" requires a valid secretKey',
466458
);
467459

460+
const pushUrl = new URL(options.flagsPushUrl ?? PUBSUB_SSE_URL);
461+
if (
462+
flagsSyncMode === "push" &&
463+
secretKeyHash.length > 0 &&
464+
!pushUrl.searchParams.has("channels")
465+
) {
466+
pushUrl.searchParams.set(
467+
"channels",
468+
createEnvFlagsStateChannelName(secretKeyHash.slice(0, 16)),
469+
);
470+
}
471+
468472
this._config = {
469473
offline,
470474
apiBaseUrl: (config.apiBaseUrl ?? config.host) || API_BASE_URL,
@@ -483,10 +487,7 @@ export class ReflagClient {
483487
flagsFetchRetries: options.flagsFetchRetries ?? 3,
484488
fetchTimeoutMs: options.fetchTimeoutMs ?? API_TIMEOUT_MS,
485489
flagsSyncMode,
486-
flagsPushUrl: options.flagsPushUrl ?? PUBSUB_SSE_URL,
487-
flagsPushChannel: createEnvFlagsUpdatedChannelName(
488-
secretKeyHash.slice(0, 16),
489-
),
490+
flagsPushUrl: pushUrl.toString(),
490491
};
491492
this.baseFlagOverrides = baseFlagOverrides;
492493

@@ -498,10 +499,7 @@ export class ReflagClient {
498499
this._config.apiBaseUrl += "/";
499500
}
500501

501-
const fetchFlags = async () => {
502-
const waitForVersion = this.pendingWaitForVersion;
503-
this.pendingWaitForVersion = undefined;
504-
502+
this.flagsCache = new FlagsCache(async (waitForVersion?: number) => {
505503
const path =
506504
waitForVersion === undefined
507505
? "features"
@@ -518,88 +516,18 @@ export class ReflagClient {
518516
void this.saveFlagsFallbackDefinitions(res.features);
519517
this.canLoadFlagsFallbackProvider = false;
520518
return compileFlagDefinitions(res.features);
521-
};
522-
523-
if (this._config.flagsSyncMode === "push") {
524-
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
525-
Number.MAX_SAFE_INTEGER,
526-
this.logger,
527-
fetchFlags,
528-
);
529-
} else if (this._config.flagsSyncMode === "in-request") {
530-
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
531-
this._config.refetchInterval,
532-
this.logger,
533-
fetchFlags,
534-
);
535-
} else {
536-
this.flagsCache = periodicallyUpdatingCache<CachedFlagDefinition[]>(
537-
this._config.refetchInterval,
538-
this.logger,
539-
fetchFlags,
540-
);
541-
}
542-
}
543-
544-
private startFlagsUpdatesSSE() {
545-
if (
546-
this._config.offline ||
547-
this._config.flagsSyncMode !== "push" ||
548-
this.flagsUpdatesSSESubscription
549-
) {
550-
return;
551-
}
552-
553-
const pushUrl = new URL(this._config.flagsPushUrl);
554-
if (!pushUrl.searchParams.has("channels")) {
555-
pushUrl.searchParams.set("channels", this._config.flagsPushChannel);
556-
}
519+
}, this.logger);
557520

558-
this.flagsUpdatesSSESubscription = openFlagUpdatesSSE({
559-
url: pushUrl.toString(),
521+
this.flagsRefresher = createFlagsRefresher({
522+
mode: this._config.flagsSyncMode,
523+
cache: this.flagsCache,
524+
intervalMs: this._config.refetchInterval,
525+
pushUrl: this._config.flagsPushUrl,
560526
headers: this._config.headers,
561527
logger: this.logger,
562-
onFlagStateVersion: (version) => {
563-
this.pendingWaitForVersion =
564-
this.pendingWaitForVersion === undefined
565-
? version
566-
: Math.max(this.pendingWaitForVersion, version);
567-
this.refreshFlagsToPendingVersion();
568-
},
569528
});
570529
}
571530

572-
private refreshFlagsToPendingVersion() {
573-
if (this.pushRefreshPromise) {
574-
return;
575-
}
576-
577-
this.pushRefreshPromise = (async () => {
578-
while (true) {
579-
const pendingAtStart = this.pendingWaitForVersion;
580-
if (pendingAtStart === undefined) {
581-
break;
582-
}
583-
584-
await this.flagsCache.refresh();
585-
586-
const pendingNow = this.pendingWaitForVersion;
587-
if (pendingNow === undefined || pendingNow <= pendingAtStart) {
588-
break;
589-
}
590-
}
591-
})()
592-
.catch((error) => {
593-
this.logger.warn("failed to refresh flags from push update", error);
594-
})
595-
.finally(() => {
596-
this.pushRefreshPromise = undefined;
597-
if (this.pendingWaitForVersion !== undefined) {
598-
this.refreshFlagsToPendingVersion();
599-
}
600-
});
601-
}
602-
603531
private async loadFlagsFallbackDefinitions() {
604532
if (!this.canLoadFlagsFallbackProvider) {
605533
return undefined;
@@ -949,7 +877,6 @@ export class ReflagClient {
949877

950878
await this.batchBuffer.flush();
951879
await this.flagsCache.waitRefresh();
952-
await this.pushRefreshPromise;
953880
}
954881

955882
/**
@@ -962,9 +889,15 @@ export class ReflagClient {
962889
* Note: updated flag rules take a few seconds to propagate to all servers.
963890
*
964891
* Concurrent calls are deduplicated — multiple calls share the same in-flight request.
892+
*
893+
* @param waitForVersion - Optional flag state version to wait for before returning updated definitions.
965894
*/
966-
public async refreshFlags() {
967-
await this.flagsCache.refresh();
895+
public async refreshFlags(waitForVersion?: number) {
896+
if (this._config.offline) {
897+
return;
898+
}
899+
900+
await this.flagsCache.refresh(waitForVersion);
968901
}
969902

970903
/**
@@ -976,9 +909,7 @@ export class ReflagClient {
976909
* multiple background processes from running simultaneously.
977910
*/
978911
public destroy() {
979-
this.flagsUpdatesSSESubscription?.close();
980-
this.flagsUpdatesSSESubscription = undefined;
981-
912+
this.flagsRefresher.destroy();
982913
this.flagsCache.destroy();
983914
this.batchBuffer.destroy();
984915
}
@@ -990,6 +921,9 @@ export class ReflagClient {
990921
* @returns The flags definitions.
991922
*/
992923
public getFlagDefinitions(): FlagDefinition[] {
924+
if (!this._config.offline) {
925+
this.flagsRefresher.onAccess?.();
926+
}
993927
const flags = this.flagsCache.get() || [];
994928
return flags.map((f) => ({
995929
key: f.key,
@@ -1446,6 +1380,7 @@ export class ReflagClient {
14461380
let flagDefinitions: CachedFlagDefinition[] = [];
14471381

14481382
if (!this._config.offline) {
1383+
this.flagsRefresher.onAccess?.();
14491384
const flagDefs = this.flagsCache.get();
14501385
if (!flagDefs) {
14511386
this.logger.warn(
@@ -1866,9 +1801,11 @@ export class BoundReflagClient {
18661801

18671802
/**
18681803
* Refreshes the flag definitions from the server.
1804+
*
1805+
* @param waitForVersion - Optional flag state version to wait for before returning updated definitions.
18691806
*/
1870-
public async refreshFlags() {
1871-
await this._client.refreshFlags();
1807+
public async refreshFlags(waitForVersion?: number) {
1808+
await this._client.refreshFlags(waitForVersion);
18721809
}
18731810
}
18741811

0 commit comments

Comments
 (0)