From 322967cd3a829243b0926ecb453a7e1a9d46faa6 Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Wed, 8 Jul 2020 19:58:10 +0800 Subject: [PATCH 1/8] add interceptor --- pulsar/consumer.go | 3 + pulsar/consumer_impl.go | 1 + pulsar/consumer_interceptor.go | 49 +++++++++++++ pulsar/consumer_partition.go | 14 ++++ pulsar/consumer_test.go | 128 +++++++++++++++++++++++++++++++++ pulsar/producer.go | 3 + pulsar/producer_interceptor.go | 41 +++++++++++ pulsar/producer_partition.go | 10 ++- pulsar/producer_test.go | 102 ++++++++++++++++++++++++++ 9 files changed, 349 insertions(+), 2 deletions(-) create mode 100644 pulsar/consumer_interceptor.go create mode 100644 pulsar/producer_interceptor.go diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 97020b6fd..6292eb727 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -137,6 +137,9 @@ type ConsumerOptions struct { // Mark the subscription as replicated to keep it in sync across clusters ReplicateSubscriptionState bool + + // A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface. + Interceptors ConsumerInterceptors } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0a3220a78..a26fef8bc 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -238,6 +238,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { startMessageID: nil, subscriptionMode: durable, readCompacted: c.options.ReadCompacted, + interceptors: c.options.Interceptors, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq) ch <- ConsumerError{ diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go new file mode 100644 index 000000000..e09fe1961 --- /dev/null +++ b/pulsar/consumer_interceptor.go @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type ConsumerInterceptor interface { + // BeforeConsume This is called just before the message is send to Consumer's ConsumerMessage channel. + BeforeConsume(message ConsumerMessage) + + // OnAcknowledge This is called consumer sends the acknowledgment to the broker. + OnAcknowledge(consumer Consumer, msgId MessageID) + + // OnNegativeAcksSend This method will be called when a redelivery from a negative acknowledge occurs. + OnNegativeAcksSend(consumer Consumer, msgIds []MessageID) +} + +type ConsumerInterceptors []ConsumerInterceptor + +func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) { + for i := range x { + x[i].BeforeConsume(message) + } +} + +func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgId MessageID) { + for i := range x { + x[i].OnAcknowledge(consumer, msgId) + } +} + +func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIds []MessageID) { + for i := range x { + x[i].OnNegativeAcksSend(consumer, msgIds) + } +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b4498f6c8..a52428b4b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -68,6 +68,7 @@ type partitionConsumerOpts struct { subscriptionMode subscriptionMode readCompacted bool disableForceTopicCreation bool + interceptors ConsumerInterceptors } type partitionConsumer struct { @@ -226,6 +227,8 @@ func (pc *partitionConsumer) AckID(msgID *messageID) { msgID: msgID, } pc.eventsCh <- req + + pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } } @@ -235,6 +238,12 @@ func (pc *partitionConsumer) NackID(msgID *messageID) { func (pc *partitionConsumer) Redeliver(msgIds []messageID) { pc.eventsCh <- &redeliveryRequest{msgIds} + + iMsgIds := make([]MessageID, len(msgIds)) + for i := range iMsgIds { + iMsgIds[i] = &msgIds[i] + } + pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds) } func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { @@ -442,6 +451,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } } + pc.options.interceptors.BeforeConsume(ConsumerMessage{ + Consumer: pc.parentConsumer, + Message: msg, + }) + messages = append(messages, msg) } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index d598dd36b..85201ea3b 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "log" + "math/rand" "net/http" "strconv" "testing" @@ -1342,3 +1343,130 @@ func TestProducerName(t *testing.T) { consumer.Ack(msg) } } + +type noopConsumerInterceptor struct{} + +func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgId MessageID) {} + +func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, messageIds []MessageID) {} + +// copyPropertyInterceptor copy all keys in message properties map and add a suffix +type copyPropertyInterceptor struct { + suffix string +} + +func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) { + properties := message.Properties() + copy := make(map[string]string, len(properties)) + for k, v := range properties { + copy[k+x.suffix] = v + } + for ck, v := range copy { + properties[ck] = v + } +} + +func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgId MessageID) {} + +func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, messageIds []MessageID) {} + +type metricConsumerInterceptor struct { + ackn int + nackn int +} + +func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} + +func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgId MessageID) { + x.ackn++ +} + +func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, messageIds []MessageID) { + x.nackn += len(messageIds) +} + +func TestConsumerWithInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + rand.Seed(time.Now().Unix()) + topic := fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", rand.Int()) + ctx := context.Background() + + metric := &metricConsumerInterceptor{} + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + NackRedeliveryDelay: time.Second, // for testing nack + Interceptors: ConsumerInterceptors{ + noopConsumerInterceptor{}, + copyPropertyInterceptor{suffix: "-copy"}, + metric, + }, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }); err != nil { + log.Fatal(err) + } + } + + var nackIds []MessageID + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + "key-1-copy": "pulsar-1", // check properties copy by interceptor + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + if i%2 == 0 { + consumer.Ack(msg) + } else { + nackIds = append(nackIds, msg.ID()) + } + } + assert.Equal(t, 5, metric.ackn) + + for i := range nackIds { + consumer.NackID(nackIds[i]) + } + + time.Sleep(time.Second * 3) // waiting for nack actual perform + assert.Equal(t, 5, metric.nackn) +} diff --git a/pulsar/producer.go b/pulsar/producer.go index 7d44a56e7..cb38e3e96 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -135,6 +135,9 @@ type ProducerOptions struct { // If set to a value greater than 1, messages will be queued until this threshold is reached or // BatchingMaxMessages (see above) has been reached or the batch interval has elapsed. BatchingMaxSize uint + + // A chain of interceptors, These interceptors will be called at some points defined in ProducerInterceptor interface + Interceptors ProducerInterceptors } // Producer is used to publish messages on a topic diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go new file mode 100644 index 000000000..d21974759 --- /dev/null +++ b/pulsar/producer_interceptor.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type ProducerInterceptor interface { + // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the + BeforeSend(producer Producer, message *ProducerMessage) + + // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, + // or when sending the message fails. + OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) +} + +type ProducerInterceptors []ProducerInterceptor + +func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { + for i := range x { + x[i].BeforeSend(producer, message) + } +} + +func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) { + for i := range x { + x[i].OnSendAcknowledgement(producer, message, msgId) + } +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e4765fc48..b06a37502 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -408,6 +408,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer callback: callback, flushImmediately: flushImmediately, } + p.options.Interceptors.BeforeSend(p, msg) p.eventsChan <- sr } @@ -438,14 +439,19 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) p.publishSemaphore.Release() } - if sr.callback != nil { + if sr.callback != nil || len(p.options.Interceptors) > 0 { msgID := newMessageID( int64(response.MessageId.GetLedgerId()), int64(response.MessageId.GetEntryId()), idx, p.partitionIdx, ) - sr.callback(msgID, sr.msg, nil) + + if sr.callback != nil { + sr.callback(msgID, sr.msg, nil) + } + + p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 95e4ed1c2..c7cd729ab 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -817,3 +817,105 @@ func TestMaxMessageSize(t *testing.T) { } } } + +type noopProduceInterceptor struct{} + +func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} + +func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) { +} + +// copyPropertyIntercepotr copy all keys in message properties map and add a suffix +type metricProduceInterceptor struct { + sendn int + ackn int +} + +func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { + x.sendn++ +} + +func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) { + x.ackn++ +} + +func TestProducerWithInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/test-topic-interceptors" + ctx := context.Background() + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + metric := &metricProduceInterceptor{} + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + Interceptors: ProducerInterceptors{ + noopProduceInterceptor{}, + metric, + }, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + if i%2 == 0 { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }) + assert.Nil(t, err) + } else { + producer.SendAsync(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "pulsar", + Properties: map[string]string{ + "key-1": "pulsar-1", + }, + }, func(_ MessageID, _ *ProducerMessage, err error) { + assert.Nil(t, err) + }) + assert.Nil(t, err) + } + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + consumer.Ack(msg) + } + + assert.Equal(t, 10, metric.sendn) + assert.Equal(t, 10, metric.ackn) +} From 1974b20338d52513d8d32a822d67d7506022fd17 Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 09:56:03 +0800 Subject: [PATCH 2/8] avoid nil interceptors --- pulsar/consumer_impl.go | 4 ++++ pulsar/consumer_interceptor.go | 2 ++ pulsar/producer_impl.go | 4 ++++ pulsar/producer_interceptor.go | 2 ++ 4 files changed, 12 insertions(+) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index a26fef8bc..73d160a43 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -74,6 +74,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.ReceiverQueueSize = 1000 } + if options.Interceptors == nil { + options.Interceptors = defaultConsumerInterceptors + } + // did the user pass in a message channel? messageCh := options.MessageChannel if options.MessageChannel == nil { diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go index e09fe1961..2775a3a39 100644 --- a/pulsar/consumer_interceptor.go +++ b/pulsar/consumer_interceptor.go @@ -47,3 +47,5 @@ func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIds []Mes x[i].OnNegativeAcksSend(consumer, msgIds) } } + +var defaultConsumerInterceptors = make(ConsumerInterceptors, 0) diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 35dae2847..0366c02ab 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -77,6 +77,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { batchingMaxPublishDelay = defaultBatchingMaxPublishDelay } + if options.Interceptors == nil { + options.Interceptors = defaultProducerInterceptors + } + if options.MessageRouter == nil { internalRouter := internal.NewDefaultRouter( internal.NewSystemClock(), diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index d21974759..11df06e29 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -39,3 +39,5 @@ func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message * x[i].OnSendAcknowledgement(producer, message, msgId) } } + +var defaultProducerInterceptors = make(ProducerInterceptors, 0) From 8de446cef501f901af124734b6818f34d50aa5e7 Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 10:02:05 +0800 Subject: [PATCH 3/8] fix nil options testing --- pulsar/consumer_partition_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 5a5a94e1a..b62c96350 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -34,6 +34,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { queueCh: make(chan []*message, 1), eventsCh: eventsCh, compressionProviders: make(map[pb.CompressionType]compression.Provider), + options: &partitionConsumerOpts{}, } headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) From b202dc60acd8777e2ba9ab5eaa44018a94205226 Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 10:08:17 +0800 Subject: [PATCH 4/8] fix nil options testing --- pulsar/consumer_partition_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index b62c96350..e71b1c99a 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -64,6 +64,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { queueCh: make(chan []*message, 1), eventsCh: eventsCh, compressionProviders: make(map[pb.CompressionType]compression.Provider), + options: &partitionConsumerOpts{}, } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) @@ -93,6 +94,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { queueCh: make(chan []*message, 1), eventsCh: eventsCh, compressionProviders: make(map[pb.CompressionType]compression.Provider), + options: &partitionConsumerOpts{}, } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) From 5ba9aa357d7177c19ed7f7333606c4b9b37e1cde Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 10:14:55 +0800 Subject: [PATCH 5/8] fix style --- pulsar/consumer_interceptor.go | 12 ++++++------ pulsar/consumer_test.go | 14 +++++++------- pulsar/producer_interceptor.go | 6 +++--- pulsar/producer_test.go | 4 ++-- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go index 2775a3a39..db46b7842 100644 --- a/pulsar/consumer_interceptor.go +++ b/pulsar/consumer_interceptor.go @@ -22,10 +22,10 @@ type ConsumerInterceptor interface { BeforeConsume(message ConsumerMessage) // OnAcknowledge This is called consumer sends the acknowledgment to the broker. - OnAcknowledge(consumer Consumer, msgId MessageID) + OnAcknowledge(consumer Consumer, msgID MessageID) // OnNegativeAcksSend This method will be called when a redelivery from a negative acknowledge occurs. - OnNegativeAcksSend(consumer Consumer, msgIds []MessageID) + OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) } type ConsumerInterceptors []ConsumerInterceptor @@ -36,15 +36,15 @@ func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) { } } -func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgId MessageID) { +func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID MessageID) { for i := range x { - x[i].OnAcknowledge(consumer, msgId) + x[i].OnAcknowledge(consumer, msgID) } } -func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIds []MessageID) { +func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { for i := range x { - x[i].OnNegativeAcksSend(consumer, msgIds) + x[i].OnNegativeAcksSend(consumer, msgIDs) } } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 85201ea3b..c1e357cdd 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1348,9 +1348,9 @@ type noopConsumerInterceptor struct{} func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} -func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgId MessageID) {} +func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} -func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, messageIds []MessageID) {} +func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} // copyPropertyInterceptor copy all keys in message properties map and add a suffix type copyPropertyInterceptor struct { @@ -1368,9 +1368,9 @@ func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) { } } -func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgId MessageID) {} +func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) {} -func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, messageIds []MessageID) {} +func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} type metricConsumerInterceptor struct { ackn int @@ -1379,12 +1379,12 @@ type metricConsumerInterceptor struct { func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} -func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgId MessageID) { +func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) { x.ackn++ } -func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, messageIds []MessageID) { - x.nackn += len(messageIds) +func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { + x.nackn += len(msgIDs) } func TestConsumerWithInterceptors(t *testing.T) { diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index 11df06e29..cb2cc152f 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -23,7 +23,7 @@ type ProducerInterceptor interface { // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) + OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor @@ -34,9 +34,9 @@ func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMes } } -func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { for i := range x { - x[i].OnSendAcknowledgement(producer, message, msgId) + x[i].OnSendAcknowledgement(producer, message, msgID) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index c7cd729ab..fa8d34599 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -822,7 +822,7 @@ type noopProduceInterceptor struct{} func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { } // copyPropertyIntercepotr copy all keys in message properties map and add a suffix @@ -835,7 +835,7 @@ func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *Produc x.sendn++ } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgId MessageID) { +func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { x.ackn++ } From 7a37e9a719b615918e77c7006ded1a9eb71a54c8 Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 10:31:37 +0800 Subject: [PATCH 6/8] fix data race --- pulsar/consumer_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index c1e357cdd..fe5ff9835 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/http" "strconv" + "sync/atomic" "testing" "time" @@ -1373,18 +1374,18 @@ func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) {} type metricConsumerInterceptor struct { - ackn int - nackn int + ackn int32 + nackn int32 } func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {} func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID MessageID) { - x.ackn++ + atomic.AddInt32(&x.ackn, 1) } func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) { - x.nackn += len(msgIDs) + atomic.AddInt32(&x.nackn, int32(len(msgIDs))) } func TestConsumerWithInterceptors(t *testing.T) { From 5492dcd41b9ba795b0683e6709ce95546ee65d80 Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 10:34:52 +0800 Subject: [PATCH 7/8] fix data race --- pulsar/consumer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index fe5ff9835..572a61306 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1462,12 +1462,12 @@ func TestConsumerWithInterceptors(t *testing.T) { nackIds = append(nackIds, msg.ID()) } } - assert.Equal(t, 5, metric.ackn) + assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn)) for i := range nackIds { consumer.NackID(nackIds[i]) } time.Sleep(time.Second * 3) // waiting for nack actual perform - assert.Equal(t, 5, metric.nackn) + assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn)) } From 9bd662ae67da809289de11cdd6932c76d698178f Mon Sep 17 00:00:00 2001 From: lijingfeng Date: Thu, 9 Jul 2020 10:59:26 +0800 Subject: [PATCH 8/8] review test --- pulsar/consumer_test.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 572a61306..cb9df8fc3 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "log" - "math/rand" "net/http" "strconv" "sync/atomic" @@ -1396,8 +1395,7 @@ func TestConsumerWithInterceptors(t *testing.T) { assert.Nil(t, err) defer client.Close() - rand.Seed(time.Now().Unix()) - topic := fmt.Sprintf("persistent://public/default/test-topic-interceptors-%d", rand.Int()) + topic := newTopicName() ctx := context.Background() metric := &metricConsumerInterceptor{} @@ -1468,6 +1466,25 @@ func TestConsumerWithInterceptors(t *testing.T) { consumer.NackID(nackIds[i]) } - time.Sleep(time.Second * 3) // waiting for nack actual perform + // receive 5 nack messages + for i := 0; i < 5; i++ { + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + + expectMsg := fmt.Sprintf("hello-%d", i*2+1) + expectProperties := map[string]string{ + "key-1": "pulsar-1", + "key-1-copy": "pulsar-1", // check properties copy by interceptor + } + assert.Equal(t, []byte(expectMsg), msg.Payload()) + assert.Equal(t, "pulsar", msg.Key()) + assert.Equal(t, expectProperties, msg.Properties()) + + // ack message + consumer.Ack(msg) + } + assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn)) }