Skip to content

Update PostgreSQL to use the sequence number to check previous events… #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# SQL Event Store

Demonstration of a SQL event store with deduplication and guaranteed event ordering. The database rules are intended to prevent incorrect information from entering into an event stream. You are assumed to have familiarity with [event sourcing](https://martinfowler.com/eaaDev/EventSourcing.html).

This project uses a node test suite and SQLite to ensure the DDL complies with the design requirements. This SQLite event store can be used in highly-constrained environments that require an embedded event store, like a mobile device or an IoT system.
Expand All @@ -11,7 +12,7 @@ The [Postgres version](./postgres-event-store.ddl) of SQL event store has the sa

The postgres version can be tested with the [test-postgres.js]() script. Run this file instead of `test-sqlite.js`. It will connect to the postgres server defined in the environment variables, according to [node-postgres](https://node-postgres.com/features/connecting).

### Running
### Running SQLite Tests

One must have Node and NPM installed (Node 16 is what I used) and then:

Expand All @@ -27,6 +28,30 @@ Once it has finished installing the dependencies, run the [tests](test-sqlite.js

The test uses [sql.js](https://github.com/kripken/sql.js), the pure Javascript port of SQLite for reliable compilation and test execution. The test will dump the test database to `test-event-store.sqlite` for your examination.

### Running PostgreSQL Tests


One must have Node and NPM installed (Node 16 is what I used) and then:

```bash
> npm install
```

Start a Postgres server and create a database called `eventstoretest`.
You can run the docker-compose file in this project to start a Postgres server.

```bash
> docker-compose up -d
```

Once the server is running, run the [tests](test-postgres.js) with:

```bash
> node test-postgres.js
```



### Conceptual Model

An Event is an unalterable statement of fact that has occurred in the past. It has a name, like `food-eaten`, and it is scoped to an [Entity](https://en.wikiquote.org/wiki/Entity), or an identifiable existence in the world. Entities are individually identified by business-relevant keys that uniquely identify one entity from another.
Expand All @@ -51,7 +76,7 @@ This event store consists of two tables [as described in the DDL](./sqlite-event
#### `entity_events` Table

| Column | Notes |
|----------|-----------------------|
| -------- | --------------------- |
| `entity` | The entity name. |
| `event` | An entity event name. |

Expand All @@ -60,7 +85,7 @@ The `entity_events` table controls the entity and event names that can be used i
#### `events` Table

| Column | Notes |
|--------------|-------------------------------------------------------------------------------------------------------------------------------------|
| ------------ | ----------------------------------------------------------------------------------------------------------------------------------- |
| `entity` | The entity name. Part of a composite foreign key to `entity_events`. |
| `entityKey` | The business identifier for the entity. |
| `event` | The event name. Part of a composite foreign key to `entity_events`. |
Expand Down
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Author: Fabien Sartor.
# Creation Date: 20/05/2022
# Copyright: Secheron SA

version: '2.4'


services:

postgres:
image: postgres:14.6
container_name: 'postgres'
ports:
- "5432:5432"
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: admin
POSTGRES_DB: eventstoretest
65 changes: 45 additions & 20 deletions postgres-event-store.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ CREATE TABLE events
entitykey TEXT NOT NULL,
event TEXT NOT NULL,
data JSONB NOT NULL,
eventid UUID NOT NULL UNIQUE,
commandid UUID NOT NULL UNIQUE,
-- previous event uuid; null for first event; null does not trigger UNIQUE constraint
previousid UUID UNIQUE,
previousSequence BIGINT UNIQUE,
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- ordering sequence
sequence BIGSERIAL PRIMARY KEY, -- sequence for all events in all entities
Expand All @@ -51,17 +50,17 @@ CREATE OR REPLACE RULE ignore_update_events AS ON UPDATE TO events
DO INSTEAD NOTHING;


-- Can only use null previousId for first event in an entity
-- Can only use null previousSequence for first event in an entity
CREATE OR REPLACE FUNCTION check_first_event_for_entity() RETURNS trigger AS
$$
BEGIN
IF (NEW.previousid IS NULL
IF (NEW.previousSequence IS NULL
AND EXISTS (SELECT 1
FROM events
WHERE NEW.entitykey = entitykey
AND NEW.entity = entity))
THEN
RAISE EXCEPTION 'previousid can only be null for first entity event';
RAISE EXCEPTION 'previousSequence can only be null for first entity event';
END IF;
RETURN NEW;
END;
Expand All @@ -77,27 +76,53 @@ CREATE TRIGGER first_event_for_entity



-- previousId must be in the same entity as the event
CREATE OR REPLACE FUNCTION check_previousid_in_same_entity() RETURNS trigger AS
-- previousSequence must be in the same entity as the event
CREATE OR REPLACE FUNCTION check_previousSequence_in_same_entity()
RETURNS trigger AS
$$
BEGIN
IF (NEW.previousid IS NOT NULL
AND NOT EXISTS (SELECT 1
FROM events
WHERE NEW.previousid = eventid
AND NEW.entitykey = entitykey
AND NEW.entity = entity))
IF (NEW.previousSequence IS NOT NULL
AND NEW.previousSequence != (
SELECT sequence FROM events
WHERE NEW.entitykey = entitykey
AND NEW.entity = entity
ORDER BY sequence DESC
LIMIT 1
)
)
THEN
RAISE EXCEPTION 'previousid must be in the same entity';
END IF;
RETURN NEW;
RAISE EXCEPTION 'previousSequence must be the last entry of the event stream for the same entity';
END IF;
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS previousid_in_same_entity ON events;
CREATE TRIGGER previousid_in_same_entity
DROP TRIGGER IF EXISTS previousSequence_in_same_entity ON events;
CREATE TRIGGER previousSequence_in_same_entity
BEFORE INSERT
ON events
FOR EACH ROW
EXECUTE FUNCTION check_previousid_in_same_entity();
EXECUTE FUNCTION check_previousSequence_in_same_entity();




truncate events;
truncate entity_events cascade;
ALTER SEQUENCE events_sequence_seq RESTART WITH 1;

-- INSERT INTO entity_events (entity, event) VALUES ('thing', 'thing-created');
-- INSERT INTO entity_events (entity, event) VALUES ('thing', 'thing-deleted');

-- INSERT INTO entity_events (entity, event) VALUES ('table-tennis', 'ball-pinged');
-- INSERT INTO entity_events (entity, event) VALUES ('table-tennis', 'ball-ponged');

-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'home', 'ball-pinged', '{}', '00000000-0000-0000-0000-000000000000', null);
-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'home', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000001', 1);
-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'home', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000002', 2);

-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'work', 'ball-pinged', '{}', '00000000-0000-0000-0000-000000000003', null);
-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'work', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000004', 4);
-- INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ('table-tennis', 'work', 'ball-ponged', '{}', '00000000-0000-0000-0000-000000000005', 5);

125 changes: 82 additions & 43 deletions test-postgres.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ const pingEvent = 'ball-pinged';
const pongEvent = 'ball-ponged';

async function initDb() {
const db = new pg.Client()
const conf = {
user: 'admin',
password: 'admin',
host: 'localhost',
database: 'eventstoretest',
port: 5432
}
const db = new pg.Client(conf)
db.connect()
await loadDdl(db);
return db;
Expand All @@ -32,6 +39,15 @@ async function shutdownDb(db) {
await db.end();
}

async function lastSequenceOf(db, entity, entityKey) {
// Create the sql to get the last sequence number for the entity
const lastSequence = 'SELECT sequence FROM events WHERE entity = $1 AND entitykey = $2 ORDER BY sequence DESC LIMIT 2';
const res = await db.query(lastSequence, [entity, entityKey])
console.log(res.rows[0])
return res;
// { name: 'brianc', email: 'brian.m.carlson@gmail.com' }
}

// use t.plan() for async testing too.
test('setup', async setup => {
const db = await initDb();
Expand Down Expand Up @@ -78,15 +94,13 @@ test('setup', async setup => {
});

setup.test('insert events', t => {
const stmt = 'INSERT INTO events (entity, entityKey, event, data, eventId, commandId, previousId) VALUES ($1, $2, $3, $4, $5, $6, $7)';
const stmt = 'INSERT INTO events (entity, entityKey, event, data, commandId, previousSequence) VALUES ($1, $2, $3, $4, $5, $6)';
const thingKey = '1';
const homeTableKey = 'home';
const workTableKey = 'work';

const commandId1 = uuid.v4();
const commandId2 = uuid.v4();
const thingEventId1 = uuid.v4();
const thingEventId2 = uuid.v4();

const pingEventHomeId = uuid.v4();
const pingEventWorkId = uuid.v4();
Expand All @@ -96,89 +110,114 @@ test('setup', async setup => {

t.test('cannot insert empty columns', async assert => {
await assert.rejects(
() => db.query(stmt, [null, thingKey, thingCreatedEvent, data, thingEventId1, commandId1, null]),
() => db.query(stmt, [null, thingKey, thingCreatedEvent, data, commandId1, null]),
/error: null value in column "entity" of relation "events" violates not-null constraint/,
'cannot insert null entity');
await assert.rejects(
() => db.query(stmt, [thingEntity, null, thingCreatedEvent, data, thingEventId1, commandId1, null]),
() => db.query(stmt, [thingEntity, null, thingCreatedEvent, data, commandId1, null]),
/error: null value in column "entitykey" of relation "events" violates not-null constraint/,
'cannot insert null entity key');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, null, data, thingEventId1, commandId1, null]),
() => db.query(stmt, [thingEntity, thingKey, null, data, commandId1, null]),
/error: null value in column "event" of relation "events" violates not-null constraint/,
'cannot insert null event');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, null, thingEventId1, commandId1, null]),
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, null, commandId1, null]),
/error: null value in column "data" of relation "events" violates not-null constraint/,
'cannot insert null event data');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, null, commandId1, null]),
/error: null value in column "eventid" of relation "events" violates not-null constraint/,
'cannot insert null event id');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId1, null, null]),
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, null, null]),
/error: null value in column "commandid" of relation "events" violates not-null constraint/,
'cannot insert null command');
assert.end();
});

