Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate logrus logger to aws and grpc packages #60

Merged
merged 2 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

[[constraint]]
name = "github.com/infobloxopen/atlas-app-toolkit"
version = "v0.17.0"
version = "v0.21.0"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
Expand Down
16 changes: 8 additions & 8 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strconv"
"time"

Expand All @@ -20,6 +19,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/google/uuid"
pubsub "github.com/infobloxopen/atlas-pubsub"
"github.com/sirupsen/logrus"
)

// some arbitrary prefix I came up with to help distinguish between aws broker
Expand All @@ -43,27 +43,27 @@ var ErrMessageRetentionPeriodOutOfRange = errors.New("The message retention peri

// VerifyPermissions checks if the aws config exists and checks if it has permissions to
// create sns topics, send messages, create SQS topics, delete topics, and delete sqs queues
func VerifyPermissions(sess *session.Session) error {
func VerifyPermissions(sess *session.Session, log *logrus.Logger) error {
// Check if environment contains aws config
topic := "verifyPermissions"
subscriptionID := uuid.New().String()

log.Println("verify permissions: creating subscriber")
subscriber, err := newSubscriber(sns.New(sess), sqs.New(sess), topic, subscriptionID)
subscriber, err := newSubscriber(sns.New(sess), sqs.New(sess), topic, subscriptionID, SubscribeWithLogger(log))
if err != nil {
return err
}

log.Println("verify permissions: creating publisher")
publisher, err := newPublisher(sns.New(sess), topic)
publisher, err := newPublisher(sns.New(sess), topic, PublishWithLogger(log))
if err != nil {
return err
}
return verifyPermissions(subscriber, publisher)
return verifyPermissions(subscriber, publisher, log)
}

// verifyPermissions checks if the aws config has correct permissions
func verifyPermissions(subscriber *awsSubscriber, publisher *publisher) error {
func verifyPermissions(subscriber *awsSubscriber, publisher *publisher, log *logrus.Logger) error {
defer func() {
// Delete subscription and subscriber queue
log.Println("verify permissions: deleting subscription")
Expand Down Expand Up @@ -162,13 +162,13 @@ func ensureTopic(topic string, snsClient snsiface.SNSAPI) (*string, error) {

// ensureQueue returns the queueURL for the given queueName, creating the queue
// if it doesn't exist
func ensureQueue(queueName *string, sqsClient sqsiface.SQSAPI) (*string, error) {
func ensureQueue(queueName *string, sqsClient sqsiface.SQSAPI, logger *logrus.Logger) (*string, error) {
queueURLResp, queueURLErr := sqsClient.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: queueName})
if queueURLErr == nil {
return queueURLResp.QueueUrl, nil
}
if awsErr, ok := queueURLErr.(awserr.Error); ok && awsErr.Code() == sqs.ErrCodeQueueDoesNotExist {
log.Printf("AWS: creating queue: %q", *queueName)
logger.Infof("AWS: creating queue: %q", *queueName)
createResp, createErr := sqsClient.CreateQueue(&sqs.CreateQueueInput{
QueueName: queueName,
})
Expand Down
11 changes: 6 additions & 5 deletions aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/sirupsen/logrus"
)

// setupVerifyPermissions sets up a publisher and subscriber and returns the mocks for further configuration.
Expand All @@ -36,7 +37,7 @@ func setupVerifyPermissions(t *testing.T) (*mockSNS, *mockSQS, func() error) {
}
s.queueArn = aws.String(queueArn)

return &snsMock, &sqsMock, func() error { return verifyPermissions(s, p) }
return &snsMock, &sqsMock, func() error { return verifyPermissions(s, p, logrus.StandardLogger()) }
}

func TestVerifyPermissions(t *testing.T) {
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestEnsureQueue(t *testing.T) {
{ // verify nil error returns queue url
expectedQueueURL := aws.String("expected queue url")
sqsSpy.stubbedGetQueueURLOutput = &sqs.GetQueueUrlOutput{QueueUrl: expectedQueueURL}
actualQueueURL, err := ensureQueue(queueName, &sqsSpy)
actualQueueURL, err := ensureQueue(queueName, &sqsSpy, logrus.StandardLogger())
if err != nil {
t.Errorf("expected GetQueueUrl not to return error, but got \"%v\"", err)
} else if *actualQueueURL != *expectedQueueURL {
Expand All @@ -129,7 +130,7 @@ func TestEnsureQueue(t *testing.T) {
expectedCreatedQueueURL := aws.String("expected queue URL")
sqsSpy.stubbedGetQueueURLError = awserr.New(sqs.ErrCodeQueueDoesNotExist, "foo", errors.New("foo"))
sqsSpy.stubbedCreateQueueOutput = &sqs.CreateQueueOutput{QueueUrl: expectedCreatedQueueURL}
actualCreatedQueueURL, actualErr := ensureQueue(queueName, &sqsSpy)
actualCreatedQueueURL, actualErr := ensureQueue(queueName, &sqsSpy, logrus.StandardLogger())
if sqsSpy.spiedCreateQueueInput == nil {
t.Error("expected sqs.CreateQueue to be called, but wasn't")
} else {
Expand All @@ -148,15 +149,15 @@ func TestEnsureQueue(t *testing.T) {
{ // verify ensureQueue forwards any sqs.CreateQueue errors
expectedError := errors.New("expected sqs.CreateQueue error")
sqsSpy.stubbedCreateQueueError = expectedError
_, actualError := ensureQueue(queueName, &sqsSpy)
_, actualError := ensureQueue(queueName, &sqsSpy, logrus.StandardLogger())
if expectedError != actualError {
t.Errorf("expected sqs.CreateQueue to return error \"%s\", but returned \"%s\"", expectedError.Error(), actualError.Error())
}
}
{ // verify ensureQueue returns any non-404 errors from sqs.GetQueueURL
expectedError := errors.New("expected sqs.GetQueueURL error")
sqsSpy.stubbedGetQueueURLError = expectedError
_, actualError := ensureQueue(queueName, &sqsSpy)
_, actualError := ensureQueue(queueName, &sqsSpy, logrus.StandardLogger())
if expectedError != actualError {
t.Errorf("expected sqs.GetQueueURL to return error \"%s\", but return \"%s\"", expectedError.Error(), actualError.Error())
}
Expand Down
37 changes: 25 additions & 12 deletions aws/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,61 @@ package aws

import (
"context"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sns/snsiface"
pubsub "github.com/infobloxopen/atlas-pubsub"
"github.com/sirupsen/logrus"
)

type PublisherOption func(*publisher)

func PublishWithLogger(logger *logrus.Logger) PublisherOption {
return func(pub *publisher) {
pub.logger = logger
}
}

// NewPublisher creates a new AWS message broker that will publish
// messages to the given topic.
// TODO: info on permissions needed within the config to make this work
//
// Topic names must be made up of only uppercase and lowercase
// ASCII letters, numbers, underscores, and hyphens, and must be between 1 and
// 247 characters long.
func NewPublisher(sess *session.Session, topic string) (pubsub.Publisher, error) {
return newPublisher(sns.New(sess), topic)
func NewPublisher(sess *session.Session, topic string, opts ...PublisherOption) (pubsub.Publisher, error) {
return newPublisher(sns.New(sess), topic, opts...)
}

func newPublisher(snsClient snsiface.SNSAPI, topic string) (*publisher, error) {
log.Printf("AWS: ensuring SNS topic exists for %q", topic)
func newPublisher(snsClient snsiface.SNSAPI, topic string, opts ...PublisherOption) (*publisher, error) {
p := publisher{
sns: snsClient,
logger: logrus.StandardLogger(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this logger will be overrided, but maybe it will be better to use toolkit logger with some default level (logrus.InfoLevel) as a default logger? It up to you, for me it seems more consistent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this. I decided that using the StandardLogger seemed to make more sense as the default. This also allows using the global Logrus functions to change the behavior of this internal logging without having to make further changes (https://github.com/sirupsen/logrus/blob/master/exported.go#L14-L37)

}
for _, opt := range opts {
opt(&p)
}

p.logger.Infof("AWS: ensuring SNS topic exists for %q", topic)
topicArn, err := ensureTopic(topic, snsClient)
if err != nil {
return nil, err
}

p := publisher{
sns: snsClient,
topicArn: *topicArn,
}
p.topicArn = *topicArn

return &p, nil
}

type publisher struct {
sns snsiface.SNSAPI
topicArn string
logger *logrus.Logger
}

func (p publisher) Publish(ctx context.Context, msg []byte, metadata map[string]string) error {
log.Printf("AWS: publish to topic %q", p.topicArn)
p.logger.Debugf("AWS: publish to topic %q", p.topicArn)
message := encodeToSNSMessage(msg)
messageAttributes := encodeMessageAttributes(metadata)

Expand All @@ -57,7 +70,7 @@ func (p publisher) Publish(ctx context.Context, msg []byte, metadata map[string]
}

func (p publisher) DeleteTopic(ctx context.Context) error {
log.Printf("AWS: delete topic %q", p.topicArn)
p.logger.Infof("AWS: delete topic %q", p.topicArn)
_, err := p.sns.DeleteTopic(&sns.DeleteTopicInput{
TopicArn: aws.String(p.topicArn),
})
Expand Down
4 changes: 3 additions & 1 deletion aws/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"reflect"
"strings"
"testing"

"github.com/sirupsen/logrus"
)

// TestNewPublisher verifies that aws creates a topic for the given topic name,
Expand Down Expand Up @@ -58,7 +60,7 @@ func TestPublish(t *testing.T) {
metadata := map[string]string{"foo": "bar"}
spy := mockSNS{stubbedPublishError: errors.New("test publish error")}

p := publisher{sns: &spy, topicArn: "foo"}
p := publisher{sns: &spy, topicArn: "foo", logger: logrus.StandardLogger()}
p.Publish(context.Background(), msg, metadata)
spiedInput := spy.spiedPublishInput

Expand Down
Loading