From fe05e224237ca02ab817f8217baafe3845d12e80 Mon Sep 17 00:00:00 2001 From: PGarule Date: Mon, 30 May 2022 16:19:23 +0530 Subject: [PATCH 1/5] seek and every partition of topic and check for error --- pulsar/consumer_impl.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e88753861..69d6509b9 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -589,11 +589,20 @@ func (c *consumer) Seek(msgID MessageID) error { func (c *consumer) SeekByTime(time time.Time) error { c.Lock() defer c.Unlock() - if len(c.consumers) > 1 { - return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions") + errChan := make(chan error, len(c.consumers)) + // run SeekByTime on every partition of topic + for _, cons := range c.consumers { + errChan <- cons.SeekByTime(time) } - return c.consumers[0].SeekByTime(time) + // check if there are any errors on running SeekByTime on every partition of topic + for err := range errChan { + if err != nil { + return newError(SeekFailed, err.Error()) + } + } + + return nil } var r = &random{ From 118151c08eba77f0bca7b9b08c445cb080b1bbe7 Mon Sep 17 00:00:00 2001 From: PGarule Date: Mon, 30 May 2022 17:52:11 +0530 Subject: [PATCH 2/5] use array to store errors instead of channles --- pulsar/consumer_impl.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 69d6509b9..c1da73927 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -589,14 +589,14 @@ func (c *consumer) Seek(msgID MessageID) error { func (c *consumer) SeekByTime(time time.Time) error { c.Lock() defer c.Unlock() - errChan := make(chan error, len(c.consumers)) + errorSeek := make([]error, len(c.consumers)) // run SeekByTime on every partition of topic - for _, cons := range c.consumers { - errChan <- cons.SeekByTime(time) + for i, cons := range c.consumers { + errorSeek[i] = cons.SeekByTime(time) } // check if there are any errors on running SeekByTime on every partition of topic - for err := range errChan { + for _, err := range errorSeek { if err != nil { return newError(SeekFailed, err.Error()) } From afadb2449f817b4267c827fa10b72815028aa5d3 Mon Sep 17 00:00:00 2001 From: PGarule Date: Mon, 30 May 2022 18:27:50 +0530 Subject: [PATCH 3/5] add test case to test seek by time on partitioned topic --- pulsar/consumer_test.go | 73 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 036688472..20d429072 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -3052,3 +3052,76 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) { assert.NotNil(t, msg) consumer.Ack(msg) } + +// TestConsumerSeekByTimeOnPartitionedTopic test seek by time on partitioned topic. +// It is based on existing test case [TestConsumerSeekByTime] but for partitioned topic. +func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + // Create topic with 5 partitions + topicAdminURL := "admin/v2/persistent/public/default/TestSeekByTimeOnPartitionedTopic/partitions" + err = httpPut(topicAdminURL, 5) + defer httpDelete(topicAdminURL) + assert.Nil(t, err) + + topicName := "persistent://public/default/TestSeekByTimeOnPartitionedTopic" + + partitions, err := client.TopicPartitions(topicName) + assert.Nil(t, err) + assert.Equal(t, len(partitions), 5) + for i := 0; i < 5; i++ { + assert.Equal(t, partitions[i], + fmt.Sprintf("%s-partition-%d", topicName, i)) + } + + ctx := context.Background() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "my-sub", + }) + assert.Nil(t, err) + defer consumer.Close() + + // Use value bigger than 1000 to full-fill queue channel with size 1000 and message channel with size 10 + const N = 1100 + resetTimeStr := "100s" + retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr) + assert.Nil(t, err) + + for i := 0; i < N; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.Nil(t, err) + } + + // Don't consume all messages so some stay in queues + for i := 0; i < N-20; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + consumer.Ack(msg) + } + + currentTimestamp := time.Now() + err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond)) + assert.Nil(t, err) + + // should be able to consume all messages once again + for i := 0; i < N; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + consumer.Ack(msg) + } +} From 35902b2f7c2d00050550887a19188292ac01a591 Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 1 Jun 2022 21:01:51 +0530 Subject: [PATCH 4/5] wrap errors --- pulsar/consumer_impl.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index c1da73927..7380b1e77 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -30,6 +30,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" + pkgerrors "github.com/pkg/errors" ) const defaultNackRedeliveryDelay = 1 * time.Minute @@ -595,14 +596,16 @@ func (c *consumer) SeekByTime(time time.Time) error { errorSeek[i] = cons.SeekByTime(time) } + var errs error // check if there are any errors on running SeekByTime on every partition of topic for _, err := range errorSeek { if err != nil { - return newError(SeekFailed, err.Error()) + msg := fmt.Sprintf("unable to SeekByTime for topic=%s partition=%s", c.topic, c.Subscription()) + errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg) } } - return nil + return errs } var r = &random{ From 48e3f05b97ac3ffb79b2ed1ca395b64e620972e1 Mon Sep 17 00:00:00 2001 From: PGarule Date: Wed, 1 Jun 2022 21:06:51 +0530 Subject: [PATCH 5/5] refactor method --- pulsar/consumer_impl.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 7380b1e77..2328ca882 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -590,21 +590,14 @@ func (c *consumer) Seek(msgID MessageID) error { func (c *consumer) SeekByTime(time time.Time) error { c.Lock() defer c.Unlock() - errorSeek := make([]error, len(c.consumers)) - // run SeekByTime on every partition of topic - for i, cons := range c.consumers { - errorSeek[i] = cons.SeekByTime(time) - } - var errs error - // check if there are any errors on running SeekByTime on every partition of topic - for _, err := range errorSeek { - if err != nil { - msg := fmt.Sprintf("unable to SeekByTime for topic=%s partition=%s", c.topic, c.Subscription()) + // run SeekByTime on every partition of topic + for _, cons := range c.consumers { + if err := cons.SeekByTime(time); err != nil { + msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription()) errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg) } } - return errs }