Skip to content

Commit

Permalink
Fix deadlock when connection closed (#376)
Browse files Browse the repository at this point in the history
Fixes #366

### Motivation

In current code of `pulsar/internal/connection.go` we have 2 channels, closeCh and incomingRequestsCh. when the connection closes, the current mis-use of these 2 channels may have a deadlock. 
PR #366 has detailed steps to reproduce and the root cause [analysis](#366 (comment)) .
This PR tries to fix the deadlock.

### Modifications
- make the close logic independent, not in the same loop of normal events handling.
- when the connection closed, handle the existing requests in the channel and return an error to avoid deadlock.

### Verifying this change
passed the tests in #366 
current ut passed
  • Loading branch information
jiazhai committed Oct 9, 2020
1 parent 3ab75cd commit c03f45f
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 64 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
Expand Down
36 changes: 24 additions & 12 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ type partitionConsumer struct {
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID

eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id trackingMessageID)
eventsCh chan interface{}
connectedCh chan struct{}
connectClosedCh chan connectionClosed
closeCh chan struct{}
clearQueueCh chan func(id trackingMessageID)

nackTracker *negativeAcksTracker
dlq *dlqRouter
Expand All @@ -174,12 +175,13 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 3),
eventsCh: make(chan interface{}, 10),
queueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: options.startMessageID,
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan connectionClosed, 10),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
Expand Down Expand Up @@ -566,7 +568,8 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b

func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
pc.eventsCh <- &connectionClosed{}
pc.log.Debug("connection closed and send to connectClosedCh")
pc.connectClosedCh <- connectionClosed{}
}

// Flow command gives additional permits to send messages to the consumer.
Expand Down Expand Up @@ -733,11 +736,22 @@ func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Debug("exiting events loop")
}()
pc.log.Debug("get into runEventsLoop")

go func() {
for {
select {
case <-pc.closeCh:
return
case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()
}
}
}()

