From 057a7d306d4790f90b17a44b1a28976051f4052d Mon Sep 17 00:00:00 2001 From: Mohammad Bagher Abiyat Date: Sun, 21 Apr 2024 11:30:32 +0330 Subject: [PATCH] p-limit --- .eslintrc.json | 1 + README.md | 6 +++-- package.json | 1 + pnpm-lock.yaml | 10 ++++++++ src/bench.ts | 56 +++++++++++++++++++++++++++-------------- src/task.ts | 20 ++++++--------- test/sequential.test.ts | 56 ++++++++++++++++++++++++++++------------- 7 files changed, 99 insertions(+), 51 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index 755381f..e395f92 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -17,6 +17,7 @@ "commonjs": true }, "rules": { + "import/no-extraneous-dependencies": "off", "no-restricted-syntax": "off", "no-await-in-loop": "off", "no-plusplus": "off", diff --git a/README.md b/README.md index 93390e1..5035af0 100644 --- a/README.md +++ b/README.md @@ -141,8 +141,9 @@ export type Hook = (task: Task, mode: "warmup" | "run") => void | Promise; ``` - `async run()`: run the added tasks that were registered using the `add` method -- `async runConcurrently(limit: number = Infinity, mode: "bench" | "task" = "bench")`: similar to the `run` method but runs concurrently rather than sequentially. See the [Concurrency](#Concurrency) section. +- `async runConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: similar to the `run` method but runs concurrently rather than sequentially. See the [Concurrency](#Concurrency) section. - `async warmup()`: warm up the benchmark tasks +- `async warmupConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: warm up the benchmark tasks concurrently - `reset()`: reset each task and remove its result - `add(name: string, fn: Fn, opts?: FnOpts)`: add a benchmark task to the task map - `Fn`: `() => any | Promise` @@ -374,7 +375,7 @@ It may make your benchmarks slower, check #42. - When `mode` is set to `null` (default), concurrency is disabled. - When `mode` is set to 'task', each task's iterations run concurrently. -- When `mode` is set to 'bench', different tasks within the bench run concurrently +- When `mode` is set to 'bench', different tasks within the bench run concurrently. Concurrent cycles. ```ts // options way (recommended) @@ -384,6 +385,7 @@ bench.concurrency = "task" // The concurrency mode to determine how tasks are ru await bench.run() // standalone method way +// await bench.warmupConcurrently(10, "task") await bench.runConcurrently(10, "task") // with runConcurrently, mode is set to 'bench' by default ``` diff --git a/package.json b/package.json index 7f5651f..8f994d9 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "eslint-config-airbnb-base": "^15.0.0", "eslint-plugin-import": "^2.26.0", "nano-staged": "^0.5.0", + "p-limit": "^5.0.0", "size-limit": "^7.0.8", "tsup": "^5.11.7", "typescript": "^5.2.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6fa0d20..0c4ee53 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -44,6 +44,9 @@ importers: nano-staged: specifier: ^0.5.0 version: 0.5.0 + p-limit: + specifier: ^5.0.0 + version: 5.0.0 size-limit: specifier: ^7.0.8 version: 7.0.8 @@ -2997,6 +3000,13 @@ packages: yocto-queue: 1.0.0 dev: true + /p-limit@5.0.0: + resolution: {integrity: sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==} + engines: {node: '>=18'} + dependencies: + yocto-queue: 1.0.0 + dev: true + /p-locate@4.1.0: resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==} engines: {node: '>=8'} diff --git a/src/bench.ts b/src/bench.ts index 7ed9d13..93ea905 100644 --- a/src/bench.ts +++ b/src/bench.ts @@ -1,3 +1,4 @@ +import pLimit from 'p-limit'; import type { Hook, Options, @@ -118,26 +119,14 @@ export default class Bench extends EventTarget { this.dispatchEvent(createBenchEvent('start')); - const remainingTasks = [...this._tasks.values()]; - const values: Task[] = []; - - const handleConcurrency = async () => { - while (remainingTasks.length > 0) { - const runningTasks: (Promise | Task)[] = []; - - // Start tasks up to the concurrency limit - while (runningTasks.length < threshold && remainingTasks.length > 0) { - const task = remainingTasks.pop()!; - runningTasks.push(this.runTask(task)); - } + const limit = pLimit(threshold); - // Wait for all running tasks to complete - const completedTasks = await Promise.all(runningTasks); - values.push(...completedTasks); - } - }; + const promises: Promise[] = []; + for (const task of [...this._tasks.values()]) { + promises.push(limit(() => this.runTask(task))); + } - await handleConcurrency(); + const values = await Promise.all(promises); this.dispatchEvent(createBenchEvent('complete')); @@ -148,13 +137,42 @@ export default class Bench extends EventTarget { * warmup the benchmark tasks. * This is not run by default by the {@link run} method. */ - async warmup() { + async warmup(): Promise { + if (this.concurrency === 'bench') { + // TODO: in the next major, we should remove *Concurrently methods + await this.warmupConcurrently(this.threshold, this.concurrency); + return; + } this.dispatchEvent(createBenchEvent('warmup')); for (const [, task] of this._tasks) { await task.warmup(); } } + /** + * warmup the benchmark tasks concurrently. + * This is not run by default by the {@link runConcurrently} method. + */ + async warmupConcurrently(threshold = Infinity, mode: NonNullable = 'bench'): Promise { + this.threshold = threshold; + this.concurrency = mode; + + if (mode === 'task') { + await this.warmup(); + return; + } + + this.dispatchEvent(createBenchEvent('warmup')); + const limit = pLimit(threshold); + const promises: Promise[] = []; + + for (const [, task] of this._tasks) { + promises.push(limit(() => task.warmup())); + } + + await Promise.all(promises); + } + /** * reset each task and remove its result */ diff --git a/src/task.ts b/src/task.ts index 8170611..17627c5 100644 --- a/src/task.ts +++ b/src/task.ts @@ -1,3 +1,4 @@ +import pLimit from 'p-limit'; import type { Fn, TaskEvents, @@ -89,27 +90,21 @@ export default class Task extends EventTarget { } }; + const limit = pLimit(threshold); try { - const currentTasks: Promise[] = []; // only for task level concurrency + const promises: Promise[] = []; // only for task level concurrency while ( - (totalTime < time || ((samples.length + currentTasks.length) < iterations)) + (totalTime < time || ((samples.length + limit.activeCount + limit.pendingCount) < iterations)) && !this.bench.signal?.aborted ) { if (concurrent) { - if (currentTasks.length < threshold) { - currentTasks.push(executeTask()); - } else { - await Promise.all(currentTasks); - currentTasks.length = 0; - } + promises.push(limit(executeTask)); } else { await executeTask(); } } - // The threshold is Infinity - if (currentTasks.length) { - await Promise.all(currentTasks); - currentTasks.length = 0; + if (promises.length) { + await Promise.all(promises); } } catch (error) { return { error }; @@ -257,7 +252,6 @@ export default class Task extends EventTarget { */ reset() { this.dispatchEvent(createBenchEvent('reset', this)); - console.log('reset'); this.runs = 0; this.result = undefined; } diff --git a/test/sequential.test.ts b/test/sequential.test.ts index ea71ea6..27f3afb 100644 --- a/test/sequential.test.ts +++ b/test/sequential.test.ts @@ -29,10 +29,11 @@ test('sequential', async () => { expect(isFirstTaskDefined).toBe(true); }); -test('concurrent (bench level)', async () => { +test.each(['warmup', 'run'])('%s concurrent (bench level)', async (mode) => { const concurrentBench = new Bench({ time: 0, iterations: 100, + throws: true, }); let shouldBeDefined1: true; @@ -52,7 +53,12 @@ test('concurrent (bench level)', async () => { shouldNotBeDefinedFirst2 = true; }); - concurrentBench.runConcurrently(); + if (mode === 'warmup') { + concurrentBench.warmupConcurrently(); + } else { + concurrentBench.runConcurrently(); + } + await setTimeout(0); expect(shouldBeDefined1!).toBeDefined(); expect(shouldBeDefined2!).toBeDefined(); @@ -63,12 +69,13 @@ test('concurrent (bench level)', async () => { expect(shouldNotBeDefinedFirst2!).toBeDefined(); }); -test('concurrent (task level)', async () => { - console.log('here start'); +test.each(['warmup', 'run'])('%s concurrent (task level)', async (mode) => { const iterations = 10; const concurrentBench = new Bench({ time: 0, + warmupTime: 0, iterations, + warmupIterations: iterations, }); const key = 'sample 1'; @@ -78,30 +85,45 @@ test('concurrent (task level)', async () => { runs.value++; await setTimeout(10); // all task function should be here after 10ms - console.log(runs.value, iterations); expect(runs.value).toEqual(iterations); await setTimeout(10); }); - await concurrentBench.run(); - expect(concurrentBench.getTask(key)!.runs).toEqual(0); - for (const result of concurrentBench.results) { - expect(result?.error).toMatch(/AssertionError/); + if (mode === 'warmup') { + await concurrentBench.warmup(); + } else { + await concurrentBench.run(); + for (const result of concurrentBench.results) { + expect(result?.error).toMatch(/AssertionError/); + } } + expect(concurrentBench.getTask(key)!.runs).toEqual(0); + concurrentBench.reset(); runs.value = 0; - await concurrentBench.runConcurrently(); - expect(concurrentBench.getTask(key)!.runs).toEqual(0); - for (const result of concurrentBench.results) { - expect(result?.error).toMatch(/AssertionError/); + if (mode === 'warmup') { + await concurrentBench.warmupConcurrently(); + } else { + await concurrentBench.runConcurrently(); + for (const result of concurrentBench.results) { + expect(result?.error).toMatch(/AssertionError/); + } } + expect(concurrentBench.getTask(key)!.runs).toEqual(0); concurrentBench.reset(); runs.value = 0; - await concurrentBench.runConcurrently(Infinity, 'task'); - expect(concurrentBench.getTask(key)!.runs).toEqual(10); - for (const result of concurrentBench.results) { - expect(result?.error).toBeUndefined(); + if (mode === 'warmup') { + await concurrentBench.warmupConcurrently(Infinity, 'task'); + expect(runs.value).toEqual(10); + } else { + await concurrentBench.runConcurrently(Infinity, 'task'); + + for (const result of concurrentBench.results) { + expect(result?.error).toBeUndefined(); + } + expect(runs.value).toEqual(10); + expect(concurrentBench.getTask(key)!.runs).toEqual(10); } });