-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker-thread.mjs
More file actions
89 lines (85 loc) · 2.9 KB
/
worker-thread.mjs
File metadata and controls
89 lines (85 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import thread from "node:worker_threads";
/**
 * Fixed-size pool of `node:worker_threads` workers that processes a list of
 * tasks, one task per worker at a time.
 *
 * Protocol with the worker script: the pool posts `{ id, task }` (where `id`
 * is the worker's thread id) and treats the next message received from that
 * worker as the task's completion, passing `message.result` to the callback.
 *
 * Public state (names kept as-is for backward compatibility, including the
 * `wokerPool` spelling):
 *  - `wokerPool`    Map of thread id -> Worker for every live worker.
 *  - `idleWorker`   Set of thread ids that currently have no task.
 *  - `activeWorker` Map of thread id -> callback for the task in flight.
 *  - `taskQueue`    Tasks that have not been handed to a worker yet.
 *  - `callback`     Callback given to the most recent `run()` call.
 */
class ThreadPool {
  /** Set by close(); stops dispatching and suppresses worker respawn. */
  #closed = false;

  /**
   * @param {number} size - Number of workers to start.
   * @param {string|URL} file - Worker script, passed unchanged to `new Worker()`.
   */
  constructor(size, file) {
    this.file = file;
    this.size = size;
    this.wokerPool = new Map();
    this.idleWorker = new Set();
    this.taskQueue = [];
    this.activeWorker = new Map();
    this.callback = undefined;
    for (let index = 0; index < size; index++) {
      // Each new worker registers itself in the pool and the idle set.
      this.#createWorker();
    }
  }

  /**
   * Starts one worker, registers it as idle and wires up its event handlers.
   *
   * @returns {import('node:worker_threads').Worker} The worker that was created.
   */
  #createWorker() {
    const worker = new thread.Worker(this.file);
    // Captured now: handlers must not depend on the worker echoing its id back.
    const id = worker.threadId;
    this.wokerPool.set(id, worker);
    this.idleWorker.add(id);

    worker.on('message', (msg) => {
      const callback = this.activeWorker.get(id);
      this.activeWorker.delete(id);
      if (this.wokerPool.has(id)) {
        this.idleWorker.add(id);
      }
      console.log('thread', id, 'is idle');
      try {
        if (typeof callback === 'function') {
          callback(msg?.result);
        }
      } finally {
        // Keep the queue moving even if the user callback throws.
        this.#dispatch();
      }
    });

    worker.on('error', (error) => {
      // Only report here. Node terminates the worker after 'error' and then
      // emits 'exit' with a non-zero code, which is where the replacement is
      // created; respawning in both handlers would add two workers per crash.
      console.log(error);
    });

    worker.on('exit', (code) => {
      // Forget the dead worker everywhere. The task it was running (if any)
      // is dropped and its callback is never invoked.
      this.wokerPool.delete(id);
      this.idleWorker.delete(id);
      this.activeWorker.delete(id);
      console.log('thread', id, 'was destroyed');
      if (code !== 0 && !this.#closed) {
        // NOTE: a worker script that fails on startup will be respawned
        // repeatedly; there is no retry limit.
        const replacement = this.#createWorker();
        console.log('new thread created wit id:', replacement.threadId, ' and is idle');
      }
      this.#dispatch();
    });

    return worker;
  }

  /**
   * Hands queued tasks to idle workers until either runs out.
   * Each dispatched task is paired with the current `this.callback`.
   */
  #dispatch() {
    if (this.#closed) return;
    // Deleting the current entry while iterating a Set is well-defined.
    for (const id of this.idleWorker) {
      if (this.taskQueue.length === 0) break;
      this.idleWorker.delete(id);
      const worker = this.wokerPool.get(id);
      if (!worker) {
        console.warn(`⚠️ Worker ${id} not found in pool.`);
        continue;
      }
      const task = this.taskQueue.shift();
      this.activeWorker.set(id, this.callback);
      console.log(' thread :', id, ' is now active');
      worker.postMessage({ id, task });
    }
  }

  /**
   * Queues `data` and starts processing it on the idle workers.
   *
   * Calling `run()` again replaces any tasks that have not been dispatched
   * yet; tasks already in flight still report to the callback they started with.
   *
   * @param {Array} data - Tasks; each element is posted to a worker as `task`.
   *   The array is copied, so the caller's array is not modified.
   * @param {(result: any) => void} [callback] - Called with `message.result`
   *   for every task a worker completes.
   * @throws {TypeError} If `data` is not an array or `callback` is neither
   *   nullish nor a function.
   * @throws {Error} If the pool has been closed.
   */
  run(data, callback) {
    if (!Array.isArray(data)) {
      throw new TypeError('data is not an array');
    }
    if (callback != null && typeof callback !== 'function') {
      throw new TypeError('callback is not a function');
    }
    if (this.#closed) {
      throw new Error('ThreadPool is closed');
    }
    this.callback = callback;
    this.taskQueue = [...data];
    this.#dispatch();
  }

  /**
   * Terminates every worker and discards queued tasks. Live workers keep the
   * process running, so call this once the pool is no longer needed.
   *
   * @returns {Promise<void>} Resolves after all workers have been terminated.
   */
  async close() {
    this.#closed = true;
    this.taskQueue = [];
    const workers = [...this.wokerPool.values()];
    await Promise.all(workers.map((worker) => worker.terminate()));
    this.wokerPool.clear();
    this.idleWorker.clear();
    this.activeWorker.clear();
  }
}
// Demo: feed four sample tasks to a pool of four workers running
// ./test-worker.mjs and print every result the pool reports.
const data = [5, 50, 40, 1];
const pool = new ThreadPool(4, './test-worker.mjs');
const printResult = (result) => {
  console.log(result);
};
pool.run(data, printResult);