Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 117 additions & 22 deletions src/requester/requester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,44 @@ export class Requester {
return this.makeRequestInternal(request);
};

public makeStreamingRequest = async <StreamItemType>(
method: RestMethod,
url: string,
data: Record<string, unknown> | undefined,
parseItem: (obj: Record<string, unknown>) => StreamItemType,
onItem: (item: StreamItemType) => void,
headers?: Record<string, string>,
timeout?: number,
): Promise<void> => {
const request = new KibaRequest(method, url, headers, data, undefined, new Date(), timeout);
await this.makeStreamingRequestInternal(request, parseItem, onItem);
};

private makeRequestInternal = async (request: KibaRequest): Promise<KibaResponse> => {
const modifiedRequest = this.modifyRequest(request);
let response = await this.makeFetchRequest(modifiedRequest);
response = this.modifyResponse(response);
if (response.status >= 400 && response.status < 600) {
let errorContent = null;
try {
errorContent = JSON.parse(response.content);
} catch {
// no-op
}
if (errorContent && 'message' in errorContent) {
const fields = errorContent.fields || {};
const exceptionType = errorContent.exceptionType || undefined;
throw new KibaException(errorContent.message, response.status, exceptionType, fields);
}
throw new KibaException(response.content, response.status, undefined, {});
Requester.throwKibaExceptionFromErrorContent(response.content, response.status);
}
return response;
};

private makeFetchRequest = async (request: KibaRequest): Promise<KibaResponse> => {
private makeStreamingRequestInternal = async <StreamItemType>(
request: KibaRequest,
parseItem: (obj: Record<string, unknown>) => StreamItemType,
onItem: (item: StreamItemType) => void,
): Promise<void> => {
const modifiedRequest = this.modifyRequest(request);
const response = await this.makeStreamingFetchRequest(modifiedRequest, parseItem, onItem);
this.modifyResponse(response);
};

private createFetchRequestConfig = (request: KibaRequest): { url: URL; fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } } => {
const url = new URL(request.url);
const headers = new Headers({ ...this.headers, ...(request.headers || {}) });
// NOTE(krishan711): RequestInit comes from the DOM which isn't used by default in typescript typings
// eslint-disable-next-line no-undef
const fetchConfig: RequestInit = {
const fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } = {
method: request.method.toUpperCase(),
headers,
credentials: this.shouldIncludeCrossSiteCredentials ? 'include' : 'same-origin',
Expand Down Expand Up @@ -138,6 +149,37 @@ export class Requester {
// headers.set('content-type', 'multipart/form-data');
}
}
return { url, fetchConfig };
};

