diff --git a/src/requester/requester.ts b/src/requester/requester.ts index 8474f31..df954b1 100644 --- a/src/requester/requester.ts +++ b/src/requester/requester.ts @@ -80,33 +80,44 @@ export class Requester { return this.makeRequestInternal(request); }; + public makeStreamingRequest = async ( + method: RestMethod, + url: string, + data: Record | undefined, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + headers?: Record, + timeout?: number, + ): Promise => { + const request = new KibaRequest(method, url, headers, data, undefined, new Date(), timeout); + await this.makeStreamingRequestInternal(request, parseItem, onItem); + }; + private makeRequestInternal = async (request: KibaRequest): Promise => { const modifiedRequest = this.modifyRequest(request); let response = await this.makeFetchRequest(modifiedRequest); response = this.modifyResponse(response); if (response.status >= 400 && response.status < 600) { - let errorContent = null; - try { - errorContent = JSON.parse(response.content); - } catch { - // no-op - } - if (errorContent && 'message' in errorContent) { - const fields = errorContent.fields || {}; - const exceptionType = errorContent.exceptionType || undefined; - throw new KibaException(errorContent.message, response.status, exceptionType, fields); - } - throw new KibaException(response.content, response.status, undefined, {}); + Requester.throwKibaExceptionFromErrorContent(response.content, response.status); } return response; }; - private makeFetchRequest = async (request: KibaRequest): Promise => { + private makeStreamingRequestInternal = async ( + request: KibaRequest, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + ): Promise => { + const modifiedRequest = this.modifyRequest(request); + const response = await this.makeStreamingFetchRequest(modifiedRequest, parseItem, onItem); + this.modifyResponse(response); + }; + + private createFetchRequestConfig = (request: KibaRequest): { url: URL; fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } } => { const url = new URL(request.url); const headers = new Headers({ ...this.headers, ...(request.headers || {}) }); // NOTE(krishan711): RequestInit comes from the DOM which isn't used by default in typescript typings - // eslint-disable-next-line no-undef - const fetchConfig: RequestInit = { + const fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } = { method: request.method.toUpperCase(), headers, credentials: this.shouldIncludeCrossSiteCredentials ? 'include' : 'same-origin', @@ -138,6 +149,37 @@ export class Requester { // headers.set('content-type', 'multipart/form-data'); } } + return { url, fetchConfig }; + }; + + private static getResponseHeaders = (response: Response): Record => { + const responseHeaders: Record = {}; + response.headers.forEach((value: string, key: string): void => { + if (responseHeaders[key]) { + console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); + } + responseHeaders[key] = value; + }); + return responseHeaders; + }; + + private static throwKibaExceptionFromErrorContent = (errorContent: string, statusCode: number): never => { + let parsedErrorContent: Record | null = null; + try { + parsedErrorContent = JSON.parse(errorContent) as Record; + } catch { + // no-op + } + if (parsedErrorContent && 'message' in parsedErrorContent) { + const fields = parsedErrorContent.fields as Record || {}; + const exceptionType = typeof parsedErrorContent.exceptionType === 'string' ? parsedErrorContent.exceptionType : undefined; + throw new KibaException(String(parsedErrorContent.message), statusCode, exceptionType, fields); + } + throw new KibaException(errorContent, statusCode, undefined, {}); + }; + + private makeFetchRequest = async (request: KibaRequest): Promise => { + const { url, fetchConfig } = this.createFetchRequestConfig(request); const fetchOperation = fetch(url.toString(), fetchConfig) .catch((error): void => { throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); @@ -147,14 +189,67 @@ export class Requester { throw new KibaException('The request was made but no response was received.'); } const content = await response.text(); - const responseHeaders: Record = {}; - response.headers.forEach((value: string, key: string): void => { - if (responseHeaders[key]) { - console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); + return new KibaResponse(response.status, Requester.getResponseHeaders(response), new Date(), content); + }); + const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); + return response; + }; + + private makeStreamingFetchRequest = async ( + request: KibaRequest, + parseItem: (obj: Record) => StreamItemType, + onItem: (item: StreamItemType) => void, + ): Promise => { + const { url, fetchConfig } = this.createFetchRequestConfig(request); + const fetchOperation = fetch(url.toString(), fetchConfig) + .catch((error): void => { + throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); + }) + .then(async (response: Response | void): Promise => { + if (!response) { + throw new KibaException('The request was made but no response was received.'); + } + const responseHeaders = Requester.getResponseHeaders(response); + if (response.status >= 400 && response.status < 600) { + const errorContent = await response.text(); + Requester.throwKibaExceptionFromErrorContent(errorContent, response.status); + } + const reader = response.body?.getReader(); + if (!reader) { + throw new KibaException('The request was made but no response body was received.'); + } + const processLine = (line: string): void => { + const trimmedLine = line.trim(); + if (!trimmedLine) { + return; } - responseHeaders[key] = value; - }); - return new KibaResponse(response.status, responseHeaders, new Date(), content); + let itemObject: Record; + try { + itemObject = JSON.parse(trimmedLine) as Record; + } catch (error) { + throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); + } + onItem(parseItem(itemObject)); + }; + + const decoder = new TextDecoder(); + let buffer = ''; + const readAllChunks = async (): Promise => { + const { done, value } = await reader.read(); + if (done) { + return; + } + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + lines.forEach(processLine); + await readAllChunks(); + }; + await readAllChunks(); + buffer += decoder.decode(); + processLine(buffer); + + return new KibaResponse(response.status, responseHeaders, new Date(), ''); }); const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); return response; diff --git a/src/services/serviceClient.ts b/src/services/serviceClient.ts index f0e7bfd..c1cfaea 100644 --- a/src/services/serviceClient.ts +++ b/src/services/serviceClient.ts @@ -17,8 +17,9 @@ export class ResponseData { // } } -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type Constructor = new (...args: any[]) => T; +export type ResponseDataClass = { + fromObject: (obj: RawObject) => T; +}; export class ServiceClient { protected requester: Requester; @@ -29,10 +30,27 @@ export class ServiceClient { this.baseUrl = baseUrl; } - protected makeRequest = async (method: RestMethod, path: string, request?: RequestData | undefined, responseClass?: Constructor | undefined, additionalHeaders?: Record): Promise => { + protected makeRequest = async ( + method: RestMethod, + path: string, + request?: RequestData | undefined, + responseClass?: ResponseDataClass | undefined, + additionalHeaders?: Record, + ): Promise => { const url = `${this.baseUrl}/${path}`; const response = await this.requester.makeRequest(method, url, request?.toObject(), additionalHeaders); - // @ts-ignore - return responseClass ? responseClass.fromObject(JSON.parse(response.content)) : null; + return responseClass ? responseClass.fromObject(JSON.parse(response.content) as RawObject) : null as unknown as ResponseType; + }; + + protected makeStreamingRequest = async ( + method: RestMethod, + path: string, + request: RequestData | undefined, + streamItemClass: ResponseDataClass, + onStreamItem: (streamItem: StreamItemType) => void, + additionalHeaders?: Record, + ): Promise => { + const url = `${this.baseUrl}/${path}`; + await this.requester.makeStreamingRequest(method, url, request?.toObject(), streamItemClass.fromObject, onStreamItem, additionalHeaders); }; }