From 41c821515034691e73cb12b6efe4cb7bb7e11b50 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 16:29:17 +0000 Subject: [PATCH 1/5] . --- src/requester/requester.ts | 135 ++++++++++++++++++++++++++++++++++ src/services/serviceClient.ts | 29 ++++++-- 2 files changed, 159 insertions(+), 5 deletions(-) diff --git a/src/requester/requester.ts b/src/requester/requester.ts index 8474f31..c337dfd 100644 --- a/src/requester/requester.ts +++ b/src/requester/requester.ts @@ -80,6 +80,19 @@ export class Requester { return this.makeRequestInternal(request); }; + public makeStreamingRequest = async ( + method: RestMethod, + url: string, + data: Record | undefined, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + headers?: Record, + timeout?: number, + ): Promise => { + const request = new KibaRequest(method, url, headers, data, undefined, new Date(), timeout); + await this.makeStreamingRequestInternal(request, parseItem, onItem); + }; + private makeRequestInternal = async (request: KibaRequest): Promise => { const modifiedRequest = this.modifyRequest(request); let response = await this.makeFetchRequest(modifiedRequest); @@ -101,6 +114,16 @@ export class Requester { return response; }; + private makeStreamingRequestInternal = async ( + request: KibaRequest, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + ): Promise => { + const modifiedRequest = this.modifyRequest(request); + const response = await this.makeStreamingFetchRequest(modifiedRequest, parseItem, onItem); + this.modifyResponse(response); + }; + private makeFetchRequest = async (request: KibaRequest): Promise => { const url = new URL(request.url); const headers = new Headers({ ...this.headers, ...(request.headers || {}) }); @@ -159,4 +182,116 @@ export class Requester { const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); return response; }; + + private makeStreamingFetchRequest = async ( + request: KibaRequest, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + ): Promise => { + const url = new URL(request.url); + const headers = new Headers({ ...this.headers, ...(request.headers || {}) }); + // NOTE(krishan711): RequestInit comes from the DOM which isn't used by default in typescript typings + // eslint-disable-next-line no-undef + const fetchConfig: RequestInit = { + method: request.method.toUpperCase(), + headers, + credentials: this.shouldIncludeCrossSiteCredentials ? 'include' : 'same-origin', + }; + if (request.method === RestMethod.GET || request.method === RestMethod.DELETE) { + if (request.data) { + const requestData = ({ ...request.data }) as Record; + Object.keys(requestData).forEach((key: string): void => { + if (requestData[key] === undefined) { + delete requestData[key]; + } + }); + url.search = createSearchParams(requestData).toString(); + } + } else { + const currentContentHeader = headers.get('Content-Type'); + if (request.data) { + fetchConfig.body = JSON.stringify(request.data); + if (currentContentHeader && currentContentHeader !== 'application/json') { + console.warn(`Overwriting content-type header for request from ${currentContentHeader} to application/json`); + } + headers.set('content-type', 'application/json'); + } else if (request.formData) { + fetchConfig.body = request.formData; + } + } + const fetchOperation = fetch(url.toString(), fetchConfig) + .catch((error): void => { + throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); + }) + .then(async (response: Response | void): Promise => { + if (!response) { + throw new KibaException('The request was made but no response was received.'); + } + const responseHeaders: Record = {}; + response.headers.forEach((value: string, key: string): void => { + if (responseHeaders[key]) { + console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); + } + responseHeaders[key] = value; + }); + if (response.status >= 400 && response.status < 600) { + const errorContent = await response.text(); + let parsedErrorContent: Record | null = null; + try { + parsedErrorContent = JSON.parse(errorContent) as Record; + } catch { + // no-op + } + if (parsedErrorContent && 'message' in parsedErrorContent) { + const fields = parsedErrorContent.fields as Record || {}; + const exceptionType = typeof parsedErrorContent.exceptionType === 'string' ? parsedErrorContent.exceptionType : undefined; + throw new KibaException(String(parsedErrorContent.message), response.status, exceptionType, fields); + } + throw new KibaException(errorContent, response.status, undefined, {}); + } + const reader = response.body?.getReader(); + if (!reader) { + throw new KibaException('The request was made but no response body was received.'); + } + const decoder = new TextDecoder(); + let buffer = ''; + // eslint-disable-next-line no-constant-condition + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + for (const line of lines) { + const trimmedLine = line.trim(); + if (!trimmedLine) { + continue; + } + let itemObject: Record; + try { + itemObject = JSON.parse(trimmedLine) as Record; + } catch (error) { + throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); + } + onItem(parseItem(itemObject)); + } + } + buffer += decoder.decode(); + const finalLine = buffer.trim(); + if (finalLine) { + let itemObject: Record; + try { + itemObject = JSON.parse(finalLine) as Record; + } catch (error) { + throw new KibaException(`Failed to parse streaming response line: ${finalLine}. Error: ${String(error)}`); + } + onItem(parseItem(itemObject)); + } + return new KibaResponse(response.status, responseHeaders, new Date(), ''); + }); + const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); + return response; + }; } diff --git a/src/services/serviceClient.ts b/src/services/serviceClient.ts index f0e7bfd..dc0b03c 100644 --- a/src/services/serviceClient.ts +++ b/src/services/serviceClient.ts @@ -17,8 +17,9 @@ export class ResponseData { // } } -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type Constructor = new (...args: any[]) => T; +export type ResponseDataClass = { + fromObject: (obj: RawObject) => T; +}; export class ServiceClient { protected requester: Requester; @@ -29,10 +30,28 @@ export class ServiceClient { this.baseUrl = baseUrl; } - protected makeRequest = async (method: RestMethod, path: string, request?: RequestData | undefined, responseClass?: Constructor | undefined, additionalHeaders?: Record): Promise => { + protected makeRequest = async (method: RestMethod, path: string, request?: RequestData | undefined, responseClass?: ResponseDataClass | undefined, additionalHeaders?: Record): Promise => { const url = `${this.baseUrl}/${path}`; const response = await this.requester.makeRequest(method, url, request?.toObject(), additionalHeaders); - // @ts-ignore - return responseClass ? responseClass.fromObject(JSON.parse(response.content)) : null; + return responseClass ? responseClass.fromObject(JSON.parse(response.content) as RawObject) : null as ResponseType; + }; + + protected makeStreamingRequest = async ( + method: RestMethod, + path: string, + request: RequestData | undefined, + streamItemClass: ResponseDataClass, + onStreamItem: (streamItem: StreamItemType) => void, + additionalHeaders?: Record, + ): Promise => { + const url = `${this.baseUrl}/${path}`; + await this.requester.makeStreamingRequest( + method, + url, + request?.toObject(), + streamItemClass.fromObject, + onStreamItem, + additionalHeaders, + ); }; } From 65f90ffffe290646d98b4381cf24911022a91560 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 17:00:39 +0000 Subject: [PATCH 2/5] . --- src/requester/requester.ts | 191 +++++++++++++++------------------- src/services/serviceClient.ts | 10 +- 2 files changed, 93 insertions(+), 108 deletions(-) diff --git a/src/requester/requester.ts b/src/requester/requester.ts index c337dfd..d21f47b 100644 --- a/src/requester/requester.ts +++ b/src/requester/requester.ts @@ -124,12 +124,11 @@ export class Requester { this.modifyResponse(response); }; - private makeFetchRequest = async (request: KibaRequest): Promise => { + private createFetchRequestConfig = (request: KibaRequest): { url: URL; fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } } => { const url = new URL(request.url); const headers = new Headers({ ...this.headers, ...(request.headers || {}) }); // NOTE(krishan711): RequestInit comes from the DOM which isn't used by default in typescript typings - // eslint-disable-next-line no-undef - const fetchConfig: RequestInit = { + const fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } = { method: request.method.toUpperCase(), headers, credentials: this.shouldIncludeCrossSiteCredentials ? 'include' : 'same-origin', @@ -145,7 +144,6 @@ export class Requester { url.search = createSearchParams(requestData).toString(); } } else { - // TODO(krishan711): find a better place for this const currentContentHeader = headers.get('Content-Type'); if (request.data) { fetchConfig.body = JSON.stringify(request.data); @@ -155,12 +153,86 @@ export class Requester { headers.set('content-type', 'application/json'); } else if (request.formData) { fetchConfig.body = request.formData; - // NOTE(krishan711): I don't know whether this should be set or not. - // For S3 uploads it cannot be set as the file type is set already - // For azure uploads the content-type should be set by the caller to the file type - // headers.set('content-type', 'multipart/form-data'); } } + return { url, fetchConfig }; + }; + + private static getResponseHeaders = (response: Response): Record => { + const responseHeaders: Record = {}; + response.headers.forEach((value: string, key: string): void => { + if (responseHeaders[key]) { + console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); + } + responseHeaders[key] = value; + }); + return responseHeaders; + }; + + private static throwKibaExceptionFromErrorContent = (errorContent: string, statusCode: number): never => { + let parsedErrorContent: Record | null = null; + try { + parsedErrorContent = JSON.parse(errorContent) as Record; + } catch { + // no-op + } + if (parsedErrorContent && 'message' in parsedErrorContent) { + const fields = parsedErrorContent.fields as Record || {}; + const exceptionType = typeof parsedErrorContent.exceptionType === 'string' ? parsedErrorContent.exceptionType : undefined; + throw new KibaException(String(parsedErrorContent.message), statusCode, exceptionType, fields); + } + throw new KibaException(errorContent, statusCode, undefined, {}); + }; + + private static processStreamLine = ( + line: string, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + ): void => { + const trimmedLine = line.trim(); + if (!trimmedLine) { + return; + } + let itemObject: Record; + try { + itemObject = JSON.parse(trimmedLine) as Record; + } catch (error) { + throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); + } + onItem(parseItem(itemObject)); + }; + + private static readNdjsonStream = async ( + response: Response, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + ): Promise => { + const reader = response.body?.getReader(); + if (!reader) { + throw new KibaException('The request was made but no response body was received.'); + } + const decoder = new TextDecoder(); + let buffer = ''; + + const readAllChunks = async (): Promise => { + const { done, value } = await reader.read(); + if (done) { + return; + } + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + lines.forEach((line: string): void => Requester.processStreamLine(line, parseItem, onItem)); + await readAllChunks(); + }; + + await readAllChunks(); + buffer += decoder.decode(); + Requester.processStreamLine(buffer, parseItem, onItem); + }; + + private makeFetchRequest = async (request: KibaRequest): Promise => { + const { url, fetchConfig } = this.createFetchRequestConfig(request); const fetchOperation = fetch(url.toString(), fetchConfig) .catch((error): void => { throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); @@ -170,14 +242,7 @@ export class Requester { throw new KibaException('The request was made but no response was received.'); } const content = await response.text(); - const responseHeaders: Record = {}; - response.headers.forEach((value: string, key: string): void => { - if (responseHeaders[key]) { - console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); - } - responseHeaders[key] = value; - }); - return new KibaResponse(response.status, responseHeaders, new Date(), content); + return new KibaResponse(response.status, Requester.getResponseHeaders(response), new Date(), content); }); const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); return response; @@ -188,37 +253,7 @@ export class Requester { parseItem: (obj: Record) => StreamItemType, onItem: (item: StreamItemType) => void, ): Promise => { - const url = new URL(request.url); - const headers = new Headers({ ...this.headers, ...(request.headers || {}) }); - // NOTE(krishan711): RequestInit comes from the DOM which isn't used by default in typescript typings - // eslint-disable-next-line no-undef - const fetchConfig: RequestInit = { - method: request.method.toUpperCase(), - headers, - credentials: this.shouldIncludeCrossSiteCredentials ? 'include' : 'same-origin', - }; - if (request.method === RestMethod.GET || request.method === RestMethod.DELETE) { - if (request.data) { - const requestData = ({ ...request.data }) as Record; - Object.keys(requestData).forEach((key: string): void => { - if (requestData[key] === undefined) { - delete requestData[key]; - } - }); - url.search = createSearchParams(requestData).toString(); - } - } else { - const currentContentHeader = headers.get('Content-Type'); - if (request.data) { - fetchConfig.body = JSON.stringify(request.data); - if (currentContentHeader && currentContentHeader !== 'application/json') { - console.warn(`Overwriting content-type header for request from ${currentContentHeader} to application/json`); - } - headers.set('content-type', 'application/json'); - } else if (request.formData) { - fetchConfig.body = request.formData; - } - } + const { url, fetchConfig } = this.createFetchRequestConfig(request); const fetchOperation = fetch(url.toString(), fetchConfig) .catch((error): void => { throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); @@ -227,68 +262,12 @@ export class Requester { if (!response) { throw new KibaException('The request was made but no response was received.'); } - const responseHeaders: Record = {}; - response.headers.forEach((value: string, key: string): void => { - if (responseHeaders[key]) { - console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); - } - responseHeaders[key] = value; - }); + const responseHeaders = Requester.getResponseHeaders(response); if (response.status >= 400 && response.status < 600) { const errorContent = await response.text(); - let parsedErrorContent: Record | null = null; - try { - parsedErrorContent = JSON.parse(errorContent) as Record; - } catch { - // no-op - } - if (parsedErrorContent && 'message' in parsedErrorContent) { - const fields = parsedErrorContent.fields as Record || {}; - const exceptionType = typeof parsedErrorContent.exceptionType === 'string' ? parsedErrorContent.exceptionType : undefined; - throw new KibaException(String(parsedErrorContent.message), response.status, exceptionType, fields); - } - throw new KibaException(errorContent, response.status, undefined, {}); - } - const reader = response.body?.getReader(); - if (!reader) { - throw new KibaException('The request was made but no response body was received.'); - } - const decoder = new TextDecoder(); - let buffer = ''; - // eslint-disable-next-line no-constant-condition - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - for (const line of lines) { - const trimmedLine = line.trim(); - if (!trimmedLine) { - continue; - } - let itemObject: Record; - try { - itemObject = JSON.parse(trimmedLine) as Record; - } catch (error) { - throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); - } - onItem(parseItem(itemObject)); - } - } - buffer += decoder.decode(); - const finalLine = buffer.trim(); - if (finalLine) { - let itemObject: Record; - try { - itemObject = JSON.parse(finalLine) as Record; - } catch (error) { - throw new KibaException(`Failed to parse streaming response line: ${finalLine}. Error: ${String(error)}`); - } - onItem(parseItem(itemObject)); + Requester.throwKibaExceptionFromErrorContent(errorContent, response.status); } + await Requester.readNdjsonStream(response, parseItem, onItem); return new KibaResponse(response.status, responseHeaders, new Date(), ''); }); const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); diff --git a/src/services/serviceClient.ts b/src/services/serviceClient.ts index dc0b03c..5547b40 100644 --- a/src/services/serviceClient.ts +++ b/src/services/serviceClient.ts @@ -30,10 +30,16 @@ export class ServiceClient { this.baseUrl = baseUrl; } - protected makeRequest = async (method: RestMethod, path: string, request?: RequestData | undefined, responseClass?: ResponseDataClass | undefined, additionalHeaders?: Record): Promise => { + protected makeRequest = async ( + method: RestMethod, + path: string, + request?: RequestData | undefined, + responseClass?: ResponseDataClass | undefined, + additionalHeaders?: Record, + ): Promise => { const url = `${this.baseUrl}/${path}`; const response = await this.requester.makeRequest(method, url, request?.toObject(), additionalHeaders); - return responseClass ? responseClass.fromObject(JSON.parse(response.content) as RawObject) : null as ResponseType; + return responseClass ? responseClass.fromObject(JSON.parse(response.content) as RawObject) : null as unknown as ResponseType; }; protected makeStreamingRequest = async ( From a3d97f3184fe3f17a604068b8d855bcb188cd3a4 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 17:11:28 +0000 Subject: [PATCH 3/5] . --- src/services/serviceClient.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/services/serviceClient.ts b/src/services/serviceClient.ts index 5547b40..c1cfaea 100644 --- a/src/services/serviceClient.ts +++ b/src/services/serviceClient.ts @@ -51,13 +51,6 @@ export class ServiceClient { additionalHeaders?: Record, ): Promise => { const url = `${this.baseUrl}/${path}`; - await this.requester.makeStreamingRequest( - method, - url, - request?.toObject(), - streamItemClass.fromObject, - onStreamItem, - additionalHeaders, - ); + await this.requester.makeStreamingRequest(method, url, request?.toObject(), streamItemClass.fromObject, onStreamItem, additionalHeaders); }; } From e6aec9b307db0c2acddbf605fa2ae3a62394b57b Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 17:12:27 +0000 Subject: [PATCH 4/5] . --- src/requester/requester.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/requester/requester.ts b/src/requester/requester.ts index d21f47b..38ff9bc 100644 --- a/src/requester/requester.ts +++ b/src/requester/requester.ts @@ -144,6 +144,7 @@ export class Requester { url.search = createSearchParams(requestData).toString(); } } else { + // TODO(krishan711): find a better place for this const currentContentHeader = headers.get('Content-Type'); if (request.data) { fetchConfig.body = JSON.stringify(request.data); @@ -153,6 +154,10 @@ export class Requester { headers.set('content-type', 'application/json'); } else if (request.formData) { fetchConfig.body = request.formData; + // NOTE(krishan711): I don't know whether this should be set or not. + // For S3 uploads it cannot be set as the file type is set already + // For azure uploads the content-type should be set by the caller to the file type + // headers.set('content-type', 'multipart/form-data'); } } return { url, fetchConfig }; From 8b27a3d07420c208756e779e4e29fb9082f48676 Mon Sep 17 00:00:00 2001 From: Krishan Patel Date: Thu, 5 Mar 2026 17:17:24 +0000 Subject: [PATCH 5/5] . --- src/requester/requester.ts | 96 ++++++++++++++------------------------ 1 file changed, 36 insertions(+), 60 deletions(-) diff --git a/src/requester/requester.ts b/src/requester/requester.ts index 38ff9bc..df954b1 100644 --- a/src/requester/requester.ts +++ b/src/requester/requester.ts @@ -98,18 +98,7 @@ export class Requester { let response = await this.makeFetchRequest(modifiedRequest); response = this.modifyResponse(response); if (response.status >= 400 && response.status < 600) { - let errorContent = null; - try { - errorContent = JSON.parse(response.content); - } catch { - // no-op - } - if (errorContent && 'message' in errorContent) { - const fields = errorContent.fields || {}; - const exceptionType = errorContent.exceptionType || undefined; - throw new KibaException(errorContent.message, response.status, exceptionType, fields); - } - throw new KibaException(response.content, response.status, undefined, {}); + Requester.throwKibaExceptionFromErrorContent(response.content, response.status); } return response; }; @@ -189,53 +178,6 @@ export class Requester { throw new KibaException(errorContent, statusCode, undefined, {}); }; - private static processStreamLine = ( - line: string, - parseItem: (obj: Record) => StreamItemType, - onItem: (item: StreamItemType) => void, - ): void => { - const trimmedLine = line.trim(); - if (!trimmedLine) { - return; - } - let itemObject: Record; - try { - itemObject = JSON.parse(trimmedLine) as Record; - } catch (error) { - throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); - } - onItem(parseItem(itemObject)); - }; - - private static readNdjsonStream = async ( - response: Response, - parseItem: (obj: Record) => StreamItemType, - onItem: (item: StreamItemType) => void, - ): Promise => { - const reader = response.body?.getReader(); - if (!reader) { - throw new KibaException('The request was made but no response body was received.'); - } - const decoder = new TextDecoder(); - let buffer = ''; - - const readAllChunks = async (): Promise => { - const { done, value } = await reader.read(); - if (done) { - return; - } - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - lines.forEach((line: string): void => Requester.processStreamLine(line, parseItem, onItem)); - await readAllChunks(); - }; - - await readAllChunks(); - buffer += decoder.decode(); - Requester.processStreamLine(buffer, parseItem, onItem); - }; - private makeFetchRequest = async (request: KibaRequest): Promise => { const { url, fetchConfig } = this.createFetchRequestConfig(request); const fetchOperation = fetch(url.toString(), fetchConfig) @@ -272,7 +214,41 @@ export class Requester { const errorContent = await response.text(); Requester.throwKibaExceptionFromErrorContent(errorContent, response.status); } - await Requester.readNdjsonStream(response, parseItem, onItem); + const reader = response.body?.getReader(); + if (!reader) { + throw new KibaException('The request was made but no response body was received.'); + } + const processLine = (line: string): void => { + const trimmedLine = line.trim(); + if (!trimmedLine) { + return; + } + let itemObject: Record; + try { + itemObject = JSON.parse(trimmedLine) as Record; + } catch (error) { + throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); + } + onItem(parseItem(itemObject)); + }; + + const decoder = new TextDecoder(); + let buffer = ''; + const readAllChunks = async (): Promise => { + const { done, value } = await reader.read(); + if (done) { + return; + } + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + lines.forEach(processLine); + await readAllChunks(); + }; + await readAllChunks(); + buffer += decoder.decode(); + processLine(buffer); + return new KibaResponse(response.status, responseHeaders, new Date(), ''); }); const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation);