Skip to content

Commit

Permalink
Add support for validation scopes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Oct 3, 2024
1 parent 6466491 commit 1624d84
Show file tree
Hide file tree
Showing 38 changed files with 331 additions and 371 deletions.
52 changes: 28 additions & 24 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dogmatiq/configkit/message"
"github.com/dogmatiq/dodeca/logging"
"github.com/dogmatiq/dogma"
"github.com/dogmatiq/enginekit/collections/sets"
"github.com/dogmatiq/interopspec/envelopespec"
"github.com/dogmatiq/verity/eventstream"
"github.com/dogmatiq/verity/eventstream/memorystream"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (e *Engine) initApp(

e.apps[id.Key] = a

for mt := range x.Packer.Produced {
for mt := range x.Packer.Produced.All() {
e.executors[mt] = x
}

Expand Down Expand Up @@ -126,10 +127,11 @@ func (e *Engine) newEventCache(
// TODO: https://github.com/dogmatiq/verity/issues/226
// Make buffer size configurable.
BufferSize: 0,
Types: cfg.
MessageTypes().
Produced.
FilterByRole(message.EventRole),
Types: sets.NewFromKeys(
cfg.
MessageTypes().
Produced(message.EventKind),
),
}, nil
}

Expand All @@ -147,10 +149,11 @@ func (e *Engine) newEventStream(
// TODO: https://github.com/dogmatiq/verity/issues/76
// Make pre-fetch buffer size configurable.
PreFetch: 10,
Types: cfg.
MessageTypes().
Produced.
FilterByRole(message.EventRole),
Types: sets.NewFromKeys(
cfg.
MessageTypes().
Produced(message.EventKind),
),
}
}

