From e094914ad5ceec3d1131270e5943c6f0df267cac Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Sun, 23 May 2021 21:16:07 -0700 Subject: [PATCH] feat: provide streamStore implementations --- .../swing-store-lmdb/src/lmdbSwingStore.js | 197 +++++++++++++----- packages/swing-store-lmdb/test/test-state.js | 82 +++++++- .../src/simpleSwingStore.js | 152 +++++++++----- .../swing-store-simple/test/test-state.js | 73 ++++++- 4 files changed, 395 insertions(+), 109 deletions(-) diff --git a/packages/swing-store-lmdb/src/lmdbSwingStore.js b/packages/swing-store-lmdb/src/lmdbSwingStore.js index 4de063ff635..a15055f10af 100644 --- a/packages/swing-store-lmdb/src/lmdbSwingStore.js +++ b/packages/swing-store-lmdb/src/lmdbSwingStore.js @@ -2,15 +2,19 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; +import util from 'util'; import Readlines from 'n-readlines'; import lmdb from 'node-lmdb'; -import { assert, details as X } from '@agoric/assert'; +import { assert, details as X, q } from '@agoric/assert'; + +const encoder = new util.TextEncoder(); /** * @typedef { import('@agoric/swing-store-simple').KVStore } KVStore * @typedef { import('@agoric/swing-store-simple').StreamStore } StreamStore + * @typedef { import('@agoric/swing-store-simple').StreamWriter } StreamWriter * @typedef { import('@agoric/swing-store-simple').SwingStore } SwingStore */ @@ -47,9 +51,6 @@ function makeSwingStore(dirPath, forceReset = false) { create: true, }); - const activeStreamFds = new Set(); - const streamFds = new Map(); - function ensureTxn() { if (!txn) { txn = lmdbEnv.beginTxn(); @@ -73,7 +74,7 @@ function makeSwingStore(dirPath, forceReset = false) { * @throws if key is not a string. 
*/ function get(key) { - assert.typeof(key, 'string', X`non-string key ${key}`); + assert.typeof(key, 'string'); ensureTxn(); let result = txn.getString(dbi, key); if (result === null) { @@ -97,8 +98,8 @@ function makeSwingStore(dirPath, forceReset = false) { * @throws if either parameter is not a string. */ function* getKeys(start, end) { - assert.typeof(start, 'string', X`non-string start ${start}`); - assert.typeof(end, 'string', X`non-string end ${end}`); + assert.typeof(start, 'string'); + assert.typeof(end, 'string'); ensureTxn(); const cursor = new lmdb.Cursor(txn, dbi); @@ -120,7 +121,7 @@ function makeSwingStore(dirPath, forceReset = false) { * @throws if key is not a string. */ function has(key) { - assert.typeof(key, 'string', X`non-string key ${key}`); + assert.typeof(key, 'string'); return get(key) !== undefined; } @@ -134,8 +135,8 @@ function makeSwingStore(dirPath, forceReset = false) { * @throws if either parameter is not a string. */ function set(key, value) { - assert.typeof(key, 'string', X`non-string key ${key}`); - assert.typeof(value, 'string', X`non-string value ${value}`); + assert.typeof(key, 'string'); + assert.typeof(value, 'string'); ensureTxn(); txn.putString(dbi, key, value); } @@ -149,7 +150,7 @@ function makeSwingStore(dirPath, forceReset = false) { * @throws if key is not a string. 
*/ function del(key) { - assert.typeof(key, 'string', X`non-string key ${key}`); + assert.typeof(key, 'string'); if (has(key)) { ensureTxn(); txn.del(dbi, key); @@ -164,59 +165,167 @@ function makeSwingStore(dirPath, forceReset = false) { delete: del, }; + /** @type {Set} */ + const activeStreamFds = new Set(); + /** @type {Map} */ + const streamFds = new Map(); + /** @type {Map} */ + const streamStatus = new Map(); + + function insistStreamName(streamName) { + assert.typeof(streamName, 'string'); + assert( + streamName.match(/^[-\w]+$/), + X`invalid stream name ${q(streamName)}`, + ); + } + /** * Generator function that returns an iterator over the items in a stream. * * @param {string} streamName The stream to read + * @param {Object} endPosition The position of the end of the stream + * @param {Object?} startPosition Optional position to start reading from * * @yields {string} an iterator for the items in the named stream */ - function* readStream(streamName) { - try { - const fd = fs.openSync(`${dirPath}/${streamName}`, 'r'); - if (fd) { + function* openReadStream(streamName, endPosition, startPosition) { + insistStreamName(streamName); + const status = streamStatus.get(streamName); + assert( + status === 'unused' || !status, + // prettier-ignore + X`can't read stream ${q(streamName)} because it's already being used for ${q(status)}`, + ); + let itemCount = endPosition.itemCount; + if (endPosition.offset === 0) { + assert(itemCount === 0); + } else { + assert(itemCount > 0); + assert(endPosition.offset > 0); + let fd; + try { + streamStatus.set(streamName, 'read'); + const filePath = `${dirPath}/${streamName}`; + fs.truncateSync(filePath, endPosition.offset); + fd = fs.openSync(filePath, 'r'); + // let startOffset = 0; + let skipCount = 0; + if (startPosition) { + itemCount -= startPosition.itemCount; + // startOffset = startPosition.offset; + skipCount = startPosition.itemCount; + } const reader = new Readlines(fd); + // We would like to be able to seek 
Readlines to a particular position + // in the file before it starts reading. Unfortunately, it is hardcoded + // to reset to 0 at the start and then manually walk itself through the + // file, ignoring whatever position the fd is currently set to. + // Investigation has revealed that giving it a 'position' option for + // where to start reading is a trivial (~4 lines of code) change, but that + // would cause us to diverge from the official npm version. There are + // even a couple of forks on NPM that do this, but they have like 2 + // downloads per week so I don't trust them. Until this is resolved, + // the only way to realize a different starting point than 0 is to + // simply read and ignore some number of records, which the following + // code does. It's not ideal, but it works. + while (skipCount > 0) { + reader.next(); + skipCount -= 1; + } + // const reader = new Readlines(fd, { position: startOffset }); while (true) { const line = /** @type {string|false} */ (reader.next()); if (line) { - yield line; + itemCount -= 1; + assert(itemCount >= 0); + const result = line.toString(); + yield result; } else { break; } } - fs.closeSync(fd); - } - } catch (e) { - if (e.code !== 'ENOENT') { - throw e; + } finally { + streamStatus.set(streamName, 'unused'); + assert(itemCount === 0, X`leftover item count ${q(itemCount)}`); } } } /** - * Append an item to a stream. 
+ * + * @param {string} streamName The stream to be written * - * @param {string} streamName The stream to append to - * @param {string} item The item to append to it + * @returns {StreamWriter} a writer for the named stream */ - function appendItem(streamName, item) { - assert.typeof( - streamName, - 'string', - X`non-string stream name ${streamName}`, + function openWriteStream(streamName) { + insistStreamName(streamName); + const status = streamStatus.get(streamName); + assert( + status === 'unused' || !status, + // prettier-ignore + X`can't write stream ${q(streamName)} because it's already being used for ${q(status)}`, ); - assert(streamName.match(/^[-\w]+$/)); - let fd = streamFds.get(streamName); - if (!fd) { - fd = fs.openSync(`${dirPath}/${streamName}`, 'a'); - streamFds.set(streamName, fd); + streamStatus.set(streamName, 'write'); + + // XXX fdTemp is a workaround for a flaw in TypeScript's type inference + // It should be fd, which it should be changed to when and if they fix tsc. + let fdTemp = streamFds.get(streamName); + if (!fdTemp) { + const filePath = `${dirPath}/${streamName}`; + const mode = fs.existsSync(filePath) ? 'r+' : 'w'; + fdTemp = fs.openSync(filePath, mode); + streamFds.set(streamName, fdTemp); } + const fd = fdTemp; activeStreamFds.add(fd); - fs.writeSync(fd, item); - fs.writeSync(fd, '\n'); + + /** + * Write to a stream. 
+ * + * @param {string} item The item to write + * @param {Object|null} position The position to write the item + * + * @returns {Object} the new position after writing + */ + function write(item, position) { + assert.typeof(item, 'string'); + assert( + streamFds.get(streamName) === fd, + X`write to closed stream ${q(streamName)}`, + ); + if (!position) { + position = { offset: 0, itemCount: 0 }; + } + const buf = encoder.encode(`${item}\n`); + fs.writeSync(fd, buf, 0, buf.length, position.offset); + fs.fsyncSync(fd); + return { + offset: position.offset + buf.length, + itemCount: position.itemCount + 1, + }; + } + return write; } - const streamStore = { appendItem, readStream }; + /** + * Close a stream. + * + * @param {string} streamName The stream to close + */ + function closeStream(streamName) { + insistStreamName(streamName); + const fd = streamFds.get(streamName); + if (fd) { + fs.closeSync(fd); + streamFds.delete(streamName); + activeStreamFds.delete(fd); + streamStatus.set(streamName, 'unused'); + } + } + + const streamStore = { openReadStream, openWriteStream, closeStream }; /** * Commit unsaved changes. 
@@ -224,7 +333,7 @@ function makeSwingStore(dirPath, forceReset = false) { function commit() { if (txn) { for (const fd of activeStreamFds) { - fs.fsyncSync(fd); + fs.closeSync(fd); } activeStreamFds.clear(); txn.commit(); @@ -266,9 +375,7 @@ function makeSwingStore(dirPath, forceReset = false) { * @returns {SwingStore} */ export function initSwingStore(dirPath) { - if (`${dirPath}` !== dirPath) { - throw new Error('dirPath must be a string'); - } + assert.typeof(dirPath, 'string'); return makeSwingStore(dirPath, true); } @@ -284,9 +391,7 @@ export function initSwingStore(dirPath) { * @returns {SwingStore} */ export function openSwingStore(dirPath) { - if (`${dirPath}` !== dirPath) { - throw new Error('dirPath must be a string'); - } + assert.typeof(dirPath, 'string'); return makeSwingStore(dirPath, false); } @@ -302,9 +407,7 @@ export function openSwingStore(dirPath) { * */ export function isSwingStore(dirPath) { - if (`${dirPath}` !== dirPath) { - throw new Error('dirPath must be a string'); - } + assert.typeof(dirPath, 'string'); if (fs.existsSync(dirPath)) { const storeFile = path.resolve(dirPath, 'data.mdb'); if (fs.existsSync(storeFile)) { diff --git a/packages/swing-store-lmdb/test/test-state.js b/packages/swing-store-lmdb/test/test-state.js index dc90582454f..e6a47ab2fce 100644 --- a/packages/swing-store-lmdb/test/test-state.js +++ b/packages/swing-store-lmdb/test/test-state.js @@ -15,7 +15,7 @@ import { isSwingStore, } from '../src/lmdbSwingStore'; -function testStorage(t, kvStore) { +function testKVStore(t, kvStore) { t.falsy(kvStore.has('missing')); t.is(kvStore.get('missing'), undefined); @@ -52,15 +52,91 @@ test('storageInLMDB under SES', t => { fs.rmdirSync(dbDir, { recursive: true }); t.is(isSwingStore(dbDir), false); const { kvStore, commit, close } = initSwingStore(dbDir); - testStorage(t, kvStore); + testKVStore(t, kvStore); commit(); const before = getAllState(kvStore); close(); t.is(isSwingStore(dbDir), true); - const { kvStore: after } = 
openSwingStore(dbDir); + const { kvStore: after, close: close2 } = openSwingStore(dbDir); t.deepEqual(getAllState(after), before, 'check state after reread'); t.is(isSwingStore(dbDir), true); + close2(); +}); + +test('streamStore read/write', t => { + const dbDir = 'testdb'; + t.teardown(() => fs.rmdirSync(dbDir, { recursive: true })); + fs.rmdirSync(dbDir, { recursive: true }); + t.is(isSwingStore(dbDir), false); + const { streamStore, commit, close } = initSwingStore(dbDir); + + let s1pos; + const writer1 = streamStore.openWriteStream('st1'); + s1pos = writer1('first', s1pos); + s1pos = writer1('second', s1pos); + const s1posAlt = { ...s1pos }; + const writer2 = streamStore.openWriteStream('st2'); + s1pos = writer1('third', s1pos); + let s2pos = { offset: 0, itemCount: 0 }; + s2pos = writer2('oneth', s2pos); + s1pos = writer1('fourth', s1pos); + s2pos = writer2('twoth', s2pos); + const s2posAlt = { ...s2pos }; + s2pos = writer2('threeth', s2pos); + s2pos = writer2('fourst', s2pos); + streamStore.closeStream('st1'); + streamStore.closeStream('st2'); + const reader1 = streamStore.openReadStream('st1', s1pos); + const reads1 = []; + for (const item of reader1) { + reads1.push(item); + } + t.deepEqual(reads1, ['first', 'second', 'third', 'fourth']); + const writer2alt = streamStore.openWriteStream('st2'); + s2pos = writer2alt('re3', s2posAlt); + streamStore.closeStream('st2'); + const reader2 = streamStore.openReadStream('st2', s2pos); + const reads2 = []; + for (const item of reader2) { + reads2.push(item); + } + t.deepEqual(reads2, ['oneth', 'twoth', 're3']); + + const reader1alt = streamStore.openReadStream('st1', s1pos, s1posAlt); + const reads1alt = []; + for (const item of reader1alt) { + reads1alt.push(item); + } + t.deepEqual(reads1alt, ['third', 'fourth']); + + commit(); + close(); +}); + +test('streamStore mode interlock', t => { + const dbDir = 'testdb'; + t.teardown(() => fs.rmdirSync(dbDir, { recursive: true })); + fs.rmdirSync(dbDir, { recursive: true 
}); + t.is(isSwingStore(dbDir), false); + const { streamStore, commit, close } = initSwingStore(dbDir); + + const writer1 = streamStore.openWriteStream('st1'); + const s1pos = writer1('first'); + const reader1 = streamStore.openReadStream('st1', s1pos); + t.throws(() => reader1.next(), { + message: `can't read stream "st1" because it's already being used for "write"`, + }); + streamStore.closeStream('st1'); + + const reader1a = streamStore.openReadStream('st1', s1pos); + reader1a.next(); + t.throws(() => streamStore.openWriteStream('st1'), { + message: `can't write stream "st1" because it's already being used for "read"`, + }); + + commit(); + close(); }); test('rejectSimple under SES', t => { diff --git a/packages/swing-store-simple/src/simpleSwingStore.js b/packages/swing-store-simple/src/simpleSwingStore.js index b05eac08e2d..45f86ee2755 100644 --- a/packages/swing-store-simple/src/simpleSwingStore.js +++ b/packages/swing-store-simple/src/simpleSwingStore.js @@ -3,7 +3,7 @@ import fs from 'fs'; import path from 'path'; import Readlines from 'n-readlines'; -import { assert, details as X } from '@agoric/assert'; +import { assert, details as X, q } from '@agoric/assert'; /** * @typedef {{ @@ -15,8 +15,18 @@ import { assert, details as X } from '@agoric/assert'; * }} KVStore * * @typedef {{ - * appendItem: (name: string, item: string) => void, - * readStream: (name: string) => Iterable, + * offset?: number, + * itemCount?: number, + * }} StreamPosition + * + * @typedef {{ + * (item: string, position: StreamPosition|null): StreamPosition + * }} StreamWriter + * + * @typedef {{ + * openWriteStream: (name: string) => StreamWriter, + * openReadStream: (name: string, endPosition: StreamPosition, startPosition?: StreamPosition) => Iterable, + * closeStream: (name: string) => void, * }} StreamStore * * @typedef {{ @@ -63,9 +73,7 @@ function makeStorageInMemory() { * @throws if key is not a string. 
*/ function has(key) { - if (`${key}` !== key) { - throw new Error(`non-string key ${key}`); - } + assert.typeof(key, 'string'); return state.has(key); } @@ -86,12 +94,8 @@ function makeStorageInMemory() { * @throws if either parameter is not a string. */ function* getKeys(start, end) { - if (`${start}` !== start) { - throw new Error(`non-string start ${start}`); - } - if (`${end}` !== end) { - throw new Error(`non-string end ${end}`); - } + assert.typeof(start, 'string'); + assert.typeof(end, 'string'); const keys = Array.from(state.keys()).sort(); for (const k of keys) { @@ -112,9 +116,7 @@ function makeStorageInMemory() { * @throws if key is not a string. */ function get(key) { - if (`${key}` !== key) { - throw new Error(`non-string key ${key}`); - } + assert.typeof(key, 'string'); return state.get(key); } @@ -128,12 +130,8 @@ function makeStorageInMemory() { * @throws if either parameter is not a string. */ function set(key, value) { - if (`${key}` !== key) { - throw new Error(`non-string key ${key}`); - } - if (`${value}` !== value) { - throw new Error(`non-string value ${value}`); - } + assert.typeof(key, 'string'); + assert.typeof(value, 'string'); state.set(key, value); } @@ -146,9 +144,7 @@ function makeStorageInMemory() { * @throws if key is not a string. */ function del(key) { - if (`${key}` !== key) { - throw new Error(`non-string key ${key}`); - } + assert.typeof(key, 'string'); state.delete(key); } @@ -232,49 +228,97 @@ function makeSwingStore(dirPath, forceReset = false) { /** @type {Map>} */ const streams = new Map(); + /** @type {Map} */ + const streamStatus = new Map(); + + function insistStreamName(streamName) { + assert.typeof(streamName, 'string'); + assert( + streamName.match(/^[-\w]+$/), + X`invalid stream name ${q(streamName)}`, + ); + } /** * Generator function that returns an iterator over the items in a stream. 
* * @param {string} streamName The stream to read + * @param {Object} endPosition The position of the end of the stream + * @param {Object?} startPosition Optional position to start reading from * * @yields {string} an iterator for the items in the named stream */ - function* readStream(streamName) { - /** @type {Array|undefined} */ - const stream = streams.get(streamName); - if (stream) { - let pos = 0; - while (pos < stream.length) { - const result = stream[pos]; - pos += 1; - yield result; - } + function* openReadStream(streamName, endPosition, startPosition) { + insistStreamName(streamName); + const stream = streams.get(streamName) || []; + const status = streamStatus.get(streamName); + assert( + status === 'unused' || !status, + // prettier-ignore + X`can't read stream ${q(streamName)} because it's already being used for ${q(status)}`, + ); + assert(endPosition.itemCount > 0); + streamStatus.set(streamName, 'read'); + stream.length = endPosition.itemCount; + let pos = 0; + if (startPosition) { + pos += startPosition.itemCount; + } + while (pos < stream.length) { + const result = stream[pos]; + pos += 1; + yield result; } + streamStatus.set(streamName, 'unused'); } /** - * Append an item to a stream. + * Obtain a writer for a stream. 
* - * @param {string} streamName The stream to append to - * @param {string} item The item to append to it + * @param {string} streamName The stream to be written */ - function appendItem(streamName, item) { - assert.typeof( - streamName, - 'string', - X`non-string stream name ${streamName}`, - ); - assert(streamName.match(/^[-\w]+$/)); - let stream = streams.get(streamName); - if (!stream) { - stream = []; - streams.set(streamName, stream); + function openWriteStream(streamName) { + insistStreamName(streamName); + let streamTemp = streams.get(streamName); + if (!streamTemp) { + streamTemp = []; + streams.set(streamName, streamTemp); + streamStatus.set(streamName, 'write'); + } else { + const status = streamStatus.get(streamName); + assert( + status === 'unused', + // prettier-ignore + X`can't write stream ${q(streamName)} because it's already being used for ${q(status)}`, + ); } - stream.push(item); + const stream = streamTemp; + streamStatus.set(streamName, 'write'); + + /** + * Write to a stream. 
+ * + * @param {string} item The item to write + * @param {Object|null} position The position to write the item + * + * @returns {Object} the new position after writing + */ + function write(item, position) { + if (!position) { + position = { itemCount: 0 }; + } + stream[position.itemCount] = item; + return { itemCount: position.itemCount + 1 }; + } + return write; + } + + function closeStream(streamName) { + insistStreamName(streamName); + streamStatus.set(streamName, 'unused'); } - const streamStore = { appendItem, readStream }; + const streamStore = { openReadStream, openWriteStream, closeStream }; return { kvStore, streamStore, commit, close }; } @@ -312,9 +356,7 @@ export function initSwingStore(dirPath) { * @returns {SwingStore} */ export function openSwingStore(dirPath) { - if (`${dirPath}` !== dirPath) { - throw new Error('dirPath must be a string'); - } + assert.typeof(dirPath, 'string'); return makeSwingStore(dirPath, false); } @@ -369,9 +411,7 @@ export function setAllState(kvStore, stuff) { * */ export function isSwingStore(dirPath) { - if (`${dirPath}` !== dirPath) { - throw new Error('dirPath must be a string'); - } + assert.typeof(dirPath, 'string'); if (fs.existsSync(dirPath)) { const storeFile = path.resolve(dirPath, 'swingset-kernel-state.jsonlines'); if (fs.existsSync(storeFile)) { diff --git a/packages/swing-store-simple/test/test-state.js b/packages/swing-store-simple/test/test-state.js index 849f6ed135d..5af636edaa0 100644 --- a/packages/swing-store-simple/test/test-state.js +++ b/packages/swing-store-simple/test/test-state.js @@ -11,7 +11,7 @@ import { isSwingStore, } from '../src/simpleSwingStore'; -function testStorage(t, kvStore) { +function testKVStore(t, kvStore) { t.falsy(kvStore.has('missing')); t.is(kvStore.get('missing'), undefined); @@ -44,7 +44,7 @@ function testStorage(t, kvStore) { test('storageInMemory', t => { const { kvStore } = initSwingStore(); - testStorage(t, kvStore); + testKVStore(t, kvStore); }); test('storageInFile', t 
=> { @@ -53,7 +53,7 @@ test('storageInFile', t => { fs.rmdirSync(dbDir, { recursive: true }); t.is(isSwingStore(dbDir), false); const { kvStore, commit, close } = initSwingStore(dbDir); - testStorage(t, kvStore); + testKVStore(t, kvStore); commit(); const before = getAllState(kvStore); close(); @@ -72,3 +72,70 @@ test('rejectLMDB', t => { fs.writeFileSync(path.resolve(notSimpleDir, 'lock.mdb'), 'lock stuff\n'); t.is(isSwingStore(notSimpleDir), false); }); + +test('streamStore read/write', t => { + const { streamStore, commit, close } = initSwingStore(); + + let s1pos; + const writer1 = streamStore.openWriteStream('st1'); + s1pos = writer1('first', s1pos); + s1pos = writer1('second', s1pos); + const s1posAlt = { ...s1pos }; + const writer2 = streamStore.openWriteStream('st2'); + s1pos = writer1('third', s1pos); + let s2pos = { itemCount: 0 }; + s2pos = writer2('oneth', s2pos); + s1pos = writer1('fourth', s1pos); + s2pos = writer2('twoth', s2pos); + const s2posAlt = { ...s2pos }; + s2pos = writer2('threeth', s2pos); + s2pos = writer2('fourst', s2pos); + streamStore.closeStream('st1'); + streamStore.closeStream('st2'); + const reader1 = streamStore.openReadStream('st1', s1pos); + const reads1 = []; + for (const item of reader1) { + reads1.push(item); + } + t.deepEqual(reads1, ['first', 'second', 'third', 'fourth']); + const writer2alt = streamStore.openWriteStream('st2'); + s2pos = writer2alt('re3', s2posAlt); + streamStore.closeStream('st2'); + const reader2 = streamStore.openReadStream('st2', s2pos); + const reads2 = []; + for (const item of reader2) { + reads2.push(item); + } + t.deepEqual(reads2, ['oneth', 'twoth', 're3']); + + const reader1alt = streamStore.openReadStream('st1', s1pos, s1posAlt); + const reads1alt = []; + for (const item of reader1alt) { + reads1alt.push(item); + } + t.deepEqual(reads1alt, ['third', 'fourth']); + + commit(); + close(); +}); + +test('streamStore mode interlock', t => { + const { streamStore, commit, close } = initSwingStore(); + + 
const writer1 = streamStore.openWriteStream('st1'); + const s1pos = writer1('first'); + const reader1 = streamStore.openReadStream('st1', s1pos); + t.throws(() => reader1.next(), { + message: `can't read stream "st1" because it's already being used for "write"`, + }); + streamStore.closeStream('st1'); + + const reader1a = streamStore.openReadStream('st1', s1pos); + reader1a.next(); + t.throws(() => streamStore.openWriteStream('st1'), { + message: `can't write stream "st1" because it's already being used for "read"`, + }); + + commit(); + close(); +});