Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/docs/api/BalancedPool.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ Extends: [`PoolOptions`](/docs/docs/api/Pool.md#parameter-pooloptions)
* **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Pool(origin, opts)`

The `PoolOptions` are passed to each of the `Pool` instances being created.

When an upstream hostname resolves to multiple DNS records, `BalancedPool`
resolves the hostname lazily at connect time and rotates new connections across
the resolved addresses. Requests continue to use the original hostname for
`Host` headers and TLS `servername`.
## Instance Properties

### `BalancedPool.upstreams`
Expand Down
80 changes: 78 additions & 2 deletions lib/dispatcher/balanced-pool.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const dns = require('node:dns')
const { isIP } = require('node:net')
const {
BalancedPoolMissingUpstreamError,
InvalidArgumentError
Expand All @@ -13,6 +15,7 @@ const {
kGetDispatcher
} = require('./pool-base')
const Pool = require('./pool')
const buildConnector = require('../core/connect')
const { kUrl } = require('../core/symbols')
const util = require('../core/util')
const kFactory = Symbol('factory')
Expand Down Expand Up @@ -48,6 +51,69 @@ function defaultFactory (origin, opts) {
return new Pool(origin, opts)
}

function buildDnsBalancedConnector (origin, opts) {
const {
connect,
connectTimeout,
tls,
maxCachedSessions,
socketPath,
autoSelectFamily,
autoSelectFamilyAttemptTimeout,
allowH2
} = opts

const connector = typeof connect === 'function'
? connect
: buildConnector({
...tls,
maxCachedSessions,
allowH2,
socketPath,
timeout: connectTimeout,
...(typeof autoSelectFamily === 'boolean' ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
...connect
})

let offset = -1

return function dnsBalancedConnector (connectOpts, callback) {
dns.lookup(origin.hostname, { all: true, order: 'ipv4first' }, (err, addresses) => {
if (err) {
callback(err)
return
}

const uniqueAddresses = []
const seen = new Set()

for (const address of addresses) {
const key = `${address.address}:${address.family}`
if (seen.has(key)) {
continue
}

seen.add(key)
uniqueAddresses.push(address)
}

if (uniqueAddresses.length === 0) {
callback(new Error(`No DNS entries found for ${origin.hostname}`))
return
}

offset = (offset + 1) % uniqueAddresses.length
const address = uniqueAddresses[offset]

connector({
...connectOpts,
hostname: address.address,
servername: connectOpts.servername ?? origin.hostname
}, callback)
})
}
}

class BalancedPool extends PoolBase {
constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
if (typeof factory !== 'function') {
Expand All @@ -57,6 +123,7 @@ class BalancedPool extends PoolBase {
super()

this[kOptions] = { ...util.deepClone(opts) }
this[kOptions].connect = opts.connect
this[kOptions].interceptors = opts.interceptors
? { ...opts.interceptors }
: undefined
Expand All @@ -79,7 +146,8 @@ class BalancedPool extends PoolBase {
}

addUpstream (upstream) {
const upstreamOrigin = util.parseOrigin(upstream).origin
const upstreamUrl = util.parseOrigin(upstream)
const upstreamOrigin = upstreamUrl.origin

if (this[kClients].find((pool) => (
pool[kUrl].origin === upstreamOrigin &&
Expand All @@ -88,7 +156,15 @@ class BalancedPool extends PoolBase {
))) {
return this
}
const pool = this[kFactory](upstreamOrigin, this[kOptions])

const poolOptions = isIP(upstreamUrl.hostname) === 0
? {
...this[kOptions],
connect: buildDnsBalancedConnector(upstreamUrl, this[kOptions])
}
: this[kOptions]

const pool = this[kFactory](upstreamOrigin, poolOptions)

this[kAddClient](pool)
pool.on('connect', () => {
Expand Down
191 changes: 191 additions & 0 deletions test/node-test/balanced-pool.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
'use strict'

const dns = require('node:dns')
const { describe, test } = require('node:test')
const assert = require('node:assert/strict')
const { BalancedPool, Pool, Client, errors } = require('../..')
const buildConnector = require('../../lib/core/connect')
const { createServer } = require('node:http')
const { promisify } = require('node:util')
const { tspl } = require('@matteo.collina/tspl')
const { kNeedDrain, kGetDispatcher } = require('../../lib/dispatcher/pool-base')

test('throws when factory is not a function', (t) => {
const p = tspl(t, { plan: 2 })
Expand Down Expand Up @@ -298,6 +301,194 @@ test('getUpstream returns undefined for closed/destroyed upstream', (t) => {
p.strictEqual(result, undefined)
})

test('should balance hostname upstreams across resolved dns records', async (t) => {
const p = tspl(t, { plan: 13 })

const hostnames = []
const hosts = []
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
hosts.push(req.headers.host)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.after(server.close.bind(server))

await promisify(server.listen).call(server, 0)

const originalLookup = dns.lookup
dns.lookup = function lookup (hostname, options, callback) {
if (hostname !== 'service.local') {
return originalLookup.call(this, hostname, options, callback)
}

queueMicrotask(() => {
callback(null, [
{ address: '127.0.0.1', family: 4 },
{ address: '127.0.0.2', family: 4 }
])
})
}
t.after(() => {
dns.lookup = originalLookup
})

const connect = buildConnector({})
const client = new BalancedPool(`http://service.local:${server.address().port}`, {
connections: 2,
pipelining: 1,
connect (opts, callback) {
hostnames.push(opts.hostname)
connect({ ...opts, hostname: '127.0.0.1' }, callback)
}
})
t.after(client.destroy.bind(client))

