From 05d5eca821b84304b14bba30fac50b4d91a88a96 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 20 Feb 2024 18:37:30 +0800 Subject: [PATCH 1/9] Support partitioned topic reader --- pulsar/consumer.go | 7 +++ pulsar/consumer_impl.go | 24 +++++++- pulsar/consumer_partition.go | 36 ++++++++++++ pulsar/reader.go | 1 + pulsar/reader_impl.go | 106 +++++++++++++---------------------- 5 files changed, 107 insertions(+), 67 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 667bff66c..135dbb2b4 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -246,6 +246,13 @@ type ConsumerOptions struct { // SubscriptionMode specifies the subscription mode to be used when subscribing to a topic. // Default is `Durable` SubscriptionMode SubscriptionMode + + // StartMessageIDInclusive, if true, the reader will start at the `StartMessageID`, included. + // Default is `false` and the reader will start from the "next" message + StartMessageIDInclusive bool + + // startMessageID specifies the message id to start from. Currently, it's only used for the reader internally. + startMessageID *trackingMessageID } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 75d839b41..74f98cc65 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -23,6 +23,7 @@ import ( "math/rand" "strconv" "sync" + "sync/atomic" "time" "github.com/apache/pulsar-client-go/pulsar/crypto" @@ -384,7 +385,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { metadata: metadata, subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: nil, + startMessageID: c.options.startMessageID, + startMessageIDInclusive: c.options.StartMessageIDInclusive, subscriptionMode: c.options.SubscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, @@ -707,6 +709,26 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error { return nil } +func (c *consumer) hasNext() bool { + var wg sync.WaitGroup + + wg.Add(len(c.consumers)) + + var hasNext atomic.Bool + for _, pc := range c.consumers { + pc := pc + go func() { + defer wg.Done() + if pc.hasNext() { + hasNext.Store(true) + } + }() + } + + wg.Wait() + return hasNext.Load() +} + var r = &random{ R: rand.New(rand.NewSource(time.Now().UnixNano())), } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fd6441c1c..95b5bc094 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -174,6 +174,8 @@ type partitionConsumer struct { chunkedMsgCtxMap *chunkedMsgCtxMap unAckChunksTracker *unAckChunksTracker ackGroupingTracker ackGroupingTracker + + lastMessageInBroker *trackingMessageID } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -1970,6 +1972,40 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, pc.availablePermits.inc() } +func (pc *partitionConsumer) hasNext() bool { + if pc.lastMessageInBroker != nil && pc.hasMoreMessages() { + return true + } + + for { + lastMsgID, err := pc.getLastMessageID() + if err != nil { + pc.log.WithError(err).Error("Failed to get last message id from broker") + continue + } else { + pc.lastMessageInBroker = lastMsgID + break + } + } + + return pc.hasMoreMessages() +} + +func (pc *partitionConsumer) hasMoreMessages() bool { + if pc.lastDequeuedMsg != nil { + return pc.lastMessageInBroker.isEntryIDValid() && pc.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID) + } + + if pc.options.startMessageIDInclusive { + return pc.lastMessageInBroker.isEntryIDValid() && + pc.lastMessageInBroker.greaterEqual(pc.startMessageID.get().messageID) + } + + // Non-inclusive + return pc.lastMessageInBroker.isEntryIDValid() && + pc.lastMessageInBroker.greater(pc.startMessageID.get().messageID) +} + // _setConn sets the internal connection field of this partition consumer atomically. // Note: should only be called by this partition consumer when a new connection is available. func (pc *partitionConsumer) _setConn(conn internal.Connection) { diff --git a/pulsar/reader.go b/pulsar/reader.go index 5e1a73b98..4fb520fca 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -136,5 +136,6 @@ type Reader interface { SeekByTime(time time.Time) error // GetLastMessageID get the last message id available for consume. + // It only works for single topic reader GetLastMessageID() (MessageID, error) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 7b260b88d..92b4856ec 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -34,12 +34,12 @@ const ( type reader struct { sync.Mutex - client *client - pc *partitionConsumer - messageCh chan ConsumerMessage - lastMessageInBroker *trackingMessageID - log log.Logger - metrics *internal.LeveledMetrics + client *client + pc *partitionConsumer + messageCh chan ConsumerMessage + log log.Logger + metrics *internal.LeveledMetrics + c *consumer } func newReader(client *client, options ReaderOptions) (Reader, error) { @@ -98,25 +98,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { options.ExpireTimeOfIncompleteChunk = time.Minute } - consumerOptions := &partitionConsumerOpts{ - topic: options.Topic, - consumerName: options.Name, - subscription: subscriptionName, - subscriptionType: Exclusive, - receiverQueueSize: receiverQueueSize, + co := &ConsumerOptions{ + Topic: options.Topic, + Name: options.Name, + SubscriptionName: subscriptionName, + Type: Exclusive, + ReceiverQueueSize: receiverQueueSize, + SubscriptionMode: NonDurable, + ReadCompacted: options.ReadCompacted, + Properties: options.Properties, + NackRedeliveryDelay: defaultNackRedeliveryDelay, + ReplicateSubscriptionState: false, + Decryption: options.Decryption, + Schema: options.Schema, + BackoffPolicy: options.BackoffPolicy, + MaxPendingChunkedMessage: options.MaxPendingChunkedMessage, + ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, + AutoAckIncompleteChunk: options.AutoAckIncompleteChunk, startMessageID: startMessageID, - startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: NonDurable, - readCompacted: options.ReadCompacted, - metadata: options.Properties, - nackRedeliveryDelay: defaultNackRedeliveryDelay, - replicateSubscriptionState: false, - decryption: options.Decryption, - schema: options.Schema, - backoffPolicy: options.BackoffPolicy, - maxPendingChunkedMessage: options.MaxPendingChunkedMessage, - expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, - autoAckIncompleteChunk: options.AutoAckIncompleteChunk, + StartMessageIDInclusive: options.StartMessageIDInclusive, } reader := &reader{ @@ -132,25 +132,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } - pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics) + c, err := newInternalConsumer(client, *co, options.Topic, reader.messageCh, dlq, nil, false) if err != nil { close(reader.messageCh) return nil, err } + reader.c = c - reader.pc = pc reader.metrics.ReadersOpened.Inc() return reader, nil } func (r *reader) Topic() string { - return r.pc.topic + return r.c.topic } func (r *reader) Next(ctx context.Context) (Message, error) { for { select { - case cm, ok := <-r.messageCh: + case cm, ok := <-r.c.messageCh: if !ok { return nil, newError(ConsumerClosed, "consumer closed") } @@ -158,9 +158,10 @@ func (r *reader) Next(ctx context.Context) (Message, error) { // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway msgID := cm.Message.ID() - mid := toTrackingMessageID(msgID) - r.pc.lastDequeuedMsg = mid - r.pc.AckID(mid) + err := r.c.AckID(msgID) + if err != nil { + return nil, err + } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -169,41 +170,11 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } func (r *reader) HasNext() bool { - if r.lastMessageInBroker != nil && r.hasMoreMessages() { - return true - } - - for { - lastMsgID, err := r.pc.getLastMessageID() - if err != nil { - r.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { - r.lastMessageInBroker = lastMsgID - break - } - } - - return r.hasMoreMessages() -} - -func (r *reader) hasMoreMessages() bool { - if r.pc.lastDequeuedMsg != nil { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) - } - - if r.pc.options.startMessageIDInclusive { - return r.lastMessageInBroker.isEntryIDValid() && - r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.get().messageID) - } - - // Non-inclusive - return r.lastMessageInBroker.isEntryIDValid() && - r.lastMessageInBroker.greater(r.pc.startMessageID.get().messageID) + return r.c.hasNext() } func (r *reader) Close() { - r.pc.Close() + r.c.Close() r.client.handlers.Del(r) r.metrics.ReadersClosed.Inc() } @@ -235,16 +206,19 @@ func (r *reader) Seek(msgID MessageID) error { return nil } - return r.pc.Seek(mid) + return r.c.Seek(mid) } func (r *reader) SeekByTime(time time.Time) error { r.Lock() defer r.Unlock() - return r.pc.SeekByTime(time) + return r.c.SeekByTime(time) } func (r *reader) GetLastMessageID() (MessageID, error) { - return r.pc.getLastMessageID() + if len(r.c.consumers) > 1 { + return nil, fmt.Errorf("GetLastMessageID is not supported for multi-topics reader") + } + return r.c.consumers[0].getLastMessageID() } From 506610086e1dbcc51792537891e4b4557b26f826 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 21 Feb 2024 16:09:01 +0800 Subject: [PATCH 2/9] Use uAtomic for atomic.Bool --- pulsar/consumer_impl.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 74f98cc65..272d39e32 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -20,10 +20,10 @@ package pulsar import ( "context" "fmt" + uAtomic "go.uber.org/atomic" "math/rand" "strconv" "sync" - "sync/atomic" "time" "github.com/apache/pulsar-client-go/pulsar/crypto" @@ -714,7 +714,7 @@ func (c *consumer) hasNext() bool { wg.Add(len(c.consumers)) - var hasNext atomic.Bool + var hasNext uAtomic.Bool for _, pc := range c.consumers { pc := pc go func() { From 364a3790770f8f8d808a6a9f3ca0fb9ab5c8d00e Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 21 Feb 2024 17:09:20 +0800 Subject: [PATCH 3/9] Fix tests --- pulsar/consumer_impl.go | 2 +- pulsar/reader_impl.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 272d39e32..7e27bb110 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -20,7 +20,6 @@ package pulsar import ( "context" "fmt" - uAtomic "go.uber.org/atomic" "math/rand" "strconv" "sync" @@ -31,6 +30,7 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" pkgerrors "github.com/pkg/errors" + uAtomic "go.uber.org/atomic" ) const defaultNackRedeliveryDelay = 1 * time.Minute diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 92b4856ec..ebb6e9f67 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -131,8 +131,13 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { if err != nil { return nil, err } + // Provide dummy rlq router with not dlq policy + rlq, err := newRetryRouter(client, nil, false, client.log) + if err != nil { + return nil, err + } - c, err := newInternalConsumer(client, *co, options.Topic, reader.messageCh, dlq, nil, false) + c, err := newInternalConsumer(client, *co, options.Topic, reader.messageCh, dlq, rlq, false) if err != nil { close(reader.messageCh) return nil, err From 0c4b74b693231d0db4f5936bf6db6d4b6b5794d7 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 21 Feb 2024 18:09:01 +0800 Subject: [PATCH 4/9] Fix tests --- pulsar/reader_impl.go | 1 - pulsar/reader_test.go | 18 +++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index ebb6e9f67..cf4ebc6a5 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -35,7 +35,6 @@ const ( type reader struct { sync.Mutex client *client - pc *partitionConsumer messageCh chan ConsumerMessage log log.Logger metrics *internal.LeveledMetrics diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index c8228a7ca..72832ec9c 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -90,10 +90,10 @@ func TestReaderConfigChunk(t *testing.T) { defer r1.Close() // verify specified chunk options - pcOpts := r1.(*reader).pc.options - assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage) - assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk) - assert.True(t, pcOpts.autoAckIncompleteChunk) + pcOpts := r1.(*reader).c.options + assert.Equal(t, 50, pcOpts.MaxPendingChunkedMessage) + assert.Equal(t, 30*time.Second, pcOpts.ExpireTimeOfIncompleteChunk) + assert.True(t, pcOpts.AutoAckIncompleteChunk) r2, err := client.CreateReader(ReaderOptions{ Topic: "my-topic2", @@ -103,10 +103,10 @@ func TestReaderConfigChunk(t *testing.T) { defer r2.Close() // verify default chunk options - pcOpts = r2.(*reader).pc.options - assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage) - assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk) - assert.False(t, pcOpts.autoAckIncompleteChunk) + pcOpts = r2.(*reader).c.options + assert.Equal(t, 100, pcOpts.MaxPendingChunkedMessage) + assert.Equal(t, time.Minute, pcOpts.ExpireTimeOfIncompleteChunk) + assert.False(t, pcOpts.AutoAckIncompleteChunk) } func TestReader(t *testing.T) { @@ -880,7 +880,7 @@ func TestReaderWithBackoffPolicy(t *testing.T) { assert.NotNil(t, _reader) assert.Nil(t, err) - partitionConsumerImp := _reader.(*reader).pc + partitionConsumerImp := _reader.(*reader).c.consumers[0] // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker() From 0af64b20e1238194f5789ff2339a968d2cb5f7d8 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 21 Feb 2024 19:10:01 +0800 Subject: [PATCH 5/9] Fix tests --- pulsar/consumer_impl.go | 8 ++++++++ pulsar/reader_impl.go | 8 ++++++-- pulsar/reader_test.go | 1 - 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 7e27bb110..1bfb53a18 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -729,6 +729,14 @@ func (c *consumer) hasNext() bool { return hasNext.Load() } +func (c *consumer) setLastDequeuedMsg(msgID MessageID) error { + if err := c.checkMsgIDPartition(msgID); err != nil { + return err + } + c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID) + return nil +} + var r = &random{ R: rand.New(rand.NewSource(time.Now().UnixNano())), } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index cf4ebc6a5..d4979dcd7 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -154,7 +154,7 @@ func (r *reader) Topic() string { func (r *reader) Next(ctx context.Context) (Message, error) { for { select { - case cm, ok := <-r.c.messageCh: + case cm, ok := <-r.messageCh: if !ok { return nil, newError(ConsumerClosed, "consumer closed") } @@ -162,7 +162,11 @@ func (r *reader) Next(ctx context.Context) (Message, error) { // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway msgID := cm.Message.ID() - err := r.c.AckID(msgID) + err := r.c.setLastDequeuedMsg(msgID) + if err != nil { + return nil, err + } + err = r.c.AckID(msgID) if err != nil { return nil, err } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 72832ec9c..e87e6a007 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -422,7 +422,6 @@ func TestReaderHasNext(t *testing.T) { assert.NotNil(t, msgID) } - // create reader on 5th message (not included) reader, err := client.CreateReader(ReaderOptions{ Topic: topic, StartMessageID: EarliestMessageID(), From 651140741c5466720b4433222a3addea7da369c9 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 22 Feb 2024 10:18:24 +0800 Subject: [PATCH 6/9] Add test for partitioned reader --- pulsar/reader_test.go | 54 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index e87e6a007..365c3f193 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -20,6 +20,9 @@ package pulsar import ( "context" "fmt" + "github.com/apache/pulsar-client-go/pulsaradmin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "testing" "time" @@ -153,6 +156,57 @@ func TestReader(t *testing.T) { } } +func TestPartitionedReader(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + admin, err := pulsaradmin.NewClient(&config.Config{}) + assert.Nil(t, err) + ctx := context.Background() + + topicName, err := utils.GetTopicName(topic) + assert.Nil(t, err) + err = admin.Topics().Create(*topicName, 3) + assert.Nil(t, err) + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + } +} + func TestReaderConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://invalid-hostname:6650", From db6f645ec4d2187f3eb3f37134c4fd75c7e13613 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 22 Feb 2024 11:36:16 +0800 Subject: [PATCH 7/9] Fix lint issue --- pulsar/reader_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 365c3f193..66211f42e 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -20,13 +20,13 @@ package pulsar import ( "context" "fmt" - "github.com/apache/pulsar-client-go/pulsaradmin" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "testing" "time" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/apache/pulsar-client-go/pulsaradmin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/google/uuid" "github.com/stretchr/testify/assert" ) From 99d40a0f94ef734fda604e4e30dec7ca00345653 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 22 Feb 2024 12:15:42 +0800 Subject: [PATCH 8/9] Add `TestReaderGetLastMessageIDOnMultiTopics` --- pulsar/consumer.go | 4 ++-- pulsar/reader.go | 2 +- pulsar/reader_impl.go | 4 ++-- pulsar/reader_test.go | 45 ++++++++++++++++++++++++++++++++++--------- 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 135dbb2b4..fea94cf6a 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -247,8 +247,8 @@ type ConsumerOptions struct { // Default is `Durable` SubscriptionMode SubscriptionMode - // StartMessageIDInclusive, if true, the reader will start at the `StartMessageID`, included. - // Default is `false` and the reader will start from the "next" message + // StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included. + // Default is `false` and the consumer will start from the "next" message StartMessageIDInclusive bool // startMessageID specifies the message id to start from. Currently, it's only used for the reader internally. diff --git a/pulsar/reader.go b/pulsar/reader.go index 4fb520fca..1c5235d42 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -136,6 +136,6 @@ type Reader interface { SeekByTime(time time.Time) error // GetLastMessageID get the last message id available for consume. - // It only works for single topic reader + // It only works for single topic reader. It will return an error when the reader is the multi-topic reader. GetLastMessageID() (MessageID, error) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index d4979dcd7..bf91c67fa 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -97,7 +97,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { options.ExpireTimeOfIncompleteChunk = time.Minute } - co := &ConsumerOptions{ + consumerOptions := &ConsumerOptions{ Topic: options.Topic, Name: options.Name, SubscriptionName: subscriptionName, @@ -136,7 +136,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } - c, err := newInternalConsumer(client, *co, options.Topic, reader.messageCh, dlq, rlq, false) + c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false) if err != nil { close(reader.messageCh) return nil, err diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 66211f42e..ccf52875b 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -156,7 +156,7 @@ func TestReader(t *testing.T) { } } -func TestPartitionedReader(t *testing.T) { +func TestReaderOnPartitionedTopic(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -165,15 +165,8 @@ func TestPartitionedReader(t *testing.T) { defer client.Close() topic := newTopicName() - admin, err := pulsaradmin.NewClient(&config.Config{}) - assert.Nil(t, err) + assert.Nil(t, createPartitionedTopic(topic, 3)) ctx := context.Background() - - topicName, err := utils.GetTopicName(topic) - assert.Nil(t, err) - err = admin.Topics().Create(*topicName, 3) - assert.Nil(t, err) - // create reader reader, err := client.CreateReader(ReaderOptions{ Topic: topic, @@ -996,3 +989,37 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +func TestReaderGetLastMessageIDOnMultiTopics(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + assert.Nil(t, createPartitionedTopic(topic, 3)) + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + _, err = reader.GetLastMessageID() + assert.NotNil(t, err) +} + +func createPartitionedTopic(topic string, n int) error { + admin, err := pulsaradmin.NewClient(&config.Config{}) + if err != nil { + return err + } + + topicName, err := utils.GetTopicName(topic) + if err != nil { + return err + } + err = admin.Topics().Create(*topicName, n) + if err != nil { + return err + } + return nil +} From dbccdfe416e5b91dd8d59dc2e21218f83b737733 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 23 Feb 2024 14:45:51 +0800 Subject: [PATCH 9/9] quick return for `hasNext` --- pulsar/consumer_impl.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 1bfb53a18..0c31a1aaf 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -30,7 +30,6 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" pkgerrors "github.com/pkg/errors" - uAtomic "go.uber.org/atomic" ) const defaultNackRedeliveryDelay = 1 * time.Minute @@ -710,23 +709,39 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error { } func (c *consumer) hasNext() bool { - var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure all paths cancel the context to avoid context leak + var wg sync.WaitGroup wg.Add(len(c.consumers)) - var hasNext uAtomic.Bool + hasNext := make(chan bool) for _, pc := range c.consumers { pc := pc go func() { defer wg.Done() if pc.hasNext() { - hasNext.Store(true) + select { + case hasNext <- true: + case <-ctx.Done(): + } } }() } - wg.Wait() - return hasNext.Load() + go func() { + wg.Wait() + close(hasNext) // Close the channel after all goroutines have finished + }() + + // Wait for either a 'true' result or for all goroutines to finish + for hn := range hasNext { + if hn { + return true + } + } + + return false } func (c *consumer) setLastDequeuedMsg(msgID MessageID) error {