Skip to content

Commit

Permalink
Merge pull request #5400 from Agoric/mfig-agoric-publish
Browse files Browse the repository at this point in the history
Publish amm pool list subscription to chain storage
  • Loading branch information
mergify[bot] committed Jun 4, 2022
2 parents 109ff65 + affccab commit 308309e
Show file tree
Hide file tree
Showing 19 changed files with 475 additions and 23 deletions.
4 changes: 2 additions & 2 deletions packages/notifier/src/asyncIterableAdaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export const makeAsyncIterableFromNotifier = notifierP => {
* @returns {Promise<undefined>}
*/
export const observeIterator = (asyncIteratorP, iterationObserver) => {
return new Promise(ack => {
return new Promise((ack, observerError) => {
const recur = () => {
E.when(
E(asyncIteratorP).next(),
Expand All @@ -117,7 +117,7 @@ export const observeIterator = (asyncIteratorP, iterationObserver) => {
iterationObserver.fail && iterationObserver.fail(reason);
ack(undefined);
},
);
).catch(observerError);
};
recur();
});
Expand Down
1 change: 1 addition & 0 deletions packages/notifier/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export {
// Consider deprecating or not reexporting
makeAsyncIterableFromNotifier,
} from './asyncIterableAdaptor.js';
export * from './storesub.js';
78 changes: 78 additions & 0 deletions packages/notifier/src/storesub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// @ts-check
import { E } from '@endo/eventual-send';
import { Far, makeMarshal } from '@endo/marshal';
import { observeIteration } from './asyncIterableAdaptor.js';

/**
* Begin iterating the source, storing serialized iteration values. If the
* storageNode's `setValue` operation rejects, the iteration will be terminated.
*
* Returns a StoredSubscription that can be used by a client to directly follow
* the iteration themselves, or obtain information to subscribe to the stored
* data out-of-band.
*
* @template T
* @param {Subscription<T>} subscription
* @param {ERef<StorageNode>} [storageNode]
* @param {ERef<ReturnType<typeof makeMarshal>>} [marshaller]
* @returns {StoredSubscription<T>}
*/
export const makeStoredSubscription = (
subscription,
storageNode,
marshaller = makeMarshal(undefined, undefined, {
marshalSaveError: () => {},
}),
) => {
/** @type {Unserializer} */
const unserializer = Far('unserializer', {
unserialize: E(marshaller).unserialize,
});

// Abort the iteration on the next observation if the publisher ever fails.
let publishFailed = false;
let publishException;

const fail = err => {
publishFailed = true;
publishException = err;
};

// Must *not* be an async function, because it sometimes must throw to abort
// the iteration.
const publishValue = obj => {
assert(storageNode);
if (publishFailed) {
// To properly abort the iteration, this must be a synchronous exception.
throw publishException;
}

// Publish the value, capturing any error.
E(marshaller)
.serialize(obj)
.then(serialized => {
const encoded = JSON.stringify(serialized);
return E(storageNode).setValue(encoded);
})
.catch(fail);
};

if (storageNode) {
// Start publishing the source.
observeIteration(subscription, {
updateState: publishValue,
finish: publishValue,
}).catch(fail);
}

/** @type {StoredSubscription<T>} */
const storesub = Far('StoredSubscription', {
getStoreKey: () => storageNode && E(storageNode).getStoreKey(),
getUnserializer: () => unserializer,
getSharableSubscriptionInternals:
subscription.getSharableSubscriptionInternals,
[Symbol.asyncIterator]: subscription[Symbol.asyncIterator],
});
return storesub;
};
harden(makeStoredSubscription);
8 changes: 7 additions & 1 deletion packages/notifier/src/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const makeSubscriptionIterator = tailP => {
next: () => {
const resultP = E.get(tailP).head;
tailP = E.get(tailP).tail;
Promise.resolve(tailP).catch(() => {}); // suppress unhandled rejection error
return resultP;
},
});
Expand All @@ -56,9 +57,10 @@ const makeSubscriptionIterator = tailP => {
* distributed pub/sub.
*
* @template T
* @param {T[]} optionalInitialState
* @returns {SubscriptionRecord<T>}
*/
const makeSubscriptionKit = () => {
const makeSubscriptionKit = (...optionalInitialState) => {
/** @type {((internals: ERef<SubscriptionInternals<T>>) => void) | undefined} */
let rear;
const hp = new HandledPromise(r => (rear = r));
Expand Down Expand Up @@ -95,6 +97,10 @@ const makeSubscriptionKit = () => {
rear = undefined;
},
});

if (optionalInitialState.length > 0) {
publication.updateState(optionalInitialState[0]);
}
return harden({ publication, subscription });
};
harden(makeSubscriptionKit);
Expand Down
26 changes: 26 additions & 0 deletions packages/notifier/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,29 @@
* @property {IterationObserver<T>} publication
* @property {Subscription<T>} subscription
*/

/** @typedef {ReturnType<typeof import('@endo/marshal').makeMarshal>} Marshaller */

/**
* @typedef {object} Unserializer
* @property {Marshaller['unserialize']} unserialize
*/

/**
* @typedef {object} StorageNode
* @property {(data: string) => void} setValue publishes some data
* @property {() => unknown} getStoreKey get the externally-reachable store key
* for this storage item
* @property {(subPath: string) => StorageNode} getChildNode TODO: makeChildNode
*/

/**
* @typedef {object} StoredFacet
* @property {StorageNode['getStoreKey']} getStoreKey get the externally-reachable store key
* @property {() => Unserializer} getUnserializer get the unserializer for the stored data
*/

/**
* @template T
* @typedef {Subscription<T> & StoredFacet} StoredSubscription
*/
73 changes: 73 additions & 0 deletions packages/notifier/test/marshal-corpus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Based on roundTripPairs from test-marshal.js
*
* A list of `[body, justinSrc]` pairs, where the body parses into
* an encoding that decodes to a Justin expression that evaluates to something
* that has the same encoding.
*/
export const jsonPairs = harden([
// Justin is the same as the JSON encoding but without unnecessary quoting
['[1,2]', '[1,2]'],
['{"foo":1}', '{foo:1}'],
['{"a":1,"b":2}', '{a:1,b:2}'],
['{"a":1,"b":{"c":3}}', '{a:1,b:{c:3}}'],
['true', 'true'],
['1', '1'],
['"abc"', '"abc"'],
['null', 'null'],

// Primitives not representable in JSON
['{"@qclass":"undefined"}', 'undefined'],
// FIGME: Uncomment when this is in agoric-sdk: https://github.com/endojs/endo/pull/1204
// ['{"@qclass":"NaN"}', 'NaN'], // TODO: uncomment
['{"@qclass":"Infinity"}', 'Infinity'],
['{"@qclass":"-Infinity"}', '-Infinity'],
['{"@qclass":"bigint","digits":"4"}', '4n'],
['{"@qclass":"bigint","digits":"9007199254740993"}', '9007199254740993n'],
['{"@qclass":"symbol","name":"@@asyncIterator"}', 'Symbol.asyncIterator'],
['{"@qclass":"symbol","name":"@@match"}', 'Symbol.match'],
['{"@qclass":"symbol","name":"foo"}', 'Symbol.for("foo")'],
['{"@qclass":"symbol","name":"@@@@foo"}', 'Symbol.for("@@foo")'],

// Arrays and objects
['[{"@qclass":"undefined"}]', '[undefined]'],
['{"foo":{"@qclass":"undefined"}}', '{foo:undefined}'],
['{"@qclass":"error","message":"","name":"Error"}', 'Error("")'],
[
'{"@qclass":"error","message":"msg","name":"ReferenceError"}',
'ReferenceError("msg")',
],

// The one case where JSON is not a semantic subset of JS
['{"__proto__":8}', '{["__proto__"]:8}'],

// The Hilbert Hotel is always tricky
['{"@qclass":"hilbert","original":8}', '{"@qclass":8}'],
['{"@qclass":"hilbert","original":"@qclass"}', '{"@qclass":"@qclass"}'],
[
'{"@qclass":"hilbert","original":{"@qclass":"hilbert","original":8}}',
'{"@qclass":{"@qclass":8}}',
],
[
'{"@qclass":"hilbert","original":{"@qclass":"hilbert","original":8,"rest":{"foo":"foo1"}},"rest":{"bar":{"@qclass":"hilbert","original":{"@qclass":"undefined"}}}}',
'{"@qclass":{"@qclass":8,foo:"foo1"},bar:{"@qclass":undefined}}',
],

// tagged
['{"@qclass":"tagged","tag":"x","payload":8}', 'makeTagged("x",8)'],
[
'{"@qclass":"tagged","tag":"x","payload":{"@qclass":"undefined"}}',
'makeTagged("x",undefined)',
],

// Slots
[
'[{"@qclass":"slot","iface":"Alleged: for testing Justin","index":0}]',
'[slot(0,"Alleged: for testing Justin")]',
],
// Tests https://github.com/endojs/endo/issues/1185 fix
[
'[{"@qclass":"slot","iface":"Alleged: for testing Justin","index":0},{"@qclass":"slot","index":0}]',
'[slot(0,"Alleged: for testing Justin"),slot(0)]',
],
]);
46 changes: 46 additions & 0 deletions packages/notifier/test/test-notifier-adaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,52 @@ test('observeIteration - update from iterator fails', t => {
return observeIteration(explodingStream, u);
});

test('observeIteration - synchronous throw from observer stops hard', async t => {
t.plan(2);
await t.throwsAsync(
() =>
observeIteration(finiteStream, {
updateState: state => {
t.is(state, 1);
throw new Error('sync propagates');
},
finish: state => {
t.fail(`unexpected finish with state ${state}`);
},
fail: reason => {
t.is(reason.message, 'sync propagates');
},
}),
{ message: 'sync propagates' },
);
});

test('observeIteration - rejection from observer does nothing', async t => {
const u = makeTestIterationObserver(t, true, false);
const handledRejection = reason => {
const r = Promise.reject(reason);
r.catch(() => {});
return r;
};
await observeIteration(finiteStream, {
...u,
updateState: state => {
u.updateState(state);
return handledRejection(
Error('updateState rejection does not propagate'),
);
},
finish: state => {
u.finish(state);
return handledRejection(Error('finish rejection does not propagate'));
},
fail: reason => {
u.fail(reason);
return handledRejection(Error('fail rejection does not propagate'));
},
});
});

// /////////////////////////////// NotifierKit /////////////////////////////////

test('notifier adaptor - manual finishes', async t => {
Expand Down
79 changes: 79 additions & 0 deletions packages/notifier/test/test-stored-subscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// @ts-check

// eslint-disable-next-line import/order
import { test } from './prepare-test-env-ava.js';

import { E } from '@endo/eventual-send';
import { Far, makeMarshal } from '@endo/marshal';
import { makeSubscriptionKit, makeStoredSubscription } from '../src/index.js';

import '../src/types.js';
import { jsonPairs } from './marshal-corpus.js';

const makeFakeStorage = (path, publication) => {
const fullPath = `publish.${path}`;
const storeKey = harden({
storeName: 'swingset',
storeSubkey: `swingset/data:${fullPath}`,
});
/** @type {StorageNode} */
const storage = Far('fakeStorage', {
getStoreKey: () => storeKey,
setValue: value => {
assert.typeof(value, 'string');
publication.updateState(value);
},
getChildNode: () => storage,
});
return storage;
};

test('stored subscription', async t => {
t.plan((jsonPairs.length + 2) * 4 + 1);

/** @type {any} */
const initialValue = 'first value';
const { publication: pubStorage, subscription: subStorage } =
makeSubscriptionKit();
const storage = makeFakeStorage('publish.foo.bar', pubStorage);
const { subscription, publication } = makeSubscriptionKit(initialValue);
const storesub = makeStoredSubscription(subscription, storage);

t.is(await E(storesub).getStoreKey(), await E(storage).getStoreKey());
const unserializer = E(storesub).getUnserializer();

const ait = E(storesub)[Symbol.asyncIterator]();
const storeAit = subStorage[Symbol.asyncIterator]();

const { unserialize } = makeMarshal();
const check = async (description, origValue, expectedDone) => {
const { done: storedDone, value: storedEncoded } = await storeAit.next();
t.assert(!storedDone, `check not stored done for ${description}`);
const storedDecoded = JSON.parse(storedEncoded);
const storedValue = await E(unserializer).unserialize(storedDecoded);
t.deepEqual(
storedValue,
origValue,
`check stored value against original ${description}`,
);

const { done, value } = await E(ait).next();
t.is(done, expectedDone, `check doneness for ${description}`);
t.deepEqual(
value,
origValue,
`check value against original ${description}`,
);
};

await check(initialValue, initialValue, false);
for await (const [encoded] of jsonPairs) {
const origValue = unserialize({ body: encoded, slots: [] });
await E(publication).updateState(origValue);
await check(encoded, origValue, false);
}

const terminalNull = null;
await E(publication).finish(terminalNull);
await check('terminal null', terminalNull, true);
});
2 changes: 2 additions & 0 deletions packages/run-protocol/src/proposals/core-proposal.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const ECON_COMMITTEE_MANIFEST = harden({
const SHARED_MAIN_MANIFEST = harden({
[econBehaviors.setupAmm.name]: {
consume: {
board: 'board',
chainStorage: true,
chainTimerService: 'timer',
zoe: 'zoe',
economicCommitteeCreatorFacet: 'economicCommittee',
Expand Down
Loading

0 comments on commit 308309e

Please sign in to comment.