From fe48ce2d8049120db7eafcd3711db0b4afcc50cb Mon Sep 17 00:00:00 2001 From: Gorniaky Date: Tue, 24 Jun 2025 23:20:00 -0300 Subject: [PATCH] feat: integrate @discloudapp/ws for improved socket communication and remove deprecated socket client --- package.json | 1 + .../discloud/socket/actions/commit.ts | 34 ++- .../discloud/socket/actions/upload.ts | 36 ++-- src/services/discloud/socket/client.ts | 201 ------------------ .../discloud/socket/errors/BufferOverflow.ts | 9 - src/services/discloud/socket/types.ts | 60 ------ yarn.lock | 7 + 7 files changed, 54 insertions(+), 294 deletions(-) delete mode 100644 src/services/discloud/socket/client.ts delete mode 100644 src/services/discloud/socket/errors/BufferOverflow.ts delete mode 100644 src/services/discloud/socket/types.ts diff --git a/package.json b/package.json index 2d3f9bfa..72d3c058 100644 --- a/package.json +++ b/package.json @@ -1272,6 +1272,7 @@ "test": "vscode-test" }, "dependencies": { + "@discloudapp/ws": "^0.1.0", "@vscode/l10n": "^0.0.18", "adm-zip": "^0.5.16", "bytes": "^3.1.2", diff --git a/src/services/discloud/socket/actions/commit.ts b/src/services/discloud/socket/actions/commit.ts index adea0230..992368ff 100644 --- a/src/services/discloud/socket/actions/commit.ts +++ b/src/services/discloud/socket/actions/commit.ts @@ -1,3 +1,4 @@ +import { SocketClient, type SocketEventUploadData } from "@discloudapp/ws"; import { t } from "@vscode/l10n"; import bytes from "bytes"; import { Routes } from "discloud.app"; @@ -8,12 +9,10 @@ import extension from "../../../../extension"; import AppTreeItem from "../../../../structures/AppTreeItem"; import type TeamAppTreeItem from "../../../../structures/TeamAppTreeItem"; import { MAX_FILE_SIZE } from "../../constants"; -import SocketClient from "../client"; -import { type SocketEventUploadData } from "../types"; export async function socketCommit(task: TaskData, buffer: Buffer, app: AppTreeItem | TeamAppTreeItem) { await new Promise((resolve, reject) => { - const debugCode = Date.now(); + const debugCode = app.appId; function debug(message: string, ...args: unknown[]) { extension.debug(`%o ${message}`, debugCode, ...args); @@ -47,7 +46,12 @@ export async function socketCommit(task: TaskData, buffer: Buffer, app: AppTreeI uploading: false, }; - const ws = new SocketClient(url) + const socket = new SocketClient(url, { + headers: { + "api-token": extension.token!, + ...extension.api.options.userAgent ? { "User-Agent": `${extension.api.options.userAgent}` } : {}, + }, + }) .once("close", async (code, reason) => { debug("close", code); @@ -76,7 +80,7 @@ export async function socketCommit(task: TaskData, buffer: Buffer, app: AppTreeI task.progress.report({ increment: -1, message: t("committing") }); - await ws.sendBuffer(buffer, (data) => { + await socket.sendBuffer(buffer, (data) => { debug("progress received %o/%o", data.current, data.total); task.progress.report({ increment: 100 / data.total }); }); @@ -90,8 +94,11 @@ export async function socketCommit(task: TaskData, buffer: Buffer, app: AppTreeI task.progress.report({ increment: -1, message: t("socket.connecting") }); }) .on("connectionFailed", async () => { - debug(t("socket.connecting.fail")); - await window.showErrorMessage(t("socket.connecting.fail")); + resolve(); + const message = t("socket.connecting.fail"); + debug(message); + showLog(message); + await window.showErrorMessage(message); }) .on("data", (data) => { debug("data received with status %s %o", data.status, data.statusCode); @@ -106,6 +113,8 @@ export async function socketCommit(task: TaskData, buffer: Buffer, app: AppTreeI if (data.logs) showLog(data.logs); + if (status.uploading) return; + if (data.statusCode !== 102) { const isDone = data.statusCode === 200; const isCodeError = !isDone; @@ -121,13 +130,16 @@ export async function socketCommit(task: TaskData, buffer: Buffer, app: AppTreeI showError(error); }) .on("unauthorized", async () => { - debug(t("socket.authentication.fail")); - await window.showErrorMessage(t("socket.authentication.fail")); + resolve(); + const message = t("socket.authentication.fail"); + debug(message); + showLog(message); + await window.showErrorMessage(message); }); - extension.context.subscriptions.push(ws); + extension.context.subscriptions.push(socket); - ws.connect().catch(reject); + socket.connect().catch(reject); }); } diff --git a/src/services/discloud/socket/actions/upload.ts b/src/services/discloud/socket/actions/upload.ts index 3242609b..8fab9939 100644 --- a/src/services/discloud/socket/actions/upload.ts +++ b/src/services/discloud/socket/actions/upload.ts @@ -1,3 +1,4 @@ +import { SocketClient, type SocketEventUploadData } from "@discloudapp/ws"; import { t } from "@vscode/l10n"; import bytes from "bytes"; import { Routes, type DiscloudConfig } from "discloud.app"; @@ -6,8 +7,6 @@ import { window } from "vscode"; import { type ApiVscodeApp, type TaskData } from "../../../../@types"; import extension from "../../../../extension"; import { MAX_FILE_SIZE } from "../../constants"; -import SocketClient from "../client"; -import { type SocketEventUploadData } from "../types"; export async function socketUpload(task: TaskData, buffer: Buffer, dConfig: DiscloudConfig) { await new Promise((resolve, reject) => { @@ -44,16 +43,21 @@ export async function socketUpload(task: TaskData, buffer: Buffer, dConfig: Disc uploading: false, }; - const ws = new SocketClient(url) + const socket = new SocketClient(url, { + headers: { + "api-token": extension.token!, + ...extension.api.options.userAgent ? { "User-Agent": `${extension.api.options.userAgent}` } : {}, + }, + }) .once("close", async (code, reason) => { debug("close", code); resolve(); - setTimeout(() => logger.dispose(), 60_000); - if (!status.connected || !status.authenticated) return; + setTimeout(() => logger.dispose(), 60_000); + if (code !== 1000) { await window.showErrorMessage(t(`socket.close.${code}`)); return; @@ -74,9 +78,12 @@ export async function socketUpload(task: TaskData, buffer: Buffer, dConfig: Disc task.progress.report({ increment: -1, message: t("socket.connecting") }); }) .on("connectionFailed", async () => { - debug(t("socket.connecting.fail")); resolve(); - await window.showErrorMessage(t("socket.connecting.fail")); + const message = t("socket.connecting.fail"); + debug(message); + showLog(message); + await window.showErrorMessage(message); + setTimeout(() => logger.dispose(), 60_000); }) .on("connected", async () => { debug("connected"); @@ -86,7 +93,7 @@ export async function socketUpload(task: TaskData, buffer: Buffer, dConfig: Disc task.progress.report({ increment: -1, message: t("uploading") }); - await ws.sendBuffer(buffer, (data) => { + await socket.sendBuffer(buffer, (data) => { debug("progress received %o/%o", data.current, data.total); task.progress.report({ increment: 100 / data.total }); }); @@ -110,7 +117,7 @@ export async function socketUpload(task: TaskData, buffer: Buffer, dConfig: Disc dConfig.update({ ID: data.app.id, AVATAR: data.app.avatarURL }); const app: ApiVscodeApp = { - apts: dConfig.data.APT as any, + apts: dConfig.data.APT ?? [], clusterName: "", exitCode: data.statusCode === 200 ? 0 : 1, online: data.statusCode === 200, @@ -129,14 +136,17 @@ export async function socketUpload(task: TaskData, buffer: Buffer, dConfig: Disc showError(error); }) .on("unauthorized", async () => { - debug(t("socket.authentication.fail")); resolve(); - await window.showErrorMessage(t("socket.authentication.fail")); + const message = t("socket.authentication.fail"); + debug(message); + showLog(message); + await window.showErrorMessage(message); + setTimeout(() => logger.dispose(), 60_000); }); - extension.context.subscriptions.push(logger, ws); + extension.context.subscriptions.push(logger, socket); - ws.connect().catch(reject); + socket.connect().catch(reject); }); } diff --git a/src/services/discloud/socket/client.ts b/src/services/discloud/socket/client.ts deleted file mode 100644 index ead534c4..00000000 --- a/src/services/discloud/socket/client.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { EventEmitter } from "events"; -import type vscode from "vscode"; -import WebSocket from "ws"; -import extension from "../../../extension"; -import { DEFAULT_CHUNK_SIZE, MAX_FILE_SIZE, NETWORK_UNREACHABLE_CODE, SOCKET_UNAUTHORIZED_CODE } from "../constants"; -import BufferOverflowError from "./errors/BufferOverflow"; -import { type OnProgressCallback, type ProgressData, type SocketEventsMap, type SocketOptions } from "./types"; - -export default class SocketClient = Record> - extends EventEmitter> - implements vscode.Disposable, Disposable { - constructor(protected wsURL: URL, options?: SocketOptions) { - super({ captureRejections: true }); - - if (options) { - if (options.chunkSize !== undefined) - this._chunkSize = options.chunkSize; - - if (options.connectingTimeout !== undefined) - this._connectingTimeout = options.connectingTimeout; - - if (typeof options.disposeOnClose === "boolean") - this._disposeOnClose = options.disposeOnClose; - - if (options.headers) Object.assign(this._headers, options.headers); - } - } - - protected _chunkSize: number = DEFAULT_CHUNK_SIZE; - protected readonly _connectingTimeout: number | null = 10_000; - protected readonly _disposeOnClose: boolean = true; - protected readonly _headers: Record = {}; - declare protected _socket?: WebSocket; - declare protected _ping: number; - declare protected _pong: number; - declare ping: number; - - get closed() { - return !this._socket || this._socket.readyState === this._socket.CLOSED; - } - - get closing() { - return this._socket ? this._socket.readyState === this._socket.CLOSING : false; - } - - get connected() { - return this._socket ? this._socket.readyState === this._socket.OPEN : false; - } - - get connecting() { - return this._socket ? this._socket.readyState === this._socket.CONNECTING : false; - } - - close() { - if (this._socket) { - this._socket.removeAllListeners().close(); - delete this._socket; - } - } - - dispose() { - this[Symbol.dispose](); - } - - async connect() { - await new Promise((resolve, reject) => { - if (this.connected) return resolve(); - this.#createWebSocket().then(resolve).catch(reject); - }); - } - - async #waitConnect() { - await new Promise((resolve, reject) => { - if (this.connecting) { - const onConnected = () => { - this.off("close", onClose); - resolve(); - }; - const onClose = () => { - this.off("connected", onConnected); - reject(); - }; - return this.once("connected", onConnected).once("close", onClose); - } - if (this.connected) return resolve(); - reject(); - }); - } - - async sendJSON(value: Record | any[]): Promise { - if (!this.connected) await this.connect(); - - await new Promise((resolve, reject) => { - this._socket!.send(JSON.stringify(value), (err) => { - if (err) return reject(err); - resolve(); - }); - }); - } - - async sendBuffer(buffer: Buffer, onProgress?: OnProgressCallback) { - if (buffer.length > MAX_FILE_SIZE) throw new BufferOverflowError(MAX_FILE_SIZE); - - const total = Math.ceil(buffer.length / this._chunkSize); - const chunkSize = Math.ceil(buffer.length / total); - - for (let i = 0; i < total;) { - const offset = chunkSize * i; - const end = offset + chunkSize; - const chunk = buffer.subarray(offset, end); - const current = ++i; - const pending = current < total; - - const value: ProgressData = { chunk, current, offset, pending, total }; - - await this.sendJSON(value); - - await onProgress?.(value); - } - } - - #createWebSocket() { - return new Promise((resolve, reject) => { - if (this.connecting) return this.#waitConnect().then(resolve).catch(reject); - - if (this.connected) return resolve(); - - this.emit("connecting"); - - const options: ConstructorParameters[2] = { - headers: Object.assign({ "api-token": extension.api.token! }, - extension.api.options.userAgent ? { "User-Agent": extension.api.options.userAgent } : {}, - this._headers), - ...typeof this._connectingTimeout === "number" - ? { signal: AbortSignal.timeout(this._connectingTimeout) } - : {}, - }; - - const status = { - connected: false, - error: undefined as any, - }; - - this._socket = new WebSocket(this.wsURL, options) - .once("close", (code, reason) => { - queueMicrotask(() => this.emit("close", code, reason)); - if (this._disposeOnClose) queueMicrotask(() => this.dispose()); - - switch (code) { - case SOCKET_UNAUTHORIZED_CODE: - return this.emit("unauthorized"); - } - - if (!status.connected) return this.emit("connectionFailed"); - - status.connected = false; - - if (status.error) { - const error = status.error; - delete status.error; - - switch (error.code) { - case NETWORK_UNREACHABLE_CODE: - return this.emit("connectionFailed"); - } - } - }) - .on("error", (error) => { - this.emit("error", status.error = error); - }) - .on("message", (data) => { - try { this.emit("data", JSON.parse(data.toString())); } - catch { this.emit("message", data); } - }) - .once("open", () => { - status.connected = true; - status.error = null; - - this._ping = Date.now(); - this._socket!.ping(); - - this.emit("connected"); - - resolve(); - }) - .on("ping", () => { - this._ping = Date.now(); - this._socket!.ping(); - }) - .on("pong", () => { - this._pong = Date.now(); - this.ping = this._pong - this._ping; - }); - }); - } - - [Symbol.dispose]() { - this.close(); - this.removeAllListeners(); - } -} diff --git a/src/services/discloud/socket/errors/BufferOverflow.ts b/src/services/discloud/socket/errors/BufferOverflow.ts deleted file mode 100644 index e6034a6d..00000000 --- a/src/services/discloud/socket/errors/BufferOverflow.ts +++ /dev/null @@ -1,9 +0,0 @@ -export default class BufferOverflowError extends Error { - readonly name = "BufferOverflow"; - - constructor( - readonly max: number, - ) { - super(); - } -} diff --git a/src/services/discloud/socket/types.ts b/src/services/discloud/socket/types.ts deleted file mode 100644 index 1959801c..00000000 --- a/src/services/discloud/socket/types.ts +++ /dev/null @@ -1,60 +0,0 @@ -import type { ApiUploadApp } from "discloud.app"; -import type { RawData } from "ws"; - -export interface SocketEventsMap = Record> { - close: [code: number, reason: Buffer] - connected: [] - connecting: [] - connectionFailed: [] - data: [data: Data] - error: [error: Error] - message: [data: RawData] - unauthorized: [] -} - -export interface SocketOptions { - /** - * Set the buffer chunk size per message - * - * Note that very large chunks may cause unexpected closure - * - * @default 262_144 (256KB) - */ - chunkSize?: number - /** - * Connecting timeout in milliseconds - * - * @default 10_000 - */ - connectingTimeout?: number | null - /** - * @default true - */ - disposeOnClose?: boolean - headers?: Record -} - -export interface SocketEventUploadData { - app?: ApiUploadApp - logs?: string - message: string | null - progress: SocketProgressData - status: "ok" | "error" - statusCode: number -} - -export interface SocketProgressData { - /** `0 - 100` */ - bar: number - log: string -} - -export interface ProgressData { - chunk: Buffer - current: number - offset: number - pending: boolean - total: number -} - -export type OnProgressCallback = (data: ProgressData) => unknown | Promise diff --git a/yarn.lock b/yarn.lock index 29de2e74..b4c3e117 100644 --- a/yarn.lock +++ b/yarn.lock @@ -38,6 +38,13 @@ zod "^3.25.67" zod-validation-error "^3.5.2" +"@discloudapp/ws@^0.1.0": + version "0.1.0" + resolved "https://registry.yarnpkg.com/@discloudapp/ws/-/ws-0.1.0.tgz#2d450d396004a1bdef1c2bb9ba39a9ca3d6036b0" + integrity sha512-I44KRv7Su+YkCnLVhN38bgktP8UOS8osNnakHxGTtJXAiNlMZSeOmKH3EpY835EQsmomp+h27TBXZG/Ze7OE6g== + dependencies: + ws "^8.18.2" + "@esbuild/aix-ppc64@0.25.5": version "0.25.5" resolved "https://registry.yarnpkg.com/@esbuild/aix-ppc64/-/aix-ppc64-0.25.5.tgz#4e0f91776c2b340e75558f60552195f6fad09f18"