-
Notifications
You must be signed in to change notification settings - Fork 1
Feature: implement streaming api reading in client #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -80,33 +80,44 @@ export class Requester { | |
| return this.makeRequestInternal(request); | ||
| }; | ||
|
|
||
| public makeStreamingRequest = async <StreamItemType>( | ||
| method: RestMethod, | ||
| url: string, | ||
| data: Record<string, unknown> | undefined, | ||
| parseItem: (obj: Record<string, unknown>) => StreamItemType, | ||
| onItem: (item: StreamItemType) => void, | ||
| headers?: Record<string, string>, | ||
| timeout?: number, | ||
| ): Promise<void> => { | ||
| const request = new KibaRequest(method, url, headers, data, undefined, new Date(), timeout); | ||
| await this.makeStreamingRequestInternal(request, parseItem, onItem); | ||
| }; | ||
|
|
||
| private makeRequestInternal = async (request: KibaRequest): Promise<KibaResponse> => { | ||
| const modifiedRequest = this.modifyRequest(request); | ||
| let response = await this.makeFetchRequest(modifiedRequest); | ||
| response = this.modifyResponse(response); | ||
| if (response.status >= 400 && response.status < 600) { | ||
| let errorContent = null; | ||
| try { | ||
| errorContent = JSON.parse(response.content); | ||
| } catch { | ||
| // no-op | ||
| } | ||
| if (errorContent && 'message' in errorContent) { | ||
| const fields = errorContent.fields || {}; | ||
| const exceptionType = errorContent.exceptionType || undefined; | ||
| throw new KibaException(errorContent.message, response.status, exceptionType, fields); | ||
| } | ||
| throw new KibaException(response.content, response.status, undefined, {}); | ||
| Requester.throwKibaExceptionFromErrorContent(response.content, response.status); | ||
| } | ||
| return response; | ||
| }; | ||
|
|
||
| private makeFetchRequest = async (request: KibaRequest): Promise<KibaResponse> => { | ||
| private makeStreamingRequestInternal = async <StreamItemType>( | ||
| request: KibaRequest, | ||
| parseItem: (obj: Record<string, unknown>) => StreamItemType, | ||
| onItem: (item: StreamItemType) => void, | ||
| ): Promise<void> => { | ||
| const modifiedRequest = this.modifyRequest(request); | ||
| const response = await this.makeStreamingFetchRequest(modifiedRequest, parseItem, onItem); | ||
| this.modifyResponse(response); | ||
| }; | ||
|
|
||
| private createFetchRequestConfig = (request: KibaRequest): { url: URL; fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } } => { | ||
| const url = new URL(request.url); | ||
| const headers = new Headers({ ...this.headers, ...(request.headers || {}) }); | ||
| // NOTE(krishan711): RequestInit comes from the DOM which isn't used by default in typescript typings | ||
| // eslint-disable-next-line no-undef | ||
| const fetchConfig: RequestInit = { | ||
| const fetchConfig: { method: string; headers: Headers; credentials: 'include' | 'same-origin'; body?: string | FormData } = { | ||
| method: request.method.toUpperCase(), | ||
| headers, | ||
| credentials: this.shouldIncludeCrossSiteCredentials ? 'include' : 'same-origin', | ||
|
|
@@ -138,6 +149,37 @@ export class Requester { | |
| // headers.set('content-type', 'multipart/form-data'); | ||
| } | ||
| } | ||
| return { url, fetchConfig }; | ||
| }; | ||
|
|
||
| private static getResponseHeaders = (response: Response): Record<string, string> => { | ||
| const responseHeaders: Record<string, string> = {}; | ||
| response.headers.forEach((value: string, key: string): void => { | ||
| if (responseHeaders[key]) { | ||
| console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); | ||
| } | ||
| responseHeaders[key] = value; | ||
| }); | ||
| return responseHeaders; | ||
| }; | ||
|
|
||
| private static throwKibaExceptionFromErrorContent = (errorContent: string, statusCode: number): never => { | ||
| let parsedErrorContent: Record<string, unknown> | null = null; | ||
| try { | ||
| parsedErrorContent = JSON.parse(errorContent) as Record<string, unknown>; | ||
| } catch { | ||
| // no-op | ||
| } | ||
| if (parsedErrorContent && 'message' in parsedErrorContent) { | ||
| const fields = parsedErrorContent.fields as Record<string, string> || {}; | ||
| const exceptionType = typeof parsedErrorContent.exceptionType === 'string' ? parsedErrorContent.exceptionType : undefined; | ||
| throw new KibaException(String(parsedErrorContent.message), statusCode, exceptionType, fields); | ||
| } | ||
| throw new KibaException(errorContent, statusCode, undefined, {}); | ||
| }; | ||
|
|
||
| private makeFetchRequest = async (request: KibaRequest): Promise<KibaResponse> => { | ||
| const { url, fetchConfig } = this.createFetchRequestConfig(request); | ||
| const fetchOperation = fetch(url.toString(), fetchConfig) | ||
| .catch((error): void => { | ||
| throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); | ||
|
|
@@ -147,14 +189,67 @@ export class Requester { | |
| throw new KibaException('The request was made but no response was received.'); | ||
| } | ||
| const content = await response.text(); | ||
| const responseHeaders: Record<string, string> = {}; | ||
| response.headers.forEach((value: string, key: string): void => { | ||
| if (responseHeaders[key]) { | ||
| console.warn(`key ${key} will be overwritten. TODO(krish): Implement joining keys!`); | ||
| return new KibaResponse(response.status, Requester.getResponseHeaders(response), new Date(), content); | ||
| }); | ||
| const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); | ||
| return response; | ||
| }; | ||
|
|
||
| private makeStreamingFetchRequest = async <StreamItemType>( | ||
| request: KibaRequest, | ||
| parseItem: (obj: Record<string, unknown>) => StreamItemType, | ||
| onItem: (item: StreamItemType) => void, | ||
| ): Promise<KibaResponse> => { | ||
| const { url, fetchConfig } = this.createFetchRequestConfig(request); | ||
| const fetchOperation = fetch(url.toString(), fetchConfig) | ||
| .catch((error): void => { | ||
| throw new KibaException(`The request was made but no response was received: [${error.code}] "${error.message}"`); | ||
| }) | ||
| .then(async (response: Response | void): Promise<KibaResponse> => { | ||
| if (!response) { | ||
| throw new KibaException('The request was made but no response was received.'); | ||
| } | ||
| const responseHeaders = Requester.getResponseHeaders(response); | ||
| if (response.status >= 400 && response.status < 600) { | ||
| const errorContent = await response.text(); | ||
| Requester.throwKibaExceptionFromErrorContent(errorContent, response.status); | ||
| } | ||
| const reader = response.body?.getReader(); | ||
| if (!reader) { | ||
| throw new KibaException('The request was made but no response body was received.'); | ||
| } | ||
| const processLine = (line: string): void => { | ||
| const trimmedLine = line.trim(); | ||
| if (!trimmedLine) { | ||
| return; | ||
| } | ||
| responseHeaders[key] = value; | ||
| }); | ||
| return new KibaResponse(response.status, responseHeaders, new Date(), content); | ||
| let itemObject: Record<string, unknown>; | ||
| try { | ||
| itemObject = JSON.parse(trimmedLine) as Record<string, unknown>; | ||
| } catch (error) { | ||
| throw new KibaException(`Failed to parse streaming response line: ${trimmedLine}. Error: ${String(error)}`); | ||
| } | ||
| onItem(parseItem(itemObject)); | ||
| }; | ||
|
|
||
| const decoder = new TextDecoder(); | ||
| let buffer = ''; | ||
| const readAllChunks = async (): Promise<void> => { | ||
| const { done, value } = await reader.read(); | ||
| if (done) { | ||
| return; | ||
| } | ||
| buffer += decoder.decode(value, { stream: true }); | ||
| const lines = buffer.split('\n'); | ||
| buffer = lines.pop() || ''; | ||
| lines.forEach(processLine); | ||
| await readAllChunks(); | ||
| }; | ||
| await readAllChunks(); | ||
| buffer += decoder.decode(); | ||
| processLine(buffer); | ||
|
|
||
| return new KibaResponse(response.status, responseHeaders, new Date(), ''); | ||
| }); | ||
| const response = await (request.timeoutSeconds ? timeoutPromise(request.timeoutSeconds, fetchOperation) : fetchOperation); | ||
| return response; | ||
|
Comment on lines
+254
to
+255
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chaining
.catch(...).then(...)forces thethenhandler to acceptResponse | void, which complicates typing and requires an additional!responseguard. Consider rewriting this section withtry/catcharound an awaitedfetch(...)soresponsestays aResponse, simplifying control flow and types.