const responses = await Promise.all([
client.request({ path: '/', method: 'GET' }),
client.request({ path: '/', method: 'GET' })
])

for (const { statusCode, headers, body } of responses) {
p.strictEqual(statusCode, 200)
p.strictEqual(headers['content-type'], 'text/plain')
p.strictEqual(await body.text(), 'hello')
}

p.deepStrictEqual(client.upstreams, [`http://service.local:${server.address().port}`])
p.strictEqual(hosts.length, 2)
p.strictEqual(hosts[0], `service.local:${server.address().port}`)
p.strictEqual(hosts[1], `service.local:${server.address().port}`)
p.strictEqual(hostnames.length, 2)
p.ok(hostnames.includes('127.0.0.1'))
p.ok(hostnames.includes('127.0.0.2'))
})

test('should propagate dns lookup errors for hostname upstreams', async (t) => {
const p = tspl(t, { plan: 2 })

const originalLookup = dns.lookup
dns.lookup = function lookup (hostname, options, callback) {
if (hostname !== 'service.local') {
return originalLookup.call(this, hostname, options, callback)
}

queueMicrotask(() => {
callback(new Error('lookup failed'))
})
}
t.after(() => {
dns.lookup = originalLookup
})

const client = new BalancedPool('http://service.local:1')
t.after(client.destroy.bind(client))

try {
await client.request({ path: '/', method: 'GET' })
} catch (err) {
p.strictEqual(err.message, 'lookup failed')
p.strictEqual(err.code, undefined)
}
})

test('should ignore duplicate dns records for hostname upstreams', async (t) => {
const p = tspl(t, { plan: 7 })

const hostnames = []
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
})
t.after(server.close.bind(server))

await promisify(server.listen).call(server, 0)

const originalLookup = dns.lookup
dns.lookup = function lookup (hostname, options, callback) {
if (hostname !== 'service.local') {
return originalLookup.call(this, hostname, options, callback)
}

queueMicrotask(() => {
callback(null, [
{ address: '127.0.0.1', family: 4 },
{ address: '127.0.0.1', family: 4 },
{ address: '127.0.0.2', family: 4 }
])
})
}
t.after(() => {
dns.lookup = originalLookup
})

const connect = buildConnector({})
const client = new BalancedPool(`http://service.local:${server.address().port}`, {
connections: 3,
pipelining: 1,
connect (opts, callback) {
hostnames.push(opts.hostname)
connect({ ...opts, hostname: '127.0.0.1' }, callback)
}
})
t.after(client.destroy.bind(client))

const responses = await Promise.all([
client.request({ path: '/', method: 'GET' }),
client.request({ path: '/', method: 'GET' }),
client.request({ path: '/', method: 'GET' })
])

for (const response of responses) {
p.strictEqual(await response.body.text(), 'ok')
}

p.strictEqual(hostnames.length, 3)
p.strictEqual(hostnames[0], '127.0.0.1')
p.strictEqual(hostnames[1], '127.0.0.2')
p.strictEqual(hostnames[2], '127.0.0.1')
})

test('should fail when dns lookup returns no records for hostname upstreams', async (t) => {
const p = tspl(t, { plan: 1 })

const originalLookup = dns.lookup
dns.lookup = function lookup (hostname, options, callback) {
if (hostname !== 'service.local') {
return originalLookup.call(this, hostname, options, callback)
}

queueMicrotask(() => {
callback(null, [])
})
}
t.after(() => {
dns.lookup = originalLookup
})

const client = new BalancedPool('http://service.local:1')
t.after(client.destroy.bind(client))

await assert.rejects(client.request({ path: '/', method: 'GET' }), {
message: 'No DNS entries found for service.local'
})
p.ok(true)
})

test('should return no dispatcher when all upstreams become busy', () => {
const pool = new BalancedPool()
const upstream = pool.addUpstream('http://localhost:3001').getUpstream('http://localhost:3001')

let accesses = 0
Object.defineProperty(upstream, kNeedDrain, {
configurable: true,
get () {
accesses++
return accesses > 1
}
})

assert.strictEqual(pool[kGetDispatcher](), undefined)
})

class TestServer {
constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) {
this.config = {
Expand Down
Loading