From 9322f73ca5b3cfb54d74b1c975ead6528016113d Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Sat, 28 Jan 2023 17:32:24 +0100 Subject: [PATCH 1/7] [POC] Add snapshots --- _examples/counter/counter.go | 61 ++++++++ _examples/counter/counter_test.go | 27 ++++ _examples/counter/events.go | 27 ++++ _examples/counter/go.mod | 17 +++ _examples/counter/go.sum | 19 +++ _examples/counter/snapshot.go | 16 ++ _examples/counter/storage/eventstore_test.go | 67 ++++++++ _examples/postcard/go.mod | 2 +- _examples/postcard/go.sum | 2 - _examples/postcard/postcard.go | 16 +- _examples/postcard/storage/eventstore_test.go | 2 +- eventstore/inmemory.go | 144 +++++++++++++++--- snapshot.go | 16 ++ 13 files changed, 383 insertions(+), 33 deletions(-) create mode 100644 _examples/counter/counter.go create mode 100644 _examples/counter/counter_test.go create mode 100644 _examples/counter/events.go create mode 100644 _examples/counter/go.mod create mode 100644 _examples/counter/go.sum create mode 100644 _examples/counter/snapshot.go create mode 100644 _examples/counter/storage/eventstore_test.go create mode 100644 snapshot.go diff --git a/_examples/counter/counter.go b/_examples/counter/counter.go new file mode 100644 index 0000000..d3e956a --- /dev/null +++ b/_examples/counter/counter.go @@ -0,0 +1,61 @@ +package counter + +import ( + "github.com/ThreeDotsLabs/esja" +) + +type Counter struct { + stream *esja.Stream[Counter] + + id string + currentValue int +} + +func NewCounter(id string) (*Counter, error) { + s, err := esja.NewStreamWithType[Counter](id, "Counter") + if err != nil { + return nil, err + } + + p := &Counter{ + stream: s, + } + + err = p.stream.Record(p, Created{ + ID: id, + }) + if err != nil { + return nil, err + } + + return p, nil +} + +func (c Counter) Stream() *esja.Stream[Counter] { + return c.stream +} + +func (c Counter) NewWithStream(s *esja.Stream[Counter]) *Counter { + return &Counter{stream: s} +} + +func (c Counter) Snapshot() esja.Snapshot[Counter] { + return Snapshot{ + ID: c.id, + CurrentValue: c.currentValue, + } +} + +func (c Counter) ID() string { + return c.id +} + +func (c *Counter) CurrentValue() int { + return c.currentValue +} + +func (c *Counter) IncrementBy(v int) error { + return c.stream.Record(c, IncrementedBy{ + Value: v, + }) +} diff --git a/_examples/counter/counter_test.go b/_examples/counter/counter_test.go new file mode 100644 index 0000000..4026aea --- /dev/null +++ b/_examples/counter/counter_test.go @@ -0,0 +1,27 @@ +package counter_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + counter "postcard" +) + +func TestCounter(t *testing.T) { + c, err := counter.NewCounter("ID") + require.NoError(t, err) + + require.Equal(t, 0, c.CurrentValue()) + + err = c.IncrementBy(10) + require.NoError(t, err) + + require.Equal(t, 10, c.CurrentValue()) + + err = c.IncrementBy(20) + require.NoError(t, err) + err = c.IncrementBy(10) + require.NoError(t, err) + + require.Equal(t, 40, c.CurrentValue()) +} diff --git a/_examples/counter/events.go b/_examples/counter/events.go new file mode 100644 index 0000000..0788406 --- /dev/null +++ b/_examples/counter/events.go @@ -0,0 +1,27 @@ +package counter + +type Created struct { + ID string +} + +func (Created) EventName() string { + return "Created_v1" +} + +func (e Created) ApplyTo(c *Counter) error { + c.id = e.ID + return nil +} + +type IncrementedBy struct { + Value int +} + +func (IncrementedBy) EventName() string { + return "Created_v1" +} + +func (e IncrementedBy) ApplyTo(c *Counter) error { + c.currentValue += e.Value + return nil +} diff --git a/_examples/counter/go.mod b/_examples/counter/go.mod new file mode 100644 index 0000000..227e4d8 --- /dev/null +++ b/_examples/counter/go.mod @@ -0,0 +1,17 @@ +module counter + +go 1.18 + +require ( + github.com/ThreeDotsLabs/esja v0.0.0-20221208191400-8fbb493947e7 + github.com/google/uuid v1.3.0 + github.com/stretchr/testify v1.8.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/ThreeDotsLabs/esja => ../../ diff --git a/_examples/counter/go.sum b/_examples/counter/go.sum new file mode 100644 index 0000000..629bca6 --- /dev/null +++ b/_examples/counter/go.sum @@ -0,0 +1,19 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/counter/snapshot.go b/_examples/counter/snapshot.go new file mode 100644 index 0000000..52a20e2 --- /dev/null +++ b/_examples/counter/snapshot.go @@ -0,0 +1,16 @@ +package counter + +type Snapshot struct { + ID string + CurrentValue int +} + +func (s Snapshot) EventName() string { + return "CounterSnapshot_v1" +} + +func (s Snapshot) ApplyTo(c *Counter) error { + c.id = s.ID + c.currentValue = s.CurrentValue + return nil +} diff --git a/_examples/counter/storage/eventstore_test.go b/_examples/counter/storage/eventstore_test.go new file mode 100644 index 0000000..f536fa0 --- /dev/null +++ b/_examples/counter/storage/eventstore_test.go @@ -0,0 +1,67 @@ +package storage_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/esja/eventstore" + + "counter" +) + +func TestCounter_Repositories(t *testing.T) { + testCases := []struct { + name string + repository eventstore.EventStore[counter.Counter] + }{ + { + name: "in_memory", + repository: eventstore.NewInMemoryStore[counter.Counter](eventstore.InMemoryStoreConfig{MakeSnapshotEveryNVersions: 100}), + }, + } + + ctx := context.Background() + for i := range testCases { + tc := testCases[i] + t.Run(tc.name, func(t *testing.T) { + id := uuid.NewString() + + c, err := counter.NewCounter(id) + assert.NoError(t, err) + assert.Equal(t, id, c.ID()) + assert.Equal(t, 0, c.CurrentValue()) + + _, err = tc.repository.Load(ctx, id) + assert.ErrorIs(t, err, eventstore.ErrEntityNotFound) + + err = tc.repository.Save(ctx, c) + require.NoError(t, err) + + fromRepo, err := tc.repository.Load(ctx, id) + assert.NoError(t, err) + assert.Equal(t, c.ID(), fromRepo.ID()) + assert.Equal(t, c.CurrentValue(), fromRepo.CurrentValue()) + + incrementFor := 300 + for i := 0; i < incrementFor; i++ { + c, err = tc.repository.Load(ctx, id) + require.NoError(t, err) + + err = c.IncrementBy(1) + require.NoError(t, err) + + err = tc.repository.Save(ctx, c) + require.NoError(t, err) + } + + fromRepo, err = tc.repository.Load(ctx, id) + assert.NoError(t, err) + assert.Equal(t, c.ID(), fromRepo.ID()) + assert.Equal(t, incrementFor, fromRepo.CurrentValue()) + }) + } +} diff --git a/_examples/postcard/go.mod b/_examples/postcard/go.mod index 33f18a3..e861062 100644 --- a/_examples/postcard/go.mod +++ b/_examples/postcard/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/ThreeDotsLabs/esja v0.0.0-20221208191400-8fbb493947e7 + github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 github.com/google/uuid v1.3.0 github.com/lib/pq v1.10.6 github.com/mattn/go-sqlite3 v1.14.16 @@ -11,7 +12,6 @@ require ( ) require ( - github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/_examples/postcard/go.sum b/_examples/postcard/go.sum index acabb5d..8b99bf7 100644 --- a/_examples/postcard/go.sum +++ b/_examples/postcard/go.sum @@ -1,5 +1,3 @@ -github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d h1:bZBc4vDne17OYqoeSceaq5eaZM6vOUMVaJIBy+dNRH4= -github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 h1:4EQlsCpfwxjn5ijR8fdL6ap1q04guWUCHgnZ+jPdEjY= github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/_examples/postcard/postcard.go b/_examples/postcard/postcard.go index fbe2e0c..dca6258 100644 --- a/_examples/postcard/postcard.go +++ b/_examples/postcard/postcard.go @@ -44,14 +44,6 @@ func NewPostcard(id string) (*Postcard, error) { return p, nil } -func (p *Postcard) Send() error { - if p.sent { - return fmt.Errorf("postcard already sent") - } - - return p.stream.Record(p, Sent{}) -} - func (p Postcard) Stream() *esja.Stream[Postcard] { return p.stream } @@ -80,6 +72,14 @@ func (p Postcard) Sent() bool { return p.sent } +func (p *Postcard) Send() error { + if p.sent { + return fmt.Errorf("postcard already sent") + } + + return p.stream.Record(p, Sent{}) +} + func (p *Postcard) Write(content string) error { return p.stream.Record(p, Written{ Content: content, diff --git a/_examples/postcard/storage/eventstore_test.go b/_examples/postcard/storage/eventstore_test.go index f09e04e..15c805f 100644 --- a/_examples/postcard/storage/eventstore_test.go +++ b/_examples/postcard/storage/eventstore_test.go @@ -42,7 +42,7 @@ func TestPostcard_Repositories(t *testing.T) { }{ { name: "in_memory", - repository: eventstore.NewInMemoryStore[postcard.Postcard](), + repository: eventstore.NewInMemoryStore[postcard.Postcard](eventstore.InMemoryStoreConfig{}), }, { name: "postgres_simple", diff --git a/eventstore/inmemory.go b/eventstore/inmemory.go index a35c05f..dfe19b6 100644 --- a/eventstore/inmemory.go +++ b/eventstore/inmemory.go @@ -8,15 +8,26 @@ import ( "github.com/ThreeDotsLabs/esja" ) +type InMemoryStoreConfig struct { + // MakeSnapshotEveryNVersions configures a frequency of snapshot creation + // Once the current event version and last snapshot version difference exceeds the value, + // a new snapshot will be created with a version of the current event version. + MakeSnapshotEveryNVersions int +} + type InMemoryStore[T esja.Entity[T]] struct { - lock sync.RWMutex - events map[string][]esja.VersionedEvent[T] + lock sync.RWMutex + events map[string][]esja.VersionedEvent[T] + snapshots map[string][]esja.VersionedEvent[T] + config InMemoryStoreConfig } -func NewInMemoryStore[T esja.Entity[T]]() *InMemoryStore[T] { +func NewInMemoryStore[T esja.Entity[T]](config InMemoryStoreConfig) *InMemoryStore[T] { return &InMemoryStore[T]{ - lock: sync.RWMutex{}, - events: map[string][]esja.VersionedEvent[T]{}, + lock: sync.RWMutex{}, + events: map[string][]esja.VersionedEvent[T]{}, + snapshots: map[string][]esja.VersionedEvent[T]{}, + config: config, } } @@ -24,12 +35,27 @@ func (i *InMemoryStore[T]) Load(_ context.Context, id string) (*T, error) { i.lock.RLock() defer i.lock.RUnlock() + // In the other databases this could be optimized + // as we do not need to load events of version lower than the lastSnapshot version. events, ok := i.events[id] if !ok { return nil, ErrEntityNotFound } - return esja.NewEntity(id, events) + var eventsToApply []esja.VersionedEvent[T] + + s, found := i.loadLastSnapshot(id) + if found { + eventsToApply = append(eventsToApply, s) + } + + for _, e := range events { + if e.StreamVersion > s.StreamVersion { + eventsToApply = append(eventsToApply, e) + } + } + + return esja.NewEntity(id, eventsToApply) } func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { @@ -40,25 +66,101 @@ func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { return errors.New("target to save must not be nil") } - stm := *t + entity := *t + currentVersion, err := i.storeEntityEvents(entity) + if err != nil { + return err + } + + entityWithSnapshots, ok := supportsSnapshots(t) + if !ok { + return nil + } + + err = i.storeEntitySnapshot(entityWithSnapshots, currentVersion) + if err != nil { + return err + } + + return nil +} + +func (i *InMemoryStore[T]) loadLastSnapshot(id string) (esja.VersionedEvent[T], bool) { + snapshots, found := i.snapshots[id] + var lastSnapshot esja.VersionedEvent[T] + for _, s := range snapshots { + if s.StreamVersion >= lastSnapshot.StreamVersion { + lastSnapshot = s + } + } + return lastSnapshot, found +} - events := stm.Stream().PopEvents() +func (i *InMemoryStore[T]) storeEntityEvents(entity T) (int, error) { + events := entity.Stream().PopEvents() if len(events) == 0 { - return errors.New("no events to save") - } - - if priorEvents, ok := i.events[stm.Stream().ID()]; !ok { - i.events[stm.Stream().ID()] = events - } else { - for _, event := range events { - if len(priorEvents) > 0 { - if priorEvents[len(priorEvents)-1].StreamVersion >= event.StreamVersion { - return errors.New("stream version duplicate") - } - } - i.events[stm.Stream().ID()] = append(i.events[stm.Stream().ID()], event) + return 0, errors.New("no events to save") + } + + _, ok := i.events[entity.Stream().ID()] + if !ok { + i.events[entity.Stream().ID()] = make([]esja.VersionedEvent[T], 0) + } + + currentVersion := 0 + for _, e := range events { + if e.StreamVersion > currentVersion { + currentVersion = e.StreamVersion } + i.events[entity.Stream().ID()] = append( + i.events[entity.Stream().ID()], + e, + ) + } + + return currentVersion, nil +} + +func (i *InMemoryStore[T]) storeEntitySnapshot( + entity esja.EntityWithSnapshots[T], + currentVersion int, +) error { + if i.config.MakeSnapshotEveryNVersions <= 0 { + return nil + } + + lastSnapshot, found := i.loadLastSnapshot(entity.Stream().ID()) + lastSnapshotVersion := 0 + if found { + lastSnapshotVersion = lastSnapshot.StreamVersion } + if currentVersion-lastSnapshotVersion < i.config.MakeSnapshotEveryNVersions { + return nil + } + + snapshot := entity.Snapshot() + snapshotVersioned := esja.VersionedEvent[T]{ + Event: esja.Event[T](snapshot), + StreamVersion: currentVersion, + } + + _, ok := i.snapshots[entity.Stream().ID()] + if !ok { + i.snapshots[entity.Stream().ID()] = make([]esja.VersionedEvent[T], 0) + } + + i.snapshots[entity.Stream().ID()] = append( + i.snapshots[entity.Stream().ID()], + snapshotVersioned, + ) + return nil } + +func supportsSnapshots[T esja.Entity[T]](t *T) (esja.EntityWithSnapshots[T], bool) { + var entity interface{} + entity = *t + entityWithSnapshots, ok := entity.(esja.EntityWithSnapshots[T]) + return entityWithSnapshots, ok +} diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 0000000..bc26d5b --- /dev/null +++ b/snapshot.go @@ -0,0 +1,16 @@ +package esja + +// Snapshot is an Event which stores and applies the current state back to the Entity. +type Snapshot[T any] interface { + Event[T] +} + +// EntityWithSnapshots is an optional extension to the Entity interface. +// When implemented it informs that Entity supports snapshots +// and those should be created in the event store when applicable. +type EntityWithSnapshots[T any] interface { + Entity[T] + + // Snapshot returns a Snapshot representing current state of the Entity. + Snapshot() Snapshot[T] +} From 446038a15a3ae58bd7f2698d29ae2a9e9bba26d1 Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Sat, 28 Jan 2023 17:42:35 +0100 Subject: [PATCH 2/7] [POC] Add snapshots --- snapshot.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/snapshot.go b/snapshot.go index bc26d5b..b2a3810 100644 --- a/snapshot.go +++ b/snapshot.go @@ -1,6 +1,6 @@ package esja -// Snapshot is an Event which stores and applies the current state back to the Entity. +// Snapshot is an Event that stores and applies the current state back to the Entity. type Snapshot[T any] interface { Event[T] } @@ -11,6 +11,6 @@ type Snapshot[T any] interface { type EntityWithSnapshots[T any] interface { Entity[T] - // Snapshot returns a Snapshot representing current state of the Entity. + // Snapshot returns a Snapshot representing current the state of the Entity. Snapshot() Snapshot[T] } From 6c69f4e629fde472b44e81a8d3ac905402818bf5 Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Sat, 28 Jan 2023 17:54:50 +0100 Subject: [PATCH 3/7] [POC] Add snapshots --- _examples/postcard/storage/eventstore_test.go | 2 +- eventstore/inmemory.go | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/_examples/postcard/storage/eventstore_test.go b/_examples/postcard/storage/eventstore_test.go index 15c805f..c0d5293 100644 --- a/_examples/postcard/storage/eventstore_test.go +++ b/_examples/postcard/storage/eventstore_test.go @@ -12,11 +12,11 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "postcard/storage" "github.com/ThreeDotsLabs/esja/eventstore" "postcard" - "postcard/storage" ) var ( diff --git a/eventstore/inmemory.go b/eventstore/inmemory.go index dfe19b6..077c3ca 100644 --- a/eventstore/inmemory.go +++ b/eventstore/inmemory.go @@ -102,16 +102,27 @@ func (i *InMemoryStore[T]) storeEntityEvents(entity T) (int, error) { return 0, errors.New("no events to save") } - _, ok := i.events[entity.Stream().ID()] + priorEvents, ok := i.events[entity.Stream().ID()] if !ok { i.events[entity.Stream().ID()] = make([]esja.VersionedEvent[T], 0) } - currentVersion := 0 + lastVersion := 0 + for _, e := range priorEvents { + if e.StreamVersion > lastVersion { + lastVersion = e.StreamVersion + } + } + + currentVersion := lastVersion for _, e := range events { + if e.StreamVersion <= currentVersion { + return 0, errors.New("stream version duplicate") + } if e.StreamVersion > currentVersion { currentVersion = e.StreamVersion } + i.events[entity.Stream().ID()] = append( i.events[entity.Stream().ID()], e, From 78e5968873304489befd3dc0cfef826e1ba02f10 Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Sun, 29 Jan 2023 23:56:20 +0100 Subject: [PATCH 4/7] Separate Snapshot from Entity --- _examples/counter/counter_test.go | 3 +- _examples/counter/snapshot.go | 2 +- applicable.go | 7 ++++ entity.go | 59 ++++++++++++++++++++++++++++--- event.go | 8 +++-- eventstore/inmemory.go | 20 +++++------ snapshot.go | 13 ++++++- stream.go | 16 --------- 8 files changed, 92 insertions(+), 36 deletions(-) create mode 100644 applicable.go diff --git a/_examples/counter/counter_test.go b/_examples/counter/counter_test.go index 4026aea..9f31863 100644 --- a/_examples/counter/counter_test.go +++ b/_examples/counter/counter_test.go @@ -4,7 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" - counter "postcard" + + "counter" ) func TestCounter(t *testing.T) { diff --git a/_examples/counter/snapshot.go b/_examples/counter/snapshot.go index 52a20e2..3da4e57 100644 --- a/_examples/counter/snapshot.go +++ b/_examples/counter/snapshot.go @@ -5,7 +5,7 @@ type Snapshot struct { CurrentValue int } -func (s Snapshot) EventName() string { +func (s Snapshot) SnapshotName() string { return "CounterSnapshot_v1" } diff --git a/applicable.go b/applicable.go new file mode 100644 index 0000000..ab449f4 --- /dev/null +++ b/applicable.go @@ -0,0 +1,7 @@ +package esja + +// Applicable defines a model that can be applied to an Entity/ +type Applicable[T any] interface { + // ApplyTo applies the event to the entity. + ApplyTo(*T) error +} diff --git a/entity.go b/entity.go index 6c69039..7f7b065 100644 --- a/entity.go +++ b/entity.go @@ -1,5 +1,7 @@ package esja +import "fmt" + // Entity represents the event-sourced type saved and loaded by the event store. // In DDD terms, it is the "aggregate root". // @@ -32,21 +34,70 @@ type Entity[T any] interface { NewWithStream(*Stream[T]) *T } +// NewEntityWithSnapshot instantiates a new T with the given snapshot and events applied to it. +// At the same time the entity's internal Stream is initialised, +// so it can record new upcoming stream. +func NewEntityWithSnapshot[T Entity[T]]( + id string, + snapshot VersionedSnapshot[T], + events []VersionedEvent[T], +) (*T, error) { + var t T + + stream, err := NewStream[T](id) + if err != nil { + return nil, err + } + + stream.queue = events + stream.version = snapshot.StreamVersion + if len(events) != 0 { + stream.version = events[len(events)-1].StreamVersion + } + + target := t.NewWithStream(stream) + + err = snapshot.ApplyTo(target) + if err != nil { + return nil, err + } + + events = stream.PopEvents() + for _, e := range events { + err := e.ApplyTo(target) + if err != nil { + return nil, err + } + } + + return target, nil +} + // NewEntity instantiates a new T with the given events applied to it. // At the same time the entity's internal Stream is initialised, // so it can record new upcoming stream. -func NewEntity[T Entity[T]](id string, eventsSlice []VersionedEvent[T]) (*T, error) { +func NewEntity[T Entity[T]]( + id string, + events []VersionedEvent[T], +) (*T, error) { + if len(events) == 0 { + return nil, fmt.Errorf("no stream to load") + } + var t T - stream, err := newStream(id, eventsSlice) + stream, err := NewStream[T](id) if err != nil { return nil, err } - eventsSlice = stream.PopEvents() + stream.queue = events + stream.version = events[len(events)-1].StreamVersion target := t.NewWithStream(stream) - for _, e := range eventsSlice { + + events = stream.PopEvents() + for _, e := range events { err := e.ApplyTo(target) if err != nil { return nil, err diff --git a/event.go b/event.go index 5d25d78..77de30f 100644 --- a/event.go +++ b/event.go @@ -1,6 +1,11 @@ package esja +// Event is a simple Entity event model type Event[T any] interface { + // Applicable interface requires that Event itself implements the logic + // how it is applied to the Entity. + Applicable[T] + // EventName should identify the event and the version of its schema. // // Example: @@ -9,9 +14,6 @@ type Event[T any] interface { // return "FooCreated_v1" // } EventName() string - - // ApplyTo applies the event to the entity. - ApplyTo(*T) error } // VersionedEvent is an event with a corresponding stream version. diff --git a/eventstore/inmemory.go b/eventstore/inmemory.go index 077c3ca..5c3efcf 100644 --- a/eventstore/inmemory.go +++ b/eventstore/inmemory.go @@ -18,7 +18,7 @@ type InMemoryStoreConfig struct { type InMemoryStore[T esja.Entity[T]] struct { lock sync.RWMutex events map[string][]esja.VersionedEvent[T] - snapshots map[string][]esja.VersionedEvent[T] + snapshots map[string][]esja.VersionedSnapshot[T] config InMemoryStoreConfig } @@ -26,7 +26,7 @@ func NewInMemoryStore[T esja.Entity[T]](config InMemoryStoreConfig) *InMemorySto return &InMemoryStore[T]{ lock: sync.RWMutex{}, events: map[string][]esja.VersionedEvent[T]{}, - snapshots: map[string][]esja.VersionedEvent[T]{}, + snapshots: map[string][]esja.VersionedSnapshot[T]{}, config: config, } } @@ -45,8 +45,8 @@ func (i *InMemoryStore[T]) Load(_ context.Context, id string) (*T, error) { var eventsToApply []esja.VersionedEvent[T] s, found := i.loadLastSnapshot(id) - if found { - eventsToApply = append(eventsToApply, s) + if !found { + return esja.NewEntity(id, events) } for _, e := range events { @@ -55,7 +55,7 @@ func (i *InMemoryStore[T]) Load(_ context.Context, id string) (*T, error) { } } - return esja.NewEntity(id, eventsToApply) + return esja.NewEntityWithSnapshot(id, s, eventsToApply) } func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { @@ -85,9 +85,9 @@ func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { return nil } -func (i *InMemoryStore[T]) loadLastSnapshot(id string) (esja.VersionedEvent[T], bool) { +func (i *InMemoryStore[T]) loadLastSnapshot(id string) (esja.VersionedSnapshot[T], bool) { snapshots, found := i.snapshots[id] - var lastSnapshot esja.VersionedEvent[T] + var lastSnapshot esja.VersionedSnapshot[T] for _, s := range snapshots { if s.StreamVersion >= lastSnapshot.StreamVersion { lastSnapshot = s @@ -151,14 +151,14 @@ func (i *InMemoryStore[T]) storeEntitySnapshot( } snapshot := entity.Snapshot() - snapshotVersioned := esja.VersionedEvent[T]{ - Event: esja.Event[T](snapshot), + snapshotVersioned := esja.VersionedSnapshot[T]{ + Snapshot: snapshot, StreamVersion: currentVersion, } _, ok := i.snapshots[entity.Stream().ID()] if !ok { - i.snapshots[entity.Stream().ID()] = make([]esja.VersionedEvent[T], 0) + i.snapshots[entity.Stream().ID()] = make([]esja.VersionedSnapshot[T], 0) } i.snapshots[entity.Stream().ID()] = append( diff --git a/snapshot.go b/snapshot.go index b2a3810..3375239 100644 --- a/snapshot.go +++ b/snapshot.go @@ -2,7 +2,18 @@ package esja // Snapshot is an Event that stores and applies the current state back to the Entity. type Snapshot[T any] interface { - Event[T] + // Applicable interface requires that each snapshot itself implements + // the logic how the snapshot data is applied back to the Entity. + Applicable[T] + + // SnapshotName should identify the snapshot and the version of its schema. + SnapshotName() string +} + +// VersionedSnapshot is a snapshot with a corresponding current stream version. +type VersionedSnapshot[T any] struct { + Snapshot[T] + StreamVersion int } // EntityWithSnapshots is an optional extension to the Entity interface. diff --git a/stream.go b/stream.go index 265d435..31ac097 100644 --- a/stream.go +++ b/stream.go @@ -2,7 +2,6 @@ package esja import ( "errors" - "fmt" ) // Stream represents a queue of events and basic stream properties. @@ -74,18 +73,3 @@ func (s *Stream[T]) PopEvents() []VersionedEvent[T] { func (s *Stream[T]) HasEvents() bool { return len(s.queue) > 0 } - -func newStream[T any](id string, events []VersionedEvent[T]) (*Stream[T], error) { - if len(events) == 0 { - return nil, fmt.Errorf("no stream to load") - } - - e, err := NewStream[T](id) - if err != nil { - return nil, err - } - e.version = events[len(events)-1].StreamVersion - e.queue = events - - return e, nil -} From 23a9edfbdd27ec79288fcacb24d291df1e4010da Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Sun, 29 Jan 2023 23:57:22 +0100 Subject: [PATCH 5/7] Separate Snapshot from Entity --- applicable.go | 7 ------- event.go | 5 ++--- snapshot.go | 5 ++--- 3 files changed, 4 insertions(+), 13 deletions(-) delete mode 100644 applicable.go diff --git a/applicable.go b/applicable.go deleted file mode 100644 index ab449f4..0000000 --- a/applicable.go +++ /dev/null @@ -1,7 +0,0 @@ -package esja - -// Applicable defines a model that can be applied to an Entity/ -type Applicable[T any] interface { - // ApplyTo applies the event to the entity. - ApplyTo(*T) error -} diff --git a/event.go b/event.go index 77de30f..239dafc 100644 --- a/event.go +++ b/event.go @@ -2,9 +2,8 @@ package esja // Event is a simple Entity event model type Event[T any] interface { - // Applicable interface requires that Event itself implements the logic - // how it is applied to the Entity. - Applicable[T] + // ApplyTo applies the event to the entity. + ApplyTo(*T) error // EventName should identify the event and the version of its schema. // diff --git a/snapshot.go b/snapshot.go index 3375239..62a8cb1 100644 --- a/snapshot.go +++ b/snapshot.go @@ -2,9 +2,8 @@ package esja // Snapshot is an Event that stores and applies the current state back to the Entity. type Snapshot[T any] interface { - // Applicable interface requires that each snapshot itself implements - // the logic how the snapshot data is applied back to the Entity. - Applicable[T] + // ApplyTo applies the snapshot to the entity. + ApplyTo(*T) error // SnapshotName should identify the snapshot and the version of its schema. SnapshotName() string From 5dd30e7e1632a39ddb321ccc5e979903f232011c Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Mon, 30 Jan 2023 00:03:11 +0100 Subject: [PATCH 6/7] Separate Snapshot from Entity --- _examples/counter/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_examples/counter/events.go b/_examples/counter/events.go index 0788406..84e249e 100644 --- a/_examples/counter/events.go +++ b/_examples/counter/events.go @@ -18,7 +18,7 @@ type IncrementedBy struct { } func (IncrementedBy) EventName() string { - return "Created_v1" + return "IncrementedBy_v1" } func (e IncrementedBy) ApplyTo(c *Counter) error { From b360aba752ad7abce402a2c35a6e8d1868a75504 Mon Sep 17 00:00:00 2001 From: Krzysztof Reczek Date: Mon, 30 Jan 2023 00:04:51 +0100 Subject: [PATCH 7/7] Separate Snapshot from Entity --- entity.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/entity.go b/entity.go index 7f7b065..5867650 100644 --- a/entity.go +++ b/entity.go @@ -36,7 +36,7 @@ type Entity[T any] interface { // NewEntityWithSnapshot instantiates a new T with the given snapshot and events applied to it. // At the same time the entity's internal Stream is initialised, -// so it can record new upcoming stream. +// so it can record new upcoming events. func NewEntityWithSnapshot[T Entity[T]]( id string, snapshot VersionedSnapshot[T], @@ -75,7 +75,7 @@ func NewEntityWithSnapshot[T Entity[T]]( // NewEntity instantiates a new T with the given events applied to it. // At the same time the entity's internal Stream is initialised, -// so it can record new upcoming stream. +// so it can record new upcoming events. func NewEntity[T Entity[T]]( id string, events []VersionedEvent[T],