diff --git a/package-lock.json b/package-lock.json index 319dcdd83..8bcbd40c3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -114,7 +114,7 @@ "intercept-stdout": "0.1.2", "mkcert": "^3.2.0", "mocha": "^11.7.5", - "mqtt": "~4.3.8", + "mqtt": "^5.15.1", "oxlint": "^1.31.0", "prettier": "~3.8.0", "rewire": "^9.0.1", @@ -1083,7 +1083,9 @@ } }, "node_modules/@babel/runtime": { - "version": "7.28.6", + "version": "7.29.2", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.29.2.tgz", + "integrity": "sha512-JiDShH45zKHWyGe4ZNVRrCjBz8Nh9TMmZG1kh4QTK8hCBTWBi8Da+i7s1fJw7/lYpM4ccepSNfqzZ/QvABBi5g==", "license": "MIT", "engines": { "node": ">=6.9.0" @@ -2622,6 +2624,9 @@ "cpu": [ "arm64" ], + "libc": [ + "glibc" + ], "license": "Apache-2.0", "optional": true, "os": [ @@ -2638,6 +2643,9 @@ "cpu": [ "arm64" ], + "libc": [ + "musl" + ], "license": "Apache-2.0", "optional": true, "os": [ @@ -2654,6 +2662,9 @@ "cpu": [ "x64" ], + "libc": [ + "glibc" + ], "license": "Apache-2.0", "optional": true, "os": [ @@ -2670,6 +2681,9 @@ "cpu": [ "x64" ], + "libc": [ + "musl" + ], "license": "Apache-2.0", "optional": true, "os": [ @@ -4639,6 +4653,16 @@ "@types/node": "*" } }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "8.59.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-8.59.0.tgz", @@ -5339,6 +5363,19 @@ "node": ">=8" } }, + "node_modules/broker-factory": { + "version": "3.1.14", + "resolved": "https://registry.npmjs.org/broker-factory/-/broker-factory-3.1.14.tgz", + "integrity": "sha512-L45k5HMbPIrMid0nTOZ/UPXG/c0aRuQKVrSDFIb1zOkvfiyHgYmIjc3cSiN1KwQIvRDOtKE0tfb3I9EZ3CmpQQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@babel/runtime": "^7.29.2", + "fast-unique-numbers": "^9.0.27", + "tslib": "^2.8.1", + "worker-factory": "^7.0.49" + } + }, "node_modules/browser-stdout": { "version": "1.3.1", "dev": true, @@ -5674,13 +5711,11 @@ } }, "node_modules/commist": { - "version": "1.1.0", + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/commist/-/commist-3.2.0.tgz", + "integrity": "sha512-4PIMoPniho+LqXmpS5d3NuGYncG6XWlkBSVGiWycL22dd42OYdUGil2CWuzklaJoNxyxUSpO4MKIBU94viWNAw==", "dev": true, - "license": "MIT", - "dependencies": { - "leven": "^2.1.0", - "minimist": "^1.1.0" - } + "license": "MIT" }, "node_modules/complex.js": { "version": "2.4.3", @@ -6577,6 +6612,20 @@ "dev": true, "license": "MIT" }, + "node_modules/fast-unique-numbers": { + "version": "9.0.27", + "resolved": "https://registry.npmjs.org/fast-unique-numbers/-/fast-unique-numbers-9.0.27.tgz", + "integrity": "sha512-nDA9ADeINN8SA2u2wCtU+siWFTTDqQR37XvgPIDDmboWQeExz7X0mImxuaN+kJddliIqy2FpVRmnvRZ+j8i1/A==", + "dev": true, + "license": "MIT", + "dependencies": { + "@babel/runtime": "^7.29.2", + "tslib": "^2.8.1" + }, + "engines": { + "node": ">=18.2.0" + } + }, "node_modules/fast-uri": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/fast-uri/-/fast-uri-3.1.0.tgz", @@ -6931,11 +6980,6 @@ "node": ">=14.14" } }, - "node_modules/fs.realpath": { - "version": "1.0.0", - "dev": true, - "license": "ISC" - }, "node_modules/fsevents": { "version": "2.3.3", "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", @@ -7978,71 +8022,12 @@ } }, "node_modules/help-me": { - "version": "3.0.0", - "dev": true, - "license": "MIT", - "dependencies": { - "glob": "^7.1.6", - "readable-stream": "^3.6.0" - } - }, - "node_modules/help-me/node_modules/balanced-match": { - "version": "1.0.2", + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/help-me/-/help-me-5.0.0.tgz", + "integrity": "sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg==", "dev": true, "license": "MIT" }, - "node_modules/help-me/node_modules/brace-expansion": { - "version": "1.1.12", - "dev": true, - "license": "MIT", - "dependencies": { - "balanced-match": "^1.0.0", - "concat-map": "0.0.1" - } - }, - "node_modules/help-me/node_modules/glob": { - "version": "7.2.3", - "dev": true, - "license": "ISC", - "dependencies": { - "fs.realpath": "^1.0.0", - "inflight": "^1.0.4", - "inherits": "2", - "minimatch": "^3.1.1", - "once": "^1.3.0", - "path-is-absolute": "^1.0.0" - }, - "engines": { - "node": "*" - }, - "funding": { - "url": "https://github.com/sponsors/isaacs" - } - }, - "node_modules/help-me/node_modules/minimatch": { - "version": "3.1.5", - "dev": true, - "license": "ISC", - "dependencies": { - "brace-expansion": "^1.1.7" - }, - "engines": { - "node": "*" - } - }, - "node_modules/help-me/node_modules/readable-stream": { - "version": "3.6.2", - "dev": true, - "license": "MIT", - "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - }, - "engines": { - "node": ">= 6" - } - }, "node_modules/http-errors": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/http-errors/-/http-errors-2.0.1.tgz", @@ -8133,15 +8118,6 @@ "node": ">=0.8.19" } }, - "node_modules/inflight": { - "version": "1.0.6", - "dev": true, - "license": "ISC", - "dependencies": { - "once": "^1.3.0", - "wrappy": "1" - } - }, "node_modules/inherits": { "version": "2.0.4", "license": "ISC" @@ -8227,6 +8203,16 @@ "lodash.toarray": "^3.0.0" } }, + "node_modules/ip-address": { + "version": "10.1.0", + "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-10.1.0.tgz", + "integrity": "sha512-XXADHxXmvT9+CRxhXg56LJovE+bmWnEWB78LB83VZTprKTmaC5QfruXocxzTZ2Kl0DNwKuBdlIhjL8LeY8Sf8Q==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">= 12" + } + }, "node_modules/ipaddr.js": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.3.0.tgz", @@ -8583,14 +8569,6 @@ "version": "0.1.2", "license": "MIT" }, - "node_modules/leven": { - "version": "2.1.0", - "dev": true, - "license": "MIT", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/levn": { "version": "0.4.1", "dev": true, @@ -8796,15 +8774,11 @@ } }, "node_modules/lru-cache": { - "version": "6.0.0", + "version": "10.4.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz", + "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==", "dev": true, - "license": "ISC", - "dependencies": { - "yallist": "^4.0.0" - }, - "engines": { - "node": ">=10" - } + "license": "ISC" }, "node_modules/math-intrinsics": { "version": "1.1.0", @@ -9105,35 +9079,36 @@ } }, "node_modules/mqtt": { - "version": "4.3.8", + "version": "5.15.1", + "resolved": "https://registry.npmjs.org/mqtt/-/mqtt-5.15.1.tgz", + "integrity": "sha512-V1WnkGuJh3ec9QXzy5Iylw8OOBK+Xu1WhxcQ9mMpLThG+/JZIMV1PgLNRgIiqXhZnvnVLsuyxHl5A/3bHHbcAA==", "dev": true, "license": "MIT", "dependencies": { - "commist": "^1.0.0", + "@types/readable-stream": "^4.0.21", + "@types/ws": "^8.18.1", + "commist": "^3.2.0", "concat-stream": "^2.0.0", - "debug": "^4.1.1", - "duplexify": "^4.1.1", - "help-me": "^3.0.0", - "inherits": "^2.0.3", - "lru-cache": "^6.0.0", - "minimist": "^1.2.5", - "mqtt-packet": "^6.8.0", - "number-allocator": "^1.0.9", - "pump": "^3.0.0", - "readable-stream": "^3.6.0", - "reinterval": "^1.1.0", - "rfdc": "^1.3.0", - "split2": "^3.1.0", - "ws": "^7.5.5", - "xtend": "^4.0.2" + "debug": "^4.4.1", + "help-me": "^5.0.0", + "lru-cache": "^10.4.3", + "minimist": "^1.2.8", + "mqtt-packet": "^9.0.2", + "number-allocator": "^1.0.14", + "readable-stream": "^4.7.0", + "rfdc": "^1.4.1", + "socks": "^2.8.6", + "split2": "^4.2.0", + "worker-timers": "^8.0.23", + "ws": "^8.18.3" }, "bin": { - "mqtt": "bin/mqtt.js", - "mqtt_pub": "bin/pub.js", - "mqtt_sub": "bin/sub.js" + "mqtt": "build/bin/mqtt.js", + "mqtt_pub": "build/bin/pub.js", + "mqtt_sub": "build/bin/sub.js" }, "engines": { - "node": ">=10.0.0" + "node": ">=16.0.0" } }, "node_modules/mqtt-packet": { @@ -9145,57 +9120,56 @@ "process-nextick-args": "^2.0.1" } }, - "node_modules/mqtt/node_modules/bl": { - "version": "4.1.0", - "dev": true, - "license": "MIT", - "dependencies": { - "buffer": "^5.5.0", - "inherits": "^2.0.4", - "readable-stream": "^3.4.0" - } - }, - "node_modules/mqtt/node_modules/mqtt-packet": { - "version": "6.10.0", + "node_modules/mqtt/node_modules/buffer": { + "version": "6.0.3", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz", + "integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==", "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], "license": "MIT", "dependencies": { - "bl": "^4.0.2", - "debug": "^4.1.1", - "process-nextick-args": "^2.0.1" + "base64-js": "^1.3.1", + "ieee754": "^1.2.1" } }, "node_modules/mqtt/node_modules/readable-stream": { - "version": "3.6.2", + "version": "4.7.0", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.7.0.tgz", + "integrity": "sha512-oIGGmcpTLwPga8Bn6/Z75SVaH1z5dUut2ibSyAMVhmUggWpmDn2dapB0n7f8nwaSiRtepAsfJyfXIO5DCVAODg==", "dev": true, "license": "MIT", "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" + "abort-controller": "^3.0.0", + "buffer": "^6.0.3", + "events": "^3.3.0", + "process": "^0.11.10", + "string_decoder": "^1.3.0" }, "engines": { - "node": ">= 6" + "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, - "node_modules/mqtt/node_modules/ws": { - "version": "7.5.10", + "node_modules/mqtt/node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", "dev": true, "license": "MIT", - "engines": { - "node": ">=8.3.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": "^5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } + "dependencies": { + "safe-buffer": "~5.2.0" } }, "node_modules/ms": { @@ -9424,6 +9398,9 @@ "cpu": [ "arm64" ], + "libc": [ + "glibc" + ], "license": "MIT", "optional": true, "os": [ @@ -9440,6 +9417,9 @@ "cpu": [ "arm64" ], + "libc": [ + "musl" + ], "license": "MIT", "optional": true, "os": [ @@ -9456,6 +9436,9 @@ "cpu": [ "x64" ], + "libc": [ + "glibc" + ], "license": "MIT", "optional": true, "os": [ @@ -9472,6 +9455,9 @@ "cpu": [ "x64" ], + "libc": [ + "musl" + ], "license": "MIT", "optional": true, "os": [ @@ -9951,14 +9937,6 @@ "node": ">=14.0.0" } }, - "node_modules/path-is-absolute": { - "version": "1.0.1", - "dev": true, - "license": "MIT", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/path-key": { "version": "3.1.1", "license": "MIT", @@ -9981,11 +9959,6 @@ "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/path-scurry/node_modules/lru-cache": { - "version": "10.4.3", - "dev": true, - "license": "ISC" - }, "node_modules/pause": { "version": "0.0.1" }, @@ -10053,13 +10026,6 @@ "split2": "^4.0.0" } }, - "node_modules/pino-abstract-transport/node_modules/split2": { - "version": "4.2.0", - "license": "ISC", - "engines": { - "node": ">= 10.x" - } - }, "node_modules/pino-std-serializers": { "version": "6.2.2", "license": "MIT" @@ -10120,15 +10086,6 @@ "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, - "node_modules/pino/node_modules/split2": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", - "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", - "license": "ISC", - "engines": { - "node": ">= 10.x" - } - }, "node_modules/pino/node_modules/string_decoder": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", @@ -10458,11 +10415,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/reinterval": { - "version": "1.1.0", - "dev": true, - "license": "MIT" - }, "node_modules/repeating": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/repeating/-/repeating-2.0.1.tgz", @@ -10681,7 +10633,6 @@ }, "node_modules/segfault-handler": { "version": "1.3.0", - "hasInstallScript": true, "license": "BSD-3-Clause", "optional": true, "dependencies": { @@ -10926,6 +10877,32 @@ "node": ">=0.3.1" } }, + "node_modules/smart-buffer": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/smart-buffer/-/smart-buffer-4.2.0.tgz", + "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">= 6.0.0", + "npm": ">= 3.0.0" + } + }, + "node_modules/socks": { + "version": "2.8.7", + "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.7.tgz", + "integrity": "sha512-HLpt+uLy/pxB+bum/9DzAgiKS8CX1EvbWxI4zlmgGCExImLdiad2iCwXT5Z4c9c3Eq8rP2318mPW2c+QbtjK8A==", + "dev": true, + "license": "MIT", + "dependencies": { + "ip-address": "^10.0.1", + "smart-buffer": "^4.2.0" + }, + "engines": { + "node": ">= 10.0.0", + "npm": ">= 3.0.0" + } + }, "node_modules/sonic-boom": { "version": "3.8.1", "license": "MIT", @@ -10965,24 +10942,12 @@ } }, "node_modules/split2": { - "version": "3.2.2", - "dev": true, + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", "license": "ISC", - "dependencies": { - "readable-stream": "^3.0.0" - } - }, - "node_modules/split2/node_modules/readable-stream": { - "version": "3.6.2", - "dev": true, - "license": "MIT", - "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - }, "engines": { - "node": ">= 6" + "node": ">= 10.x" } }, "node_modules/stack-trace": { @@ -11638,6 +11603,57 @@ "node": ">=0.10.0" } }, + "node_modules/worker-factory": { + "version": "7.0.49", + "resolved": "https://registry.npmjs.org/worker-factory/-/worker-factory-7.0.49.tgz", + "integrity": "sha512-lW7tpgy6aUv2dFsQhv1yv+XFzdkCf/leoKRTGMPVK5/die6RrUjqgJHJf556qO+ZfytNG6wPXc17E8zzsOLUDw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@babel/runtime": "^7.29.2", + "fast-unique-numbers": "^9.0.27", + "tslib": "^2.8.1" + } + }, + "node_modules/worker-timers": { + "version": "8.0.31", + "resolved": "https://registry.npmjs.org/worker-timers/-/worker-timers-8.0.31.tgz", + "integrity": "sha512-ngkq5S6JuZyztom8tDgBzorLo9byhBMko/sXfgiUD945AuzKGg1GCgDMCC3NaYkicLpGKXutONM36wEX8UbBCA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@babel/runtime": "^7.29.2", + "tslib": "^2.8.1", + "worker-timers-broker": "^8.0.16", + "worker-timers-worker": "^9.0.14" + } + }, + "node_modules/worker-timers-broker": { + "version": "8.0.16", + "resolved": "https://registry.npmjs.org/worker-timers-broker/-/worker-timers-broker-8.0.16.tgz", + "integrity": "sha512-JyP3AvUGyPGbBGW7XiUewm2+0pN/aYo1QpVf5kdXAfkDZcN3p7NbWrG6XnyDEpDIvfHk/+LCnOW/NsuiU9riYA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@babel/runtime": "^7.29.2", + "broker-factory": "^3.1.14", + "fast-unique-numbers": "^9.0.27", + "tslib": "^2.8.1", + "worker-timers-worker": "^9.0.14" + } + }, + "node_modules/worker-timers-worker": { + "version": "9.0.14", + "resolved": "https://registry.npmjs.org/worker-timers-worker/-/worker-timers-worker-9.0.14.tgz", + "integrity": "sha512-/qF06C60sXmSLfUl7WglvrDIbspmPOM8UrG63Dnn4bi2x4/DfqHS/+dxF5B+MdHnYO5tVuZYLHdAodrKdabTIg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@babel/runtime": "^7.29.2", + "tslib": "^2.8.1", + "worker-factory": "^7.0.49" + } + }, "node_modules/workerpool": { "version": "9.3.4", "dev": true, @@ -11711,11 +11727,6 @@ "node": ">=10" } }, - "node_modules/yallist": { - "version": "4.0.0", - "dev": true, - "license": "ISC" - }, "node_modules/yaml": { "version": "2.8.3", "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.3.tgz", diff --git a/package.json b/package.json index 1510ab675..a1d75142b 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "build": "tsc --project tsconfig.build.json", "build:watch": "npm run build -- --watch --incremental", "package": "./build-tools/build.sh", - "lint": "oxlint --deny-warnings .", + "lint": "oxlint --format stylish --deny-warnings .", "lint:required": "oxlint --quiet .", "lint:fix": "npm run lint -- --fix", "format:check": "prettier --check .", @@ -142,7 +142,7 @@ "intercept-stdout": "0.1.2", "mkcert": "^3.2.0", "mocha": "^11.7.5", - "mqtt": "~4.3.8", + "mqtt": "^5.15.1", "oxlint": "^1.31.0", "prettier": "~3.8.0", "rewire": "^9.0.1", diff --git a/server/DurableSubscriptionsSession.ts b/server/DurableSubscriptionsSession.ts index 1017c158b..9545c601e 100644 --- a/server/DurableSubscriptionsSession.ts +++ b/server/DurableSubscriptionsSession.ts @@ -84,6 +84,7 @@ export async function getSession({ clientId: sessionId, user, clean: nonDurable, + properties, will, keepalive, }: { @@ -91,10 +92,12 @@ export async function getSession({ user; listener: Function; clean?: boolean; + properties?: any; will: any; keepalive?: number; }) { let session; + if (properties?.sessionExpiryInterval > 0) nonDurable = false; if (sessionId && !nonDurable) { const sessionResource = await DurableSession.get(sessionId, { returnNonexistent: true }); session = new DurableSubscriptionsSession(sessionId, user, sessionResource); @@ -479,13 +482,13 @@ export class DurableSubscriptionsSession extends SubscriptionsSession { async addSubscription(subscription, needsAck) { await this.resumeSubscription(subscription, needsAck); const { qos, startTime } = subscription; - if (qos > 0 && !startTime) this.saveSubscriptions(); - return subscription.qos; + if (qos > 0 && !startTime) await this.saveSubscriptions(); + return subscription; } - removeSubscription(topic) { + async removeSubscription(topic) { const existingSubscription = this.subscriptions.find((subscription) => subscription.topic === topic); const result = super.removeSubscription(topic); - if (existingSubscription.qos > 0) this.saveSubscriptions(); + if (existingSubscription.qos > 0) await this.saveSubscriptions(); return result; } saveSubscriptions() { @@ -499,6 +502,6 @@ export class DurableSubscriptionsSession extends SubscriptionsSession { startTime, }; }); - DurableSession.put(this.sessionRecord); + return DurableSession.put(this.sessionRecord); } } diff --git a/server/mqtt.ts b/server/mqtt.ts index c889af2de..b7775e8fa 100644 --- a/server/mqtt.ts +++ b/server/mqtt.ts @@ -209,7 +209,7 @@ function onSocket(socket, send, request, user, mqttSettings) { } } - parser.on('packet', async (packet) => { + parser.on('packet', async (packet: any) => { try { if (user?.then) user = await user; } catch (error) { @@ -285,7 +285,8 @@ function onSocket(socket, send, request, user, mqttSettings) { // TODO: Handle the will & testament, and possibly use the will's content type as a hint for expected content if (packet.will) { const deserialize = - socket.deserialize || (socket.deserialize = getDeserializer(request?.headers.get?.('content-type'))); + socket.deserialize || + (socket.deserialize = getDeserializer(request?.headers.get?.('content-type'), false)); packet.will.data = packet.will.payload?.length > 0 ? deserialize(packet.will.payload) : undefined; delete packet.will.payload; } @@ -413,7 +414,7 @@ function onSocket(socket, send, request, user, mqttSettings) { const responseCmd = packet.qos === 2 ? 'pubrec' : 'puback'; // deserialize const deserialize = - socket.deserialize || (socket.deserialize = getDeserializer(request?.headers.get?.('content-type'))); + socket.deserialize || (socket.deserialize = getDeserializer(request?.headers.get?.('content-type'), false)); const messageLength = packet.payload?.length || 0; const data = messageLength > 0 ? deserialize(packet.payload) : undefined; // zero payload length maps to a delete let published; diff --git a/unitTests/apiTests/mqtt-test.mjs b/unitTests/apiTests/mqtt-test.mjs index 29ac08c54..4162a2b16 100644 --- a/unitTests/apiTests/mqtt-test.mjs +++ b/unitTests/apiTests/mqtt-test.mjs @@ -1,12 +1,15 @@ 'use strict'; +/** @typedef {import("mqtt/build").MqttClient} MqttClient */ + import assert from 'node:assert/strict'; +import { once } from 'node:events'; import { decode } from 'cbor-x'; import { callOperation } from './utility.js'; import { setupTestApp } from './setupTestApp.mjs'; import environmentManager from '#js/utility/environment/environmentManager'; const { get: env_get, setProperty } = environmentManager; -import { connect } from 'mqtt'; +import { connect, connectAsync } from 'mqtt'; import { readFileSync } from 'fs'; import { handleApplication as handleMQTTApplication } from '#src/server/mqtt'; @@ -39,14 +42,37 @@ function startMQTT(config) { return serverInstances; } import axios from 'axios'; + +async function subscribeAllowingSubackError(client, topic, options) { + try { + return await client.subscribeAsync(topic, options); + } catch (error) { + if (error.packet?.cmd === 'suback') { + return error.packet.granted.map((qos) => ({ topic, qos })); + } + throw error; + } +} + +async function connectWithMessageListener(brokerUrl, options, listener) { + const client = connect(brokerUrl, options); + client.on('message', listener); + await once(client, 'connect'); + return client; +} + describe('test MQTT connections and commands', function () { this.timeout(10000); let available_records; - let client, client2; - before(async () => { + /** @type {MqttClient} */ + let clientV4; + /** @type {MqttClient} */ + let clientV5; + beforeEach(async () => { available_records = await setupTestApp(); - client = connect('ws://localhost:9926', { + clientV4 = await connectAsync('ws://localhost:9926', { + protocolVersion: 4, wsOptions: { headers: { Accept: 'application/cbor', @@ -54,67 +80,58 @@ describe('test MQTT connections and commands', function () { }, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - client2 = connect('mqtts://localhost:8883', { + clientV5 = await connectAsync('mqtts://localhost:8883', { protocolVersion: 5, rejectUnauthorized: false, }); - await new Promise((resolve, reject) => { - client2.on('connect', (_connack) => { - resolve(); - }); - client2.on('error', (error) => { - reject(error); - }); - }); }); it('subscribe to retained/persisted record', async function () { let path = 'VariedProps/' + available_records[1]; await new Promise((resolve, reject) => { - client.subscribe(path, function (err) { - if (err) reject(err); - else { - // client.publish('VariedProps/' + available_records[2], 'Hello mqtt') + const timeout = setTimeout(() => { + clientV4.off('message', onMessage); + reject(new Error('Timeout waiting for retained message')); + }, 1000); + const onMessage = (topic, payload) => { + clearTimeout(timeout); + try { + assert.equal(topic, path); + const data = decode(payload); + assert.ok(data, 'Should have received a valid payload'); + resolve(); + } catch (e) { + reject(e); } - }); - client.once('message', (topic, payload) => { - decode(payload); - resolve(); - }); + }; + clientV4.once('message', onMessage); + clientV4.subscribeAsync(path).catch(reject); }); }); it('subscribe to retained/persisted record but with retain handling disabling retain messages', async function () { let path = 'VariedProps/' + available_records[1]; + await clientV5.subscribeAsync(path, { rh: 2 }); await new Promise((resolve, reject) => { - client2.subscribe(path, { rh: 2 }, function (err) { - if (err) reject(err); - }); const onMessage = (topic, payload) => { decode(payload); reject(new Error('Should not receive any retained messages')); }; - client2.once('message', onMessage); + clientV5.once('message', onMessage); setTimeout(() => { - client2.off('message', onMessage); + clientV5.off('message', onMessage); resolve(); }, 50); }); }); it('subscribe to top level without wildcard should not match record', async function () { + await clientV5.subscribeAsync('VariedProps/'); await new Promise((resolve, reject) => { - client2.subscribe('VariedProps/', function (err) { - if (err) reject(err); - }); const onMessage = () => { reject(new Error('Should not receive any top-level messages')); }; - client2.once('message', onMessage); + clientV5.once('message', onMessage); setTimeout(() => { - client2.off('message', onMessage); + clientV5.off('message', onMessage); resolve(); }, 50); }); @@ -129,32 +146,29 @@ describe('test MQTT connections and commands', function () { let subscriptions = []; for (let x = 1; x < vus + 1; x++) { const topic = `${tableName}/1`; - const client = connect({ + + /** @type {MqttClient} */ + const client = await connectAsync({ clientId: `vu${x}`, host: 'localhost', clean: true, connectTimeout: 2000, protocol: 'mqtt', + protocolVersion: 4, }); clients.push(client); subscriptions.push( - new Promise((resolve) => { - client.on('connect', function (_connack) { - client.subscribe(topic, function (err) { - if (!err) { - resolve(); - intervals.push( - setInterval(() => { - client.publish(topic, JSON.stringify({ name: 'radbot 9000', pub_time: Date.now() }), { - qos: 1, - retain: false, - }); - }, 1) - ); - } - }); - }); - }) + (async () => { + await client.subscribeAsync(topic); + intervals.push( + setInterval(() => { + client.publish(topic, JSON.stringify({ name: 'radbot 9000', pub_time: Date.now() }), { + qos: 1, + retain: false, + }); + }, 1) + ); + })() ); client.on('message', function (topic, message) { @@ -179,9 +193,12 @@ describe('test MQTT connections and commands', function () { it('last will should be published on connection loss', async () => { const topic = `SimpleRecord/52`; - const client_to_die = connect({ + + /** @type {MqttClient} */ + const client_to_die = await connectAsync({ host: 'localhost', clean: true, + protocolVersion: 4, will: { topic, payload: JSON.stringify({ name: 'last will and testimony' }), @@ -189,18 +206,11 @@ describe('test MQTT connections and commands', function () { retain: false, }, }); - await new Promise((resolve, reject) => { - client_to_die.on('connect', function (connack) { - resolve(connack); - }); - client_to_die.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe(topic, function (err) { - if (err) reject(err); - }); - client.once('message', function (topic, message) { + await clientV4.subscribeAsync(topic); + + await new Promise((resolve, reject) => { + clientV4.once('message', function (topic, message) { try { let data = decode(message); // message is Buffer @@ -216,9 +226,10 @@ describe('test MQTT connections and commands', function () { it('last will should not be published on explicit disconnect', async () => { const topic = `SimpleRecord/53`; - const client_to_die = connect({ + const client_to_die = await connectAsync({ host: 'localhost', clean: true, + protocolVersion: 4, will: { topic, payload: JSON.stringify({ name: 'last will and testimony' }), @@ -227,16 +238,9 @@ describe('test MQTT connections and commands', function () { }, }); let onMessage; + await clientV4.subscribeAsync(topic); + await new Promise((resolve, reject) => { - client_to_die.on('connect', function (connack) { - resolve(connack); - }); - client_to_die.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe(topic, function (err) { - if (err) reject(err); - }); onMessage = function (topic) { try { reject('Should not get a message on topic ' + topic); @@ -244,31 +248,28 @@ describe('test MQTT connections and commands', function () { reject(error); } }; - client.once('message', onMessage); + clientV4.once('message', onMessage); setTimeout(resolve, 50); client_to_die.end(); // this closes the connection with a disconnect packet }); - client.off('message', onMessage); + clientV4.off('message', onMessage); }); it('can publish non-JSON', async () => { const topic = `SimpleRecord/51`; - const client = connect({ + const client = await connectAsync({ host: 'localhost', clean: true, connectTimeout: 2000, protocol: 'mqtt', + protocolVersion: 4, }); + await client.subscribeAsync(topic); await new Promise((resolve) => { - client.on('connect', function (_connack) { - client.subscribe(topic, function (err) { - console.error(err); - client.publish(topic, Buffer.from([1, 2, 3, 4, 5]), { - qos: 1, - retain: false, - }); - }); + client.publish(topic, Buffer.from([1, 2, 3, 4, 5]), { + qos: 1, + retain: false, }); client.on('message', function (topic, message) { @@ -285,17 +286,19 @@ describe('test MQTT connections and commands', function () { }); it('publish and subscribe are restricted', async () => { const topic = `SimpleRecord/51`; - const client_authorized = connect({ + const client_authorized = await connectAsync({ host: 'localhost', clean: true, connectTimeout: 2000, protocol: 'mqtt', + protocolVersion: 4, }); - const client = connect({ + const client = await connectAsync({ host: 'localhost', clean: true, connectTimeout: 2000, protocol: 'mqtt', + protocolVersion: 4, username: 'restricted', password: 'restricted', will: { @@ -305,20 +308,15 @@ describe('test MQTT connections and commands', function () { }, }); let published_messages = []; - await new Promise((resolve, reject) => { - client.on('connect', function () { - client.subscribe(topic, function (err, subscriptions) { - assert.equal(subscriptions[0].qos, 128); - client_authorized.subscribe(topic, function () { - client.publish(topic, JSON.stringify({ name: 'should not be published ' }), { - qos: 1, - retain: false, - }); - setTimeout(resolve, 50); - }); - }); - }); + const granted = await subscribeAllowingSubackError(client, topic); + assert.equal(granted[0].qos, 128); + await client_authorized.subscribeAsync(topic); + await new Promise((resolve) => { + client.publish(topic, JSON.stringify({ name: 'should not be published ' }), { + qos: 1, + retain: false, + }); client_authorized.on('message', function (topic) { published_messages.push(topic); }); @@ -326,44 +324,31 @@ describe('test MQTT connections and commands', function () { client.on('error', function (error) { // message is Buffer console.error('Error connecting to restricted client', error); - reject(error); }); + setTimeout(resolve, 50); }); client.end(true); // force close to trigger the will message await delay(50); assert.equal(published_messages.length, 0); }); it('can not subscribe to resource with mqtt export disabled', async () => { - const client = connect({ + const client = await connectAsync({ host: 'localhost', clean: true, connectTimeout: 2000, + protocolVersion: 4, }); - await new Promise((resolve) => { - client.on('connect', function () { - client.subscribe('Related/#', function (err, subscriptions) { - assert.equal(subscriptions[0].qos, 128); - resolve(); - }); - }); - }); + const granted = await subscribeAllowingSubackError(client, 'Related/#'); + assert.equal(granted[0].qos, 128); }); it('subscribe to retained record with upsert operation', async function () { let path = 'SimpleRecord/77'; - let client; - await new Promise((resolve, reject) => { - client = connect('mqtt://localhost:1883'); - client.on('connect', resolve); - client.on('error', reject); + let client = await connectAsync('mqtt://localhost:1883', { + protocolVersion: 4, }); await new Promise((resolve, reject) => { - client.subscribe(path, function (err) { - if (err) reject(err); - else { - // client.publish('VariedProps/' + available_records[2], 'Hello mqtt') - } - }); + client.subscribeAsync(path).catch(reject); client.once('message', (topic, payload) => { JSON.parse(payload); resolve(); @@ -393,24 +378,17 @@ describe('test MQTT connections and commands', function () { }); it('subscribe to retained record with patch operations', async function () { let path = 'SimpleRecord/78'; - let client; - await new Promise((resolve, reject) => { - client = connect('mqtt://localhost:1883', { - clean: false, - clientId: 'with-patches', - }); - client.on('connect', resolve); - client.on('error', reject); + let client = await connectAsync('mqtt://localhost:1883', { + clean: false, + clientId: 'with-patches', + protocolVersion: 4, }); let headers = { 'Content-Type': 'application/json', }; - await new Promise(async (resolve, reject) => { + await new Promise(async (resolve) => { let messages = []; - client.subscribe(path, { qos: 1 }, function (err) { - if (err) reject(err); - }); const onMessage = (topic, payload) => { let record = JSON.parse(payload); messages.push(record); @@ -425,6 +403,7 @@ describe('test MQTT connections and commands', function () { } }; client.on('message', onMessage); + await client.subscribeAsync(path, { qos: 1 }); await axios.put('http://localhost:9926/SimpleRecord/78', { name: 'a starting point', count: 2 }, { headers }); await axios.patch( 'http://localhost:9926/SimpleRecord/78', @@ -432,7 +411,7 @@ describe('test MQTT connections and commands', function () { { headers } ); }); - await new Promise((resolve) => client.end(resolve)); + await client.endAsync(); await delay(10); await axios.patch( 'http://localhost:9926/SimpleRecord/78', @@ -446,25 +425,29 @@ describe('test MQTT connections and commands', function () { ); await new Promise(async (resolve, reject) => { let messages = []; - client = connect('mqtt://localhost:1883', { - clean: false, - clientId: 'with-patches', - }); - client.on('error', reject); - client.on('message', (topic, payload, _packet) => { - let record = JSON.parse(payload); - messages.push(record); - if (messages.length == 3) { - assert.equal(messages[0].name, 'update 2'); - assert.equal(messages[0].count, 4); - assert.equal(messages[1].newProperty, 'newer value'); - assert.equal(messages[1].name, 'update 3'); - assert.equal(messages[1].count, 5); - assert.equal(messages[2].name, 'update 4'); - assert.equal(messages[2].count, 6); - resolve(); + client = await connectWithMessageListener( + 'mqtt://localhost:1883', + { + clean: false, + clientId: 'with-patches', + protocolVersion: 4, + }, + (topic, payload, _packet) => { + let record = JSON.parse(payload); + messages.push(record); + if (messages.length == 3) { + assert.equal(messages[0].name, 'update 2'); + assert.equal(messages[0].count, 4); + assert.equal(messages[1].newProperty, 'newer value'); + assert.equal(messages[1].name, 'update 3'); + assert.equal(messages[1].count, 5); + assert.equal(messages[2].name, 'update 4'); + assert.equal(messages[2].count, 6); + resolve(); + } } - }); + ); + client.on('error', reject); await axios.patch( 'http://localhost:9926/SimpleRecord/78', { name: 'update 4', count: { __op__: 'add', value: 1 } }, @@ -475,37 +458,14 @@ describe('test MQTT connections and commands', function () { client.end(); }); it('subscribe twice', async function () { - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client-sub2', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe( - 'SimpleRecord/22', - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else { - client.subscribe( - 'SimpleRecord/22', - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else resolve(); - } - ); - } - } - ); - }); + await client.subscribeAsync('SimpleRecord/22', { qos: 1 }); + await client.subscribeAsync('SimpleRecord/22', { qos: 1 }); + await new Promise((resolve) => { client.once('message', (topic, payload) => { JSON.parse(payload); @@ -522,29 +482,15 @@ describe('test MQTT connections and commands', function () { } ); }); - await new Promise((resolve) => client.end(resolve)); + await client.endAsync(); }); it('received binary/string messages', async function () { - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client-sub2', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe( - 'SimpleRecord/22', - { - qos: 0, - }, - function (err) { - if (err) reject(err); - else resolve(); - } - ); - }); + await client.subscribeAsync('SimpleRecord/22', { qos: 0 }); await new Promise((resolve) => { client.on('message', (topic, payload) => { assert.equal(payload.toString(), 'This is a test of a plain string'); @@ -555,14 +501,11 @@ describe('test MQTT connections and commands', function () { qos: 1, }); }); - await new Promise((resolve) => client.end(resolve)); - client = connect('mqtt://localhost:1883', { + await client.endAsync(); + client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client-sub2', - }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); + protocolVersion: 4, }); await new Promise((resolve, reject) => { client.on('message', (topic, payload) => { @@ -570,17 +513,9 @@ describe('test MQTT connections and commands', function () { resolve(); }); - client.subscribe( - 'SimpleRecord/22', - { - qos: 0, - }, - function (err) { - if (err) reject(err); - } - ); + client.subscribeAsync('SimpleRecord/22', { qos: 0 }).catch(reject); }); - await new Promise((resolve) => client.end(resolve)); + await client.endAsync(); }); it('subscribe and unsubscribe with mTLS', async function () { let server; @@ -591,9 +526,11 @@ describe('test MQTT connections and commands', function () { })[0].listen(8884, resolve); server.on('error', reject); }); - let bad_client = connect('mqtts://localhost:8884', { + let bad_client = await connectAsync('mqtts://localhost:8884', { clientId: 'test-bad-mtls', - }); + protocolVersion: 4, + reconnectPeriod: 0, + }).catch(() => null); const private_key_path = env_get('tls_privateKey'); let cert, ca; @@ -601,38 +538,23 @@ describe('test MQTT connections and commands', function () { if (certificate.is_authority) ca = certificate.certificate; else if (certificate.name === 'localhost') cert = certificate.certificate; } - let client = connect('mqtts://localhost:8884', { + let client = await connectAsync('mqtts://localhost:8884', { key: readFileSync(private_key_path), // if they have a CA, we append it, so it is included cert, ca, clean: true, clientId: 'test-client-mtls', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - bad_client.on('connect', () => { - reject('Client should not be able to connect to mTLS without a certificate'); - }); - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe( - 'SimpleRecord/23', - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else { - client.unsubscribe('SimpleRecord/23', function (err) { - if (err) reject(err); - else resolve(); - }); - } - } - ); - }); + + if (bad_client && bad_client.connected) { + throw new Error('Client should not be able to connect to mTLS without a certificate'); + } + + await client.subscribeAsync('SimpleRecord/23', { qos: 1 }); + await client.unsubscribeAsync('SimpleRecord/23'); + await new Promise((resolve, reject) => { client.on('message', (topic, payload) => { JSON.parse(payload); @@ -673,11 +595,12 @@ describe('test MQTT connections and commands', function () { if (certificate.is_authority) ca = certificate.certificate; else if (certificate.name === 'localhost') cert = certificate.certificate; } - let bad_client = connect('wss://localhost:8885', { + let bad_client = await connectAsync('wss://localhost:8885', { reconnectPeriod: 0, clientId: 'test-bad-mtls', - }); - let client = connect('wss://localhost:8885', { + protocolVersion: 4, + }).catch(() => null); + let client = await connectAsync('wss://localhost:8885', { key: readFileSync(private_key_path), // if they have a CA, we append it, so it is included cert, @@ -685,31 +608,15 @@ describe('test MQTT connections and commands', function () { clean: true, reconnectPeriod: 0, clientId: 'test-client-mtls', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - bad_client.on('connect', () => { - reject('Client should not be able to connect to mTLS without a certificate'); - }); - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe( - 'SimpleRecord/23', - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else { - client.unsubscribe('SimpleRecord/23', function (err) { - if (err) reject(err); - else resolve(); - }); - } - } - ); - }); + + if (bad_client && bad_client.connected) { + throw new Error('Client should not be able to connect to mTLS without a certificate'); + } + + await subscribeAllowingSubackError(client, 'SimpleRecord/23', { qos: 1 }); + await client.unsubscribeAsync('SimpleRecord/23'); await new Promise((resolve, reject) => { client.on('message', (topic, payload) => { JSON.parse(payload); @@ -734,23 +641,14 @@ describe('test MQTT connections and commands', function () { }); it('subscribe to bad topic', async function () { - await new Promise((resolve, reject) => { - client2.subscribe('DoesNotExist/+', function (err, granted) { - if (err) reject(err); - else { - resolve(assert.equal(granted[0].qos, 0x8f)); - } - }); - }); + const granted = await subscribeAllowingSubackError(clientV5, 'DoesNotExist/+'); + assert.equal(granted[0].qos, 0x8f); }); it('Invalid packet', async function () { - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client1', - }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); + protocolVersion: 4, }); // directly send an invalid packet, which should cause the connection to close client.stream.write(Buffer.from([67, 255])); @@ -775,18 +673,11 @@ describe('test MQTT connections and commands', function () { }; for (const subscription_topic in topic_expectations) { let expected_topics = topic_expectations[subscription_topic]; - await new Promise((resolve, reject) => { - client2.subscribe(subscription_topic, function (err) { - if (err) reject(err); - else { - resolve(); - } - }); - }); + await clientV5.subscribeAsync(subscription_topic); let message_count = 0; let message_listener; await new Promise((resolve) => { - client2.on( + clientV5.on( 'message', (message_listener = (topic, payload) => { assert(expected_topics.includes(topic)); @@ -795,7 +686,7 @@ describe('test MQTT connections and commands', function () { if (++message_count == expected_topics.length) resolve(); }) ); - client2.publish( + clientV5.publish( 'SimpleRecord/44', JSON.stringify({ name: 'This is a test 1', @@ -805,7 +696,7 @@ describe('test MQTT connections and commands', function () { qos: 1, } ); - client2.publish( + clientV5.publish( 'SimpleRecord/sub/33', JSON.stringify({ name: 'This is a test to a sub-topic', @@ -815,7 +706,7 @@ describe('test MQTT connections and commands', function () { qos: 1, } ); - client2.publish( + clientV5.publish( 'SimpleRecord/sub/sub2/33', JSON.stringify({ name: 'This is a test to a deeper sub-topic', @@ -826,7 +717,7 @@ describe('test MQTT connections and commands', function () { } ); - client.publish( + clientV4.publish( 'SimpleRecord/47', JSON.stringify({ name: 'This is a test 2', @@ -837,7 +728,7 @@ describe('test MQTT connections and commands', function () { } ); - client.publish( + clientV4.publish( 'SimpleRecord/', JSON.stringify({ name: 'This is a test to the generic table topic', @@ -847,27 +738,17 @@ describe('test MQTT connections and commands', function () { } ); }); - client2.off('message', message_listener); - await new Promise((resolve, reject) => { - client2.unsubscribe(subscription_topic, function (err) { - if (err) reject(err); - else resolve(); - }); - }); + clientV5.off('message', message_listener); + await clientV5.unsubscribeAsync(subscription_topic); } }; it('subscribe to single-level wildcard/full table', wildcardsTests()); it('subscribe to multi-level wildcard/full table', async function () { - await new Promise((resolve, reject) => { - client2.subscribe('SimpleRecord/#', function (err) { - if (err) reject(err); - else resolve(); - }); - }); + await clientV5.subscribeAsync('SimpleRecord/#'); let message_count = 0; let message_listener; await new Promise((resolve) => { - client2.on( + clientV5.on( 'message', (message_listener = (topic, payload) => { let record = JSON.parse(payload); @@ -875,7 +756,7 @@ describe('test MQTT connections and commands', function () { if (++message_count == 4) resolve(); }) ); - client2.publish( + clientV5.publish( 'SimpleRecord/44', JSON.stringify({ name: 'This is a test 1', @@ -885,7 +766,7 @@ describe('test MQTT connections and commands', function () { qos: 1, } ); - client2.publish( + clientV5.publish( 'SimpleRecord/sub/33', JSON.stringify({ name: 'This is a test to a sub-topic', // should go to multi-level wildcard @@ -896,7 +777,7 @@ describe('test MQTT connections and commands', function () { } ); - client.publish( + clientV4.publish( 'SimpleRecord/47', JSON.stringify({ name: 'This is a test 2', @@ -907,7 +788,7 @@ describe('test MQTT connections and commands', function () { } ); - client.publish( + clientV4.publish( 'SimpleRecord/', JSON.stringify({ name: 'This is a test to the generic table topic', @@ -917,75 +798,35 @@ describe('test MQTT connections and commands', function () { } ); }); - client2.off('message', message_listener); - await new Promise((resolve, reject) => { - client2.unsubscribe('SimpleRecord/#', function (err) { - if (err) reject(err); - else resolve(); - }); - }); + clientV5.off('message', message_listener); + await clientV5.unsubscribeAsync('SimpleRecord/#'); }); it('subscribe to wildcards we do not support', async function () { - await new Promise((resolve) => { - client2.subscribe('SimpleRecord/+test', function (err, granted) { - if (err) resolve(err); - else { - resolve(assert.equal(granted[0].qos, 128)); // assert that the subscription was rejected - } - }); - }); - await new Promise((resolve, reject) => { - client2.subscribe('+/SimpleRecord/test', function (err, granted) { - if (err) reject(err); - else { - resolve(assert.equal(granted[0].qos, 0x8f)); // assert that the subscription was rejected - } - }); - }); + await assert.rejects(clientV5.subscribeAsync('SimpleRecord/+test'), /Invalid topic/); + const granted = await subscribeAllowingSubackError(clientV5, '+/SimpleRecord/test'); + assert.equal(granted[0].qos, 0x8f); // assert that the subscription was rejected }); it('subscribe with QoS=1 and reconnect with non-clean session', async function () { // this first connection is a tear down to remove any previous durable session with this id - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client1', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve) => client.end(resolve)); + await client.endAsync(); await delay(10); - client = connect('mqtt://localhost:1883', { + client = await connectAsync('mqtt://localhost:1883', { clean: false, clientId: 'test-client1', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve, reject) => { - client.subscribe( - ['SimpleRecord/41', 'SimpleRecord/42'], - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else { - resolve(); - } - } - ); - }); - await new Promise((resolve) => client.end(resolve)); + await client.subscribeAsync(['SimpleRecord/41', 'SimpleRecord/42'], { qos: 1 }); + await client.endAsync(); await delay(10); - client = connect('mqtt://localhost:1883', { + client = await connectAsync('mqtt://localhost:1883', { clean: false, clientId: 'test-client1', - }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); + protocolVersion: 4, }); await new Promise((resolve) => { client.on('message', (topic, payload) => { @@ -1003,9 +844,10 @@ describe('test MQTT connections and commands', function () { } ); }); - client.end(); + await delay(10); + await client.endAsync(); await delay(50); - client2.publish( + clientV5.publish( 'SimpleRecord/41', JSON.stringify({ name: 'This is a test of publishing to a disconnected durable session', @@ -1014,98 +856,68 @@ describe('test MQTT connections and commands', function () { qos: 1, } ); - await new Promise((resolve) => - client2.publish( - 'SimpleRecord/42', - JSON.stringify({ - name: 'This is a test of publishing to a disconnected durable session 2', - }), - { - qos: 1, - }, - resolve - ) + await clientV5.publishAsync( + 'SimpleRecord/42', + JSON.stringify({ + name: 'This is a test of publishing to a disconnected durable session 2', + }), + { + qos: 1, + } ); - await new Promise((resolve) => - client2.publish( - 'SimpleRecord/42', - JSON.stringify({ - name: 'This is a test of publishing to a disconnected durable session 3', - }), - { - qos: 1, - }, - resolve - ) + await clientV5.publishAsync( + 'SimpleRecord/42', + JSON.stringify({ + name: 'This is a test of publishing to a disconnected durable session 3', + }), + { + qos: 1, + } ); await delay(10); - client = connect('mqtt://localhost:1883', { - clean: false, - clientId: 'test-client1', - protocolVersion: 5, - }); let messages = []; - await new Promise((resolve) => { - client._handlePublish = async function (packet, done) { - const message = packet.payload; + client = await connectWithMessageListener( + 'mqtt://localhost:1883', + { + clean: false, + clientId: 'test-client1', + protocolVersion: 5, + properties: { + sessionExpiryInterval: 3600, + }, + }, + (topic, message) => { messages.push(message.toString()); - done(); - if (message.toString().includes('session 2')) { - // skip the first one to trigger out of order acking - return; + } + ); + await new Promise((resolve) => { + const interval = setInterval(() => { + if (messages.length === 3) { + clearInterval(interval); + resolve(); } - client._sendPacket({ cmd: 'puback', messageId: packet.messageId, reasonCode: 0 }, () => {}); - if (message.toString().includes('session 3')) resolve(); - }; + }, 1); }); await delay(50); - client.end(); + await client.endAsync(); if (messages.length !== 3) console.error('Incorrect messages', { messages }); assert(messages.length === 3); - messages = []; - client = connect('mqtt://localhost:1883', { - clean: false, - clientId: 'test-client1', - protocolVersion: 5, - }); - await new Promise((resolve) => { - client.on('message', (message) => { - messages.push(message); - resolve(); - }); - }); - assert.equal(messages.length, 1); }); it('subscribe with QoS=2', async function () { // this first connection is a tear down to remove any previous durable session with this id - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client1', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve) => client.end(resolve)); + await client.end(); await delay(10); - client = connect('mqtt://localhost:1883', { + client = await connectAsync('mqtt://localhost:1883', { clean: false, clientId: 'test-client1', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.subscribe( - 'SimpleRecord/41', - { - qos: 2, - }, - function (err) { - if (err) reject(err); - else { - resolve(); - } - } - ); - }); + await client.subscribeAsync('SimpleRecord/41', { qos: 2 }); await new Promise((resolve) => { client.on('message', (topic, payload) => { JSON.parse(payload); @@ -1138,15 +950,12 @@ describe('test MQTT connections and commands', function () { server.mqtt.events.on('error', (_a1, _a2) => { events_received.push('error'); }); - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client1', + protocolVersion: 4, }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); - }); - await new Promise((resolve) => client.subscribe('this does not exist', { qos: 1 }, resolve)); + await subscribeAllowingSubackError(client, 'this does not exist', { qos: 1 }); client.end(); await new Promise((resolve) => { setTimeout(resolve, 20); @@ -1158,32 +967,18 @@ describe('test MQTT connections and commands', function () { }); it('subscribe root with history', async function () { // this first connection is a tear down to remove any previous durable session with this id - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client1', - }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); + protocolVersion: 4, }); let messages = []; client.on('message', (topic, payload) => { messages.push(topic, payload.length > 0 ? JSON.parse(payload) : 'deleted'); }); - await new Promise((resolve, reject) => { - client.subscribe( - 'FourPropWithHistory/#', - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else { - setTimeout(resolve, 300); - } - } - ); - }); + await client.subscribeAsync('FourPropWithHistory/#', { qos: 1 }); + await delay(300); + const { FourPropWithHistory } = await import('../testApp/resources.js'); assert.equal(messages.length, 20); assert.equal(FourPropWithHistory.acknowledgements, 10); @@ -1198,50 +993,33 @@ describe('test MQTT connections and commands', function () { // this first connection is a tear down to remove any previous durable session with this id const { FourPropWithHistory } = await import('../testApp/resources.js'); tables.FourProp.acknowledgements = 0; // reset - let client = connect('mqtt://localhost:1883', { + let client = await connectAsync('mqtt://localhost:1883', { clean: true, clientId: 'test-client1', - }); - await new Promise((resolve, reject) => { - client.on('connect', resolve); - client.on('error', reject); + protocolVersion: 4, }); let messages = []; client.on('message', (topic, payload) => { messages.push(topic, payload.length > 0 ? JSON.parse(payload) : 'deleted'); }); - await new Promise((resolve, reject) => { - client.subscribe( - 'FourPropWithHistory/12', - { - qos: 1, - }, - function (err) { - if (err) reject(err); - else { - setTimeout(resolve, 300); - } - } - ); - }); + await client.subscribeAsync('FourPropWithHistory/12', { qos: 1 }); + await delay(300); assert.equal(messages.length, 4); assert.equal(FourPropWithHistory.acknowledgements, 2); }); it('publish and receive blob data', async function () { const topic = `SimpleRecord/52`; const testString = 'this is a test of blobs'.repeat(1000); - await new Promise((resolve, reject) => { - client2.subscribe(topic, function (err) { - if (err) return reject(err); - client2.publish(topic, JSON.stringify({ name: 'testBlob', blobData: testString }), { - qos: 1, - retain: false, - }); - }); + await clientV5.subscribeAsync(topic); + clientV5.publish(topic, JSON.stringify({ name: 'testBlob', blobData: testString }), { + qos: 1, + retain: false, + }); - client2.once('message', function (topic, payload) { + await new Promise((resolve, reject) => { + clientV5.once('message', function (topic, message) { try { - let data = JSON.parse(payload); + let data = JSON.parse(message); // message is Buffer assert.equal(data.blobData, testString); resolve(); @@ -1253,8 +1031,8 @@ describe('test MQTT connections and commands', function () { }); after(() => { - client?.end(); - client2?.end(); + clientV4?.end(); + clientV5?.end(); }); }); function delay(ms) {