t.test('UUIDs format for IDs', async assert => {
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, 'not-a-uuid', commandId1, null]),
/error: invalid input syntax for type uuid: "not-a-uuid"/,
'eventId must be a UUID');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId1, 'not-a-uuid', null]),
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, 'not-a-uuid', null]),
/error: invalid input syntax for type uuid: "not-a-uuid"/,
'commandId must be a UUID');
assert.end();
});

t.test('Cannot insert event from wrong entity', async assert => {
await assert.rejects(
() => db.query(stmt, [tableTennisEntity, thingKey, thingCreatedEvent, data, thingEventId1, commandId1, null]),
() => db.query(stmt, [tableTennisEntity, thingKey, thingCreatedEvent, data, commandId1, null]),
/error: insert or update on table "events" violates foreign key constraint "events_entity_event_fkey"/,
'cannot insert event in wrong entity');
assert.end();
});

t.test('insert events for an entity', async assert => {
await assert.doesNotReject(() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId1, commandId1, null]));
await assert.doesNotReject(() => db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, thingEventId2, commandId2, thingEventId1]));
await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, pingEventHomeId, uuid.v4(), null]));
await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, workTableKey, pingEvent, data, pingEventWorkId, uuid.v4(), null]));
await assert.doesNotReject(() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, commandId1, null]));
await assert.doesNotReject(() => {
let res = lastSequenceOf(db, thingEntity, thingKey)
return res.then(r => {
return db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, commandId2, r.rows[0].sequence])
});
});
await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, uuid.v4(), null]));
await assert.doesNotReject(() => db.query(stmt, [tableTennisEntity, workTableKey, pingEvent, data, uuid.v4(), null]));
assert.end();
});

