Skip to content

Commit

Permalink
[PIP 90] go client retrieve broker metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
shoothzj committed Mar 22, 2022
1 parent 5c04811 commit ab51f2d
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 6 deletions.
24 changes: 22 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pbMsgID := response.GetMessageId()

reader := internal.NewMessageReader(headersAndPayload)
brokerMetadata, err := reader.ReadBrokerMetadata()
if err != nil {
// todo optimize use more appropriate error codes
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
return err
}
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
return err
}

decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
// error decrypting the payload
if err != nil {
Expand Down Expand Up @@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
pc.AckID(msgID)
continue
}

var messageIndex *uint64
var brokerPublishTime *time.Time
if brokerMetadata != nil {
if brokerMetadata.Index != nil {
aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1
messageIndex = &aux
}
if brokerMetadata.BrokerTimestamp != nil {
aux := timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp)
brokerPublishTime = &aux
}
}
// set the consumer so we know how to ack the message id
msgID.consumer = pc
var msg *message
Expand All @@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
orderingKey: string(smm.OrderingKey),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
}
} else {
msg = &message{
Expand All @@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
}
}

Expand Down
10 changes: 10 additions & 0 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ type message struct {
redeliveryCount uint32
schema Schema
encryptionContext *EncryptionContext
index *uint64
brokerPublishTime *time.Time
}

func (msg *message) Topic() string {
Expand Down Expand Up @@ -299,6 +301,14 @@ func (msg *message) GetEncryptionContext() *EncryptionContext {
return msg.encryptionContext
}

func (msg *message) Index() *uint64 {
return msg.index
}

func (msg *message) BrokerPublishTime() *time.Time {
return msg.brokerPublishTime
}

func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
Expand Down
6 changes: 6 additions & 0 deletions pulsar/internal/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Buffer interface {

Read(size uint32) []byte

Skip(size uint32)

Get(readerIndex uint32, size uint32) []byte

ReadableSlice() []byte
Expand Down Expand Up @@ -122,6 +124,10 @@ func (b *buffer) Read(size uint32) []byte {
return res
}

func (b *buffer) Skip(size uint32) {
b.readerIdx += size
}

func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
return b.data[readerIdx : readerIdx+size]
}
Expand Down
20 changes: 18 additions & 2 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"encoding/binary"
"errors"
"fmt"

Expand All @@ -34,8 +35,9 @@ const (
// MessageFramePadding is for metadata and other frame headers
MessageFramePadding = 10 * 1024
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
MaxFrameSize = MaxMessageSize + MessageFramePadding
magicCrc32c uint16 = 0x0e01
MaxFrameSize = MaxMessageSize + MessageFramePadding
magicCrc32c uint16 = 0x0e01
magicBrokerEntryMetadata uint16 = 0x0e02
)

// ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
Expand Down Expand Up @@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
return &meta, nil
}

func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
magicNumber := binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2))
if magicNumber != magicBrokerEntryMetadata {
return nil, nil
}
r.buffer.Skip(2)
size := r.buffer.ReadUint32()
var brokerEntryMetadata pb.BrokerEntryMetadata
if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata); err != nil {
return nil, err
}
return &brokerEntryMetadata, nil
}

func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
return nil, nil, ErrEOM
Expand Down
18 changes: 18 additions & 0 deletions pulsar/internal/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ func TestReadMessageMetadata(t *testing.T) {
assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
}

func TestReadBrokerEntryMetadata(t *testing.T) {
// read old style message (not batched)
reader := NewMessageReaderFromArray(brokerEntryMeta)
meta, err := reader.ReadBrokerMetadata()
if err != nil {
t.Fatal(err)
}
var expectedBrokerTimestamp uint64 = 1646983036054
assert.Equal(t, expectedBrokerTimestamp, *meta.BrokerTimestamp)
var expectedIndex uint64 = 5
assert.Equal(t, expectedIndex, *meta.Index)
}

func TestReadMessageOldFormat(t *testing.T) {
reader := NewMessageReaderFromArray(rawCompatSingleMessage)
_, err := reader.ReadMessageMetadata()
Expand Down Expand Up @@ -210,3 +223,8 @@ var rawBatchMessage10 = []byte{
0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
0x6f,
}

var brokerEntryMeta = []byte{
0x0e, 0x02, 0x00, 0x00, 0x00, 0x09, 0x08, 0x96,
0xf9, 0xda, 0xbe, 0xf7, 0x2f, 0x10, 0x05,
}
5 changes: 3 additions & 2 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
PulsarVersion = "0.1"
ClientVersionString = "Pulsar Go " + PulsarVersion

PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)
)

type TLSOptions struct {
Expand Down Expand Up @@ -292,7 +292,8 @@ func (c *connection) doHandshake() bool {
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
FeatureFlags: &pb.FeatureFlags{
SupportsAuthRefresh: proto.Bool(true),
SupportsAuthRefresh: proto.Bool(true),
SupportsBrokerEntryMetadata: proto.Bool(true),
},
}

Expand Down
8 changes: 8 additions & 0 deletions pulsar/internal/pulsartracing/message_carrier_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,11 @@ func (msg *mockConsumerMessage) ProducerName() string {
func (msg *mockConsumerMessage) GetEncryptionContext() *pulsar.EncryptionContext {
return &pulsar.EncryptionContext{}
}

func (msg *mockConsumerMessage) Index() *uint64 {
return nil
}

func (msg *mockConsumerMessage) BrokerPublishTime() *time.Time {
return nil
}
8 changes: 8 additions & 0 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ type Message interface {
// GetEncryptionContext returns the ecryption context of the message.
// It will be used by the application to parse the undecrypted message.
GetEncryptionContext() *EncryptionContext

// Index returns index from broker entry metadata,
// or empty if the feature is not enabled in the broker.
Index() *uint64

// BrokerPublishTime returns broker publish time from broker entry metadata,
// or empty if the feature is not enabled in the broker.
BrokerPublishTime() *time.Time
}

// MessageID identifier for a particular message
Expand Down
16 changes: 16 additions & 0 deletions pulsar/negative_acks_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ func (msg *mockMessage1) GetEncryptionContext() *EncryptionContext {
return &EncryptionContext{}
}

func (msg *mockMessage1) Index() *uint64 {
return nil
}

func (msg *mockMessage1) BrokerPublishTime() *time.Time {
return nil
}

type mockMessage2 struct {
properties map[string]string
}
Expand Down Expand Up @@ -300,3 +308,11 @@ func (msg *mockMessage2) ProducerName() string {
func (msg *mockMessage2) GetEncryptionContext() *EncryptionContext {
return &EncryptionContext{}
}

func (msg *mockMessage2) Index() *uint64 {
return nil
}

func (msg *mockMessage2) BrokerPublishTime() *time.Time {
return nil
}

0 comments on commit ab51f2d

Please sign in to comment.