Skip to content

Commit

Permalink
Add command topic Deduplication(streamnative/pulsar-admin-go#246) (ap…
Browse files Browse the repository at this point in the history
…ache#408)

* Add command topic Deduplication(streamnative/pulsar-admin-go#246)

- pulsarctl topics get-deduplication [topic]
- pulsarctl topics set-deduplication [topic] -e
- pulsarctl topics remove-deduplication [topic]

* Modify prompt

Signed-off-by: limingnihao <limingnihao@live.com>
  • Loading branch information
limingnihao authored and tisonkun committed Aug 15, 2023
1 parent 02a64d8 commit a2e93b3
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ type Topics interface {

// RemoveDispatchRate Remove message dispatch rate for a topic
RemoveDispatchRate(utils.TopicName) error

// GetDeduplicationStatus Get the deduplication policy for a topic
GetDeduplicationStatus(utils.TopicName) (bool, error)

// SetDeduplicationStatus Set the deduplication policy for a topic
SetDeduplicationStatus(utils.TopicName, bool) error

// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -519,3 +528,19 @@ func (t *topics) RemoveDispatchRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetDeduplicationStatus(topic utils.TopicName) (bool, error) {
var enabled bool
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
err := t.pulsar.Client.Get(endpoint, &enabled)
return enabled, err
}

func (t *topics) SetDeduplicationStatus(topic utils.TopicName, enabled bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.Post(endpoint, enabled)
}
func (t *topics) RemoveDeduplicationStatus(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.Delete(endpoint)
}

0 comments on commit a2e93b3

Please sign in to comment.