diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b6e11cd..96f0226 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -44,3 +44,21 @@ updates: patterns: - "debug" - "@types/debug" + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + time: "17:00" + assignees: + - "nerjs" + labels: + - "dependencies" + - "bot" + - "github-actions" + target-branch: "dependencies" + open-pull-requests-limit: 5 + groups: + actions: + patterns: + - "*" diff --git a/.github/workflows/bump-version.yml b/.github/workflows/bump-version.yml index c2d9afd..af105d9 100644 --- a/.github/workflows/bump-version.yml +++ b/.github/workflows/bump-version.yml @@ -1,59 +1,43 @@ -# This is a basic workflow that is manually triggered - name: Bump version -# Controls when the action will run. Workflow runs when manually triggered using the UI -# or API. on: workflow_dispatch: - # Inputs the workflow accepts. inputs: version: - # Friendly description to be shown in the UI instead of 'name' description: 'Semver type of new version (major / minor / patch)' - # Input has to be provided for the workflow to run required: true type: choice - options: - - patch - - minor - - major + options: + - patch + - minor + - major -# A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: - # This workflow contains a single job called "bump-version" bump-version: - # The type of runner that the job will run on runs-on: ubuntu-latest - # Steps represent a sequence of tasks that will be executed as part of the job steps: - # Check out the content (source branch). Use a deploy key so that - # when we push changes, it will trigger the release workflow - # run that runs on: tag. (Using the GitHub token would - # not run the workflow to prevent infinite recursion.) - - name: Check out source - uses: actions/checkout@v3 - with: - token: ${{ secrets.PUBLISH_TOKEN }} - - - name: Setup Node.js - uses: actions/setup-node@v3 - with: - node-version: '18' - cache: 'npm' - - - name: Install npm packages - run: npm ci - - - name: Setup Git - run: | - git config user.name 'nerjs' - git config user.email 'nerjs.stap@gmail.com' - - - - name: bump version - run: npm version ${{ github.event.inputs.version }} - - - name: Push latest version - run: git push origin main --follow-tags \ No newline at end of file + - name: Checkout repository + uses: actions/checkout@v4 + with: + token: ${{ secrets.PUBLISH_TOKEN }} + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: 22 + cache: 'npm' + + - name: Install dependencies + run: npm ci + + - name: Setup Git + run: | + git config user.name 'nerjs' + git config user.email 'nerjs.stap@gmail.com' + + - name: Bump version + run: npm version ${{ github.event.inputs.version }} + + - name: Push latest version + run: git push origin main --follow-tags diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 55c24b1..97fdc7f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,52 +3,43 @@ name: "CI: tests, linter and build" on: workflow_dispatch: push: - branches: + branches: - main - develop - dependencies - 'dependencies-**' pull_request: - # Sequence of patterns matched against refs/heads branches: - main - dependencies - jobs: - try-build: + build: + if: "!contains(github.event.head_commit.message, '[skip-CI]')" + runs-on: ubuntu-latest strategy: + fail-fast: false matrix: - node-version: - - 18 - - 20 - - 22 - os: - - ubuntu-22.04 - - ubuntu-latest - runs-on: ${{ matrix.os }} - name: os:${{ matrix.os }}; node:${{ matrix.node-version }}. Run linter, tests and Trying to build + node-version: [20, 22, 24] + name: "node:${{ matrix.node-version }}" steps: - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v3 + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} - - name: Checkout repository - uses: actions/checkout@v3 - - name: Dependencies installation + cache: 'npm' + + - name: Install dependencies run: npm ci - - name: check versions - run: | - node -v - npm -v - - name: run linter - run: npm run lint - - name: run build + + - name: Run linter + run: npm run lint + + - name: Run build run: npm run build - - name: run tests in development env - run: npm run test -- --bail=2 --ci --coverage=true --runInBand --coverageReporters=text - - name: run tests in production env - run: NODE_ENV=production npm run test -- --bail=2 --ci --coverage=false --runInBand - - name: check - run: git status \ No newline at end of file + - name: Run tests + run: npm test -- --ci --coverage --runInBand --coverageReporters=text diff --git a/.github/workflows/create-release.yml b/.github/workflows/create-release.yml index 9f9a5d6..390510e 100644 --- a/.github/workflows/create-release.yml +++ b/.github/workflows/create-release.yml @@ -1,53 +1,60 @@ +name: Create Release + on: push: tags: - 'v*' -name: Create Release - - jobs: - lint-and-tests: name: Run linter and tests runs-on: ubuntu-latest steps: - - name: Checkout source - uses: actions/checkout@v3 + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: 22 + cache: 'npm' + - name: Install dependencies run: npm ci + - name: Run linter run: npm run lint - - name: Run tests - run: npm run test -- --ci - + - name: Run tests + run: npm test -- --ci publish-to-npm: name: Publish to npm runs-on: ubuntu-latest needs: lint-and-tests steps: - - name: Checkout source - uses: actions/checkout@v3 - with: - token: ${{ secrets.PUBLISH_TOKEN }} - - name: Setup node - uses: actions/setup-node@v3 - with: - node-version: 18 - registry-url: https://registry.npmjs.org - token: ${{ secrets.NPM_PUBLISH_TOKEN }} - - name: Install dependencies - run: npm ci - - name: build - run: npm run build - - name: Publish package - run: npm publish - env: - NODE_AUTH_TOKEN: ${{ secrets.NPM_PUBLISH_TOKEN }} + - name: Checkout repository + uses: actions/checkout@v4 + with: + token: ${{ secrets.PUBLISH_TOKEN }} + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: 22 + cache: 'npm' + registry-url: https://registry.npmjs.org - + - name: Install dependencies + run: npm ci + + - name: Build + run: npm run build + + - name: Publish package + run: npm publish + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_PUBLISH_TOKEN }} create-github-release: name: Create GitHub Release @@ -56,9 +63,10 @@ jobs: permissions: contents: write steps: - - name: Checkout code - uses: actions/checkout@v3 + - name: Checkout repository + uses: actions/checkout@v4 + - name: Create Release run: gh release create ${{ github.ref }} --generate-notes env: - GITHUB_TOKEN: ${{ secrets.PUBLISH_TOKEN }} \ No newline at end of file + GITHUB_TOKEN: ${{ secrets.PUBLISH_TOKEN }} diff --git a/.github/workflows/sync-dependencies.yml b/.github/workflows/sync-dependencies.yml index ce46fe0..f883f96 100644 --- a/.github/workflows/sync-dependencies.yml +++ b/.github/workflows/sync-dependencies.yml @@ -1,31 +1,56 @@ name: "SYNC: dependencies with main" - on: push: - branches: + branches: - main -# A workflow run is made up of one or more jobs that can run sequentially or in parallel +permissions: + contents: write + pull-requests: write + jobs: - # This workflow contains a single job called "bump-version" sync: - # The type of runner that the job will run on runs-on: ubuntu-latest - # Steps represent a sequence of tasks that will be executed as part of the job steps: - - name: Check out source + - name: Check out dependencies uses: actions/checkout@v4 with: - token: ${{ secrets.PUBLISH_TOKEN }} ref: dependencies - - - name: fetch - run: git fetch + fetch-depth: 0 + token: ${{ secrets.PUBLISH_TOKEN }} + + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + - name: Try merge main + id: merge + run: | + git fetch origin main + if git merge origin/main -m "Sync main to dependencies [skip-CI]"; then + echo "result=success" >> $GITHUB_OUTPUT + else + git merge --abort + echo "result=conflict" >> $GITHUB_OUTPUT + fi - - name: pull main - run: git merge origin/main --no-edit + - name: Push if no conflicts + if: steps.merge.outputs.result == 'success' + run: git push origin dependencies - - name: push into dependencies - run: git push origin dependencies \ No newline at end of file + - name: Create PR if conflicts + if: steps.merge.outputs.result == 'conflict' + env: + GH_TOKEN: ${{ secrets.PUBLISH_TOKEN }} + run: | + existing=$(gh pr list --base dependencies --head main --json number --jq '.[0].number') + if [ -n "$existing" ]; then + echo "Sync PR #$existing already exists" + else + gh pr create --base dependencies --head main \ + --title "Sync: merge main into dependencies (conflicts)" \ + --body "Merge conflicts. Resolve manually." + fi diff --git a/README.md b/README.md index 447203e..9fe4339 100644 --- a/README.md +++ b/README.md @@ -251,7 +251,7 @@ Thrown when a task is manually aborted during execution, explicitly indicating t - **Request Grouping**: Groups requests into batches based on size or timeout. - **Concurrency Control**: Limits the number of tasks that can run in parallel (`concurrencyLimit`). -- **Timeout Handling**: Supports execution timeouts (`batchTimeout`) and waiting timeouts (`maxWaitingTimeMs`). +- **Timeout Handling**: Supports execution timeouts (`timeoutMs`) and waiting timeouts (`maxWaitingTimeMs`). --- @@ -263,7 +263,7 @@ interface IBatchAggregatorOptions { maxBatchSize: number; // Maximum number of requests per batch batchTimeMs: number; // Maximum time to form a batch maxWaitingTimeMs?: number; // Maximum waiting time for tasks in the queue (only if concurrencyLimit > 0) - batchTimeout?: number; // Maximum execution time for batchFn (the function passed as the first argument) + timeoutMs: number; // Maximum execution time for batchFn (the function passed as the first argument) } ``` @@ -284,7 +284,7 @@ const aggregator = new BatchAggregator( { maxBatchSize: 3, batchTimeMs: 100, - batchTimeout: 500, + timeoutMs: 500, } ) @@ -311,7 +311,7 @@ const aggregator = new BatchAggregator( { maxBatchSize: 2, batchTimeMs: 100, - batchTimeout: 500, + timeoutMs: 500, concurrencyLimit: 2, // Limit to 2 parallel tasks } ) @@ -340,7 +340,7 @@ const aggregator = new BatchAggregator( { maxBatchSize: 1, batchTimeMs: 100, - batchTimeout: 500, + timeoutMs: 500, concurrencyLimit: 1, maxWaitingTimeMs: 100, // Timeout for tasks in the queue } diff --git a/package-lock.json b/package-lock.json index cbbfa47..5ff20da 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@nerjs/batchloader", - "version": "2.0.6", + "version": "2.0.7", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@nerjs/batchloader", - "version": "2.0.6", + "version": "2.0.7", "license": "MIT", "dependencies": { "debug": "^4.4.3" @@ -1579,13 +1579,12 @@ "dev": true }, "node_modules/@types/node": { - "version": "25.0.10", - "resolved": "https://registry.npmjs.org/@types/node/-/node-25.0.10.tgz", - "integrity": "sha512-zWW5KPngR/yvakJgGOmZ5vTBemDoSqF3AcV/LrO5u5wTWyEAVVh+IT39G4gtyAkh3CtTZs8aX/yRM82OfzHJRg==", + "version": "25.6.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-25.6.2.tgz", + "integrity": "sha512-sokuT28dxf9JT5Kady1fsXOvI4HVpjZa95NKT5y9PNTIrs2AsobR4GFAA90ZG8M+nxVRLysCXsVj6eGC7Vbrlw==", "dev": true, - "license": "MIT", "dependencies": { - "undici-types": "~7.16.0" + "undici-types": "~7.19.0" } }, "node_modules/@types/stack-utils": { @@ -5488,7 +5487,6 @@ "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.4.9.tgz", "integrity": "sha512-LTb9496gYPMCqjeDLdPrKuXtncudeV1yRZnF4Wo5l3SFi0RYEnYRNgMrFIdg+FHvfzjCyQk1cLncWVqiSX+EvQ==", "dev": true, - "license": "MIT", "dependencies": { "bs-logger": "^0.2.6", "fast-json-stable-stringify": "^2.1.0", @@ -5541,7 +5539,6 @@ "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.4.tgz", "integrity": "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==", "dev": true, - "license": "ISC", "bin": { "semver": "bin/semver.js" }, @@ -5609,7 +5606,6 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, - "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -5657,11 +5653,10 @@ } }, "node_modules/undici-types": { - "version": "7.16.0", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz", - "integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==", - "dev": true, - "license": "MIT" + "version": "7.19.2", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.19.2.tgz", + "integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==", + "dev": true }, "node_modules/unrs-resolver": { "version": "1.11.1", diff --git a/package.json b/package.json index 005e86d..c012349 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@nerjs/batchloader", - "version": "2.0.6", + "version": "2.0.7", "main": "dist/index.js", "scripts": { "build": "tsc -p tsconfig.build.json", diff --git a/src/batch-aggregator/batch-aggregator.ts b/src/batch-aggregator/batch-aggregator.ts index 9f61cd6..a71905d 100644 --- a/src/batch-aggregator/batch-aggregator.ts +++ b/src/batch-aggregator/batch-aggregator.ts @@ -2,10 +2,22 @@ import { ILimitedTimekeeperMetrics, ITask, ITimekeeper } from '../timekeeper/int import { LimitedTimekeeper } from '../timekeeper/limited.timekeeper' import { UnlimitedTimekeeper } from '../timekeeper/unlimited.timekeeper' import createDebug from 'debug' -import { BatchLoaderFn, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces' +import { BatchLoaderFn, DEFAULT_MAX_WAITING_TIME_MS, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces' import { LoaderError } from '../utils/errors' const debug = createDebug('batchloader:aggregator') +export const validateAggregatorOptions = (o: IBatchAggregatorOptions) => { + if (!Number.isFinite(o.timeoutMs) || o.timeoutMs <= 0) throw new RangeError(`timeoutMs must be > 0`) + if (!Number.isFinite(o.batchTimeMs) || o.batchTimeMs < 0) throw new RangeError(`batchTimeMs must be >= 0`) + if (!Number.isInteger(o.maxBatchSize) || o.maxBatchSize < 1) throw new RangeError(`maxBatchSize must be a positive integer`) + if (o.concurrencyLimit !== undefined && o.concurrencyLimit !== Infinity) { + if (!Number.isInteger(o.concurrencyLimit) || o.concurrencyLimit < 1) + throw new RangeError(`concurrencyLimit must be a positive integer or Infinity`) + if (o.maxWaitingTimeMs !== undefined && (!Number.isFinite(o.maxWaitingTimeMs) || o.maxWaitingTimeMs <= 0)) + throw new RangeError(`maxWaitingTimeMs must be > 0`) + } +} + interface TaskData { requests: T[] responses: R[] @@ -18,8 +30,8 @@ const createTimekeeperMetrics = (metrics?: IBatchAggregatorMetrics): ILimitedTim if (metrics.resolveBatch) tkMetrics.resolveTask = task => metrics.resolveBatch?.(task.data.requests.length) if (metrics.rejectBatch) tkMetrics.rejectTask = (_, task) => metrics.rejectBatch?.(task.data.requests.length) - if (metrics.parallelBatches) tkMetrics.runTask = runnedSize => metrics.parallelBatches?.(runnedSize) - if (metrics.waitingBatches) tkMetrics.waitTask = runnedSize => metrics.waitingBatches?.(runnedSize) + if (metrics.parallelBatches) tkMetrics.runTask = size => metrics.parallelBatches?.(size) + if (metrics.waitingBatches) tkMetrics.waitTask = size => metrics.waitingBatches?.(size) return tkMetrics } @@ -28,11 +40,11 @@ export class BatchAggregator { private readonly timekeeper: ITimekeeper> private readonly batchRunner = async (task: ITask>, signal: AbortSignal) => { - this.metrics?.rejectBatch?.(task.data.requests.length) + this.metrics?.runBatch?.(task.data.requests.length) debug(`Running batchRunner with a query array of length ${task.data.requests.length}. task id="${task.id}"`) const response = await this.batchLoaderFn([...task.data.requests], signal) if (!Array.isArray(response) || response.length !== task.data.requests.length) - throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array `) + throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array`) task.data.responses = response } @@ -42,19 +54,21 @@ export class BatchAggregator { private readonly options: IBatchAggregatorOptions, private readonly metrics?: IBatchAggregatorMetrics, ) { - const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs } = options + validateAggregatorOptions(options) + const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs, unrefTimeouts } = options const initialDataFactory = () => ({ requests: [], responses: [] }) this.timekeeper = - concurrencyLimit && concurrencyLimit > 0 && concurrencyLimit < Infinity + concurrencyLimit !== undefined && concurrencyLimit !== Infinity ? new LimitedTimekeeper( { concurrencyLimit, initialDataFactory, - maxWaitingTimeMs: maxWaitingTimeMs || 60_000, + maxWaitingTimeMs: maxWaitingTimeMs ?? DEFAULT_MAX_WAITING_TIME_MS, runMs, runner: this.batchRunner, timeoutMs, callRejectedTask: false, + unrefTimeouts, }, createTimekeeperMetrics(metrics), ) @@ -65,6 +79,7 @@ export class BatchAggregator { runner: this.batchRunner, timeoutMs, callRejectedTask: false, + unrefTimeouts, }, createTimekeeperMetrics(metrics), ) @@ -86,7 +101,7 @@ export class BatchAggregator { const task = this.getCurrentTask() const index = task.data.requests.length this.metrics?.loadBatchItem?.() - debug(`Load data. task id="${task.id}"; curent index="${index}"`) + debug(`Load data. task id="${task.id}"; current index="${index}"`) task.data.requests.push(request) await this.timekeeper.wait(task) diff --git a/src/batch-aggregator/interfaces.ts b/src/batch-aggregator/interfaces.ts index 9d10a12..567434b 100644 --- a/src/batch-aggregator/interfaces.ts +++ b/src/batch-aggregator/interfaces.ts @@ -1,3 +1,5 @@ +export const DEFAULT_MAX_WAITING_TIME_MS = 60_000 + export interface IBatchAggregatorOptions { /** * @description Maximum number of parallel tasks (default: unlimited) @@ -23,6 +25,12 @@ export interface IBatchAggregatorOptions { * @description Maximum execution time for batchFn (the function passed as the first argument) */ timeoutMs: number + + /** + * @description Allows timers to avoid blocking the event loop + * @default true + */ + unrefTimeouts?: boolean } export type BatchLoaderFn = (batchArray: T[], signal: AbortSignal) => Promise | R[] diff --git a/src/batch-loader/batch-loader.ts b/src/batch-loader/batch-loader.ts index b780138..9691b04 100644 --- a/src/batch-loader/batch-loader.ts +++ b/src/batch-loader/batch-loader.ts @@ -1,20 +1,27 @@ -import { BatchAggregator } from '../batch-aggregator/batch-aggregator' -import { BatchLoaderFn } from '../batch-aggregator/interfaces' +import { BatchAggregator, validateAggregatorOptions } from '../batch-aggregator/batch-aggregator' +import { BatchLoaderFn, DEFAULT_MAX_WAITING_TIME_MS } from '../batch-aggregator/interfaces' import { Deduplicator } from '../deduplicator/deduplicator' import { Key } from '../utils/interfaces' import { CacheAdapter } from './cache-adapter' import { IBatchLoaderOptions } from './interfaces' -const prepareOptions = (options: IBatchLoaderOptions) => ({ - getKey: (query: K) => `${query}`, - timeoutMs: 60_000, - unrefTimeouts: false, - concurrencyLimit: Infinity, - maxBatchSize: 1000, - batchTimeMs: 50, - maxWaitingTimeMs: 60_000, - ...options, -}) +// Node's setTimeout silently caps delays here and warns - clamp upstream to avoid TimeoutOverflowWarning + firing in 1ms. +const TIMEOUT_MAX = 2_147_483_647 + +const prepareOptions = (options: IBatchLoaderOptions) => { + const merged = { + getKey: (query: K) => `${query}`, + timeoutMs: 60_000, + unrefTimeouts: true, + concurrencyLimit: Infinity, + maxBatchSize: 1000, + batchTimeMs: 50, + maxWaitingTimeMs: DEFAULT_MAX_WAITING_TIME_MS, + ...options, + } + validateAggregatorOptions(merged) + return merged +} export class BatchLoader { private readonly cache: CacheAdapter @@ -23,17 +30,24 @@ export class BatchLoader { private readonly getKey: (query: K) => Key constructor(batchLoaderFn: BatchLoaderFn, options: IBatchLoaderOptions) { - const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs } = + const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, metrics } = prepareOptions(options) + const limited = concurrencyLimit !== Infinity + const deduplicatorTimeoutMs = Math.min(timeoutMs + batchTimeMs + (limited ? maxWaitingTimeMs : 0), TIMEOUT_MAX) + this.getKey = getKey this.cache = new CacheAdapter(cache) this.deduplicator = new Deduplicator(this.deduplicatorRunner, { getKey, - timeoutMs: timeoutMs + batchTimeMs, - unrefTimeouts: !!unrefTimeouts, + timeoutMs: deduplicatorTimeoutMs, + unrefTimeouts, }) - this.aggregator = new BatchAggregator(batchLoaderFn, { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs }) + this.aggregator = new BatchAggregator( + batchLoaderFn, + { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, unrefTimeouts }, + metrics, + ) } private readonly deduplicatorRunner = async (query: K, signal: AbortSignal): Promise => { @@ -47,6 +61,8 @@ export class BatchLoader { const loaded = await this.aggregator.load(query) + if (signal.aborted) throw signal.reason + this.deduplicator.restartTimeout(query) await this.cache.set(key, loaded) diff --git a/src/batch-loader/interfaces.ts b/src/batch-loader/interfaces.ts index e7f2449..1cc4c43 100644 --- a/src/batch-loader/interfaces.ts +++ b/src/batch-loader/interfaces.ts @@ -1,3 +1,4 @@ +import { IBatchAggregatorMetrics } from '../batch-aggregator/interfaces' import { Key } from '../utils/interfaces' export interface ICache { @@ -7,6 +8,8 @@ export interface ICache { clear(): Promise } +export interface IBatchLoaderMetrics extends IBatchAggregatorMetrics {} + export interface IBatchLoaderOptions { /** * @description Function to extract the key from a query @@ -24,7 +27,7 @@ export interface IBatchLoaderOptions { /** * @description Allows timers to avoid blocking the event loop - * @default false + * @default true */ unrefTimeouts?: boolean @@ -51,4 +54,9 @@ export interface IBatchLoaderOptions { * @default 60_000 */ maxWaitingTimeMs?: number + + /** + * @description Optional metrics hooks. Forwarded to the internal BatchAggregator. + */ + metrics?: IBatchLoaderMetrics } diff --git a/src/deduplicator/deduplicator.ts b/src/deduplicator/deduplicator.ts index b1e1d17..0217ca5 100644 --- a/src/deduplicator/deduplicator.ts +++ b/src/deduplicator/deduplicator.ts @@ -1,6 +1,7 @@ import { Defer } from '../utils/defer' import createDebug from 'debug' import { RejectedAbortError, SilentAbortError, TimeoutError } from '../utils/errors' +import { unrefTimer } from '../utils/timer' import { DeduplicatorRunnerCallback, IDeduplicatorOptions } from './interfaces' import { Key } from '../utils/interfaces' const debug = createDebug('batchloader:deduplicator') @@ -47,7 +48,10 @@ export class Deduplicator { } private createTimeout(key: Key): NodeJS.Timeout { - return setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) + return unrefTimer( + setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs), + this.options.unrefTimeouts, + ) } private createRunner(key: Key, query: T): Defer { @@ -56,7 +60,8 @@ export class Deduplicator { const controller = new AbortController() const tid = this.createTimeout(key) - if (this.options.unrefTimeouts) tid?.unref?.() + + this.runners.set(key, { defer, controller, tid }) this.run(query, controller.signal) .then(result => defer.resolve(result)) @@ -68,8 +73,6 @@ export class Deduplicator { }) .finally(() => this.clearRunner(key)) - this.runners.set(key, { defer, controller, tid }) - return defer } diff --git a/src/index.ts b/src/index.ts index 1a95577..24bf85f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,13 +1,13 @@ export { Key } from './utils/interfaces' -export { IBatchLoaderOptions, ICache } from './batch-loader/interfaces' +export { IBatchLoaderOptions, IBatchLoaderMetrics, ICache } from './batch-loader/interfaces' export { BatchLoader } from './batch-loader/batch-loader' export { CacheAdapter, MapCache } from './batch-loader/cache-adapter' export { DeduplicatorRunnerCallback, IDeduplicatorOptions } from './deduplicator/interfaces' export { Deduplicator } from './deduplicator/deduplicator' -export { BatchLoaderFn, IBatchAggregatorOptions } from './batch-aggregator/interfaces' +export { BatchLoaderFn, IBatchAggregatorOptions, IBatchAggregatorMetrics } from './batch-aggregator/interfaces' export { BatchAggregator } from './batch-aggregator/batch-aggregator' export { @@ -18,6 +18,8 @@ export { UnlimitedTimekeeperOptions, TaskStatus, TimekeeperRunnerCallback, + IUnlimitedTimekeeperMetrics, + ILimitedTimekeeperMetrics, } from './timekeeper/interfaces' export { UnlimitedTimekeeper } from './timekeeper/unlimited.timekeeper' export { LimitedTimekeeper } from './timekeeper/limited.timekeeper' diff --git a/src/timekeeper/interfaces.ts b/src/timekeeper/interfaces.ts index 30c3af8..35cbe36 100644 --- a/src/timekeeper/interfaces.ts +++ b/src/timekeeper/interfaces.ts @@ -2,10 +2,10 @@ export type TaskStatus = 'pending' | 'runned' | 'resolved' | 'rejected' export interface ITask { readonly id: string - status: TaskStatus - data: D - runnedAt: number | null - createdAt: number + readonly status: TaskStatus + readonly data: D + readonly runnedAt: number | null + readonly createdAt: number } export type TimekeeperRunnerCallback = (task: ITask, signal: AbortSignal) => Promise | void @@ -25,6 +25,11 @@ export interface UnlimitedTimekeeperOptions { runner: TimekeeperRunnerCallback timeoutMs: number callRejectedTask?: boolean + /** + * @description Allows timers to avoid blocking the event loop + * @default true + */ + unrefTimeouts?: boolean } export interface LimitedOptions { @@ -38,10 +43,10 @@ export interface IUnlimitedTimekeeperMetrics { create?: () => void forcedRun?: () => void abort?: (task: ITask, error: unknown) => void - runTask?: (runnedSize: number, task: ITask) => void + runTask?: (size: number, task: ITask) => void resolveTask?: (task: ITask) => void rejectTask?: (error: unknown, task: ITask) => void } export interface ILimitedTimekeeperMetrics extends IUnlimitedTimekeeperMetrics { - waitTask?: (waitListSize: number) => void + waitTask?: (size: number) => void } diff --git a/src/timekeeper/limited.timekeeper.ts b/src/timekeeper/limited.timekeeper.ts index f8e857f..5ebe98a 100644 --- a/src/timekeeper/limited.timekeeper.ts +++ b/src/timekeeper/limited.timekeeper.ts @@ -1,4 +1,5 @@ import { SilentAbortError, TimeoutError } from '../utils/errors' +import { unrefTimer } from '../utils/timer' import { ILimitedTimekeeperMetrics, ITask, ITimekeeper, LimitedOptions, LimitedTimekeeperOptions } from './interfaces' import { Task } from './task' import { UnlimitedTimekeeper } from './unlimited.timekeeper' @@ -22,26 +23,32 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper): void { - if (this.runnedTasks.size < this.limitedOptions.concurrencyLimit) { - super.runTask(task) - task.defer.promise.finally(() => this.runNextWaitingTask()).catch(() => {}) + if (this.runnedTasks.size >= this.limitedOptions.concurrencyLimit) { + this.enqueueTask(task) return } - const runnedTime = Date.now() - task.tid = setTimeout(() => { - debug( - `A task on the waiting list is waiting longer than it should. id="${task.id}"; time="${Date.now() - runnedTime}"; maxWaitingTimeMs="${this.limitedOptions.maxWaitingTimeMs}"`, - ) - this.abort(task.id, new TimeoutError(this.limitedOptions.maxWaitingTimeMs)) - }, this.limitedOptions.maxWaitingTimeMs)?.unref() + super.runTask(task) + task.defer.promise.finally(() => this.runNextWaitingTask()).catch(() => {}) + } + + private enqueueTask(task: Task): void { + const enqueuedAt = Date.now() + task.tid = unrefTimer( + setTimeout(() => { + debug( + `A task on the waiting list is waiting longer than it should. id="${task.id}"; time="${Date.now() - enqueuedAt}"; maxWaitingTimeMs="${this.limitedOptions.maxWaitingTimeMs}"`, + ) + this.abort(task.id, new TimeoutError(this.limitedOptions.maxWaitingTimeMs)) + }, this.limitedOptions.maxWaitingTimeMs), + this.options.unrefTimeouts, + ) this.waitingTasks.push(task) this.metrics?.waitTask?.(this.waitingTasks.length) debug(`The task has been added to the waiting list. id="${task.id}"`) @@ -54,6 +61,11 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper, error: unknown): void { if (this.currentTask?.id === task.id) return super.rejectPendingTask(task, error) this.waitingTasks = this.waitingTasks.filter(({ id }) => id !== task.id) + this.rejectWaitingTask(task, error) + } + + private rejectWaitingTask(task: Task, error: unknown): void { + if (task.status !== 'pending') return if (task.tid) clearTimeout(task.tid) this.metrics?.rejectTask?.(error, task.inner) this.callAbortedRunner(task, error) @@ -61,7 +73,10 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper this.abort(task.inner, new SilentAbortError('timekeeper'))) + const error = new SilentAbortError('timekeeper') + waiting.forEach(task => this.rejectWaitingTask(task, error)) } } diff --git a/src/timekeeper/unlimited.timekeeper.ts b/src/timekeeper/unlimited.timekeeper.ts index fcfdb0e..504fabd 100644 --- a/src/timekeeper/unlimited.timekeeper.ts +++ b/src/timekeeper/unlimited.timekeeper.ts @@ -1,5 +1,6 @@ import { AbortError, SilentAbortError, TimeoutError } from '../utils/errors' import { isLoaderError } from '../utils/is' +import { unrefTimer } from '../utils/timer' import { ITask, ITimekeeper, IUnlimitedTimekeeperMetrics, UnlimitedTimekeeperOptions } from './interfaces' import { Task } from './task' import createDebug from 'debug' @@ -72,10 +73,13 @@ export class UnlimitedTimekeeper { - debug(`The current task is started by a timer. id=${this.currentTask?.id}`) - this.runCurrentTask() - }, this.options.runMs)?.unref() + this.tidRunner = unrefTimer( + setTimeout(() => { + debug(`The current task is started by a timer. id=${this.currentTask?.id}`) + this.runCurrentTask() + }, this.options.runMs), + this.options.unrefTimeouts, + ) } private clearRunnerTimeout() { @@ -103,8 +107,12 @@ export class UnlimitedTimekeeper) { task.status = 'runned' + task.runnedAt = Date.now() task.controller = task.controller || new AbortController() - task.tid = setTimeout(() => this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs)?.unref() + task.tid = unrefTimer( + setTimeout(() => this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs), + this.options.unrefTimeouts, + ) this.runnedTasks.set(task.id, task) this.metrics?.runTask?.(this.runnedTasks.size, task.inner) diff --git a/src/utils/is.ts b/src/utils/is.ts index 0d820ac..c20aeae 100644 --- a/src/utils/is.ts +++ b/src/utils/is.ts @@ -1,5 +1,4 @@ import { LoaderError } from './errors' -const isObject = (value: any): value is Record => value && typeof value === 'object' -export const isError = (value: any): value is Error => isObject(value) && value instanceof Error -export const isLoaderError = (value: any): value is LoaderError => isError(value) && value instanceof LoaderError +export const isError = (value: any): value is Error => value instanceof Error +export const isLoaderError = (value: any): value is LoaderError => value instanceof LoaderError diff --git a/src/utils/signals.ts b/src/utils/signals.ts deleted file mode 100644 index e69de29..0000000 diff --git a/src/utils/timer.ts b/src/utils/timer.ts new file mode 100644 index 0000000..cd70a22 --- /dev/null +++ b/src/utils/timer.ts @@ -0,0 +1,10 @@ +/** + * Conditionally unrefs the timer. When `unrefTimeouts` is left `undefined`, + * the default is to unref - matching the historical behaviour of this library + * where pending timers never blocked process exit. Pass `false` explicitly to + * keep the event loop alive while timers are pending. + */ +export const unrefTimer = (tid: NodeJS.Timeout, unrefTimeouts: boolean | undefined): NodeJS.Timeout => { + if (unrefTimeouts !== false) tid?.unref?.() + return tid +} diff --git a/tsconfig.json b/tsconfig.json index b243a8e..c44b64f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,15 +1,14 @@ { - "compilerOptions": { + "compilerOptions": { "module": "commonjs", "types": ["node", "jest"], - "moduleResolution": "Node", + "moduleResolution": "node10", "declaration": true, "removeComments": false, "allowSyntheticDefaultImports": true, "target": "ES2023", "sourceMap": true, "outDir": "./dist", - "baseUrl": "./src", "incremental": false, "skipLibCheck": true, "strictNullChecks": true, @@ -20,7 +19,7 @@ "strict": true, "resolveJsonModule": true, "useDefineForClassFields": false, - "noEmitOnError": true, + "noEmitOnError": true }, "include": ["src"] - } \ No newline at end of file + }