Skip to content

Commit

Permalink
fix: input queuing, and use the block manager for fake-chain
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Mar 18, 2020
1 parent 6f80429 commit c1282c9
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 66 deletions.
6 changes: 1 addition & 5 deletions packages/SwingSet/src/devices/command-src.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default function setup(syscall, state, helpers, endowments) {
const body = JSON.parse(`${bodyString}`);
SO(inboundHandler).inbound(Nat(count), body);
} catch (e) {
console.log(`error during inboundCallback`, e);
console.log(`error during inboundCallback: ${e}`);
throw new Error(`error during inboundCallback: ${e}`);
}
});
Expand All @@ -33,10 +33,6 @@ export default function setup(syscall, state, helpers, endowments) {
},

sendResponse(count, isReject, obj) {
if (isReject && obj instanceof Error) {
console.log('inboundHandler rejected with:', obj);
obj = { error: `${obj}` };
}
try {
deliverResponse(count, isReject, JSON.stringify(obj));
} catch (e) {
Expand Down
5 changes: 1 addition & 4 deletions packages/SwingSet/src/devices/timer-src.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ export default function setup(syscall, state, helpers, endowments) {
// The latest time poll() was called. This might be a block height or it
// might be a time from Date.now(). The current time is not reflected back
// to the user.
//
// Note that during a replay, we may be replaying an older time,
// so we always need to start at 0.
let lastPolled = 0;
let lastPolled = restart ? restart.lastPolled : 0;
let nextRepeater = restart ? restart.nextRepeater : 0;

function getLastPolled() {
Expand Down
24 changes: 17 additions & 7 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,26 @@ export default function buildKernel(kernelEndowments) {
}
}

let processQueueRunning;
async function processQueueMessage(message) {
kdebug(`processQ ${JSON.stringify(message)}`);
if (message.type === 'send') {
await deliverToTarget(message.target, message.msg);
} else if (message.type === 'notify') {
await processNotify(message);
} else {
throw Error(`unable to process message.type ${message.type}`);
if (processQueueRunning) {
console.log(`We're currently already running at`, processQueueRunning);
throw Error(`Kernel reentrancy is forbidden`);
}
try {
processQueueRunning = Error('here');
if (message.type === 'send') {
await deliverToTarget(message.target, message.msg);
} else if (message.type === 'notify') {
await processNotify(message);
} else {
throw Error(`unable to process message.type ${message.type}`);
}
commitCrank();
} finally {
processQueueRunning = undefined;
}
commitCrank();
}

function validateVatSetupFn(setup) {
Expand Down
35 changes: 25 additions & 10 deletions packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import path from 'path';
import fs from 'fs';
import stringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify';
import { launch } from '../launch-chain';
import makeBlockManager from '../block-manager';

const PRETEND_BLOCK_DELAY = 5;

Expand Down Expand Up @@ -36,33 +37,47 @@ export async function connectToFakeChain(basedir, GCI, role, delay, inbound) {
const argv = [`--role=${role}`, bootAddress];
const stateDBdir = path.join(basedir, `fake-chain-${GCI}-state`);
const s = await launch(stateDBdir, mailboxStorage, vatsdir, argv);
const { deliverInbound, beginBlock, saveChainState, saveOutsideState } = s;

let pretendLast = Date.now();
let blockHeight = 0;
const blockManager = makeBlockManager(s);
const { savedHeight, savedActions } = s;

let blockHeight = savedHeight;
let blockTime =
savedActions.length > 0
? savedActions[0].blockTime
: Math.floor(Date.now() / 1000);
let intoChain = [];
let thisBlock = [];

async function simulateBlock() {
const actualStart = Date.now();
// Gather up the new messages into the latest block.
thisBlock.push(...intoChain);
intoChain = [];

try {
const commitStamp = pretendLast + PRETEND_BLOCK_DELAY * 1000;
const blockTime = Math.floor(commitStamp / 1000);
await beginBlock(blockHeight, blockTime);
blockTime += PRETEND_BLOCK_DELAY;
blockHeight += 1;

await blockManager({ type: 'BEGIN_BLOCK', blockHeight, blockTime });
for (let i = 0; i < thisBlock.length; i += 1) {
const [newMessages, acknum] = thisBlock[i];
await deliverInbound(bootAddress, newMessages, acknum);
await blockManager({
type: 'DELIVER_INBOUND',
peer: bootAddress,
messages: newMessages,
ack: acknum,
blockHeight,
blockTime,
});
}
await blockManager({ type: 'END_BLOCK', blockHeight, blockTime });

// Done processing, "commit the block".
saveChainState();
saveOutsideState();
await blockManager({ type: 'COMMIT_BLOCK', blockHeight, blockTime });
await writeMap(mailboxFile, mailboxStorage);
thisBlock = [];
pretendLast = commitStamp + Date.now() - actualStart;
blockTime = blockTime + Date.now() - actualStart;
blockHeight += 1;
} catch (e) {
console.log(`error fake processing`, e);
Expand Down
43 changes: 43 additions & 0 deletions packages/cosmic-swingset/lib/ag-solo/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import makePromise from '@agoric/make-promise';

// Return a function that can wrap an async or sync method, but
// ensures only one of them (in order) is running at a time.
export const makeWithQueue = () => {
const queue = [];

// Execute the thunk at the front of the queue.
const dequeue = () => {
if (!queue.length) {
return;
}
const [thunk, resolve, reject] = queue[0];
// Run the thunk in a new turn.
Promise.resolve()
.then(thunk)
// Resolve or reject our caller with the thunk's value.
.then(resolve, reject)
// Rerun dequeue() after settling.
.finally(() => {
queue.shift();
dequeue();
});
};

return function withQueue(inner) {
return function queueCall(...args) {
// Curry the arguments into the inner function, and
// resolve/reject with whatever the inner function does.
const thunk = _ => inner(...args);
const pr = makePromise();
queue.push([thunk, pr.res, pr.rej]);

if (queue.length === 1) {
// Start running immediately.
dequeue();
}

// Allow the caller to retrieve our thunk's results.
return pr.p;
};
};
};
70 changes: 30 additions & 40 deletions packages/cosmic-swingset/lib/ag-solo/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {

import { deliver, addDeliveryTarget } from './outbound';
import { makeHTTPListener } from './web';
import { makeWithQueue } from './queue';

import { connectToChain } from './chain-cosmos-sdk';
import { connectToFakeChain } from './fake-chain';
Expand Down Expand Up @@ -121,44 +122,23 @@ async function buildSwingset(
}
}

// Return a function that can wrap an async or sync thunk, but
// ensures only one of them (in order) is running at a time.
const makeWithCriticalSection = () => {
let runQueueP = Promise.resolve();
return function withCriticalSection(inner) {
return function wrappedCall(...args) {
// Curry the arguments into the inner function, and
// resolve/reject with whatever the inner function does.
const thunk = _ => inner(...args);

// Atomically replace the runQueue by appending us to it.
runQueueP = runQueueP.then(thunk, thunk);

// Return the promisified version of the thunk.
return runQueueP;
};
};
};

const withInboundCriticalSection = makeWithCriticalSection();
const withInputQueue = makeWithQueue();

// Use the critical section to make sure it doesn't overlap with
// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundToMbx = withInboundCriticalSection(
async (sender, messages, ack) => {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
// console.log(`deliverInboundToMbx`, messages, ack);
if (mb.deliverInbound(sender, messages, ack, true)) {
await processKernel();
}
},
);
const deliverInboundToMbx = withInputQueue(async (sender, messages, ack) => {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
// console.log(`deliverInboundToMbx`, messages, ack);
if (mb.deliverInbound(sender, messages, ack, true)) {
await processKernel();
}
});

// Use the critical section to make sure it doesn't overlap with
// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const deliverInboundCommand = withInboundCriticalSection(async obj => {
const deliverInboundCommand = withInputQueue(async obj => {
// this promise could take an arbitrarily long time to resolve, so don't
// wait on it
const p = cm.inboundCommand(obj);
Expand All @@ -177,11 +157,11 @@ async function buildSwingset(
});
});

const intervalMillis = 1200;
let intervalMillis;

// Use the critical section to make sure it doesn't overlap with
// Use the input queue to make sure it doesn't overlap with
// other inbound messages.
const moveTimeForward = withInboundCriticalSection(async () => {
const moveTimeForward = withInputQueue(async () => {
const now = Math.floor(Date.now() / intervalMillis);
try {
if (timer.poll(now)) {
Expand All @@ -201,12 +181,14 @@ async function buildSwingset(
// now let the bootstrap functions run
await processKernel();

setTimeout(moveTimeForward, intervalMillis);

return {
deliverInboundToMbx,
deliverInboundCommand,
deliverOutbound,
startTimer: interval => {
intervalMillis = interval;
setTimeout(moveTimeForward, intervalMillis);
},
};
}

Expand Down Expand Up @@ -239,7 +221,12 @@ export default async function start(basedir, withSES, argv) {
broadcast,
);

const { deliverInboundToMbx, deliverInboundCommand, deliverOutbound } = d;
const {
deliverInboundToMbx,
deliverInboundCommand,
deliverOutbound,
startTimer,
} = d;

await Promise.all(
connections.map(async c => {
Expand Down Expand Up @@ -292,6 +279,9 @@ export default async function start(basedir, withSES, argv) {
}),
);

// Start timer here!
startTimer(1200);

console.log(`swingset running`);
swingSetRunning = true;
deliverOutbound();
Expand Down

0 comments on commit c1282c9

Please sign in to comment.