Skip to content
2 changes: 2 additions & 0 deletions docs/docs/api/Client.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Returns: `Client`
* **keepAliveTimeoutThreshold** `number | null` (optional) - Default: `2e3` - A number of milliseconds subtracted from server *keep-alive* hints when overriding `keepAliveTimeout` to account for timing inaccuracies caused by e.g. transport latency. Defaults to 2 seconds.
* **maxHeaderSize** `number | null` (optional) - Default: `--max-http-header-size` or `16384` - The maximum length of request headers in bytes. Defaults to Node.js' --max-http-header-size or 16KiB.
* **maxResponseSize** `number | null` (optional) - Default: `-1` - The maximum length of response body in bytes. Set to `-1` to disable.
* **webSocket** `WebSocketOptions` (optional) - WebSocket-specific configuration options.
* **maxPayloadSize** `number` (optional) - Default: `134217728` (128 MB) - Maximum allowed payload size in bytes for WebSocket messages. Applied to uncompressed messages, compressed frame payloads, and decompressed (permessage-deflate) messages. Set to 0 to disable the limit.
* **pipelining** `number | null` (optional) - Default: `1` - The amount of concurrent requests to be sent over the single TCP/TLS connection according to [RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2). Carefully consider your workload and environment before enabling concurrent requests as pipelining may reduce performance if used incorrectly. Pipelining is sensitive to network stack settings as well as head of line blocking caused by e.g. long running requests. Set to `0` to disable keep-alive connections.
* **connect** `ConnectOptions | Function | null` (optional) - Default: `null`.
* **strictContentLength** `Boolean` (optional) - Default: `true` - Whether to treat request content length mismatches as errors. If true, an error is thrown when the request content-length header doesn't match the length of the request body. **Security Warning:** Disabling this option can expose your application to HTTP Request Smuggling attacks, where mismatched content-length headers cause servers and proxies to interpret request boundaries differently. This can lead to cache poisoning, credential hijacking, and bypassing security controls. Only disable this in controlled environments where you fully trust the request source.
Expand Down
2 changes: 1 addition & 1 deletion lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Agent extends DispatcherBase {
throw new InvalidArgumentError('maxOrigins must be a number greater than 0')
}

super()
super(options)

