From 8a54dd6320b845bc9ea193b283953a3a6f43c1a6 Mon Sep 17 00:00:00 2001 From: Elazar Date: Fri, 6 Mar 2026 12:43:12 +0200 Subject: [PATCH 1/5] fix: clean up chunksByMessageId entries after message completion Completed message chunks were never removed from the chunksByMessageId Map, causing unbounded memory growth during long debug sessions. Each image/tensor transfer left an orphaned entry. Fix: delete the map entry immediately after extracting the full message and before calling handleMessage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../socket-based/Server.ts | 1 + tests/unit/ts/test_chunks_cleanup.js | 136 ++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 tests/unit/ts/test_chunks_cleanup.js diff --git a/src/python-communication/socket-based/Server.ts b/src/python-communication/socket-based/Server.ts index 5ee9ab43..53d81d52 100644 --- a/src/python-communication/socket-based/Server.ts +++ b/src/python-communication/socket-based/Server.ts @@ -129,6 +129,7 @@ export class SocketServer { if (chunks.isComplete()) { logTrace('Message is complete'); const fullMessage = chunks.fullMessage(); + this.chunksByMessageId.delete(header.messageID); handleMessage(header, fullMessage); } } diff --git a/tests/unit/ts/test_chunks_cleanup.js b/tests/unit/ts/test_chunks_cleanup.js new file mode 100644 index 00000000..e9bed34a --- /dev/null +++ b/tests/unit/ts/test_chunks_cleanup.js @@ -0,0 +1,136 @@ +/** + * Unit tests for SocketServer chunksByMessageId cleanup (R3). + * + * Bug: After a chunked message is fully reassembled and handled, + * the entry in chunksByMessageId is never deleted, causing unbounded + * memory growth over long debug sessions. + * + * Run: node tests/unit/ts/test_chunks_cleanup.js + */ + +let passed = 0 +let failed = 0 +function test(name, fn) { + try { + fn() + passed++ + console.log(` ✅ ${name}`) + } + catch (e) { + failed++ + console.log(` ❌ ${name}: ${e.message}`) + } +} +function assert(cond, msg) { + if (!cond) { + throw new Error(msg) + } +} + +// Minimal MessageChunks reproduction +class MessageChunks { + constructor(messageLength, chunkCount) { + this._messageLength = messageLength + this._chunkCount = chunkCount + this._chunks = [] + } + + addChunk(header, content) { + this._chunks.push({ header, content }) + } + + isComplete() { + return this._chunks.length === this._chunkCount + } + + fullMessage() { + return Buffer.concat(this._chunks.map(c => c.content)) + } +} + +function setDefault(map, key, ctor) { + if (!map.has(key)) { + map.set(key, ctor()) + } + return map.get(key) +} + +// Simulate the buggy handleData logic +function processMessageBuggy(chunksByMessageId, header, content, onComplete) { + const chunks = setDefault( + chunksByMessageId, + header.messageID, + () => new MessageChunks(header.messageLength, header.chunkCount), + ) + chunks.addChunk(header, content) + if (chunks.isComplete()) { + onComplete(header, chunks.fullMessage()) + // BUG: no cleanup — entry stays in map + } +} + +// Fixed version +function processMessageFixed(chunksByMessageId, header, content, onComplete) { + const chunks = setDefault( + chunksByMessageId, + header.messageID, + () => new MessageChunks(header.messageLength, header.chunkCount), + ) + chunks.addChunk(header, content) + if (chunks.isComplete()) { + onComplete(header, chunks.fullMessage()) + chunksByMessageId.delete(header.messageID) // FIX: clean up + } +} + +console.log('SocketServer chunksByMessageId cleanup tests:\n') + +test('Bug: completed messages stay in map', () => { + const map = new Map() + const header = { messageID: 1, messageLength: 5, chunkCount: 1, chunkIndex: 0, requestId: 1 } + processMessageBuggy(map, header, Buffer.from('hello'), () => {}) + assert(map.size === 1, `Expected map to retain entry (bug), size=${map.size}`) +}) + +test('Bug: 100 messages = 100 orphaned entries', () => { + const map = new Map() + for (let i = 0; i < 100; i++) { + const header = { messageID: i, messageLength: 3, chunkCount: 1, chunkIndex: 0, requestId: i } + processMessageBuggy(map, header, Buffer.from('abc'), () => {}) + } + assert(map.size === 100, `Expected 100 orphaned entries, got ${map.size}`) +}) + +test('Fix: completed messages cleaned up', () => { + const map = new Map() + const header = { messageID: 1, messageLength: 5, chunkCount: 1, chunkIndex: 0, requestId: 1 } + processMessageFixed(map, header, Buffer.from('hello'), () => {}) + assert(map.size === 0, `Expected map to be empty after completion, size=${map.size}`) +}) + +test('Fix: multi-chunk message cleaned after last chunk', () => { + const map = new Map() + const makeHeader = (idx) => ({ + messageID: 1, messageLength: 10, chunkCount: 3, chunkIndex: idx, requestId: 1, + }) + processMessageFixed(map, makeHeader(0), Buffer.from('aaa'), () => {}) + assert(map.size === 1, 'Should retain incomplete message') + processMessageFixed(map, makeHeader(1), Buffer.from('bbb'), () => {}) + assert(map.size === 1, 'Still incomplete') + let completed = false + processMessageFixed(map, makeHeader(2), Buffer.from('cccc'), () => { completed = true }) + assert(completed, 'Callback should fire') + assert(map.size === 0, `Should be cleaned up, size=${map.size}`) +}) + +test('Fix: 100 messages = 0 leftover entries', () => { + const map = new Map() + for (let i = 0; i < 100; i++) { + const header = { messageID: i, messageLength: 3, chunkCount: 1, chunkIndex: 0, requestId: i } + processMessageFixed(map, header, Buffer.from('abc'), () => {}) + } + assert(map.size === 0, `Expected 0 entries, got ${map.size}`) +}) + +console.log(`\nResults: ${passed} passed, ${failed} failed`) +process.exit(failed > 0 ? 1 : 0) From 5b0df82a46b1dfb2885bf62aaec62e933da02c3d Mon Sep 17 00:00:00 2001 From: Elazar Date: Sat, 7 Mar 2026 22:37:08 +0200 Subject: [PATCH 2/5] test(r3): rewrite chunk-cleanup tests; add pendingChunkCount getter; integration tests Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../socket-based/Server.ts | 4 + tests/unit/ts/chunks-cleanup.test.ts | 186 ++++++++++++++++++ tests/unit/ts/test_chunks_cleanup.js | 136 ------------- 3 files changed, 190 insertions(+), 136 deletions(-) create mode 100644 tests/unit/ts/chunks-cleanup.test.ts delete mode 100644 tests/unit/ts/test_chunks_cleanup.js diff --git a/src/python-communication/socket-based/Server.ts b/src/python-communication/socket-based/Server.ts index 53d81d52..3be627d2 100644 --- a/src/python-communication/socket-based/Server.ts +++ b/src/python-communication/socket-based/Server.ts @@ -55,6 +55,10 @@ export class SocketServer { return this.outgoingRequestsManager.count; } + get pendingChunkCount(): number { + return this.chunksByMessageId.size; + } + get portNumber() { if (!this.started) { throw new Error('SocketServer is not started'); diff --git a/tests/unit/ts/chunks-cleanup.test.ts b/tests/unit/ts/chunks-cleanup.test.ts new file mode 100644 index 00000000..317b8854 --- /dev/null +++ b/tests/unit/ts/chunks-cleanup.test.ts @@ -0,0 +1,186 @@ +/** + * Integration tests for SocketServer chunksByMessageId cleanup (R3). + * + * Verifies that completed message chunks are removed from the map so the + * server does not accumulate unbounded memory over long debug sessions. + */ + +import { Buffer } from 'node:buffer'; +import * as net from 'node:net'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { StatefulBufferWriter } from '../../../src/python-communication/socket-based/BufferWriter'; +import { + HEADER_LENGTH, + MessageType, + Sender, +} from '../../../src/python-communication/socket-based/protocol'; +import { SocketServer } from '../../../src/python-communication/socket-based/Server'; + +vi.mock('typedi', () => ({ Service: () => (_target: unknown) => _target })); + +vi.mock('vscode-extensions-json-generator/utils', () => ({ + configUtils: { + ConfigurationGetter: () => () => undefined, + ConfigurationSetter: () => () => undefined, + ConfigurationInspector: () => () => undefined, + }, +})); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Build a single wire-frame chunk (header + data). */ +function buildChunk(params: { + messageId: number; + requestId: number; + messageLength: number; // total data across all chunks + chunkCount: number; + chunkNumber: number; + data: Buffer; +}): Buffer { + const { messageId, requestId, messageLength, chunkCount, chunkNumber, data } = params; + const headerBuf = Buffer.alloc(HEADER_LENGTH); + const writer = new StatefulBufferWriter(headerBuf); + writer.writeUInt32(messageLength); + writer.writeUInt32(messageId); + writer.writeUInt8(Sender.Python); + writer.writeUInt32(requestId); + writer.writeUInt8(MessageType.PythonSendingObject); + writer.writeUInt32(chunkCount); + writer.writeUInt32(chunkNumber); + writer.writeUInt32(data.length); + return Buffer.concat([headerBuf, data]); +} + +/** Send bytes to the server and wait long enough for loopback TCP delivery. */ +async function sendAndWait(socket: net.Socket, buf: Buffer): Promise { + socket.write(buf); + // A short timer ensures we yield past the I/O poll phase so the server's + // 'data' handler has time to run before we check the map. + await new Promise(resolve => setTimeout(resolve, 10)); +} + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +let server: SocketServer; +let client: net.Socket; +const serverSideConnections: Set = new Set(); + +beforeEach(async () => { + server = new SocketServer(); + + // Track every server-side socket so we can destroy them in afterEach. + server.server.on('connection', (socket: net.Socket) => { + serverSideConnections.add(socket); + socket.once('close', () => serverSideConnections.delete(socket)); + }); + + await server.start(); + + client = await new Promise((resolve, reject) => { + const s = net.createConnection({ port: server.portNumber }, () => resolve(s)); + s.once('error', reject); + }); +}); + +afterEach(async () => { + client.destroy(); + for (const socket of serverSideConnections) { + socket.destroy(); + } + serverSideConnections.clear(); + await new Promise(resolve => server.server.close(() => resolve())); +}); + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('socketServer — chunksByMessageId cleanup', () => { + it('starts with zero pending chunks', () => { + expect(server.pendingChunkCount).toBe(0); + }); + + it('returns to 0 after a complete single-chunk message', async () => { + const data = Buffer.from('hello'); + const chunk = buildChunk({ + messageId: 1, + requestId: 99, + messageLength: data.length, + chunkCount: 1, + chunkNumber: 0, + data, + }); + await sendAndWait(client, chunk); + expect(server.pendingChunkCount).toBe(0); + }); + + it('is 1 while a multi-chunk message is incomplete', async () => { + const part1 = Buffer.from('abc'); + const part2 = Buffer.from('def'); + const messageLength = part1.length + part2.length; + + const chunk1 = buildChunk({ + messageId: 2, + requestId: 99, + messageLength, + chunkCount: 2, + chunkNumber: 0, + data: part1, + }); + await sendAndWait(client, chunk1); + expect(server.pendingChunkCount).toBe(1); + + // send the completing chunk + const chunk2 = buildChunk({ + messageId: 2, + requestId: 99, + messageLength, + chunkCount: 2, + chunkNumber: 1, + data: part2, + }); + await sendAndWait(client, chunk2); + expect(server.pendingChunkCount).toBe(0); + }); + + it('is 0 after N complete multi-chunk messages (not N × chunkCount)', async () => { + const N = 5; + for (let i = 0; i < N; i++) { + const parts = [Buffer.from('aa'), Buffer.from('bb'), Buffer.from('cc')]; + const messageLength = parts.reduce((s, p) => s + p.length, 0); + for (let j = 0; j < parts.length; j++) { + const chunk = buildChunk({ + messageId: 100 + i, + requestId: 1, + messageLength, + chunkCount: parts.length, + chunkNumber: j, + data: parts[j], + }); + await sendAndWait(client, chunk); + } + } + expect(server.pendingChunkCount).toBe(0); + }); + + it('stress: 100 complete single-chunk messages leave pendingChunkCount at 0', async () => { + for (let i = 0; i < 100; i++) { + const data = Buffer.from(`message-${i}`); + const chunk = buildChunk({ + messageId: 200 + i, + requestId: 1, + messageLength: data.length, + chunkCount: 1, + chunkNumber: 0, + data, + }); + await sendAndWait(client, chunk); + } + expect(server.pendingChunkCount).toBe(0); + }); +}); diff --git a/tests/unit/ts/test_chunks_cleanup.js b/tests/unit/ts/test_chunks_cleanup.js deleted file mode 100644 index e9bed34a..00000000 --- a/tests/unit/ts/test_chunks_cleanup.js +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Unit tests for SocketServer chunksByMessageId cleanup (R3). - * - * Bug: After a chunked message is fully reassembled and handled, - * the entry in chunksByMessageId is never deleted, causing unbounded - * memory growth over long debug sessions. - * - * Run: node tests/unit/ts/test_chunks_cleanup.js - */ - -let passed = 0 -let failed = 0 -function test(name, fn) { - try { - fn() - passed++ - console.log(` ✅ ${name}`) - } - catch (e) { - failed++ - console.log(` ❌ ${name}: ${e.message}`) - } -} -function assert(cond, msg) { - if (!cond) { - throw new Error(msg) - } -} - -// Minimal MessageChunks reproduction -class MessageChunks { - constructor(messageLength, chunkCount) { - this._messageLength = messageLength - this._chunkCount = chunkCount - this._chunks = [] - } - - addChunk(header, content) { - this._chunks.push({ header, content }) - } - - isComplete() { - return this._chunks.length === this._chunkCount - } - - fullMessage() { - return Buffer.concat(this._chunks.map(c => c.content)) - } -} - -function setDefault(map, key, ctor) { - if (!map.has(key)) { - map.set(key, ctor()) - } - return map.get(key) -} - -// Simulate the buggy handleData logic -function processMessageBuggy(chunksByMessageId, header, content, onComplete) { - const chunks = setDefault( - chunksByMessageId, - header.messageID, - () => new MessageChunks(header.messageLength, header.chunkCount), - ) - chunks.addChunk(header, content) - if (chunks.isComplete()) { - onComplete(header, chunks.fullMessage()) - // BUG: no cleanup — entry stays in map - } -} - -// Fixed version -function processMessageFixed(chunksByMessageId, header, content, onComplete) { - const chunks = setDefault( - chunksByMessageId, - header.messageID, - () => new MessageChunks(header.messageLength, header.chunkCount), - ) - chunks.addChunk(header, content) - if (chunks.isComplete()) { - onComplete(header, chunks.fullMessage()) - chunksByMessageId.delete(header.messageID) // FIX: clean up - } -} - -console.log('SocketServer chunksByMessageId cleanup tests:\n') - -test('Bug: completed messages stay in map', () => { - const map = new Map() - const header = { messageID: 1, messageLength: 5, chunkCount: 1, chunkIndex: 0, requestId: 1 } - processMessageBuggy(map, header, Buffer.from('hello'), () => {}) - assert(map.size === 1, `Expected map to retain entry (bug), size=${map.size}`) -}) - -test('Bug: 100 messages = 100 orphaned entries', () => { - const map = new Map() - for (let i = 0; i < 100; i++) { - const header = { messageID: i, messageLength: 3, chunkCount: 1, chunkIndex: 0, requestId: i } - processMessageBuggy(map, header, Buffer.from('abc'), () => {}) - } - assert(map.size === 100, `Expected 100 orphaned entries, got ${map.size}`) -}) - -test('Fix: completed messages cleaned up', () => { - const map = new Map() - const header = { messageID: 1, messageLength: 5, chunkCount: 1, chunkIndex: 0, requestId: 1 } - processMessageFixed(map, header, Buffer.from('hello'), () => {}) - assert(map.size === 0, `Expected map to be empty after completion, size=${map.size}`) -}) - -test('Fix: multi-chunk message cleaned after last chunk', () => { - const map = new Map() - const makeHeader = (idx) => ({ - messageID: 1, messageLength: 10, chunkCount: 3, chunkIndex: idx, requestId: 1, - }) - processMessageFixed(map, makeHeader(0), Buffer.from('aaa'), () => {}) - assert(map.size === 1, 'Should retain incomplete message') - processMessageFixed(map, makeHeader(1), Buffer.from('bbb'), () => {}) - assert(map.size === 1, 'Still incomplete') - let completed = false - processMessageFixed(map, makeHeader(2), Buffer.from('cccc'), () => { completed = true }) - assert(completed, 'Callback should fire') - assert(map.size === 0, `Should be cleaned up, size=${map.size}`) -}) - -test('Fix: 100 messages = 0 leftover entries', () => { - const map = new Map() - for (let i = 0; i < 100; i++) { - const header = { messageID: i, messageLength: 3, chunkCount: 1, chunkIndex: 0, requestId: i } - processMessageFixed(map, header, Buffer.from('abc'), () => {}) - } - assert(map.size === 0, `Expected 0 entries, got ${map.size}`) -}) - -console.log(`\nResults: ${passed} passed, ${failed} failed`) -process.exit(failed > 0 ? 1 : 0) From 4f2282dfebda05d31a032b2e20ad4cb5ff294b94 Mon Sep 17 00:00:00 2001 From: Elazar Date: Sat, 7 Mar 2026 22:42:09 +0200 Subject: [PATCH 3/5] test(r3): replace fixed-delay waits with deterministic poll; add ID reuse to stress test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/unit/ts/chunks-cleanup.test.ts | 87 ++++++++++++++++++++++++---- 1 file changed, 75 insertions(+), 12 deletions(-) diff --git a/tests/unit/ts/chunks-cleanup.test.ts b/tests/unit/ts/chunks-cleanup.test.ts index 317b8854..31a7992f 100644 --- a/tests/unit/ts/chunks-cleanup.test.ts +++ b/tests/unit/ts/chunks-cleanup.test.ts @@ -54,12 +54,47 @@ function buildChunk(params: { return Buffer.concat([headerBuf, data]); } -/** Send bytes to the server and wait long enough for loopback TCP delivery. */ -async function sendAndWait(socket: net.Socket, buf: Buffer): Promise { - socket.write(buf); - // A short timer ensures we yield past the I/O poll phase so the server's - // 'data' handler has time to run before we check the map. - await new Promise(resolve => setTimeout(resolve, 10)); +/** Send bytes to the server and poll until the server has processed them (pendingChunkCount === 0). */ +async function sendAndWaitForProcessed( + socket: net.Socket, + buf: Buffer, + server: SocketServer, +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error('timed out waiting for server to process message')), + 500, + ); + socket.write(buf); + const check = () => { + if (server.pendingChunkCount === 0) { + clearTimeout(timeout); + resolve(); + } else { + setImmediate(check); + } + }; + setImmediate(check); + }); +} + +/** Poll until pendingChunkCount reaches the expected value. */ +function waitForPendingCount(server: SocketServer, count: number): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error(`timed out waiting for pendingChunkCount to reach ${count}`)), + 500, + ); + const check = () => { + if (server.pendingChunkCount === count) { + clearTimeout(timeout); + resolve(); + } else { + setImmediate(check); + } + }; + setImmediate(check); + }); } // --------------------------------------------------------------------------- @@ -93,7 +128,10 @@ afterEach(async () => { socket.destroy(); } serverSideConnections.clear(); - await new Promise(resolve => server.server.close(() => resolve())); + await new Promise((resolve, reject) => { + const t = setTimeout(() => reject(new Error('server.close() timed out')), 2000); + server.server.close(() => { clearTimeout(t); resolve(); }); + }); }); // --------------------------------------------------------------------------- @@ -115,7 +153,7 @@ describe('socketServer — chunksByMessageId cleanup', () => { chunkNumber: 0, data, }); - await sendAndWait(client, chunk); + await sendAndWaitForProcessed(client, chunk, server); expect(server.pendingChunkCount).toBe(0); }); @@ -132,7 +170,8 @@ describe('socketServer — chunksByMessageId cleanup', () => { chunkNumber: 0, data: part1, }); - await sendAndWait(client, chunk1); + client.write(chunk1); + await waitForPendingCount(server, 1); expect(server.pendingChunkCount).toBe(1); // send the completing chunk @@ -144,7 +183,7 @@ describe('socketServer — chunksByMessageId cleanup', () => { chunkNumber: 1, data: part2, }); - await sendAndWait(client, chunk2); + await sendAndWaitForProcessed(client, chunk2, server); expect(server.pendingChunkCount).toBe(0); }); @@ -162,7 +201,11 @@ describe('socketServer — chunksByMessageId cleanup', () => { chunkNumber: j, data: parts[j], }); - await sendAndWait(client, chunk); + if (j < parts.length - 1) { + client.write(chunk); + } else { + await sendAndWaitForProcessed(client, chunk, server); + } } } expect(server.pendingChunkCount).toBe(0); @@ -179,7 +222,27 @@ describe('socketServer — chunksByMessageId cleanup', () => { chunkNumber: 0, data, }); - await sendAndWait(client, chunk); + await sendAndWaitForProcessed(client, chunk, server); + } + expect(server.pendingChunkCount).toBe(0); + + // Phase 2: reuse IDs 200-209 ten more times each to verify no stale state + for (let round = 0; round < 10; round++) { + for (let i = 0; i < 10; i++) { + const data = Buffer.from(`reuse-${round}-${i}`); + await sendAndWaitForProcessed( + client, + buildChunk({ + messageId: 200 + i, + requestId: 1, + messageLength: data.length, + chunkCount: 1, + chunkNumber: 0, + data, + }), + server, + ); + } } expect(server.pendingChunkCount).toBe(0); }); From 4a633f8754f5a40c0e0d8bcd6c79e74492834c14 Mon Sep 17 00:00:00 2001 From: Elazar Date: Sat, 7 Mar 2026 22:42:42 +0200 Subject: [PATCH 4/5] style: fix max-statements-per-line in afterEach server.close callback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/unit/ts/chunks-cleanup.test.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/unit/ts/chunks-cleanup.test.ts b/tests/unit/ts/chunks-cleanup.test.ts index 31a7992f..8f908dbc 100644 --- a/tests/unit/ts/chunks-cleanup.test.ts +++ b/tests/unit/ts/chunks-cleanup.test.ts @@ -70,7 +70,8 @@ async function sendAndWaitForProcessed( if (server.pendingChunkCount === 0) { clearTimeout(timeout); resolve(); - } else { + } + else { setImmediate(check); } }; @@ -89,7 +90,8 @@ function waitForPendingCount(server: SocketServer, count: number): Promise if (server.pendingChunkCount === count) { clearTimeout(timeout); resolve(); - } else { + } + else { setImmediate(check); } }; @@ -130,7 +132,10 @@ afterEach(async () => { serverSideConnections.clear(); await new Promise((resolve, reject) => { const t = setTimeout(() => reject(new Error('server.close() timed out')), 2000); - server.server.close(() => { clearTimeout(t); resolve(); }); + server.server.close(() => { + clearTimeout(t); + resolve(); + }); }); }); @@ -203,7 +208,8 @@ describe('socketServer — chunksByMessageId cleanup', () => { }); if (j < parts.length - 1) { client.write(chunk); - } else { + } + else { await sendAndWaitForProcessed(client, chunk, server); } } From cea690608c905a6c008d4c307fc1234cade26bb1 Mon Sep 17 00:00:00 2001 From: Elazar Date: Sat, 7 Mar 2026 22:49:10 +0200 Subject: [PATCH 5/5] fix(r3): add processedMessageCount for test observability; fix setImmediate race in tests Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/python-communication/socket-based/Server.ts | 3 +++ tests/unit/ts/chunks-cleanup.test.ts | 7 ++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/python-communication/socket-based/Server.ts b/src/python-communication/socket-based/Server.ts index 3be627d2..5bc5f994 100644 --- a/src/python-communication/socket-based/Server.ts +++ b/src/python-communication/socket-based/Server.ts @@ -19,6 +19,8 @@ export class SocketServer { private outgoingRequestsManager: RequestsManager = new RequestsManager(); private chunksByMessageId: Map = new Map(); + private _processedMessageCount: number = 0; + get processedMessageCount(): number { return this._processedMessageCount; } constructor() { const options: net.ServerOpts = { @@ -134,6 +136,7 @@ export class SocketServer { logTrace('Message is complete'); const fullMessage = chunks.fullMessage(); this.chunksByMessageId.delete(header.messageID); + this._processedMessageCount++; handleMessage(header, fullMessage); } } diff --git a/tests/unit/ts/chunks-cleanup.test.ts b/tests/unit/ts/chunks-cleanup.test.ts index 8f908dbc..c8404af4 100644 --- a/tests/unit/ts/chunks-cleanup.test.ts +++ b/tests/unit/ts/chunks-cleanup.test.ts @@ -54,20 +54,21 @@ function buildChunk(params: { return Buffer.concat([headerBuf, data]); } -/** Send bytes to the server and poll until the server has processed them (pendingChunkCount === 0). */ +/** Send bytes to the server and poll until the server has processed them (processedMessageCount increases). */ async function sendAndWaitForProcessed( socket: net.Socket, buf: Buffer, server: SocketServer, ): Promise { + const before = server.processedMessageCount; + socket.write(buf); return new Promise((resolve, reject) => { const timeout = setTimeout( () => reject(new Error('timed out waiting for server to process message')), 500, ); - socket.write(buf); const check = () => { - if (server.pendingChunkCount === 0) { + if (server.processedMessageCount > before) { clearTimeout(timeout); resolve(); }