for {
select {
case <-pc.closeCh:
return
case i := <-pc.eventsCh:
for i := range pc.eventsCh {
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
Expand All @@ -751,8 +765,6 @@ func (pc *partitionConsumer) runEventsLoop() {
pc.internalSeek(v)
case *seekByTimeRequest:
pc.internalSeekByTime(v)
case *connectionClosed:
pc.reconnectToBroker()
case *closeRequest:
pc.internalClose(v)
return
Expand Down
28 changes: 3 additions & 25 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
if err != nil {
t.Fatal(err)
}

rc.discover()
time.Sleep(300 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)

consumers = cloneConsumers(rc)
assert.Equal(t, 1, len(consumers))

// delete the topic
if err := deleteTopic(topic); err != nil {
t.Fatal(err)
}

rc.discover()
time.Sleep(300 * time.Millisecond)

consumers = cloneConsumers(rc)
assert.Equal(t, 0, len(consumers))
}

func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string) {
Expand Down Expand Up @@ -228,7 +216,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
defer deleteTopic(myTopic)

rc.discover()
time.Sleep(300 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)

consumers = cloneConsumers(rc)
assert.Equal(t, 0, len(consumers))
Expand All @@ -241,20 +229,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
}

rc.discover()
time.Sleep(300 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)

consumers = cloneConsumers(rc)
assert.Equal(t, 1, len(consumers))

// delete the topic
err = deleteTopic(fooTopic)
assert.Nil(t, err)

rc.discover()
time.Sleep(300 * time.Millisecond)

consumers = cloneConsumers(rc)
assert.Equal(t, 0, len(consumers))
}

func TestRegexConsumer(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var ErrCorruptedMessage = errors.New("corrupted message")
// ErrEOM is the error returned by ReadMessage when no more input is available.
var ErrEOM = errors.New("EOF")

var ErrConnectionClosed = errors.New("connection closed")

func NewMessageReader(headersAndPayload Buffer) *MessageReader {
return &MessageReader{
buffer: headersAndPayload,
Expand Down
54 changes: 36 additions & 18 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type ConnectionListener interface {
// Connection is a interface of client cnx.
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand)
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
Expand All @@ -110,21 +110,15 @@ type ConsumerHandler interface {
type connectionState int32

const (
connectionInit = 0
connectionConnecting = 1
connectionTCPConnected = 2
connectionReady = 3
connectionClosed = 4
connectionInit = 0
connectionReady = 1
connectionClosed = 2
)

func (s connectionState) String() string {
switch s {
case connectionInit:
return "Initializing"
case connectionConnecting:
return "Connecting"
case connectionTCPConnected:
return "TCPConnected"
case connectionReady:
return "Ready"
case connectionClosed:
Expand Down Expand Up @@ -286,8 +280,6 @@ func (c *connection) connect() bool {
c.log.Info("TCP connection established")
c.Unlock()

c.changeState(connectionTCPConnected)

return true
}

Expand Down Expand Up @@ -358,11 +350,20 @@ func (c *connection) waitUntilReady() error {
return nil
}

func (c *connection) failLeftRequestsWhenClose() {
for req := range c.incomingRequestsCh {
c.internalSendRequest(req)
}
close(c.incomingRequestsCh)
}

func (c *connection) run() {
// All reads come from the reader goroutine
go c.reader.readFromConnection()
go c.runPingCheck()

c.log.Debugf("Connection run start channel %+v, requestLength %d", c, len(c.incomingRequestsCh))

defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
Expand All @@ -379,6 +380,7 @@ func (c *connection) run() {
for {
select {
case <-c.closeCh:
c.failLeftRequestsWhenClose()
return

case req := <-c.incomingRequestsCh:
Expand Down Expand Up @@ -563,19 +565,28 @@ func (c *connection) Write(data Buffer) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
c.incomingRequestsCh <- &request{
id: &requestID,
cmd: req,
callback: callback,
if c.state == connectionClosed {
callback(req, ErrConnectionClosed)
} else {
c.incomingRequestsCh <- &request{
id: &requestID,
cmd: req,
callback: callback,
}
}
}

func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
if c.state == connectionClosed {
return ErrConnectionClosed
}

c.incomingRequestsCh <- &request{
id: nil,
cmd: req,
callback: nil,
}
return nil
}

func (c *connection) internalSendRequest(req *request) {
Expand All @@ -584,7 +595,14 @@ func (c *connection) internalSendRequest(req *request) {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
c.writeCommand(req.cmd)
if c.state == connectionClosed {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
}
} else {
c.writeCommand(req.cmd)
}
}

func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
Expand Down
3 changes: 2 additions & 1 deletion pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
return nil, nil
}

func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
assert.Fail(c.t, "Shouldn't be called")
return nil
}

func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
Expand Down
7 changes: 3 additions & 4 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type RPCClient interface {
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error

RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}
Expand Down Expand Up @@ -103,7 +103,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
}
ch := make(chan Res, 10)

// TODO: in here, the error of callback always nil
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
ch <- Res{&RPCResult{
Cnx: cnx,
Expand Down Expand Up @@ -162,9 +161,9 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
return rpcResult, rpcErr
}

func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
rpcRequestCount.Inc()
cnx.SendRequestNoWait(baseCommand(cmdType, message))
return cnx.SendRequestNoWait(baseCommand(cmdType, message))
}

func (c *rpcClient) NewRequestID() uint64 {
Expand Down
10 changes: 6 additions & 4 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type partitionProducer struct {
// Channel where app is posting messages to be published
eventsChan chan interface{}

connectClosedCh chan connectionClosed

publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
lastSequenceID int64
Expand Down Expand Up @@ -133,6 +135,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
connectClosedCh: make(chan connectionClosed, 10),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
Expand Down Expand Up @@ -236,7 +239,7 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {
func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
p.eventsChan <- &connectionClosed{}
p.connectClosedCh <- connectionClosed{}
}

func (p *partitionProducer) reconnectToBroker() {
Expand Down Expand Up @@ -267,15 +270,14 @@ func (p *partitionProducer) runEventsLoop() {
switch v := i.(type) {
case *sendRequest:
p.internalSend(v)
case *connectionClosed:
p.reconnectToBroker()
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}

case <-p.connectClosedCh:
p.reconnectToBroker()
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
Expand Down

0 comments on commit c03f45f

Please sign in to comment.