if (connect && typeof connect !== 'function') {
connect = { ...connect }
Expand Down
5 changes: 3 additions & 2 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class Client extends DispatcherBase {
useH2c,
initialWindowSize,
connectionWindowSize,
pingInterval
pingInterval,
webSocket
} = {}) {
if (keepAlive !== undefined) {
throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
Expand Down Expand Up @@ -222,7 +223,7 @@ class Client extends DispatcherBase {
throw new InvalidArgumentError('pingInterval must be a positive integer, greater or equal to 0')
}

super()
super({ webSocket })

if (typeof connect !== 'function') {
connect = buildConnector({
Expand Down
18 changes: 18 additions & 0 deletions lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const { kDestroy, kClose, kClosed, kDestroyed, kDispatch } = require('../core/sy

const kOnDestroyed = Symbol('onDestroyed')
const kOnClosed = Symbol('onClosed')
const kWebSocketOptions = Symbol('webSocketOptions')

class DispatcherBase extends Dispatcher {
/** @type {boolean} */
Expand All @@ -25,6 +26,23 @@ class DispatcherBase extends Dispatcher {
/** @type {Array<Function>|null} */
[kOnClosed] = null

/**
* @param {import('../../types/dispatcher').DispatcherOptions} [opts]
*/
constructor (opts) {
super()
this[kWebSocketOptions] = opts?.webSocket ?? {}
}

/**
* @returns {import('../../types/dispatcher').WebSocketOptions}
*/
get webSocketOptions () {
return {
maxPayloadSize: this[kWebSocketOptions].maxPayloadSize ?? 128 * 1024 * 1024 // 128 MB default
}
}

/** @returns {boolean} */
get destroyed () {
return this[kDestroyed]
Expand Down
2 changes: 1 addition & 1 deletion lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Pool extends PoolBase {
})
}

super()
super(options)

this[kConnections] = connections || null
this[kUrl] = util.parseOrigin(origin)
Expand Down
44 changes: 13 additions & 31 deletions lib/web/websocket/permessage-deflate.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,35 @@ const tail = Buffer.from([0x00, 0x00, 0xff, 0xff])
const kBuffer = Symbol('kBuffer')
const kLength = Symbol('kLength')

// Default maximum decompressed message size: 4 MB
const kDefaultMaxDecompressedSize = 4 * 1024 * 1024

class PerMessageDeflate {
/** @type {import('node:zlib').InflateRaw} */
#inflate

#options = {}

/** @type {boolean} */
#aborted = false

/** @type {Function|null} */
#currentCallback = null
#maxPayloadSize = 0

/**
* @param {Map<string, string>} extensions
*/
constructor (extensions) {
constructor (extensions, options) {
this.#options.serverNoContextTakeover = extensions.has('server_no_context_takeover')
this.#options.serverMaxWindowBits = extensions.get('server_max_window_bits')

this.#maxPayloadSize = options.maxPayloadSize
}

/**
* Decompress a compressed payload.
* @param {Buffer} chunk Compressed data
* @param {boolean} fin Final fragment flag
* @param {Function} callback Callback function
*/
decompress (chunk, fin, callback) {
// An endpoint uses the following algorithm to decompress a message.
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the
// payload of the message.
// 2. Decompress the resulting data using DEFLATE.

if (this.#aborted) {
callback(new MessageSizeExceededError())
return
}

if (!this.#inflate) {
let windowBits = Z_DEFAULT_WINDOWBITS

Expand All @@ -64,23 +59,12 @@ class PerMessageDeflate {
this.#inflate[kLength] = 0

this.#inflate.on('data', (data) => {
if (this.#aborted) {
return
}

this.#inflate[kLength] += data.length

if (this.#inflate[kLength] > kDefaultMaxDecompressedSize) {
this.#aborted = true
if (this.#maxPayloadSize > 0 && this.#inflate[kLength] > this.#maxPayloadSize) {
callback(new MessageSizeExceededError())
this.#inflate.removeAllListeners()
this.#inflate.destroy()
this.#inflate = null

if (this.#currentCallback) {
const cb = this.#currentCallback
this.#currentCallback = null
cb(new MessageSizeExceededError())
}
return
}

Expand All @@ -93,22 +77,20 @@ class PerMessageDeflate {
})
}

this.#currentCallback = callback
this.#inflate.write(chunk)
if (fin) {
this.#inflate.write(tail)
}

this.#inflate.flush(() => {
if (this.#aborted || !this.#inflate) {
if (!this.#inflate) {
return
}

const full = Buffer.concat(this.#inflate[kBuffer], this.#inflate[kLength])

this.#inflate[kBuffer].length = 0
this.#inflate[kLength] = 0
this.#currentCallback = null

callback(null, full)
})
Expand Down
84 changes: 62 additions & 22 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,23 @@ class ByteParser extends Writable {
/** @type {import('./websocket').Handler} */
#handler

/** @type {number} */
#maxPayloadSize

/**
* @param {import('./websocket').Handler} handler
* @param {Map<string, string>|null} extensions
* @param {{ maxPayloadSize?: number }} [options]
*/
constructor (handler, extensions) {
constructor (handler, extensions, options = {}) {
super()

this.#handler = handler
this.#extensions = extensions == null ? new Map() : extensions
this.#maxPayloadSize = options.maxPayloadSize ?? 0

if (this.#extensions.has('permessage-deflate')) {
this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions))
this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions, options))
}
}

Expand All @@ -66,6 +71,19 @@ class ByteParser extends Writable {
this.run(callback)
}

#validatePayloadLength () {
if (
this.#maxPayloadSize > 0 &&
!isControlFrame(this.#info.opcode) &&
this.#info.payloadLength > this.#maxPayloadSize
) {
failWebsocketConnection(this.#handler, 1009, 'Payload size exceeds maximum allowed size')
return false
}

return true
}

/**
* Runs whenever a new chunk is received.
* Callback is called whenever there are no more chunks buffering,
Expand Down Expand Up @@ -154,6 +172,10 @@ class ByteParser extends Writable {
if (payloadLength <= 125) {
this.#info.payloadLength = payloadLength
this.#state = parserStates.READ_DATA

if (!this.#validatePayloadLength()) {
return
}
} else if (payloadLength === 126) {
this.#state = parserStates.PAYLOADLENGTH_16
} else if (payloadLength === 127) {
Expand All @@ -178,6 +200,10 @@ class ByteParser extends Writable {

this.#info.payloadLength = buffer.readUInt16BE(0)
this.#state = parserStates.READ_DATA

if (!this.#validatePayloadLength()) {
return
}
} else if (this.#state === parserStates.PAYLOADLENGTH_64) {
if (this.#byteOffset < 8) {
return callback()
Expand All @@ -200,6 +226,10 @@ class ByteParser extends Writable {

this.#info.payloadLength = lower
this.#state = parserStates.READ_DATA

if (!this.#validatePayloadLength()) {
return
}
} else if (this.#state === parserStates.READ_DATA) {
if (this.#byteOffset < this.#info.payloadLength) {
return callback()
Expand All @@ -224,29 +254,39 @@ class ByteParser extends Writable {

this.#state = parserStates.INFO
} else {
this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => {
if (error) {
// Use 1009 (Message Too Big) for decompression size limit errors
const code = error instanceof MessageSizeExceededError ? 1009 : 1007
failWebsocketConnection(this.#handler, code, error.message)
return
}

this.writeFragments(data)
this.#extensions.get('permessage-deflate').decompress(
body,
this.#info.fin,
(error, data) => {
if (error) {
const code = error instanceof MessageSizeExceededError ? 1009 : 1007
failWebsocketConnection(this.#handler, code, error.message)
return
}

this.writeFragments(data)

// Check cumulative fragment size
if (this.#maxPayloadSize > 0 && this.#fragmentsBytes > this.#maxPayloadSize) {
failWebsocketConnection(this.#handler, 1009, new MessageSizeExceededError().message)
return
}

if (!this.#info.fin) {
this.#state = parserStates.INFO
this.#loop = true
this.run(callback)
return
}

websocketMessageReceived(this.#handler, this.#info.binaryType, this.consumeFragments())

if (!this.#info.fin) {
this.#state = parserStates.INFO
this.#loop = true
this.#state = parserStates.INFO
this.run(callback)
return
}

websocketMessageReceived(this.#handler, this.#info.binaryType, this.consumeFragments())

this.#loop = true
this.#state = parserStates.INFO
this.run(callback)
})
},
this.#fragmentsBytes
Comment thread
KhafraDev marked this conversation as resolved.
)

this.#loop = false
break
Expand Down
7 changes: 6 additions & 1 deletion lib/web/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,12 @@ class WebSocket extends EventTarget {
// once this happens, the connection is open
this.#handler.socket = response.socket

const parser = new ByteParser(this.#handler, parsedExtensions)
// Get maxPayloadSize from dispatcher options
const maxPayloadSize = this.#handler.controller.dispatcher?.webSocketOptions?.maxPayloadSize

const parser = new ByteParser(this.#handler, parsedExtensions, {
maxPayloadSize
})
parser.on('drain', () => this.#handler.onParserDrain())
parser.on('error', (err) => this.#handler.onParserError(err))

Expand Down
Loading
Loading