-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
102 lines (88 loc) · 3.34 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package mediator
import (
"context"
"errors"
"log/slog"
)
// NotificationHandler is implemented by anything that wants to receive
// notifications of type T from a [Publisher].
// An implementation is registered for its notification type with [Subscribe].
type NotificationHandler[T any] interface {
	Handle(ctx context.Context, l *slog.Logger, event T) error
}

// notificationHandler adapts a plain function to the [NotificationHandler]
// interface by storing it in handleFunc.
type notificationHandler[T any] struct {
	handleFunc func(ctx context.Context, l *slog.Logger, event T) error
}

// Publisher is the registry that notifications are subscribed to and published
// through. It is not used via methods directly: the [Publish] and [Subscribe]
// functions take a Publisher as a parameter.
//
// All methods are unexported, so the interface can only be implemented inside
// this package. It is implemented by [Mediator].
//
// The pipeline accessor is currently commented out, so a Publisher exposes no
// [Pipeline] at the moment.
type Publisher interface {
	// getPipeline() Pipeline
	getLogger() *slog.Logger
	getAllNotifiers() map[any][]any
	newNotifier(key any, notifier any)
}

// Notification is an event that can be published through the [Publisher].
// It is an empty interface, so it places no requirements on the published
// value; the type parameter T is not used by the definition.
type Notification[T any] interface{}
// Handle implements [NotificationHandler] by calling the wrapped function with
// the same arguments and returning its error unchanged.
func (h notificationHandler[T]) Handle(ctx context.Context, l *slog.Logger, event T) error {
	fn := h.handleFunc
	return fn(ctx, l, event)
}
// NewNotificationHandler wraps handleFunc in a [NotificationHandler], so a
// handler can be created from a function literal without declaring a
// dedicated type. This is particularly handy in tests.
func NewNotificationHandler[T any](handleFunc func(ctx context.Context, l *slog.Logger, event T) error) NotificationHandler[T] {
	var wrapped notificationHandler[T]
	wrapped.handleFunc = handleFunc
	return wrapped
}
// Subscribe registers s with the [Publisher] p as a handler for notifications
// of type T. The handler is stored under the key derived from T, which is the
// same key [PublishWithLogger] uses to look handlers up.
//
// The returned error is currently always nil.
func Subscribe[T any](p Publisher, s NotificationHandler[T]) error {
	topic := key[T]{}
	p.newNotifier(topic, s)
	return nil
}
// Publish sends notification to the handlers subscribed on p, using the
// logger obtained from the [Publisher] itself. It is shorthand for calling
// [PublishWithLogger] with p's own logger and returns whatever that call returns.
//
// The [Publisher] interface is implemented by [Mediator].
func Publish[T Notification[any]](ctx context.Context, p Publisher, notification T) error {
	logger := p.getLogger()
	return PublishWithLogger(ctx, logger, p, notification)
}
// PublishWithLogger publishes a [Notification] using [Publisher].
// This function is like [Publish], but the caller supplies the logger that is
// handed to every handler. Passing a logger can be useful if you want to add
// attributes to the logger in the caller.
//
// Every handler registered under the key for T is invoked, in the order it
// appears in the publisher's handler slice, with ctx, l and notification.
// A failing handler does not stop the remaining handlers from running; all
// returned errors are combined with [errors.Join]. The result is nil when no
// handler is registered for T or when every handler succeeds.
//
// A registered value that does not implement NotificationHandler[T] is
// reported as an error and skipped.
//
// The [Publisher] interface is implemented by [Mediator].
func PublishWithLogger[T Notification[any]](ctx context.Context, l *slog.Logger, p Publisher, notification T) error {
	var errs []error

	for _, handler := range p.getAllNotifiers()[key[T]{}] {
		h, ok := handler.(NotificationHandler[T])
		if !ok {
			// This shouldn't happen, but catching it just in case to prevent possible panics.
			// The entry must be skipped: after a failed assertion h is a nil
			// interface, and calling Handle on it would panic.
			errs = append(errs, errors.New("subscribers contain a broken handler that doesn't implement the NotificationHandler interface"))
			continue
		}

		// Pipeline integration is currently disabled; handlers are called directly.
		/*
			handler := p.getPipeline().Then(func(ctx context.Context, l *slog.Logger, _ Message) (any, error) {
				return nil, h.Handle(ctx, l, notification)
			})
			_, err := handler.Handle(ctx, l, NewNotificationMessage[T](notification))
			if err != nil {
				errs = append(errs, err)
			}
		*/

		if err := h.Handle(ctx, l, notification); err != nil {
			errs = append(errs, err)
		}
	}

	// errors.Join returns nil when errs is empty, so no length check is needed.
	return errors.Join(errs...)
}