A small zero-dependency async queue library for building composable streaming pipelines in TypeScript. Think multiple airport counters with queues of people routed to minimize average waiting time.
bun add superqueue
# or
npm i superqueueimport {Superqueue} from 'superqueue';
const results = await Superqueue.fromArray([1, 2, 3, 4, 5])
.concurrency(4)
.upipe(async n => {
await fetchSomething(n);
return n * 2;
})
.collect();
// results is an unordered array of n*2 for each inputReal-world shape — fan-out pipeline with parallel stages, splits, and batching:
const videosQueue = await youtube.getChannelVideos(channelId); // Superqueue<VideoSnippetResponse>
const dbVideosIndex = /* ... */;
const idsQueue = videosQueue.pipe(v => v.resourceId!.videoId!);
const [freshIdsQueue, cachedIdsQueue] = idsQueue
.pipe(id => (dbVideosIndex[id] ? undefined : id))
.concurrency(16)
.usplit(async id =>
(await storage.exists(`transcript/${id}.vtt`)) ? [id, 1] : [id, 0],
);
const metadatasQueue = freshIdsQueue
.batch(50)
.upipe(batch => youtube.getVideosMetadata(batch))
.flat();
const [englishQueue, otherQueue] = metadatasQueue.split(m =>
englishLangCodes.includes(m.defaultAudioLanguage) ? [m, 0] : [m, 1],
);
const [goodSubs, badSubs] = englishQueue
.concurrency(8)
.usplit(async m => {
try {
return [{...m, path: await youtube.downloadSubtitles(m.videoId)}, 0];
} catch {
return [m, 1];
}
});new Superqueue<T>()— empty queue.Superqueue.fromArray<T>(array)— preloaded, auto-ended queue.
push(...vals: T[])— enqueue values. Throws if the queue is ended.end()— no more values. Idempotent.pause()/resume()— gate consumption.#shiftblocks while paused; producers can stillpush/end. Idempotent.fail— no explicit fail mechanism. User callback throws are expected to be caught by the callback itself; the library just guarantees the pipeline closes rather than hangs (see Error handling below).
collect(): Promise<T[]>— drain into an array.consume(callback: (v: T) => void | Promise<void>)— run a callback per value. If the callback returns a Promise it is tracked againstconcurrency()and gathered beforeconsumeresolves; synchronous returns are fire-and-forget.[Symbol.asyncIterator]—for await (const v of queue) ....shiftUnsafe(): Promise<T | typeof Superqueue.EOF>— single-shift escape hatch. Bypasses the "piped" guard; caller owns the lifecycle.
| Method | Sync / Async | Order |
|---|---|---|
pipe(fn) |
sync callback | preserved |
upipe(fn) |
async callback | unordered (parallel, bounded by concurrency()) |
split(fn) → [Q, Q] |
sync routing | preserved |
usplit(fn) → [Q, Q] |
async routing | unordered |
umerge(other) → Q |
— | unordered (interleaves two sources) |
batch(size | predicate, idleMs?) |
— | preserved |
flat() |
— | preserved |
clone(count = 2) → Q[] |
— | preserved (each clone sees every value) |
u-prefixed methods don't preserve input order because callbacks run in parallel or sources interleave. Use the plain counterpart (pipe/split) for order.
pipe<U>(callback: (v: T) => U | undefined): Superqueue<U>— returningundefinedfilters.upipe<U>(callback: (v: T) => Promise<U | undefined>): Superqueue<U>— ditto, async.split<U, V = U>(callback: (v: T) => [U, 0] | [V, 1]): [Superqueue<U>, Superqueue<V>]usplit<U, V = U>(callback: (v: T) => Promise<[U, 0] | [V, 1] | undefined>): [Superqueue<U>, Superqueue<V>]— returningundefinedfilters.umerge(q: Superqueue<T>): Superqueue<T>batch(sizeOrPredicate, idleMs?)— numeric size cap, OR a predicate(size, startTime) => booleanevaluated on each item.idleMsflushes the partial buffer after a stall.flat()— flattens array values.clone(count = 2)— multi-reader copies; each clone inherits the source's concurrency.
concurrency(n: number): this— set the max in-flight callbacks forconsume/upipe/uspliton this queue. Throws for non-positive-integer values; acceptsInfinityfor unbounded. Can be called before piping or live from inside a callback to retune (the loop re-reads every iteration). Defaults to 8.
q.concurrency(10).upipe(async x => { ... });
// or live-retune:
q.upipe(async x => {
if (backpressureHigh()) q.concurrency(2);
return await work(x);
});ended: boolean,paused: boolean,piped: boolean— observable flags.size(): number— current queue length.pushCount(): number— total values ever pushed.waitForEnd(): Promise<void>— resolves whenend()fires.waitForShift(): Promise<void>— resolves the next time a value is consumed from this queue.
Superqueue.EOF— a unique symbol returned byshiftUnsafewhen the queue is drained and ended.undefinedis a legal user value (it's not the sentinel).
Superqueue does not surface user-callback errors as a first-class signal. The semantics are:
- If a user callback throws (sync) or rejects (async), the upstream
consume's promise rejects. - Derived-queue constructors wire
.finally(end)so the output queue always closes, even on error — no silent hangs. - The rejection itself propagates as an unhandled rejection (visible in logs). Catch inside your callback if you want to observe or recover from it.
If you need "keep going on error" semantics, wrap your callback in try/catch; the queue won't notice.
bun run testRuns the unit suite (bun test) plus a manual integration harness that asserts pipeline-closes-on-throw across every operator — Bun's test runner fails on unhandled rejections through a path that process.on can't intercept, so those scenarios live outside bun test.