t.test('previousId rules', async assert => {
t.test('previousSequence rules', async assert => {
await assert.rejects(
() => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, pingEventHomeId, uuid.v4(), null]),
/error: previousid can only be null for first entity event/,
'cannot insert multiple null previousid for an entity');
() => db.query(stmt, [tableTennisEntity, homeTableKey, pingEvent, data, uuid.v4(), null]),
/error: previousSequence can only be null for first entity event/,
'cannot insert multiple null previousSequence for an entity');
await assert.rejects(
() => {
let res = lastSequenceOf(db, tableTennisEntity, homeTableKey)
return res.then(r => {
return db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), r.rows[0].sequence])
});
},
/error: previousSequence must be the last entry of the event stream for the same entity/,
'previousSequence must be in same entity 1');
await assert.rejects(
() => db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), uuid.v4(), pingEventHomeId]),
/error: previousid must be in the same entity/,
'previousid must be in same entity');
() => {
let res = lastSequenceOf(db, thingEntity, thingKey)
return res.then(r => {
return db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), r.rows[0].sequence])
});
},
/error: previousSequence must be the last entry of the event stream for the same entity/,
'previousSequence must be in same entity 2');
await assert.rejects(
() => db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), 3]),
/error: previousSequence must be the last entry of the event stream for the same entity/,
'previousSequence must be in same entity 3');
await assert.doesNotReject(
() => {
let res = lastSequenceOf(db, tableTennisEntity, workTableKey)
return res.then(r => {
return db.query(stmt, [tableTennisEntity, workTableKey, pongEvent, data, uuid.v4(), r.rows[0].sequence])
});
}),
assert.end();
});

t.test('Cannot insert duplicates', async assert => {
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingCreatedEvent, data, thingEventId2, commandId2, thingEventId1]),
/error: duplicate key value violates unique constraint "events_eventid_key"/,
'cannot insert complete duplicate event');
await assert.rejects(
() => db.query(stmt, [tableTennisEntity, homeTableKey, pongEvent, data, pingEventHomeId, uuid.v4(), pingEventHomeId]),
/error: duplicate key value violates unique constraint "events_eventid_key"/,
'cannot insert different event for same id');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, uuid.v4(), commandId1, thingEventId2]),
() => {
let res = lastSequenceOf(db, thingEntity, thingKey)
return res.then(r => {
return db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, commandId2, r.rows[0].sequence])
});
},
/error: duplicate key value violates unique constraint "events_commandid_key"/,
'cannot insert different event for same command');
await assert.rejects(
() => db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, uuid.v4(), uuid.v4(), thingEventId1]),
/error: duplicate key value violates unique constraint "events_previousid_key"/,
'cannot insert different event for same previous');
await assert.rejects(
() => {
let res = lastSequenceOf(db, thingEntity, thingKey)
return res.then(async r => {
let previousPreviousId = r.rows[1].sequence;
return db.query(stmt, [thingEntity, thingKey, thingDeletedEvent, data, uuid.v4(), previousPreviousId])
});
},
/error: previousSequence must be the last entry of the event stream for the same entity/,
'cannot insert different event for same previous');
assert.end();
});
});
Expand Down
Loading