Skip to content

Commit

Permalink
Refactor pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jun 30, 2024
1 parent 5b18df7 commit 7533d09
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 27 deletions.
7 changes: 1 addition & 6 deletions pubsub/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ func NewPubHealthChecker(name string, client *pubsub.Client, resourceId string,
}
return &HealthChecker{name: name, client: client, permissionType: PermissionPublish, resourceId: resourceId, timeout: 4 * time.Second}
}
func NewSubHealthChecker(name string, client *pubsub.Client, resourceId string, timeout ...time.Duration) *HealthChecker {
if len(timeout) >= 1 {
return &HealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: timeout[0]}
}
return &HealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: 4 * time.Second}
}

func (h *HealthChecker) Name() string {
return h.name
}
Expand Down
31 changes: 10 additions & 21 deletions pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import (
var CheckTopicPermission = CheckPermission

type Publisher struct {
Client *pubsub.Client
Topic *pubsub.Topic
Convert func(context.Context, []byte)([]byte, error)
Client *pubsub.Client
Topic *pubsub.Topic
Convert func(context.Context, []byte) ([]byte, error)
}

func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c *TopicConfig, options...func(context.Context, []byte)([]byte, error)) *Publisher {
func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c *TopicConfig, options ...func(context.Context, []byte) ([]byte, error)) *Publisher {
topic := client.Topic(topicId)
CheckTopicPermission(ctx, topic.IAM(), "pubsub.topics.publish")
var convert func(context.Context, []byte)([]byte, error)
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &Publisher{Client: client, Topic: ConfigureTopic(topic, c), Convert: convert}
}

func NewPublisherByConfig(ctx context.Context, c PublisherConfig, options...func(context.Context, []byte)([]byte, error)) (*Publisher, error) {
func NewPublisherByConfig(ctx context.Context, c PublisherConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Publisher, error) {
if c.Retry.Retry1 <= 0 {
client, err := NewPubSubClient(ctx, []byte(c.Client.Credentials), c.Client.ProjectId)
if err != nil {
Expand Down Expand Up @@ -60,25 +60,13 @@ func ConfigureTopic(topic *pubsub.Topic, c *TopicConfig) *pubsub.Topic {
}
return topic
}
func (p *Publisher) Put(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
return p.Publish(ctx, data, attributes)
}
func (p *Publisher) Send(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
return p.Publish(ctx, data, attributes)
}
func (p *Publisher) Produce(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
return p.Publish(ctx, data, attributes)
}
func (p *Publisher) Write(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
return p.Publish(ctx, data, attributes)
}
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error {
var binary = data
var err error
if p.Convert != nil {
binary, err = p.Convert(ctx, data)
if err != nil {
return "", err
return err
}
}
msg := &pubsub.Message{
Expand All @@ -89,7 +77,8 @@ func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[str
}

publishResult := p.Topic.Publish(ctx, msg)
return publishResult.Get(ctx)
_, err = publishResult.Get(ctx)
return err
}

func CheckPermission(ctx0 context.Context, iam *iam.Handle, permission string) {
Expand Down

0 comments on commit 7533d09

Please sign in to comment.