private static getResponseHeaders = (response: Response): Record<string, string> => {
const responseHeaders: Record<string, string> = {};
response.headers.forEach((value: string, key: string): void => {
if (responseHeaders[key]) {
console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`);
}
responseHeaders[key] = value;
});
return responseHeaders;
};

private static throwKibaExceptionFromErrorContent = (errorContent: string, statusCode: number): never => {
let parsedErrorContent: Record<string, unknown> | null = null;
try {
parsedErrorContent = JSON.parse(errorContent) as Record<string, unknown>;
} catch {
// no-op
}
if (parsedErrorContent && 'message' in parsedErrorContent) {
const fields = parsedErrorContent.fields as Record<string, string> || {};
const exceptionType = typeof parsedErrorContent.exceptionType === 'string' ? parsedErrorContent.exceptionType : undefined;
throw new KibaException(String(parsedErrorContent.message), statusCode, exceptionType, fields);
}
throw new KibaException(errorContent, statusCode, undefined, {});
};

private makeFetchRequest = async (request: KibaRequest): Promise<KibaResponse> => {
const { url, fetchConfig } = this.createFetchRequestConfig(request);
const fetchOperation = fetch(url.toString(), fetchConfig)
.catch((error): void => {
throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`);
Expand All @@ -147,14 +189,67 @@ export class Requester {
throw new KibaException('The request was made but no response was received.');
}
const content = await response.text();
const responseHeaders: Record<string, string> = {};
response.headers.forEach((value: string, key: string): void => {
if (responseHeaders[key]) {
console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`);
return new KibaResponse(response.status, Requester.getResponseHeaders(response), new Date(), content);
});
const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation);
return response;
};

private makeStreamingFetchRequest = async <StreamItemType>(
request: KibaRequest,
parseItem: (obj: Record<string, unknown>) => StreamItemType,
onItem: (item: StreamItemType) => void,
): Promise<KibaResponse> => {
const { url, fetchConfig } = this.createFetchRequestConfig(request);
const fetchOperation = fetch(url.toString(), fetchConfig)
.catch((error): void => {
throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`);
})
.then(async (response: Response | void): Promise<KibaResponse> => {
if (!response) {
throw new KibaException('The request was made but no response was received.');
}
Comment on lines +205 to +211
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chaining .catch(...).then(...) forces the then handler to accept Response | void, which complicates typing and requires an additional !response guard. Consider rewriting this section with try/catch around an awaited fetch(...) so response stays a Response, simplifying control flow and types.

Suggested change
.catch((error): void => {
throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`);
})
.then(async (response: Response | void): Promise<KibaResponse> => {
if (!response) {
throw new KibaException('The request was made but no response was received.');
}
.catch((error): never => {
throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`);
})
.then(async (response: Response): Promise<KibaResponse> => {

Copilot uses AI. Check for mistakes.
const responseHeaders = Requester.getResponseHeaders(response);
if (response.status >= 400 && response.status < 600) {
const errorContent = await response.text();
Requester.throwKibaExceptionFromErrorContent(errorContent, response.status);
}
const reader = response.body?.getReader();
if (!reader) {
throw new KibaException('The request was made but no response body was received.');
}
const processLine = (line: string): void => {
const trimmedLine = line.trim();
if (!trimmedLine) {
return;
}
responseHeaders[key] = value;
});
return new KibaResponse(response.status, responseHeaders, new Date(), content);
let itemObject: Record<string, unknown>;
try {
itemObject = JSON.parse(trimmedLine) as Record<string, unknown>;
} catch (error) {
throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`);
}
onItem(parseItem(itemObject));
};

const decoder = new TextDecoder();
let buffer = '';
const readAllChunks = async (): Promise<void> => {
const { done, value } = await reader.read();
if (done) {
return;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
lines.forEach(processLine);
await readAllChunks();
};
await readAllChunks();
buffer += decoder.decode();
processLine(buffer);

return new KibaResponse(response.status, responseHeaders, new Date(), '');
});
const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation);
return response;
Comment on lines +254 to +255
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout mechanism (timeoutPromise) likely rejects without aborting the underlying fetch/stream read, meaning network and parsing work can continue in the background after a timeout. Consider integrating AbortController so timeouts actively cancel the in-flight fetch and release resources for streaming requests.

Copilot uses AI. Check for mistakes.
Expand Down
28 changes: 23 additions & 5 deletions src/services/serviceClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ export class ResponseData {
// }
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Constructor<T = RawObject> = new (...args: any[]) => T;
export type ResponseDataClass<T extends ResponseData> = {
fromObject: (obj: RawObject) => T;
};

export class ServiceClient {
protected requester: Requester;
Expand All @@ -29,10 +30,27 @@ export class ServiceClient {
this.baseUrl = baseUrl;
}

protected makeRequest = async <ResponseType extends ResponseData>(method: RestMethod, path: string, request?: RequestData | undefined, responseClass?: Constructor<ResponseType> | undefined, additionalHeaders?: Record<string, string>): Promise<ResponseType> => {
protected makeRequest = async <ResponseType extends ResponseData>(
method: RestMethod,
path: string,
request?: RequestData | undefined,
responseClass?: ResponseDataClass<ResponseType> | undefined,
additionalHeaders?: Record<string, string>,
): Promise<ResponseType> => {
const url = `${this.baseUrl}/${path}`;
const response = await this.requester.makeRequest(method, url, request?.toObject(), additionalHeaders);
// @ts-ignore
return responseClass ? responseClass.fromObject(JSON.parse(response.content)) : null;
return responseClass ? responseClass.fromObject(JSON.parse(response.content) as RawObject) : null as unknown as ResponseType;
};

protected makeStreamingRequest = async <StreamItemType extends ResponseData>(
method: RestMethod,
path: string,
request: RequestData | undefined,
streamItemClass: ResponseDataClass<StreamItemType>,
onStreamItem: (streamItem: StreamItemType) => void,
additionalHeaders?: Record<string, string>,
): Promise<void> => {
const url = `${this.baseUrl}/${path}`;
await this.requester.makeStreamingRequest(method, url, request?.toObject(), streamItemClass.fromObject, onStreamItem, additionalHeaders);
};
}
Loading