Skip to content

Commit

Permalink
Fix data race when accessing partition producer state (#215)
Browse files Browse the repository at this point in the history
Fix data race when accessing partition producer state
  • Loading branch information
cornelk committed May 18, 2020
1 parent e1c3822 commit bc647c6
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
)

type producerState int

const (
producerInit producerState = iota
// producer states
producerInit int32 = iota
producerReady
producerClosing
producerClosed
)

type partitionProducer struct {
state producerState
state int32
client *client
topic string
log *log.Entry
Expand Down Expand Up @@ -107,7 +106,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions

p.log = p.log.WithField("producer_name", p.producerName)
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.state = producerReady
atomic.StoreInt32(&p.state, producerReady)

go p.runEventsLoop()

Expand Down Expand Up @@ -181,7 +180,7 @@ func (p *partitionProducer) ConnectionClosed() {
func (p *partitionProducer) reconnectToBroker() {
backoff := internal.Backoff{}
for {
if p.state != producerReady {
if atomic.LoadInt32(&p.state) != producerReady {
// Producer is already closing
return
}
Expand Down Expand Up @@ -439,11 +438,10 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)

func (p *partitionProducer) internalClose(req *closeProducer) {
defer req.waitGroup.Done()
if p.state != producerReady {
if !atomic.CompareAndSwapInt32(&p.state, producerReady, producerClosing) {
return
}

p.state = producerClosing
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
Expand All @@ -458,7 +456,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.log.Info("Closed producer")
}

p.state = producerClosed
atomic.StoreInt32(&p.state, producerClosed)
p.cnx.UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
}
Expand All @@ -479,7 +477,7 @@ func (p *partitionProducer) Flush() error {
}

func (p *partitionProducer) Close() {
if p.state != producerReady {
if atomic.LoadInt32(&p.state) != producerReady {
// Producer is closing
return
}
Expand Down

0 comments on commit bc647c6

Please sign in to comment.