Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix events stored with no stream_type #32

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion _examples/postcard/postcard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ var (

func TestPostcard_Lifecycle(t *testing.T) {
id := uuid.NewString()
streamType := "Postcard"

assert := assert.New(t)

pc, err := postcard.NewPostcard(id)
assert.Equal(id, pc.ID())
assert.Equal(streamType, pc.Stream().Type())
assert.NoError(err)

assert.Empty(pc.Addressee())
Expand All @@ -58,7 +60,7 @@ func TestPostcard_Lifecycle(t *testing.T) {
}
assert.Equal(expectedEvents, events)

pcLoaded, err := esja.NewEntity(id, events)
pcLoaded, err := esja.NewEntityWithStringType(id, streamType, events)
assert.NoError(err)

assert.Equal(senderAddress, pcLoaded.Sender())
Expand Down
23 changes: 21 additions & 2 deletions entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,29 @@ type Entity[T any] interface {
// NewEntity instantiates a new T with the given events applied to it.
// At the same time the entity's internal Stream is initialised,
// so it can record new upcoming stream.
func NewEntity[T Entity[T]](id string, eventsSlice []VersionedEvent[T]) (*T, error) {
func NewEntity[T Entity[T]](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider keeping this one and adding NewEntityWithStreamType for easier use when we don't care about the stream type (like in the in-memory repository).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id string,
eventsSlice []VersionedEvent[T],
) (*T, error) {
return NewEntityWithStringType(id, "", eventsSlice)
}

// NewEntityWithStringType instantiates a new T with the given
// stream type and events applied to it.
// At the same time the entity's internal Stream is initialised,
// so it can record new upcoming stream.
func NewEntityWithStringType[T Entity[T]](
id string,
streamType string,
eventsSlice []VersionedEvent[T],
) (*T, error) {
var t T

stream, err := newStream(id, eventsSlice)
stream, err := newStream(
id,
streamType,
eventsSlice,
)
if err != nil {
return nil, err
}
Expand Down
11 changes: 9 additions & 2 deletions eventstore/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (s SQLStore[T]) initializeSchema(ctx context.Context) error {
type event struct {
streamID string
streamVersion int
streamType string
eventName string
eventPayload []byte
}
Expand All @@ -93,15 +94,21 @@ func (s SQLStore[T]) Load(ctx context.Context, id string) (*T, error) {
_ = results.Close()
}()

var streamType string

var dbEvents []event
for results.Next() {
e := event{}

err = results.Scan(&e.streamID, &e.streamVersion, &e.eventName, &e.eventPayload)
err = results.Scan(&e.streamID, &e.streamVersion, &e.streamType, &e.eventName, &e.eventPayload)
if err != nil {
return nil, fmt.Errorf("error reading row result: %w", err)
}

if e.streamType != "" {
streamType = e.streamType
}

dbEvents = append(dbEvents, e)
}

Expand Down Expand Up @@ -132,7 +139,7 @@ func (s SQLStore[T]) Load(ctx context.Context, id string) (*T, error) {
})
}

return esja.NewEntity(id, events)
return esja.NewEntityWithStringType(id, streamType, events)
}

// Save saves the entity's queued events to the database.
Expand Down
1 change: 1 addition & 0 deletions eventstore/sql_schema_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
SELECT
stream_id,
stream_version,
stream_type,
event_name,
event_payload
FROM %s
Expand Down
11 changes: 9 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,22 @@ func (s *Stream[T]) HasEvents() bool {
return len(s.queue) > 0
}

func newStream[T any](id string, events []VersionedEvent[T]) (*Stream[T], error) {
func newStream[T any](id string, st string, events []VersionedEvent[T]) (*Stream[T], error) {
if len(events) == 0 {
return nil, fmt.Errorf("no stream to load")
}

e, err := NewStream[T](id)
var e *Stream[T]
var err error
if st == "" {
e, err = NewStream[T](id)
} else {
e, err = NewStreamWithType[T](id, st)
}
if err != nil {
return nil, err
}

e.version = events[len(events)-1].StreamVersion
e.queue = events

Expand Down