Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 496] Support for correct reconnections limit in consumer #497

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ type Consumer interface {
// The list of MessageID instances of all the topics that the consumer subscribed
GetLastMessageIDs() ([]TopicMessageID, error)

// Closed returns a channel indicating that consumer is closed
Closed() <-chan struct{}

omnilight marked this conversation as resolved.
Show resolved Hide resolved
// Receive a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
Expand Down
12 changes: 12 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type consumer struct {
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
close func() // close will be assigned only after full initialization cycle will be ready
stopDiscovery func()

log log.Logger
Expand Down Expand Up @@ -288,6 +289,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
duration = defaultAutoDiscoveryDuration
}
consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
consumer.close = consumer.closeInternal

consumer.metrics.ConsumersOpened.Inc()
return consumer, nil
Expand Down Expand Up @@ -496,6 +498,10 @@ func (c *consumer) unsubscribe(force bool) error {
return nil
}

func (c *consumer) Closed() <-chan struct{} {
return c.closeCh
}

func (c *consumer) GetLastMessageIDs() ([]TopicMessageID, error) {
ids := make([]TopicMessageID, 0)
for _, pc := range c.consumers {
Expand Down Expand Up @@ -674,6 +680,12 @@ func (c *consumer) NackID(msgID MessageID) {
}

func (c *consumer) Close() {
if c.close != nil {
c.close()
}
}

func (c *consumer) closeInternal() {
c.closeOnce.Do(func() {
c.stopDiscovery()

Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (c *multiTopicConsumer) Unsubscribe() error {
return errs
}

func (c *multiTopicConsumer) Closed() <-chan struct{} {
return c.closeCh
}

func (c *multiTopicConsumer) UnsubscribeForce() error {
var errs error
for t, consumer := range c.consumers {
Expand Down
15 changes: 10 additions & 5 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,9 @@ func (pc *partitionConsumer) runEventsLoop() {
return
case connectionClosed := <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker(connectionClosed)
if !pc.reconnectToBroker(connectionClosed) {
pc.parentConsumer.Close()
}
}
}
}()
Expand Down Expand Up @@ -1679,7 +1681,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
close(pc.closeCh)
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) bool {
var maxRetry int

if pc.options.maxReconnectToBroker == nil {
Expand All @@ -1697,7 +1699,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
return
return false
}

var assignedBrokerURL string
Expand All @@ -1722,14 +1724,14 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
return
return false
}

err := pc.grabConn(assignedBrokerURL)
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
return
return true
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
Expand All @@ -1747,6 +1749,9 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}

pc.log.Warn("Reached maximum number of reconnection attempts")
return false
}

func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (c *regexConsumer) Unsubscribe() error {
return errs
}

func (c *regexConsumer) Closed() <-chan struct{} {
return c.closeCh
}

func (c *regexConsumer) UnsubscribeForce() error {
var errs error
c.consumersLock.Lock()
Expand Down
Loading