Skip to content

Commit

Permalink
Issue #448 make state thread safe in consumer_partition and connection (
Browse files Browse the repository at this point in the history
#451)

* make state thread safe in consumer_partition and connection

* use uber atomic for all state management

* fix go lint error

* Apply suggestions from code review

Co-authored-by: Matteo Merli <mmerli@apache.org>
  • Loading branch information
zzzming and merlimat committed Jan 26, 2021
1 parent c24191b commit 58a327d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 45 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/yahoo/athenz v1.8.55
go.uber.org/atomic v1.7.0
)

replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
11 changes: 7 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -81,7 +79,6 @@ github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
Expand Down Expand Up @@ -161,11 +158,14 @@ github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand All @@ -180,6 +180,7 @@ golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -205,13 +206,14 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand All @@ -220,6 +222,7 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
50 changes: 33 additions & 17 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"

"go.uber.org/atomic"
)

var (
Expand All @@ -38,7 +40,8 @@ var (
type consumerState int

const (
consumerInit consumerState = iota
// consumer states
consumerInit = iota
consumerReady
consumerClosing
consumerClosed
Expand Down Expand Up @@ -86,7 +89,7 @@ type partitionConsumer struct {

// this is needed for sending ConsumerMessage on the messageCh
parentConsumer Consumer
state consumerState
state atomic.Int32
options *partitionConsumerOpts

conn internal.Connection
Expand Down Expand Up @@ -128,7 +131,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.TopicMetrics) (*partitionConsumer, error) {
pc := &partitionConsumer{
state: consumerInit,
parentConsumer: parent,
client: client,
options: options,
Expand All @@ -150,6 +152,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
}
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
"topic": options.topic,
Expand All @@ -165,7 +168,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
return nil, err
}
pc.log.Info("Created consumer")
pc.state = consumerReady
pc.setConsumerState(consumerReady)

if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
msgID, err := pc.requestGetLastMessageID()
Expand Down Expand Up @@ -203,12 +206,13 @@ func (pc *partitionConsumer) Unsubscribe() error {
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)

if pc.state == consumerClosed || pc.state == consumerClosing {
state := pc.getConsumerState()
if state == consumerClosed || state == consumerClosing {
pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
return
}

pc.state = consumerClosing
pc.setConsumerState(consumerClosing)
requestID := pc.client.rpcClient.NewRequestID()
cmdUnsubscribe := &pb.CommandUnsubscribe{
RequestId: proto.Uint64(requestID),
Expand All @@ -219,7 +223,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
unsub.err = err
// Set the state to ready for closing the consumer
pc.state = consumerReady
pc.setConsumerState(consumerReady)
// Should'nt remove the consumer handler
return
}
Expand All @@ -229,7 +233,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.nackTracker.Close()
}
pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
pc.state = consumerClosed
pc.setConsumerState(consumerClosed)
}

func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
Expand Down Expand Up @@ -309,8 +313,17 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
})
}

func (pc *partitionConsumer) getConsumerState() consumerState {
return consumerState(pc.state.Load())
}

func (pc *partitionConsumer) setConsumerState(state consumerState) {
pc.state.Store(int32(state))
}

