Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement subscriber #127

Merged
merged 1 commit into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ linters-settings:
# Default: true
skipRecvDeref: false

gomnd:
mnd:
# List of function patterns to exclude from analysis.
# Values always ignored: `time.Date`,
# `strconv.FormatInt`, `strconv.FormatUint`, `strconv.FormatFloat`,
Expand Down Expand Up @@ -200,7 +200,6 @@ linters:
- durationcheck # checks for two durations multiplied together
- errname # checks that sentinel errors are prefixed with the Err and error types are suffixed with the Error
- errorlint # finds code that will cause problems with the error wrapping scheme introduced in Go 1.13
- execinquery # checks query string in Query function which reads your Go src files and warning it finds
- exhaustive # checks exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- forbidigo # forbids identifiers
Expand All @@ -215,7 +214,7 @@ linters:
- gocyclo # computes and checks the cyclomatic complexity of functions
- godot # checks if comments end in a period
- goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt
- gomnd # detects magic numbers
- mnd # detects magic numbers
- gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
- gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations
- goprintffuncname # checks that printf-like functions are named with f at the end
Expand Down Expand Up @@ -276,6 +275,7 @@ linters:
#- depguard # [replaced by gomodguard] checks if package imports are in a list of acceptable packages
#- dogsled # checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
#- dupword # [useless without config] checks for duplicate words in the source code
#- execinquery # checks query string in Query function which reads your Go src files and warning it finds
#- errchkjson # [don't see profit + I'm against of omitting errors like in the first example https://github.com/breml/errchkjson] checks types passed to the json encoding functions. Reports unsupported types and optionally reports occasions, where the check for the returned error can be omitted
#- forcetypeassert # [replaced by errcheck] finds forced type assertions
#- goerr113 # [too strict] checks the errors handling expressions
Expand Down
3 changes: 3 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"github.com/x4b1/messenger"
)

// MessageIDKey defines the key that will be send the message unique identifier.
const MessageIDKey = "message_id"

//go:generate moq -stub -out x_broker_mock_test.go . Broker

// Broker is the interface that wraps the basic message publishing.
Expand Down
6 changes: 3 additions & 3 deletions broker/pubsub/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestPublishWithNoOrderingKey(t *testing.T) {
msgs := srv.Messages()
require.Len(t, msgs, 1)
require.Equal(t, m.Payload(), msgs[0].Data)
require.Equal(t, m.Metadata(), msgs[0].Attributes)
require.EqualValues(t, m.Metadata(), msgs[0].Attributes)
require.Empty(t, msgs[0].OrderingKey)
}

Expand All @@ -78,7 +78,7 @@ func TestPublishWithDefaultOrderingKey(t *testing.T) {
msgs := srv.Messages()
require.Len(t, msgs, 1)
require.Equal(t, m.Payload(), msgs[0].Data)
require.Equal(t, m.Metadata(), msgs[0].Attributes)
require.EqualValues(t, m.Metadata(), msgs[0].Attributes)
require.Equal(t, ordKey, msgs[0].OrderingKey)
}

Expand All @@ -102,6 +102,6 @@ func TestPublishWithMessageMetadataOrderingKey(t *testing.T) {
msgs := srv.Messages()
require.Len(t, msgs, 1)
require.Equal(t, m.Payload(), msgs[0].Data)
require.Equal(t, m.Metadata(), msgs[0].Attributes)
require.EqualValues(t, m.Metadata(), msgs[0].Attributes)
require.Equal(t, orderingValue, msgs[0].OrderingKey)
}
13 changes: 11 additions & 2 deletions broker/sns/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

var _ broker.Broker = &Publisher{}

//nolint:typecheck // aws constant to not generate every time.
var awsStringDataType = aws.String("String")

//go:generate moq -pkg sns_test -stub -out publisher_mock_test.go . Client

