From 545f57916abadecf0208bf9ee98c94bcf05ce98f Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 11 Apr 2022 16:37:55 +0800 Subject: [PATCH] fix: add service not ready check --- pulsar/internal/connection.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index a025abf14b..0f8f172029 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -522,10 +522,13 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd) case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE: + partitionMetadataResponse := cmd.PartitionMetadataResponse + c.checkServerError(partitionMetadataResponse.Error) c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd) case pb.BaseCommand_LOOKUP_RESPONSE: lookupResult := cmd.LookupTopicResponse + c.checkServerError(lookupResult.Error) c.handleResponse(lookupResult.GetRequestId(), cmd) case pb.BaseCommand_CONSUMER_STATS_RESPONSE: @@ -574,6 +577,16 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl } } +func (c *connection) checkServerError(err *pb.ServerError) { + if err == nil { + return + } + + if *err == pb.ServerError_ServiceNotReady { + c.Close() + } +} + func (c *connection) Write(data Buffer) { c.writeRequestsCh <- data }