From 6ce5421b07e23ff252ba5aaa754e1a0a5bab4cc5 Mon Sep 17 00:00:00 2001
From: Zike Yang
Date: Fri, 23 Feb 2024 15:49:08 +0800
Subject: [PATCH] [feat] Support partitioned topic reader (#1178)

Master Issue: #1177

### Motivation

Currently, there is an issue with the reader implementation. When a reader is
created, it does not fetch the topic metadata, so it can only read messages
from a single non-partitioned topic. If the topic is partitioned, the reader
is unaware of that and tries to create a non-partitioned topic with the same
name, which leads to this issue: https://github.com/apache/pulsar/issues/22032

### Modifications

- Support partitioned topic reader

(cherry picked from commit 3b9b1f8895d8924ec98db4612806b9871f1d135b)
---
 pulsar/consumer.go           |   7 +++
 pulsar/consumer_impl.go      |  47 ++++++++++++++-
 pulsar/consumer_partition.go |  36 +++++++++++
 pulsar/reader.go             |   1 +
 pulsar/reader_impl.go        | 112 +++++++++++++++--------------------
 pulsar/reader_test.go        | 100 +++++++++++++++++++++++++++----
 6 files changed, 227 insertions(+), 76 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 667bff66c..fea94cf6a 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -246,6 +246,13 @@ type ConsumerOptions struct {
 	// SubscriptionMode specifies the subscription mode to be used when subscribing to a topic.
 	// Default is `Durable`
 	SubscriptionMode SubscriptionMode
+
+	// StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included.
+	// Default is `false` and the consumer will start from the "next" message
+	StartMessageIDInclusive bool
+
+	// startMessageID specifies the message id to start from. Currently, it's only used for the reader internally.
+	startMessageID *trackingMessageID
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 75d839b41..0c31a1aaf 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -384,7 +384,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				metadata:                   metadata,
 				subProperties:              subProperties,
 				replicateSubscriptionState: c.options.ReplicateSubscriptionState,
-				startMessageID:             nil,
+				startMessageID:             c.options.startMessageID,
+				startMessageIDInclusive:    c.options.StartMessageIDInclusive,
 				subscriptionMode:           c.options.SubscriptionMode,
 				readCompacted:              c.options.ReadCompacted,
 				interceptors:               c.options.Interceptors,
@@ -707,6 +708,50 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error {
 	return nil
 }
 
+func (c *consumer) hasNext() bool {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel() // Make sure all paths cancel the context to avoid context leak
+
+	var wg sync.WaitGroup
+	wg.Add(len(c.consumers))
+
+	hasNext := make(chan bool)
+	for _, pc := range c.consumers {
+		pc := pc
+		go func() {
+			defer wg.Done()
+			if pc.hasNext() {
+				select {
+				case hasNext <- true:
+				case <-ctx.Done():
+				}
+			}
+		}()
+	}
+
+	go func() {
+		wg.Wait()
+		close(hasNext) // Close the channel after all goroutines have finished
+	}()
+
+	// Wait for either a 'true' result or for all goroutines to finish
+	for hn := range hasNext {
+		if hn {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (c *consumer) setLastDequeuedMsg(msgID MessageID) error {
+	if err := c.checkMsgIDPartition(msgID); err != nil {
+		return err
+	}
+	c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID)
+	return nil
+}
+
 var r = &random{
 	R: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fd6441c1c..95b5bc094 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -174,6 +174,8 @@ type partitionConsumer struct {
 	chunkedMsgCtxMap   *chunkedMsgCtxMap
 	unAckChunksTracker *unAckChunksTracker
 	ackGroupingTracker ackGroupingTracker
+
+	lastMessageInBroker *trackingMessageID
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -1970,6 +1972,40 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 	pc.availablePermits.inc()
 }
 
+func (pc *partitionConsumer) hasNext() bool {
+	if pc.lastMessageInBroker != nil && pc.hasMoreMessages() {
+		return true
+	}
+
+	for {
+		lastMsgID, err := pc.getLastMessageID()
+		if err != nil {
+			pc.log.WithError(err).Error("Failed to get last message id from broker")
+			continue
+		} else {
+			pc.lastMessageInBroker = lastMsgID
+			break
+		}
+	}
+
+	return pc.hasMoreMessages()
+}
+
+func (pc *partitionConsumer) hasMoreMessages() bool {
+	if pc.lastDequeuedMsg != nil {
+		return pc.lastMessageInBroker.isEntryIDValid() && pc.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID)
+	}
+
+	if pc.options.startMessageIDInclusive {
+		return pc.lastMessageInBroker.isEntryIDValid() &&
+			pc.lastMessageInBroker.greaterEqual(pc.startMessageID.get().messageID)
+	}
+
+	// Non-inclusive
+	return pc.lastMessageInBroker.isEntryIDValid() &&
+		pc.lastMessageInBroker.greater(pc.startMessageID.get().messageID)
+}
+
 // _setConn sets the internal connection field of this partition consumer atomically.
 // Note: should only be called by this partition consumer when a new connection is available.
 func (pc *partitionConsumer) _setConn(conn internal.Connection) {
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 5e1a73b98..1c5235d42 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -136,5 +136,6 @@ type Reader interface {
 	SeekByTime(time time.Time) error
 
 	// GetLastMessageID get the last message id available for consume.
+	// It only works for a single-topic reader; it returns an error when the reader is a multi-topic reader.
 	GetLastMessageID() (MessageID, error)
 }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 7b260b88d..bf91c67fa 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -34,12 +34,11 @@ const (
 
 type reader struct {
 	sync.Mutex
-	client              *client
-	pc                  *partitionConsumer
-	messageCh           chan ConsumerMessage
-	lastMessageInBroker *trackingMessageID
-	log                 log.Logger
-	metrics             *internal.LeveledMetrics
+	client    *client
+	messageCh chan ConsumerMessage
+	log       log.Logger
+	metrics   *internal.LeveledMetrics
+	c         *consumer
 }
 
 func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -98,25 +97,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 		options.ExpireTimeOfIncompleteChunk = time.Minute
 	}
 
-	consumerOptions := &partitionConsumerOpts{
-		topic:                       options.Topic,
-		consumerName:                options.Name,
-		subscription:                subscriptionName,
-		subscriptionType:            Exclusive,
-		receiverQueueSize:           receiverQueueSize,
+	consumerOptions := &ConsumerOptions{
+		Topic:                       options.Topic,
+		Name:                        options.Name,
+		SubscriptionName:            subscriptionName,
+		Type:                        Exclusive,
+		ReceiverQueueSize:           receiverQueueSize,
+		SubscriptionMode:            NonDurable,
+		ReadCompacted:               options.ReadCompacted,
+		Properties:                  options.Properties,
+		NackRedeliveryDelay:         defaultNackRedeliveryDelay,
+		ReplicateSubscriptionState:  false,
+		Decryption:                  options.Decryption,
+		Schema:                      options.Schema,
+		BackoffPolicy:               options.BackoffPolicy,
+		MaxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
+		ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
+		AutoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
 		startMessageID:              startMessageID,
-		startMessageIDInclusive:     options.StartMessageIDInclusive,
-		subscriptionMode:            NonDurable,
-		readCompacted:               options.ReadCompacted,
-		metadata:                    options.Properties,
-		nackRedeliveryDelay:         defaultNackRedeliveryDelay,
-		replicateSubscriptionState:  false,
-		decryption:                  options.Decryption,
-		schema:                      options.Schema,
-		backoffPolicy:               options.BackoffPolicy,
-		maxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
-		expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
-		autoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
+		StartMessageIDInclusive:     options.StartMessageIDInclusive,
 	}
 
 	reader := &reader{
@@ -131,20 +130,25 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 	if err != nil {
 		return nil, err
 	}
+	// Provide a dummy rlq router with no dlq policy
+	rlq, err := newRetryRouter(client, nil, false, client.log)
+	if err != nil {
+		return nil, err
+	}
 
-	pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics)
+	c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false)
 	if err != nil {
 		close(reader.messageCh)
 		return nil, err
 	}
+	reader.c = c
 
-	reader.pc = pc
 	reader.metrics.ReadersOpened.Inc()
 	return reader, nil
 }
 
 func (r *reader) Topic() string {
-	return r.pc.topic
+	return r.c.topic
 }
 
 func (r *reader) Next(ctx context.Context) (Message, error) {
@@ -158,9 +162,14 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
 		// Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects,
 		// it will specify the subscription position anyway
 		msgID := cm.Message.ID()
-		mid := toTrackingMessageID(msgID)
-		r.pc.lastDequeuedMsg = mid
-		r.pc.AckID(mid)
+		err := r.c.setLastDequeuedMsg(msgID)
+		if err != nil {
+			return nil, err
+		}
+		err = r.c.AckID(msgID)
+		if err != nil {
+			return nil, err
+		}
 		return cm.Message, nil
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -169,41 +178,11 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
 }
 
 func (r *reader) HasNext() bool {
-	if r.lastMessageInBroker != nil && r.hasMoreMessages() {
-		return true
-	}
-
-	for {
-		lastMsgID, err := r.pc.getLastMessageID()
-		if err != nil {
-			r.log.WithError(err).Error("Failed to get last message id from broker")
-			continue
-		} else {
-			r.lastMessageInBroker = lastMsgID
-			break
-		}
-	}
-
-	return r.hasMoreMessages()
-}
-
-func (r *reader) hasMoreMessages() bool {
-	if r.pc.lastDequeuedMsg != nil {
-		return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
-	}
-
-	if r.pc.options.startMessageIDInclusive {
-		return r.lastMessageInBroker.isEntryIDValid() &&
-			r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.get().messageID)
-	}
-
-	// Non-inclusive
-	return r.lastMessageInBroker.isEntryIDValid() &&
-		r.lastMessageInBroker.greater(r.pc.startMessageID.get().messageID)
+	return r.c.hasNext()
 }
 
 func (r *reader) Close() {
-	r.pc.Close()
+	r.c.Close()
 	r.client.handlers.Del(r)
 	r.metrics.ReadersClosed.Inc()
 }
@@ -235,16 +214,19 @@ func (r *reader) Seek(msgID MessageID) error {
 		return nil
 	}
 
-	return r.pc.Seek(mid)
+	return r.c.Seek(mid)
 }
 
 func (r *reader) SeekByTime(time time.Time) error {
 	r.Lock()
 	defer r.Unlock()
 
-	return r.pc.SeekByTime(time)
+	return r.c.SeekByTime(time)
 }
 
 func (r *reader) GetLastMessageID() (MessageID, error) {
-	return r.pc.getLastMessageID()
+	if len(r.c.consumers) > 1 {
+		return nil, fmt.Errorf("GetLastMessageID is not supported for multi-topics reader")
+	}
+	return r.c.consumers[0].getLastMessageID()
 }
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index c8228a7ca..ccf52875b 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -24,6 +24,9 @@ import (
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/crypto"
+	"github.com/apache/pulsar-client-go/pulsaradmin"
+	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 )
@@ -90,10 +93,10 @@ func TestReaderConfigChunk(t *testing.T) {
 	defer r1.Close()
 
 	// verify specified chunk options
-	pcOpts := r1.(*reader).pc.options
-	assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage)
-	assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk)
-	assert.True(t, pcOpts.autoAckIncompleteChunk)
+	pcOpts := r1.(*reader).c.options
+	assert.Equal(t, 50, pcOpts.MaxPendingChunkedMessage)
+	assert.Equal(t, 30*time.Second, pcOpts.ExpireTimeOfIncompleteChunk)
+	assert.True(t, pcOpts.AutoAckIncompleteChunk)
 
 	r2, err := client.CreateReader(ReaderOptions{
 		Topic:                    "my-topic2",
@@ -103,10 +106,10 @@ func TestReaderConfigChunk(t *testing.T) {
 	defer r2.Close()
 
 	// verify default chunk options
-	pcOpts = r2.(*reader).pc.options
-	assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage)
-	assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk)
-	assert.False(t, pcOpts.autoAckIncompleteChunk)
+	pcOpts = r2.(*reader).c.options
+	assert.Equal(t, 100, pcOpts.MaxPendingChunkedMessage)
+	assert.Equal(t, time.Minute, pcOpts.ExpireTimeOfIncompleteChunk)
+	assert.False(t, pcOpts.AutoAckIncompleteChunk)
 }
 
 func TestReader(t *testing.T) {
@@ -153,6 +156,50 @@ func TestReader(t *testing.T) {
 	}
 }
 
+func TestReaderOnPartitionedTopic(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	assert.Nil(t, createPartitionedTopic(topic, 3))
+	ctx := context.Background()
+	// create reader
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessageID(),
+	})
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		_, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.NoError(t, err)
+	}
+
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := reader.Next(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+	}
+}
+
 func TestReaderConnectError(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://invalid-hostname:6650",
@@ -422,7 +469,6 @@ func TestReaderHasNext(t *testing.T) {
 		assert.NotNil(t, msgID)
 	}
 
-	// create reader on 5th message (not included)
 	reader, err := client.CreateReader(ReaderOptions{
 		Topic:          topic,
 		StartMessageID: EarliestMessageID(),
@@ -880,7 +926,7 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
 	assert.NotNil(t, _reader)
 	assert.Nil(t, err)
 
-	partitionConsumerImp := _reader.(*reader).pc
+	partitionConsumerImp := _reader.(*reader).c.consumers[0]
 	// 1 s
 	startTime := time.Now()
 	partitionConsumerImp.reconnectToBroker()
@@ -943,3 +989,37 @@ func TestReaderGetLastMessageID(t *testing.T) {
 	assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID())
 	assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID())
 }
+
+func TestReaderGetLastMessageIDOnMultiTopics(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.Nil(t, err)
+	topic := newTopicName()
+	assert.Nil(t, createPartitionedTopic(topic, 3))
+
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessageID(),
+	})
+	assert.Nil(t, err)
+	_, err = reader.GetLastMessageID()
+	assert.NotNil(t, err)
+}
+
+func createPartitionedTopic(topic string, n int) error {
+	admin, err := pulsaradmin.NewClient(&config.Config{})
+	if err != nil {
+		return err
+	}
+
+	topicName, err := utils.GetTopicName(topic)
+	if err != nil {
+		return err
+	}
+	err = admin.Topics().Create(*topicName, n)
+	if err != nil {
+		return err
+	}
+	return nil
+}
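Below is a minimal usage sketch of what this patch enables: a single `Reader` draining a partitioned topic via `HasNext`/`Next`. The service URL and topic name are placeholders, and the partitioned topic is assumed to already exist (for example, created with `pulsar-admin topics create-partitioned-topic`); only public client API shown elsewhere in the patch is used.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder service URL; adjust to the target cluster.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Placeholder topic, assumed to be a pre-created partitioned topic.
	// With this patch the reader is backed by one internal consumer per
	// partition instead of a single non-partitioned consumer.
	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          "persistent://public/default/my-partitioned-topic",
		StartMessageID: pulsar.EarliestMessageID(),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	// HasNext/Next aggregate results across all partitions.
	for reader.HasNext() {
		msg, err := reader.Next(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("partition %d: %s\n", msg.ID().PartitionIdx(), string(msg.Payload()))
	}
}
```

Messages from different partitions interleave, so ordering is only guaranteed within a partition, and `GetLastMessageID` returns an error for such a reader, as exercised by `TestReaderGetLastMessageIDOnMultiTopics` above.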