func (pc *partitionConsumer) Close() {
if pc.state != consumerReady {

if pc.getConsumerState() != consumerReady {
return
}

Expand Down Expand Up @@ -339,7 +352,8 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
}

func (pc *partitionConsumer) requestSeek(msgID messageID) error {
if pc.state == consumerClosing || pc.state == consumerClosed {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.Error("Consumer was already closed")
return nil
}
Expand Down Expand Up @@ -382,7 +396,8 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
defer close(seek.doneCh)

if pc.state == consumerClosing || pc.state == consumerClosed {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.Error("Consumer was already closed")
return
}
Expand Down Expand Up @@ -773,23 +788,24 @@ func (pc *partitionConsumer) runEventsLoop() {

func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
if pc.state != consumerReady {
state := pc.getConsumerState()
if state != consumerReady {
// this might be redundant but to ensure nack tracker is closed
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}

if pc.state == consumerClosed || pc.state == consumerClosing {
if state == consumerClosed || state == consumerClosing {
pc.log.Error("The consumer is closing or has been closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}

pc.state = consumerClosing
pc.setConsumerState(consumerClosing)
pc.log.Infof("Closing consumer=%d", pc.consumerID)

requestID := pc.client.rpcClient.NewRequestID()
Expand All @@ -808,7 +824,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
provider.Close()
}

pc.state = consumerClosed
pc.setConsumerState(consumerClosed)
pc.conn.DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
Expand All @@ -829,7 +845,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
}

for maxRetry != 0 {
if pc.state != consumerReady {
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
return
}
Expand Down Expand Up @@ -947,7 +963,7 @@ func (pc *partitionConsumer) grabConn() error {
}

func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
if pc.state != consumerReady {
if pc.getConsumerState() != consumerReady {
return trackingMessageID{}
}
wg := &sync.WaitGroup{}
Expand Down
41 changes: 26 additions & 15 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"

ua "go.uber.org/atomic"
)

const (
Expand Down Expand Up @@ -85,9 +87,9 @@ type ConsumerHandler interface {
type connectionState int32

const (
connectionInit = 0
connectionReady = 1
connectionClosed = 2
connectionInit = iota
connectionReady
connectionClosed
)

func (s connectionState) String() string {
Expand Down Expand Up @@ -119,7 +121,7 @@ type incomingCmd struct {
type connection struct {
sync.Mutex
cond *sync.Cond
state int32
state ua.Int32
connectionTimeout time.Duration
closeOnce sync.Once

Expand Down Expand Up @@ -172,7 +174,6 @@ type connectionOptions struct {

func newConnection(opts connectionOptions) *connection {
cnx := &connection{
state: int32(connectionInit),
connectionTimeout: opts.connectionTimeout,
logicalAddr: opts.logicalAddr,
physicalAddr: opts.physicalAddr,
Expand All @@ -199,6 +200,7 @@ func newConnection(opts connectionOptions) *connection {
consumerHandlers: make(map[uint64]ConsumerHandler),
metrics: opts.metrics,
}
cnx.setState(connectionInit)
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
return cnx
Expand Down Expand Up @@ -316,9 +318,9 @@ func (c *connection) waitUntilReady() error {
c.Lock()
defer c.Unlock()

for c.state != connectionReady {
c.log.Debugf("Wait until connection is ready. State: %s", connectionState(c.state))
if c.state == connectionClosed {
for c.getState() != connectionReady {
c.log.Debugf("Wait until connection is ready. State: %s", c.getState().String())
if c.getState() == connectionClosed {
return errors.New("connection error")
}
// wait for a new connection state change
Expand Down Expand Up @@ -428,7 +430,7 @@ func (c *connection) WriteData(data Buffer) {
// 1. blocked, in which case we need to wait until we have space
// 2. the connection is already closed, then we need to bail out
c.log.Debug("Couldn't write on connection channel immediately")
state := connectionState(atomic.LoadInt32(&c.state))
state := c.getState()
if state != connectionReady {
c.log.Debug("Connection was already closed")
return
Expand Down Expand Up @@ -543,7 +545,7 @@ func (c *connection) Write(data Buffer) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
if c.state == connectionClosed {
if c.getState() == connectionClosed {
callback(req, ErrConnectionClosed)
} else {
c.incomingRequestsCh <- &request{
Expand All @@ -555,7 +557,7 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
}

func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
if c.state == connectionClosed {
if c.getState() == connectionClosed {
return ErrConnectionClosed
}

Expand All @@ -573,7 +575,7 @@ func (c *connection) internalSendRequest(req *request) {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
if c.state == connectionClosed {
if c.getState() == connectionClosed {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
Expand Down Expand Up @@ -757,12 +759,13 @@ func (c *connection) Close() {

c.cond.Broadcast()

if c.state == connectionClosed {
if c.getState() == connectionClosed {
return
}

c.log.Info("Connection closed")
c.state = connectionClosed
// do not use changeState() since they share the same lock
c.setState(connectionClosed)
c.TriggerClose()
c.pingTicker.Stop()
c.pingCheckTicker.Stop()
Expand All @@ -787,11 +790,19 @@ func (c *connection) Close() {

func (c *connection) changeState(state connectionState) {
c.Lock()
atomic.StoreInt32(&c.state, int32(state))
c.setState(state)
c.cond.Broadcast()
c.Unlock()
}

func (c *connection) getState() connectionState {
return connectionState(c.state.Load())
}

func (c *connection) setState(state connectionState) {
c.state.Store(int32(state))
}

func (c *connection) newRequestID() uint64 {
return atomic.AddUint64(&c.requestIDGenerator, 1)
}
Expand Down
Loading

0 comments on commit 58a327d

Please sign in to comment.