Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

feat: add GetMessageID api #28

Merged
merged 2 commits into from
Jul 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/admin/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type Topics interface {
// GetLastMessageID returns the last commit message Id of a topic
GetLastMessageID(utils.TopicName) (utils.MessageID, error)

// GetMessageID returns the message Id by timestamp(ms) of a topic
GetMessageID(utils.TopicName, int64) (utils.MessageID, error)

// GetStats returns the stats for the topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
GetStats(utils.TopicName) (utils.TopicStats, error)
Expand Down Expand Up @@ -377,6 +380,13 @@ func (t *topics) GetLastMessageID(topic utils.TopicName) (utils.MessageID, error
return messageID, err
}

func (t *topics) GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error) {
var messageID utils.MessageID
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageid", strconv.FormatInt(timestamp, 10))
err := t.pulsar.Client.Get(endpoint, &messageID)
return messageID, err
}

func (t *topics) GetStats(topic utils.TopicName) (utils.TopicStats, error) {
var stats utils.TopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
Expand Down