Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Reconnection logic and Backoff policy doesn't work correctly #1197

Merged
merged 18 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pulsar/internal/backoff.go → pulsar/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff
nodece marked this conversation as resolved.
Show resolved Hide resolved

import (
"math/rand"
Expand All @@ -26,10 +26,17 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// BackoffPolicy parameterize the following options in the reconnection logic to
// Policy parameterizes the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
type BackoffPolicy interface {
type Policy interface {
// Next returns the delay to wait before next retry
Next() time.Duration

// IsMaxBackoffReached reports whether the backoff delay has reached its maximum value
IsMaxBackoffReached() bool

// Reset the backoff to the initial state
Reset()
}

// DefaultBackoff computes the delay before retrying an action.
Expand All @@ -38,6 +45,13 @@ type DefaultBackoff struct {
backoff time.Duration
}

// NewDefaultBackoff returns a Policy backed by a freshly allocated,
// zero-valued DefaultBackoff.
func NewDefaultBackoff() Policy {
	return new(DefaultBackoff)
}
// NewDefaultBackoffWithInitialBackOff returns a Policy backed by a
// DefaultBackoff whose stored delay is seeded with half of backoff.
//
// NOTE(review): the halving presumably compensates for Next doubling the
// stored delay before returning it, so that the first call yields roughly
// backoff — confirm against the implementation of Next.
func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy {
	policy := new(DefaultBackoff)
	policy.backoff = backoff / 2
	return policy
}

const maxBackoff = 60 * time.Second

// Next returns the delay to wait before next retry
Expand All @@ -61,3 +75,7 @@ func (b *DefaultBackoff) Next() time.Duration {
// IsMaxBackoffReached reports whether the stored backoff delay is at or
// above the package-level maxBackoff ceiling.
func (b *DefaultBackoff) IsMaxBackoffReached() bool {
	reached := maxBackoff <= b.backoff
	return reached
}

// Reset sets the stored backoff delay back to zero.
//
// Note that zero is used unconditionally: a non-zero seed supplied at
// construction time is not restored by this method.
func (b *DefaultBackoff) Reset() {
	b.backoff = time.Duration(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff

import (
"testing"
Expand Down Expand Up @@ -58,4 +58,6 @@ func TestBackoff_NextMaxValue(t *testing.T) {
assert.Equal(t, true, backoff.IsMaxBackoffReached())
// max value is 60 seconds + 20% jitter = 72 seconds
assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
backoff.Reset()
assert.Equal(t, false, backoff.IsMaxBackoffReached())
}
15 changes: 7 additions & 8 deletions pulsar/blue_green_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (suite *BlueGreenMigrationTestSuite) TestTopicMigration() {
for _, scenario := range []topicUnloadTestCase{

{
testCaseName: "proxyConnection",
blueAdminURL: "http://localhost:8080",
testCaseName: "proxyConnection",
blueAdminURL: "http://localhost:8080",
blueClientUrl: "pulsar://localhost:6650",
greenAdminURL: "http://localhost:8081",
migrationBody: `
Expand Down Expand Up @@ -83,17 +83,17 @@ func testTopicMigrate(
migrationBody string) {
runtime.GOMAXPROCS(1)
const (
cluster = "cluster-a"
cluster = "cluster-a"
tenant = utils.PUBLICTENANT
namespace = utils.DEFAULTNAMESPACE

blueBroker1URL = "pulsar://broker-1:6650"
blueBroker2URL = "pulsar://broker-2:6650"
blueBroker1URL = "pulsar://broker-1:6650"
blueBroker2URL = "pulsar://broker-2:6650"
greenBroker1URL = "pulsar://green-broker-1:6650"
greenBroker2URL = "pulsar://green-broker-2:6650"

blueBroker1LookupURL = "broker-1:8080"
blueBroker2LookupURL = "broker-2:8080"
blueBroker1LookupURL = "broker-1:8080"
blueBroker2LookupURL = "broker-2:8080"
greenBroker1LookupURL = "green-broker-1:8080"
greenBroker2LookupURL = "green-broker-2:8080"
)
Expand Down Expand Up @@ -234,7 +234,6 @@ func testTopicMigrate(
req.NoError(err)
req.NotEmpty(bundleRange)


unloadURL := fmt.Sprintf(
"/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s",
tenant, namespace, bundleRange, dstTopicBrokerLookupURL)
Expand Down
6 changes: 3 additions & 3 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ConsumerMessage represents a pair of a Consumer and Message.
Expand Down Expand Up @@ -207,9 +207,9 @@ type ConsumerOptions struct {
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: unlimited)
MaxReconnectToBroker *uint

// BackoffPolicy parameterize the following options in the reconnection logic to
// BackOffPolicyFunc parameterizes the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackOffPolicyFunc func() backoff.Policy

// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo
Expand Down
7 changes: 4 additions & 3 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log)
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log)
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -453,7 +454,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu
readCompacted: options.ReadCompacted,
interceptors: options.Interceptors,
maxReconnectToBroker: options.MaxReconnectToBroker,
backoffPolicy: options.BackoffPolicy,
backOffPolicyFunc: options.BackOffPolicyFunc,
keySharedPolicy: options.KeySharedPolicy,
schema: options.Schema,
decryption: options.Decryption,
Expand Down
41 changes: 23 additions & 18 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"google.golang.org/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/crypto"
Expand Down Expand Up @@ -110,7 +112,7 @@ type partitionConsumerOpts struct {
disableForceTopicCreation bool
interceptors ConsumerInterceptors
maxReconnectToBroker *uint
backoffPolicy internal.BackoffPolicy
backOffPolicyFunc func() backoff.Policy
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
Expand Down Expand Up @@ -182,6 +184,7 @@ type partitionConsumer struct {
lastMessageInBroker *trackingMessageID

redirectedClusterURI string
backoffPolicyFunc func() backoff.Policy
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -318,6 +321,13 @@ func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
var boFunc func() backoff.Policy
if options.backOffPolicyFunc != nil {
boFunc = options.backOffPolicyFunc
} else {
boFunc = backoff.NewDefaultBackoff
}

pc := &partitionConsumer{
parentConsumer: parent,
client: client,
Expand All @@ -339,6 +349,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
backoffPolicyFunc: boFunc,
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
Expand Down Expand Up @@ -581,12 +592,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
remainTime := pc.client.operationTimeout
var backoff internal.BackoffPolicy
if pc.options.backoffPolicy != nil {
backoff = pc.options.backoffPolicy
} else {
backoff = &internal.DefaultBackoff{}
}
bo := pc.backoffPolicyFunc()
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
Expand All @@ -604,7 +610,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
nextDelay := backoff.Next()
nextDelay := bo.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
Expand Down Expand Up @@ -1684,18 +1690,17 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
var maxRetry int
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
)

if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*pc.options.maxReconnectToBroker)
}

var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
bo := pc.backoffPolicyFunc()

for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
Expand All @@ -1710,11 +1715,10 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the assigned broker just once
} else if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()
delayReconnectTime = bo.Next()
}
totalDelayReconnectTime += delayReconnectTime
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved

pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
Expand All @@ -1733,6 +1737,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
return
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
Expand All @@ -1747,7 +1752,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
if maxRetry == 0 || bo.IsMaxBackoffReached() {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}
Expand Down
10 changes: 6 additions & 4 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
Name: "regex-consumer",
}

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer",
nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -198,8 +199,9 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
Name: "regex-consumer",
}

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer",
nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
Expand Down
16 changes: 10 additions & 6 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
Expand Down Expand Up @@ -3874,12 +3876,14 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {

topicName := newTopicName()

backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
_consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
BackoffPolicy: backoff,
BackOffPolicyFunc: func() backoff.Policy {
return bo
},
})
assert.Nil(t, err)
defer _consumer.Close()
Expand All @@ -3888,22 +3892,22 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
// 1 s
startTime := time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))

// 2 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))

// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))

// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))
}

func TestAckWithMessageID(t *testing.T) {
Expand Down
Loading
Loading