Fix negative WaitGroup counter issue (#712)
* Fix negative WaitGroup counter issue

Signed-off-by: xiaolongran <xiaolongran@tencent.com>

* use cas

Signed-off-by: xiaolongran <xiaolongran@tencent.com>

* Make sure the callback logic is atomic

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
wolfstudy committed Jan 20, 2022
1 parent a119bab commit 90305e8
Showing 2 changed files with 35 additions and 32 deletions.
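
What the change fixes: Send(), Flush(), and Close() used to block on a sync.WaitGroup whose Done() was called from the producer's internal callbacks. If such a callback ever fires more than once, the second Done() drives the counter below zero and the runtime panics with "sync: negative WaitGroup counter". The commit replaces each WaitGroup with a done channel that is closed exactly once; in Send() the close is additionally guarded by a CAS on a go.uber.org/atomic Bool. The sketch below is a minimal illustration of that pattern under the assumption that the callback can be invoked twice; deliverTwice is a hypothetical stand-in, not code from the repository.

package main

import (
    "fmt"

    uAtomic "go.uber.org/atomic"
)

// deliverTwice simulates a completion callback that is (incorrectly) invoked
// two times, which is exactly what drove the old WaitGroup counter negative.
func deliverTwice(cb func(err error)) {
    cb(nil)
    cb(nil)
}

func main() {
    var err error

    // Guard the completion logic so only the first invocation records the
    // result and closes the channel; a second close(doneCh) would panic.
    isDone := uAtomic.NewBool(false)
    doneCh := make(chan struct{})

    deliverTwice(func(e error) {
        if isDone.CAS(false, true) {
            err = e
            close(doneCh)
        }
    })

    <-doneCh // unblocks once the channel is closed
    fmt.Println("completed, err =", err)
}
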
pulsar/consumer_partition.go (6 changes: 3 additions & 3 deletions)
@@ -33,7 +33,7 @@ import (
     pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
     "github.com/apache/pulsar-client-go/pulsar/log"

-    "go.uber.org/atomic"
+    uAtomic "go.uber.org/atomic"
 )

 var (
@@ -110,10 +110,10 @@ type partitionConsumer struct {

     // this is needed for sending ConsumerMessage on the messageCh
     parentConsumer Consumer
-    state          atomic.Int32
+    state          uAtomic.Int32
     options        *partitionConsumerOpts

-    conn atomic.Value
+    conn uAtomic.Value

     topic string
     name  string
pulsar/producer_partition.go (61 changes: 32 additions & 29 deletions)
@@ -33,7 +33,7 @@ import (
     pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
     "github.com/apache/pulsar-client-go/pulsar/log"

-    ua "go.uber.org/atomic"
+    uAtomic "go.uber.org/atomic"
 )

 type producerState int32
@@ -60,12 +60,12 @@ var (
 var errTopicNotFount = "TopicNotFound"

 type partitionProducer struct {
-    state  ua.Int32
+    state  uAtomic.Int32
     client *client
     topic  string
     log    log.Logger

-    conn atomic.Value
+    conn uAtomic.Value

     options      *ProducerOptions
     producerName string
@@ -675,7 +675,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {

     pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
     if !ok {
-        fr.waitGroup.Done()
+        close(fr.doneCh)
         return
     }

@@ -688,35 +688,39 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
         // The last item in the queue has been completed while we were
         // looking at it. It's safe at this point to assume that every
         // message enqueued before Flush() was called are now persisted
-        fr.waitGroup.Done()
+        close(fr.doneCh)
         return
     }

     sendReq := &sendRequest{
         msg: nil,
         callback: func(id MessageID, message *ProducerMessage, e error) {
             fr.err = e
-            fr.waitGroup.Done()
+            close(fr.doneCh)
         },
     }

     pi.sendRequests = append(pi.sendRequests, sendReq)
 }

 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-    wg := sync.WaitGroup{}
-    wg.Add(1)
-
     var err error
     var msgID MessageID

+    // use atomic bool to avoid race
+    isDone := uAtomic.NewBool(false)
+    doneCh := make(chan struct{})
+
     p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
-        err = e
-        msgID = ID
-        wg.Done()
+        if isDone.CAS(false, true) {
+            err = e
+            msgID = ID
+            close(doneCh)
+        }
     }, true)

-    wg.Wait()
+    // wait for send request to finish
+    <-doneCh
     return msgID, err
 }
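
The guard in Send() matters when the completion callback can be invoked from more than one code path (for example a send receipt and a connection failure racing each other; the exact trigger is not spelled out in the commit). Only the caller that wins the CAS records the result and closes doneCh, so a duplicate invocation can no longer corrupt the completion state. A small self-contained sketch of that property, with hypothetical goroutines standing in for the competing callers:

package main

import (
    "fmt"
    "sync"

    uAtomic "go.uber.org/atomic"
)

func main() {
    isDone := uAtomic.NewBool(false)
    doneCh := make(chan struct{})
    winners := 0

    // Fire the same completion logic from several goroutines at once; the
    // CAS guarantees exactly one of them closes doneCh and counts itself.
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if isDone.CAS(false, true) {
                winners++ // only the CAS winner reaches this line
                close(doneCh)
            }
        }()
    }

    <-doneCh  // released by the single close
    wg.Wait() // this WaitGroup is safe: Add and Done are balanced
    fmt.Println("winners:", winners) // always prints 1
}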

@@ -828,7 +832,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
 }

 func (p *partitionProducer) internalClose(req *closeProducer) {
-    defer req.waitGroup.Done()
+    defer close(req.doneCh)
     if !p.casProducerState(producerReady, producerClosing) {
         return
     }
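
Note that internalClose closes the done channel in a defer, so the goroutine blocked in Close() is released on every return path, including the early return when the CAS on the producer state fails. A minimal sketch of that shape, using hypothetical names (closeRequest, handleClose, alreadyClosing) rather than the repository's types:

package main

import "fmt"

type closeRequest struct {
    doneCh chan struct{}
}

// handleClose mirrors the shape of internalClose: the deferred close runs on
// every return path, so a caller blocked on doneCh is always released.
func handleClose(req *closeRequest, alreadyClosing bool) {
    defer close(req.doneCh)
    if alreadyClosing {
        return // early return still unblocks the waiter
    }
    fmt.Println("tearing down producer resources")
}

func main() {
    req := &closeRequest{doneCh: make(chan struct{})}
    go handleClose(req, true)
    <-req.doneCh
    fmt.Println("Close() returned")
}
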
@@ -863,14 +867,15 @@ func (p *partitionProducer) LastSequenceID() int64 {
 }

 func (p *partitionProducer) Flush() error {
-    wg := sync.WaitGroup{}
-    wg.Add(1)
-
-    cp := &flushRequest{&wg, nil}
-    p.eventsChan <- cp
+    flushReq := &flushRequest{
+        doneCh: make(chan struct{}),
+        err:    nil,
+    }
+    p.eventsChan <- flushReq

-    wg.Wait()
-    return cp.err
+    // wait for the flush request to complete
+    <-flushReq.doneCh
+    return flushReq.err
 }

 func (p *partitionProducer) getProducerState() producerState {
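
Flush() now hands the event loop a flushRequest carrying a done channel and an error slot: the worker records the outcome, closes the channel, and the caller reads flushReq.err only after <-flushReq.doneCh returns, so the read is ordered after the write. A minimal sketch of that request/worker hand-off, with illustrative names (request, worker) rather than the repository's:

package main

import "fmt"

// request is shaped like the new flushRequest: a done channel plus a result
// field that the worker fills in before closing the channel.
type request struct {
    doneCh chan struct{}
    err    error
}

func worker(reqCh chan *request) {
    for req := range reqCh {
        req.err = nil     // record the outcome first...
        close(req.doneCh) // ...then signal completion exactly once
    }
}

func main() {
    reqCh := make(chan *request)
    go worker(reqCh)

    req := &request{doneCh: make(chan struct{})}
    reqCh <- req

    <-req.doneCh // the close happens before this receive returns,
    fmt.Println("flush finished, err =", req.err) // so reading err is race-free
}
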
@@ -893,13 +898,11 @@ func (p *partitionProducer) Close() {
         return
     }

-    wg := sync.WaitGroup{}
-    wg.Add(1)
-
-    cp := &closeProducer{&wg}
+    cp := &closeProducer{doneCh: make(chan struct{})}
     p.eventsChan <- cp

-    wg.Wait()
+    // wait for close producer request to complete
+    <-cp.doneCh
 }

 type sendRequest struct {
@@ -911,12 +914,12 @@ type sendRequest struct {
 }

 type closeProducer struct {
-    waitGroup *sync.WaitGroup
+    doneCh chan struct{}
 }

 type flushRequest struct {
-    waitGroup *sync.WaitGroup
-    err       error
+    doneCh chan struct{}
+    err    error
 }

 func (i *pendingItem) Complete() {
