Skip to content

Commit ffecd3d

Browse files
committed
feat(node-sdk): add push sync mode for flag updates
1 parent fdce8b9 commit ffecd3d

File tree

8 files changed

+403
-12
lines changed

8 files changed

+403
-12
lines changed

packages/node-sdk/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,8 @@ current working directory.
565565
| `apiBaseUrl` | string | The base API URL for the Reflag servers. | REFLAG_API_BASE_URL |
566566
| `flagOverrides` | Record<string, boolean> | An object specifying flag overrides for testing or local development. See [examples/express/app.test.ts](https://github.com/reflagcom/javascript/tree/main/packages/node-sdk/examples/express/app.test.ts) for how to use `flagOverrides` in tests. | REFLAG_FLAGS_ENABLED, REFLAG_FLAGS_DISABLED |
567567
| `flagsFallbackProvider` | `FlagsFallbackProvider` | Optional provider used to load and save raw flag definitions for fallback startup when the initial live fetch fails. Available only through the constructor. Ignored in offline mode. | - |
568+
| `flagsSyncMode` | `"polling" \| "in-request" \| "push"` | Flag-definition sync mode. `polling` uses periodic background refresh, `in-request` refreshes stale flags during request handling, and `push` subscribes to live updates and refreshes with `waitForVersion`. Default: `"polling"`. | - |
569+
| `flagsPushUrl` | string | Push endpoint used when `flagsSyncMode: "push"`. Default: `https://pubsub.reflag.com/sse`. | - |
568570
| `configFile` | string | Load this config file from disk. Default: `reflag.config.json` | REFLAG_CONFIG_FILE |
569571

570572
> [!NOTE]

packages/node-sdk/src/client.ts

Lines changed: 110 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,17 @@ import {
1212
API_TIMEOUT_MS,
1313
FLAG_EVENT_RATE_LIMITER_WINDOW_SIZE_MS,
1414
FLAGS_REFETCH_MS,
15+
PUBSUB_SSE_URL,
1516
loadConfig,
1617
REFLAG_LOG_PREFIX,
1718
SDK_VERSION,
1819
SDK_VERSION_HEADER_NAME,
1920
} from "./config";
2021
import fetchClient, { withRetry } from "./fetch-http-client";
22+
import {
23+
FlagUpdatesSSESubscription,
24+
openFlagUpdatesSSE,
25+
} from "./flag-updates-sse";
2126
import { isFlagsFallbackSnapshot } from "./flagsFallbackProvider";
2227
import { subscribe as triggerOnExit } from "./flusher";
2328
import inRequestCache from "./inRequestCache";
@@ -26,7 +31,7 @@ import { newRateLimiter } from "./rate-limiter";
2631
import type {
2732
BootstrappedFlags,
2833
CachedFlagDefinition,
29-
CacheStrategy,
34+
FlagsSyncMode,
3035
EvaluatedFlagsAPIResponse,
3136
FlagAPIResponse,
3237
FlagDefinition,
@@ -239,7 +244,8 @@ export class ReflagClient {
239244
configFile?: string;
240245
flagsFetchRetries: number;
241246
fetchTimeoutMs: number;
242-
cacheStrategy: CacheStrategy;
247+
flagsSyncMode: FlagsSyncMode;
248+
flagsPushUrl: string;
243249
};
244250
httpClient: HttpClient;
245251

@@ -257,10 +263,14 @@ export class ReflagClient {
257263

258264
private initializationFinished = false;
259265
private canLoadFlagsFallbackProvider = true;
266+
private pendingWaitForVersion: number | undefined;
267+
private pushRefreshPromise: Promise<void> | undefined;
268+
private flagsUpdatesSSESubscription: FlagUpdatesSSESubscription | undefined;
260269
private _initialize = once(async () => {
261270
const start = Date.now();
262271
if (!this._config.offline) {
263272
await this.flagsCache.refresh();
273+
this.startFlagsUpdatesSSE();
264274
}
265275
this.logger.info(
266276
"Reflag initialized in " +
@@ -288,7 +298,7 @@ export class ReflagClient {
288298
* @param options.configFile - The path to the config file (optional).
289299
* @param options.flagsFetchRetries - Number of retries for fetching flags (optional, defaults to 3).
290300
* @param options.fetchTimeoutMs - Timeout for fetching flags (optional, defaults to 10000ms).
291-
* @param options.cacheStrategy - The cache strategy to use for the client (optional, defaults to "periodically-update").
301+
* @param options.flagsSyncMode - How flag definitions are synchronized (optional, defaults to "polling").
292302
*
293303
* @throws An error if the options are invalid.
294304
**/
@@ -351,6 +361,21 @@ export class ReflagClient {
351361
"fetchTimeoutMs must be a non-negative integer",
352362
);
353363

364+
ok(
365+
options.flagsSyncMode === undefined ||
366+
options.flagsSyncMode === "polling" ||
367+
options.flagsSyncMode === "in-request" ||
368+
options.flagsSyncMode === "push",
369+
'flagsSyncMode must be one of "polling", "in-request", or "push"',
370+
);
371+
372+
ok(
373+
options.flagsPushUrl === undefined ||
374+
(typeof options.flagsPushUrl === "string" &&
375+
options.flagsPushUrl.length > 0),
376+
"flagsPushUrl must be a non-empty string",
377+
);
378+
354379
if (!options.configFile) {
355380
options.configFile =
356381
(process.env.REFLAG_CONFIG_FILE ??
@@ -424,6 +449,10 @@ export class ReflagClient {
424449
logger: this.logger,
425450
});
426451

452+
const flagsSyncMode: FlagsSyncMode =
453+
options.flagsSyncMode ??
454+
(options.cacheStrategy === "in-request" ? "in-request" : "polling");
455+
427456
this._config = {
428457
offline,
429458
apiBaseUrl: (config.apiBaseUrl ?? config.host) || API_BASE_URL,
@@ -441,7 +470,8 @@ export class ReflagClient {
441470
flagOverrides: baseFlagOverrides,
442471
flagsFetchRetries: options.flagsFetchRetries ?? 3,
443472
fetchTimeoutMs: options.fetchTimeoutMs ?? API_TIMEOUT_MS,
444-
cacheStrategy: options.cacheStrategy ?? "periodically-update",
473+
flagsSyncMode,
474+
flagsPushUrl: options.flagsPushUrl ?? PUBSUB_SSE_URL,
445475
};
446476
this.baseFlagOverrides = baseFlagOverrides;
447477

@@ -454,8 +484,16 @@ export class ReflagClient {
454484
}
455485

456486
const fetchFlags = async () => {
487+
const waitForVersion = this.pendingWaitForVersion;
488+
this.pendingWaitForVersion = undefined;
489+
490+
const path =
491+
waitForVersion === undefined
492+
? "features"
493+
: `features?waitForVersion=${encodeURIComponent(String(waitForVersion))}`;
494+
457495
const res = await this.get<FlagsAPIResponse>(
458-
"features",
496+
path,
459497
this._config.flagsFetchRetries,
460498
);
461499
if (!isObject(res) || !Array.isArray(res?.features)) {
@@ -467,21 +505,81 @@ export class ReflagClient {
467505
return compileFlagDefinitions(res.features);
468506
};
469507

470-
if (this._config.cacheStrategy === "periodically-update") {
471-
this.flagsCache = periodicallyUpdatingCache<CachedFlagDefinition[]>(
508+
if (this._config.flagsSyncMode === "push") {
509+
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
510+
Number.MAX_SAFE_INTEGER,
511+
this.logger,
512+
fetchFlags,
513+
);
514+
} else if (this._config.flagsSyncMode === "in-request") {
515+
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
472516
this._config.refetchInterval,
473517
this.logger,
474518
fetchFlags,
475519
);
476520
} else {
477-
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
521+
this.flagsCache = periodicallyUpdatingCache<CachedFlagDefinition[]>(
478522
this._config.refetchInterval,
479523
this.logger,
480524
fetchFlags,
481525
);
482526
}
483527
}
484528

529+
private startFlagsUpdatesSSE() {
530+
if (
531+
this._config.offline ||
532+
this._config.flagsSyncMode !== "push" ||
533+
this.flagsUpdatesSSESubscription
534+
) {
535+
return;
536+
}
537+
538+
this.flagsUpdatesSSESubscription = openFlagUpdatesSSE({
539+
url: this._config.flagsPushUrl,
540+
headers: this._config.headers,
541+
logger: this.logger,
542+
onFlagStateVersion: (version) => {
543+
this.pendingWaitForVersion =
544+
this.pendingWaitForVersion === undefined
545+
? version
546+
: Math.max(this.pendingWaitForVersion, version);
547+
this.refreshFlagsToPendingVersion();
548+
},
549+
});
550+
}
551+
552+
private refreshFlagsToPendingVersion() {
553+
if (this.pushRefreshPromise) {
554+
return;
555+
}
556+
557+
this.pushRefreshPromise = (async () => {
558+
while (true) {
559+
const pendingAtStart = this.pendingWaitForVersion;
560+
if (pendingAtStart === undefined) {
561+
break;
562+
}
563+
564+
await this.flagsCache.refresh();
565+
566+
const pendingNow = this.pendingWaitForVersion;
567+
if (pendingNow === undefined || pendingNow <= pendingAtStart) {
568+
break;
569+
}
570+
}
571+
})()
572+
.catch((error) => {
573+
this.logger.warn("failed to refresh flags from push update", error);
574+
})
575+
.finally(() => {
576+
this.pushRefreshPromise = undefined;
577+
if (this.pendingWaitForVersion !== undefined) {
578+
this.refreshFlagsToPendingVersion();
579+
}
580+
});
581+
}
582+
485583
private async loadFlagsFallbackDefinitions() {
486584
if (!this.canLoadFlagsFallbackProvider) {
487585
return undefined;
@@ -831,6 +929,7 @@ export class ReflagClient {
831929

832930
await this.batchBuffer.flush();
833931
await this.flagsCache.waitRefresh();
932+
await this.pushRefreshPromise;
834933
}
835934

836935
/**
@@ -857,6 +956,9 @@ export class ReflagClient {
857956
* multiple background processes from running simultaneously.
858957
*/
859958
public destroy() {
959+
this.flagsUpdatesSSESubscription?.close();
960+
this.flagsUpdatesSSESubscription = undefined;
961+
860962
this.flagsCache.destroy();
861963
this.batchBuffer.destroy();
862964
}

packages/node-sdk/src/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { LOG_LEVELS } from "./types";
66
import { isObject, ok } from "./utils";
77

88
export const API_BASE_URL = "https://front.reflag.com";
9+
export const PUBSUB_SSE_URL = "https://pubsub.reflag.com/sse";
910
export const SDK_VERSION_HEADER_NAME = "reflag-sdk-version";
1011
export const SDK_VERSION = `node-sdk/${version}`;
1112
export const API_TIMEOUT_MS = 10000;

packages/node-sdk/src/edgeClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { ClientOptions } from "./types";
33

44
export type EdgeClientOptions = Omit<
55
ClientOptions,
6-
"cacheStrategy" | "flushIntervalMs" | "batchOptions"
6+
"flagsSyncMode" | "cacheStrategy" | "flushIntervalMs" | "batchOptions"
77
>;
88

99
/**
@@ -28,7 +28,7 @@ export class EdgeClient extends ReflagClient {
2828
constructor(options: EdgeClientOptions = {}) {
2929
const opts = {
3030
...options,
31-
cacheStrategy: "in-request" as const,
31+
flagsSyncMode: "in-request" as const,
3232
batchOptions: {
3333
intervalMs: 0,
3434
},

0 commit comments

Comments
 (0)