Skip to content

Commit

Permalink
feat: provide streamStore implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
FUDCo committed May 27, 2021
1 parent 069201d commit e094914
Show file tree
Hide file tree
Showing 4 changed files with 395 additions and 109 deletions.
197 changes: 150 additions & 47 deletions packages/swing-store-lmdb/src/lmdbSwingStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
import fs from 'fs';
import os from 'os';
import path from 'path';
import util from 'util';
import Readlines from 'n-readlines';

import lmdb from 'node-lmdb';

import { assert, details as X } from '@agoric/assert';
import { assert, details as X, q } from '@agoric/assert';

const encoder = new util.TextEncoder();

/**
* @typedef { import('@agoric/swing-store-simple').KVStore } KVStore
* @typedef { import('@agoric/swing-store-simple').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store-simple').StreamWriter } StreamWriter
* @typedef { import('@agoric/swing-store-simple').SwingStore } SwingStore
*/

Expand Down Expand Up @@ -47,9 +51,6 @@ function makeSwingStore(dirPath, forceReset = false) {
create: true,
});

const activeStreamFds = new Set();
const streamFds = new Map();

function ensureTxn() {
if (!txn) {
txn = lmdbEnv.beginTxn();
Expand All @@ -73,7 +74,7 @@ function makeSwingStore(dirPath, forceReset = false) {
* @throws if key is not a string.
*/
function get(key) {
assert.typeof(key, 'string', X`non-string key ${key}`);
assert.typeof(key, 'string');
ensureTxn();
let result = txn.getString(dbi, key);
if (result === null) {
Expand All @@ -97,8 +98,8 @@ function makeSwingStore(dirPath, forceReset = false) {
* @throws if either parameter is not a string.
*/
function* getKeys(start, end) {
assert.typeof(start, 'string', X`non-string start ${start}`);
assert.typeof(end, 'string', X`non-string end ${end}`);
assert.typeof(start, 'string');
assert.typeof(end, 'string');

ensureTxn();
const cursor = new lmdb.Cursor(txn, dbi);
Expand All @@ -120,7 +121,7 @@ function makeSwingStore(dirPath, forceReset = false) {
* @throws if key is not a string.
*/
function has(key) {
assert.typeof(key, 'string', X`non-string key ${key}`);
assert.typeof(key, 'string');
return get(key) !== undefined;
}

Expand All @@ -134,8 +135,8 @@ function makeSwingStore(dirPath, forceReset = false) {
* @throws if either parameter is not a string.
*/
function set(key, value) {
assert.typeof(key, 'string', X`non-string key ${key}`);
assert.typeof(value, 'string', X`non-string value ${value}`);
assert.typeof(key, 'string');
assert.typeof(value, 'string');
ensureTxn();
txn.putString(dbi, key, value);
}
Expand All @@ -149,7 +150,7 @@ function makeSwingStore(dirPath, forceReset = false) {
* @throws if key is not a string.
*/
function del(key) {
assert.typeof(key, 'string', X`non-string key ${key}`);
assert.typeof(key, 'string');
if (has(key)) {
ensureTxn();
txn.del(dbi, key);
Expand All @@ -164,67 +165,175 @@ function makeSwingStore(dirPath, forceReset = false) {
delete: del,
};

/** @type {Set<number>} */
const activeStreamFds = new Set();
/** @type {Map<string, number>} */
const streamFds = new Map();
/** @type {Map<string, string>} */
const streamStatus = new Map();

function insistStreamName(streamName) {
assert.typeof(streamName, 'string');
assert(
streamName.match(/^[-\w]+$/),
X`invalid stream name ${q(streamName)}`,
);
}

/**
* Generator function that returns an iterator over the items in a stream.
*
* @param {string} streamName The stream to read
* @param {Object} endPosition The position of the end of the stream
* @param {Object?} startPosition Optional position to start reading from
*
* @yields {string} an iterator for the items in the named stream
*/
function* readStream(streamName) {
try {
const fd = fs.openSync(`${dirPath}/${streamName}`, 'r');
if (fd) {
function* openReadStream(streamName, endPosition, startPosition) {
insistStreamName(streamName);
const status = streamStatus.get(streamName);
assert(
status === 'unused' || !status,
// prettier-ignore
X`can't read stream ${q(streamName)} because it's already being used for ${q(status)}`,
);
let itemCount = endPosition.itemCount;
if (endPosition.offset === 0) {
assert(itemCount === 0);
} else {
assert(itemCount > 0);
assert(endPosition.offset > 0);
let fd;
try {
streamStatus.set(streamName, 'read');
const filePath = `${dirPath}/${streamName}`;
fs.truncateSync(filePath, endPosition.offset);
fd = fs.openSync(filePath, 'r');
// let startOffset = 0;
let skipCount = 0;
if (startPosition) {
itemCount -= startPosition.itemCount;
// startOffset = startPosition.offset;
skipCount = startPosition.itemCount;
}
const reader = new Readlines(fd);
// We would like to be able to seek Readlines to a particular position
// in the file before it starts reading. Unfortunately, it is hardcoded
// to reset to 0 at the start and then manually walk itself through the
// file, ignoring whatever current position that the fd is set to.
// Investigation has revealed that giving it a 'position' option for
// where to start reading is trivial (~4 lines of code) change, but that
// would cause us to diverge from the official npm version. There are
// even a couple of forks on NPM that do this, but they have like 2
// downloads per week so I don't trust them. Until this is resolved,
// the only way to realize a different starting point than 0 is to
// simply read and ignore some number of records, which the following
// code does. It's ideal, but it works.
while (skipCount > 0) {
reader.next();
skipCount -= 1;
}
// const reader = new Readlines(fd, { position: startOffset });
while (true) {
const line = /** @type {string|false} */ (reader.next());
if (line) {
yield line;
itemCount -= 1;
assert(itemCount >= 0);
const result = line.toString();
yield result;
} else {
break;
}
}
fs.closeSync(fd);
}
} catch (e) {
if (e.code !== 'ENOENT') {
throw e;
} finally {
streamStatus.set(streamName, 'unused');
assert(itemCount === 0, X`leftover item count ${q(itemCount)}`);
}
}
}

/**
* Append an item to a stream.
* Obtain a writer for a stream.
*
* @param {string} streamName The stream to be written
*
* @param {string} streamName The stream to append to
* @param {string} item The item to append to it
* @returns {StreamWriter} a writer for the named stream
*/
function appendItem(streamName, item) {
assert.typeof(
streamName,
'string',
X`non-string stream name ${streamName}`,
function openWriteStream(streamName) {
insistStreamName(streamName);
const status = streamStatus.get(streamName);
assert(
status === 'unused' || !status,
// prettier-ignore
X`can't write stream ${q(streamName)} because it's already being used for ${q(status)}`,
);
assert(streamName.match(/^[-\w]+$/));
let fd = streamFds.get(streamName);
if (!fd) {
fd = fs.openSync(`${dirPath}/${streamName}`, 'a');
streamFds.set(streamName, fd);
streamStatus.set(streamName, 'write');

// XXX fdTemp is a workaround for a flaw in TypeScript's type inference
// It should be fd, which it should be changed to when and if they fix tsc.
let fdTemp = streamFds.get(streamName);
if (!fdTemp) {
const filePath = `${dirPath}/${streamName}`;
const mode = fs.existsSync(filePath) ? 'r+' : 'w';
fdTemp = fs.openSync(filePath, mode);
streamFds.set(streamName, fdTemp);
}
const fd = fdTemp;
activeStreamFds.add(fd);
fs.writeSync(fd, item);
fs.writeSync(fd, '\n');

/**
* Write to a stream.
*
* @param {string} item The item to write
* @param {Object|null} position The position to write the item
*
* @returns {Object} the new position after writing
*/
function write(item, position) {
assert.typeof(item, 'string');
assert(
streamFds.get(streamName) === fd,
X`write to closed stream ${q(streamName)}`,
);
if (!position) {
position = { offset: 0, itemCount: 0 };
}
const buf = encoder.encode(`${item}\n`);
fs.writeSync(fd, buf, 0, buf.length, position.offset);
fs.fsyncSync(fd);
return {
offset: position.offset + buf.length,
itemCount: position.itemCount + 1,
};
}
return write;
}

const streamStore = { appendItem, readStream };
/**
* Close a stream.
*
* @param {string} streamName The stream to close
*/
function closeStream(streamName) {
insistStreamName(streamName);
const fd = streamFds.get(streamName);
if (fd) {
fs.closeSync(fd);
streamFds.delete(streamName);
activeStreamFds.delete(fd);
streamStatus.set(streamName, 'unused');
}
}

const streamStore = { openReadStream, openWriteStream, closeStream };

/**
* Commit unsaved changes.
*/
function commit() {
if (txn) {
for (const fd of activeStreamFds) {
fs.fsyncSync(fd);
fs.closeSync(fd);
}
activeStreamFds.clear();
txn.commit();
Expand Down Expand Up @@ -266,9 +375,7 @@ function makeSwingStore(dirPath, forceReset = false) {
* @returns {SwingStore}
*/
export function initSwingStore(dirPath) {
if (`${dirPath}` !== dirPath) {
throw new Error('dirPath must be a string');
}
assert.typeof(dirPath, 'string');
return makeSwingStore(dirPath, true);
}

Expand All @@ -284,9 +391,7 @@ export function initSwingStore(dirPath) {
* @returns {SwingStore}
*/
export function openSwingStore(dirPath) {
if (`${dirPath}` !== dirPath) {
throw new Error('dirPath must be a string');
}
assert.typeof(dirPath, 'string');
return makeSwingStore(dirPath, false);
}

Expand All @@ -302,9 +407,7 @@ export function openSwingStore(dirPath) {
*
*/
export function isSwingStore(dirPath) {
if (`${dirPath}` !== dirPath) {
throw new Error('dirPath must be a string');
}
assert.typeof(dirPath, 'string');
if (fs.existsSync(dirPath)) {
const storeFile = path.resolve(dirPath, 'data.mdb');
if (fs.existsSync(storeFile)) {
Expand Down
Loading

0 comments on commit e094914

Please sign in to comment.