Skip to content

Commit

Permalink
p-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Aslemammad committed Apr 21, 2024
1 parent 66e022e commit 057a7d3
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 51 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"commonjs": true
},
"rules": {
"import/no-extraneous-dependencies": "off",
"no-restricted-syntax": "off",
"no-await-in-loop": "off",
"no-plusplus": "off",
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ export type Hook = (task: Task, mode: "warmup" | "run") => void | Promise<void>;
```

- `async run()`: run the added tasks that were registered using the `add` method
- `async runConcurrently(limit: number = Infinity, mode: "bench" | "task" = "bench")`: similar to the `run` method but runs concurrently rather than sequentially. See the [Concurrency](#Concurrency) section.
- `async runConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: similar to the `run` method but runs concurrently rather than sequentially. See the [Concurrency](#Concurrency) section.
- `async warmup()`: warm up the benchmark tasks
- `async warmupConcurrently(threshold: number = Infinity, mode: "bench" | "task" = "bench")`: warm up the benchmark tasks concurrently
- `reset()`: reset each task and remove its result
- `add(name: string, fn: Fn, opts?: FnOpts)`: add a benchmark task to the task map
- `Fn`: `() => any | Promise<any>`
Expand Down Expand Up @@ -374,7 +375,7 @@ It may make your benchmarks slower, check #42.

- When `mode` is set to `null` (default), concurrency is disabled.
- When `mode` is set to 'task', each task's iterations run concurrently.
- When `mode` is set to 'bench', different tasks within the bench run concurrently
- When `mode` is set to 'bench', different tasks within the bench run concurrently. Concurrent cycles.

```ts
// options way (recommended)
Expand All @@ -384,6 +385,7 @@ bench.concurrency = "task" // The concurrency mode to determine how tasks are ru
await bench.run()

// standalone method way
// await bench.warmupConcurrently(10, "task")
await bench.runConcurrently(10, "task") // with runConcurrently, mode is set to 'bench' by default
```

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"eslint-config-airbnb-base": "^15.0.0",
"eslint-plugin-import": "^2.26.0",
"nano-staged": "^0.5.0",
"p-limit": "^5.0.0",
"size-limit": "^7.0.8",
"tsup": "^5.11.7",
"typescript": "^5.2.2",
Expand Down
10 changes: 10 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 37 additions & 19 deletions src/bench.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pLimit from 'p-limit';
import type {
Hook,
Options,
Expand Down Expand Up @@ -118,26 +119,14 @@ export default class Bench extends EventTarget {

this.dispatchEvent(createBenchEvent('start'));

const remainingTasks = [...this._tasks.values()];
const values: Task[] = [];

const handleConcurrency = async () => {
while (remainingTasks.length > 0) {
const runningTasks: (Promise<Task> | Task)[] = [];

// Start tasks up to the concurrency limit
while (runningTasks.length < threshold && remainingTasks.length > 0) {
const task = remainingTasks.pop()!;
runningTasks.push(this.runTask(task));
}
const limit = pLimit(threshold);

// Wait for all running tasks to complete
const completedTasks = await Promise.all(runningTasks);
values.push(...completedTasks);
}
};
const promises: Promise<Task>[] = [];
for (const task of [...this._tasks.values()]) {
promises.push(limit(() => this.runTask(task)));
}

await handleConcurrency();
const values = await Promise.all(promises);

this.dispatchEvent(createBenchEvent('complete'));

Expand All @@ -148,13 +137,42 @@ export default class Bench extends EventTarget {
* warmup the benchmark tasks.
* This is not run by default by the {@link run} method.
*/
async warmup() {
async warmup(): Promise<void> {
if (this.concurrency === 'bench') {
// TODO: in the next major, we should remove *Concurrently methods
await this.warmupConcurrently(this.threshold, this.concurrency);
return;
}
this.dispatchEvent(createBenchEvent('warmup'));
for (const [, task] of this._tasks) {
await task.warmup();
}
}

/**
* warmup the benchmark tasks concurrently.
* This is not run by default by the {@link runConcurrently} method.
*/
async warmupConcurrently(threshold = Infinity, mode: NonNullable<Bench['concurrency']> = 'bench'): Promise<void> {
this.threshold = threshold;
this.concurrency = mode;

if (mode === 'task') {
await this.warmup();
return;
}

this.dispatchEvent(createBenchEvent('warmup'));
const limit = pLimit(threshold);
const promises: Promise<void>[] = [];

for (const [, task] of this._tasks) {
promises.push(limit(() => task.warmup()));
}

await Promise.all(promises);
}

/**
* reset each task and remove its result
*/
Expand Down
20 changes: 7 additions & 13 deletions src/task.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pLimit from 'p-limit';
import type {
Fn,
TaskEvents,
Expand Down Expand Up @@ -89,27 +90,21 @@ export default class Task extends EventTarget {
}
};

const limit = pLimit(threshold);
try {
const currentTasks: Promise<void>[] = []; // only for task level concurrency
const promises: Promise<void>[] = []; // only for task level concurrency
while (
(totalTime < time || ((samples.length + currentTasks.length) < iterations))
(totalTime < time || ((samples.length + limit.activeCount + limit.pendingCount) < iterations))
&& !this.bench.signal?.aborted
) {
if (concurrent) {
if (currentTasks.length < threshold) {
currentTasks.push(executeTask());
} else {
await Promise.all(currentTasks);
currentTasks.length = 0;
}
promises.push(limit(executeTask));
} else {
await executeTask();
}
}
// The threshold is Infinity
if (currentTasks.length) {
await Promise.all(currentTasks);
currentTasks.length = 0;
if (promises.length) {
await Promise.all(promises);
}
} catch (error) {
return { error };
Expand Down Expand Up @@ -257,7 +252,6 @@ export default class Task extends EventTarget {
*/
reset() {
this.dispatchEvent(createBenchEvent('reset', this));
console.log('reset');
this.runs = 0;
this.result = undefined;
}
Expand Down
56 changes: 39 additions & 17 deletions test/sequential.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ test('sequential', async () => {
expect(isFirstTaskDefined).toBe(true);
});

test('concurrent (bench level)', async () => {
test.each(['warmup', 'run'])('%s concurrent (bench level)', async (mode) => {
const concurrentBench = new Bench({
time: 0,
iterations: 100,
throws: true,
});

let shouldBeDefined1: true;
Expand All @@ -52,7 +53,12 @@ test('concurrent (bench level)', async () => {
shouldNotBeDefinedFirst2 = true;
});

concurrentBench.runConcurrently();
if (mode === 'warmup') {
concurrentBench.warmupConcurrently();
} else {
concurrentBench.runConcurrently();
}

await setTimeout(0);
expect(shouldBeDefined1!).toBeDefined();
expect(shouldBeDefined2!).toBeDefined();
Expand All @@ -63,12 +69,13 @@ test('concurrent (bench level)', async () => {
expect(shouldNotBeDefinedFirst2!).toBeDefined();
});

test('concurrent (task level)', async () => {
console.log('here start');
test.each(['warmup', 'run'])('%s concurrent (task level)', async (mode) => {
const iterations = 10;
const concurrentBench = new Bench({
time: 0,
warmupTime: 0,
iterations,
warmupIterations: iterations,
});
const key = 'sample 1';

Expand All @@ -78,30 +85,45 @@ test('concurrent (task level)', async () => {
runs.value++;
await setTimeout(10);
// all task function should be here after 10ms
console.log(runs.value, iterations);
expect(runs.value).toEqual(iterations);
await setTimeout(10);
});

await concurrentBench.run();
expect(concurrentBench.getTask(key)!.runs).toEqual(0);
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
if (mode === 'warmup') {
await concurrentBench.warmup();
} else {
await concurrentBench.run();
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
}
}
expect(concurrentBench.getTask(key)!.runs).toEqual(0);

concurrentBench.reset();
runs.value = 0;

await concurrentBench.runConcurrently();
expect(concurrentBench.getTask(key)!.runs).toEqual(0);
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
if (mode === 'warmup') {
await concurrentBench.warmupConcurrently();
} else {
await concurrentBench.runConcurrently();
for (const result of concurrentBench.results) {
expect(result?.error).toMatch(/AssertionError/);
}
}
expect(concurrentBench.getTask(key)!.runs).toEqual(0);
concurrentBench.reset();
runs.value = 0;

await concurrentBench.runConcurrently(Infinity, 'task');
expect(concurrentBench.getTask(key)!.runs).toEqual(10);
for (const result of concurrentBench.results) {
expect(result?.error).toBeUndefined();
if (mode === 'warmup') {
await concurrentBench.warmupConcurrently(Infinity, 'task');
expect(runs.value).toEqual(10);
} else {
await concurrentBench.runConcurrently(Infinity, 'task');

for (const result of concurrentBench.results) {
expect(result?.error).toBeUndefined();
}
expect(runs.value).toEqual(10);
expect(concurrentBench.getTask(key)!.runs).toEqual(10);
}
});

0 comments on commit 057a7d3

Please sign in to comment.