diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 8d3c7710c..c1fe4541a 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -137,6 +137,9 @@ type ConsumerOptions struct { // Mark the subscription as replicated to keep it in sync across clusters ReplicateSubscriptionState bool + + // A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface. + Interceptors ConsumerInterceptors } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e3db6707f..ef93037c0 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -94,6 +94,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.ReceiverQueueSize = 1000 } + if options.Interceptors == nil { + options.Interceptors = defaultConsumerInterceptors + } + if options.Name == "" { options.Name = generateRandomName() } @@ -262,6 +266,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { startMessageID: messageID{}, subscriptionMode: durable, readCompacted: c.options.ReadCompacted, + interceptors: c.options.Interceptors, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq) ch <- ConsumerError{ diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go new file mode 100644 index 000000000..db46b7842 --- /dev/null +++ b/pulsar/consumer_interceptor.go @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type ConsumerInterceptor interface { + // BeforeConsume This is called just before the message is send to Consumer's ConsumerMessage channel. + BeforeConsume(message ConsumerMessage) + + // OnAcknowledge This is called consumer sends the acknowledgment to the broker. + OnAcknowledge(consumer Consumer, msgID MessageID) + + // OnNegativeAcksSend This method will be called when a redelivery from a negative acknowledge occurs. + OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) +} + +type ConsumerInterceptors []ConsumerInterceptor + +func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) { + for i := range x { + x[i].BeforeConsume(message) + } +} + +func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) { + for i := range x { + x[i].OnAcknowledge(consumer, msgID) + } +} + +func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { + for i := range x { + x[i].OnNegativeAcksSend(consumer, msgIDs) + } +} + +var defaultConsumerInterceptors = make(ConsumerInterceptors, 0) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index acf897da8..0c723c8f0 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -114,6 +114,7 @@ type partitionConsumerOpts struct { subscriptionMode subscriptionMode readCompacted bool disableForceTopicCreation bool + interceptors ConsumerInterceptors } type partitionConsumer struct { @@ -274,6 +275,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) { msgID: msgID, } pc.eventsCh <- req + + pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } } @@ -284,6 +287,12 @@ func (pc *partitionConsumer) NackID(msgID messageID) { func (pc *partitionConsumer) Redeliver(msgIds []messageID) { pc.eventsCh <- &redeliveryRequest{msgIds} + + iMsgIds := make([]MessageID, len(msgIds)) + for i := range iMsgIds { + iMsgIds[i] = &msgIds[i] + } + pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds) } func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { @@ -498,6 +507,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } } + pc.options.interceptors.BeforeConsume(ConsumerMessage{ + Consumer: pc.parentConsumer, + Message: msg, + }) + messages = append(messages, msg) } diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 0fcbdc5d0..01831a467 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -34,6 +34,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { queueCh: make(chan []*message, 1), eventsCh: eventsCh, compressionProviders: make(map[pb.CompressionType]compression.Provider), + options: &partitionConsumerOpts{}, } headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) @@ -63,6 +64,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { queueCh: make(chan []*message, 1), eventsCh: eventsCh, compressionProviders: make(map[pb.CompressionType]compression.Provider), + options: &partitionConsumerOpts{}, } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) @@ -92,6 +94,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { queueCh: make(chan []*message, 1), eventsCh: eventsCh, compressionProviders: make(map[pb.CompressionType]compression.Provider), + options: &partitionConsumerOpts{}, } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4031f7dba..a3a22b697 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -23,6 +23,7 @@ import ( "log" "net/http" "strconv" + "sync/atomic" "testing" "time" @@ -1343,6 +1344,151 @@ func TestProducerName(t *testing.T) { } } +type noopConsumerInterceptor struct{} + +func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} + +func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} + +// copyPropertyInterceptor copy all keys in message properties map and add a suffix +type copyPropertyInterceptor struct { + suffix string +} + +func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) { + properties := message.Properties() + copy := make(map[string]string, len(properties)) + for k, v := range properties { + copy[k+x.suffix] = v + } + for ck, v := range copy { + properties[ck] = v + } +} + +func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} + +func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} + +type metricConsumerInterceptor struct { + ackn int32 + nackn int32 +} + +func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) { + atomic.AddInt32(&x.ackn, 1) +} + +func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { + atomic.AddInt32(&x.nackn, int32(len(msgIDs))) +} + +func TestConsumerWithInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + metric := &metricConsumerInterceptor{} + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + NackRedeliveryDelay: time.Second, // for testing nack + Interceptors: ConsumerInterceptors{ + noopConsumerInterceptor{}, + copyPropertyInterceptor{suffix: "-copy"}, + metric, + }, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + var nackIds []MessageID + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + "key-1-copy": "pulsar-1", // check properties copy by interceptor + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + if i%2 == 0 { + consumer.Ack(msg) + } else { + nackIds = append(nackIds, msg.ID()) + } + } + assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn)) + + for i := range nackIds { + consumer.NackID(nackIds[i]) + } + + // receive 5 nack messages + for i := 0; i < 5; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i*2+1) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + "key-1-copy": "pulsar-1", // check properties copy by interceptor + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + consumer.Ack(msg) + } + + assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn)) +} + func TestConsumerName(t *testing.T) { assert := assert.New(t) diff --git a/pulsar/producer.go b/pulsar/producer.go index 7d44a56e7..cb38e3e96 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -135,6 +135,9 @@ type ProducerOptions struct { // If set to a value greater than 1, messages will be queued until this threshold is reached or // BatchingMaxMessages (see above) has been reached or the batch interval has elapsed. BatchingMaxSize uint + + // A chain of interceptors, These interceptors will be called at some points defined in ProducerInterceptor interface + Interceptors ProducerInterceptors } // Producer is used to publish messages on a topic diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 01c5d76fe..4ee0d8da1 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -97,6 +97,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { batchingMaxPublishDelay = defaultBatchingMaxPublishDelay } + if options.Interceptors == nil { + options.Interceptors = defaultProducerInterceptors + } + if options.MessageRouter == nil { internalRouter := internal.NewDefaultRouter( internal.NewSystemClock(), diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go new file mode 100644 index 000000000..cb2cc152f --- /dev/null +++ b/pulsar/producer_interceptor.go @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type ProducerInterceptor interface { + // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the + BeforeSend(producer Producer, message *ProducerMessage) + + // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, + // or when sending the message fails. + OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) +} + +type ProducerInterceptors []ProducerInterceptor + +func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { + for i := range x { + x[i].BeforeSend(producer, message) + } +} + +func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { + for i := range x { + x[i].OnSendAcknowledgement(producer, message, msgID) + } +} + +var defaultProducerInterceptors = make(ProducerInterceptors, 0) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b8dc13dd3..3832d695f 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -444,6 +444,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer flushImmediately: flushImmediately, publishTime: time.Now(), } + p.options.Interceptors.BeforeSend(p, msg) messagesPending.Inc() bytesPending.Add(float64(len(sr.msg.Payload))) @@ -488,14 +489,19 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) bytesPending.Sub(payloadSize) } - if sr.callback != nil { + if sr.callback != nil || len(p.options.Interceptors) > 0 { msgID := newMessageID( int64(response.MessageId.GetLedgerId()), int64(response.MessageId.GetEntryId()), int32(idx), p.partitionIdx, ) - sr.callback(msgID, sr.msg, nil) + + if sr.callback != nil { + sr.callback(msgID, sr.msg, nil) + } + + p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 8d389cbf4..d06ddbbe3 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -817,3 +817,105 @@ func TestMaxMessageSize(t *testing.T) { } } } + +type noopProduceInterceptor struct{} + +func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} + +func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +} + +// copyPropertyIntercepotr copy all keys in message properties map and add a suffix +type metricProduceInterceptor struct { + sendn int + ackn int +} + +func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { + x.sendn++ +} + +func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { + x.ackn++ +} + +func TestProducerWithInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/test-topic-interceptors" + ctx := context.Background() + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + metric := &metricProduceInterceptor{} + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + Interceptors: ProducerInterceptors{ + noopProduceInterceptor{}, + metric, + }, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if i%2 == 0 { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }) + assert.Nil(t, err) + } else { + producer.SendAsync(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }, func(_ MessageID, _ *ProducerMessage, err error) { + assert.Nil(t, err) + }) + assert.Nil(t, err) + } + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + consumer.Ack(msg) + } + + assert.Equal(t, 10, metric.sendn) + assert.Equal(t, 10, metric.ackn) +}