-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathindex.rl.ts
More file actions
152 lines (139 loc) · 4.45 KB
/
index.rl.ts
File metadata and controls
152 lines (139 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
* This version of `@henrygd/queue` supports rate limiting.
*
* @module
*/
/** List node */
type Node<T> = {
/** input promise wrapper */
p: () => T
/** resolve returned promise */
res: (value: T) => void
/** reject returned promise */
rej: (reason: any) => void
/** next node pointer */
next?: Node<T>
}
/** Queue interface */
export interface Queue {
/** Add an async function / promise wrapper to the queue */
add<T>(promiseFunction: () => PromiseLike<T>): Promise<T>
/** Returns a promise that resolves when the queue is empty */
done(): Promise<void>
/** Empties the queue (active promises are not cancelled) */
clear(): void
/** Returns the number of promises currently running */
active(): number
/** Returns the total number of promises in the queue */
size(): number
/** Adds promises (or wrappers) to the queue and resolves like Promise.all */
all<T>(promiseFunctions: Array<PromiseLike<T> | (() => PromiseLike<T>)>): Promise<T[]>
}
// this just saves a few bytes
let Promize = Promise
/**
* Creates a new queue with concurrency and optional rate limiting.
*
* @param {number} concurrency - The maximum number of concurrent operations.
* @param {number} [rate] - The maximum number of operations that can start within the interval.
* @param {number} [interval] - The time window in milliseconds for rate limiting.
* @return {Queue} - The newly created queue.
*/
export let newQueue = (concurrency: number, rate?: number, interval?: number): Queue => {
let active = 0
let size = 0
let head: Node<PromiseLike<any>> | undefined | null
let tail: Node<PromiseLike<any>> | undefined | null
let resolveDonePromise: (value: void | PromiseLike<void>) => void
let donePromise: Promise<void> | void
let queue: Queue
let scheduled = false
let startTimes: number[] = []
let afterRun = () => {
active--
if (--size) {
run()
} else {
donePromise = resolveDonePromise?.()
}
}
let run = () => {
// If already scheduled or no items in queue, skip
if (scheduled || !head) return
// Check concurrency limit
if (active >= concurrency) return
// Check rate limit if configured
if (rate !== undefined && interval !== undefined) {
let now = Date.now()
// Remove timestamps outside the current interval window
startTimes = startTimes.filter((t) => now - t < interval)
// If we've hit the rate limit, schedule a retry
if (startTimes.length >= rate) {
scheduled = true
let oldestStart = startTimes[0]
let delay = interval - (now - oldestStart)
setTimeout(() => {
scheduled = false
run()
}, delay)
return
}
// Track this task's start time
startTimes.push(now)
}
// Execute the task
active++
let curHead = head
head = head.next
curHead.p().then(
(v) => (curHead.res(v), afterRun()),
(e) => (curHead.rej(e), afterRun())
)
// Try to run more tasks if available
if (head && active < concurrency) {
run()
}
}
return (queue = {
add<T>(p: () => PromiseLike<T>) {
let node = { p } as Node<PromiseLike<T>>
let promise = new Promize((res, rej) => {
node.res = res
node.rej = rej
})
if (head) {
tail = tail!.next = node
} else {
tail = head = node
}
size++
run()
return promise as Promise<T>
},
done: () => {
if (!size) {
return Promize.resolve()
}
if (donePromise) {
return donePromise
}
return (donePromise = new Promize((resolve) => (resolveDonePromise = resolve)))
},
clear() {
for (let node = head; node; node = node.next) {
node.rej(new Error('Queue cleared'))
}
head = tail = null
size = active
startTimes = []
// If size is now 0 and there's a pending done() promise, resolve it
if (!size && donePromise) {
donePromise = resolveDonePromise?.()
}
},
active: () => active,
size: () => size,
all: <T>(fns: Array<PromiseLike<T> | (() => PromiseLike<T>)>): Promise<T[]> =>
Promize.all(fns.map(fn => queue.add(typeof fn === 'function' ? fn : () => fn))),
})
}