From c1282c9e644fbea742846f96a80a06afe64664ba Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Tue, 17 Mar 2020 20:23:39 -0600 Subject: [PATCH] fix: input queuing, and use the block manager for fake-chain --- packages/SwingSet/src/devices/command-src.js | 6 +- packages/SwingSet/src/devices/timer-src.js | 5 +- packages/SwingSet/src/kernel/kernel.js | 24 +++++-- .../cosmic-swingset/lib/ag-solo/fake-chain.js | 35 +++++++--- packages/cosmic-swingset/lib/ag-solo/queue.js | 43 ++++++++++++ packages/cosmic-swingset/lib/ag-solo/start.js | 70 ++++++++----------- 6 files changed, 117 insertions(+), 66 deletions(-) create mode 100644 packages/cosmic-swingset/lib/ag-solo/queue.js diff --git a/packages/SwingSet/src/devices/command-src.js b/packages/SwingSet/src/devices/command-src.js index 0ed4853f9eb..9080a29d049 100644 --- a/packages/SwingSet/src/devices/command-src.js +++ b/packages/SwingSet/src/devices/command-src.js @@ -21,7 +21,7 @@ export default function setup(syscall, state, helpers, endowments) { const body = JSON.parse(`${bodyString}`); SO(inboundHandler).inbound(Nat(count), body); } catch (e) { - console.log(`error during inboundCallback`, e); + console.log(`error during inboundCallback: ${e}`); throw new Error(`error during inboundCallback: ${e}`); } }); @@ -33,10 +33,6 @@ export default function setup(syscall, state, helpers, endowments) { }, sendResponse(count, isReject, obj) { - if (isReject && obj instanceof Error) { - console.log('inboundHandler rejected with:', obj); - obj = { error: `${obj}` }; - } try { deliverResponse(count, isReject, JSON.stringify(obj)); } catch (e) { diff --git a/packages/SwingSet/src/devices/timer-src.js b/packages/SwingSet/src/devices/timer-src.js index f3a356b9fcb..e360ade6f67 100644 --- a/packages/SwingSet/src/devices/timer-src.js +++ b/packages/SwingSet/src/devices/timer-src.js @@ -193,10 +193,7 @@ export default function setup(syscall, state, helpers, endowments) { // The latest time poll() was called. This might be a block height or it // might be a time from Date.now(). The current time is not reflected back // to the user. - // - // Note that during a replay, we may be replaying an older time, - // so we always need to start at 0. - let lastPolled = 0; + let lastPolled = restart ? restart.lastPolled : 0; let nextRepeater = restart ? restart.nextRepeater : 0; function getLastPolled() { diff --git a/packages/SwingSet/src/kernel/kernel.js b/packages/SwingSet/src/kernel/kernel.js index 140747b1e9a..4e960cc4b9c 100644 --- a/packages/SwingSet/src/kernel/kernel.js +++ b/packages/SwingSet/src/kernel/kernel.js @@ -364,16 +364,26 @@ export default function buildKernel(kernelEndowments) { } } + let processQueueRunning; async function processQueueMessage(message) { kdebug(`processQ ${JSON.stringify(message)}`); - if (message.type === 'send') { - await deliverToTarget(message.target, message.msg); - } else if (message.type === 'notify') { - await processNotify(message); - } else { - throw Error(`unable to process message.type ${message.type}`); + if (processQueueRunning) { + console.log(`We're currently already running at`, processQueueRunning); + throw Error(`Kernel reentrancy is forbidden`); + } + try { + processQueueRunning = Error('here'); + if (message.type === 'send') { + await deliverToTarget(message.target, message.msg); + } else if (message.type === 'notify') { + await processNotify(message); + } else { + throw Error(`unable to process message.type ${message.type}`); + } + commitCrank(); + } finally { + processQueueRunning = undefined; } - commitCrank(); } function validateVatSetupFn(setup) { diff --git a/packages/cosmic-swingset/lib/ag-solo/fake-chain.js b/packages/cosmic-swingset/lib/ag-solo/fake-chain.js index ece8c2ede93..1fc3b7794bb 100644 --- a/packages/cosmic-swingset/lib/ag-solo/fake-chain.js +++ b/packages/cosmic-swingset/lib/ag-solo/fake-chain.js @@ -3,6 +3,7 @@ import path from 'path'; import fs from 'fs'; import stringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify'; import { launch } from '../launch-chain'; +import makeBlockManager from '../block-manager'; const PRETEND_BLOCK_DELAY = 5; @@ -36,12 +37,18 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) { const argv = [`--role=${role}`, bootAddress]; const stateDBdir = path.join(basedir, `fake-chain-${GCI}-state`); const s = await launch(stateDBdir, mailboxStorage, vatsdir, argv); - const { deliverInbound, beginBlock, saveChainState, saveOutsideState } = s; - let pretendLast = Date.now(); - let blockHeight = 0; + const blockManager = makeBlockManager(s); + const { savedHeight, savedActions } = s; + + let blockHeight = savedHeight; + let blockTime = + savedActions.length > 0 + ? savedActions[0].blockTime + : Math.floor(Date.now() / 1000); let intoChain = []; let thisBlock = []; + async function simulateBlock() { const actualStart = Date.now(); // Gather up the new messages into the latest block. @@ -49,20 +56,28 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) { intoChain = []; try { - const commitStamp = pretendLast + PRETEND_BLOCK_DELAY * 1000; - const blockTime = Math.floor(commitStamp / 1000); - await beginBlock(blockHeight, blockTime); + blockTime += PRETEND_BLOCK_DELAY; + blockHeight += 1; + + await blockManager({ type: 'BEGIN_BLOCK', blockHeight, blockTime }); for (let i = 0; i < thisBlock.length; i += 1) { const [newMessages, acknum] = thisBlock[i]; - await deliverInbound(bootAddress, newMessages, acknum); + await blockManager({ + type: 'DELIVER_INBOUND', + peer: bootAddress, + messages: newMessages, + ack: acknum, + blockHeight, + blockTime, + }); } + await blockManager({ type: 'END_BLOCK', blockHeight, blockTime }); // Done processing, "commit the block". - saveChainState(); - saveOutsideState(); + await blockManager({ type: 'COMMIT_BLOCK', blockHeight, blockTime }); await writeMap(mailboxFile, mailboxStorage); thisBlock = []; - pretendLast = commitStamp + Date.now() - actualStart; + blockTime = blockTime + Date.now() - actualStart; blockHeight += 1; } catch (e) { console.log(`error fake processing`, e); diff --git a/packages/cosmic-swingset/lib/ag-solo/queue.js b/packages/cosmic-swingset/lib/ag-solo/queue.js new file mode 100644 index 00000000000..d6fd30c386d --- /dev/null +++ b/packages/cosmic-swingset/lib/ag-solo/queue.js @@ -0,0 +1,43 @@ +import makePromise from '@agoric/make-promise'; + +// Return a function that can wrap an async or sync method, but +// ensures only one of them (in order) is running at a time. +export const makeWithQueue = () => { + const queue = []; + + // Execute the thunk at the front of the queue. + const dequeue = () => { + if (!queue.length) { + return; + } + const [thunk, resolve, reject] = queue[0]; + // Run the thunk in a new turn. + Promise.resolve() + .then(thunk) + // Resolve or reject our caller with the thunk's value. + .then(resolve, reject) + // Rerun dequeue() after settling. + .finally(() => { + queue.shift(); + dequeue(); + }); + }; + + return function withQueue(inner) { + return function queueCall(...args) { + // Curry the arguments into the inner function, and + // resolve/reject with whatever the inner function does. + const thunk = _ => inner(...args); + const pr = makePromise(); + queue.push([thunk, pr.res, pr.rej]); + + if (queue.length === 1) { + // Start running immediately. + dequeue(); + } + + // Allow the caller to retrieve our thunk's results. + return pr.p; + }; + }; +}; diff --git a/packages/cosmic-swingset/lib/ag-solo/start.js b/packages/cosmic-swingset/lib/ag-solo/start.js index c13467381be..41c49f118a7 100644 --- a/packages/cosmic-swingset/lib/ag-solo/start.js +++ b/packages/cosmic-swingset/lib/ag-solo/start.js @@ -23,6 +23,7 @@ import { import { deliver, addDeliveryTarget } from './outbound'; import { makeHTTPListener } from './web'; +import { makeWithQueue } from './queue'; import { connectToChain } from './chain-cosmos-sdk'; import { connectToFakeChain } from './fake-chain'; @@ -121,44 +122,23 @@ async function buildSwingset( } } - // Return a function that can wrap an async or sync thunk, but - // ensures only one of them (in order) is running at a time. - const makeWithCriticalSection = () => { - let runQueueP = Promise.resolve(); - return function withCriticalSection(inner) { - return function wrappedCall(...args) { - // Curry the arguments into the inner function, and - // resolve/reject with whatever the inner function does. - const thunk = _ => inner(...args); - - // Atomically replace the runQueue by appending us to it. - runQueueP = runQueueP.then(thunk, thunk); - - // Return the promisified version of the thunk. - return runQueueP; - }; - }; - }; - - const withInboundCriticalSection = makeWithCriticalSection(); + const withInputQueue = makeWithQueue(); - // Use the critical section to make sure it doesn't overlap with + // Use the input queue to make sure it doesn't overlap with // other inbound messages. - const deliverInboundToMbx = withInboundCriticalSection( - async (sender, messages, ack) => { - if (!(messages instanceof Array)) { - throw new Error(`inbound given non-Array: ${messages}`); - } - // console.log(`deliverInboundToMbx`, messages, ack); - if (mb.deliverInbound(sender, messages, ack, true)) { - await processKernel(); - } - }, - ); + const deliverInboundToMbx = withInputQueue(async (sender, messages, ack) => { + if (!(messages instanceof Array)) { + throw new Error(`inbound given non-Array: ${messages}`); + } + // console.log(`deliverInboundToMbx`, messages, ack); + if (mb.deliverInbound(sender, messages, ack, true)) { + await processKernel(); + } + }); - // Use the critical section to make sure it doesn't overlap with + // Use the input queue to make sure it doesn't overlap with // other inbound messages. - const deliverInboundCommand = withInboundCriticalSection(async obj => { + const deliverInboundCommand = withInputQueue(async obj => { // this promise could take an arbitrarily long time to resolve, so don't // wait on it const p = cm.inboundCommand(obj); @@ -177,11 +157,11 @@ async function buildSwingset( }); }); - const intervalMillis = 1200; + let intervalMillis; - // Use the critical section to make sure it doesn't overlap with + // Use the input queue to make sure it doesn't overlap with // other inbound messages. - const moveTimeForward = withInboundCriticalSection(async () => { + const moveTimeForward = withInputQueue(async () => { const now = Math.floor(Date.now() / intervalMillis); try { if (timer.poll(now)) { @@ -201,12 +181,14 @@ async function buildSwingset( // now let the bootstrap functions run await processKernel(); - setTimeout(moveTimeForward, intervalMillis); - return { deliverInboundToMbx, deliverInboundCommand, deliverOutbound, + startTimer: interval => { + intervalMillis = interval; + setTimeout(moveTimeForward, intervalMillis); + }, }; } @@ -239,7 +221,12 @@ export default async function start(basedir, withSES, argv) { broadcast, ); - const { deliverInboundToMbx, deliverInboundCommand, deliverOutbound } = d; + const { + deliverInboundToMbx, + deliverInboundCommand, + deliverOutbound, + startTimer, + } = d; await Promise.all( connections.map(async c => { @@ -292,6 +279,9 @@ export default async function start(basedir, withSES, argv) { }), ); + // Start timer here! + startTimer(1200); + console.log(`swingset running`); swingSetRunning = true; deliverOutbound();