From 7533d09b23acc4cb85e11373ceec8d50bcf353ef Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Sun, 30 Jun 2024 19:50:03 +0700 Subject: [PATCH] Refactor pubsub --- pubsub/health_checker.go | 7 +------ pubsub/publisher.go | 31 ++++++++++--------------------- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/pubsub/health_checker.go b/pubsub/health_checker.go index 2bab16e..f50fdc2 100644 --- a/pubsub/health_checker.go +++ b/pubsub/health_checker.go @@ -34,12 +34,7 @@ func NewPubHealthChecker(name string, client *pubsub.Client, resourceId string, } return &HealthChecker{name: name, client: client, permissionType: PermissionPublish, resourceId: resourceId, timeout: 4 * time.Second} } -func NewSubHealthChecker(name string, client *pubsub.Client, resourceId string, timeout ...time.Duration) *HealthChecker { - if len(timeout) >= 1 { - return &HealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: timeout[0]} - } - return &HealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: 4 * time.Second} -} + func (h *HealthChecker) Name() string { return h.name } diff --git a/pubsub/publisher.go b/pubsub/publisher.go index c0a7940..37f5043 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -11,22 +11,22 @@ import ( var CheckTopicPermission = CheckPermission type Publisher struct { - Client *pubsub.Client - Topic *pubsub.Topic - Convert func(context.Context, []byte)([]byte, error) + Client *pubsub.Client + Topic *pubsub.Topic + Convert func(context.Context, []byte) ([]byte, error) } -func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c *TopicConfig, options...func(context.Context, []byte)([]byte, error)) *Publisher { +func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c *TopicConfig, options ...func(context.Context, []byte) ([]byte, error)) *Publisher { topic := client.Topic(topicId) CheckTopicPermission(ctx, topic.IAM(), "pubsub.topics.publish") - var convert func(context.Context, []byte)([]byte, error) + var convert func(context.Context, []byte) ([]byte, error) if len(options) > 0 { convert = options[0] } return &Publisher{Client: client, Topic: ConfigureTopic(topic, c), Convert: convert} } -func NewPublisherByConfig(ctx context.Context, c PublisherConfig, options...func(context.Context, []byte)([]byte, error)) (*Publisher, error) { +func NewPublisherByConfig(ctx context.Context, c PublisherConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Publisher, error) { if c.Retry.Retry1 <= 0 { client, err := NewPubSubClient(ctx, []byte(c.Client.Credentials), c.Client.ProjectId) if err != nil { @@ -60,25 +60,13 @@ func ConfigureTopic(topic *pubsub.Topic, c *TopicConfig) *pubsub.Topic { } return topic } -func (p *Publisher) Put(ctx context.Context, data []byte, attributes map[string]string) (string, error) { - return p.Publish(ctx, data, attributes) -} -func (p *Publisher) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) { - return p.Publish(ctx, data, attributes) -} -func (p *Publisher) Produce(ctx context.Context, data []byte, attributes map[string]string) (string, error) { - return p.Publish(ctx, data, attributes) -} -func (p *Publisher) Write(ctx context.Context, data []byte, attributes map[string]string) (string, error) { - return p.Publish(ctx, data, attributes) -} -func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error { var binary = data var err error if p.Convert != nil { binary, err = p.Convert(ctx, data) if err != nil { - return "", err + return err } } msg := &pubsub.Message{ @@ -89,7 +77,8 @@ func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[str } publishResult := p.Topic.Publish(ctx, msg) - return publishResult.Get(ctx) + _, err = publishResult.Get(ctx) + return err } func CheckPermission(ctx0 context.Context, iam *iam.Handle, permission string) {