Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when connection closed #376

Merged
merged 7 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ type partitionConsumer struct {
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID

eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id trackingMessageID)
eventsCh chan interface{}
connectedCh chan struct{}
connectClosedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id trackingMessageID)

nackTracker *negativeAcksTracker
dlq *dlqRouter
Expand All @@ -180,6 +181,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
startMessageID: options.startMessageID,
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan struct{}),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
Expand Down Expand Up @@ -564,7 +566,8 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b

func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
pc.eventsCh <- &connectionClosed{}
pc.log.Debug("connection closed and send to connectClosedCh")
pc.connectClosedCh <- connectionClosed{}
}

// Flow command gives additional permits to send messages to the consumer.
Expand Down Expand Up @@ -731,6 +734,20 @@ func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Debug("exiting events loop")
}()
pc.log.Debug("get into runEventsLoop")

go func() {
for {
select {
case <-pc.closeCh:
return
case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()
}
}
}()

for {
select {
case <-pc.closeCh:
Expand All @@ -749,8 +766,6 @@ func (pc *partitionConsumer) runEventsLoop() {
pc.internalSeek(v)
case *seekByTimeRequest:
pc.internalSeekByTime(v)
case *connectionClosed:
pc.reconnectToBroker()
case *closeRequest:
pc.internalClose(v)
return
Expand Down
2 changes: 2 additions & 0 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var ErrCorruptedMessage = errors.New("corrupted message")
// ErrEOM is the error returned by ReadMessage when no more input is available.
var ErrEOM = errors.New("EOF")

var ErrConnectionClosed = errors.New("connection closed")

func NewMessageReader(headersAndPayload Buffer) *MessageReader {
return &MessageReader{
buffer: headersAndPayload,
Expand Down
54 changes: 36 additions & 18 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type ConnectionListener interface {
// Connection is a interface of client cnx.
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand)
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
Expand All @@ -110,21 +110,15 @@ type ConsumerHandler interface {
type connectionState int32

const (
connectionInit = 0
connectionConnecting = 1
connectionTCPConnected = 2
connectionReady = 3
connectionClosed = 4
connectionInit = 0
connectionReady = 1
connectionClosed = 2
)

func (s connectionState) String() string {
switch s {
case connectionInit:
return "Initializing"
case connectionConnecting:
return "Connecting"
case connectionTCPConnected:
return "TCPConnected"
case connectionReady:
return "Ready"
case connectionClosed:
Expand Down Expand Up @@ -277,8 +271,6 @@ func (c *connection) connect() bool {
c.log.Info("TCP connection established")
c.Unlock()

c.changeState(connectionTCPConnected)

return true
}

Expand Down Expand Up @@ -349,11 +341,20 @@ func (c *connection) waitUntilReady() error {
return nil
}

func (c *connection) failLeftRequestsWhenClose() {
for req := range c.incomingRequestsCh {
c.internalSendRequest(req)
}
close(c.incomingRequestsCh)
}

func (c *connection) run() {
// All reads come from the reader goroutine
go c.reader.readFromConnection()
go c.runPingCheck()

c.log.Debugf("Connection run start channel %+v, requestLength %d", c, len(c.incomingRequestsCh))

defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
Expand All @@ -370,6 +371,7 @@ func (c *connection) run() {
for {
select {
case <-c.closeCh:
c.failLeftRequestsWhenClose()
return

case req := <-c.incomingRequestsCh:
Expand Down Expand Up @@ -553,19 +555,28 @@ func (c *connection) Write(data Buffer) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
c.incomingRequestsCh <- &request{
id: &requestID,
cmd: req,
callback: callback,
if c.state == connectionClosed {
callback(req, ErrConnectionClosed)
} else {
c.incomingRequestsCh <- &request{
id: &requestID,
cmd: req,
callback: callback,
}
}
}

func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error {
if c.state == connectionClosed {
return ErrConnectionClosed
}

c.incomingRequestsCh <- &request{
id: nil,
cmd: req,
callback: nil,
}
return nil
}

func (c *connection) internalSendRequest(req *request) {
Expand All @@ -574,7 +585,14 @@ func (c *connection) internalSendRequest(req *request) {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
c.writeCommand(req.cmd)
if c.state == connectionClosed {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
}
} else {
c.writeCommand(req.cmd)
}
}

func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
Expand Down
3 changes: 2 additions & 1 deletion pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
return nil, nil
}

func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
assert.Fail(c.t, "Shouldn't be called")
return nil
}

func responseType(r pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupTopicResponse_LookupType {
Expand Down
7 changes: 3 additions & 4 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type RPCClient interface {
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error

RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}
Expand Down Expand Up @@ -103,7 +103,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
}
ch := make(chan Res, 10)

// TODO: in here, the error of callback always nil
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
ch <- Res{&RPCResult{
Cnx: cnx,
Expand Down Expand Up @@ -162,9 +161,9 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
return rpcResult, rpcErr
}

func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) {
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error {
rpcRequestCount.Inc()
cnx.SendRequestNoWait(baseCommand(cmdType, message))
return cnx.SendRequestNoWait(baseCommand(cmdType, message))
}

func (c *rpcClient) NewRequestID() uint64 {
Expand Down