From 303babd092ea6c09b9dee0f2a2a38e0190e4d7d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Thu, 29 Dec 2022 23:51:58 +0100 Subject: [PATCH 1/7] Experimental Stream --- _examples/postcard/postcard.go | 28 ++++----- _examples/postcard/postcard_test.go | 10 ++-- _examples/postcard/storage/eventstore_test.go | 4 +- eventstore/eventstore.go | 4 +- eventstore/inmemory.go | 14 ++--- eventstore/sql.go | 14 ++--- stream/events.go | 59 +++++++++++++++---- stream/events_test.go | 12 ++-- stream/stream.go | 39 ++++++------ 9 files changed, 110 insertions(+), 74 deletions(-) diff --git a/_examples/postcard/postcard.go b/_examples/postcard/postcard.go index 51bbd26..828a625 100644 --- a/_examples/postcard/postcard.go +++ b/_examples/postcard/postcard.go @@ -7,7 +7,7 @@ import ( ) type Postcard struct { - events *stream.Events[Postcard] + stream *stream.Stream[Postcard] id string @@ -24,11 +24,17 @@ type Address struct { } func NewPostcard(id string) (*Postcard, error) { + s, err := stream.NewStream[Postcard](stream.ID(id)) + if err != nil { + return nil, err + } + p := &Postcard{ - events: new(stream.Events[Postcard]), + stream: s, } - err := stream.Record[Postcard](p, Created{ + // Can we make it s.Record(p, Created{) ? + err = stream.Record[Postcard](p, Created{ ID: id, }) if err != nil { @@ -46,20 +52,12 @@ func (p *Postcard) Send() error { return stream.Record[Postcard](p, Sent{}) } -func (p Postcard) StreamType() string { - return "Postcard" -} - -func (p Postcard) StreamID() stream.ID { - return stream.ID(p.id) -} - -func (p Postcard) Events() *stream.Events[Postcard] { - return p.events +func (p Postcard) Stream() *stream.Stream[Postcard] { + return p.stream } -func (p Postcard) NewFromEvents(events *stream.Events[Postcard]) *Postcard { - return &Postcard{events: events} +func (p Postcard) NewFromStream(stream *stream.Stream[Postcard]) *Postcard { + return &Postcard{stream: stream} } func (p Postcard) ID() string { diff --git a/_examples/postcard/postcard_test.go b/_examples/postcard/postcard_test.go index e8fbe99..5181e99 100644 --- a/_examples/postcard/postcard_test.go +++ b/_examples/postcard/postcard_test.go @@ -48,7 +48,7 @@ func TestPostcard_Lifecycle(t *testing.T) { require.NoError(t, err) assert.Equal("content", pc.Content()) - events := pc.Events().PopEvents() + events := pc.Stream().PopEvents() assert.Len(events, 3) expectedEvents := []stream.VersionedEvent[postcard.Postcard]{ @@ -58,7 +58,7 @@ func TestPostcard_Lifecycle(t *testing.T) { } assert.Equal(expectedEvents, events) - pcLoaded, err := stream.New(events) + pcLoaded, err := stream.New(stream.ID(id), events) assert.NoError(err) assert.Equal(senderAddress, pcLoaded.Sender()) @@ -66,10 +66,10 @@ func TestPostcard_Lifecycle(t *testing.T) { assert.Equal("content", pcLoaded.Content()) assert.False(pcLoaded.Sent()) - events = pc.Events().PopEvents() + events = pc.Stream().PopEvents() assert.Len(events, 0) - events = pcLoaded.Events().PopEvents() + events = pcLoaded.Stream().PopEvents() assert.Len(events, 0) err = pcLoaded.Write("new content") @@ -79,7 +79,7 @@ func TestPostcard_Lifecycle(t *testing.T) { require.NoError(t, err) assert.True(pcLoaded.Sent()) - events = pcLoaded.Events().PopEvents() + events = pcLoaded.Stream().PopEvents() assert.Len(events, 2) expectedEvents = []stream.VersionedEvent[postcard.Postcard]{ diff --git a/_examples/postcard/storage/eventstore_test.go b/_examples/postcard/storage/eventstore_test.go index d641a81..b17b117 100644 --- a/_examples/postcard/storage/eventstore_test.go +++ b/_examples/postcard/storage/eventstore_test.go @@ -156,7 +156,7 @@ func TestPostcard_Repositories(t *testing.T) { assert.Equal(t, pc.ID(), fromRepo2.ID()) assert.Equal(t, pc.Addressee(), fromRepo2.Addressee()) assert.Equal(t, pc.Sender(), fromRepo2.Sender()) - assert.Empty(t, fromRepo2.Events().PopEvents()) + assert.Empty(t, fromRepo2.Stream().PopEvents()) err = fromRepo2.Write("content") require.NoError(t, err) @@ -182,7 +182,7 @@ func TestPostcard_Repositories(t *testing.T) { assert.Equal(t, addresseeAddress, fromRepo3.Addressee()) assert.Equal(t, "content", fromRepo3.Content()) assert.True(t, fromRepo3.Sent()) - assert.Empty(t, fromRepo3.Events().PopEvents()) + assert.Empty(t, fromRepo3.Stream().PopEvents()) }) } } diff --git a/eventstore/eventstore.go b/eventstore/eventstore.go index 7fb9790..118b655 100644 --- a/eventstore/eventstore.go +++ b/eventstore/eventstore.go @@ -9,8 +9,8 @@ import ( var ErrStreamNotFound = errors.New("stream not found by ID") -// EventStore loads and saves T implementing stream.Stream -type EventStore[T stream.Stream[T]] interface { +// EventStore loads and saves T implementing stream.Entity +type EventStore[T stream.Entity[T]] interface { // Load will fetch all events for `StreamID()` and use them // to instantiate a pointer to `T` using `FromEvents()` and return it. Load(ctx context.Context, id stream.ID) (*T, error) diff --git a/eventstore/inmemory.go b/eventstore/inmemory.go index dd719a5..b5efb4c 100644 --- a/eventstore/inmemory.go +++ b/eventstore/inmemory.go @@ -8,12 +8,12 @@ import ( "github.com/ThreeDotsLabs/esja/stream" ) -type InMemoryStore[T stream.Stream[T]] struct { +type InMemoryStore[T stream.Entity[T]] struct { lock sync.RWMutex events map[stream.ID][]stream.VersionedEvent[T] } -func NewInMemoryStore[T stream.Stream[T]]() *InMemoryStore[T] { +func NewInMemoryStore[T stream.Entity[T]]() *InMemoryStore[T] { return &InMemoryStore[T]{ lock: sync.RWMutex{}, events: map[stream.ID][]stream.VersionedEvent[T]{}, @@ -29,7 +29,7 @@ func (i *InMemoryStore[T]) Load(_ context.Context, id stream.ID) (*T, error) { return nil, ErrStreamNotFound } - return stream.New(events) + return stream.New(id, events) } func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { @@ -42,13 +42,13 @@ func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { stm := *t - events := stm.Events().PopEvents() + events := stm.Stream().PopEvents() if len(events) == 0 { return errors.New("no events to save") } - if priorEvents, ok := i.events[stm.StreamID()]; !ok { - i.events[stm.StreamID()] = events + if priorEvents, ok := i.events[stm.Stream().ID()]; !ok { + i.events[stm.Stream().ID()] = events } else { for _, event := range events { if len(priorEvents) > 0 { @@ -56,7 +56,7 @@ func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { return errors.New("stream version duplicate") } } - i.events[stm.StreamID()] = append(i.events[stm.StreamID()], event) + i.events[stm.Stream().ID()] = append(i.events[stm.Stream().ID()], event) } } diff --git a/eventstore/sql.go b/eventstore/sql.go index 0423b41..3b531b2 100644 --- a/eventstore/sql.go +++ b/eventstore/sql.go @@ -28,7 +28,7 @@ type schemaAdapter[A any] interface { } // SQLStore is an implementation of the EventStore interface using an SQLStore database. -type SQLStore[T stream.Stream[T]] struct { +type SQLStore[T stream.Entity[T]] struct { db ContextExecutor config SQLConfig[T] } @@ -36,7 +36,7 @@ type SQLStore[T stream.Stream[T]] struct { // NewSQLStore creates a new SQL EventStore. // The streamType is used to identify the stream type in the database. It should be a constant string and not change. // The serializer is used to translate the events to a database-friendly format and back. -func NewSQLStore[T stream.Stream[T]]( +func NewSQLStore[T stream.Entity[T]]( ctx context.Context, db ContextExecutor, config SQLConfig[T], @@ -126,7 +126,7 @@ func (s SQLStore[T]) Load(ctx context.Context, id stream.ID) (*T, error) { return nil, ErrStreamNotFound } - return stream.New(events) + return stream.New(id, events) } // Save saves the stream's queued events to the database. @@ -137,14 +137,14 @@ func (s SQLStore[T]) Save(ctx context.Context, t *T) (err error) { stm := *t - events := stm.Events().PopEvents() + events := stm.Stream().PopEvents() if len(events) == 0 { return errors.New("no events to save") } serializedEvents := make([]storageEvent[T], len(events)) for i, event := range events { - mapped, err := s.config.Mapper.ToTransport(stm.StreamID(), event.Event) + mapped, err := s.config.Mapper.ToTransport(stm.Stream().ID(), event.Event) if err != nil { return fmt.Errorf("error serializing event: %w", err) } @@ -156,12 +156,12 @@ func (s SQLStore[T]) Save(ctx context.Context, t *T) (err error) { serializedEvents[i] = storageEvent[T]{ VersionedEvent: event, - streamID: stm.StreamID().String(), + streamID: stm.Stream().ID().String(), payload: payload, } } - stmType := stream.GetStreamType(t) + stmType := stm.Stream().Type() query, args, err := s.config.SchemaAdapter.InsertQuery(stmType, serializedEvents) if err != nil { return fmt.Errorf("error building insert query: %w", err) diff --git a/stream/events.go b/stream/events.go index 8fa629c..4dbc3bd 100644 --- a/stream/events.go +++ b/stream/events.go @@ -1,6 +1,9 @@ package stream -import "fmt" +import ( + "errors" + "fmt" +) // EventName identifies the type of the event and the version of its schema, e.g. "FooCreated_v1". type EventName string @@ -24,15 +27,48 @@ type VersionedEvent[A any] struct { StreamVersion int } -// Events stores events. +// Stream stores events. // Zero-value is a valid state, ready to use. -type Events[A any] struct { - version int - queue []VersionedEvent[A] +type Stream[A any] struct { + id ID + streamType string + metadata any + version int + queue []VersionedEvent[A] +} + +func NewStream[A any](id ID) (*Stream[A], error) { + if id == "" { + return nil, errors.New("empty id") + } + + return &Stream[A]{ + id: id, + }, nil +} + +func NewStreamWithMetadata[A any](id ID, streamType string, metadata any) (*Stream[A], error) { + s, err := NewStream[A](id) + if err != nil { + return nil, err + } + + s.streamType = streamType + s.metadata = metadata + + return s, nil +} + +func (e *Stream[A]) ID() ID { + return e.id +} + +func (e *Stream[A]) Type() string { + return e.streamType } // Record puts a new Event on the queue with proper version. -func (e *Events[A]) Record(event Event[A]) { +func (e *Stream[A]) Record(event Event[A]) { e.version += 1 e.queue = append(e.queue, VersionedEvent[A]{ Event: event, @@ -41,7 +77,7 @@ func (e *Events[A]) Record(event Event[A]) { } // PopEvents returns the events on the queue and clears it. -func (e *Events[A]) PopEvents() []VersionedEvent[A] { +func (e *Stream[A]) PopEvents() []VersionedEvent[A] { tmp := make([]VersionedEvent[A], len(e.queue)) copy(tmp, e.queue) e.queue = []VersionedEvent[A]{} @@ -50,16 +86,19 @@ func (e *Events[A]) PopEvents() []VersionedEvent[A] { } // HasEvents returns true if there are any queued events. -func (e *Events[A]) HasEvents() bool { +func (e *Stream[A]) HasEvents() bool { return len(e.queue) > 0 } -func newEvents[A any](events []VersionedEvent[A]) (*Events[A], error) { +func newEvents[A any](id ID, events []VersionedEvent[A]) (*Stream[A], error) { if len(events) == 0 { return nil, fmt.Errorf("no events to load") } - e := new(Events[A]) + e, err := NewStream[A](id) + if err != nil { + return nil, err + } e.version = events[len(events)-1].StreamVersion e.queue = events diff --git a/stream/events_test.go b/stream/events_test.go index cb53cea..9d3ae79 100644 --- a/stream/events_test.go +++ b/stream/events_test.go @@ -1,6 +1,7 @@ package stream_test import ( + "github.com/stretchr/testify/require" "testing" "github.com/stretchr/testify/assert" @@ -9,18 +10,18 @@ import ( ) type Stream struct { - events *stream.Events[Stream] + events *stream.Stream[Stream] } func (s Stream) StreamID() stream.ID { return "ID" } -func (s Stream) Events() *stream.Events[Stream] { +func (s Stream) Stream() *stream.Stream[Stream] { return s.events } -func (s Stream) NewFromEvents(events *stream.Events[Stream]) *Stream { +func (s Stream) NewFromStream(events *stream.Stream[Stream]) *Stream { return &Stream{events: events} } @@ -40,7 +41,8 @@ func TestNewEventsQueue(t *testing.T) { var event1 stream.Event[Stream] = Event{ID: 1} var event2 stream.Event[Stream] = Event{ID: 2} - es := new(stream.Events[Stream]) + es, err := stream.NewStream[Stream]("ID") + require.NoError(t, err) s := Stream{ events: es, } @@ -50,7 +52,7 @@ func TestNewEventsQueue(t *testing.T) { events := es.PopEvents() assert.Len(t, events, 0) - err := stream.Record(&s, event1) + err = stream.Record(&s, event1) assert.NoError(t, err) err = stream.Record(&s, event2) assert.NoError(t, err) diff --git a/stream/stream.go b/stream/stream.go index 7af7920..9fd34b0 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -1,10 +1,10 @@ package stream -// Stream represents the type saved and loaded by the event store. +// Entity represents the type saved and loaded by the event store. // In DDD terms, it is the "aggregate root". // -// In order for your domain type to implement Stream: -// - Embed pointer to Events queue. +// In order for your domain type to implement Entity: +// - Embed pointer to Stream queue. // - Implement the interface methods in accordance with its description. // // Then an EventStore will be able to store and load it. @@ -12,7 +12,7 @@ package stream // Example: // // type User struct { -// events *stream.Events[User] +// events *stream.Stream[User] // id string // } // @@ -20,22 +20,19 @@ package stream // return stream.ID(u.id) // } // -// func (u User) Events() *stream.Events[User] { +// func (u User) Stream() *stream.Stream[User] { // return u.events // } // -// func (u User) NewFromEvents(events *stream.Events[User]) *User { +// func (u User) NewFromEvents(events *stream.Stream[User]) *User { // return &User{events: events} // } -type Stream[T any] interface { - // StreamID returns a unique identifier (usually the same as your stream's internal ID). - StreamID() ID +type Entity[T any] interface { + // Stream exposes a pointer to the Stream queue. + Stream() *Stream[T] - // Events exposes a pointer to the Events queue. - Events() *Events[T] - - // NewFromEvents returns a new instance with the provided Events queue. - NewFromEvents(events *Events[T]) *T + // NewFromEvents returns a new instance with the provided Stream queue. + NewFromStream(events *Stream[T]) *T } // ID is the unique identifier of a stream. @@ -45,32 +42,32 @@ func (i ID) String() string { return string(i) } -// Record applies a provided Event and puts that into the stream's internal Events queue. -func Record[T Stream[T]](stream *T, e Event[T]) error { +// Record applies a provided Event and puts that into the stream's internal Stream queue. +func Record[T Entity[T]](stream *T, e Event[T]) error { err := e.ApplyTo(stream) if err != nil { return err } - (*stream).Events().Record(e) + (*stream).Stream().Record(e) return nil } // New instantiates a new T with all events applied to it. -// At the same time the stream's internal Events queue is initialised, +// At the same time the stream's internal Stream queue is initialised, // so it can record new upcoming events. -func New[T Stream[T]](eventsSlice []VersionedEvent[T]) (*T, error) { +func New[T Entity[T]](id ID, eventsSlice []VersionedEvent[T]) (*T, error) { var t T - events, err := newEvents(eventsSlice) + events, err := newEvents(id, eventsSlice) if err != nil { return nil, err } eventsSlice = events.PopEvents() - target := t.NewFromEvents(events) + target := t.NewFromStream(events) for _, e := range eventsSlice { err := e.ApplyTo(target) if err != nil { From f565b47c97359f65d142ee11ca80d4409b42449e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 30 Dec 2022 23:42:40 +0100 Subject: [PATCH 2/7] Update --- _examples/postcard/postcard.go | 13 ++- _examples/postcard/postcard_test.go | 2 +- eventstore/eventstore.go | 8 +- eventstore/inmemory.go | 2 +- eventstore/sql.go | 2 +- stream/entity.go | 63 ++++++++++++ stream/events.go | 106 -------------------- stream/events_test.go | 85 ---------------- stream/stream.go | 148 +++++++++++++++++----------- stream/stream_test.go | 81 +++++++++++++++ stream/typed_stream.go | 21 ---- stream/typed_stream_test.go | 29 ------ 12 files changed, 246 insertions(+), 314 deletions(-) create mode 100644 stream/entity.go delete mode 100644 stream/events.go delete mode 100644 stream/events_test.go create mode 100644 stream/stream_test.go delete mode 100644 stream/typed_stream.go delete mode 100644 stream/typed_stream_test.go diff --git a/_examples/postcard/postcard.go b/_examples/postcard/postcard.go index 828a625..0792220 100644 --- a/_examples/postcard/postcard.go +++ b/_examples/postcard/postcard.go @@ -24,7 +24,7 @@ type Address struct { } func NewPostcard(id string) (*Postcard, error) { - s, err := stream.NewStream[Postcard](stream.ID(id)) + s, err := stream.NewStreamWithType[Postcard](stream.ID(id), "Postcard") if err != nil { return nil, err } @@ -33,8 +33,7 @@ func NewPostcard(id string) (*Postcard, error) { stream: s, } - // Can we make it s.Record(p, Created{) ? - err = stream.Record[Postcard](p, Created{ + err = p.stream.Record(p, Created{ ID: id, }) if err != nil { @@ -49,14 +48,14 @@ func (p *Postcard) Send() error { return fmt.Errorf("postcard already sent") } - return stream.Record[Postcard](p, Sent{}) + return p.stream.Record(p, Sent{}) } func (p Postcard) Stream() *stream.Stream[Postcard] { return p.stream } -func (p Postcard) NewFromStream(stream *stream.Stream[Postcard]) *Postcard { +func (p Postcard) NewWithStream(stream *stream.Stream[Postcard]) *Postcard { return &Postcard{stream: stream} } @@ -81,13 +80,13 @@ func (p Postcard) Sent() bool { } func (p *Postcard) Write(content string) error { - return stream.Record[Postcard](p, Written{ + return p.stream.Record(p, Written{ Content: content, }) } func (p *Postcard) Address(sender Address, addressee Address) error { - return stream.Record[Postcard](p, Addressed{ + return p.stream.Record(p, Addressed{ Sender: sender, Addressee: addressee, }) diff --git a/_examples/postcard/postcard_test.go b/_examples/postcard/postcard_test.go index 5181e99..88c2fd0 100644 --- a/_examples/postcard/postcard_test.go +++ b/_examples/postcard/postcard_test.go @@ -58,7 +58,7 @@ func TestPostcard_Lifecycle(t *testing.T) { } assert.Equal(expectedEvents, events) - pcLoaded, err := stream.New(stream.ID(id), events) + pcLoaded, err := stream.NewEntity(stream.ID(id), events) assert.NoError(err) assert.Equal(senderAddress, pcLoaded.Sender()) diff --git a/eventstore/eventstore.go b/eventstore/eventstore.go index 118b655..64cab65 100644 --- a/eventstore/eventstore.go +++ b/eventstore/eventstore.go @@ -11,11 +11,9 @@ var ErrStreamNotFound = errors.New("stream not found by ID") // EventStore loads and saves T implementing stream.Entity type EventStore[T stream.Entity[T]] interface { - // Load will fetch all events for `StreamID()` and use them - // to instantiate a pointer to `T` using `FromEvents()` and return it. + // Load fetches all events for the stream id and returns a new instance of T based on them. Load(ctx context.Context, id stream.ID) (*T, error) - // Save will call `PopEvents()` and then save them - // under the stream's id from `StreamID()`. - Save(ctx context.Context, stream *T) error + // Save saves events recorded in the entity's stream. + Save(ctx context.Context, entity *T) error } diff --git a/eventstore/inmemory.go b/eventstore/inmemory.go index b5efb4c..14dfabe 100644 --- a/eventstore/inmemory.go +++ b/eventstore/inmemory.go @@ -29,7 +29,7 @@ func (i *InMemoryStore[T]) Load(_ context.Context, id stream.ID) (*T, error) { return nil, ErrStreamNotFound } - return stream.New(id, events) + return stream.NewEntity(id, events) } func (i *InMemoryStore[T]) Save(_ context.Context, t *T) error { diff --git a/eventstore/sql.go b/eventstore/sql.go index 3b531b2..bed3005 100644 --- a/eventstore/sql.go +++ b/eventstore/sql.go @@ -126,7 +126,7 @@ func (s SQLStore[T]) Load(ctx context.Context, id stream.ID) (*T, error) { return nil, ErrStreamNotFound } - return stream.New(id, events) + return stream.NewEntity(id, events) } // Save saves the stream's queued events to the database. diff --git a/stream/entity.go b/stream/entity.go new file mode 100644 index 0000000..a4786c2 --- /dev/null +++ b/stream/entity.go @@ -0,0 +1,63 @@ +package stream + +// Entity represents the type saved and loaded by the event store. +// In DDD terms, it is the "aggregate root". +// +// In order for your domain type to implement Entity: +// - Embed pointer to the Stream. +// - Implement the interface methods in accordance with its description. +// +// Then an EventStore will be able to store and load it. +// +// Example: +// +// type User struct { +// stream *stream.Stream[User] +// id string +// } +// +// func (u User) Stream() *stream.Stream[User] { +// return u.stream +// } +// +// func (u User) NewWithStream(stream *stream.Stream[User]) *User { +// return &User{stream: stream} +// } +type Entity[T any] interface { + // Stream exposes a pointer to the Stream. + Stream() *Stream[T] + + // NewWithStream returns a new instance with the provided Stream queue. + NewWithStream(*Stream[T]) *T +} + +// ID is the unique identifier of a stream. +type ID string + +func (i ID) String() string { + return string(i) +} + +// NewEntity instantiates a new T with the given events applied to it. +// At the same time the entity's internal Stream is initialised, +// so it can record new upcoming stream. +func NewEntity[T Entity[T]](id ID, eventsSlice []VersionedEvent[T]) (*T, error) { + var t T + + stream, err := newStream(id, eventsSlice) + if err != nil { + return nil, err + } + + eventsSlice = stream.PopEvents() + + target := t.NewWithStream(stream) + for _, e := range eventsSlice { + err := e.ApplyTo(target) + if err != nil { + return nil, err + } + } + + return target, nil +} diff --git a/stream/events.go b/stream/events.go deleted file mode 100644 index 4dbc3bd..0000000 --- a/stream/events.go +++ /dev/null @@ -1,106 +0,0 @@ -package stream - -import ( - "errors" - "fmt" -) - -// EventName identifies the type of the event and the version of its schema, e.g. "FooCreated_v1". -type EventName string - -type Event[T any] interface { - // EventName should identify the event and the version of its schema. - // - // Example implementation: - // func (e FooCreated) EventName() EventName { - // return "FooCreated_v1" - // } - EventName() EventName - - // ApplyTo applies the event to the stream. - ApplyTo(*T) error -} - -// VersionedEvent is an event with a corresponding stream version. -type VersionedEvent[A any] struct { - Event[A] - StreamVersion int -} - -// Stream stores events. -// Zero-value is a valid state, ready to use. -type Stream[A any] struct { - id ID - streamType string - metadata any - version int - queue []VersionedEvent[A] -} - -func NewStream[A any](id ID) (*Stream[A], error) { - if id == "" { - return nil, errors.New("empty id") - } - - return &Stream[A]{ - id: id, - }, nil -} - -func NewStreamWithMetadata[A any](id ID, streamType string, metadata any) (*Stream[A], error) { - s, err := NewStream[A](id) - if err != nil { - return nil, err - } - - s.streamType = streamType - s.metadata = metadata - - return s, nil -} - -func (e *Stream[A]) ID() ID { - return e.id -} - -func (e *Stream[A]) Type() string { - return e.streamType -} - -// Record puts a new Event on the queue with proper version. -func (e *Stream[A]) Record(event Event[A]) { - e.version += 1 - e.queue = append(e.queue, VersionedEvent[A]{ - Event: event, - StreamVersion: e.version, - }) -} - -// PopEvents returns the events on the queue and clears it. -func (e *Stream[A]) PopEvents() []VersionedEvent[A] { - tmp := make([]VersionedEvent[A], len(e.queue)) - copy(tmp, e.queue) - e.queue = []VersionedEvent[A]{} - - return tmp -} - -// HasEvents returns true if there are any queued events. -func (e *Stream[A]) HasEvents() bool { - return len(e.queue) > 0 -} - -func newEvents[A any](id ID, events []VersionedEvent[A]) (*Stream[A], error) { - if len(events) == 0 { - return nil, fmt.Errorf("no events to load") - } - - e, err := NewStream[A](id) - if err != nil { - return nil, err - } - e.version = events[len(events)-1].StreamVersion - e.queue = events - - return e, nil -} diff --git a/stream/events_test.go b/stream/events_test.go deleted file mode 100644 index 9d3ae79..0000000 --- a/stream/events_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package stream_test - -import ( - "github.com/stretchr/testify/require" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/ThreeDotsLabs/esja/stream" -) - -type Stream struct { - events *stream.Stream[Stream] -} - -func (s Stream) StreamID() stream.ID { - return "ID" -} - -func (s Stream) Stream() *stream.Stream[Stream] { - return s.events -} - -func (s Stream) NewFromStream(events *stream.Stream[Stream]) *Stream { - return &Stream{events: events} -} - -type Event struct { - ID int -} - -func (e Event) EventName() stream.EventName { - return "Event" -} - -func (e Event) ApplyTo(_ *Stream) error { - return nil -} - -func TestNewEventsQueue(t *testing.T) { - var event1 stream.Event[Stream] = Event{ID: 1} - var event2 stream.Event[Stream] = Event{ID: 2} - - es, err := stream.NewStream[Stream]("ID") - require.NoError(t, err) - s := Stream{ - events: es, - } - - assert.False(t, es.HasEvents()) - - events := es.PopEvents() - assert.Len(t, events, 0) - - err = stream.Record(&s, event1) - assert.NoError(t, err) - err = stream.Record(&s, event2) - assert.NoError(t, err) - - assert.True(t, es.HasEvents()) - - events = es.PopEvents() - assert.Len(t, events, 2) - assert.False(t, es.HasEvents()) - - assert.Equal(t, event1, events[0].Event) - assert.Equal(t, 1, events[0].StreamVersion) - - assert.Equal(t, event2, events[1].Event) - assert.Equal(t, 2, events[1].StreamVersion) - - events = es.PopEvents() - assert.Len(t, events, 0) - - var event3 stream.Event[Stream] = Event{ID: 3} - - err = stream.Record(&s, event3) - assert.NoError(t, err) - - events = es.PopEvents() - assert.Len(t, events, 1) - - assert.Equal(t, event3, events[0].Event) - assert.Equal(t, 3, events[0].StreamVersion) -} diff --git a/stream/stream.go b/stream/stream.go index 9fd34b0..e9b0638 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -1,79 +1,111 @@ package stream -// Entity represents the type saved and loaded by the event store. -// In DDD terms, it is the "aggregate root". -// -// In order for your domain type to implement Entity: -// - Embed pointer to Stream queue. -// - Implement the interface methods in accordance with its description. -// -// Then an EventStore will be able to store and load it. -// -// Example: -// -// type User struct { -// events *stream.Stream[User] -// id string -// } -// -// func (u User) StreamID() stream.ID { -// return stream.ID(u.id) -// } -// -// func (u User) Stream() *stream.Stream[User] { -// return u.events -// } -// -// func (u User) NewFromEvents(events *stream.Stream[User]) *User { -// return &User{events: events} -// } -type Entity[T any] interface { - // Stream exposes a pointer to the Stream queue. - Stream() *Stream[T] - - // NewFromEvents returns a new instance with the provided Stream queue. - NewFromStream(events *Stream[T]) *T +import ( + "errors" + "fmt" +) + +// EventName identifies the type of the event and the version of its schema, e.g. "FooCreated_v1". +type EventName string + +type Event[T any] interface { + // EventName should identify the event and the version of its schema. + // + // Example implementation: + // func (e FooCreated) EventName() EventName { + // return "FooCreated_v1" + // } + EventName() EventName + + // ApplyTo applies the event to the stream. + ApplyTo(*T) error +} + +// VersionedEvent is an event with a corresponding stream version. +type VersionedEvent[T any] struct { + Event[T] + StreamVersion int +} + +// Stream stores stream. +// Zero-value is a valid state, ready to use. +type Stream[T any] struct { + id ID + streamType string + version int + queue []VersionedEvent[T] } -// ID is the unique identifier of a stream. -type ID string +func NewStream[T any](id ID) (*Stream[T], error) { + if id == "" { + return nil, errors.New("empty id") + } -func (i ID) String() string { - return string(i) + return &Stream[T]{ + id: id, + }, nil } -// Record applies a provided Event and puts that into the stream's internal Stream queue. -func Record[T Entity[T]](stream *T, e Event[T]) error { - err := e.ApplyTo(stream) +func NewStreamWithType[T any](id ID, streamType string) (*Stream[T], error) { + s, err := NewStream[T](id) if err != nil { - return err + return nil, err } - (*stream).Stream().Record(e) + s.streamType = streamType - return nil + return s, nil } -// New instantiates a new T with all events applied to it. -// At the same time the stream's internal Stream queue is initialised, -// so it can record new upcoming events. -func New[T Entity[T]](id ID, eventsSlice []VersionedEvent[T]) (*T, error) { - var t T +func (s *Stream[T]) ID() ID { + return s.id +} + +func (s *Stream[T]) Type() string { + return s.streamType +} - events, err := newEvents(id, eventsSlice) +// Record applies the provided Event to the entity and puts it into the stream's queue with proper version. +func (s *Stream[T]) Record(entity *T, event Event[T]) error { + err := event.ApplyTo(entity) if err != nil { - return nil, err + return err } - eventsSlice = events.PopEvents() + s.version += 1 + s.queue = append(s.queue, VersionedEvent[T]{ + Event: event, + StreamVersion: s.version, + }) + + return nil +} + +// PopEvents returns the stream on the queue and clears it. +func (s *Stream[T]) PopEvents() []VersionedEvent[T] { + tmp := make([]VersionedEvent[T], len(s.queue)) + copy(tmp, s.queue) + s.queue = []VersionedEvent[T]{} + + return tmp +} + +// HasEvents returns true if there are any queued stream. +func (s *Stream[T]) HasEvents() bool { + return len(s.queue) > 0 +} - target := t.NewFromStream(events) - for _, e := range eventsSlice { - err := e.ApplyTo(target) - if err != nil { - return nil, err - } +func newStream[T any](id ID, events []VersionedEvent[T]) (*Stream[T], error) { + if len(events) == 0 { + return nil, fmt.Errorf("no stream to load") + } + + e, err := NewStream[T](id) + if err != nil { + return nil, err } + e.version = events[len(events)-1].StreamVersion + e.queue = events - return target, nil + return e, nil } diff --git a/stream/stream_test.go b/stream/stream_test.go new file mode 100644 index 0000000..5eb42e0 --- /dev/null +++ b/stream/stream_test.go @@ -0,0 +1,81 @@ +package stream_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/esja/stream" +) + +type Entity struct { + stream *stream.Stream[Entity] +} + +func (s Entity) Stream() *stream.Stream[Entity] { + return s.stream +} + +func (s Entity) NewWithStream(stream *stream.Stream[Entity]) *Entity { + return &Entity{stream: stream} +} + +type Event struct { + ID int +} + +func (e Event) EventName() stream.EventName { + return "Event" +} + +func (e Event) ApplyTo(_ *Entity) error { + return nil +} + +func TestNewEventsQueue(t *testing.T) { + var event1 stream.Event[Entity] = Event{ID: 1} + var event2 stream.Event[Entity] = Event{ID: 2} + + str, err := stream.NewStream[Entity]("ID") + require.NoError(t, err) + s := &Entity{ + stream: str, + } + + assert.False(t, str.HasEvents()) + + events := str.PopEvents() + assert.Len(t, events, 0) + + err = str.Record(s, event1) + assert.NoError(t, err) + err = str.Record(s, event2) + assert.NoError(t, err) + + assert.True(t, str.HasEvents()) + + events = str.PopEvents() + assert.Len(t, events, 2) + assert.False(t, str.HasEvents()) + + assert.Equal(t, event1, events[0].Event) + assert.Equal(t, 1, events[0].StreamVersion) + + assert.Equal(t, event2, events[1].Event) + assert.Equal(t, 2, events[1].StreamVersion) + + events = str.PopEvents() + assert.Len(t, events, 0) + + var event3 stream.Event[Entity] = Event{ID: 3} + + err = str.Record(s, event3) + assert.NoError(t, err) + + events = str.PopEvents() + assert.Len(t, events, 1) + + assert.Equal(t, event3, events[0].Event) + assert.Equal(t, 3, events[0].StreamVersion) +} diff --git a/stream/typed_stream.go b/stream/typed_stream.go deleted file mode 100644 index b7fabb8..0000000 --- a/stream/typed_stream.go +++ /dev/null @@ -1,21 +0,0 @@ -package stream - -// TypedStream is an optional interface that defines a stream type string -// which may be used by the repositories to mark event database records. -type TypedStream interface { - StreamType() string -} - -// GetStreamType returns stream type of generic type if -// it implemented the TypedStream interface. -// Otherwise, the empty string is returned. -func GetStreamType(stream any) string { - streamType := "" - - st, ok := stream.(TypedStream) - if ok { - streamType = st.StreamType() - } - - return streamType -} diff --git a/stream/typed_stream_test.go b/stream/typed_stream_test.go deleted file mode 100644 index de28e87..0000000 --- a/stream/typed_stream_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package stream_test - -import ( - "testing" - - "github.com/ThreeDotsLabs/esja/stream" - "github.com/stretchr/testify/assert" -) - -type testStructA struct{} - -type testStructB struct{} - -func (t testStructB) StreamType() string { - return "TestStructB" -} - -type testStructC struct{} - -func (t *testStructC) StreamType() string { - return "TestStructC" -} - -func TestGetStreamType(t *testing.T) { - assert.Equal(t, "", stream.GetStreamType(testStructA{})) - assert.Equal(t, "TestStructB", stream.GetStreamType(testStructB{})) - assert.Equal(t, "", stream.GetStreamType(testStructC{})) - assert.Equal(t, "TestStructC", stream.GetStreamType(&testStructC{})) -} From 31eefce4de6e66e00bb7f95f76e9094d34b93d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 30 Dec 2022 23:53:00 +0100 Subject: [PATCH 3/7] Refactor --- _examples/postcard/postcard.go | 1 + stream/event.go | 23 +++++++++++++++++++++++ stream/stream.go | 22 ---------------------- 3 files changed, 24 insertions(+), 22 deletions(-) create mode 100644 stream/event.go diff --git a/_examples/postcard/postcard.go b/_examples/postcard/postcard.go index 0792220..32f374b 100644 --- a/_examples/postcard/postcard.go +++ b/_examples/postcard/postcard.go @@ -16,6 +16,7 @@ type Postcard struct { content string sent bool } + type Address struct { Name string `anonymize:"true"` Line1 string diff --git a/stream/event.go b/stream/event.go new file mode 100644 index 0000000..c4a8af5 --- /dev/null +++ b/stream/event.go @@ -0,0 +1,23 @@ +package stream + +// EventName identifies the type of the event and the version of its schema, e.g. "FooCreated_v1". +type EventName string + +type Event[T any] interface { + // EventName should identify the event and the version of its schema. + // + // Example implementation: + // func (e FooCreated) EventName() EventName { + // return "FooCreated_v1" + // } + EventName() EventName + + // ApplyTo applies the event to the stream. + ApplyTo(*T) error +} + +// VersionedEvent is an event with a corresponding stream version. +type VersionedEvent[T any] struct { + Event[T] + StreamVersion int +} diff --git a/stream/stream.go b/stream/stream.go index e9b0638..cd875dc 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -5,28 +5,6 @@ import ( "fmt" ) -// EventName identifies the type of the event and the version of its schema, e.g. "FooCreated_v1". -type EventName string - -type Event[T any] interface { - // EventName should identify the event and the version of its schema. - // - // Example implementation: - // func (e FooCreated) EventName() EventName { - // return "FooCreated_v1" - // } - EventName() EventName - - // ApplyTo applies the event to the stream. - ApplyTo(*T) error -} - -// VersionedEvent is an event with a corresponding stream version. -type VersionedEvent[T any] struct { - Event[T] - StreamVersion int -} - // Stream stores stream. // Zero-value is a valid state, ready to use. type Stream[T any] struct { From f7e5c57427eb8a7f37f2a12e421124debf91c2d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 6 Jan 2023 00:18:29 +0100 Subject: [PATCH 4/7] Add watermill example --- _examples/postcard/cmd/main.go | 133 ++++++++++++++ _examples/postcard/docker-compose.yml | 14 ++ _examples/postcard/go.mod | 12 +- _examples/postcard/go.sum | 219 +++++++++++++++++++++++- _examples/postcard/storage/watermill.go | 55 ++++++ 5 files changed, 431 insertions(+), 2 deletions(-) create mode 100644 _examples/postcard/cmd/main.go create mode 100644 _examples/postcard/docker-compose.yml create mode 100644 _examples/postcard/storage/watermill.go diff --git a/_examples/postcard/cmd/main.go b/_examples/postcard/cmd/main.go new file mode 100644 index 0000000..8ab3b8d --- /dev/null +++ b/_examples/postcard/cmd/main.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "log" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp" + sql2 "github.com/ThreeDotsLabs/watermill-sql/pkg/sql" + "github.com/ThreeDotsLabs/watermill/components/forwarder" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + _ "github.com/lib/pq" + + "postcard" + "postcard/storage" +) + +func main() { + conn := "host=localhost port=5432 user=postgres password=password dbname=postgres sslmode=disable" + db, err := sql.Open("postgres", conn) + if err != nil { + log.Fatal(err) + } + + repo, err := storage.NewDefaultSimplePostcardRepository(context.Background(), db) + if err != nil { + log.Fatal(err) + } + + logger := watermill.NewStdLogger(false, false) + + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + log.Fatal(err) + } + + sqlSub, err := sql2.NewSubscriber(db, sql2.SubscriberConfig{ + SchemaAdapter: storage.WatermillSchemaAdapter{}, + OffsetsAdapter: sql2.DefaultPostgreSQLOffsetsAdapter{}, + InitializeSchema: true, + }, logger) + + amqpURI := "amqp://guest:guest@localhost:5672/" + amqpConfig := amqp.NewDurablePubSubConfig(amqpURI, func(topic string) string { + return topic + }) + + amqpPub, err := amqp.NewPublisher(amqpConfig, logger) + if err != nil { + log.Fatal(err) + } + + amqpSub, err := amqp.NewSubscriber(amqpConfig, logger) + if err != nil { + log.Fatal(err) + } + + router.AddHandler( + "sql-to-amqp", + "events", + sqlSub, + "events", + amqpPub, + func(msg *message.Message) ([]*message.Message, error) { + return []*message.Message{msg}, nil + }, + ) + + router.AddNoPublisherHandler( + "read-amqp", + "events", + amqpSub, + func(msg *message.Message) error { + fmt.Printf("Received message %v: %v\n", msg.UUID, string(msg.Payload)) + return nil + }, + ) + + fwd, err := forwarder.NewForwarder(sqlSub, amqpPub, logger, forwarder.Config{ + ForwarderTopic: "events", + }) + if err != nil { + log.Fatal(err) + } + + go func() { + err := fwd.Run(context.Background()) + if err != nil { + log.Fatal(err) + } + }() + + go func() { + err = router.Run(context.Background()) + if err != nil { + log.Fatal(err) + } + }() + + <-fwd.Running() + <-router.Running() + + pc, err := postcard.NewPostcard(uuid.NewString()) + if err != nil { + log.Fatal(err) + } + + err = repo.Save(context.Background(), pc) + if err != nil { + log.Fatal(err) + } + + err = pc.Address(postcard.Address{ + Name: "Someone", + Line1: "Somewhere", + }, postcard.Address{ + Name: "Who", + Line1: "Where", + }) + if err != nil { + log.Fatal(err) + } + + err = repo.Save(context.Background(), pc) + if err != nil { + log.Fatal(err) + } + + select {} +} diff --git a/_examples/postcard/docker-compose.yml b/_examples/postcard/docker-compose.yml new file mode 100644 index 0000000..94999eb --- /dev/null +++ b/_examples/postcard/docker-compose.yml @@ -0,0 +1,14 @@ +version: '3' +services: + postgres: + image: postgres + environment: + - POSTGRES_PASSWORD=password + - POSTGRES_DB=esja + ports: + - "5432:5432" + + rabbitmq: + image: rabbitmq:3.7-management + ports: + - 5672:5672 diff --git a/_examples/postcard/go.mod b/_examples/postcard/go.mod index 13b991a..0a53276 100644 --- a/_examples/postcard/go.mod +++ b/_examples/postcard/go.mod @@ -4,6 +4,10 @@ go 1.18 require ( github.com/ThreeDotsLabs/esja v0.0.0-20221208191400-8fbb493947e7 + github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d + github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 + github.com/ThreeDotsLabs/watermill-amqp v1.1.4 + github.com/ThreeDotsLabs/watermill-sql v1.3.7 github.com/google/uuid v1.3.0 github.com/lib/pq v1.10.6 github.com/mattn/go-sqlite3 v1.14.16 @@ -11,9 +15,15 @@ require ( ) require ( - github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/streadway/amqp v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/_examples/postcard/go.sum b/_examples/postcard/go.sum index c5ee9af..fdb741b 100644 --- a/_examples/postcard/go.sum +++ b/_examples/postcard/go.sum @@ -1,25 +1,242 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d h1:bZBc4vDne17OYqoeSceaq5eaZM6vOUMVaJIBy+dNRH4= github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= +github.com/ThreeDotsLabs/watermill v1.0.2/go.mod h1:vZCPh7eN0P7r2qKau4SfmcUZ83+3JXWkRl4BiWUlqFw= +github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 h1:tQJ3L/AnfliXaxaq+ElHOfzi0Vx+AN8cAnIOLcUTrxo= +github.com/ThreeDotsLabs/watermill v1.2.0-rc.11/go.mod h1:QLZSaklpSZ/7yv288LL2DFOgCEi86VYEmQvzmaMlHoA= +github.com/ThreeDotsLabs/watermill-amqp v1.1.4 h1:vOdc8a0m0sMPAJZ2CMLx5a+fwlgeeojOFPwgj7+nlJA= +github.com/ThreeDotsLabs/watermill-amqp v1.1.4/go.mod h1:5RtpKNTriXCWQZ67YDg1G7qsphZoUue/EWOmQqTZi3Q= +github.com/ThreeDotsLabs/watermill-sql v1.3.7 h1:NXZvwE9bcZm2rSd4T1zYGDCsTkKK6Urzf6Lb809Ar28= +github.com/ThreeDotsLabs/watermill-sql v1.3.7/go.mod h1:xHOjLwBd3KIIvF9EHrcleblP08SlY+PIV6yke8GD7IA= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk= +github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= +github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= +github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= +github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= +github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgtype v1.2.0/go.mod h1:5m2OfMh1wTK7x+Fk952IDmI4nw3nPrvtQdM0ZT4WpC0= +github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkALtxZMCH411K+tKzNpwzCKU+AnPzBKZ+I+Po= +github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ= +github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY= +github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXgo+kA= +github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o= +github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= +github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw= +github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lithammer/shortuuid/v3 v3.0.4/go.mod h1:RviRjexKqIzx/7r1peoAITm6m7gnif/h+0zmolKJjzw= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= +google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/_examples/postcard/storage/watermill.go b/_examples/postcard/storage/watermill.go new file mode 100644 index 0000000..9692d03 --- /dev/null +++ b/_examples/postcard/storage/watermill.go @@ -0,0 +1,55 @@ +package storage + +import ( + "database/sql" + "errors" + + "github.com/google/uuid" + + sql2 "github.com/ThreeDotsLabs/watermill-sql/pkg/sql" + "github.com/ThreeDotsLabs/watermill/message" +) + +type WatermillSchemaAdapter struct{} + +func (s WatermillSchemaAdapter) SchemaInitializingQueries(topic string) []string { + return nil +} + +func (s WatermillSchemaAdapter) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) { + return "", nil, errors.New("not supported") +} + +func (s WatermillSchemaAdapter) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql2.OffsetsAdapter) (string, []interface{}) { + nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup) + selectQuery := ` + SELECT id, stream_id, event_payload FROM events + WHERE + id > (` + nextOffsetQuery + `) + ORDER BY + id ASC + LIMIT 1` + + return selectQuery, nextOffsetArgs +} + +type schemaRow struct { + ID int64 + StreamID []byte + Payload []byte +} + +func (s WatermillSchemaAdapter) UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error) { + r := schemaRow{} + err = row.Scan(&r.ID, &r.StreamID, &r.Payload) + if err != nil { + return 0, nil, err + } + + // TODO Event UUID? + msg = message.NewMessage(uuid.NewString(), r.Payload) + + // TODO Metadata? + + return int(r.ID), msg, nil +} From c667fe39880375f512acf5cd01b25939610ee9a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 6 Jan 2023 20:46:42 +0100 Subject: [PATCH 5/7] Remove --- stream/entity.go | 63 --------------------------------- stream/event.go | 23 ------------ stream/stream_test.go | 81 ------------------------------------------- 3 files changed, 167 deletions(-) delete mode 100644 stream/entity.go delete mode 100644 stream/event.go delete mode 100644 stream/stream_test.go diff --git a/stream/entity.go b/stream/entity.go deleted file mode 100644 index a4786c2..0000000 --- a/stream/entity.go +++ /dev/null @@ -1,63 +0,0 @@ -package stream - -// Entity represents the type saved and loaded by the event store. -// In DDD terms, it is the "aggregate root". -// -// In order for your domain type to implement Entity: -// - Embed pointer to the Stream. -// - Implement the interface methods in accordance with its description. -// -// Then an EventStore will be able to store and load it. -// -// Example: -// -// type User struct { -// stream *stream.Stream[User] -// id string -// } -// -// func (u User) Stream() *stream.Stream[User] { -// return u.stream -// } -// -// func (u User) NewWithStream(stream *stream.Stream[User]) *User { -// return &User{stream: stream} -// } -type Entity[T any] interface { - // Stream exposes a pointer to the Stream. - Stream() *Stream[T] - - // NewWithStream returns a new instance with the provided Stream queue. - NewWithStream(*Stream[T]) *T -} - -// ID is the unique identifier of a stream. -type ID string - -func (i ID) String() string { - return string(i) -} - -// NewEntity instantiates a new T with the given events applied to it. -// At the same time the entity's internal Stream is initialised, -// so it can record new upcoming stream. -func NewEntity[T Entity[T]](id ID, eventsSlice []VersionedEvent[T]) (*T, error) { - var t T - - stream, err := newStream(id, eventsSlice) - if err != nil { - return nil, err - } - - eventsSlice = stream.PopEvents() - - target := t.NewWithStream(stream) - for _, e := range eventsSlice { - err := e.ApplyTo(target) - if err != nil { - return nil, err - } - } - - return target, nil -} diff --git a/stream/event.go b/stream/event.go deleted file mode 100644 index c4a8af5..0000000 --- a/stream/event.go +++ /dev/null @@ -1,23 +0,0 @@ -package stream - -// EventName identifies the type of the event and the version of its schema, e.g. "FooCreated_v1". -type EventName string - -type Event[T any] interface { - // EventName should identify the event and the version of its schema. - // - // Example implementation: - // func (e FooCreated) EventName() EventName { - // return "FooCreated_v1" - // } - EventName() EventName - - // ApplyTo applies the event to the stream. - ApplyTo(*T) error -} - -// VersionedEvent is an event with a corresponding stream version. -type VersionedEvent[T any] struct { - Event[T] - StreamVersion int -} diff --git a/stream/stream_test.go b/stream/stream_test.go deleted file mode 100644 index 5eb42e0..0000000 --- a/stream/stream_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package stream_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/ThreeDotsLabs/esja/stream" -) - -type Entity struct { - stream *stream.Stream[Entity] -} - -func (s Entity) Stream() *stream.Stream[Entity] { - return s.stream -} - -func (s Entity) NewWithStream(stream *stream.Stream[Entity]) *Entity { - return &Entity{stream: stream} -} - -type Event struct { - ID int -} - -func (e Event) EventName() stream.EventName { - return "Event" -} - -func (e Event) ApplyTo(_ *Entity) error { - return nil -} - -func TestNewEventsQueue(t *testing.T) { - var event1 stream.Event[Entity] = Event{ID: 1} - var event2 stream.Event[Entity] = Event{ID: 2} - - str, err := stream.NewStream[Entity]("ID") - require.NoError(t, err) - s := &Entity{ - stream: str, - } - - assert.False(t, str.HasEvents()) - - events := str.PopEvents() - assert.Len(t, events, 0) - - err = str.Record(s, event1) - assert.NoError(t, err) - err = str.Record(s, event2) - assert.NoError(t, err) - - assert.True(t, str.HasEvents()) - - events = str.PopEvents() - assert.Len(t, events, 2) - assert.False(t, str.HasEvents()) - - assert.Equal(t, event1, events[0].Event) - assert.Equal(t, 1, events[0].StreamVersion) - - assert.Equal(t, event2, events[1].Event) - assert.Equal(t, 2, events[1].StreamVersion) - - events = str.PopEvents() - assert.Len(t, events, 0) - - var event3 stream.Event[Entity] = Event{ID: 3} - - err = str.Record(s, event3) - assert.NoError(t, err) - - events = str.PopEvents() - assert.Len(t, events, 1) - - assert.Equal(t, event3, events[0].Event) - assert.Equal(t, 3, events[0].StreamVersion) -} From b830d280eb019afba161999136ab718ea99f7301 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 6 Jan 2023 20:47:11 +0100 Subject: [PATCH 6/7] Fix --- eventstore/eventstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventstore/eventstore.go b/eventstore/eventstore.go index efa81b1..b50ba90 100644 --- a/eventstore/eventstore.go +++ b/eventstore/eventstore.go @@ -11,7 +11,7 @@ var ErrEntityNotFound = errors.New("entity not found by ID") // EventStore loads and saves T implementing esja.Entity. type EventStore[T esja.Entity[T]] interface { - // Load fetches all events for the stream id and returns a new instance of T based on them. + // Load fetches all events for the ID and returns a new instance of T based on them. Load(ctx context.Context, id string) (*T, error) // Save saves events recorded in the entity's stream. From daa85c6bd0dcdde3851c1d6d98439d21574b7087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 6 Jan 2023 20:47:45 +0100 Subject: [PATCH 7/7] tidy --- _examples/postcard/go.mod | 3 +-- _examples/postcard/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/_examples/postcard/go.mod b/_examples/postcard/go.mod index 79e8113..45a88ea 100644 --- a/_examples/postcard/go.mod +++ b/_examples/postcard/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/ThreeDotsLabs/esja v0.0.0-20221208191400-8fbb493947e7 - github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d + github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 github.com/ThreeDotsLabs/watermill-amqp v1.1.4 github.com/ThreeDotsLabs/watermill-sql v1.3.7 @@ -16,7 +16,6 @@ require ( require ( github.com/cenkalti/backoff/v3 v3.2.2 // indirect - github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/_examples/postcard/go.sum b/_examples/postcard/go.sum index fdb741b..7a1b858 100644 --- a/_examples/postcard/go.sum +++ b/_examples/postcard/go.sum @@ -1,6 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d h1:bZBc4vDne17OYqoeSceaq5eaZM6vOUMVaJIBy+dNRH4= -github.com/ThreeDotsLabs/pii v0.0.0-20221221144555-f2186024b30d/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= +github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963 h1:4EQlsCpfwxjn5ijR8fdL6ap1q04guWUCHgnZ+jPdEjY= +github.com/ThreeDotsLabs/pii v0.0.0-20230103125711-e0908da9a963/go.mod h1:wu5cEZEjFUIXR9hdniDvGbbZARrYHTRi6G2bNaSCC/E= github.com/ThreeDotsLabs/watermill v1.0.2/go.mod h1:vZCPh7eN0P7r2qKau4SfmcUZ83+3JXWkRl4BiWUlqFw= github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 h1:tQJ3L/AnfliXaxaq+ElHOfzi0Vx+AN8cAnIOLcUTrxo= github.com/ThreeDotsLabs/watermill v1.2.0-rc.11/go.mod h1:QLZSaklpSZ/7yv288LL2DFOgCEi86VYEmQvzmaMlHoA=