// Client defines the AWS SNS methods used by the Publisher. This is used for testing purposes.
Expand Down Expand Up @@ -81,10 +84,16 @@ type Publisher struct {
// Publish publishes the given message to the pubsub topic.
func (p Publisher) Publish(ctx context.Context, msg messenger.Message) error {
md := msg.Metadata()
att := make(map[string]types.MessageAttributeValue, len(md))
att := make(map[string]types.MessageAttributeValue)

att[broker.MessageIDKey] = types.MessageAttributeValue{
DataType: awsStringDataType,
StringValue: aws.String(msg.ID()),
}

for k, v := range md {
att[k] = types.MessageAttributeValue{
DataType: aws.String("String"),
DataType: awsStringDataType,
StringValue: aws.String(v),
}
}
Expand Down
16 changes: 10 additions & 6 deletions broker/sns/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/x4b1/messenger"
"github.com/x4b1/messenger/broker"
publisher "github.com/x4b1/messenger/broker/sns"
)

Expand Down Expand Up @@ -65,8 +66,9 @@ func TestPublish(t *testing.T) {
Message: aws.String(string(msg.Payload())),
MessageGroupId: nil,
MessageAttributes: map[string]types.MessageAttributeValue{
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
broker.MessageIDKey: {DataType: aws.String("String"), StringValue: aws.String(msg.MsgID)},
},
TopicArn: aws.String(topicARN),
},
Expand All @@ -79,8 +81,9 @@ func TestPublish(t *testing.T) {
Message: aws.String(string(msg.Payload())),
MessageGroupId: aws.String(defaultOrdKey),
MessageAttributes: map[string]types.MessageAttributeValue{
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
broker.MessageIDKey: {DataType: aws.String("String"), StringValue: aws.String(msg.MsgID)},
},
TopicArn: aws.String(topicARN),
},
Expand All @@ -97,8 +100,9 @@ func TestPublish(t *testing.T) {
Message: aws.String(string(msg.Payload())),
MessageGroupId: aws.String(orderingValue),
MessageAttributes: map[string]types.MessageAttributeValue{
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
"aggregate_id": {DataType: aws.String("String"), StringValue: aws.String(msg.Metadata()["aggregate_id"])},
metaKey: {DataType: aws.String("String"), StringValue: aws.String(orderingValue)},
broker.MessageIDKey: {DataType: aws.String("String"), StringValue: aws.String(msg.MsgID)},
},
TopicArn: aws.String(topicARN),
},
Expand Down
51 changes: 29 additions & 22 deletions broker/sqs/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"

Expand All @@ -14,45 +15,45 @@

var _ broker.Broker = &Publisher{}

//go:generate moq -pkg sqs_test -stub -out publisher_mock_test.go . Client
//nolint:typecheck // aws constant to not generate every time.
var awsStringDataType = aws.String("String")

// Client defines the AWS SQS methods used by the Publisher. This is used for testing purposes.
type Client interface {
GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
}

// Option is a function to set options to Publisher.
type Option func(*Publisher)
// PublisherOption is a function to set options to Publisher.
type PublisherOption func(*Publisher)

// WithMetaOrderingKey setups the metadata key to get the ordering key.
func WithMetaOrderingKey(key string) Option {
// PublisherWithMetaOrderingKey setups the metadata key to get the ordering key.
func PublisherWithMetaOrderingKey(key string) PublisherOption {
return func(p *Publisher) {
p.metaOrdKey = key
}
}

// WithDefaultOrderingKey setups the default ordering key.
func WithDefaultOrderingKey(key string) Option {
// PublisherWithDefaultOrderingKey setups the default ordering key.
func PublisherWithDefaultOrderingKey(key string) PublisherOption {
return func(p *Publisher) {
p.defaultOrdKey = key
}
}

// WithFifoQueue setups the flag to use fifo queue.
func WithFifoQueue(fifo bool) Option {
// PublisherWithFifoQueue setups the flag to use fifo queue.
func PublisherWithFifoQueue(fifo bool) PublisherOption {
return func(p *Publisher) {
p.fifo = fifo
}
}

// Open returns a new Publisher instance.
func Open(ctx context.Context, awsOpts sqs.Options, queue string, opts ...Option) (*Publisher, error) {
return New(ctx, sqs.New(awsOpts), queue, opts...)
// NewPublisherFromDefault returns a new Publisher instance.
func NewPublisherFromDefault(ctx context.Context, queue string, opts ...PublisherOption) (*Publisher, error) {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("loading aws config from default: %w", err)

Check warning on line 49 in broker/sqs/publisher.go

View check run for this annotation

Codecov / codecov/patch

broker/sqs/publisher.go#L46-L49

Added lines #L46 - L49 were not covered by tests
}

return NewPublisher(ctx, sqs.NewFromConfig(cfg), queue, opts...)

Check warning on line 52 in broker/sqs/publisher.go

View check run for this annotation

Codecov / codecov/patch

broker/sqs/publisher.go#L52

Added line #L52 was not covered by tests
}

// New returns a new Publisher instance.
func New(ctx context.Context, svc Client, queue string, opts ...Option) (*Publisher, error) {
// NewPublisher returns a new Publisher instance.
func NewPublisher(ctx context.Context, svc Client, queue string, opts ...PublisherOption) (*Publisher, error) {
q, err := svc.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: aws.String(queue)})
if err != nil {
return nil, fmt.Errorf("getting queue url: %w", err)
Expand Down Expand Up @@ -87,10 +88,16 @@
// Publish publishes the given message to the pubsub topic.
func (p Publisher) Publish(ctx context.Context, msg messenger.Message) error {
md := msg.Metadata()
att := make(map[string]types.MessageAttributeValue, len(md))
att := make(map[string]types.MessageAttributeValue)

att[broker.MessageIDKey] = types.MessageAttributeValue{
DataType: awsStringDataType,
StringValue: aws.String(msg.ID()),
}

for k, v := range md {
att[k] = types.MessageAttributeValue{
DataType: aws.String("String"),
DataType: awsStringDataType,
StringValue: aws.String(v),
}
}
Expand Down
Loading
Loading