From 77c61dbc06256647193d1f99a8665a1be0ea2c8d Mon Sep 17 00:00:00 2001 From: Pavel Agaletskiy Date: Sun, 28 Mar 2021 13:04:53 +0300 Subject: [PATCH 1/4] Support for correct reconnections limit in consumer --- pulsar/consumer.go | 3 + pulsar/consumer_impl.go | 4 + pulsar/consumer_multitopic.go | 4 + pulsar/consumer_partition.go | 12 +- pulsar/consumer_regex.go | 4 + pulsar/consumer_test.go | 351 ++++++++++++++++++++-------------- 6 files changed, 229 insertions(+), 149 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 1c52b2993..992c212ea 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -165,6 +165,9 @@ type Consumer interface { // Unsubscribe the consumer Unsubscribe() error + // Closed returns a channel indicating that consumer is closed + Closed() <-chan struct{} + // Receive a single message. // This calls blocks until a message is available. Receive(context.Context) (Message, error) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d05c495b4..b7f110a2c 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -375,6 +375,10 @@ func (c *consumer) Unsubscribe() error { return nil } +func (c *consumer) Closed() <-chan struct{} { + return c.closeCh +} + func (c *consumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index dc4ad7b9b..3545e00ce 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -93,6 +93,10 @@ func (c *multiTopicConsumer) Unsubscribe() error { return errs } +func (c *multiTopicConsumer) Closed() <-chan struct{} { + return c.closeCh +} + func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 031e0a3a6..434ba344d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -784,7 +784,9 @@ func (pc *partitionConsumer) runEventsLoop() { return case <-pc.connectClosedCh: pc.log.Debug("runEventsLoop will reconnect") - pc.reconnectToBroker() + if !pc.reconnectToBroker() { + pc.parentConsumer.Close() + } } } }() @@ -858,7 +860,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { close(pc.closeCh) } -func (pc *partitionConsumer) reconnectToBroker() { +func (pc *partitionConsumer) reconnectToBroker() bool { var ( maxRetry int backoff = internal.Backoff{} @@ -873,7 +875,7 @@ func (pc *partitionConsumer) reconnectToBroker() { for maxRetry != 0 { if pc.getConsumerState() != consumerReady { // Consumer is already closing - return + return false } d := backoff.Next() @@ -884,13 +886,15 @@ func (pc *partitionConsumer) reconnectToBroker() { if err == nil { // Successfully reconnected pc.log.Info("Reconnected consumer to broker") - return + return true } if maxRetry > 0 { maxRetry-- } } + + return false } func (pc *partitionConsumer) grabConn() error { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 707237065..3a89cfbc5 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -136,6 +136,10 @@ func (c *regexConsumer) Unsubscribe() error { return errs } +func (c *regexConsumer) Closed() <-chan struct{} { + return c.closeCh +} + func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f512b00b4..1c8be45f3 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -30,6 +30,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var ( @@ -42,7 +43,7 @@ func TestProducerConsumer(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "my-topic" @@ -54,7 +55,7 @@ func TestProducerConsumer(t *testing.T) { SubscriptionName: "my-sub", Type: Exclusive, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer @@ -62,28 +63,25 @@ func TestProducerConsumer(t *testing.T) { Topic: topic, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages for i := 0; i < 10; i++ { - if _, err := producer.Send(ctx, &ProducerMessage{ + _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), Key: "pulsar", Properties: map[string]string{ "key-1": "pulsar-1", }, - }); err != nil { - log.Fatal(err) - } + }) + require.NoError(t, err) } // receive 10 messages for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) - if err != nil { - log.Fatal(err) - } + require.NoError(t, err) expectMsg := fmt.Sprintf("hello-%d", i) expectProperties := map[string]string{ @@ -103,7 +101,7 @@ func TestConsumerConnectError(t *testing.T) { URL: "pulsar://invalid-hostname:6650", }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() @@ -124,7 +122,7 @@ func TestBatchMessageReceive(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := "persistent://public/default/receive-batch" @@ -141,7 +139,7 @@ func TestBatchMessageReceive(t *testing.T) { BatchingMaxMessages: uint(batchSize), DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, topicName, producer.Topic()) defer producer.Close() @@ -149,7 +147,7 @@ func TestBatchMessageReceive(t *testing.T) { Topic: topicName, SubscriptionName: subName, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() count := 0 @@ -159,12 +157,12 @@ func TestBatchMessageReceive(t *testing.T) { Payload: []byte(messageContent), } _, err := producer.Send(ctx, msg) - assert.Nil(t, err) + require.NoError(t, err) } for i := 0; i < numOfMessages; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) consumer.Ack(msg) count++ } @@ -190,7 +188,7 @@ func TestConsumerWithInvalidConf(t *testing.T) { // Expect error in creating consumer assert.Nil(t, consumer) - assert.NotNil(t, err) + require.NoError(t, err) fmt.Println(err.Error()) assert.Equal(t, err.(*Error).Result(), SubscriptionNotFound) @@ -201,7 +199,7 @@ func TestConsumerWithInvalidConf(t *testing.T) { // Expect error in creating consumer assert.Nil(t, consumer) - assert.NotNil(t, err) + require.NoError(t, err) assert.Equal(t, err.(*Error).Result(), TopicNotFound) } @@ -211,7 +209,7 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := fmt.Sprintf("testSeek-%d", time.Now().Unix()) @@ -221,7 +219,7 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send message @@ -229,12 +227,12 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { _, err = producer.Send(ctx, &ProducerMessage{ Payload: []byte("msg-1-content-1"), }) - assert.Nil(t, err) + require.NoError(t, err) _, err = producer.Send(ctx, &ProducerMessage{ Payload: []byte("msg-1-content-2"), }) - assert.Nil(t, err) + require.NoError(t, err) // create consumer consumer, err := client.Subscribe(ConsumerOptions{ @@ -242,11 +240,11 @@ func TestConsumerSubscriptionEarliestPosition(t *testing.T) { SubscriptionName: subName, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, "msg-1-content-1", string(msg.Payload())) } @@ -255,7 +253,7 @@ func TestConsumerKeyShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-topic-6" @@ -265,7 +263,7 @@ func TestConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe(ConsumerOptions{ @@ -273,7 +271,7 @@ func TestConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -281,7 +279,7 @@ func TestConsumerKeyShared(t *testing.T) { Topic: topic, DisableBatching: true, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -290,7 +288,7 @@ func TestConsumerKeyShared(t *testing.T) { Key: fmt.Sprintf("key-shared-%d", i%3), Payload: []byte(fmt.Sprintf("value-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } receivedConsumer1 := 0 @@ -324,7 +322,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/testGetPartitions" @@ -336,11 +334,11 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() topics, err := client.TopicPartitions(topic) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, topic+"-partition-0", topics[0]) assert.Equal(t, topic+"-partition-1", topics[1]) assert.Equal(t, topic+"-partition-2", topics[2]) @@ -351,7 +349,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { Type: Exclusive, ReceiverQueueSize: 10, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() ctx := context.Background() @@ -359,14 +357,14 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } msgs := make([]string, 0) for i := 0; i < 10; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) msgs = append(msgs, string(msg.Payload())) fmt.Printf("Received message msgId: %#v -- content: '%s'\n", @@ -382,7 +380,7 @@ func TestConsumerReceiveTimeout(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "test-topic-with-no-messages" @@ -395,7 +393,7 @@ func TestConsumerReceiveTimeout(t *testing.T) { SubscriptionName: "my-sub1", Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() msg, err := consumer.Receive(ctx) @@ -403,12 +401,75 @@ func TestConsumerReceiveTimeout(t *testing.T) { assert.NotNil(t, err) } +func TestConsumerClosed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + require.NoError(t, err) + defer client.Close() + + topic := "test-topic-for-consumer-closed" + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-consumer-closed", + Type: Shared, + }) + require.NoError(t, err) + defer consumer.Close() + + select { + case <-consumer.Closed(): + + default: + } + + consumer.Close() + + select { + case <-consumer.Closed(): + default: + require.Fail(t, "consumer was not expected to be opened") + } +} + +func TestConsumerClosingOnReconnectionLimit(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + require.NoError(t, err) + defer client.Close() + + topic := "test-topic-reconnection-testing" + maxReconnectToBroker := uint(0) + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-consumer-reconnections", + Type: Shared, + MaxReconnectToBroker: &maxReconnectToBroker, + }) + require.NoError(t, err) + defer consumer.Close() + + testURL := adminURL + "/" + "admin/v2/persistent/public/default/test-topic-reconnection-testing/unload" + makeHTTPCall(t, http.MethodPut, testURL, "64") + + select { + case <-consumer.Closed(): + case <-time.After(5 * time.Second): + require.Fail(t, "consumer must be closed") + } +} + func TestConsumerShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/testMultiPartitionConsumerShared" @@ -422,7 +483,7 @@ func TestConsumerShared(t *testing.T) { SubscriptionName: sub, Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe(ConsumerOptions{ @@ -430,7 +491,7 @@ func TestConsumerShared(t *testing.T) { SubscriptionName: sub, Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -438,7 +499,7 @@ func TestConsumerShared(t *testing.T) { Topic: topic, DisableBatching: true, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages with unique payloads @@ -484,7 +545,7 @@ func TestConsumerEventTime(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := "test-event-time" @@ -493,14 +554,14 @@ func TestConsumerEventTime(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() et := timeFromUnixTimestampMillis(uint64(5)) @@ -508,10 +569,10 @@ func TestConsumerEventTime(t *testing.T) { Payload: []byte("test"), EventTime: et, }) - assert.Nil(t, err) + require.NoError(t, err) msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, et, msg.EventTime()) assert.Equal(t, "test", string(msg.Payload())) } @@ -521,7 +582,7 @@ func TestConsumerFlow(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := "test-received-since-flow" @@ -530,7 +591,7 @@ func TestConsumerFlow(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -538,7 +599,7 @@ func TestConsumerFlow(t *testing.T) { SubscriptionName: "sub-1", ReceiverQueueSize: 4, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() for msgNum := 0; msgNum < 100; msgNum++ { @@ -551,7 +612,7 @@ func TestConsumerFlow(t *testing.T) { for msgNum := 0; msgNum < 100; msgNum++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", msgNum), string(msg.Payload())) } } @@ -561,7 +622,7 @@ func TestConsumerAck(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -570,7 +631,7 @@ func TestConsumerAck(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -578,7 +639,7 @@ func TestConsumerAck(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) const N = 100 @@ -592,7 +653,7 @@ func TestConsumerAck(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) if i < N/2 { @@ -609,13 +670,13 @@ func TestConsumerAck(t *testing.T) { SubscriptionName: "sub-1", Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // We should only receive the 2nd half of messages for i := N / 2; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) @@ -627,7 +688,7 @@ func TestConsumerNack(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -636,7 +697,7 @@ func TestConsumerNack(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -645,7 +706,7 @@ func TestConsumerNack(t *testing.T) { Type: Shared, NackRedeliveryDelay: 1 * time.Second, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() const N = 100 @@ -660,7 +721,7 @@ func TestConsumerNack(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) if i%2 == 0 { @@ -677,7 +738,7 @@ func TestConsumerNack(t *testing.T) { // We should only receive the odd messages for i := 1; i < N; i += 2 { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) @@ -689,7 +750,7 @@ func TestConsumerCompression(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -699,14 +760,14 @@ func TestConsumerCompression(t *testing.T) { Topic: topicName, CompressionType: LZ4, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() const N = 100 @@ -721,7 +782,7 @@ func TestConsumerCompression(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) } @@ -732,7 +793,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -743,14 +804,14 @@ func TestConsumerCompressionWithBatches(t *testing.T) { CompressionType: ZLib, BatchingMaxPublishDelay: 1 * time.Minute, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() const N = 100 @@ -765,7 +826,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) consumer.Ack(msg) } @@ -775,7 +836,7 @@ func TestConsumerSeek(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -785,14 +846,14 @@ func TestConsumerSeek(t *testing.T) { Topic: topicName, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 @@ -802,7 +863,7 @@ func TestConsumerSeek(t *testing.T) { id, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) if i == N-50 { seekID = id @@ -812,16 +873,16 @@ func TestConsumerSeek(t *testing.T) { // Don't consume all messages so some stay in queues for i := 0; i < N-20; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) consumer.Ack(msg) } err = consumer.Seek(seekID) - assert.Nil(t, err) + require.NoError(t, err) msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload())) } @@ -829,7 +890,7 @@ func TestConsumerSeekByTime(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topicName := newTopicName() @@ -839,44 +900,44 @@ func TestConsumerSeekByTime(t *testing.T) { Topic: topicName, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "my-sub", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 const N = 1100 resetTimeStr := "100s" retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr) - assert.Nil(t, err) + require.NoError(t, err) for i := 0; i < N; i++ { _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } // Don't consume all messages so some stay in queues for i := 0; i < N-20; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) consumer.Ack(msg) } currentTimestamp := time.Now() err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond)) - assert.Nil(t, err) + require.NoError(t, err) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload())) consumer.Ack(msg) } @@ -956,7 +1017,7 @@ func TestDLQ(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() dlqTopic := newTopicName() @@ -965,7 +1026,7 @@ func TestDLQ(t *testing.T) { Topic: dlqTopic, SubscriptionName: "dlq", }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() topic := newTopicName() @@ -982,14 +1043,14 @@ func TestDLQ(t *testing.T) { DeadLetterTopic: dlqTopic, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages @@ -1053,7 +1114,7 @@ func TestDLQMultiTopics(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() dlqTopic := newTopicName() @@ -1062,7 +1123,7 @@ func TestDLQMultiTopics(t *testing.T) { Topic: dlqTopic, SubscriptionName: "dlq", }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() topicPrefix := newTopicName() @@ -1085,7 +1146,7 @@ func TestDLQMultiTopics(t *testing.T) { DeadLetterTopic: dlqTopic, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create one producer for each topic @@ -1094,7 +1155,7 @@ func TestDLQMultiTopics(t *testing.T) { producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) producers[i] = producer } @@ -1162,17 +1223,17 @@ func TestRLQ(t *testing.T) { ctx := context.Background() client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() // 1. Pre-produce N messages producer, err := client.CreateProducer(ProducerOptions{Topic: topic}) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() for i := 0; i < N; i++ { _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))}) - assert.Nil(t, err) + require.NoError(t, err) } // 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times @@ -1187,13 +1248,13 @@ func TestRLQ(t *testing.T) { RetryEnable: true, NackRedeliveryDelay: 1 * time.Second, }) - assert.Nil(t, err) + require.NoError(t, err) defer rlqConsumer.Close() rlqReceived := 0 for rlqReceived < N*(maxRedeliveries+1) { msg, err := rlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) rlqConsumer.ReconsumeLater(msg, 1*time.Second) rlqReceived++ } @@ -1212,13 +1273,13 @@ func TestRLQ(t *testing.T) { SubscriptionName: subName, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() dlqReceived := 0 for dlqReceived < N { msg, err := dlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) dlqConsumer.Ack(msg) dlqReceived++ } @@ -1238,7 +1299,7 @@ func TestRLQ(t *testing.T) { Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer checkConsumer.Close() checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -1260,7 +1321,7 @@ func TestRLQMultiTopics(t *testing.T) { ctx := context.Background() client, err := NewClient(ClientOptions{URL: lookupURL}) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() // subscribe multi topics with Retry Topics @@ -1273,7 +1334,7 @@ func TestRLQMultiTopics(t *testing.T) { RetryEnable: true, NackRedeliveryDelay: 1 * time.Second, }) - assert.Nil(t, err) + require.NoError(t, err) defer rlqConsumer.Close() // subscribe DLQ Topic @@ -1282,31 +1343,31 @@ func TestRLQMultiTopics(t *testing.T) { SubscriptionName: subName, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer dlqConsumer.Close() // create multi producers producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01}) - assert.Nil(t, err) + require.NoError(t, err) defer producer01.Close() producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02}) - assert.Nil(t, err) + require.NoError(t, err) defer producer02.Close() // 1. Pre-produce N messages for every topic for i := 0; i < N; i++ { _, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))}) - assert.Nil(t, err) + require.NoError(t, err) _, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))}) - assert.Nil(t, err) + require.NoError(t, err) } // 2. Create consumer on the Retry Topics to reconsume 2*N messages (maxRedeliveries+1) times rlqReceived := 0 for rlqReceived < 2*N*(maxRedeliveries+1) { msg, err := rlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) rlqConsumer.ReconsumeLater(msg, 1*time.Second) rlqReceived++ } @@ -1323,7 +1384,7 @@ func TestRLQMultiTopics(t *testing.T) { dlqReceived := 0 for dlqReceived < 2*N { msg, err := dlqConsumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) dlqConsumer.Ack(msg) dlqReceived++ } @@ -1343,7 +1404,7 @@ func TestRLQMultiTopics(t *testing.T) { Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.Nil(t, err) + require.NoError(t, err) defer checkConsumer.Close() timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -1358,7 +1419,7 @@ func TestGetDeliveryCount(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -1371,14 +1432,14 @@ func TestGetDeliveryCount(t *testing.T) { NackRedeliveryDelay: 1 * time.Second, Type: Shared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages @@ -1407,14 +1468,14 @@ func TestGetDeliveryCount(t *testing.T) { var msg Message for i := 0; i < 5; i++ { msg, err = consumer.Receive(context.Background()) - assert.Nil(t, err) + require.NoError(t, err) consumer.Nack(msg) } assert.Equal(t, uint32(i+1), msg.RedeliveryCount()) } msg, err := consumer.Receive(context.Background()) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, uint32(3), msg.RedeliveryCount()) } @@ -1422,7 +1483,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -1440,7 +1501,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { return i }, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() consumer, err := client.Subscribe(ConsumerOptions{ @@ -1448,7 +1509,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) { SubscriptionName: "my-sub", AutoDiscoveryPeriod: 100 * time.Millisecond, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // Increase number of partitions to 10 @@ -1464,14 +1525,14 @@ func TestConsumerAddTopicPartitions(t *testing.T) { Key: fmt.Sprintf("%d", i), Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } msgs := make([]string, 0) for i := 0; i < 10; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) msgs = append(msgs, string(msg.Payload())) fmt.Printf("Received message msgId: %#v -- content: '%s'\n", @@ -1488,7 +1549,7 @@ func TestConsumerNegativeReceiverQueueSize(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -1503,14 +1564,14 @@ func TestConsumerNegativeReceiverQueueSize(t *testing.T) { } }() - assert.Nil(t, err) + require.NoError(t, err) } func TestProducerName(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -1521,7 +1582,7 @@ func TestProducerName(t *testing.T) { Topic: topic, Name: producerName, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // create consumer @@ -1530,7 +1591,7 @@ func TestProducerName(t *testing.T) { SubscriptionName: "my-sub", }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // publish 10 messages to topic @@ -1539,12 +1600,12 @@ func TestProducerName(t *testing.T) { _, err := producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) - assert.Nil(t, err) + require.NoError(t, err) } for i := 0; i < 10; i++ { msg, err := consumer.Receive(ctx) - assert.Nil(t, err) + require.NoError(t, err) assert.Equal(t, msg.ProducerName(), producerName) consumer.Ack(msg) @@ -1599,7 +1660,7 @@ func TestConsumerWithInterceptors(t *testing.T) { URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := newTopicName() @@ -1619,7 +1680,7 @@ func TestConsumerWithInterceptors(t *testing.T) { metric, }, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer.Close() // create producer @@ -1627,7 +1688,7 @@ func TestConsumerWithInterceptors(t *testing.T) { Topic: topic, DisableBatching: false, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() // send 10 messages @@ -1726,7 +1787,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-key-based-batch-with-key-shared" @@ -1736,7 +1797,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe(ConsumerOptions{ @@ -1744,7 +1805,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -1754,7 +1815,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { BatcherBuilderType: KeyBasedBatchBuilder, BatchingMaxMessages: 10, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -1765,7 +1826,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Key: k, Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { - assert.Nil(t, err) + require.NoError(t, err) }, ) } @@ -1828,7 +1889,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-ordering-of-key-based-batch-with-key-shared" @@ -1838,7 +1899,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { SubscriptionName: "sub-1", Type: KeyShared, }) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() // create producer @@ -1849,7 +1910,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { BatchingMaxMessages: 30, BatchingMaxPublishDelay: time.Second * 5, }) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -1860,7 +1921,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Key: k, Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { - assert.Nil(t, err) + require.NoError(t, err) }, ) } @@ -1894,7 +1955,7 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { OrderingKey: k, Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { - assert.Nil(t, err) + require.NoError(t, err) }, ) } @@ -1927,7 +1988,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { URL: lookupURL, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer client.Close() topic := "persistent://public/default/test-key-shared-with-ordering-key" @@ -1939,7 +2000,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { Type: KeyShared, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer consumer1.Close() consumer2, err := client.Subscribe( @@ -1949,7 +2010,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { Type: KeyShared, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer consumer2.Close() // create producer @@ -1959,7 +2020,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { DisableBatching: true, }, ) - assert.Nil(t, err) + require.NoError(t, err) defer producer.Close() ctx := context.Background() @@ -1972,7 +2033,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { Payload: []byte(fmt.Sprintf("value-%d", i)), }, ) - assert.Nil(t, err) + require.NoError(t, err) } receivedConsumer1 := 0 From a68ea3a1e098a56a2a3fb68026da225648405ddc Mon Sep 17 00:00:00 2001 From: Pavel Agaletskiy Date: Tue, 22 Jun 2021 09:56:18 +0300 Subject: [PATCH 2/4] Added log about reaching maximum amount of reconnection attempts --- pulsar/consumer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 434ba344d..9c67271e3 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -894,6 +894,7 @@ func (pc *partitionConsumer) reconnectToBroker() bool { } } + pc.log.Warn("Reached maximum number of reconnection attempts") return false } From ce2c8bbfee4faf34814fd04941d99f76ef5b3da3 Mon Sep 17 00:00:00 2001 From: Pavel Agaletskiy Date: Tue, 24 Aug 2021 11:13:43 +0300 Subject: [PATCH 3/4] Fixed possible panic when consumer is closed during initialization period --- pulsar/consumer_impl.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 6c4b65bd4..a1954bf3f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -54,6 +54,8 @@ type consumer struct { closeOnce sync.Once closeCh chan struct{} errorCh chan error + // close will be assigned only after full initialization cycle will be ready + close func() stopDiscovery func() log log.Logger @@ -211,6 +213,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, duration = defaultAutoDiscoveryDuration } consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration) + consumer.close = consumer.closeInternal return consumer, nil } @@ -502,6 +505,12 @@ func (c *consumer) NackID(msgID MessageID) { } func (c *consumer) Close() { + if c.close != nil { + c.close() + } +} + +func (c *consumer) closeInternal() { c.closeOnce.Do(func() { c.stopDiscovery() From 7c604c6ff9f545f38013f8f6a4d7a404a2df7844 Mon Sep 17 00:00:00 2001 From: Pavel Agaletskiy Date: Thu, 26 Aug 2021 10:35:20 +0300 Subject: [PATCH 4/4] Fixed codestyle and mock --- pulsar/consumer_impl.go | 3 +-- pulsar/internal/pulsartracing/consumer_interceptor_test.go | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index a1954bf3f..12048224f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -54,8 +54,7 @@ type consumer struct { closeOnce sync.Once closeCh chan struct{} errorCh chan error - // close will be assigned only after full initialization cycle will be ready - close func() + close func() // close will be assigned only after full initialization cycle will be ready stopDiscovery func() log log.Logger diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index b15a926be..e55d844e1 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -76,6 +76,10 @@ func (c *mockConsumer) NackID(msgID pulsar.MessageID) {} func (c *mockConsumer) Close() {} +func (c *mockConsumer) Closed() <-chan struct{} { + return make(chan struct{}) +} + func (c *mockConsumer) Seek(msgID pulsar.MessageID) error { return nil }