From a958e366710aa415bae5a0ad1748f8c414fa3591 Mon Sep 17 00:00:00 2001 From: Joel Webber Date: Tue, 5 May 2026 19:59:23 -0400 Subject: [PATCH] =?UTF-8?q?tunnel:=20package=20hygiene=20=E2=80=94=20split?= =?UTF-8?q?=20build/dist,=20untrack=20artifacts,=20harden=20publish?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cleans up the publish flow so the tarball is always self-contained and local builds can't get into a half-built state. - .gitignore: untrack tunnel/build/ and tunnel/dist/. tsc and rollup outputs are reproducible from sources; carrying them in git produced a 38k-line phantom diff every time `bundle` was run because rollup overwrote the small re-export with the full bundled output at the same path. - rollup.config.mjs: bundle the entire CLI (input build/src/index.js, where the Node-version-check entrypoint lives) into a single dist/index.js. Build (tsc → build/) and bundle (rollup → dist/) now write to disjoint trees; no more clobbering. Shebang carries through from index.ts so the banner option is gone. - package.json: - bin/files now point at dist/. - clean wipes both build/ and dist/. - build = clean && tsc (always a fresh build). - bundle = build && rollup. - test = build && node --test (self-bootstrapping). - prepack = bundle (covers both `npm pack` and `npm publish` — more robust than prepublishOnly, which only runs on publish). - verify = npm pack --dry-run (quick "what would ship?" check). - prepublishOnly removed (covered by prepack). - main.ts: walk up to find package.json instead of hard-coding ../.., since the bundled file at dist/index.js sits one level above package.json and the tsc output at build/src/main.js sits two levels above. Verified: `npm pack --dry-run` ships exactly two files (dist/index.js + package.json); installing the tarball into a clean dir and running the binary returns the expected version with no node_modules pulls. --- .gitignore | 5 + tunnel/build/src/allowlist.js | 146 --------- tunnel/build/src/client.js | 281 ---------------- tunnel/build/src/index.js | 16 - tunnel/build/src/loopback.js | 84 ----- tunnel/build/src/main.js | 249 -------------- tunnel/build/src/third_party/index.js | 6 - tunnel/build/src/transport.js | 53 --- tunnel/build/src/transport_legacy.js | 235 -------------- tunnel/build/src/transport_yamux.js | 233 ------------- tunnel/build/src/types.js | 22 -- tunnel/build/src/yamux.js | 449 -------------------------- tunnel/package.json | 15 +- tunnel/rollup.config.mjs | 17 +- tunnel/src/main.ts | 28 +- 15 files changed, 42 insertions(+), 1797 deletions(-) delete mode 100644 tunnel/build/src/allowlist.js delete mode 100644 tunnel/build/src/client.js delete mode 100644 tunnel/build/src/index.js delete mode 100644 tunnel/build/src/loopback.js delete mode 100644 tunnel/build/src/main.js delete mode 100644 tunnel/build/src/third_party/index.js delete mode 100644 tunnel/build/src/transport.js delete mode 100644 tunnel/build/src/transport_legacy.js delete mode 100644 tunnel/build/src/transport_yamux.js delete mode 100644 tunnel/build/src/types.js delete mode 100644 tunnel/build/src/yamux.js diff --git a/.gitignore b/.gitignore index fc6a119b..aa4476b7 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,8 @@ __pycache__/ # Local design docs, plans, and skill-eval writeups. Authored alongside # implementation work but not part of the shipped plugin surface. docs/ + +# Tunnel package build artifacts. tsc emits to build/, rollup emits to dist/. +# Both are reproducible from sources; npm publish regenerates them via prepack. +tunnel/build/ +tunnel/dist/ diff --git a/tunnel/build/src/allowlist.js b/tunnel/build/src/allowlist.js deleted file mode 100644 index 7292795a..00000000 --- a/tunnel/build/src/allowlist.js +++ /dev/null @@ -1,146 +0,0 @@ -// Public TLDs we explicitly refuse to wildcard. Belt-and-braces on top of the -// loopback-resolution check at fetch time. Anything that could plausibly -// resolve outside loopback gets rejected at parse so it never lives in a -// runtime allowlist. -const PUBLIC_TLD_WILDCARD_DENY = new Set([ - 'com', - 'net', - 'org', - 'io', - 'dev', - 'app', - 'co', - 'info', - 'biz', - 'me', - 'local', // mDNS — not loopback -]); -const LOOPBACK_HOSTS = new Set(['localhost', '127.0.0.1', '::1']); -function isLoopbackHost(host) { - if (LOOPBACK_HOSTS.has(host)) - return true; - if (host.endsWith('.localhost')) - return true; - if (host.endsWith('.test')) - return true; - // 127.0.0.0/8: any 127.x.x.x literal. - if (/^127\.\d+\.\d+\.\d+$/.test(host)) - return true; - return false; -} -function isAllowedWildcardSuffix(suffix) { - const lc = suffix.toLowerCase(); - if (PUBLIC_TLD_WILDCARD_DENY.has(lc)) - return false; - // Bare suffixes for the two reserved test TLDs (RFC 6761): `*.test` and - // `*.localhost` are perfectly valid allowlist entries on their own. - if (lc === 'localhost' || lc === 'test') - return true; - if (lc.endsWith('.test') || lc.endsWith('.localhost')) - return true; - return false; -} -// Pattern parser: stricter than URL() — no path/query/fragment/userinfo at -// all. Wildcards (*.suffix) are special and would confuse URL() anyway. -const PATTERN_RE = /^(http|https):\/\/([^\/?#@]+)$/; -export function parseOriginPattern(s) { - if (!s) - throw new Error('empty origin pattern'); - const m = PATTERN_RE.exec(s); - if (!m) { - // Scheme first — that's the primary classification. Only after the - // scheme is acceptable do we report stricter syntax errors. - if (!/^https?:\/\//.test(s)) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: scheme must be http or https`); - } - // Strip the leading "scheme://" before testing for path/query/userinfo - // so the `//` doesn't mistrigger. - const afterScheme = s.replace(/^https?:\/\//, ''); - if (/[\/?#@]/.test(afterScheme)) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: must not include path, query, fragment, or userinfo`); - } - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: must be scheme://host:port`); - } - const [, scheme, authority] = m; - const colonIdx = authority.lastIndexOf(':'); - if (colonIdx < 0) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: explicit port required`); - } - const host = authority.slice(0, colonIdx).toLowerCase(); - const port = authority.slice(colonIdx + 1); - if (!host) - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: missing host`); - if (!port || !/^\d+$/.test(port)) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: explicit port required`); - } - if (host.includes('*')) { - if (!host.startsWith('*.')) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: only leading '*.' wildcards are supported`); - } - const suffix = host.slice(2); - if (!suffix || suffix.includes('*')) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: bad wildcard suffix`); - } - if (!isAllowedWildcardSuffix(suffix)) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: wildcard suffix ${JSON.stringify(suffix)} not allowed (must be localhost, .test, .localhost, or another non-public suffix)`); - } - return { scheme: scheme, host: '', port, wildcard: true, suffix }; - } - if (!isLoopbackHost(host)) { - throw new Error(`invalid origin pattern ${JSON.stringify(s)}: host must be loopback (localhost, 127.x, ::1, *.localhost, *.test)`); - } - return { scheme: scheme, host, port, wildcard: false, suffix: '' }; -} -export function parseOriginPatterns(entries) { - if (!entries || entries.length === 0) - return []; - return entries.map(parseOriginPattern); -} -/** - * Render a pattern back to its canonical form — used in error messages so - * the user sees what we actually parsed. - */ -export function originPatternString(p) { - const host = p.wildcard ? `*.${p.suffix}` : p.host; - return `${p.scheme}://${host}:${p.port}`; -} -/** - * Matches reports whether `pattern` matches the canonical origin string - * `scheme://host:port`. Origin must be bare — any explicit path, query, or - * fragment in the input is treated as a non-match. (The relay always sends - * canonical bare origins; this guard is defense in depth.) - * - * We don't use URL() to parse because it normalizes `:3000` and `:3000/path` - * the same way (pathname='/'), making "bare vs explicit-path" ambiguous. - * A regex is simpler and unambiguous for this fixed shape. - */ -const ORIGIN_RE = /^(http|https):\/\/([^\/?#]+)$/; -export function patternMatches(pattern, origin) { - const m = ORIGIN_RE.exec(origin); - if (!m) - return false; - const [, scheme, authority] = m; - if (scheme !== pattern.scheme) - return false; - const colonIdx = authority.lastIndexOf(':'); - // Require explicit port — patterns always have one and the relay always sends one. - if (colonIdx < 0) - return false; - const host = authority.slice(0, colonIdx).toLowerCase(); - const port = authority.slice(colonIdx + 1); - if (port !== pattern.port) - return false; - // Strip IPv6 brackets if present. - const bareHost = host.startsWith('[') && host.endsWith(']') ? host.slice(1, -1) : host; - if (pattern.wildcard) { - return bareHost.endsWith('.' + pattern.suffix) && bareHost !== pattern.suffix; - } - return bareHost === pattern.host; -} -export function matchesAny(patterns, origin) { - for (const p of patterns) { - if (patternMatches(p, origin)) - return true; - } - return false; -} diff --git a/tunnel/build/src/client.js b/tunnel/build/src/client.js deleted file mode 100644 index 6b443725..00000000 --- a/tunnel/build/src/client.js +++ /dev/null @@ -1,281 +0,0 @@ -import { EventEmitter } from 'node:events'; -import { WebSocket } from './third_party/index.js'; -import { parseOriginPatterns } from './allowlist.js'; -import { RECONNECT_BASE_MS, RECONNECT_MAX_MS, RESUME_SUBPROTOCOL_PREFIX, STALE_CONNECTION_MS, YAMUX_PING_INTERVAL_MS, } from './types.js'; -import { LegacyTransport } from './transport_legacy.js'; -import { YamuxTransport } from './transport_yamux.js'; -/** - * TunnelClient manages the WebSocket connection lifecycle: connect, handshake, - * reconnect. After the hello/ready exchange it delegates all protocol-specific - * work to a TunnelTransport (LegacyTransport or YamuxTransport). - * - * Emits 'need_live_tunnel' when a resume token is rejected (401), indicating - * the caller must obtain a fresh relay URL via live-tunnel before reconnecting. - */ -export class TunnelClient extends EventEmitter { - #relayUrl; - #initialConnectionId; - #connectionId; - #upgradeHeaders; - #log; - #allowedOriginsRaw; - #allowedOrigins; - #staleTimeoutMs; - #yamuxPingIntervalMs; - #ws = null; - #state = 'disconnected'; - #tunnelId; - #transport = null; - #reconnectAttempts = 0; - #reconnectTimer = null; - #staleTimer = null; - #connectedSince = null; - #intentionalDisconnect = false; - #resumeToken; - #traceId; - constructor(opts) { - super(); - this.#relayUrl = opts.relayUrl; - this.#initialConnectionId = opts.connectionId; - this.#connectionId = opts.connectionId; - this.#log = opts.log; - this.#upgradeHeaders = opts.headers ?? {}; - // Parse the allowlist once at construction. Throws if any entry is - // malformed — surfacing the error here is friendlier than waiting for - // the relay to reject the hello. - this.#allowedOriginsRaw = opts.allowedOrigins; - this.#allowedOrigins = parseOriginPatterns(opts.allowedOrigins); - this.#staleTimeoutMs = opts.staleTimeoutMs ?? STALE_CONNECTION_MS; - this.#yamuxPingIntervalMs = opts.yamuxPingIntervalMs ?? YAMUX_PING_INTERVAL_MS; - } - get state() { - return this.#state; - } - get tunnelId() { - return this.#tunnelId; - } - get connectionId() { - return this.#connectionId; - } - get traceId() { - return this.#traceId; - } - connect() { - this.#intentionalDisconnect = false; - this.#doConnect(); - } - disconnect() { - this.#intentionalDisconnect = true; - this.#cleanup(); - this.#state = 'disconnected'; - } - // ----- Connection lifecycle ----- - #doConnect() { - this.#state = 'connecting'; - // Resume path authenticates via subprotocol; strip the (spent) nonce token. - // Keep connection_id in the URL — the relay's affinity router hashes on it - // to send the WS to the pod that owns the chromium browser context. Without - // it, the affinity router mints a fresh UUID and the reconnect lands on a - // random pod; the new tunnel registers there with the (correct, preserved) - // connection_id, but the chromium-side forward proxy on the original pod - // still can't see it and the next navigation gets ERR_TUNNEL_CONNECTION_FAILED. - // Initial path keeps the relay URL intact and sets connection_id if provided. - const u = new URL(this.#relayUrl); - let protocols; - if (this.#resumeToken) { - u.searchParams.delete('token'); - if (this.#connectionId) { - u.searchParams.set('connection_id', this.#connectionId); - } - protocols = [`${RESUME_SUBPROTOCOL_PREFIX}${this.#resumeToken}`]; - } - else if (this.#initialConnectionId) { - u.searchParams.set('connection_id', this.#initialConnectionId); - } - const wsUrlStr = u.toString(); - this.#log(`Connecting to ${wsUrlStr}`); - const ws = protocols - ? new WebSocket(wsUrlStr, protocols, { headers: this.#upgradeHeaders }) - : new WebSocket(wsUrlStr, { headers: this.#upgradeHeaders }); - // Handle non-101 upgrade responses (e.g. 401 on resume token replay). - ws.on('unexpected-response', (_req, res) => { - this.#log(`Relay rejected upgrade: ${res.statusCode}`); - if (res.statusCode === 401) { - this.#resumeToken = undefined; - this.#traceId = undefined; - this.#intentionalDisconnect = true; - this.emit('need_live_tunnel'); - } - res.resume(); // drain so socket can be released - }); - ws.on('open', () => { - this.#state = 'connected'; - this.#connectedSince = Date.now(); - this.#log('WebSocket open, sending hello'); - const hello = { - type: 'hello', - protocol: 'yamux', - streaming: true, - }; - if (this.#allowedOriginsRaw && this.#allowedOriginsRaw.length > 0) { - hello.allowedOrigins = this.#allowedOriginsRaw; - } - // On resume path the server already knows the connectionId; don't echo - // the stale initial value. - if (this.#initialConnectionId && !this.#resumeToken) { - hello.connectionId = this.#initialConnectionId; - } - ws.send(JSON.stringify(hello)); - this.#resetStaleTimer(); - }); - // Listen for the ready message (always JSON, regardless of protocol). - const handshakeHandler = (data) => { - this.#resetStaleTimer(); - let msg; - try { - msg = JSON.parse(data.toString()); - } - catch { - this.#log(`Invalid message from relay: ${data.toString().slice(0, 200)}`); - return; - } - if (msg.type === 'error') { - // Server rejected the resume (e.g. RotateConnection DB failure). - // Token already revoked server-side; skip reconnect and request a - // fresh live-tunnel instead. - this.#log(`Relay handshake error: ${msg.message}`); - this.#resumeToken = undefined; - this.#traceId = undefined; - ws.removeListener('message', handshakeHandler); - this.#intentionalDisconnect = true; - ws.close(); - this.emit('need_live_tunnel'); - return; - } - if (msg.type !== 'ready') { - this.#log(`Expected ready, got: ${msg.type}`); - return; - } - ws.removeListener('message', handshakeHandler); - this.#tunnelId = msg.tunnelId; - this.#connectionId = msg.connectionId; - // Capture rotating resume token and stable trace ID from the server. - // The server preserves the connection_id across resume (lidar - // tryResume reads it from the trace row), so the chromium browser - // context's forward proxy continues to find tunnels after reconnect. - if (msg.resumeToken !== undefined) - this.#resumeToken = msg.resumeToken; - if (msg.traceId !== undefined) - this.#traceId = msg.traceId; - this.#state = 'ready'; - this.#reconnectAttempts = 0; - this.#log(`Tunnel ready: ${msg.tunnelId} (connection ${msg.connectionId})`); - // Create the transport based on negotiated protocol. Both transports - // wire onActivity to the stale-timer reset: yamux server-initiated - // pings alone are not sufficient for liveness, since a silently dropped - // WS leaves us with no way to learn the peer is gone. The yamux session - // also sends its own client-initiated PINGs to keep stateful - // intermediaries (linkerd, NATs, LBs) from idling us out. - if (msg.protocol === 'yamux') { - this.#transport = new YamuxTransport({ - ws, - log: this.#log, - streaming: msg.streaming === true, - allowedOrigins: this.#allowedOrigins, - onActivity: () => this.#resetStaleTimer(), - pingIntervalMs: this.#yamuxPingIntervalMs, - }); - } - else { - this.#transport = new LegacyTransport({ - ws, - log: this.#log, - onActivity: () => this.#resetStaleTimer(), - allowedOrigins: this.#allowedOrigins, - }); - } - // Transport.serve() resolves when the WebSocket closes or the session - // tears down. The close handler below then triggers reconnect. - // Catch unexpected rejections so they don't become unhandled and kill - // the process -- treat them the same as a connection drop. - this.#transport.serve().catch((err) => { - this.#log(`Transport error: ${err instanceof Error ? err.message : String(err)}`); - this.#onDisconnect(); - }); - }; - ws.on('message', handshakeHandler); - ws.on('close', (code, reason) => { - this.#log(`WebSocket closed: ${code} ${reason.toString()}`); - this.#onDisconnect(); - }); - ws.on('error', (err) => { - this.#log(`WebSocket error: ${err.message}`); - // 'close' fires after 'error', so reconnect happens there - }); - this.#ws = ws; - } - // ----- Disconnect / reconnect ----- - #onDisconnect() { - this.#transport?.close(); - this.#transport = null; - this.#clearStaleTimer(); - this.#tunnelId = undefined; - this.#ws = null; - if (this.#intentionalDisconnect) { - this.#state = 'disconnected'; - return; - } - // Reset backoff if we had a healthy connection for >60s - if (this.#connectedSince !== null && - Date.now() - this.#connectedSince > 60_000) { - this.#reconnectAttempts = 0; - } - this.#state = 'disconnected'; - this.#scheduleReconnect(); - } - #scheduleReconnect() { - const delay = Math.min(RECONNECT_BASE_MS * Math.pow(2, this.#reconnectAttempts), RECONNECT_MAX_MS); - // Add jitter: 0-25% of delay - const jitter = Math.random() * delay * 0.25; - const total = Math.round(delay + jitter); - this.#reconnectAttempts++; - this.#log(`Reconnecting in ${total}ms (attempt ${this.#reconnectAttempts})`); - this.#reconnectTimer = setTimeout(() => { - this.#reconnectTimer = null; - this.#doConnect(); - }, total); - } - // ----- Stale connection detection ----- - #resetStaleTimer() { - this.#clearStaleTimer(); - this.#staleTimer = setTimeout(() => { - this.#log('Connection stale, reconnecting'); - this.#ws?.close(); - }, this.#staleTimeoutMs); - } - #clearStaleTimer() { - if (this.#staleTimer !== null) { - clearTimeout(this.#staleTimer); - this.#staleTimer = null; - } - } - // ----- Cleanup ----- - #cleanup() { - this.#transport?.close(); - this.#transport = null; - this.#clearStaleTimer(); - if (this.#reconnectTimer !== null) { - clearTimeout(this.#reconnectTimer); - this.#reconnectTimer = null; - } - if (this.#ws) { - this.#ws.removeAllListeners(); - // Re-attach a no-op error handler: close() on a CONNECTING socket emits - // 'error' synchronously; without a listener Node throws. - this.#ws.on('error', () => { }); - this.#ws.close(); - this.#ws = null; - } - this.#tunnelId = undefined; - } -} diff --git a/tunnel/build/src/index.js b/tunnel/build/src/index.js deleted file mode 100644 index cc385ed0..00000000 --- a/tunnel/build/src/index.js +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env node -import { version } from 'node:process'; -const [major, minor] = version.substring(1).split('.').map(Number); -if (major === 20 && minor < 11) { - console.error(`ERROR: \`subtext-tunnel\` does not support Node ${process.version}. Please upgrade to Node 20.11.0 LTS or a newer LTS.`); - process.exit(1); -} -if (major === 22 && minor < 12) { - console.error(`ERROR: \`subtext-tunnel\` does not support Node ${process.version}. Please upgrade to Node 22.12.0 LTS or a newer LTS.`); - process.exit(1); -} -if (major < 20) { - console.error(`ERROR: \`subtext-tunnel\` does not support Node ${process.version}. Please upgrade to Node 20.11.0 LTS or a newer LTS.`); - process.exit(1); -} -await import('./main.js'); diff --git a/tunnel/build/src/loopback.js b/tunnel/build/src/loopback.js deleted file mode 100644 index 2e655970..00000000 --- a/tunnel/build/src/loopback.js +++ /dev/null @@ -1,84 +0,0 @@ -import { promises as dns } from 'node:dns'; -import * as net from 'node:net'; -/** - * resolveLoopbackOrigin verifies that `hostname` resolves to a loopback IP - * and returns the resolved address along with a fetch-friendly URL. - * - * The returned `ipUrl` rewrites the host portion to the resolved IP literal - * so the subsequent fetch() does NOT do its own DNS lookup. This is the - * load-bearing defense against DNS rebinding: by pinning the IP after the - * loopback check, an attacker can no longer flip the resolution between - * "check" and "fetch." - * - * The Host: header should be reset to the original hostname:port by the - * caller so virtual-host routing on the upstream still works. - * - * Throws if: - * - DNS lookup fails entirely - * - The resolved IP is not in 127.0.0.0/8 or ::1 - */ -export async function resolveLoopbackOrigin(origin) { - const u = parseOriginStrict(origin); - const { scheme, host, port } = u; - // If the host is already an IP literal, we still validate it before letting - // it through. Avoids the case where a malicious relay sends an Origin like - // "http://10.0.0.1:80/" expecting fetch() to skip resolution. - let ip; - let family; - if (net.isIP(host)) { - ip = host; - family = net.isIP(host) === 6 ? 6 : 4; - } - else { - const result = await dns.lookup(host, { family: 0, all: false }); - ip = result.address; - family = result.family; - } - if (!isLoopbackIP(ip)) { - throw new Error(`loopback check failed: ${host} resolved to ${ip}, not loopback`); - } - // Rewrite the URL to the IP literal so fetch() doesn't re-resolve. Bracket - // IPv6 addresses for URL syntax. - const ipHost = family === 6 ? `[${ip}]` : ip; - const ipUrl = `${scheme}://${ipHost}:${port}`; - return { scheme, hostname: host, port, resolvedIp: ip, family, ipUrl }; -} -const LOOPBACK_V4_RE = /^127\.\d+\.\d+\.\d+$/; -export function isLoopbackIP(ip) { - if (LOOPBACK_V4_RE.test(ip)) - return true; - // IPv6 ::1 has multiple representations after normalization. - if (ip === '::1' || ip === '0:0:0:0:0:0:0:1') - return true; - return false; -} -const ORIGIN_RE = /^(http|https):\/\/([^\/?#]+)$/; -function parseOriginStrict(origin) { - const m = ORIGIN_RE.exec(origin); - if (!m) - throw new Error(`invalid origin: ${origin}`); - const [, scheme, authority] = m; - let host; - let port; - if (authority.startsWith('[')) { - // IPv6 bracketed form: [::1]:8080 - const closeIdx = authority.indexOf(']'); - if (closeIdx < 0) - throw new Error(`invalid origin: ${origin}`); - host = authority.slice(1, closeIdx); - const rest = authority.slice(closeIdx + 1); - if (!rest.startsWith(':')) - throw new Error(`invalid origin: ${origin}`); - port = rest.slice(1); - } - else { - const colonIdx = authority.lastIndexOf(':'); - if (colonIdx < 0) - throw new Error(`invalid origin: ${origin} (port required)`); - host = authority.slice(0, colonIdx); - port = authority.slice(colonIdx + 1); - } - if (!port || !/^\d+$/.test(port)) - throw new Error(`invalid origin: ${origin} (port required)`); - return { scheme: scheme, host, port }; -} diff --git a/tunnel/build/src/main.js b/tunnel/build/src/main.js deleted file mode 100644 index 9fe1365d..00000000 --- a/tunnel/build/src/main.js +++ /dev/null @@ -1,249 +0,0 @@ -import process from "node:process"; -import { readFileSync } from "node:fs"; -import { fileURLToPath } from "node:url"; -import { dirname, join } from "node:path"; -import { McpServer, StdioServerTransport, z, yargs, hideBin, } from "./third_party/index.js"; -import { TunnelClient } from "./client.js"; -// Single source of truth: read version from package.json at runtime so it -// can't drift from what npm publishes. From build/src/main.js, package.json -// sits two levels up at the package root. -const pkg = JSON.parse(readFileSync(join(dirname(fileURLToPath(import.meta.url)), "..", "..", "package.json"), "utf8")); -const VERSION = pkg.version; -await yargs(hideBin(process.argv)) - .version(VERSION) - .help() - .parse(); -// Wrap console.error so a broken stderr (e.g. parent process died and closed -// the read side of the pipe) cannot recurse into our error handlers below. -// Without this guard the sequence stderr.write -> EPIPE -> uncaughtException -// -> log() -> stderr.write spins at 100% CPU forever. See orphan_spin.test.ts. -const log = (msg) => { - try { - console.error(`[subtext-tunnel] ${msg}`); - } - catch { - // Logger itself failed; nothing we can do. Don't recurse. - } -}; -// Multiple tunnels can be active simultaneously, keyed by tunnelId. -const clients = new Map(); -const server = new McpServer({ - name: "subtext_tunnel", - version: VERSION, -}, { capabilities: {} }); -server.registerTool("tunnel-connect", { - description: "Connect a tunnel to the relay. Multiple tunnels can be active simultaneously. " + - "Call live-tunnel on the subtext MCP server first to obtain the relayUrl.", - inputSchema: z.object({ - relayUrl: z - .string() - .describe("WebSocket URL of the relay (from live-tunnel)"), - connectionId: z - .string() - .optional() - .describe("Connection ID to bind this tunnel to. Required for connection-first flow " + - "(pass the connection_id from open_connection). Omit for tunnel-first flow " + - "(the server mints one and returns it in the response)."), - allowedOrigins: z - .array(z.string()) - .optional() - .describe("Optional per-tunnel origin allowlist. Each entry is " + - "`scheme://host:port` (exact) or `scheme://*.suffix:port` (subdomain " + - "wildcard). Hosts must be loopback-resolving (localhost, 127.x, ::1, " + - "*.test, *.localhost). The relay routes per-request to one of these " + - "origins; the client refuses anything not on the list (e.g. an API " + - "on :3000 + a frontend on :4200 + assets across *.myapp.test:3000)."), - }), -}, async ({ relayUrl, connectionId, allowedOrigins }) => { - // Optional env-var overrides for the keepalive timing. Production - // defaults (STALE_CONNECTION_MS=90s, YAMUX_PING_INTERVAL_MS=30s) are - // appropriate for staging/cloud LBs. For local testing of the silent- - // drop detection, set e.g. SUBTEXT_TUNNEL_STALE_MS=2000 and - // SUBTEXT_TUNNEL_PING_MS=200 so freezes resolve in seconds. - const staleMs = Number(process.env.SUBTEXT_TUNNEL_STALE_MS) || undefined; - const pingMs = Number(process.env.SUBTEXT_TUNNEL_PING_MS) || undefined; - let client; - try { - client = new TunnelClient({ - relayUrl, - connectionId, - log, - allowedOrigins, - staleTimeoutMs: staleMs, - yamuxPingIntervalMs: pingMs, - }); - } - catch (err) { - // Bad allowlist entry — surfaced before any connect attempt. - return { - content: [ - { - type: "text", - text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }, null, 2), - }, - ], - isError: true, - }; - } - // Surface a clear error if the initial handshake fails with a rejection. - let needsLiveTunnel = false; - client.once('need_live_tunnel', () => { - needsLiveTunnel = true; - log(`Tunnel needs a fresh live-tunnel call (resume token rejected)`); - }); - client.connect(); - // Wait briefly for the handshake to complete - const deadline = Date.now() + 5000; - while (client.state !== "ready" && !needsLiveTunnel && Date.now() < deadline) { - await new Promise((r) => setTimeout(r, 100)); - } - if (client.tunnelId) { - const id = client.tunnelId; - clients.set(id, client); - // Capture id now: tunnelId is cleared by #onDisconnect() before the - // reconnect that triggers need_live_tunnel, so reading client.tunnelId - // at event-fire time is always undefined → stale map entry. - client.once('need_live_tunnel', () => clients.delete(id)); - } - if (needsLiveTunnel) { - return { - content: [ - { - type: "text", - text: JSON.stringify({ error: "resume token rejected; call live-tunnel to get a fresh relay URL" }, null, 2), - }, - ], - isError: true, - }; - } - return { - content: [ - { - type: "text", - text: JSON.stringify({ - state: client.state, - tunnelId: client.tunnelId ?? null, - connectionId: client.connectionId ?? null, - traceId: client.traceId ?? null, - relayUrl, - }, null, 2), - }, - ], - }; -}); -server.registerTool("tunnel-disconnect", { - description: "Disconnect a specific tunnel by its tunnelId. If no tunnelId is given, disconnects all tunnels.", - inputSchema: z.object({ - tunnelId: z - .string() - .optional() - .describe("The tunnelId to disconnect (from tunnel-connect response). Omit to disconnect all."), - }), -}, async ({ tunnelId }) => { - if (tunnelId) { - const client = clients.get(tunnelId); - if (!client) { - return { - content: [ - { - type: "text", - text: JSON.stringify({ error: `No tunnel with id ${tunnelId}` }, null, 2), - }, - ], - isError: true, - }; - } - client.disconnect(); - clients.delete(tunnelId); - return { - content: [ - { - type: "text", - text: JSON.stringify({ disconnected: tunnelId }, null, 2), - }, - ], - }; - } - // Disconnect all - const ids = [...clients.keys()]; - for (const client of clients.values()) { - client.disconnect(); - } - clients.clear(); - return { - content: [ - { - type: "text", - text: JSON.stringify({ disconnected: ids }, null, 2), - }, - ], - }; -}); -server.registerTool("tunnel-status", { - description: "Returns the status of all active tunnels", - inputSchema: z.object({}), -}, async () => { - const tunnels = [...clients.entries()].map(([id, client]) => ({ - tunnelId: id, - state: client.state, - traceId: client.traceId ?? null, - })); - return { - content: [ - { - type: "text", - text: JSON.stringify({ tunnels, count: tunnels.length }, null, 2), - }, - ], - }; -}); -const transport = new StdioServerTransport(); -await server.connect(transport); -log("MCP server started"); -const shutdown = () => { - log("Shutting down"); - for (const client of clients.values()) { - client.disconnect(); - } - process.exit(0); -}; -process.on("SIGINT", shutdown); -process.on("SIGTERM", shutdown); -// Last-resort handlers to keep the MCP server alive if something slips -// through. There is no process manager to restart us, so a crash means -// Claude Code loses the tunnel tools entirely. log() above is internally -// guarded so it cannot itself throw and re-enter these handlers. -process.on("unhandledRejection", (reason) => { - log(`Unhandled rejection: ${reason instanceof Error ? reason.stack ?? reason.message : String(reason)}`); -}); -process.on("uncaughtException", (err) => { - log(`Uncaught exception: ${err.stack ?? err.message}`); -}); -// Orphan-spin protection: if our parent process (the MCP host) exits, our -// stdio pipes break and any further write triggers EPIPE. Detect that and -// exit cleanly instead of pegging the CPU. Two complementary detectors: -// -// 1. EPIPE on stderr/stdout — fires the moment a write fails. Catches the -// common case where the parent exits while we're mid-log. -// 2. Periodic PPID check — catches the case where the parent exits cleanly -// and we don't write anything until the next tunnel-tool call. On Linux -// orphaned processes are reparented to PID 1; on macOS to launchd (also -// typically PID 1). If we see PPID === 1, we have no MCP host. -// -// Both paths exit(0) — this is not a crash, it's "our reason to live ended." -process.stderr.on("error", (err) => { - if (err.code === "EPIPE") - process.exit(0); -}); -process.stdout.on("error", (err) => { - if (err.code === "EPIPE") - process.exit(0); -}); -// Default 30s; overridable for tests. unref() so the timer alone doesn't -// keep the event loop alive. -const orphanCheckMs = Number(process.env.SUBTEXT_TUNNEL_ORPHAN_CHECK_MS) || 30_000; -const orphanTimer = setInterval(() => { - if (process.ppid === 1) - process.exit(0); -}, orphanCheckMs); -orphanTimer.unref(); diff --git a/tunnel/build/src/third_party/index.js b/tunnel/build/src/third_party/index.js deleted file mode 100644 index f6f904fb..00000000 --- a/tunnel/build/src/third_party/index.js +++ /dev/null @@ -1,6 +0,0 @@ -export { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; -export { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; -export { z } from 'zod'; -export { default as WebSocket } from 'ws'; -export { default as yargs } from 'yargs'; -export { hideBin } from 'yargs/helpers'; diff --git a/tunnel/build/src/transport.js b/tunnel/build/src/transport.js deleted file mode 100644 index 62dec528..00000000 --- a/tunnel/build/src/transport.js +++ /dev/null @@ -1,53 +0,0 @@ -/** Convert wire headers (Record) to fetch Headers. */ -export function wireHeadersToHeaders(wire) { - const h = new Headers(); - for (const [name, values] of Object.entries(wire)) { - for (const v of values) { - h.append(name, v); - } - } - return h; -} -/** - * Convert fetch Headers to wire headers (Record). - * - * Note: Headers.forEach provides comma-joined values per the Fetch spec. - * This is lossy for headers whose values contain commas internally. - * Set-Cookie is handled correctly via getSetCookie(). For the tunnel proxy - * use case this is effectively harmless. - */ -export function headersToWireHeaders(headers) { - const wire = {}; - const setCookies = headers.getSetCookie(); - if (setCookies.length > 0) { - wire['set-cookie'] = setCookies; - } - headers.forEach((value, name) => { - if (name.toLowerCase() === 'set-cookie') - return; - wire[name] = [value]; - }); - return wire; -} -/** - * Strip transfer-encoding headers that are invalid after transparent - * decompression by Node's fetch. Both transports call this after - * headersToWireHeaders(). - */ -export function stripTransferHeaders(wire) { - delete wire['content-encoding']; - delete wire['content-length']; -} -/** Parse a host:port string, returning hostname and numeric port. */ -export function parseHostPort(hostport, defaultPort = 80) { - const lastColon = hostport.lastIndexOf(':'); - if (lastColon >= 0) { - return { - hostname: hostport.slice(0, lastColon), - port: parseInt(hostport.slice(lastColon + 1), 10) || defaultPort, - }; - } - return { hostname: hostport, port: defaultPort }; -} -/** Maximum size for JSON headers received over the yamux wire format. */ -export const MAX_YAMUX_HEADER_BYTES = 1024 * 1024; // 1 MB diff --git a/tunnel/build/src/transport_legacy.js b/tunnel/build/src/transport_legacy.js deleted file mode 100644 index 8901a6c1..00000000 --- a/tunnel/build/src/transport_legacy.js +++ /dev/null @@ -1,235 +0,0 @@ -import { WebSocket } from './third_party/index.js'; -import * as net from 'node:net'; -import { gzip } from 'node:zlib'; -import { promisify } from 'node:util'; -const gzipAsync = promisify(gzip); -import { MAX_INFLIGHT, MAX_RESPONSE_BODY_BYTES } from './types.js'; -import { matchesAny } from './allowlist.js'; -import { resolveLoopbackOrigin } from './loopback.js'; -import { wireHeadersToHeaders, headersToWireHeaders, stripTransferHeaders, parseHostPort } from './transport.js'; -/** - * LegacyTransport handles the JSON-over-WebSocket protocol with base64-encoded - * bodies, channel-based stream management, and pause/resume flow control. - */ -export class LegacyTransport { - #ws; - #log; - #onActivity; - #allowedOrigins; - #inflight = new Map(); - #streams = new Map(); - constructor(opts) { - this.#ws = opts.ws; - this.#log = opts.log; - this.#onActivity = opts.onActivity; - this.#allowedOrigins = opts.allowedOrigins ?? []; - } - serve() { - return new Promise((resolve) => { - const handler = (data) => { - this.#onActivity(); - let msg; - try { - msg = JSON.parse(data.toString()); - } - catch { - this.#log(`Invalid message from relay: ${data.toString().slice(0, 200)}`); - return; - } - switch (msg.type) { - case 'request': - void this.#handleRequest(msg); - break; - case 'connect': - this.#handleConnect(msg); - break; - case 'data': - this.#handleStreamData(msg); - break; - case 'end': - this.#handleStreamEnd(msg); - break; - case 'pause': - this.#handleStreamPause(msg); - break; - case 'resume': - this.#handleStreamResume(msg); - break; - case 'ping': - this.#send({ type: 'pong' }); - break; - default: - this.#log(`Unknown message type: ${msg.type}`); - } - }; - this.#ws.on('message', handler); - this.#ws.once('close', () => { - this.#ws.removeListener('message', handler); - resolve(); - }); - }); - } - close() { - this.#abortInflight(); - this.#closeStreams(); - } - // ----- Request handling ----- - async #handleRequest(msg) { - if (this.#inflight.size >= MAX_INFLIGHT) { - this.#send({ - type: 'error', - requestId: msg.requestId, - message: `Too many inflight requests (max ${MAX_INFLIGHT})`, - }); - return; - } - const ac = new AbortController(); - this.#inflight.set(msg.requestId, ac); - try { - // The relay is the source of truth for which origin a request belongs - // to. A request without `origin` is a protocol violation by the relay. - if (!msg.origin) { - throw new Error('legacy request missing origin'); - } - const origin = msg.origin; - if (this.#allowedOrigins.length > 0 && !matchesAny(this.#allowedOrigins, origin)) { - throw new Error(`origin not in allowlist: ${origin}`); - } - const resolved = await resolveLoopbackOrigin(origin); - const url = resolved.ipUrl + msg.url; - const headers = wireHeadersToHeaders(msg.headers); - headers.set('Host', `${resolved.hostname}:${resolved.port}`); - const body = msg.body !== null ? Buffer.from(msg.body, 'base64') : undefined; - const resp = await fetch(url, { - method: msg.method, - headers, - body, - signal: ac.signal, - redirect: 'manual', - }); - const respBody = await resp.arrayBuffer(); - if (respBody.byteLength > MAX_RESPONSE_BODY_BYTES) { - this.#send({ - type: 'error', - requestId: msg.requestId, - message: `Response body too large: ${respBody.byteLength} bytes (max ${MAX_RESPONSE_BODY_BYTES})`, - }); - return; - } - const respHeaders = headersToWireHeaders(resp.headers); - stripTransferHeaders(respHeaders); - let bodyBuf = Buffer.from(respBody); - let encoding; - if (bodyBuf.length > 0) { - const compressed = await gzipAsync(bodyBuf); - if (compressed.length < bodyBuf.length) { - bodyBuf = compressed; - encoding = 'gzip'; - } - } - const encodedBody = bodyBuf.length > 0 ? bodyBuf.toString('base64') : null; - this.#send({ - type: 'response', - requestId: msg.requestId, - status: resp.status, - headers: respHeaders, - body: encodedBody, - encoding, - }); - } - catch (err) { - if (ac.signal.aborted) - return; - const message = err instanceof Error ? err.message : String(err); - this.#send({ - type: 'error', - requestId: msg.requestId, - message, - }); - } - finally { - this.#inflight.delete(msg.requestId); - } - } - // ----- CONNECT stream handling ----- - #handleConnect(msg) { - const { streamId, host } = msg; - this.#log(`Stream ${streamId}: connecting to ${host}`); - const { hostname, port } = parseHostPort(host); - const socket = net.connect({ host: hostname, port }); - socket.once('connect', () => { - this.#log(`Stream ${streamId}: connected to ${host}`); - this.#streams.set(streamId, socket); - this.#send({ type: 'connected', streamId }); - socket.on('data', (chunk) => { - this.#send({ - type: 'data', - streamId, - data: chunk.toString('base64'), - }); - }); - socket.on('end', () => { - this.#log(`Stream ${streamId}: remote end closed`); - this.#streams.delete(streamId); - this.#send({ type: 'end', streamId }); - }); - socket.on('error', (err) => { - this.#log(`Stream ${streamId}: socket error: ${err.message}`); - this.#streams.delete(streamId); - this.#send({ type: 'end', streamId }); - }); - }); - socket.once('error', (err) => { - if (!this.#streams.has(streamId)) { - this.#log(`Stream ${streamId}: connect error: ${err.message}`); - this.#send({ type: 'stream_error', streamId, message: err.message }); - } - }); - } - #handleStreamData(msg) { - const socket = this.#streams.get(msg.streamId); - if (socket) { - socket.write(Buffer.from(msg.data, 'base64')); - } - } - #handleStreamEnd(msg) { - const socket = this.#streams.get(msg.streamId); - if (socket) { - this.#streams.delete(msg.streamId); - socket.end(); - } - } - #handleStreamPause(msg) { - const socket = this.#streams.get(msg.streamId); - if (socket) { - socket.pause(); - } - } - #handleStreamResume(msg) { - const socket = this.#streams.get(msg.streamId); - if (socket) { - socket.resume(); - } - } - // ----- Helpers ----- - #send(msg) { - if (this.#ws?.readyState === WebSocket.OPEN) { - this.#ws.send(JSON.stringify(msg)); - return true; - } - this.#log(`Dropped ${msg.type} message (ws state=${this.#ws?.readyState})`); - return false; - } - #abortInflight() { - for (const ac of this.#inflight.values()) { - ac.abort(); - } - this.#inflight.clear(); - } - #closeStreams() { - for (const socket of this.#streams.values()) { - socket.end(); - } - this.#streams.clear(); - } -} diff --git a/tunnel/build/src/transport_yamux.js b/tunnel/build/src/transport_yamux.js deleted file mode 100644 index 0b107c15..00000000 --- a/tunnel/build/src/transport_yamux.js +++ /dev/null @@ -1,233 +0,0 @@ -import * as net from 'node:net'; -import { matchesAny } from './allowlist.js'; -import { resolveLoopbackOrigin } from './loopback.js'; -import { MAX_RESPONSE_BODY_BYTES } from './types.js'; -import { wireHeadersToHeaders, headersToWireHeaders, stripTransferHeaders, parseHostPort, MAX_YAMUX_HEADER_BYTES, } from './transport.js'; -import { YamuxSession } from './yamux.js'; -export const STREAM_TYPE_REQUEST = 0x01; -export const STREAM_TYPE_CONNECT = 0x02; -export const CONNECT_STATUS_OK = 0x00; -export const CONNECT_STATUS_ERR = 0x01; -/** - * YamuxTransport handles the binary yamux multiplexing protocol. After the - * hello/ready handshake, the WebSocket carries raw yamux frames. The server - * opens streams; each stream starts with a 1-byte type prefix: - * - * - STREAM_TYPE_REQUEST (0x01): HTTP request/response - * - STREAM_TYPE_CONNECT (0x02): CONNECT (TCP pipe) - */ -export class YamuxTransport { - #log; - #session; - #streaming; - #allowedOrigins; - constructor(opts) { - this.#log = opts.log; - this.#session = new YamuxSession(opts.ws, { - onActivity: opts.onActivity, - pingIntervalMs: opts.pingIntervalMs, - }); - this.#streaming = opts.streaming ?? false; - this.#allowedOrigins = opts.allowedOrigins ?? []; - } - async serve() { - while (true) { - const stream = await this.#session.accept(); - if (stream === null) - break; - void this.#handleStream(stream).catch((err) => { - this.#log(`yamux stream ${stream.id} error: ${errMsg(err)}`); - }); - } - } - close() { - this.#session.close(); - } - // ----- Stream dispatch ----- - async #handleStream(stream) { - const typeBuf = await stream.readExact(1); - const streamType = typeBuf[0]; - if (streamType === STREAM_TYPE_REQUEST) { - await this.#handleHttpStream(stream); - } - else if (streamType === STREAM_TYPE_CONNECT) { - await this.#handleConnectStream(stream); - } - else { - this.#log(`yamux stream ${stream.id}: unknown type 0x${streamType.toString(16)}`); - stream.close(); - } - } - // ----- Shared: read length-prefixed JSON header from stream ----- - async #readJsonHeader(stream) { - const lenBuf = await stream.readExact(4); - const headerLen = lenBuf.readUInt32BE(0); - if (headerLen > MAX_YAMUX_HEADER_BYTES) { - throw new Error(`header too large: ${headerLen} bytes (max ${MAX_YAMUX_HEADER_BYTES})`); - } - const headerBuf = await stream.readExact(headerLen); - return JSON.parse(headerBuf.toString()); - } - // ----- HTTP request/response ----- - async #handleHttpStream(stream) { - try { - const header = await this.#readJsonHeader(stream); - let reqBody; - if (header.bodyLen > 0) { - reqBody = await stream.readExact(header.bodyLen); - } - // The relay is the source of truth for which origin a request belongs - // to. A header without `origin` is a protocol violation by the relay. - if (!header.origin) { - throw new Error('yamux request header missing origin'); - } - const origin = header.origin; - // Allowlist gate: defense in depth. The relay also enforces this — but - // a compromised or buggy relay must not be able to drive us to fetch an - // origin the user didn't opt into. - if (this.#allowedOrigins.length > 0 && !matchesAny(this.#allowedOrigins, origin)) { - throw new Error(`origin not in allowlist: ${origin}`); - } - // DNS resolve-and-pin: rebinding defense. resolveLoopbackOrigin does - // ONE DNS lookup, asserts loopback, and gives us back a URL with the - // resolved IP literal so fetch() doesn't re-resolve. The Host: header - // is restored to the original hostname so virtual-host routing on the - // upstream still works. - const resolved = await resolveLoopbackOrigin(origin); - const url = resolved.ipUrl + header.url; - const fetchHeaders = wireHeadersToHeaders(header.headers); - // Set Host explicitly even when it's already in headers — the - // resolved IP URL has the wrong authority for Host inference. - fetchHeaders.set('Host', `${resolved.hostname}:${resolved.port}`); - const resp = await fetch(url, { - method: header.method, - headers: fetchHeaders, - body: reqBody, - redirect: 'manual', - }); - const respHeaders = headersToWireHeaders(resp.headers); - stripTransferHeaders(respHeaders); - if (this.#streaming) { - // Send header immediately, then stream body bytes until FIN. - await stream.write(streamingResponseFrame(resp.status, respHeaders)); - if (resp.body) { - const reader = resp.body.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) - break; - await stream.write(Buffer.from(value.buffer, value.byteOffset, value.byteLength)); - } - } - finally { - reader.releaseLock(); - } - } - } - else { - const respBody = await resp.arrayBuffer(); - if (respBody.byteLength > MAX_RESPONSE_BODY_BYTES) { - throw new Error(`response body too large: ${respBody.byteLength} bytes (max ${MAX_RESPONSE_BODY_BYTES})`); - } - await stream.write(bufferedResponseFrame(resp.status, respHeaders, Buffer.from(respBody))); - } - } - catch (err) { - await this.#writeHttpError(stream, err); - throw err; - } - finally { - stream.close(); - } - } - // ----- CONNECT (TCP pipe) ----- - async #handleConnectStream(stream) { - const { host } = await this.#readJsonHeader(stream); - this.#log(`yamux stream ${stream.id}: CONNECT ${host}`); - const { hostname, port } = parseHostPort(host); - const socket = net.connect({ host: hostname, port }); - try { - await new Promise((resolve, reject) => { - socket.once('connect', resolve); - socket.once('error', reject); - }); - } - catch (err) { - const msg = err instanceof Error ? err.message : String(err); - this.#log(`yamux stream ${stream.id}: connect error: ${msg}`); - const errBuf = Buffer.from(msg); - const resp = Buffer.allocUnsafe(1 + errBuf.length); - resp[0] = CONNECT_STATUS_ERR; - errBuf.copy(resp, 1); - await stream.write(resp).catch(() => undefined); - stream.close(); - return; - } - // Write success byte. - await stream.write(Buffer.from([CONNECT_STATUS_OK])); - // Pump socket -> yamux stream with backpressure. - const socketDone = new Promise((resolve) => { - socket.on('data', (chunk) => { - socket.pause(); - stream - .write(chunk) - .then(() => { - socket.resume(); - }) - .catch(() => { - socket.destroy(); - resolve(); - }); - }); - socket.once('end', () => { - stream.close(); - resolve(); - }); - socket.once('error', () => { - stream.close(); - resolve(); - }); - }); - // Pump yamux stream -> socket (read loop). - const yamuxDone = (async () => { - try { - while (true) { - const chunk = await stream.read(); - if (chunk.length === 0) { - socket.end(); - break; - } - socket.write(chunk); - } - } - catch { - socket.destroy(); - } - })(); - await Promise.all([socketDone, yamuxDone]); - } - // Write a synthetic 502 so the relay reads a valid framed response instead of EOF. - // Mirrors the error-response path in #handleConnectStream. - async #writeHttpError(stream, err) { - const body = Buffer.from(errMsg(err)); - await stream.write(bufferedResponseFrame(502, {}, body)).catch(() => undefined); - } -} -function errMsg(err) { - return err instanceof Error ? err.message : String(err); -} -/** Build a length-prefixed JSON frame: [4-byte hdr len][hdr JSON][optional body]. */ -export function frameJson(hdrJson, body) { - const lenPrefix = Buffer.allocUnsafe(4); - lenPrefix.writeUInt32BE(hdrJson.length, 0); - return body ? Buffer.concat([lenPrefix, hdrJson, body]) : Buffer.concat([lenPrefix, hdrJson]); -} -/** Streaming response frame: header only, body follows as raw chunks until FIN. */ -export function streamingResponseFrame(status, headers) { - return frameJson(Buffer.from(JSON.stringify({ status, headers }))); -} -/** Buffered response frame: header + full body in one write. */ -export function bufferedResponseFrame(status, headers, body) { - return frameJson(Buffer.from(JSON.stringify({ status, headers, bodyLen: body.length })), body); -} diff --git a/tunnel/build/src/types.js b/tunnel/build/src/types.js deleted file mode 100644 index 54446263..00000000 --- a/tunnel/build/src/types.js +++ /dev/null @@ -1,22 +0,0 @@ -// Wire protocol message types for the tunnel relay. -// See docs/design.md for the full protocol specification. -// Resume subprotocol: the client sends `${RESUME_SUBPROTOCOL_PREFIX}${token}` -// as a Sec-WebSocket-Protocol; the server echoes the same full string on success. -export const RESUME_SUBPROTOCOL = 'subtext-resume.v1'; -export const RESUME_SUBPROTOCOL_PREFIX = RESUME_SUBPROTOCOL + '.'; -// Limits -export const MAX_INFLIGHT = 20; -export const MAX_RESPONSE_BODY_BYTES = 200 * 1024 * 1024; // 200 MB -export const REQUEST_TIMEOUT_MS = 30_000; // 30s -export const RECONNECT_BASE_MS = 1_000; // 1s -export const RECONNECT_MAX_MS = 30_000; // 30s -// 90s — if no message at all (yamux frame, ping, ack) arrives within this -// window, we treat the WS as silently dropped and reconnect. Applies to both -// legacy and yamux transports; yamux liveness used to rely solely on -// server-initiated keepalives, which silently failed when intermediate -// infra (linkerd, NATs, LBs) dropped the connection without delivering FIN. -export const STALE_CONNECTION_MS = 90_000; -// 30s — yamux client-initiated PING cadence. Well under STALE_CONNECTION_MS -// so a single missed round-trip doesn't trip the stale timer, but frequent -// enough to keep stateful intermediaries from idling the WS out. -export const YAMUX_PING_INTERVAL_MS = 30_000; diff --git a/tunnel/build/src/yamux.js b/tunnel/build/src/yamux.js deleted file mode 100644 index 626165b0..00000000 --- a/tunnel/build/src/yamux.js +++ /dev/null @@ -1,449 +0,0 @@ -/** - * Minimal yamux client implementation. - * - * Only implements the CLIENT role: accepts streams opened by the server, - * reads/writes data on those streams, and responds to pings. It never - * initiates streams (no Open) and never sends GoAway. - * - * Wire format reference: https://github.com/hashicorp/yamux/blob/master/spec.md - * - * Each yamux frame has a 12-byte header (big-endian): - * [0] version (always 0) - * [1] type (0=data, 1=window_update, 2=ping, 3=go_away) - * [2-3] flags (SYN=0x01, ACK=0x02, FIN=0x04, RST=0x08) - * [4-7] streamId - * [8-11] length (for DATA: payload size; for others: a value, no payload follows) - */ -// ----- Protocol constants ----- -const PROTO_VERSION = 0; -export const HEADER_SIZE = 12; -/** Initial send/receive window per stream (matches hashicorp/yamux default). */ -export const INITIAL_WINDOW = 256 * 1024; -export const TYPE_DATA = 0; -export const TYPE_WINDOW_UPDATE = 1; -export const TYPE_PING = 2; -export const TYPE_GO_AWAY = 3; -export const FLAG_SYN = 0x01; -export const FLAG_ACK = 0x02; -export const FLAG_FIN = 0x04; -export const FLAG_RST = 0x08; -// ----- YamuxStream ----- -/** - * One bidirectional stream opened by the server. Supports sequential - * readExact / readAll / write operations. Not safe for concurrent reads or - * concurrent writes; the use-pattern is one reader and one writer. - */ -export class YamuxStream { - id; - #session; - // Receive side - #recvBuf = Buffer.alloc(0); - #recvWindow = INITIAL_WINDOW; // remaining receive quota - #recvConsumed = 0; // bytes consumed since last window update sent - // Send side (starts at INITIAL_WINDOW; incremented by window_update from server) - #sendWindow = INITIAL_WINDOW; - #sendWaiters = []; - #finReceived = false; - #rstReceived = false; - #closed = false; - // Resolved when new data (or FIN/RST) is added to #recvBuf. - // Only one outstanding waiter at a time (sequential reads). - #recvWaiter = null; - constructor(id, session) { - this.id = id; - this.#session = session; - } - // ----- Called by YamuxSession ----- - _onData(data) { - this.#recvWindow -= data.length; - if (this.#recvWindow < 0) { - // Server violated our receive window — treat as protocol error. - this._onRst(); - return; - } - this.#recvBuf = Buffer.concat([this.#recvBuf, data]); - this.#recvWaiter?.(); - this.#recvWaiter = null; - } - _onFin() { - this.#finReceived = true; - this.#recvWaiter?.(); - this.#recvWaiter = null; - } - _onRst() { - this.#rstReceived = true; - this.#closed = true; - this.#recvWaiter?.(); - this.#recvWaiter = null; - const waiters = this.#sendWaiters.splice(0); - for (const w of waiters) - w(); - } - _onWindowUpdate(delta) { - this.#sendWindow += delta; - const waiters = this.#sendWaiters.splice(0); - for (const w of waiters) - w(); - } - // ----- Public API ----- - /** - * Read exactly `n` bytes. Waits until enough bytes have arrived. - * Throws if the stream is reset or closed before `n` bytes arrive. - */ - async readExact(n) { - while (this.#recvBuf.length < n) { - if (this.#rstReceived) - throw new Error(`yamux stream ${this.id} reset`); - if (this.#finReceived || this.#closed) { - throw new Error(`yamux stream ${this.id} closed before ${n} bytes (have ${this.#recvBuf.length})`); - } - await new Promise((resolve) => { - this.#recvWaiter = resolve; - }); - } - const result = Buffer.from(this.#recvBuf.subarray(0, n)); - this.#recvBuf = this.#recvBuf.subarray(n); - this.#creditWindowFor(n); - return result; - } - /** - * Read and return whatever data is currently buffered, waiting if the buffer - * is empty. Returns an empty buffer on FIN (EOF). Throws on RST. - * Suitable for streaming/piping use cases where chunk boundaries don't matter. - */ - async read() { - while (this.#recvBuf.length === 0) { - if (this.#rstReceived) - throw new Error(`yamux stream ${this.id} reset`); - if (this.#finReceived || this.#closed) - return Buffer.alloc(0); - await new Promise((resolve) => { - this.#recvWaiter = resolve; - }); - } - const result = Buffer.from(this.#recvBuf); - const consumed = result.length; - this.#recvBuf = Buffer.alloc(0); - this.#creditWindowFor(consumed); - return result; - } - /** - * Read all remaining bytes until the stream is FIN-closed. - * Throws if the stream is reset. - */ - async readAll() { - const chunks = []; - while (true) { - if (this.#rstReceived) - throw new Error(`yamux stream ${this.id} reset`); - if ((this.#finReceived || this.#closed) && this.#recvBuf.length === 0) - break; - if (this.#recvBuf.length > 0) { - const consumed = this.#recvBuf.length; - chunks.push(Buffer.from(this.#recvBuf)); - this.#recvBuf = Buffer.alloc(0); - this.#creditWindowFor(consumed); - } - else { - await new Promise((resolve) => { - this.#recvWaiter = resolve; - }); - } - } - return Buffer.concat(chunks); - } - /** - * Write `data` to the stream, respecting the send window. - * Blocks if the window is exhausted until the server grants more credit. - */ - async write(data) { - if (this.#closed || this.#rstReceived) { - throw new Error(`yamux stream ${this.id} closed`); - } - let offset = 0; - while (offset < data.length) { - while (this.#sendWindow === 0) { - if (this.#rstReceived) - throw new Error(`yamux stream ${this.id} reset`); - if (this.#closed) - throw new Error(`yamux stream ${this.id} closed`); - await new Promise((resolve) => { - this.#sendWaiters.push(resolve); - }); - } - const n = Math.min(this.#sendWindow, data.length - offset); - this.#session._sendData(this.id, data.subarray(offset, offset + n)); - this.#sendWindow -= n; - offset += n; - } - } - /** Send FIN and remove this stream from the session. */ - close() { - if (this.#closed) - return; - this.#closed = true; - this.#session._sendFin(this.id); - this.#session._removeStream(this.id); - // Wake any blocked reader so it can observe the closed state. - this.#recvWaiter?.(); - this.#recvWaiter = null; - // Wake any blocked writer so it can observe the closed state. - const waiters = this.#sendWaiters.splice(0); - for (const w of waiters) - w(); - } - // ----- Private helpers ----- - /** Account for consumed bytes and send a window update when the threshold is met. */ - #creditWindowFor(n) { - this.#recvConsumed += n; - // Send update when consumed >= half the window (matches yamux heuristic). - if (this.#recvConsumed >= INITIAL_WINDOW / 2) { - this.#session._sendWindowUpdate(this.id, this.#recvConsumed); - this.#recvWindow += this.#recvConsumed; - this.#recvConsumed = 0; - } - } -} -export class YamuxSession { - #ws; - #streams = new Map(); - #onActivity; - #acceptQueue = []; - #acceptWaiters = []; - #closed = false; - #pingTimer = null; - #pingNonce = 0; - /** Accumulator for incomplete frames across WebSocket messages. */ - #readBuf = Buffer.alloc(0); - constructor(ws, opts = {}) { - this.#ws = ws; - this.#onActivity = opts.onActivity; - ws.on('message', (data) => { - // Any WS message — yamux frame, ping ack, anything — counts as the - // peer being alive. Reset the upstream stale timer before parsing so - // that even malformed frames keep liveness honest. - this.#onActivity?.(); - const chunk = toBuffer(data); - this.#readBuf = - this.#readBuf.length === 0 - ? chunk - : Buffer.concat([this.#readBuf, chunk]); - this.#processFrames(); - }); - ws.on('close', () => { - this.#onClose(); - }); - // Client-initiated keepalive. Server PINGs alone don't help when an - // intermediary has silently dropped our outbound path: we'd happily - // ack the next PING that arrives, but it never does, and we have no - // way to provoke one. Sending our own PINGs makes the WS bidirectional- - // active so stateful intermediaries don't idle-time us out, and forces - // an ACK back from the server which the activity hook above observes. - const pingMs = opts.pingIntervalMs ?? 0; - if (pingMs > 0) { - this.#pingTimer = setInterval(() => this.#sendPing(), pingMs); - // unref so a quiet session doesn't keep the event loop alive on its own. - this.#pingTimer.unref?.(); - } - } - #sendPing() { - if (this.#closed) - return; - // PING is identified by the SYN flag with a nonce in the length field; - // server echoes back with FLAG_ACK and the same nonce. - const nonce = (this.#pingNonce = (this.#pingNonce + 1) >>> 0); - try { - this.#ws.send(makeHeader(TYPE_PING, FLAG_SYN, 0, nonce)); - } - catch { - // WS may have torn down between ticks; close handler will reconcile. - } - } - // ----- Public API ----- - /** - * Wait for the next server-initiated stream. - * Returns `null` when the session closes. - */ - async accept() { - if (this.#acceptQueue.length > 0) - return this.#acceptQueue.shift(); - if (this.#closed) - return null; - return new Promise((resolve) => { - this.#acceptWaiters.push(resolve); - }); - } - /** Send GoAway (normal termination) and tear down all streams locally. */ - close() { - if (this.#closed) - return; - this.#closed = true; - if (this.#pingTimer !== null) { - clearInterval(this.#pingTimer); - this.#pingTimer = null; - } - try { - this.#ws.send(makeHeader(TYPE_GO_AWAY, 0, 0, 0 /* normal termination */)); - } - catch { - // WebSocket may already be closed — best effort. - } - for (const stream of this.#streams.values()) { - stream._onRst(); - } - this.#streams.clear(); - this.#drainWaiters(); - } - // ----- Called by YamuxStream ----- - _sendData(streamId, data) { - const hdr = makeHeader(TYPE_DATA, 0, streamId, data.length); - this.#ws.send(Buffer.concat([hdr, data])); - } - _sendWindowUpdate(streamId, delta) { - this.#ws.send(makeHeader(TYPE_WINDOW_UPDATE, 0, streamId, delta)); - } - _sendFin(streamId) { - // Send an empty DATA frame with FIN flag. - this.#ws.send(makeHeader(TYPE_DATA, FLAG_FIN, streamId, 0)); - } - _removeStream(streamId) { - this.#streams.delete(streamId); - } - // ----- Frame processing ----- - #processFrames() { - const buf = this.#readBuf; - let offset = 0; - while (offset + HEADER_SIZE <= buf.length) { - const version = buf[offset]; - if (version !== PROTO_VERSION) { - this.close(); - return; - } - const type = buf[offset + 1]; - const flags = buf.readUInt16BE(offset + 2); - const streamId = buf.readUInt32BE(offset + 4); - const length = buf.readUInt32BE(offset + 8); - if (type === TYPE_DATA) { - // DATA frames carry a payload of `length` bytes. - if (offset + HEADER_SIZE + length > buf.length) - break; // incomplete - const payload = buf.subarray(offset + HEADER_SIZE, offset + HEADER_SIZE + length); - this.#handleData(flags, streamId, payload); - offset += HEADER_SIZE + length; - } - else { - // All other frame types have no payload; `length` is a scalar value. - this.#handleControl(type, flags, streamId, length); - offset += HEADER_SIZE; - } - } - // Keep leftover bytes for the next message. - this.#readBuf = offset === 0 ? buf : buf.subarray(offset); - } - #handleData(flags, streamId, payload) { - if (flags & FLAG_SYN) { - this.#openStream(streamId); - } - const stream = this.#streams.get(streamId); - if (!stream) - return; - if (flags & FLAG_RST) { - stream._onRst(); - this.#streams.delete(streamId); - return; - } - if (payload.length > 0) { - stream._onData(payload); - } - if (flags & FLAG_FIN) { - stream._onFin(); - } - } - #handleControl(type, flags, streamId, length) { - switch (type) { - case TYPE_WINDOW_UPDATE: { - if (flags & FLAG_SYN) { - this.#openStream(streamId); - } - const stream = this.#streams.get(streamId); - if (stream) { - // Apply window delta regardless of SYN — the spec allows SYN/ACK - // to carry an initial window update indicating a larger window. - if (length > 0) { - stream._onWindowUpdate(length); - } - if (flags & FLAG_FIN) - stream._onFin(); - if (flags & FLAG_RST) { - stream._onRst(); - this.#streams.delete(streamId); - } - } - break; - } - case TYPE_PING: { - if (flags & FLAG_SYN) { - this.#ws.send(makeHeader(TYPE_PING, FLAG_ACK, 0, length)); - } - break; - } - case TYPE_GO_AWAY: - this.close(); - break; - default: - // Unknown frame type — protocol error, close the session. - this.close(); - break; - } - } - #openStream(streamId) { - if (this.#streams.has(streamId)) - return; // duplicate SYN; ignore - const stream = new YamuxStream(streamId, this); - this.#streams.set(streamId, stream); - // Respond with ACK (length=0 — both sides start with INITIAL_WINDOW already). - this.#ws.send(makeHeader(TYPE_WINDOW_UPDATE, FLAG_ACK, streamId, 0)); - if (this.#acceptWaiters.length > 0) { - this.#acceptWaiters.shift()(stream); - } - else { - this.#acceptQueue.push(stream); - } - } - #onClose() { - this.#closed = true; - if (this.#pingTimer !== null) { - clearInterval(this.#pingTimer); - this.#pingTimer = null; - } - for (const stream of this.#streams.values()) { - stream._onRst(); - } - this.#streams.clear(); - this.#drainWaiters(); - } - #drainWaiters() { - const waiters = this.#acceptWaiters.splice(0); - for (const w of waiters) - w(null); - } -} -// ----- Helpers ----- -export function makeHeader(type, flags, streamId, length) { - const hdr = Buffer.allocUnsafe(HEADER_SIZE); - hdr[0] = PROTO_VERSION; - hdr[1] = type; - hdr.writeUInt16BE(flags, 2); - hdr.writeUInt32BE(streamId, 4); - hdr.writeUInt32BE(length, 8); - return hdr; -} -function toBuffer(data) { - if (Buffer.isBuffer(data)) - return data; - if (data instanceof ArrayBuffer) - return Buffer.from(data); - if (Array.isArray(data)) - return Buffer.concat(data); - return Buffer.from(data); -} diff --git a/tunnel/package.json b/tunnel/package.json index de5fa219..6aab2b7b 100644 --- a/tunnel/package.json +++ b/tunnel/package.json @@ -4,17 +4,18 @@ "description": "MCP server for reverse tunneling to localhost — connects hosted Subtext browser tools to local dev servers.", "type": "module", "bin": { - "subtext-tunnel": "./build/src/index.js" + "subtext-tunnel": "./dist/index.js" }, "files": [ - "build/src/" + "dist/" ], "scripts": { - "clean": "node -e \"require('fs').rmSync('build', {recursive: true, force: true})\"", - "build": "tsc", - "bundle": "npm run clean && npm run build && rollup -c rollup.config.mjs", - "test": "node --test build/tests/**/*.js", - "prepublishOnly": "npm run bundle" + "clean": "node -e \"require('fs').rmSync('build', {recursive: true, force: true}); require('fs').rmSync('dist', {recursive: true, force: true})\"", + "build": "npm run clean && tsc", + "bundle": "npm run build && rollup -c rollup.config.mjs", + "test": "npm run build && node --test build/tests/**/*.js", + "verify": "npm pack --dry-run", + "prepack": "npm run bundle" }, "keywords": [ "mcp", diff --git a/tunnel/rollup.config.mjs b/tunnel/rollup.config.mjs index a2817458..f892762d 100644 --- a/tunnel/rollup.config.mjs +++ b/tunnel/rollup.config.mjs @@ -1,20 +1,23 @@ -import fs from 'node:fs'; -import path from 'node:path'; - import commonjs from '@rollup/plugin-commonjs'; import json from '@rollup/plugin-json'; import {nodeResolve} from '@rollup/plugin-node-resolve'; import cleanup from 'rollup-plugin-cleanup'; -const thirdPartyDir = './build/src/third_party'; - +// Bundle the entire CLI into a single self-contained file. tsc emits to +// build/ (used by tests + dev); rollup reads that and emits to dist/, which +// is what `npm publish` ships. Keeping input/output in different trees means +// `build` and `bundle` never clobber each other's output. export default { - input: path.join(thirdPartyDir, 'index.js'), + // index.ts is the real entry point: it does a Node-version check and then + // dynamic-imports main. inlineDynamicImports below pulls main into the same + // bundled file so we ship one self-contained ./dist/index.js. + input: './build/src/index.js', output: { - file: path.join(thirdPartyDir, 'index.js'), + file: './dist/index.js', sourcemap: false, format: 'esm', inlineDynamicImports: true, + // Shebang is carried over from src/index.ts; no banner needed. }, // yargs and tslib emit TypeScript's __classPrivateField helpers that read // `this` at module top-level for a CJS caching trick. Rollup's ESM output diff --git a/tunnel/src/main.ts b/tunnel/src/main.ts index 67caaf84..4479c696 100644 --- a/tunnel/src/main.ts +++ b/tunnel/src/main.ts @@ -1,5 +1,5 @@ import process from "node:process"; -import { readFileSync } from "node:fs"; +import { existsSync, readFileSync } from "node:fs"; import { fileURLToPath } from "node:url"; import { dirname, join } from "node:path"; @@ -13,14 +13,24 @@ import { import { TunnelClient } from "./client.js"; // Single source of truth: read version from package.json at runtime so it -// can't drift from what npm publishes. From build/src/main.js, package.json -// sits two levels up at the package root. -const pkg = JSON.parse( - readFileSync( - join(dirname(fileURLToPath(import.meta.url)), "..", "..", "package.json"), - "utf8", - ), -) as { version: string }; +// can't drift from what npm publishes. The relative depth depends on where +// this file ends up: build/src/main.js (tsc, dev) is two levels deep; +// dist/index.js (rollup, published) is one level deep. Walk up until a +// package.json is found. +function findPackageJson(): string { + let dir = dirname(fileURLToPath(import.meta.url)); + for (let i = 0; i < 5; i++) { + const candidate = join(dir, "package.json"); + if (existsSync(candidate)) return candidate; + const parent = dirname(dir); + if (parent === dir) break; + dir = parent; + } + throw new Error("package.json not found near tunnel binary"); +} +const pkg = JSON.parse(readFileSync(findPackageJson(), "utf8")) as { + version: string; +}; const VERSION = pkg.version; await yargs(hideBin(process.argv))