Skip to content

Commit

Permalink
feat: implement subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
x4b1 committed May 12, 2024
1 parent 96f9832 commit 3c62091
Show file tree
Hide file tree
Showing 30 changed files with 1,399 additions and 336 deletions.
6 changes: 3 additions & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ linters-settings:
# Default: true
skipRecvDeref: false

gomnd:
mnd:
# List of function patterns to exclude from analysis.
# Values always ignored: `time.Date`,
# `strconv.FormatInt`, `strconv.FormatUint`, `strconv.FormatFloat`,
Expand Down Expand Up @@ -200,7 +200,6 @@ linters:
- durationcheck # checks for two durations multiplied together
- errname # checks that sentinel errors are prefixed with the Err and error types are suffixed with the Error
- errorlint # finds code that will cause problems with the error wrapping scheme introduced in Go 1.13
- execinquery # checks query string in Query function which reads your Go src files and warning it finds
- exhaustive # checks exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- forbidigo # forbids identifiers
Expand All @@ -215,7 +214,7 @@ linters:
- gocyclo # computes and checks the cyclomatic complexity of functions
- godot # checks if comments end in a period
- goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt
- gomnd # detects magic numbers
- mnd # detects magic numbers
- gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
- gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations
- goprintffuncname # checks that printf-like functions are named with f at the end
Expand Down Expand Up @@ -276,6 +275,7 @@ linters:
#- depguard # [replaced by gomodguard] checks if package imports are in a list of acceptable packages
#- dogsled # checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
#- dupword # [useless without config] checks for duplicate words in the source code
#- execinquery # checks query string in Query function which reads your Go src files and warning it finds
#- errchkjson # [don't see profit + I'm against of omitting errors like in the first example https://github.com/breml/errchkjson] checks types passed to the json encoding functions. Reports unsupported types and optionally reports occasions, where the check for the returned error can be omitted
#- forcetypeassert # [replaced by errcheck] finds forced type assertions
#- goerr113 # [too strict] checks the errors handling expressions
Expand Down
3 changes: 3 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"github.com/x4b1/messenger"
)

// MessageIDKey defines the key that will be send the message unique identifier.
const MessageIDKey = "message_id"

//go:generate moq -stub -out x_broker_mock_test.go . Broker

// Broker is the interface that wraps the basic message publishing.
Expand Down
13 changes: 11 additions & 2 deletions broker/sns/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

var _ broker.Broker = &Publisher{}

//nolint:typecheck // aws constant to not generate every time.
var awsStringDataType = aws.String("String")

//go:generate moq -pkg sns_test -stub -out publisher_mock_test.go . Client