Expand All @@ -169,10 +172,11 @@ func (e *Engine) newCommandExecutor(
Key: cfg.Identity().Key,
},
Marshaler: e.opts.Marshaler,
Produced: cfg.
MessageTypes().
Consumed.
FilterByRole(message.CommandRole),
Produced: sets.NewFromKeys(
cfg.
MessageTypes().
Consumed(message.EventKind),
),
},
}
}
Expand All @@ -188,7 +192,7 @@ func (e *Engine) newEntryPoint(
hf := &handlerFactory{
opts: e.opts,
queue: q,
queueEvents: message.TypeSet{},
queueEvents: sets.New[message.Type](),
aggregateLoader: &aggregate.Loader{
AggregateRepository: ds,
EventRepository: ds,
Expand Down Expand Up @@ -221,7 +225,7 @@ func (e *Engine) newEntryPoint(
type handlerFactory struct {
opts *engineOptions
queue *queue.Queue
queueEvents message.TypeSet
queueEvents *sets.Set[message.Type]
aggregateLoader *aggregate.Loader
processLoader *process.Loader
engineLogger logging.Logger
Expand Down Expand Up @@ -278,8 +282,8 @@ func (f *handlerFactory) VisitRichAggregate(_ context.Context, cfg configkit.Ric
Packer: &parcel.Packer{
Application: f.app,
Marshaler: f.opts.Marshaler,
Produced: cfg.MessageTypes().Produced,
Consumed: cfg.MessageTypes().Consumed,
Produced: sets.NewFromKeys(cfg.MessageTypes().Produced()),
Consumed: sets.NewFromKeys(cfg.MessageTypes().Consumed()),
},
LoadTimeout: f.opts.MessageTimeout,
Logger: f.appLogger,
Expand Down Expand Up @@ -311,8 +315,8 @@ func (f *handlerFactory) VisitRichProcess(_ context.Context, cfg configkit.RichP
Packer: &parcel.Packer{
Application: f.app,
Marshaler: f.opts.Marshaler,
Produced: cfg.MessageTypes().Produced,
Consumed: cfg.MessageTypes().Consumed,
Produced: sets.NewFromKeys(cfg.MessageTypes().Produced()),
Consumed: sets.NewFromKeys(cfg.MessageTypes().Consumed()),
},
LoadTimeout: f.opts.MessageTimeout,
Logger: f.appLogger,
Expand All @@ -332,8 +336,8 @@ func (f *handlerFactory) VisitRichIntegration(_ context.Context, cfg configkit.R
Packer: &parcel.Packer{
Application: f.app,
Marshaler: f.opts.Marshaler,
Produced: cfg.MessageTypes().Produced,
Consumed: cfg.MessageTypes().Consumed,
Produced: sets.NewFromKeys(cfg.MessageTypes().Produced()),
Consumed: sets.NewFromKeys(cfg.MessageTypes().Consumed()),
},
Logger: f.appLogger,
})
Expand All @@ -346,10 +350,10 @@ func (f *handlerFactory) VisitRichProjection(context.Context, configkit.RichProj
}

func (f *handlerFactory) addRoutes(cfg configkit.RichHandler, h handler.Handler) {
for mt, r := range cfg.MessageTypes().Consumed {
for mt := range cfg.MessageTypes().Consumed() {
f.handler[mt] = append(f.handler[mt], h)

if r == message.EventRole {
if mt.Kind() == message.EventKind {
f.queueEvents.Add(mt)
}

Expand All @@ -360,7 +364,7 @@ func (f *handlerFactory) addRoutes(cfg configkit.RichHandler, h handler.Handler)
cfg.Identity().Name,
f.app.Name,
mt,
r,
mt.Kind(),
)
}
}
2 changes: 1 addition & 1 deletion engineoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func WithProjectionCompactTimeout(d time.Duration) EngineOption {
func NewDefaultMarshaler(configs []configkit.RichApplication) marshaler.Marshaler {
var types []reflect.Type
for _, cfg := range configs {
for t := range cfg.MessageTypes().All() {
for t := range cfg.MessageTypes() {
types = append(types, t.ReflectType())
}

Expand Down
1 change: 1 addition & 0 deletions engineoption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package verity
import (
"time"

//revive:disable:dot-imports
"github.com/dogmatiq/configkit"
"github.com/dogmatiq/dodeca/logging"
"github.com/dogmatiq/dogma"
Expand Down
23 changes: 14 additions & 9 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/dogmatiq/configkit"
"github.com/dogmatiq/configkit/message"
"github.com/dogmatiq/enginekit/collections/sets"
"github.com/dogmatiq/interopspec/envelopespec"
"github.com/dogmatiq/verity/eventstream"
"github.com/dogmatiq/verity/handler/projection"
Expand Down Expand Up @@ -65,18 +66,18 @@ func (e *Engine) runStreamConsumerForQueue(
s eventstream.Stream,
a *app,
) error {
events := message.TypeSet{}
var events *sets.Set[message.Type]

Check warning on line 69 in eventstream.go

View check run for this annotation

Codecov / codecov/patch

eventstream.go#L69

Added line #L69 was not covered by tests

// Find all events that are consumed by processes. Events consumed by
// projections are not included as each projection handler has its own
// consumer.
for _, h := range a.Config.RichHandlers().Processes() {
h.MessageTypes().Consumed.RangeByRole(
message.EventRole,
func(t message.Type) bool {
events.Add(t)
return true
},
events = events.Union(
sets.NewFromKeys(
h.
MessageTypes().
Consumed(message.EventKind),
),

Check warning on line 80 in eventstream.go

View check run for this annotation

Codecov / codecov/patch

eventstream.go#L75-L80

Added lines #L75 - L80 were not covered by tests
)
}

Expand Down Expand Up @@ -118,8 +119,12 @@ func (e *Engine) runStreamConsumerForProjection(
h configkit.RichProjection,
) error {
c := &eventstream.Consumer{
Stream: s,
EventTypes: h.MessageTypes().Consumed,
Stream: s,
EventTypes: sets.NewFromKeys(
h.
MessageTypes().
Consumed(),
),
Handler: &projection.StreamAdaptor{
Identity: &envelopespec.Identity{
Name: h.Identity().Name,
Expand Down
13 changes: 7 additions & 6 deletions eventstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/dogmatiq/configkit"
"github.com/dogmatiq/configkit/message"
"github.com/dogmatiq/dodeca/logging"
"github.com/dogmatiq/enginekit/collections/sets"
"github.com/dogmatiq/linger"
"github.com/dogmatiq/linger/backoff"
"golang.org/x/sync/semaphore"
Expand All @@ -33,7 +34,7 @@ type Consumer struct {
Stream Stream

// EventTypes is the set of event types that the handler consumes.
EventTypes message.TypeCollection
EventTypes *sets.Set[message.Type]

// Handler is the target for the events from the stream.
Handler Handler
Expand Down Expand Up @@ -97,9 +98,9 @@ func (c *Consumer) consume(ctx context.Context) error {
return err
}

relevant := message.IntersectionT(c.EventTypes, produced)
relevant := c.EventTypes.Intersection(produced)

if len(relevant) == 0 {
if relevant.Len() == 0 {
logging.Debug(
c.Logger,
"stream does not produce any relevant event types",
Expand All @@ -108,7 +109,7 @@ func (c *Consumer) consume(ctx context.Context) error {
return nil
}

for t := range relevant {
for t := range relevant.All() {
logging.Debug(
c.Logger,
"consuming '%s' events",
Expand All @@ -132,7 +133,7 @@ func (c *Consumer) consume(ctx context.Context) error {
// open opens a stream cursor based on the offset given by the handler.
func (c *Consumer) open(
ctx context.Context,
types message.TypeSet,
types *sets.Set[message.Type],
) (Cursor, error) {
var err error
c.offset, err = c.Handler.NextOffset(ctx, c.Stream.Application())
Expand All @@ -143,7 +144,7 @@ func (c *Consumer) open(
logging.Debug(
c.Logger,
"consuming %d event type(s), beginning at offset %d",
len(types),
types.Len(),
c.offset,
)

Expand Down
17 changes: 9 additions & 8 deletions eventstream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dogmatiq/configkit"
"github.com/dogmatiq/configkit/message"
"github.com/dogmatiq/dodeca/logging"
"github.com/dogmatiq/enginekit/collections/sets"
. "github.com/dogmatiq/enginekit/enginetest/stubs"
"github.com/dogmatiq/linger/backoff"
. "github.com/dogmatiq/verity/eventstream"
Expand Down Expand Up @@ -66,7 +67,7 @@ var _ = Describe("type Consumer", func() {

mstream = &memorystream.Stream{
App: configkit.MustNewIdentity("<app-name>", DefaultAppKey),
Types: message.NewTypeSet(
Types: sets.New(
message.TypeFor[EventStub[TypeA]](),
message.TypeFor[EventStub[TypeB]](),
),
Expand All @@ -89,7 +90,7 @@ var _ = Describe("type Consumer", func() {

consumer = &Consumer{
Stream: stream,
EventTypes: message.NewTypeSet(
EventTypes: sets.New(
message.TypeFor[EventStub[TypeA]](),
),
Handler: eshandler,
Expand Down Expand Up @@ -130,8 +131,8 @@ var _ = Describe("type Consumer", func() {
It("returns if the stream does not produce any relevant events", func() {
stream.EventTypesFunc = func(
context.Context,
) (message.TypeCollection, error) {
return message.NewTypeSet(
) (*sets.Set[message.Type], error) {
return sets.New(
message.TypeFor[EventStub[TypeC]](),
), nil
}
Expand All @@ -144,7 +145,7 @@ var _ = Describe("type Consumer", func() {
stream.OpenFunc = func(
context.Context,
uint64,
message.TypeCollection,
*sets.Set[message.Type],
) (Cursor, error) {
stream.OpenFunc = nil

Expand All @@ -168,7 +169,7 @@ var _ = Describe("type Consumer", func() {
It("restarts the consumer if querying the stream's event types returns an error", func() {
stream.EventTypesFunc = func(
context.Context,
) (message.TypeCollection, error) {
) (*sets.Set[message.Type], error) {
stream.EventTypesFunc = nil

eshandler.HandleEventFunc = func(
Expand Down Expand Up @@ -213,7 +214,7 @@ var _ = Describe("type Consumer", func() {

It("restarts the consumer if the event offset is earlier than the consumed offset", func() {
// Ensure the consumer tries to consume all event types.
consumer.EventTypes = message.NewTypeSet(
consumer.EventTypes = sets.New(
message.TypeFor[EventStub[TypeA]](),
message.TypeFor[EventStub[TypeB]](),
)
Expand All @@ -223,7 +224,7 @@ var _ = Describe("type Consumer", func() {
stream.OpenFunc = func(
ctx context.Context,
offset uint64,
types message.TypeCollection,
types *sets.Set[message.Type],
) (Cursor, error) {
// Reset the stream to behave normally again on the next
// attempt at opening a cursor.
Expand Down
Loading

0 comments on commit 1624d84

Please sign in to comment.