From ffed9483195dc821f6a35b6a24ac8af6e6e33cb3 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Sun, 15 Jan 2023 11:29:23 -0600 Subject: [PATCH] De-register all shared workers Co-authored-by: Mark Wubben --- lib/plugin-support/shared-workers.js | 52 ++++++++++++------- .../fixtures/package.json | 8 +++ .../fixtures/test.js | 30 +++++++++++ .../fixtures/worker.mjs | 9 ++++ .../multiple-workers-are-loaded/test.js | 9 ++++ 5 files changed, 88 insertions(+), 20 deletions(-) create mode 100644 test/shared-workers/multiple-workers-are-loaded/fixtures/package.json create mode 100644 test/shared-workers/multiple-workers-are-loaded/fixtures/test.js create mode 100644 test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs create mode 100644 test/shared-workers/multiple-workers-are-loaded/test.js diff --git a/lib/plugin-support/shared-workers.js b/lib/plugin-support/shared-workers.js index 1a23fa230..2d65454ab 100644 --- a/lib/plugin-support/shared-workers.js +++ b/lib/plugin-support/shared-workers.js @@ -49,40 +49,54 @@ function launchWorker(filename, initialData) { } export async function observeWorkerProcess(fork, runStatus) { - let registrationCount = 0; - let signalDeregistered; - let launched; - const deregistered = new Promise(resolve => { - signalDeregistered = () => { - // Only unref the worker once all test workers have been deregistered, otherwise the worker may exit before test workers are deregistered - launched?.worker.unref(); + let signalDone; + + const done = new Promise(resolve => { + signalDone = () => { resolve(); }; }); - fork.promise.finally(() => { - if (registrationCount === 0) { - signalDeregistered(); + const activeInstances = new Set(); + + const removeInstance = instance => { + instance.worker.unref(); + activeInstances.delete(instance); + + if (activeInstances.size === 0) { + signalDone(); + } + }; + + const removeAllInstances = () => { + if (activeInstances.size === 0) { + signalDone(); + return; } + + for (const instance of activeInstances) { + removeInstance(instance); + } + }; + + fork.promise.finally(() => { + removeAllInstances(); }); fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => { - launched = launchWorker(filename, initialData); + const launched = launchWorker(filename, initialData); + activeInstances.add(launched); const handleWorkerMessage = async message => { if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) { launched.worker.off('message', handleWorkerMessage); - - registrationCount--; - if (registrationCount === 0) { - signalDeregistered(); - } + removeInstance(launched); } }; launched.statePromises.error.then(error => { - signalDeregistered(); launched.worker.off('message', handleWorkerMessage); + removeAllInstances(); runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)}); signalError(); }); @@ -90,8 +104,6 @@ export async function observeWorkerProcess(fork, runStatus) { try { await launched.statePromises.available; - registrationCount++; - port.postMessage({type: 'ready'}); launched.worker.postMessage({ @@ -112,5 +124,5 @@ export async function observeWorkerProcess(fork, runStatus) { } catch {} }); - return deregistered; + return done; } diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/package.json b/test/shared-workers/multiple-workers-are-loaded/fixtures/package.json new file mode 100644 index 000000000..54f672450 --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/package.json @@ -0,0 +1,8 @@ +{ + "type": "module", + "ava": { + "files": [ + "*.js" + ] + } +} diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js b/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js new file mode 100644 index 000000000..6cf895d78 --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js @@ -0,0 +1,30 @@ +import test from 'ava'; +import {registerSharedWorker} from 'ava/plugin'; + +const worker1 = registerSharedWorker({ + filename: new URL('worker.mjs#1', import.meta.url), + supportedProtocols: ['ava-4'], + initialData: { + id: '1', + }, +}); + +const worker2 = registerSharedWorker({ + filename: new URL('worker.mjs#2', import.meta.url), + supportedProtocols: ['ava-4'], + initialData: { + id: '2', + }, +}); + +const messageFromWorker1 = worker1.subscribe().next(); +const messageFromWorker2 = worker2.subscribe().next(); + +test('can load multiple workers', async t => { + const {value: {data: dataFromWorker1}} = await messageFromWorker1; + const {value: {data: dataFromWorker2}} = await messageFromWorker2; + + t.deepEqual(dataFromWorker1, {id: '1'}); + t.deepEqual(dataFromWorker2, {id: '2'}); + t.pass(); +}); diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs b/test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs new file mode 100644 index 000000000..1943f14ca --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs @@ -0,0 +1,9 @@ +export default async ({negotiateProtocol}) => { + const protocol = negotiateProtocol(['ava-4']); + + await protocol.ready(); + + for await (const testWorker of protocol.testWorkers()) { + testWorker.publish(protocol.initialData); + } +}; diff --git a/test/shared-workers/multiple-workers-are-loaded/test.js b/test/shared-workers/multiple-workers-are-loaded/test.js new file mode 100644 index 000000000..a15ff6a2d --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/test.js @@ -0,0 +1,9 @@ +import test from '@ava/test'; + +import {fixture} from '../../helpers/exec.js'; + +test('can load multiple workers', async t => { + await fixture(); + + t.pass(); +});