diff --git a/connection.js b/connection.js index ffef33a..d87ddd3 100644 --- a/connection.js +++ b/connection.js @@ -4,6 +4,7 @@ import { initPeers } from './lib/peers.js'; import { createWebHandler, setNet as webSetNet } from './lib/web-handler.js'; import { setNet as listenSetNet, initListeners } from './lib/socket-listeners.js'; import { setNet as relaySetNet, initRelays } from './lib/socket-relays.js'; +import { setDgram, initUdpRelays } from './lib/udp-relays.js'; import fetch from './lib/fetch.js'; const debug = createDebug('hsync:info'); @@ -22,6 +23,8 @@ export function setNet(netImpl) { relaySetNet(netImpl); } +export { setDgram }; + export function setMqtt(mqttImpl) { mqtt = mqttImpl; } @@ -66,6 +69,7 @@ export async function createHsync(config) { hsyncClient.peers = initPeers(hsyncClient); hsyncClient.listeners = initListeners(hsyncClient); hsyncClient.relays = initRelays(hsyncClient); + hsyncClient.udpRelays = initUdpRelays(hsyncClient); const events = new EventEmitter(); diff --git a/hsync.js b/hsync.js index e8b7243..5bac947 100644 --- a/hsync.js +++ b/hsync.js @@ -1,7 +1,8 @@ import net from 'net'; +import dgram from 'dgram'; import mqtt from 'mqtt'; import createDebug from 'debug'; -import { createHsync, setNet, setMqtt } from './connection.js'; +import { createHsync, setNet, setMqtt, setDgram } from './connection.js'; import config from './config.js'; import { setRTC } from './lib/peers.js'; import rtc from './lib/rtc-node.js'; @@ -10,6 +11,7 @@ const debugError = createDebug('errors'); setRTC(rtc); setNet(net); +setDgram(dgram); setMqtt(mqtt); process.on('unhandledRejection', (reason, p) => { diff --git a/lib/udp-relays.js b/lib/udp-relays.js new file mode 100644 index 0000000..58954f7 --- /dev/null +++ b/lib/udp-relays.js @@ -0,0 +1,163 @@ +import createDebug from 'debug'; + +const debug = createDebug('hsync:udp-relay'); +const debugError = createDebug('hsync:error'); + +debugError.color = 1; + +let dgram; + +export function setDgram(dgramImpl) { + dgram = dgramImpl; +} + +export function initUdpRelays(hsyncClient) { + const cachedUdpRelays = {}; + const udpSockets = {}; + + function getUdpRelays() { + const keys = Object.keys(cachedUdpRelays); + debug('getUdpRelays', keys); + return keys.map((key) => { + const relay = cachedUdpRelays[key]; + return { + port: relay.port, + targetHost: relay.targetHost, + targetPort: relay.targetPort, + whitelist: relay.whitelist || '', + blacklist: relay.blacklist || '', + multicast: relay.multicast || null, + }; + }); + } + + function addUdpRelay({ whitelist, blacklist, port, targetPort, targetHost, multicast }) { + targetPort = targetPort || port; + targetHost = targetHost || 'localhost'; + debug('creating UDP relay', whitelist, blacklist, port, targetPort, targetHost, multicast); + + const newRelay = { + whitelist, + blacklist, + port, + targetPort, + targetHost, + multicast, + }; + cachedUdpRelays['u' + port] = newRelay; + + // Create the UDP socket for this relay + const socket = dgram.createSocket('udp4'); + socket.relayPort = port; + udpSockets['u' + port] = socket; + + socket.on('message', (msg, rinfo) => { + debug(`UDP message from ${rinfo.address}:${rinfo.port}`, msg.length, 'bytes'); + // Forward to peer via RTC if available + if (hsyncClient.udpMessageHandler) { + hsyncClient.udpMessageHandler({ + port, + data: msg, + remoteAddress: rinfo.address, + remotePort: rinfo.port, + }); + } + }); + + socket.on('error', (err) => { + debugError('UDP socket error', port, err); + socket.close(); + delete udpSockets['u' + port]; + }); + + socket.on('listening', () => { + const address = socket.address(); + debug(`UDP relay listening on ${address.address}:${address.port}`); + + // Join multicast group if specified + if (multicast) { + try { + socket.addMembership(multicast); + debug(`Joined multicast group ${multicast}`); + } catch (e) { + debugError('Failed to join multicast group', multicast, e); + } + } + }); + + socket.bind(port); + + return newRelay; + } + + function sendUdpMessage({ port, data, targetHost, targetPort }) { + const relay = cachedUdpRelays['u' + port]; + if (!relay) { + throw new Error('no UDP relay found for port: ' + port); + } + + const socket = udpSockets['u' + port]; + if (!socket) { + throw new Error('no UDP socket found for port: ' + port); + } + + const host = targetHost || relay.targetHost; + const destPort = targetPort || relay.targetPort; + + return new Promise((resolve, reject) => { + socket.send(data, destPort, host, (err) => { + if (err) { + debugError('UDP send error', err); + reject(err); + } else { + debug(`UDP sent ${data.length} bytes to ${host}:${destPort}`); + resolve({ sent: data.length, host, port: destPort }); + } + }); + }); + } + + function removeUdpRelay(port) { + const key = 'u' + port; + const socket = udpSockets[key]; + if (socket) { + socket.close(); + delete udpSockets[key]; + } + delete cachedUdpRelays[key]; + debug('removed UDP relay', port); + } + + function closeAllUdpRelays() { + Object.keys(udpSockets).forEach((key) => { + const socket = udpSockets[key]; + if (socket) { + socket.close(); + } + }); + Object.keys(cachedUdpRelays).forEach((key) => { + delete cachedUdpRelays[key]; + }); + Object.keys(udpSockets).forEach((key) => { + delete udpSockets[key]; + }); + debug('closed all UDP relays'); + } + + // Attach to hsyncClient + hsyncClient.cachedUdpRelays = cachedUdpRelays; + hsyncClient.udpSockets = udpSockets; + hsyncClient.addUdpRelay = addUdpRelay; + hsyncClient.getUdpRelays = getUdpRelays; + hsyncClient.sendUdpMessage = sendUdpMessage; + hsyncClient.removeUdpRelay = removeUdpRelay; + hsyncClient.closeAllUdpRelays = closeAllUdpRelays; + + return { + getUdpRelays, + addUdpRelay, + sendUdpMessage, + removeUdpRelay, + closeAllUdpRelays, + }; +} diff --git a/test/unit/udp-relays.test.js b/test/unit/udp-relays.test.js new file mode 100644 index 0000000..d1d3a9c --- /dev/null +++ b/test/unit/udp-relays.test.js @@ -0,0 +1,259 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { initUdpRelays, setDgram } from '../../lib/udp-relays.js'; + +describe('udp-relays', () => { + let mockDgram; + let mockSocket; + let mockHsyncClient; + let messageHandler; + let _errorHandler; + let listeningHandler; + + beforeEach(() => { + messageHandler = null; + _errorHandler = null; + listeningHandler = null; + + mockSocket = { + relayPort: null, + bind: vi.fn(), + close: vi.fn(), + send: vi.fn((data, port, host, cb) => cb && cb()), + address: vi.fn(() => ({ address: '0.0.0.0', port: 5000 })), + addMembership: vi.fn(), + on: vi.fn((event, handler) => { + if (event === 'message') messageHandler = handler; + if (event === 'error') _errorHandler = handler; + if (event === 'listening') listeningHandler = handler; + }), + }; + + mockDgram = { + createSocket: vi.fn(() => mockSocket), + }; + + mockHsyncClient = { + myHostName: 'local.example.com', + }; + + setDgram(mockDgram); + }); + + describe('setDgram', () => { + it('should set the dgram implementation', () => { + const customDgram = { createSocket: vi.fn() }; + setDgram(customDgram); + // No error means success + }); + }); + + describe('initUdpRelays', () => { + it('should return object with required methods', () => { + const udpRelays = initUdpRelays(mockHsyncClient); + + expect(udpRelays.addUdpRelay).toBeTypeOf('function'); + expect(udpRelays.getUdpRelays).toBeTypeOf('function'); + expect(udpRelays.sendUdpMessage).toBeTypeOf('function'); + expect(udpRelays.removeUdpRelay).toBeTypeOf('function'); + expect(udpRelays.closeAllUdpRelays).toBeTypeOf('function'); + }); + + it('should attach methods to hsyncClient', () => { + initUdpRelays(mockHsyncClient); + + expect(mockHsyncClient.cachedUdpRelays).toBeTypeOf('object'); + expect(mockHsyncClient.udpSockets).toBeTypeOf('object'); + expect(mockHsyncClient.addUdpRelay).toBeTypeOf('function'); + expect(mockHsyncClient.getUdpRelays).toBeTypeOf('function'); + expect(mockHsyncClient.sendUdpMessage).toBeTypeOf('function'); + }); + }); + + describe('addUdpRelay', () => { + let udpRelays; + + beforeEach(() => { + udpRelays = initUdpRelays(mockHsyncClient); + }); + + it('should create UDP socket and bind to port', () => { + udpRelays.addUdpRelay({ port: 5000 }); + + expect(mockDgram.createSocket).toHaveBeenCalledWith('udp4'); + expect(mockSocket.bind).toHaveBeenCalledWith(5000); + }); + + it('should use localhost as default target host', () => { + const relay = udpRelays.addUdpRelay({ port: 5000 }); + + expect(relay.targetHost).toBe('localhost'); + }); + + it('should use same port as default target port', () => { + const relay = udpRelays.addUdpRelay({ port: 5000 }); + + expect(relay.targetPort).toBe(5000); + }); + + it('should store relay with custom options', () => { + const relay = udpRelays.addUdpRelay({ + port: 5000, + targetPort: 6000, + targetHost: 'remote.local', + whitelist: 'allowed.com', + blacklist: 'blocked.com', + }); + + expect(relay.port).toBe(5000); + expect(relay.targetPort).toBe(6000); + expect(relay.targetHost).toBe('remote.local'); + expect(relay.whitelist).toBe('allowed.com'); + expect(relay.blacklist).toBe('blocked.com'); + }); + + it('should support multicast group', () => { + udpRelays.addUdpRelay({ port: 5000, multicast: '239.1.2.3' }); + + // Trigger listening handler to join multicast + listeningHandler(); + + expect(mockSocket.addMembership).toHaveBeenCalledWith('239.1.2.3'); + }); + + it('should call udpMessageHandler on incoming message', () => { + mockHsyncClient.udpMessageHandler = vi.fn(); + udpRelays.addUdpRelay({ port: 5000 }); + + // Simulate incoming message + const testData = Buffer.from('test message'); + const rinfo = { address: '192.168.1.1', port: 12345 }; + messageHandler(testData, rinfo); + + expect(mockHsyncClient.udpMessageHandler).toHaveBeenCalledWith({ + port: 5000, + data: testData, + remoteAddress: '192.168.1.1', + remotePort: 12345, + }); + }); + }); + + describe('getUdpRelays', () => { + let udpRelays; + + beforeEach(() => { + udpRelays = initUdpRelays(mockHsyncClient); + }); + + it('should return empty array when no relays', () => { + const result = udpRelays.getUdpRelays(); + + expect(result).toEqual([]); + }); + + it('should return relay info', () => { + udpRelays.addUdpRelay({ + port: 5000, + targetPort: 6000, + targetHost: 'remote.local', + }); + + const result = udpRelays.getUdpRelays(); + + expect(result).toHaveLength(1); + expect(result[0].port).toBe(5000); + expect(result[0].targetHost).toBe('remote.local'); + expect(result[0].targetPort).toBe(6000); + }); + }); + + describe('sendUdpMessage', () => { + let udpRelays; + + beforeEach(() => { + udpRelays = initUdpRelays(mockHsyncClient); + }); + + it('should throw if no relay found for port', () => { + expect(() => + udpRelays.sendUdpMessage({ port: 9999, data: Buffer.from('test') }) + ).toThrow('no UDP relay found for port: 9999'); + }); + + it('should send data to target host and port', async () => { + udpRelays.addUdpRelay({ + port: 5000, + targetPort: 6000, + targetHost: 'remote.local', + }); + + const data = Buffer.from('test message'); + const result = await udpRelays.sendUdpMessage({ port: 5000, data }); + + expect(mockSocket.send).toHaveBeenCalledWith(data, 6000, 'remote.local', expect.any(Function)); + expect(result.sent).toBe(data.length); + expect(result.host).toBe('remote.local'); + expect(result.port).toBe(6000); + }); + + it('should allow overriding target host and port', async () => { + udpRelays.addUdpRelay({ port: 5000 }); + + const data = Buffer.from('test'); + await udpRelays.sendUdpMessage({ + port: 5000, + data, + targetHost: 'override.local', + targetPort: 7000, + }); + + expect(mockSocket.send).toHaveBeenCalledWith( + data, + 7000, + 'override.local', + expect.any(Function) + ); + }); + }); + + describe('removeUdpRelay', () => { + let udpRelays; + + beforeEach(() => { + udpRelays = initUdpRelays(mockHsyncClient); + }); + + it('should close socket and remove relay', () => { + udpRelays.addUdpRelay({ port: 5000 }); + + udpRelays.removeUdpRelay(5000); + + expect(mockSocket.close).toHaveBeenCalled(); + expect(udpRelays.getUdpRelays()).toHaveLength(0); + }); + }); + + describe('closeAllUdpRelays', () => { + let udpRelays; + + beforeEach(() => { + udpRelays = initUdpRelays(mockHsyncClient); + }); + + it('should close all sockets and clear relays', () => { + const socket1 = { ...mockSocket, close: vi.fn() }; + const socket2 = { ...mockSocket, close: vi.fn() }; + let socketIndex = 0; + mockDgram.createSocket = vi.fn(() => (socketIndex++ === 0 ? socket1 : socket2)); + + udpRelays.addUdpRelay({ port: 5000 }); + udpRelays.addUdpRelay({ port: 5001 }); + + udpRelays.closeAllUdpRelays(); + + expect(socket1.close).toHaveBeenCalled(); + expect(socket2.close).toHaveBeenCalled(); + expect(udpRelays.getUdpRelays()).toHaveLength(0); + }); + }); +});