Skip to content

Commit

Permalink
[Issue 52]Add interceptor (#314)
Browse files Browse the repository at this point in the history
### Motivation
Add A chain of interceptors for Producer and Consumer as an option, these interceptors will be called at some points, it can be used for tracing, metrics, and so on.

### Modifications
Add two files for interceptor definition.
Call interceptor's methods at appropriate position.
* review test

Co-authored-by: lijingfeng <lijingfeng@laiye.com>
  • Loading branch information
snowcrumble and lijingfeng committed Jul 14, 2020
1 parent 5fea0e8 commit b434511
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ type ConsumerOptions struct {

// Mark the subscription as replicated to keep it in sync across clusters
ReplicateSubscriptionState bool

// A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface.
Interceptors ConsumerInterceptors
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down
5 changes: 5 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.ReceiverQueueSize = 1000
}

if options.Interceptors == nil {
options.Interceptors = defaultConsumerInterceptors
}

if options.Name == "" {
options.Name = generateRandomName()
}
Expand Down Expand Up @@ -262,6 +266,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
startMessageID: messageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
ch <- ConsumerError{
Expand Down
51 changes: 51 additions & 0 deletions pulsar/consumer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type ConsumerInterceptor interface {
// BeforeConsume This is called just before the message is send to Consumer's ConsumerMessage channel.
BeforeConsume(message ConsumerMessage)

// OnAcknowledge This is called consumer sends the acknowledgment to the broker.
OnAcknowledge(consumer Consumer, msgID MessageID)

// OnNegativeAcksSend This method will be called when a redelivery from a negative acknowledge occurs.
OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
}

type ConsumerInterceptors []ConsumerInterceptor

func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
for i := range x {
x[i].BeforeConsume(message)
}
}

func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) {
for i := range x {
x[i].OnAcknowledge(consumer, msgID)
}
}

func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
for i := range x {
x[i].OnNegativeAcksSend(consumer, msgIDs)
}
}

var defaultConsumerInterceptors = make(ConsumerInterceptors, 0)
14 changes: 14 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type partitionConsumerOpts struct {
subscriptionMode subscriptionMode
readCompacted bool
disableForceTopicCreation bool
interceptors ConsumerInterceptors
}

type partitionConsumer struct {
Expand Down Expand Up @@ -274,6 +275,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
msgID: msgID,
}
pc.eventsCh <- req

pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
}
}

Expand All @@ -284,6 +287,12 @@ func (pc *partitionConsumer) NackID(msgID messageID) {

func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
pc.eventsCh <- &redeliveryRequest{msgIds}

iMsgIds := make([]MessageID, len(msgIds))
for i := range iMsgIds {
iMsgIds[i] = &msgIds[i]
}
pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds)
}

func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
Expand Down Expand Up @@ -498,6 +507,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}
}

pc.options.interceptors.BeforeConsume(ConsumerMessage{
Consumer: pc.parentConsumer,
Message: msg,
})

messages = append(messages, msg)
}

Expand Down
3 changes: 3 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
}

headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
Expand Down Expand Up @@ -63,6 +64,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
Expand Down Expand Up @@ -92,6 +94,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
Expand Down
146 changes: 146 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"log"
"net/http"
"strconv"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1343,6 +1344,151 @@ func TestProducerName(t *testing.T) {
}
}

type noopConsumerInterceptor struct{}

func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}

func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}

func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}

// copyPropertyInterceptor copy all keys in message properties map and add a suffix
type copyPropertyInterceptor struct {
suffix string
}

func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
properties := message.Properties()
copy := make(map[string]string, len(properties))
for k, v := range properties {
copy[k+x.suffix] = v
}
for ck, v := range copy {
properties[ck] = v
}
}

func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {}

func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {}

type metricConsumerInterceptor struct {
ackn int32
nackn int32
}

func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}

func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {
atomic.AddInt32(&x.ackn, 1)
}

func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {
atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
}

func TestConsumerWithInterceptors(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

metric := &metricConsumerInterceptor{}

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
NackRedeliveryDelay: time.Second, // for testing nack
Interceptors: ConsumerInterceptors{
noopConsumerInterceptor{},
copyPropertyInterceptor{suffix: "-copy"},
metric,
},
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}

var nackIds []MessageID
// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
"key-1-copy": "pulsar-1", // check properties copy by interceptor
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())

// ack message
if i%2 == 0 {
consumer.Ack(msg)
} else {
nackIds = append(nackIds, msg.ID())
}
}
assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))

for i := range nackIds {
consumer.NackID(nackIds[i])
}

// receive 5 nack messages
for i := 0; i < 5; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i*2+1)
expectProperties := map[string]string{
"key-1": "pulsar-1",
"key-1-copy": "pulsar-1", // check properties copy by interceptor
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())

// ack message
consumer.Ack(msg)
}

assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn))
}

func TestConsumerName(t *testing.T) {
assert := assert.New(t)

Expand Down
3 changes: 3 additions & 0 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type ProducerOptions struct {
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
BatchingMaxSize uint

// A chain of interceptors, These interceptors will be called at some points defined in ProducerInterceptor interface
Interceptors ProducerInterceptors
}

// Producer is used to publish messages on a topic
Expand Down
4 changes: 4 additions & 0 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}

if options.Interceptors == nil {
options.Interceptors = defaultProducerInterceptors
}

if options.MessageRouter == nil {
internalRouter := internal.NewDefaultRouter(
internal.NewSystemClock(),
Expand Down
43 changes: 43 additions & 0 deletions pulsar/producer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type ProducerInterceptor interface {
// BeforeSend This is called before send the message to the brokers. This method is allowed to modify the
BeforeSend(producer Producer, message *ProducerMessage)

// OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged,
// or when sending the message fails.
OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
}

type ProducerInterceptors []ProducerInterceptor

func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) {
for i := range x {
x[i].BeforeSend(producer, message)
}
}

func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
for i := range x {
x[i].OnSendAcknowledgement(producer, message, msgID)
}
}

var defaultProducerInterceptors = make(ProducerInterceptors, 0)
Loading

0 comments on commit b434511

Please sign in to comment.