diff --git a/_examples/postcard/postcard_test.go b/_examples/postcard/postcard_test.go index b07efc4..e596cd5 100644 --- a/_examples/postcard/postcard_test.go +++ b/_examples/postcard/postcard_test.go @@ -27,11 +27,13 @@ var ( func TestPostcard_Lifecycle(t *testing.T) { id := uuid.NewString() + streamType := "Postcard" assert := assert.New(t) pc, err := postcard.NewPostcard(id) assert.Equal(id, pc.ID()) + assert.Equal(streamType, pc.Stream().Type()) assert.NoError(err) assert.Empty(pc.Addressee()) @@ -58,7 +60,7 @@ func TestPostcard_Lifecycle(t *testing.T) { } assert.Equal(expectedEvents, events) - pcLoaded, err := esja.NewEntity(id, events) + pcLoaded, err := esja.NewEntityWithStringType(id, streamType, events) assert.NoError(err) assert.Equal(senderAddress, pcLoaded.Sender()) diff --git a/entity.go b/entity.go index 6c69039..84e7eab 100644 --- a/entity.go +++ b/entity.go @@ -35,10 +35,29 @@ type Entity[T any] interface { // NewEntity instantiates a new T with the given events applied to it. // At the same time the entity's internal Stream is initialised, // so it can record new upcoming stream. -func NewEntity[T Entity[T]](id string, eventsSlice []VersionedEvent[T]) (*T, error) { +func NewEntity[T Entity[T]]( + id string, + eventsSlice []VersionedEvent[T], +) (*T, error) { + return NewEntityWithStringType(id, "", eventsSlice) +} + +// NewEntityWithStringType instantiates a new T with the given +// stream type and events applied to it. +// At the same time the entity's internal Stream is initialised, +// so it can record new upcoming stream. +func NewEntityWithStringType[T Entity[T]]( + id string, + streamType string, + eventsSlice []VersionedEvent[T], +) (*T, error) { var t T - stream, err := newStream(id, eventsSlice) + stream, err := newStream( + id, + streamType, + eventsSlice, + ) if err != nil { return nil, err } diff --git a/eventstore/sql.go b/eventstore/sql.go index 3eeb179..54b930c 100644 --- a/eventstore/sql.go +++ b/eventstore/sql.go @@ -73,6 +73,7 @@ func (s SQLStore[T]) initializeSchema(ctx context.Context) error { type event struct { streamID string streamVersion int + streamType string eventName string eventPayload []byte } @@ -93,15 +94,21 @@ func (s SQLStore[T]) Load(ctx context.Context, id string) (*T, error) { _ = results.Close() }() + var streamType string + var dbEvents []event for results.Next() { e := event{} - err = results.Scan(&e.streamID, &e.streamVersion, &e.eventName, &e.eventPayload) + err = results.Scan(&e.streamID, &e.streamVersion, &e.streamType, &e.eventName, &e.eventPayload) if err != nil { return nil, fmt.Errorf("error reading row result: %w", err) } + if e.streamType != "" { + streamType = e.streamType + } + dbEvents = append(dbEvents, e) } @@ -132,7 +139,7 @@ func (s SQLStore[T]) Load(ctx context.Context, id string) (*T, error) { }) } - return esja.NewEntity(id, events) + return esja.NewEntityWithStringType(id, streamType, events) } // Save saves the entity's queued events to the database. diff --git a/eventstore/sql_schema_adapter.go b/eventstore/sql_schema_adapter.go index c99733b..addb36e 100644 --- a/eventstore/sql_schema_adapter.go +++ b/eventstore/sql_schema_adapter.go @@ -11,6 +11,7 @@ const ( SELECT stream_id, stream_version, + stream_type, event_name, event_payload FROM %s diff --git a/stream.go b/stream.go index 265d435..d3a5d6d 100644 --- a/stream.go +++ b/stream.go @@ -75,15 +75,22 @@ func (s *Stream[T]) HasEvents() bool { return len(s.queue) > 0 } -func newStream[T any](id string, events []VersionedEvent[T]) (*Stream[T], error) { +func newStream[T any](id string, st string, events []VersionedEvent[T]) (*Stream[T], error) { if len(events) == 0 { return nil, fmt.Errorf("no stream to load") } - e, err := NewStream[T](id) + var e *Stream[T] + var err error + if st == "" { + e, err = NewStream[T](id) + } else { + e, err = NewStreamWithType[T](id, st) + } if err != nil { return nil, err } + e.version = events[len(events)-1].StreamVersion e.queue = events