Skip to content

Commit

Permalink
feat(captp): leverage makeSubscriptionKit to drive trapHost
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jul 16, 2021
1 parent a8e0e96 commit a350b9d
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 91 deletions.
1 change: 1 addition & 0 deletions packages/captp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"@agoric/eventual-send": "^0.13.22",
"@agoric/marshal": "^0.4.19",
"@agoric/nat": "^4.1.0",
"@agoric/notifier": "^0.3.22",
"@agoric/promise-kit": "^0.2.20",
"esm": "agoric-labs/esm#Agoric-built"
},
Expand Down
114 changes: 70 additions & 44 deletions packages/captp/src/captp.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
QCLASS,
} from '@agoric/marshal';
import { E, HandledPromise } from '@agoric/eventual-send';
import { makeSubscriptionKit, observeIteration } from '@agoric/notifier';
import { isPromise } from '@agoric/promise-kit';
import { assert, details as X } from '@agoric/assert';

Expand Down Expand Up @@ -74,7 +75,8 @@ export const makeCapTP = (
const disconnectReason = id =>
Error(`${JSON.stringify(id)} connection closed`);

const trapGiveMore = new Map();
/** @type {Map<string, IterationObserver<any>>} */
const trapPublishers = new Map();

/** @type {any} */
let unplug = false;
Expand Down Expand Up @@ -400,17 +402,34 @@ export const makeCapTP = (
'function',
X`CapTP cannot answer Trap(x) without a trapHost function`,
);

// We need to create the publication right now to prevent a race with
// the other side.
const { subscription, publication } = makeSubscriptionKit();
trapPublishers.set(questionID, publication);

sendReturn = async (isReject, value) => {
const serialized = serialize(harden(value));
const giveMore = trapHost([isReject, serialized]);
if (giveMore) {
assert.typeof(
giveMore,
'function',
X`CapTP trapHost.reply result can only be a giveMore function`,
);
trapGiveMore.set(questionID, giveMore);
const it = trapHost([isReject, serialized]);
if (!it) {
trapPublishers.delete(questionID);
return;
}

// Drive the trapHost async iterator via the subscription.
observeIteration(subscription, {
updateState(nonFinalValue) {
it.next(nonFinalValue);
},
finish(completion) {
trapPublishers.delete(questionID);
it.return && it.return(completion);
},
fail(reason) {
trapPublishers.delete(questionID);
it.throw && it.throw(reason);
},
}).catch(e => console.error('error observing', e));
};
} catch (e) {
sendReturn(true, e);
Expand Down Expand Up @@ -440,23 +459,26 @@ export const makeCapTP = (
.catch(rej => quietReject(rej, false));
},
// Have the host serve more of the reply.
CTP_TRAP_TAKE_MORE: async obj => {
assert(trapHost, X`CTP_TRAP_TAKE_MORE is impossible without a trapHost`);
const giveMore = trapGiveMore.get(obj.questionID);
trapGiveMore.delete(obj.questionID);
assert.typeof(
giveMore,
'function',
X`CTP_TRAP_TAKE_MORE did not expect ${obj.questionID}`,
);
const nextGiveMore = giveMore(obj.data);
if (nextGiveMore) {
assert.typeof(
nextGiveMore,
'function',
X`CapTP giveMore result can only be another giveMore function`,
);
trapGiveMore.set(obj.questionID, nextGiveMore);
CTP_TRAP_ITERATE: async obj => {
assert(trapHost, X`CTP_TRAP_ITERATE is impossible without a trapHost`);
const { questionID, serialized } = obj;
const pub = trapPublishers.get(questionID);
assert(pub, X`CTP_TRAP_ITERATE did not expect ${questionID}`);
const [method, args] = unserialize(serialized);
try {
switch (method) {
case 'updateState':
case 'finish':
case 'fail': {
pub[method](...args);
break;
}
default: {
assert.fail(X`Unexpected CTP_TRAP_ITERATE method ${method}`);
}
}
} catch (e) {
pub.fail(e);
}
},
// Answer to one of our questions.
Expand Down Expand Up @@ -587,7 +609,7 @@ export const makeCapTP = (

// Send a "trap" message.
lastQuestionID += 1;
const questionID = lastQuestionID;
const questionID = `${ourId}#${lastQuestionID}`;

// Encode the "method" parameter of the CTP_CALL.
let method;
Expand All @@ -614,31 +636,35 @@ export const makeCapTP = (

// Set up the trap call with its identifying information and a way to send
// messages over the current CapTP data channel.
let isFirst = true;
const [isException, serialized] = trapGuest({
implMethod,
slot,
implArgs,
takeMore: data => {
if (isFirst) {
// Send the call metadata over the connection.
isFirst = false;
send({
type: 'CTP_CALL',
epoch,
trap: true, // This is the magic marker.
questionID,
target: slot,
method,
});
} else {
trapToHost: () => {
// Send the call metadata over the connection.
send({
type: 'CTP_CALL',
epoch,
trap: true, // This is the magic marker.
questionID,
target: slot,
method,
});

// Return an IterationObserver.
const makeObserverMethod = observerMethod => (...args) => {
send({
type: 'CTP_TRAP_TAKE_MORE',
type: 'CTP_TRAP_ITERATE',
epoch,
questionID,
data,
serialized: serialize(harden([observerMethod, args])),
});
}
};
return harden({
updateState: makeObserverMethod('updateState'),
fail: makeObserverMethod('fail'),
finish: makeObserverMethod('finish'),
});
},
});

Expand Down
23 changes: 4 additions & 19 deletions packages/captp/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
* @property {keyof TrapImpl} implMethod the TrapImpl method that was called
* @property {CapTPSlot} slot the target slot
* @property {Array<any>} implArgs arguments to the TrapImpl method
* @property {(data?: any) => void} takeMore send some data over the existing
* CapTP data channel for the trapHost to receive and supply us with more of the
* synchronous result
* @property {() => IterationObserver<any>} trapToHost start the trap process on
* the trapHost, and drive the other side.
*/

/**
Expand All @@ -43,22 +42,8 @@
* @callback TrapHost start the process of transferring the Trap request's
* results
* @param {TrapCompletion} completion
* @returns {undefined | ((data: any) => void)} If a function is returned, it will
* satisfy a future `takeMore`.
*/

/**
* @callback GiveTrapReply Return an AsyncGenerator which is synchronously
* iterated by TakeTrapReply's generator to signal readiness of parts of the
* reply (there may be only one). These two generators must be written to
* cooperate over a specific CapTP connection, and via any out-of-band
* mechanisms as well.
*
* @param {TrapCompletion[0]} isReject whether the reply to communicate was a
* rejection or a regular return
* @param {TrapCompletion[1]} serialized the marshal-serialized data to be
* communicated to the other side. Note that the serialized data is JSONable.
* @returns {AsyncGenerator<void, void, any>}
* @returns {AsyncIterator<void, void, any> | undefined} If an AsyncIterator is returned, it
* will satisfy a future guest IterationObserver.
*/

/** @typedef {import('./ts-types').Trap} Trap */
Expand Down
55 changes: 27 additions & 28 deletions packages/captp/test/traplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const createGuestBootstrap = (Trap, other) => {
};

const SEM_REJECT = 1;
const SEM_READY = 2;
const SEM_DONE = 2;
const SEM_AGAIN = 4;
const SEM_WAITING = 8;

Expand All @@ -99,32 +99,30 @@ export const makeHost = (send, sab) => {
send,
() => createHostBootstrap(makeTrapHandler),
{
trapHost: ([isReject, ser]) => {
// We need a bufferable message.
async *trapHost([isReject, ser]) {
// Get the complete encoded message buffer.
const json = JSON.stringify(ser);
const encoded = te.encode(json);
let i = 0;

// Send chunks in the data transfer buffer.
const sendChunk = () => {
let i = 0;
let done = false;
while (!done) {
const subenc = encoded.subarray(i, i + databuf.length);
databuf.set(subenc);
sembuf[1] = encoded.length - i;

i += subenc.length;
const done = i >= encoded.length;
done = i >= encoded.length;

sembuf[0] =
// eslint-disable-next-line no-bitwise
(done ? SEM_READY : SEM_AGAIN) | (isReject ? SEM_REJECT : 0);
(done ? SEM_DONE : SEM_AGAIN) | (isReject ? SEM_REJECT : 0);
Atomics.notify(sembuf, 0, +Infinity);

if (done) {
// All done!
return undefined;
}

return sendChunk;
};
return sendChunk();
// Wait until the next call.
yield;
}
},
},
);
Expand All @@ -139,27 +137,28 @@ export const makeGuest = (send, sab) => {
send,
() => createGuestBootstrap(Trap, getBootstrap()),
{
trapGuest: ({ takeMore }) => {
trapGuest: ({ trapToHost }) => {
const td = new TextDecoder('utf-8');

// Initialize the reply.
const next = () => {
let json = '';

// Start by sending the trap to the host.
const pub = trapToHost();

let done = false;
while (!done) {
sembuf[0] = SEM_WAITING;
takeMore();
pub.updateState();

// Wait for the reply to return.
Atomics.wait(sembuf, 0, SEM_WAITING);
};

next();

let json = '';
// eslint-disable-next-line no-bitwise
while (sembuf[0] & SEM_AGAIN) {
json += td.decode(databuf.subarray(0, sembuf[1]), { stream: true });
next();
// eslint-disable-next-line no-bitwise
done = (sembuf[0] & SEM_DONE) !== 0;
json += td.decode(databuf.subarray(0, sembuf[1]), { stream: !done });
}

json += td.decode(databuf.subarray(0, sembuf[1]));
pub.finish();

// eslint-disable-next-line no-bitwise
const isReject = !!(sembuf[0] & SEM_REJECT);
Expand Down

0 comments on commit a350b9d

Please sign in to comment.