From a6daf493f142ade4f8696b23c2bfe77185499fc2 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 6 May 2026 11:17:15 +0400 Subject: [PATCH 1/4] feat: add first-class bearer token credential support --- packages/benchmark/package.json | 2 +- packages/db-client/package.json | 2 +- packages/db-client/src/Client/http.ts | 31 +++-- packages/db-client/src/Client/index.ts | 125 ++++++++++++++---- .../src/Client/parseConnectionString.ts | 4 +- packages/db-client/src/index.ts | 1 + packages/db-client/src/streams/readAll.ts | 41 +++--- packages/db-client/src/streams/readStream.ts | 81 +++++------- packages/db-client/src/types/index.ts | 32 ++++- packages/db-client/src/utils/credentials.ts | 41 ++++++ packages/opentelemetry/src/attributes.ts | 1 + packages/opentelemetry/src/instrumentation.ts | 43 ++++-- packages/opentelemetry/src/utils.ts | 31 ++++- .../src/connection/defaultCredentials.test.ts | 92 ++++++++++++- .../reconnect/no-reconnection.test.ts | 5 +- .../src/opentelemetry/instrumentation.test.ts | 11 +- packages/test/src/utils/index.ts | 4 +- yarn.lock | 70 +++++----- 18 files changed, 457 insertions(+), 160 deletions(-) create mode 100644 packages/db-client/src/utils/credentials.ts diff --git a/packages/benchmark/package.json b/packages/benchmark/package.json index eaafb869..bd304fd3 100644 --- a/packages/benchmark/package.json +++ b/packages/benchmark/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@eventstore/db-client": "^6.2.1", - "@kurrent/bridge": "^0.1.5", + "@kurrent/bridge": "^0.2.0", "tinybench": "^3.1.1" }, "devDependencies": { diff --git a/packages/db-client/package.json b/packages/db-client/package.json index 0a4c9f17..2b793869 100644 --- a/packages/db-client/package.json +++ b/packages/db-client/package.json @@ -46,7 +46,7 @@ }, "dependencies": { "@grpc/grpc-js": "^1.14.3", - "@kurrent/bridge": "^0.1.5", + "@kurrent/bridge": "^0.2.0", "@types/debug": "^4.1.12", "@types/google-protobuf": "^3.15.12", "@types/node": "^22.10.2", diff --git a/packages/db-client/src/Client/http.ts b/packages/db-client/src/Client/http.ts index 78c13a77..059a1b84 100644 --- a/packages/db-client/src/Client/http.ts +++ b/packages/db-client/src/Client/http.ts @@ -32,20 +32,29 @@ type HTTPMethod = | "TRACE" | "PATCH"; +/** + * Resolve the verbatim `Authorization` header for an HTTP fallback request, + * given an optional per-call credential. Returning `undefined` skips the + * header entirely (e.g. insecure mode). + */ +export type ResolveAuthorizationHeader = ( + perCallCredentials?: Credentials +) => Promise; + export class HTTP { #client!: Client; #channelCredentials!: ChannelCredentialOptions; - #defaultUserCredentials?: Credentials; + #resolveAuthorizationHeader: ResolveAuthorizationHeader; #insecure: boolean; constructor( client: Client, channelCredentials: ChannelCredentialOptions, - defaultUserCredentials?: Credentials + resolveAuthorizationHeader: ResolveAuthorizationHeader ) { this.#client = client; this.#channelCredentials = channelCredentials; - this.#defaultUserCredentials = defaultUserCredentials; + this.#resolveAuthorizationHeader = resolveAuthorizationHeader; this.#insecure = !!channelCredentials.insecure; } @@ -64,18 +73,19 @@ export class HTTP { url: URL, options: HTTPRequestOptions, body?: string - ) => - new Promise((resolve, reject) => { + ): Promise => { + const authorization = await this.#resolveAuthorizationHeader( + options.credentials + ); + + return new Promise((resolve, reject) => { const headers: Record = { "content-type": "application/json", ...(options.headers ?? {}), }; - const credentials = options.credentials ?? this.#defaultUserCredentials; - if (!this.#insecure && credentials) { - headers["Authorization"] = `Basic ${Buffer.from( - `${credentials.username}:${credentials.password}` - ).toString("base64")}`; + if (authorization) { + headers["Authorization"] = authorization; } const ca = this.#channelCredentials.rootCertificate @@ -152,6 +162,7 @@ export class HTTP { req.end(); }); + }; private createURL = async ( pathname: string, diff --git a/packages/db-client/src/Client/index.ts b/packages/db-client/src/Client/index.ts index 6170e36a..aaa6bc9c 100644 --- a/packages/db-client/src/Client/index.ts +++ b/packages/db-client/src/Client/index.ts @@ -27,9 +27,12 @@ import type { NodePreference, GRPCClientConstructor, EndPoint, + BasicCredentials, Credentials, BaseOptions, + CredentialsProvider, } from "../types"; +import { toAuthorizationHeader } from "../utils/credentials"; import { CancelledError, convertToCommandError, @@ -147,7 +150,8 @@ export class Client { #keepAliveTimeout: number; #defaultDeadline: number; - #defaultCredentials?: Credentials; + #defaultCredentials?: BasicCredentials; + #credentialsProvider?: CredentialsProvider; #nextChannelSettings?: NextChannelSettings; #channel?: Promise; @@ -299,19 +303,19 @@ export class Client { rustClient: bridge.RustClient, connectionSettings: DNSClusterOptions, channelCredentials?: ChannelCredentialOptions, - defaultUserCredentials?: Credentials + defaultUserCredentials?: BasicCredentials ); protected constructor( rustClient: bridge.RustClient, connectionSettings: GossipClusterOptions, channelCredentials?: ChannelCredentialOptions, - defaultUserCredentials?: Credentials + defaultUserCredentials?: BasicCredentials ); protected constructor( rustClient: bridge.RustClient, connectionSettings: SingleNodeOptions, channelCredentials?: ChannelCredentialOptions, - defaultUserCredentials?: Credentials + defaultUserCredentials?: BasicCredentials ); protected constructor( rustClient: bridge.RustClient, @@ -324,7 +328,7 @@ export class Client { ...connectionSettings }: ConnectionSettings, channelCredentials: ChannelCredentialOptions = { insecure: false }, - defaultUserCredentials?: Credentials + defaultUserCredentials?: BasicCredentials ) { if (keepAliveInterval < -1) { throw new Error( @@ -359,7 +363,9 @@ export class Client { this.#insecure = !!channelCredentials.insecure; this.#defaultCredentials = defaultUserCredentials; this.#connectionName = connectionName; - this.#http = new HTTP(this, channelCredentials, defaultUserCredentials); + this.#http = new HTTP(this, channelCredentials, (perCall) => + this.resolveAuthorizationHeader(perCall) + ); if (this.#insecure) { debug.connection("Using insecure channel"); @@ -387,6 +393,25 @@ export class Client { return this.#connectionName; } + /** + * The {@link CredentialsProvider} currently in effect, if any. Read-only. + * Use {@link setCredentialsProvider} to change it. + */ + public get credentialsProvider(): CredentialsProvider | undefined { + return this.#credentialsProvider; + } + + /** + * Set or clear the {@link CredentialsProvider} invoked before every request. + * Per-call `credentials` override the provider. Otherwise the static + * `defaultCredentials` are used. + */ + public setCredentialsProvider( + provider: CredentialsProvider | undefined + ): void { + this.#credentialsProvider = provider; + } + // Internal access to grpc client. private getGRPCClient = async ( Client: GRPCClientConstructor, @@ -615,34 +640,70 @@ export class Client { } }; - private createCredentialsMetadataGenerator = - ({ - username, - password, - }: Credentials): Parameters< - typeof grpcCredentials.createFromMetadataGenerator - >[0] => + private createMetadataGenerator = + ( + resolveCredentials: () => Credentials | Promise + ): Parameters[0] => (_, cb) => { - const metadata = new Metadata(); - if (this.#insecure) { debug.connection( "Credentials are unsupported in insecure mode, and will be ignored." ); - } else { - const auth = Buffer.from(`${username}:${password}`).toString("base64"); - metadata.add("authorization", `Basic ${auth}`); + return cb(null, new Metadata()); } - return cb(null, metadata); + Promise.resolve() + .then(resolveCredentials) + .then((credentials) => { + const metadata = new Metadata(); + metadata.add("authorization", toAuthorizationHeader(credentials)); + cb(null, metadata); + }) + .catch((err) => cb(err as Error)); }; + /** + * Resolve the `Authorization` header value for a given request. Captures + * the provider/default state at call time so in-flight requests are + * unaffected by concurrent {@link setCredentialsProvider} swaps. + */ + private resolveAuthorizationHeader = async ( + perCallCredentials?: Credentials + ): Promise => { + if (this.#insecure) return undefined; + + if (perCallCredentials) { + return toAuthorizationHeader(perCallCredentials); + } + + if (this.#credentialsProvider) { + const credentials = await this.#credentialsProvider(); + return toAuthorizationHeader(credentials); + } + + if (this.#defaultCredentials) { + return toAuthorizationHeader(this.#defaultCredentials); + } + + return undefined; + }; + + /** + * Resolve the {@link Credentials} value to forward to the Rust bridge for + * a given request. Per-call credentials win. Otherwise we invoke the + * configured {@link CredentialsProvider}. Falls back to `undefined` so the + * bridge uses the credentials baked into its connection string. + */ + protected resolveBridgeCredentials = async ( + perCallCredentials?: Credentials + ): Promise => { + if (perCallCredentials) return perCallCredentials; + if (this.#credentialsProvider) return await this.#credentialsProvider(); + return undefined; + }; + protected callArguments = ( - { - credentials = this.#defaultCredentials, - requiresLeader, - deadline, - }: BaseOptions, + { credentials, requiresLeader, deadline }: BaseOptions, callOptions?: CallOptions ): [Metadata, CallOptions] => { const metadata = new Metadata(); @@ -654,9 +715,23 @@ export class Client { metadata.add("requires-leader", "true"); } + // Per-call credentials take precedence. Otherwise prefer a refresh-aware + // provider over the static default so token refresh actually runs per RPC. + let resolveCredentials: + | (() => Credentials | Promise) + | undefined; if (credentials) { + resolveCredentials = () => credentials; + } else if (this.#credentialsProvider) { + resolveCredentials = this.#credentialsProvider; + } else if (this.#defaultCredentials) { + const defaults = this.#defaultCredentials; + resolveCredentials = () => defaults; + } + + if (resolveCredentials) { options.credentials = grpcCallCredentials.createFromMetadataGenerator( - this.createCredentialsMetadataGenerator(credentials) + this.createMetadataGenerator(resolveCredentials) ); } diff --git a/packages/db-client/src/Client/parseConnectionString.ts b/packages/db-client/src/Client/parseConnectionString.ts index cacacaf9..c0f53ed5 100644 --- a/packages/db-client/src/Client/parseConnectionString.ts +++ b/packages/db-client/src/Client/parseConnectionString.ts @@ -1,5 +1,5 @@ import { RANDOM, FOLLOWER, LEADER, READ_ONLY_REPLICA } from "../constants"; -import type { Credentials, EndPoint, NodePreference } from "../types"; +import type { BasicCredentials, EndPoint, NodePreference } from "../types"; import { debug } from "../utils"; export interface QueryOptions { @@ -21,7 +21,7 @@ export interface QueryOptions { export interface ConnectionOptions extends QueryOptions { dnsDiscover: boolean; - defaultCredentials?: Credentials; + defaultCredentials?: BasicCredentials; hosts: EndPoint[]; } diff --git a/packages/db-client/src/index.ts b/packages/db-client/src/index.ts index 6821dcd6..b95704fb 100644 --- a/packages/db-client/src/index.ts +++ b/packages/db-client/src/index.ts @@ -9,6 +9,7 @@ export { SingleNodeOptions, ChannelCredentialOptions, } from "./Client"; +export { isBasicCredentials, isBearerCredentials } from "./utils/credentials"; export * from "./events"; export * from "./constants"; export * from "./types"; diff --git a/packages/db-client/src/streams/readAll.ts b/packages/db-client/src/streams/readAll.ts index 62468673..91dc7506 100644 --- a/packages/db-client/src/streams/readAll.ts +++ b/packages/db-client/src/streams/readAll.ts @@ -8,7 +8,6 @@ import type { import { FORWARDS, START } from "../constants"; import { Client } from "../Client"; -import * as bridge from "@kurrent/bridge"; import { convertRustEvent } from "../utils/convertRustEvent"; import { convertBridgeError } from "../utils/convertBridgeError"; @@ -64,26 +63,28 @@ Client.prototype.readAll = function ( ...baseOptions }: ReadAllOptions = {} ): AsyncIterableIterator { - const options: bridge.RustReadAllOptions = { - maxCount: BigInt(maxCount), - fromPosition, - resolvesLink: resolveLinkTos, - direction, - requiresLeader: baseOptions.requiresLeader ?? false, - credentials: baseOptions.credentials, - filter: baseOptions.filter, - }; + const convert = async function* ( + this: Client + ): AsyncIterableIterator { + const credentials = await this.resolveBridgeCredentials( + baseOptions.credentials + ); - let stream; - try { - stream = this.rustClient.readAll(options); - } catch (error) { - throw convertBridgeError(error); - } + let stream; + try { + stream = this.rustClient.readAll({ + credentials, + direction, + fromPosition, + filter: baseOptions.filter, + maxCount: BigInt(maxCount), + requiresLeader: baseOptions.requiresLeader ?? false, + resolvesLink: resolveLinkTos, + }); + } catch (error) { + throw convertBridgeError(error); + } - const convert = async function* ( - stream: AsyncIterable - ) { try { for await (const events of stream) { for (const event of events) { @@ -95,5 +96,5 @@ Client.prototype.readAll = function ( } }; - return convert(stream); + return convert.call(this); }; diff --git a/packages/db-client/src/streams/readStream.ts b/packages/db-client/src/streams/readStream.ts index eb5cb854..beb1b674 100644 --- a/packages/db-client/src/streams/readStream.ts +++ b/packages/db-client/src/streams/readStream.ts @@ -1,5 +1,3 @@ -import * as bridge from "@kurrent/bridge"; - import { Client } from "../Client"; import { FORWARDS, START, END } from "../constants"; import type { @@ -65,55 +63,48 @@ Client.prototype.readStream = function < ...baseOptions }: ReadStreamOptions = {} ): AsyncIterableIterator> { - const options: bridge.RustReadStreamOptions = { - maxCount: BigInt(maxCount), - fromRevision, - resolvesLink: resolveLinkTos, - direction, - requiresLeader: baseOptions.requiresLeader ?? false, - credentials: baseOptions.credentials, - }; - switch (fromRevision) { - case START: { - break; - } + if ( + fromRevision !== START && + fromRevision !== END && + typeof fromRevision === "bigint" + ) { + const lowerBound = BigInt("0"); + const upperBound = BigInt("0xffffffffffffffff"); - case END: { - break; + if (fromRevision < lowerBound) { + throw new InvalidArgumentError( + `fromRevision value must be a non-negative integer. Value Received: ${fromRevision}` + ); } - default: { - const lowerBound = BigInt("0"); - const upperBound = BigInt("0xffffffffffffffff"); - - if (fromRevision < lowerBound) { - throw new InvalidArgumentError( - `fromRevision value must be a non-negative integer. Value Received: ${fromRevision}` - ); - } - - if (fromRevision > upperBound) { - throw new InvalidArgumentError( - `fromRevision value must be a non-negative integer, range from 0 to 18446744073709551615. Value Received: ${fromRevision}` - ); - } - - options.fromRevision = fromRevision; - - break; + if (fromRevision > upperBound) { + throw new InvalidArgumentError( + `fromRevision value must be a non-negative integer, range from 0 to 18446744073709551615. Value Received: ${fromRevision}` + ); } } - let stream; - try { - stream = this.rustClient.readStream(streamName, options); - } catch (error) { - throw convertBridgeError(error, streamName); - } - const convert = async function* ( - stream: AsyncIterable - ) { + this: Client + ): AsyncIterableIterator> { + const credentials = await this.resolveBridgeCredentials( + baseOptions.credentials + ); + + let stream; + try { + stream = this.rustClient.readStream(streamName, { + credentials, + direction, + fromRevision, + maxCount: BigInt(maxCount), + requiresLeader: baseOptions.requiresLeader ?? false, + resolvesLink: resolveLinkTos, + }); + } catch (error) { + throw convertBridgeError(error, streamName); + } + try { for await (const events of stream) { for (const event of events) { @@ -125,5 +116,5 @@ Client.prototype.readStream = function < } }; - return convert(stream); + return convert.call(this); }; diff --git a/packages/db-client/src/types/index.ts b/packages/db-client/src/types/index.ts index a9696d6c..35d43a34 100644 --- a/packages/db-client/src/types/index.ts +++ b/packages/db-client/src/types/index.ts @@ -21,7 +21,8 @@ import type * as constants from "../constants"; export interface BaseOptions { /** - * Overwrite the default credentials. + * Overwrite the default credentials. Accepts either basic + * username/password credentials or a bearer token. */ credentials?: Credentials; /** @@ -328,11 +329,38 @@ export interface PrefixesFilter extends FilterBase { export type Filter = RegexFilter | PrefixesFilter; -export interface Credentials { +/** + * Username/password credentials used to authenticate against KurrentDB, + * rendered as an HTTP Basic `Authorization` header. + */ +export interface BasicCredentials { username: string; password: string; } +/** + * Bearer-token credentials used to authenticate against KurrentDB, rendered + * as an HTTP Bearer `Authorization` header. Bearer tokens are + * programmatic-only and cannot be supplied via a connection string. + */ +export interface BearerCredentials { + bearerToken: string; +} + +/** + * Credential shape accepted by the client. Either basic username/password + * or a bearer token. + */ +export type Credentials = BasicCredentials | BearerCredentials; + +/** + * Callback invoked before every RPC to obtain fresh credentials. Use this for + * refresh-aware token sources such as Azure Entra or OIDC. The provider runs + * once per outbound request, so consumers can return a cached token until + * expiry and rotate transparently. + */ +export type CredentialsProvider = () => Credentials | Promise; + export interface Certificate { userCertFile: Buffer; userKeyFile: Buffer; diff --git a/packages/db-client/src/utils/credentials.ts b/packages/db-client/src/utils/credentials.ts new file mode 100644 index 00000000..550a58c1 --- /dev/null +++ b/packages/db-client/src/utils/credentials.ts @@ -0,0 +1,41 @@ +import type { + BasicCredentials, + BearerCredentials, + Credentials, +} from "../types"; + +/** + * Type predicate narrowing {@link Credentials} to {@link BasicCredentials} + * (username/password). Prefer this over inline `"username" in credentials` + * checks so the discrimination lives in one place. + */ +export const isBasicCredentials = ( + credentials?: Credentials +): credentials is BasicCredentials => + credentials !== undefined && "username" in credentials; + +/** + * Type predicate narrowing {@link Credentials} to {@link BearerCredentials}. + */ +export const isBearerCredentials = ( + credentials?: Credentials +): credentials is BearerCredentials => + credentials !== undefined && "bearerToken" in credentials; + +/** + * Render a {@link Credentials} value as the verbatim value of an + * `Authorization` HTTP header (`"Basic ..."` or `"Bearer ..."`). + * + * Shared between the gRPC metadata generator and the HTTP fallback so the two + * paths stay in lockstep. + */ +export const toAuthorizationHeader = (credentials: Credentials): string => { + if (isBearerCredentials(credentials)) { + return `Bearer ${credentials.bearerToken}`; + } + + const auth = Buffer.from( + `${credentials.username}:${credentials.password}` + ).toString("base64"); + return `Basic ${auth}`; +}; diff --git a/packages/opentelemetry/src/attributes.ts b/packages/opentelemetry/src/attributes.ts index a335a80a..4c5a2e59 100644 --- a/packages/opentelemetry/src/attributes.ts +++ b/packages/opentelemetry/src/attributes.ts @@ -19,4 +19,5 @@ export const KurrentAttributes = { KURRENT_DB_SUBSCRIPTION_ID: `${kurrentdb}.subscription.id`, KURRENT_DB_EVENT_ID: `${kurrentdb}.event.id`, KURRENT_DB_EVENT_TYPE: `${kurrentdb}.event.type`, + KURRENT_DB_AUTH_KIND: `${kurrentdb}.auth.kind`, }; diff --git a/packages/opentelemetry/src/instrumentation.ts b/packages/opentelemetry/src/instrumentation.ts index 12b79531..55f7a0d6 100644 --- a/packages/opentelemetry/src/instrumentation.ts +++ b/packages/opentelemetry/src/instrumentation.ts @@ -46,7 +46,12 @@ import type { PersistentSubscribeParameters, SubscribeParameters, } from "./types"; -import { hasConvertGrpcEventMethod, isJSONEventData } from "./utils"; +import { + describeAuth, + hasConvertGrpcEventMethod, + isJSONEventData, + type AuthContext, +} from "./utils"; const TRACE_ID = "$traceId" as any; const SPAN_ID = "$spanId" as any; @@ -170,9 +175,15 @@ export class Instrumentation extends InstrumentationBase { [KurrentAttributes.DATABASE_OPERATION]: operation, }; - if (options?.credentials) { - attributes[KurrentAttributes.DATABASE_USER] = - options.credentials.username; + const auth = describeAuth( + options?.credentials, + Boolean(this.credentialsProvider) + ); + if (auth.username !== undefined) { + attributes[KurrentAttributes.DATABASE_USER] = auth.username; + } + if (auth.kind !== undefined) { + attributes[KurrentAttributes.KURRENT_DB_AUTH_KIND] = auth.kind; } const span = tracer.startSpan(KurrentAttributes.STREAM_APPEND, { @@ -344,7 +355,7 @@ export class Instrumentation extends InstrumentationBase { | PersistentSubscriptionImpl, uri: string, operation: string, - options: SubscribeToStreamOptions | SubscribeToAllOptions | undefined, + authContext: AuthContext, tracer: Tracer ) { if (!hasConvertGrpcEventMethod(subscription)) return; @@ -378,9 +389,15 @@ export class Instrumentation extends InstrumentationBase { [KurrentAttributes.SERVER_PORT]: port, [KurrentAttributes.DATABASE_SYSTEM]: INSTRUMENTATION_NAME, [KurrentAttributes.DATABASE_OPERATION]: operation, - [KurrentAttributes.DATABASE_USER]: options?.credentials?.username, }; + if (authContext.username !== undefined) { + attributes[KurrentAttributes.DATABASE_USER] = authContext.username; + } + if (authContext.kind !== undefined) { + attributes[KurrentAttributes.KURRENT_DB_AUTH_KIND] = authContext.kind; + } + const span = tracer.startSpan( spanName, { @@ -431,13 +448,18 @@ export class Instrumentation extends InstrumentationBase { args ); + const authContext = describeAuth( + options?.credentials, + Boolean(this.credentialsProvider) + ); + this.resolveUri().then((uri) => Instrumentation.applySubscriptionInstrumentation( KurrentAttributes.STREAM_SUBSCRIBE, subscription, uri, operation, - options, + authContext, tracer ) ); @@ -478,13 +500,18 @@ export class Instrumentation extends InstrumentationBase { args ); + const authContext = describeAuth( + options?.credentials, + Boolean(this.credentialsProvider) + ); + this.resolveUri().then((uri) => Instrumentation.applySubscriptionInstrumentation( KurrentAttributes.STREAM_SUBSCRIBE, subscription, uri, operation, - options, + authContext, tracer ) ); diff --git a/packages/opentelemetry/src/utils.ts b/packages/opentelemetry/src/utils.ts index 11013428..db24a6cb 100644 --- a/packages/opentelemetry/src/utils.ts +++ b/packages/opentelemetry/src/utils.ts @@ -1,6 +1,11 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import type { EventData, JSONEventData } from "@kurrent/kurrentdb-client"; +import { + isBasicCredentials, + type Credentials, + type EventData, + type JSONEventData, +} from "@kurrent/kurrentdb-client"; export function hasConvertGrpcEventMethod( obj: any @@ -12,3 +17,27 @@ export function hasConvertGrpcEventMethod( export function isJSONEventData(event: EventData): event is JSONEventData { return event.contentType === "application/json"; } + +export type AuthKind = "basic" | "bearer" | "provider"; + +export type AuthContext = { + username?: string; + kind?: AuthKind; +}; + +/** + * Derive the auth-related span attributes from the per-call credentials and + * whether a {@link CredentialsProvider} is configured. Narrows the credential + * shape once so `username` is only ever set alongside `kind: "basic"`. + */ +export const describeAuth = ( + credentials: Credentials | undefined, + hasProvider: boolean +): AuthContext => { + if (isBasicCredentials(credentials)) { + return { kind: "basic", username: credentials.username }; + } + if (credentials) return { kind: "bearer" }; + if (hasProvider) return { kind: "provider" }; + return {}; +}; diff --git a/packages/test/src/connection/defaultCredentials.test.ts b/packages/test/src/connection/defaultCredentials.test.ts index 4c9940a2..566ca948 100644 --- a/packages/test/src/connection/defaultCredentials.test.ts +++ b/packages/test/src/connection/defaultCredentials.test.ts @@ -1,5 +1,19 @@ -import { collect, createTestNode } from "@test-utils"; -import { KurrentDBClient, AccessDeniedError } from "@kurrent/kurrentdb-client"; +import { collect, createTestNode, jsonTestEvents } from "@test-utils"; +import { + KurrentDBClient, + AccessDeniedError, + type BasicCredentials, +} from "@kurrent/kurrentdb-client"; + +const adminBasic: BasicCredentials = { + username: "admin", + password: "changeit", +}; + +const wrongBasic: BasicCredentials = { + username: "AzureDiamond", + password: "hunter2", +}; describe("defaultCredentials", () => { const node = createTestNode(); @@ -22,7 +36,7 @@ describe("defaultCredentials", () => { await collect( client.readAll({ maxCount: 10, - credentials: { username: "AzureDiamond", password: "hunter2" }, + credentials: wrongBasic, }) ); } catch (e) { @@ -49,10 +63,80 @@ describe("defaultCredentials", () => { collect( client.readAll({ maxCount: 10, - credentials: { username: "admin", password: "changeit" }, + credentials: adminBasic, }) ) ).resolves.toBeDefined(); }); }); + + describe("bearer-token credentials", () => { + test("unknown token rejected with AccessDenied", async () => { + const client = KurrentDBClient.connectionString(node.connectionString()); + + await expect( + client.appendToStream("bearer-rejected-stream", jsonTestEvents(1), { + credentials: { bearerToken: "not-a-real-token" }, + }) + ).rejects.toBeInstanceOf(AccessDeniedError); + }); + }); + + describe("credentialsProvider", () => { + test("returns fresh credentials per RPC", async () => { + const client = KurrentDBClient.connectionString(node.connectionString()); + + const provider = jest + .fn() + .mockReturnValueOnce(adminBasic) + .mockReturnValueOnce(wrongBasic) + .mockReturnValueOnce(adminBasic); + client.setCredentialsProvider(provider); + + const stream = "auth-provider-stream"; + await expect( + client.appendToStream(stream, jsonTestEvents(1)) + ).resolves.toBeDefined(); + await expect( + client.appendToStream(stream, jsonTestEvents(1)) + ).rejects.toBeInstanceOf(AccessDeniedError); + await expect( + client.appendToStream(stream, jsonTestEvents(1)) + ).resolves.toBeDefined(); + + expect(provider).toHaveBeenCalledTimes(3); + }); + + test("per-call credentials override the provider", async () => { + const client = KurrentDBClient.connectionString(node.connectionString()); + + let calls = 0; + client.setCredentialsProvider(() => { + calls += 1; + return { bearerToken: "should-never-be-used" }; + }); + + await expect( + client.appendToStream("auth-override-stream", jsonTestEvents(1), { + credentials: adminBasic, + }) + ).resolves.toBeDefined(); + expect(calls).toBe(0); + }); + + test("bearer token reaches bridge-backed readAll", async () => { + const client = KurrentDBClient.connectionString(node.connectionString()); + + let calls = 0; + client.setCredentialsProvider(() => { + calls += 1; + return { bearerToken: "not-a-real-token" }; + }); + + await expect( + collect(client.readAll({ maxCount: 1 })) + ).rejects.toBeInstanceOf(AccessDeniedError); + expect(calls).toBe(1); + }); + }); }); diff --git a/packages/test/src/connection/reconnect/no-reconnection.test.ts b/packages/test/src/connection/reconnect/no-reconnection.test.ts index 04bb9c25..8e994553 100644 --- a/packages/test/src/connection/reconnect/no-reconnection.test.ts +++ b/packages/test/src/connection/reconnect/no-reconnection.test.ts @@ -61,7 +61,10 @@ describe("reconnect", () => { await timeoutNode.up(); - const credentials = { username: "admin", password: "changeit" }; + const credentials = { + username: "admin", + password: "changeit", + }; const STREAM_NAME = "try_get_timeout"; const client = KurrentDBClient.connectionString( diff --git a/packages/test/src/opentelemetry/instrumentation.test.ts b/packages/test/src/opentelemetry/instrumentation.test.ts index 5e4fe3cc..1a87d541 100644 --- a/packages/test/src/opentelemetry/instrumentation.test.ts +++ b/packages/test/src/opentelemetry/instrumentation.test.ts @@ -24,6 +24,7 @@ instrumentation.disable(); import * as kdb from "@kurrent/kurrentdb-client"; import { AppendToStreamOptions, + isBasicCredentials, ResolvedEvent, streamNameFilter, WrongExpectedVersionError, @@ -59,7 +60,10 @@ describe("instrumentation", () => { { withCredentials: false, credentials: undefined }, { withCredentials: true, - credentials: { username: "admin", password: "changeit" }, + credentials: { + username: "admin", + password: "changeit", + }, }, ])( "should create a span for append operation, withCredentials: $withCredentials", @@ -111,9 +115,10 @@ describe("instrumentation", () => { [KurrentAttributes.DATABASE_OPERATION]: "appendToStream", }; - if (withCredentials) { + if (withCredentials && isBasicCredentials(credentials)) { expectedAttributes[KurrentAttributes.DATABASE_USER] = - credentials!.username; + credentials.username; + expectedAttributes[KurrentAttributes.KURRENT_DB_AUTH_KIND] = "basic"; } expect(spans.length).toBe(1); diff --git a/packages/test/src/utils/index.ts b/packages/test/src/utils/index.ts index 68af8911..aba7ab3e 100644 --- a/packages/test/src/utils/index.ts +++ b/packages/test/src/utils/index.ts @@ -1,12 +1,12 @@ import { Cluster } from "./Cluster"; import { - Credentials, + BasicCredentials, EndPoint, NodePreference, } from "@kurrent/kurrentdb-client"; export type ConnectionFeatures = { - defaultUserCredentials?: Credentials; + defaultUserCredentials?: BasicCredentials; nodePreference?: NodePreference; userCertificates?: "valid" | "invalid"; maxDiscoverAttempts?: number; diff --git a/yarn.lock b/yarn.lock index 862973f8..26a6ec50 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1065,66 +1065,66 @@ __metadata: languageName: node linkType: hard -"@kurrent/bridge-darwin-arm64@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-darwin-arm64@npm:0.1.5" +"@kurrent/bridge-darwin-arm64@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-darwin-arm64@npm:0.2.0" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@kurrent/bridge-darwin-x64@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-darwin-x64@npm:0.1.5" +"@kurrent/bridge-darwin-x64@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-darwin-x64@npm:0.2.0" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge-linux-arm64-gnu@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-linux-arm64-gnu@npm:0.1.5" +"@kurrent/bridge-linux-arm64-gnu@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-linux-arm64-gnu@npm:0.2.0" conditions: os=linux & cpu=arm64 languageName: node linkType: hard -"@kurrent/bridge-linux-arm64-musl@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-linux-arm64-musl@npm:0.1.5" +"@kurrent/bridge-linux-arm64-musl@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-linux-arm64-musl@npm:0.2.0" conditions: os=linux & cpu=arm64 languageName: node linkType: hard -"@kurrent/bridge-linux-x64-gnu@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-linux-x64-gnu@npm:0.1.5" +"@kurrent/bridge-linux-x64-gnu@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-linux-x64-gnu@npm:0.2.0" conditions: os=linux & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge-linux-x64-musl@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-linux-x64-musl@npm:0.1.5" +"@kurrent/bridge-linux-x64-musl@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-linux-x64-musl@npm:0.2.0" conditions: os=linux & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge-win32-x64-msvc@npm:0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge-win32-x64-msvc@npm:0.1.5" +"@kurrent/bridge-win32-x64-msvc@npm:0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge-win32-x64-msvc@npm:0.2.0" conditions: os=win32 & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge@npm:^0.1.5": - version: 0.1.5 - resolution: "@kurrent/bridge@npm:0.1.5" - dependencies: - "@kurrent/bridge-darwin-arm64": "npm:0.1.5" - "@kurrent/bridge-darwin-x64": "npm:0.1.5" - "@kurrent/bridge-linux-arm64-gnu": "npm:0.1.5" - "@kurrent/bridge-linux-arm64-musl": "npm:0.1.5" - "@kurrent/bridge-linux-x64-gnu": "npm:0.1.5" - "@kurrent/bridge-linux-x64-musl": "npm:0.1.5" - "@kurrent/bridge-win32-x64-msvc": "npm:0.1.5" +"@kurrent/bridge@npm:^0.2.0": + version: 0.2.0 + resolution: "@kurrent/bridge@npm:0.2.0" + dependencies: + "@kurrent/bridge-darwin-arm64": "npm:0.2.0" + "@kurrent/bridge-darwin-x64": "npm:0.2.0" + "@kurrent/bridge-linux-arm64-gnu": "npm:0.2.0" + "@kurrent/bridge-linux-arm64-musl": "npm:0.2.0" + "@kurrent/bridge-linux-x64-gnu": "npm:0.2.0" + "@kurrent/bridge-linux-x64-musl": "npm:0.2.0" + "@kurrent/bridge-win32-x64-msvc": "npm:0.2.0" "@neon-rs/load": "npm:^0.1.82" dependenciesMeta: "@kurrent/bridge-darwin-arm64": @@ -1141,7 +1141,7 @@ __metadata: optional: true "@kurrent/bridge-win32-x64-msvc": optional: true - checksum: 10c0/8f1941011c17cd81b29e78b4743097fa8c6452abe2c80acee23c24d4b665ae027172bbb31d805d21a15ab42caa5e8ac83699a040434a53a0df14bcf7cf4a69b9 + checksum: 10c0/fca00c46c8d9cc85006c5f34f167f4545aa0a5b3ec671f2a4907cded39a4175f8b0b9ec0e723078a285fd4cf0a1b1369d9e4c72febc36c64f54272256c58b7a2 languageName: node linkType: hard @@ -1150,7 +1150,7 @@ __metadata: resolution: "@kurrent/kurrentdb-client@workspace:packages/db-client" dependencies: "@grpc/grpc-js": "npm:^1.14.3" - "@kurrent/bridge": "npm:^0.1.5" + "@kurrent/bridge": "npm:^0.2.0" "@types/debug": "npm:^4.1.12" "@types/google-protobuf": "npm:^3.15.12" "@types/node": "npm:^22.10.2" @@ -2740,7 +2740,7 @@ __metadata: resolution: "benchmark@workspace:packages/benchmark" dependencies: "@eventstore/db-client": "npm:^6.2.1" - "@kurrent/bridge": "npm:^0.1.5" + "@kurrent/bridge": "npm:^0.2.0" "@kurrent/kurrentdb-client": "workspace:^" clinic: "npm:^13.0.0" tinybench: "npm:^3.1.1" From 4c9a9f8e48e781f5023e95ce0fc09deab24efe4e Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 6 May 2026 11:36:03 +0400 Subject: [PATCH 2/4] fix: bypass cached batchAppend when credentialsProvider is set --- packages/db-client/src/streams/appendToStream/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/db-client/src/streams/appendToStream/index.ts b/packages/db-client/src/streams/appendToStream/index.ts index 32235cf2..17f57276 100644 --- a/packages/db-client/src/streams/appendToStream/index.ts +++ b/packages/db-client/src/streams/appendToStream/index.ts @@ -82,6 +82,7 @@ Client.prototype.appendToStream = async function < if ( !baseOptions.credentials && + !this.credentialsProvider && (await this.supports(StreamsService.batchAppend)) ) { return batchAppend.call(this, streamName, events, { From 3be1552d7fdfe1f0635a8f7f7a20836c4009fe16 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 6 May 2026 11:36:47 +0400 Subject: [PATCH 3/4] fix: redact Authorization header in HTTP debug log --- packages/db-client/src/Client/http.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/db-client/src/Client/http.ts b/packages/db-client/src/Client/http.ts index 059a1b84..01a32522 100644 --- a/packages/db-client/src/Client/http.ts +++ b/packages/db-client/src/Client/http.ts @@ -130,7 +130,9 @@ export class HTTP { `Making %s call to %s with headers %h`, method, url.toString(), - headers + authorization + ? { ...headers, Authorization: "[REDACTED]" } + : headers ); const req = this.#insecure From 2cca3d9c0ba385783685df1d6c22ed0fa21b56a7 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 6 May 2026 11:42:26 +0400 Subject: [PATCH 4/4] style: prettier --- packages/db-client/src/Client/http.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/db-client/src/Client/http.ts b/packages/db-client/src/Client/http.ts index 01a32522..ae87774e 100644 --- a/packages/db-client/src/Client/http.ts +++ b/packages/db-client/src/Client/http.ts @@ -130,9 +130,7 @@ export class HTTP { `Making %s call to %s with headers %h`, method, url.toString(), - authorization - ? { ...headers, Authorization: "[REDACTED]" } - : headers + authorization ? { ...headers, Authorization: "[REDACTED]" } : headers ); const req = this.#insecure