Skip to content

Commit

Permalink
fix: add service not ready check
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Apr 11, 2022
1 parent ae400ca commit 545f579
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,13 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)

case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
partitionMetadataResponse := cmd.PartitionMetadataResponse
c.checkServerError(partitionMetadataResponse.Error)
c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)

case pb.BaseCommand_LOOKUP_RESPONSE:
lookupResult := cmd.LookupTopicResponse
c.checkServerError(lookupResult.Error)
c.handleResponse(lookupResult.GetRequestId(), cmd)

case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
Expand Down Expand Up @@ -574,6 +577,16 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
}
}

func (c *connection) checkServerError(err *pb.ServerError) {
if err == nil {
return
}

if *err == pb.ServerError_ServiceNotReady {
c.Close()
}
}

func (c *connection) Write(data Buffer) {
c.writeRequestsCh <- data
}
Expand Down

0 comments on commit 545f579

Please sign in to comment.