Skip to content

Commit

Permalink
fix: peek message will return -1 for partitionIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Aug 8, 2024
1 parent dad98f1 commit a20a1cf
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pulsaradmin/pkg/admin/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ const (
)

func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {

msgID := resp.Header.Get("X-Pulsar-Message-ID")
ID, err := utils.ParseMessageID(msgID)
ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, topic.GetPartitionIndex())
if err != nil {
return nil, err
}
Expand Down
54 changes: 54 additions & 0 deletions pulsaradmin/pkg/admin/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,60 @@ func TestGetMessagesByID(t *testing.T) {

}

func TestPeekMessageForPartitionedTopic(t *testing.T) {
ctx := context.Background()
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicName, _ := utils.GetTopicName(topic)
subName := "test-sub"

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

err = admin.Topics().Create(*topicName, 2)
assert.NoError(t, err)

err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest)
assert.NoError(t, err)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.NoError(t, err)
defer producer.Close()

for i := 0; i < 100; i++ {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}, nil)
}
err = producer.Flush()
if err != nil {
return
}

for i := 0; i < 2; i++ {
topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i)
topicName, err := utils.GetTopicName(topicWithPartition)
assert.NoError(t, err)
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 10)
assert.NoError(t, err)
assert.NotNil(t, messages)
for _, msg := range messages {
assert.Equal(t, msg.GetMessageID().PartitionIndex, i)
}
}
}

func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
Expand Down
9 changes: 9 additions & 0 deletions pulsaradmin/pkg/utils/message_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ type MessageID struct {
var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
var Earliest = MessageID{-1, -1, -1, -1}

func ParseMessageIDWithPartitionIndex(str string, index int) (*MessageID, error) {
id, err := ParseMessageID(str)
if err != nil {
return nil, err
}
id.PartitionIndex = index
return id, nil
}

func ParseMessageID(str string) (*MessageID, error) {
s := strings.Split(str, ":")

Expand Down
4 changes: 4 additions & 0 deletions pulsaradmin/pkg/utils/topic_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func (t *TopicName) GetPartition(index int) (*TopicName, error) {
return GetTopicName(topicNameWithPartition)
}

func (t *TopicName) GetPartitionIndex() int {
return t.partitionIndex
}

func getPartitionIndex(topic string) int {
if strings.Contains(topic, PARTITIONEDTOPICSUFFIX) {
parts := strings.Split(topic, "-")
Expand Down

0 comments on commit a20a1cf

Please sign in to comment.