Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/benchmark/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
"dependencies": {
"@eventstore/db-client": "^6.2.1",
"@kurrent/bridge": "^0.1.5",
"@kurrent/bridge": "^0.2.0",
"tinybench": "^3.1.1"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/db-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
},
"dependencies": {
"@grpc/grpc-js": "^1.14.3",
"@kurrent/bridge": "^0.1.5",
"@kurrent/bridge": "^0.2.0",
"@types/debug": "^4.1.12",
"@types/google-protobuf": "^3.15.12",
"@types/node": "^22.10.2",
Expand Down
33 changes: 22 additions & 11 deletions packages/db-client/src/Client/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,29 @@ type HTTPMethod =
| "TRACE"
| "PATCH";

/**
* Resolve the verbatim `Authorization` header for an HTTP fallback request,
* given an optional per-call credential. Returning `undefined` skips the
* header entirely (e.g. insecure mode).
*/
export type ResolveAuthorizationHeader = (
perCallCredentials?: Credentials
) => Promise<string | undefined>;

export class HTTP {
#client!: Client;
#channelCredentials!: ChannelCredentialOptions;
#defaultUserCredentials?: Credentials;
#resolveAuthorizationHeader: ResolveAuthorizationHeader;
#insecure: boolean;

constructor(
client: Client,
channelCredentials: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
resolveAuthorizationHeader: ResolveAuthorizationHeader
) {
this.#client = client;
this.#channelCredentials = channelCredentials;
this.#defaultUserCredentials = defaultUserCredentials;
this.#resolveAuthorizationHeader = resolveAuthorizationHeader;
this.#insecure = !!channelCredentials.insecure;
}

Expand All @@ -64,18 +73,19 @@ export class HTTP {
url: URL,
options: HTTPRequestOptions,
body?: string
) =>
new Promise<T>((resolve, reject) => {
): Promise<T> => {
const authorization = await this.#resolveAuthorizationHeader(
options.credentials
);

return new Promise<T>((resolve, reject) => {
const headers: Record<string, string> = {
"content-type": "application/json",
...(options.headers ?? {}),
};
const credentials = options.credentials ?? this.#defaultUserCredentials;

if (!this.#insecure && credentials) {
headers["Authorization"] = `Basic ${Buffer.from(
`${credentials.username}:${credentials.password}`
).toString("base64")}`;
if (authorization) {
headers["Authorization"] = authorization;
}
Comment thread
w1am marked this conversation as resolved.

const ca = this.#channelCredentials.rootCertificate
Expand Down Expand Up @@ -120,7 +130,7 @@ export class HTTP {
`Making %s call to %s with headers %h`,
method,
url.toString(),
headers
authorization ? { ...headers, Authorization: "[REDACTED]" } : headers
);

const req = this.#insecure
Expand Down Expand Up @@ -152,6 +162,7 @@ export class HTTP {

req.end();
});
};

private createURL = async (
pathname: string,
Expand Down
125 changes: 100 additions & 25 deletions packages/db-client/src/Client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import type {
NodePreference,
GRPCClientConstructor,
EndPoint,
BasicCredentials,
Credentials,
BaseOptions,
CredentialsProvider,
} from "../types";
import { toAuthorizationHeader } from "../utils/credentials";
import {
CancelledError,
convertToCommandError,
Expand Down Expand Up @@ -147,7 +150,8 @@ export class Client {
#keepAliveTimeout: number;
#defaultDeadline: number;

#defaultCredentials?: Credentials;
#defaultCredentials?: BasicCredentials;
#credentialsProvider?: CredentialsProvider;

#nextChannelSettings?: NextChannelSettings;
#channel?: Promise<Channel>;
Expand Down Expand Up @@ -299,19 +303,19 @@ export class Client {
rustClient: bridge.RustClient,
connectionSettings: DNSClusterOptions,
channelCredentials?: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
defaultUserCredentials?: BasicCredentials
);
protected constructor(
rustClient: bridge.RustClient,
connectionSettings: GossipClusterOptions,
channelCredentials?: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
defaultUserCredentials?: BasicCredentials
);
protected constructor(
rustClient: bridge.RustClient,
connectionSettings: SingleNodeOptions,
channelCredentials?: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
defaultUserCredentials?: BasicCredentials
);
protected constructor(
rustClient: bridge.RustClient,
Expand All @@ -324,7 +328,7 @@ export class Client {
...connectionSettings
}: ConnectionSettings,
channelCredentials: ChannelCredentialOptions = { insecure: false },
defaultUserCredentials?: Credentials
defaultUserCredentials?: BasicCredentials
) {
if (keepAliveInterval < -1) {
throw new Error(
Expand Down Expand Up @@ -359,7 +363,9 @@ export class Client {
this.#insecure = !!channelCredentials.insecure;
this.#defaultCredentials = defaultUserCredentials;
this.#connectionName = connectionName;
this.#http = new HTTP(this, channelCredentials, defaultUserCredentials);
this.#http = new HTTP(this, channelCredentials, (perCall) =>
this.resolveAuthorizationHeader(perCall)
);

if (this.#insecure) {
debug.connection("Using insecure channel");
Expand Down Expand Up @@ -387,6 +393,25 @@ export class Client {
return this.#connectionName;
}

/**
* The {@link CredentialsProvider} currently in effect, if any. Read-only.
* Use {@link setCredentialsProvider} to change it.
*/
public get credentialsProvider(): CredentialsProvider | undefined {
return this.#credentialsProvider;
}

/**
* Set or clear the {@link CredentialsProvider} invoked before every request.
* Per-call `credentials` override the provider. Otherwise the static
* `defaultCredentials` are used.
*/
public setCredentialsProvider(
provider: CredentialsProvider | undefined
): void {
this.#credentialsProvider = provider;
}

// Internal access to grpc client.
private getGRPCClient = async <T extends GRPCClient>(
Client: GRPCClientConstructor<T>,
Expand Down Expand Up @@ -615,34 +640,70 @@ export class Client {
}
};

private createCredentialsMetadataGenerator =
({
username,
password,
}: Credentials): Parameters<
typeof grpcCredentials.createFromMetadataGenerator
>[0] =>
private createMetadataGenerator =
(
resolveCredentials: () => Credentials | Promise<Credentials>
): Parameters<typeof grpcCredentials.createFromMetadataGenerator>[0] =>
(_, cb) => {
const metadata = new Metadata();

if (this.#insecure) {
debug.connection(
"Credentials are unsupported in insecure mode, and will be ignored."
);
} else {
const auth = Buffer.from(`${username}:${password}`).toString("base64");
metadata.add("authorization", `Basic ${auth}`);
return cb(null, new Metadata());
}

return cb(null, metadata);
Promise.resolve()
.then(resolveCredentials)
.then((credentials) => {
const metadata = new Metadata();
metadata.add("authorization", toAuthorizationHeader(credentials));
cb(null, metadata);
})
.catch((err) => cb(err as Error));
};

/**
* Resolve the `Authorization` header value for a given request. Captures
* the provider/default state at call time so in-flight requests are
* unaffected by concurrent {@link setCredentialsProvider} swaps.
*/
private resolveAuthorizationHeader = async (
perCallCredentials?: Credentials
): Promise<string | undefined> => {
if (this.#insecure) return undefined;

if (perCallCredentials) {
return toAuthorizationHeader(perCallCredentials);
}

if (this.#credentialsProvider) {
const credentials = await this.#credentialsProvider();
return toAuthorizationHeader(credentials);
}

if (this.#defaultCredentials) {
return toAuthorizationHeader(this.#defaultCredentials);
}

return undefined;
};

/**
* Resolve the {@link Credentials} value to forward to the Rust bridge for
* a given request. Per-call credentials win. Otherwise we invoke the
* configured {@link CredentialsProvider}. Falls back to `undefined` so the
* bridge uses the credentials baked into its connection string.
*/
protected resolveBridgeCredentials = async (
perCallCredentials?: Credentials
): Promise<Credentials | undefined> => {
if (perCallCredentials) return perCallCredentials;
if (this.#credentialsProvider) return await this.#credentialsProvider();
return undefined;
};

protected callArguments = (
{
credentials = this.#defaultCredentials,
requiresLeader,
deadline,
}: BaseOptions,
{ credentials, requiresLeader, deadline }: BaseOptions,
callOptions?: CallOptions
): [Metadata, CallOptions] => {
const metadata = new Metadata();
Expand All @@ -654,9 +715,23 @@ export class Client {
metadata.add("requires-leader", "true");
}

// Per-call credentials take precedence. Otherwise prefer a refresh-aware
// provider over the static default so token refresh actually runs per RPC.
let resolveCredentials:
| (() => Credentials | Promise<Credentials>)
| undefined;
if (credentials) {
resolveCredentials = () => credentials;
} else if (this.#credentialsProvider) {
resolveCredentials = this.#credentialsProvider;
} else if (this.#defaultCredentials) {
const defaults = this.#defaultCredentials;
resolveCredentials = () => defaults;
}

if (resolveCredentials) {
options.credentials = grpcCallCredentials.createFromMetadataGenerator(
this.createCredentialsMetadataGenerator(credentials)
this.createMetadataGenerator(resolveCredentials)
);
}
Comment thread
w1am marked this conversation as resolved.

Expand Down
4 changes: 2 additions & 2 deletions packages/db-client/src/Client/parseConnectionString.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RANDOM, FOLLOWER, LEADER, READ_ONLY_REPLICA } from "../constants";
import type { Credentials, EndPoint, NodePreference } from "../types";
import type { BasicCredentials, EndPoint, NodePreference } from "../types";
import { debug } from "../utils";

export interface QueryOptions {
Expand All @@ -21,7 +21,7 @@ export interface QueryOptions {

export interface ConnectionOptions extends QueryOptions {
dnsDiscover: boolean;
defaultCredentials?: Credentials;
defaultCredentials?: BasicCredentials;
hosts: EndPoint[];
}

Expand Down
1 change: 1 addition & 0 deletions packages/db-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export {
SingleNodeOptions,
ChannelCredentialOptions,
} from "./Client";
export { isBasicCredentials, isBearerCredentials } from "./utils/credentials";
export * from "./events";
export * from "./constants";
export * from "./types";
Expand Down
1 change: 1 addition & 0 deletions packages/db-client/src/streams/appendToStream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Client.prototype.appendToStream = async function <

if (
!baseOptions.credentials &&
!this.credentialsProvider &&
(await this.supports(StreamsService.batchAppend))
) {
return batchAppend.call(this, streamName, events, {
Expand Down
41 changes: 21 additions & 20 deletions packages/db-client/src/streams/readAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import type {
import { FORWARDS, START } from "../constants";
import { Client } from "../Client";

import * as bridge from "@kurrent/bridge";
import { convertRustEvent } from "../utils/convertRustEvent";
import { convertBridgeError } from "../utils/convertBridgeError";

Expand Down Expand Up @@ -64,26 +63,28 @@ Client.prototype.readAll = function (
...baseOptions
}: ReadAllOptions = {}
): AsyncIterableIterator<AllStreamResolvedEvent> {
const options: bridge.RustReadAllOptions = {
maxCount: BigInt(maxCount),
fromPosition,
resolvesLink: resolveLinkTos,
direction,
requiresLeader: baseOptions.requiresLeader ?? false,
credentials: baseOptions.credentials,
filter: baseOptions.filter,
};
const convert = async function* (
this: Client
): AsyncIterableIterator<AllStreamResolvedEvent> {
const credentials = await this.resolveBridgeCredentials(
baseOptions.credentials
);

let stream;
try {
stream = this.rustClient.readAll(options);
} catch (error) {
throw convertBridgeError(error);
}
let stream;
try {
stream = this.rustClient.readAll({
credentials,
direction,
fromPosition,
filter: baseOptions.filter,
maxCount: BigInt(maxCount),
requiresLeader: baseOptions.requiresLeader ?? false,
resolvesLink: resolveLinkTos,
});
} catch (error) {
throw convertBridgeError(error);
}

const convert = async function* (
stream: AsyncIterable<bridge.ResolvedEvent[]>
) {
try {
for await (const events of stream) {
for (const event of events) {
Expand All @@ -95,5 +96,5 @@ Client.prototype.readAll = function (
}
};

return convert(stream);
return convert.call(this);
};
Loading
Loading