Skip to content

Commit

Permalink
feat(notifier): allow makeSubscriptionKit(initialState)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jun 4, 2022
1 parent 59646f8 commit f9894f6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
7 changes: 6 additions & 1 deletion packages/notifier/src/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ const makeSubscriptionIterator = tailP => {
* distributed pub/sub.
*
* @template T
* @param {T[]} optionalInitialState
* @returns {SubscriptionRecord<T>}
*/
const makeSubscriptionKit = () => {
const makeSubscriptionKit = (...optionalInitialState) => {
/** @type {((internals: ERef<SubscriptionInternals<T>>) => void) | undefined} */
let rear;
const hp = new HandledPromise(r => (rear = r));
Expand Down Expand Up @@ -96,6 +97,10 @@ const makeSubscriptionKit = () => {
rear = undefined;
},
});

if (optionalInitialState.length > 0) {
publication.updateState(optionalInitialState[0]);
}
return harden({ publication, subscription });
};
harden(makeSubscriptionKit);
Expand Down
40 changes: 20 additions & 20 deletions packages/notifier/test/test-stored-subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ const makeFakeStorage = (path, publication) => {
};

test('stored subscription', async t => {
t.plan((jsonPairs.length + 1) * 4 + 1);
t.plan((jsonPairs.length + 2) * 4 + 1);

/** @type {any} */
const initialValue = 'first value';
const { publication: pubStorage, subscription: subStorage } =
makeSubscriptionKit();
const storage = makeFakeStorage('publish.foo.bar', pubStorage);
const { subscription, publication } = makeSubscriptionKit();
const { subscription, publication } = makeSubscriptionKit(initialValue);
const storesub = makeStoredSubscription(subscription, storage);

t.is(await E(storesub).getStoreKey(), await E(storage).getStoreKey());
Expand All @@ -43,22 +46,7 @@ test('stored subscription', async t => {
const storeAit = subStorage[Symbol.asyncIterator]();

const { unserialize } = makeMarshal();
const updateAndCheck = async (description, origValue, expectDone) => {
const nextP = E(ait).next();
if (expectDone) {
await E(publication).finish(origValue);
} else {
await E(publication).updateState(origValue);
}

const { done, value } = await nextP;
t.is(done, expectDone, `check doneness for ${description}`);
t.deepEqual(
value,
origValue,
`check value against original ${description}`,
);

const check = async (description, origValue, expectedDone) => {
const { done: storedDone, value: storedEncoded } = await storeAit.next();
t.assert(!storedDone, `check not stored done for ${description}`);
const storedDecoded = JSON.parse(storedEncoded);
Expand All @@ -68,12 +56,24 @@ test('stored subscription', async t => {
origValue,
`check stored value against original ${description}`,
);

const { done, value } = await E(ait).next();
t.is(done, expectedDone, `check doneness for ${description}`);
t.deepEqual(
value,
origValue,
`check value against original ${description}`,
);
};

await check(initialValue, initialValue, false);
for await (const [encoded] of jsonPairs) {
const origValue = unserialize({ body: encoded, slots: [] });
await updateAndCheck(encoded, origValue, false);
await E(publication).updateState(origValue);
await check(encoded, origValue, false);
}

await updateAndCheck('terminal', null, true);
const terminalNull = null;
await E(publication).finish(terminalNull);
await check('terminal null', terminalNull, true);
});

0 comments on commit f9894f6

Please sign in to comment.