diff --git a/README.md b/README.md index c5d90d6..90abda9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # SQL Event Store + Demonstration of a SQL event store with deduplication and guaranteed event ordering. The database rules are intended to prevent incorrect information from entering into an event stream. You are assumed to have familiarity with [event sourcing](https://martinfowler.com/eaaDev/EventSourcing.html). This project uses a node test suite and SQLite to ensure the DDL complies with the design requirements. This SQLite event store can be used in highly-constrained environments that require an embedded event store, like a mobile device or an IoT system. @@ -11,7 +12,7 @@ The [Postgres version](./postgres-event-store.ddl) of SQL event store has the sa The postgres version can be tested with the [test-postgres.js]() script. Run this file instead of `test-sqlite.js`. It will connect to the postgres server defined in the environment variables, according to [node-postgres](https://node-postgres.com/features/connecting). -### Running +### Running SQLite Tests One must have Node and NPM installed (Node 16 is what I used) and then: @@ -27,6 +28,30 @@ Once it has finished installing the dependencies, run the [tests](test-sqlite.js The test uses [sql.js](https://github.com/kripken/sql.js), the pure Javascript port of SQLite for reliable compilation and test execution. The test will dump the test database to `test-event-store.sqlite` for your examination. +### Running PostgreSQL Tests + + +One must have Node and NPM installed (Node 16 is what I used) and then: + +```bash +> npm install +``` + +Start a Postgres server and create a database called `eventstoretest`. +You can run the docker-compose file in this project to start a Postgres server. + +```bash +> docker-compose up -d +``` + +Once the server is running, run the [tests](test-postgres.js) with: + +```bash +> node test-postgres.js +``` + + + ### Conceptual Model An Event is an unalterable statement of fact that has occurred in the past. It has a name, like `food-eaten`, and it is scoped to an [Entity](https://en.wikiquote.org/wiki/Entity), or an identifiable existence in the world. Entities are individually identified by business-relevant keys that uniquely identify one entity from another. @@ -51,7 +76,7 @@ This event store consists of two tables [as described in the DDL](./sqlite-event #### `entity_events` Table | Column | Notes | -|----------|-----------------------| +| -------- | --------------------- | | `entity` | The entity name. | | `event` | An entity event name. | @@ -60,7 +85,7 @@ The `entity_events` table controls the entity and event names that can be used i #### `events` Table | Column | Notes | -|--------------|-------------------------------------------------------------------------------------------------------------------------------------| +| ------------ | ----------------------------------------------------------------------------------------------------------------------------------- | | `entity` | The entity name. Part of a composite foreign key to `entity_events`. | | `entityKey` | The business identifier for the entity. | | `event` | The event name. Part of a composite foreign key to `entity_events`. | diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6655d5d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,18 @@ +# Author: Fabien Sartor. +# Creation Date: 20/05/2022 +# Copyright: Secheron SA + +version: '2.4' + + +services: + + postgres: + image: postgres:14.6 + container_name: 'postgres' + ports: + - "5432:5432" + environment: + POSTGRES_USER: admin + POSTGRES_PASSWORD: admin + POSTGRES_DB: eventstoretest diff --git a/postgres-event-store.ddl b/postgres-event-store.ddl index e010956..82c5fb0 100644 --- a/postgres-event-store.ddl +++ b/postgres-event-store.ddl @@ -31,10 +31,9 @@ CREATE TABLE events entitykey TEXT NOT NULL, event TEXT NOT NULL, data JSONB NOT NULL, - eventid UUID NOT NULL UNIQUE, commandid UUID NOT NULL UNIQUE, -- previous event uuid; null for first event; null does not trigger UNIQUE constraint - previousid UUID UNIQUE, + previousSequence BIGINT UNIQUE, timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, -- ordering sequence sequence BIGSERIAL PRIMARY KEY, -- sequence for all events in all entities @@ -51,17 +50,17 @@ CREATE OR REPLACE RULE ignore_update_events AS ON UPDATE TO events DO INSTEAD NOTHING; --- Can only use null previousId for first event in an entity +-- Can only use null previousSequence for first event in an entity CREATE OR REPLACE FUNCTION check_first_event_for_entity() RETURNS trigger AS $$ BEGIN - IF (NEW.previousid IS NULL + IF (NEW.previousSequence IS NULL AND EXISTS (SELECT 1 FROM events WHERE NEW.entitykey = entitykey AND NEW.entity = entity)) THEN - RAISE EXCEPTION 'previousid can only be null for first entity event'; + RAISE EXCEPTION 'previousSequence can only be null for first entity event'; END IF; RETURN NEW; END; @@ -77,27 +76,53 @@ CREATE TRIGGER first_event_for_entity --- previousId must be in the same entity as the event -CREATE OR REPLACE FUNCTION check_previousid_in_same_entity() RETURNS trigger AS +-- previousSequence must be in the same entity as the event +CREATE OR REPLACE FUNCTION check_previousSequence_in_same_entity() +RETURNS trigger AS $$ BEGIN - IF (NEW.previousid IS NOT NULL - AND NOT EXISTS (SELECT 1 - FROM events - WHERE NEW.previousid = eventid - AND NEW.entitykey = entitykey - AND NEW.entity = entity)) + IF (NEW.previousSequence IS NOT NULL + AND NEW.previousSequence != ( + SELECT sequence FROM events + WHERE NEW.entitykey = entitykey + AND NEW.entity = entity + ORDER BY sequence DESC + LIMIT 1 + ) + ) THEN - RAISE EXCEPTION 'previousid must be in the same entity'; -END IF; -RETURN NEW; + RAISE EXCEPTION 'previousSequence must be the last entry of the event stream for the same entity'; + END IF; + RETURN NEW; END; $$ - LANGUAGE plpgsql; +LANGUAGE plpgsql; -DROP TRIGGER IF EXISTS previousid_in_same_entity ON events; -CREATE TRIGGER previousid_in_same_entity +DROP TRIGGER IF EXISTS previousSequence_in_same_entity ON events; +CREATE TRIGGER previousSequence_in_same_entity BEFORE INSERT ON events FOR EACH ROW - EXECUTE FUNCTION check_previousid_in_same_entity(); + EXECUTE FUNCTION check_previousSequence_in_same_entity(); + + + + +truncate events; +truncate entity_events cascade; +ALTER SEQUENCE events_sequence_seq RESTART WITH 1; + +-- INSERT INTO entity_events (entity, event) VALUES ('thing', 'thing-created'); +-- INSERT INTO entity_events (entity, event) VALUES ('thing', 'thing-deleted'); + +-- INSERT INTO entity_events (entity, event) VALUES ('table-tennis', 'ball-pinged'); +-- INSERT INTO entity_events (entity, event) VALUES ('table-tennis', 'ball-ponged'); + +-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'home', 'ball-pinged', '{}', '00000000-0000-0000-0000-000000000000', null); +-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'home', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000001', 1); +-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'home', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000002', 2); + +-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'work', 'ball-pinged', '{}', '00000000-0000-0000-0000-000000000003', null); +-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'work', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000004', 4); +-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'work', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000005', 5); + diff --git a/test-postgres.js b/test-postgres.js index b6e5f7c..5ffd0b5 100644 --- a/test-postgres.js +++ b/test-postgres.js @@ -17,7 +17,14 @@ const pingEvent = 'ball-pinged'; const pongEvent = 'ball-ponged'; async function initDb() { - const db = new pg.Client() + const conf = { + user: 'admin', + password: 'admin', + host: 'localhost', + database: 'eventstoretest', + port: 5432 + } + const db = new pg.Client(conf) db.connect() await loadDdl(db); return db; @@ -32,6 +39,15 @@ async function shutdownDb(db) { await db.end(); } +async function lastSequenceOf(db, entity, entityKey) { + // Create the sql to get the last sequence number for the entity + const lastSequence = 'SELECT sequence FROM events WHERE entity = $1 AND entitykey = $2 ORDER BY sequence DESC LIMIT 2'; + const res = await db.query(lastSequence, [entity, entityKey]) + console.log(res.rows[0]) + return res; + // { name: 'brianc', email: 'brian.m.carlson@gmail.com' } +} + // use t.plan() for async testing too. test('setup', async setup => { const db = await initDb(); @@ -78,15 +94,13 @@ test('setup', async setup => { }); setup.test('insert events', t => { - const stmt = 'INSERT INTO events (entity, entityKey, event, data, eventId, commandId, previousId) VALUES ($1, $2, $3, $4, $5, $6, $7)'; + const stmt = 'INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ($1, $2, $3, $4, $5, $6)'; const thingKey = '1'; const homeTableKey = 'home'; const workTableKey = 'work'; const commandId1 = uuid.v4(); const commandId2 = uuid.v4(); - const thingEventId1 = uuid.v4(); - const thingEventId2 = uuid.v4(); const pingEventHomeId = uuid.v4(); const pingEventWorkId = uuid.v4(); @@ -96,27 +110,23 @@ test('setup', async setup => { t.test('cannot insert empty columns', async assert => { await assert.rejects( - () => db.query(stmt, [null, thingKey, thingCreatedEvent, data, thingEventId1, commandId1, null]), + () => db.query(stmt, [null, thingKey, thingCreatedEvent, data, commandId1, null]), /error: null value in column "entity" of relation "events" violates not-null constraint/, 'cannot insert null entity'); await assert.rejects( - () => db.query(stmt, [thingEntity, null, thingCreatedEvent, data, thingEventId1, commandId1, null]), + () => db.query(stmt, [thingEntity, null, thingCreatedEvent, data, commandId1, null]), /error: null value in column "entitykey" of relation "events" violates not-null constraint/, 'cannot insert null entity key'); await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, null, data, thingEventId1, commandId1, null]), + () => db.query(stmt, [thingEntity, thingKey, null, data, commandId1, null]), /error: null value in column "event" of relation "events" violates not-null constraint/, 'cannot insert null event'); await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, null, thingEventId1, commandId1, null]), + () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, null, commandId1, null]), /error: null value in column "data" of relation "events" violates not-null constraint/, 'cannot insert null event data'); await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, null, commandId1, null]), - /error: null value in column "eventid" of relation "events" violates not-null constraint/, - 'cannot insert null event id'); - await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId1, null, null]), + () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, null, null]), /error: null value in column "commandid" of relation "events" violates not-null constraint/, 'cannot insert null command'); assert.end(); @@ -124,11 +134,7 @@ test('setup', async setup => { t.test('UUIDs format for IDs', async assert => { await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, 'not-a-uuid', commandId1, null]), - /error: invalid input syntax for type uuid: "not-a-uuid"/, - 'eventId must be a UUID'); - await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId1, 'not-a-uuid', null]), + () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, 'not-a-uuid', null]), /error: invalid input syntax for type uuid: "not-a-uuid"/, 'commandId must be a UUID'); assert.end(); @@ -136,49 +142,82 @@ test('setup', async setup => { t.test('Cannot insert event from wrong entity', async assert => { await assert.rejects( - () => db.query(stmt, [tableTennisEntity, thingKey, thingCreatedEvent, data, thingEventId1, commandId1, null]), + () => db.query(stmt, [tableTennisEntity, thingKey, thingCreatedEvent, data, commandId1, null]), /error: insert or update on table "events" violates foreign key constraint "events_entity_event_fkey"/, 'cannot insert event in wrong entity'); assert.end(); }); t.test('insert events for an entity', async assert => { - await assert.doesNotReject(() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId1, commandId1, null])); - await assert.doesNotReject(() => db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, thingEventId2, commandId2, thingEventId1])); - await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, pingEventHomeId, uuid.v4(), null])); - await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, workTableKey, pingEvent, data, pingEventWorkId, uuid.v4(), null])); + await assert.doesNotReject(() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, commandId1, null])); + await assert.doesNotReject(() => { + let res = lastSequenceOf(db, thingEntity, thingKey) + return res.then(r => { + return db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, commandId2, r.rows[0].sequence]) + }); + }); + await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, uuid.v4(), null])); + await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, workTableKey, pingEvent, data, uuid.v4(), null])); assert.end(); }); - t.test('previousId rules', async assert => { + t.test('previousSequence rules', async assert => { await assert.rejects( - () => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, pingEventHomeId, uuid.v4(), null]), - /error: previousid can only be null for first entity event/, - 'cannot insert multiple null previousid for an entity'); + () => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, uuid.v4(), null]), + /error: previousSequence can only be null for first entity event/, + 'cannot insert multiple null previousSequence for an entity'); + await assert.rejects( + () => { + let res = lastSequenceOf(db, tableTennisEntity, homeTableKey) + return res.then(r => { + return db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), r.rows[0].sequence]) + }); + }, + /error: previousSequence must be the last entry of the event stream for the same entity/, + 'previousSequence must be in same entity 1'); await assert.rejects( - () => db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), uuid.v4(), pingEventHomeId]), - /error: previousid must be in the same entity/, - 'previousid must be in same entity'); + () => { + let res = lastSequenceOf(db, thingEntity, thingKey) + return res.then(r => { + return db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), r.rows[0].sequence]) + }); + }, + /error: previousSequence must be the last entry of the event stream for the same entity/, + 'previousSequence must be in same entity 2'); + await assert.rejects( + () => db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), 3]), + /error: previousSequence must be the last entry of the event stream for the same entity/, + 'previousSequence must be in same entity 3'); + await assert.doesNotReject( + () => { + let res = lastSequenceOf(db, tableTennisEntity, workTableKey) + return res.then(r => { + return db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), r.rows[0].sequence]) + }); + }), assert.end(); }); t.test('Cannot insert duplicates', async assert => { await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId2, commandId2, thingEventId1]), - /error: duplicate key value violates unique constraint "events_eventid_key"/, - 'cannot insert complete duplicate event'); - await assert.rejects( - () => db.query(stmt, [tableTennisEntity, homeTableKey, pongEvent, data, pingEventHomeId, uuid.v4(), pingEventHomeId]), - /error: duplicate key value violates unique constraint "events_eventid_key"/, - 'cannot insert different event for same id'); - await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, uuid.v4(), commandId1, thingEventId2]), + () => { + let res = lastSequenceOf(db, thingEntity, thingKey) + return res.then(r => { + return db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, commandId2, r.rows[0].sequence]) + }); + }, /error: duplicate key value violates unique constraint "events_commandid_key"/, 'cannot insert different event for same command'); - await assert.rejects( - () => db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, uuid.v4(), uuid.v4(), thingEventId1]), - /error: duplicate key value violates unique constraint "events_previousid_key"/, - 'cannot insert different event for same previous'); + await assert.rejects( + () => { + let res = lastSequenceOf(db, thingEntity, thingKey) + return res.then(async r => { + let previousPreviousId = r.rows[1].sequence; + return db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, uuid.v4(), previousPreviousId]) + }); + }, + /error: previousSequence must be the last entry of the event stream for the same entity/, + 'cannot insert different event for same previous'); assert.end(); }); }); diff --git a/test.js b/test.js new file mode 100644 index 0000000..1d90070 --- /dev/null +++ b/test.js @@ -0,0 +1,44 @@ +const tape = require('tape') +const _test = require('tape-promise').default; +const test = _test(tape) +const fs = require('fs'); +const pg = require('pg'); + + + + +async function initDb() { + const conf = { + user: 'admin', + password: 'admin', + host: 'localhost', + database: 'eventstoretest', + port: 5432 + } + const db = new pg.Client(conf) + db.connect() + await loadDdl(db); + return db; +} + +async function shutdownDb(db) { + await db.end(); + } + + +const db = await initDb(); + +const text = 'INSERT INTO users(name, email) VALUES($1, $2) RETURNING *' +const values = ['brianc', 'brian.m.carlson@gmail.com'] + + +try { + const res = await client.query(text, values) + console.log(res.rows[0]) + // { name: 'brianc', email: 'brian.m.carlson@gmail.com' } + } catch (err) { + console.log(err.stack) + } + + +await shutdownDb(db); \ No newline at end of file