Skip to content

Commit

Permalink
Fix producer goroutine leak
Browse files Browse the repository at this point in the history
Per Producer there is a goroutine leaked. This goroutine is used for the
partition auto-discovery. This will ensure the goroutine has an exit
condition and is cleanup after the `Close()` of a producer.
  • Loading branch information
simonswine committed Jul 23, 2020
1 parent b9f8c5c commit 1c0e10c
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type producer struct {
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
tickerStop chan struct{}

log *log.Entry
}
Expand Down Expand Up @@ -119,11 +120,17 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
}

p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
p.tickerStop = make(chan struct{})

go func() {
for range p.ticker.C {
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
for {
select {
case <-p.ticker.C:
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
case <-p.tickerStop:
return
}
}
}()

Expand Down Expand Up @@ -282,6 +289,8 @@ func (p *producer) Close() {
defer p.RUnlock()
if p.ticker != nil {
p.ticker.Stop()
close(p.tickerStop)
p.ticker = nil
}

for _, pp := range p.producers {
Expand Down

0 comments on commit 1c0e10c

Please sign in to comment.