From 6066c2c981b1f95709dd51f1007e33244b340e0d Mon Sep 17 00:00:00 2001 From: claneo Date: Fri, 19 Dec 2025 15:47:36 +0800 Subject: [PATCH 1/3] feat: support serialization option for child_process runtime --- src/common.ts | 2 + src/index.ts | 3 + test/fixtures/child_process-communication.mjs | 6 +- test/runtime.test.ts | 63 ++++++++++++++++++- 4 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/common.ts b/src/common.ts index 46757f6..674027a 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,4 +1,5 @@ import type { MessagePort, TransferListItem } from 'node:worker_threads' +import type { SerializationType } from 'node:child_process' /** Channel for communicating between main thread and workers */ export interface TinypoolChannel { @@ -21,6 +22,7 @@ export interface TinypoolWorker { resourceLimits?: any workerData: TinypoolData trackUnmanagedFds?: boolean + serialization?: SerializationType }): void terminate(): Promise postMessage(message: any, transferListItem?: TransferListItem[]): void diff --git a/src/index.ts b/src/index.ts index 2033b2b..883c89a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import { type MessagePort, receiveMessageOnPort, } from 'node:worker_threads' +import type { SerializationType } from 'node:child_process' import { once, EventEmitterAsyncResource } from 'node:events' import { AsyncResource } from 'node:async_hooks' import { fileURLToPath, URL } from 'node:url' @@ -153,6 +154,7 @@ interface Options { trackUnmanagedFds?: boolean isolateWorkers?: boolean teardown?: string + serialization?: SerializationType } interface FilledOptions extends Options { @@ -735,6 +737,7 @@ class ThreadPool { this.options.workerData, ] as TinypoolData, trackUnmanagedFds: this.options.trackUnmanagedFds, + serialization: this.options.serialization, }) const onMessage = (message: ResponseMessage) => { diff --git a/test/fixtures/child_process-communication.mjs b/test/fixtures/child_process-communication.mjs index e7c55ba..4d3da58 100644 --- a/test/fixtures/child_process-communication.mjs +++ b/test/fixtures/child_process-communication.mjs @@ -1,4 +1,4 @@ -export default async function run() { +export default async function run(task) { let resolve = () => {} const promise = new Promise((r) => (resolve = r)) @@ -6,8 +6,8 @@ export default async function run() { process.on('message', (message) => { process.send({ received: message, response: 'Hello from worker' }) - resolve() + resolve({ received: task, response: 'Hello from worker' }) }) - await promise + return promise } diff --git a/test/runtime.test.ts b/test/runtime.test.ts index 2203809..c666ea5 100644 --- a/test/runtime.test.ts +++ b/test/runtime.test.ts @@ -1,7 +1,7 @@ +import EventEmitter from 'node:events' import * as path from 'node:path' import { fileURLToPath } from 'node:url' import { Tinypool } from 'tinypool' -import EventEmitter from 'node:events' const __dirname = path.dirname(fileURLToPath(import.meta.url)) @@ -199,6 +199,67 @@ describe('child_process', () => { }) }) + test('can send complex messages to port', async () => { + const pool = createPool({ + runtime: 'child_process', + filename: path.resolve( + __dirname, + 'fixtures/child_process-communication.mjs' + ), + serialization: 'advanced', + }) + + const complexData = { + bigint: 123456789123456789n, + map: new Map([['hello', 'world']]), + set: new Set(['hello', 'world']), + error: new Error('message'), + regexp: /regexp/, + } + + const emitter = new EventEmitter() + + const startup = new Promise((resolve) => + emitter.on( + 'response', + (message) => message === 'Child process started' && resolve() + ) + ) + + const runPromise = pool.run(complexData, { + channel: { + onMessage: (callback) => emitter.on('message', callback), + postMessage: (message) => emitter.emit('response', message), + }, + }) + + // Wait for the child process to start + await startup + + const response = new Promise((resolve) => + emitter.on('response', (message) => resolve(message)) + ) + + // Send message to child process + emitter.emit('message', complexData) + + // Wait for task to finish + const runResult = await runPromise + + expect(runResult).toMatchObject({ + received: complexData, + response: 'Hello from worker', + }) + + // Wait for response from child + const channelResult = await response + + expect(channelResult).toMatchObject({ + received: complexData, + response: 'Hello from worker', + }) + }) + test('channel is closed when isolated', async () => { const pool = createPool({ runtime: 'child_process', From 3aff1b62eccd9ca420a0bc64733d691d1a864278 Mon Sep 17 00:00:00 2001 From: claneo Date: Fri, 19 Dec 2025 21:52:06 +0800 Subject: [PATCH 2/3] docs: add serialization to constructor options --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 23e6df2..14e214a 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina' - `worker_threads`: Runs workers in [`node:worker_threads`](https://nodejs.org/api/worker_threads.html). For `main thread <-> worker thread` communication you can use [`MessagePort`](https://nodejs.org/api/worker_threads.html#class-messageport) in the `pool.run()` method's [`transferList` option](https://nodejs.org/api/worker_threads.html#portpostmessagevalue-transferlist). See [example](#main-thread---worker-thread-communication). - `child_process`: Runs workers in [`node:child_process`](https://nodejs.org/api/child_process.html). For `main thread <-> worker process` communication you can use `TinypoolChannel` in the `pool.run()` method's `channel` option. For filtering out the Tinypool's internal messages see `TinypoolWorkerMessage`. See [example](#main-process---worker-process-communication). - `teardown`: name of the function in file that should be called before worker is terminated. Must be named exported. +- `serialization`: Specify the kind of serialization used for the `child_process` runtime. Possible values are `'json'` and `'advanced'`. See Node.js [Advanced serialization](https://nodejs.org/docs/latest/api/child_process.html#advanced-serialization) for more details. Default: `'json'`. #### Pool methods From 3ba507d25697f5bfadb933881bd8b08c1b6511ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ari=20Perkki=C3=B6?= Date: Sat, 3 Jan 2026 09:34:05 +0200 Subject: [PATCH 3/3] Apply suggestion from @AriPerkkio --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 14e214a..88bd6a1 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina' - `worker_threads`: Runs workers in [`node:worker_threads`](https://nodejs.org/api/worker_threads.html). For `main thread <-> worker thread` communication you can use [`MessagePort`](https://nodejs.org/api/worker_threads.html#class-messageport) in the `pool.run()` method's [`transferList` option](https://nodejs.org/api/worker_threads.html#portpostmessagevalue-transferlist). See [example](#main-thread---worker-thread-communication). - `child_process`: Runs workers in [`node:child_process`](https://nodejs.org/api/child_process.html). For `main thread <-> worker process` communication you can use `TinypoolChannel` in the `pool.run()` method's `channel` option. For filtering out the Tinypool's internal messages see `TinypoolWorkerMessage`. See [example](#main-process---worker-process-communication). - `teardown`: name of the function in file that should be called before worker is terminated. Must be named exported. -- `serialization`: Specify the kind of serialization used for the `child_process` runtime. Possible values are `'json'` and `'advanced'`. See Node.js [Advanced serialization](https://nodejs.org/docs/latest/api/child_process.html#advanced-serialization) for more details. Default: `'json'`. +- `serialization`: Specify the kind of serialization used for the `child_process` runtime. Possible values are `'json'` and `'advanced'`. See Node.js [Advanced serialization](https://nodejs.org/docs/latest/api/child_process.html#advanced-serialization) for more details. #### Pool methods