Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { initPeers } from './lib/peers.js';
import { createWebHandler, setNet as webSetNet } from './lib/web-handler.js';
import { setNet as listenSetNet, initListeners } from './lib/socket-listeners.js';
import { setNet as relaySetNet, initRelays } from './lib/socket-relays.js';
import { setDgram, initUdpRelays } from './lib/udp-relays.js';
import fetch from './lib/fetch.js';

const debug = createDebug('hsync:info');
Expand All @@ -22,6 +23,8 @@ export function setNet(netImpl) {
relaySetNet(netImpl);
}

export { setDgram };

export function setMqtt(mqttImpl) {
mqtt = mqttImpl;
}
Expand Down Expand Up @@ -66,6 +69,7 @@ export async function createHsync(config) {
hsyncClient.peers = initPeers(hsyncClient);
hsyncClient.listeners = initListeners(hsyncClient);
hsyncClient.relays = initRelays(hsyncClient);
hsyncClient.udpRelays = initUdpRelays(hsyncClient);

const events = new EventEmitter();

Expand Down
4 changes: 3 additions & 1 deletion hsync.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import net from 'net';
import dgram from 'dgram';
import mqtt from 'mqtt';
import createDebug from 'debug';
import { createHsync, setNet, setMqtt } from './connection.js';
import { createHsync, setNet, setMqtt, setDgram } from './connection.js';
import config from './config.js';
import { setRTC } from './lib/peers.js';
import rtc from './lib/rtc-node.js';
Expand All @@ -10,6 +11,7 @@ const debugError = createDebug('errors');

setRTC(rtc);
setNet(net);
setDgram(dgram);
setMqtt(mqtt);

process.on('unhandledRejection', (reason, p) => {
Expand Down
163 changes: 163 additions & 0 deletions lib/udp-relays.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import createDebug from 'debug';

const debug = createDebug('hsync:udp-relay');
const debugError = createDebug('hsync:error');

debugError.color = 1;

let dgram;

export function setDgram(dgramImpl) {
dgram = dgramImpl;
}

export function initUdpRelays(hsyncClient) {
const cachedUdpRelays = {};
const udpSockets = {};

function getUdpRelays() {
const keys = Object.keys(cachedUdpRelays);
debug('getUdpRelays', keys);
return keys.map((key) => {
const relay = cachedUdpRelays[key];
return {
port: relay.port,
targetHost: relay.targetHost,
targetPort: relay.targetPort,
whitelist: relay.whitelist || '',
blacklist: relay.blacklist || '',
multicast: relay.multicast || null,
};
});
}

function addUdpRelay({ whitelist, blacklist, port, targetPort, targetHost, multicast }) {
targetPort = targetPort || port;
targetHost = targetHost || 'localhost';
debug('creating UDP relay', whitelist, blacklist, port, targetPort, targetHost, multicast);

const newRelay = {
whitelist,
blacklist,
port,
targetPort,
targetHost,
multicast,
};
cachedUdpRelays['u' + port] = newRelay;

// Create the UDP socket for this relay
const socket = dgram.createSocket('udp4');
socket.relayPort = port;
udpSockets['u' + port] = socket;

socket.on('message', (msg, rinfo) => {
debug(`UDP message from ${rinfo.address}:${rinfo.port}`, msg.length, 'bytes');
// Forward to peer via RTC if available
if (hsyncClient.udpMessageHandler) {
hsyncClient.udpMessageHandler({
port,
data: msg,
remoteAddress: rinfo.address,
remotePort: rinfo.port,
});
}
});

socket.on('error', (err) => {
debugError('UDP socket error', port, err);
socket.close();
delete udpSockets['u' + port];
});

socket.on('listening', () => {
const address = socket.address();
debug(`UDP relay listening on ${address.address}:${address.port}`);

// Join multicast group if specified
if (multicast) {
try {
socket.addMembership(multicast);
debug(`Joined multicast group ${multicast}`);
} catch (e) {
debugError('Failed to join multicast group', multicast, e);
}
}
});

socket.bind(port);

return newRelay;
}

function sendUdpMessage({ port, data, targetHost, targetPort }) {
const relay = cachedUdpRelays['u' + port];
if (!relay) {
throw new Error('no UDP relay found for port: ' + port);
}

const socket = udpSockets['u' + port];
if (!socket) {
throw new Error('no UDP socket found for port: ' + port);
}

const host = targetHost || relay.targetHost;
const destPort = targetPort || relay.targetPort;

return new Promise((resolve, reject) => {
socket.send(data, destPort, host, (err) => {
if (err) {
debugError('UDP send error', err);
reject(err);
} else {
debug(`UDP sent ${data.length} bytes to ${host}:${destPort}`);
resolve({ sent: data.length, host, port: destPort });
}
});
});
}

function removeUdpRelay(port) {
const key = 'u' + port;
const socket = udpSockets[key];
if (socket) {
socket.close();
delete udpSockets[key];
}
delete cachedUdpRelays[key];
debug('removed UDP relay', port);
}

function closeAllUdpRelays() {
Object.keys(udpSockets).forEach((key) => {
const socket = udpSockets[key];
if (socket) {
socket.close();
}
});
Object.keys(cachedUdpRelays).forEach((key) => {
delete cachedUdpRelays[key];
});
Object.keys(udpSockets).forEach((key) => {
delete udpSockets[key];
});
debug('closed all UDP relays');
}

// Attach to hsyncClient
hsyncClient.cachedUdpRelays = cachedUdpRelays;
hsyncClient.udpSockets = udpSockets;
hsyncClient.addUdpRelay = addUdpRelay;
hsyncClient.getUdpRelays = getUdpRelays;
hsyncClient.sendUdpMessage = sendUdpMessage;
hsyncClient.removeUdpRelay = removeUdpRelay;
hsyncClient.closeAllUdpRelays = closeAllUdpRelays;

return {
getUdpRelays,
addUdpRelay,
sendUdpMessage,
removeUdpRelay,
closeAllUdpRelays,
};
}
Loading