Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ function serverCallTrace(text: string) {

type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;

/* incrementWindowSize exists at runtime but is absent from the http2 typings. */
interface IncrementWindowSize {
incrementWindowSize(delta: number): void;
}

interface BindResult {
port: number;
count: number;
Expand Down Expand Up @@ -365,9 +370,16 @@ export class Server {
}
if ('grpc.max_concurrent_streams' in this.options) {
this.commonServerOptions.settings = {
...this.commonServerOptions.settings,
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
}
if ('grpc-node.flow_control_window' in this.options) {
this.commonServerOptions.settings = {
...this.commonServerOptions.settings,
initialWindowSize: this.options['grpc-node.flow_control_window'],
};
}
this.interceptors = this.options.interceptors ?? [];
this.trace('Server constructed');
}
Expand Down Expand Up @@ -1517,6 +1529,31 @@ export class Server {
return (session: http2.ServerHttp2Session) => {
this.http2Servers.get(http2Server)?.sessions.add(session);

const flowControlWindow = this.options['grpc-node.flow_control_window'];
if (flowControlWindow !== undefined) {
const defaultWindow =
http2.getDefaultSettings?.()?.initialWindowSize ?? 65535;
if (flowControlWindow > defaultWindow) {
/* The settings.initialWindowSize above sets the per-stream window.
* Also raise the connection-level window so a large multi-stream
* transfer is not throttled by the default 64 KB connection window. */
try {
session.setLocalWindowSize(flowControlWindow);
} catch {
/* Fallback for older Node, where setLocalWindowSize is unavailable:
* bump the window by the delta. incrementWindowSize is not in the
* http2 typings, so it is referenced through a narrow interface. */
const delta =
flowControlWindow - (session.state.localWindowSize ?? defaultWindow);
if (delta > 0) {
(session as unknown as IncrementWindowSize).incrementWindowSize(
delta
);
}
}
}
}

let connectionAgeTimer: NodeJS.Timeout | null = null;
let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
let keepaliveTimer: NodeJS.Timeout | null = null;
Expand Down
52 changes: 52 additions & 0 deletions packages/grpc-js/test/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,58 @@ describe('Server', () => {
});
});

describe('flow control window', () => {
let server: Server;
let client: ServiceClient;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
};

afterEach(done => {
client.close();
server.tryShutdown(done);
});

it('Should accept grpc-node.flow_control_window and round-trip a large message', done => {
const flowControlWindow = 4 * 1024 * 1024;
server = new Server({
'grpc-node.flow_control_window': flowControlWindow,
'grpc.max_receive_message_length': -1,
'grpc.max_send_message_length': -1,
});
server.addService(echoService.service, serviceImplementation);
server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
client = new echoService(
`localhost:${port}`,
grpc.credentials.createInsecure(),
{
'grpc.max_receive_message_length': -1,
'grpc.max_send_message_length': -1,
}
);
// Larger than the default 64 KB window, so the transfer exercises
// the raised connection window rather than stalling on it.
const value = 'a'.repeat(2 * 1024 * 1024);
client.echo({ value }, (error: ServiceError, response: any) => {
assert.ifError(error);
assert.strictEqual(response.value, value);
done();
});
}
);
});
});

describe('start', () => {
let server: Server;

Expand Down