diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 73d84b787..257018889 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -105,6 +105,11 @@ function serverCallTrace(text: string) { type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer; +/* incrementWindowSize exists at runtime but is absent from the http2 typings. */ +interface IncrementWindowSize { + incrementWindowSize(delta: number): void; +} + interface BindResult { port: number; count: number; @@ -365,9 +370,16 @@ export class Server { } if ('grpc.max_concurrent_streams' in this.options) { this.commonServerOptions.settings = { + ...this.commonServerOptions.settings, maxConcurrentStreams: this.options['grpc.max_concurrent_streams'], }; } + if ('grpc-node.flow_control_window' in this.options) { + this.commonServerOptions.settings = { + ...this.commonServerOptions.settings, + initialWindowSize: this.options['grpc-node.flow_control_window'], + }; + } this.interceptors = this.options.interceptors ?? []; this.trace('Server constructed'); } @@ -1517,6 +1529,31 @@ export class Server { return (session: http2.ServerHttp2Session) => { this.http2Servers.get(http2Server)?.sessions.add(session); + const flowControlWindow = this.options['grpc-node.flow_control_window']; + if (flowControlWindow !== undefined) { + const defaultWindow = + http2.getDefaultSettings?.()?.initialWindowSize ?? 65535; + if (flowControlWindow > defaultWindow) { + /* The settings.initialWindowSize above sets the per-stream window. + * Also raise the connection-level window so a large multi-stream + * transfer is not throttled by the default 64 KB connection window. */ + try { + session.setLocalWindowSize(flowControlWindow); + } catch { + /* Fallback for older Node, where setLocalWindowSize is unavailable: + * bump the window by the delta. incrementWindowSize is not in the + * http2 typings, so it is referenced through a narrow interface. */ + const delta = + flowControlWindow - (session.state.localWindowSize ?? defaultWindow); + if (delta > 0) { + (session as unknown as IncrementWindowSize).incrementWindowSize( + delta + ); + } + } + } + } + let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let keepaliveTimer: NodeJS.Timeout | null = null; diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index f259b27be..3feb7a78a 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -331,6 +331,58 @@ describe('Server', () => { }); }); + describe('flow control window', () => { + let server: Server; + let client: ServiceClient; + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + + const serviceImplementation = { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + }; + + afterEach(done => { + client.close(); + server.tryShutdown(done); + }); + + it('Should accept grpc-node.flow_control_window and round-trip a large message', done => { + const flowControlWindow = 4 * 1024 * 1024; + server = new Server({ + 'grpc-node.flow_control_window': flowControlWindow, + 'grpc.max_receive_message_length': -1, + 'grpc.max_send_message_length': -1, + }); + server.addService(echoService.service, serviceImplementation); + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + client = new echoService( + `localhost:${port}`, + grpc.credentials.createInsecure(), + { + 'grpc.max_receive_message_length': -1, + 'grpc.max_send_message_length': -1, + } + ); + // Larger than the default 64 KB window, so the transfer exercises + // the raised connection window rather than stalling on it. + const value = 'a'.repeat(2 * 1024 * 1024); + client.echo({ value }, (error: ServiceError, response: any) => { + assert.ifError(error); + assert.strictEqual(response.value, value); + done(); + }); + } + ); + }); + }); + describe('start', () => { let server: Server;