// Client defines the AWS SNS methods used by the Publisher. This is used for testing purposes.
Expand Down Expand Up @@ -81,10 +84,16 @@ type Publisher struct {
// Publish publishes the given message to the pubsub topic.
func (p Publisher) Publish(ctx context.Context, msg messenger.Message) error {
md := msg.Metadata()
att := make(map[string]types.MessageAttributeValue, len(md))
att := make(map[string]types.MessageAttributeValue)

att[broker.MessageIDKey] = types.MessageAttributeValue{
DataType: awsStringDataType,
StringValue: aws.String(msg.ID()),
}

for k, v := range md {
att[k] = types.MessageAttributeValue{
DataType: aws.String("String"),
DataType: awsStringDataType,
StringValue: aws.String(v),
}
}
Expand Down
16 changes: 10 additions & 6 deletions broker/sns/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/x4b1/messenger"
"github.com/x4b1/messenger/broker"
publisher "github.com/x4b1/messenger/broker/sns"
)

Expand Down Expand Up @@ -65,8 +66,9 @@ func TestPublish(t *testing.T) {
Message: aws.String(string(msg.Payload())),
MessageGroupId: nil,
MessageAttributes: map[string]types.MessageAttributeValue{
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
broker.MessageIDKey: {DataType: aws.String("String"), StringValue: aws.String(msg.MsgID)},
},
TopicArn: aws.String(topicARN),
},
Expand All @@ -79,8 +81,9 @@ func TestPublish(t *testing.T) {
Message: aws.String(string(msg.Payload())),
MessageGroupId: aws.String(defaultOrdKey),
MessageAttributes: map[string]types.MessageAttributeValue{
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
broker.MessageIDKey: {DataType: aws.String("String"), StringValue: aws.String(msg.MsgID)},
},
TopicArn: aws.String(topicARN),
},
Expand All @@ -97,8 +100,9 @@ func TestPublish(t *testing.T) {
Message: aws.String(string(msg.Payload())),
MessageGroupId: aws.String(orderingValue),
MessageAttributes: map[string]types.MessageAttributeValue{
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
broker.MessageIDKey: {DataType: aws.String("String"), StringValue: aws.String(msg.MsgID)},
},
TopicArn: aws.String(topicARN),
},
Expand Down
51 changes: 29 additions & 22 deletions broker/sqs/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"

Expand All @@ -14,45 +15,45 @@ import (

var _ broker.Broker = &Publisher{}

//go:generate moq -pkg sqs_test -stub -out publisher_mock_test.go . Client
//nolint:typecheck // aws constant to not generate every time.
var awsStringDataType = aws.String("String")

// Client defines the AWS SQS methods used by the Publisher. This is used for testing purposes.
type Client interface {
GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
}

// Option is a function to set options to Publisher.
type Option func(*Publisher)
// PublisherOption is a function to set options to Publisher.
type PublisherOption func(*Publisher)

// WithMetaOrderingKey setups the metadata key to get the ordering key.
func WithMetaOrderingKey(key string) Option {
// PublisherWithMetaOrderingKey setups the metadata key to get the ordering key.
func PublisherWithMetaOrderingKey(key string) PublisherOption {
return func(p *Publisher) {
p.metaOrdKey = key
}
}

// WithDefaultOrderingKey setups the default ordering key.
func WithDefaultOrderingKey(key string) Option {
// PublisherWithDefaultOrderingKey setups the default ordering key.
func PublisherWithDefaultOrderingKey(key string) PublisherOption {
return func(p *Publisher) {
p.defaultOrdKey = key
}
}

// WithFifoQueue setups the flag to use fifo queue.
func WithFifoQueue(fifo bool) Option {
// PublisherWithFifoQueue setups the flag to use fifo queue.
func PublisherWithFifoQueue(fifo bool) PublisherOption {
return func(p *Publisher) {
p.fifo = fifo
}
}

// Open returns a new Publisher instance.
func Open(ctx context.Context, awsOpts sqs.Options, queue string, opts ...Option) (*Publisher, error) {
return New(ctx, sqs.New(awsOpts), queue, opts...)
// NewPublisherFromDefault returns a new Publisher instance.
func NewPublisherFromDefault(ctx context.Context, queue string, opts ...PublisherOption) (*Publisher, error) {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("loading aws config from default: %w", err)
}

return NewPublisher(ctx, sqs.NewFromConfig(cfg), queue, opts...)
}

// New returns a new Publisher instance.
func New(ctx context.Context, svc Client, queue string, opts ...Option) (*Publisher, error) {
// NewPublisher returns a new Publisher instance.
func NewPublisher(ctx context.Context, svc Client, queue string, opts ...PublisherOption) (*Publisher, error) {
q, err := svc.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: aws.String(queue)})
if err != nil {
return nil, fmt.Errorf("getting queue url: %w", err)
Expand Down Expand Up @@ -87,10 +88,16 @@ type Publisher struct {
// Publish publishes the given message to the pubsub topic.
func (p Publisher) Publish(ctx context.Context, msg messenger.Message) error {
md := msg.Metadata()
att := make(map[string]types.MessageAttributeValue, len(md))
att := make(map[string]types.MessageAttributeValue)

att[broker.MessageIDKey] = types.MessageAttributeValue{
DataType: awsStringDataType,
StringValue: aws.String(msg.ID()),
}

for k, v := range md {
att[k] = types.MessageAttributeValue{
DataType: aws.String("String"),
DataType: awsStringDataType,
StringValue: aws.String(v),
}
}
Expand Down
124 changes: 122 additions & 2 deletions broker/sqs/publisher_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3c62091

Please sign in to comment.