diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5c038aa53..f5fd493b2 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
 		p.schemaCache.Put(p.schemaInfo, schemaVersion)
 	}
 
-	if err != nil {
-		return err
-	}
-
 	if !p.options.DisableBatching && p.batchBuilder == nil {
 		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
 		if err != nil {
@@ -1022,15 +1018,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 }
 
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
-	// clear all the messages which have sent to dataChan before flush
-	if len(p.dataChan) != 0 {
-		oldDataChan := p.dataChan
-		p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages)
-		for len(oldDataChan) != 0 {
-			pendingData := <-oldDataChan
-			p.internalSend(pendingData)
-		}
-	}
+	p.clearPendingSendRequests()
 
 	if !p.options.DisableBatching {
 		p.internalFlushCurrentBatch()
@@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	}
 }
 
+// clearPendingSendRequests makes sure to push forward previous sending requests
+// by emptying the data channel.
+func (p *partitionProducer) clearPendingSendRequests() {
+	sizeBeforeFlushing := len(p.dataChan)
+
+	// Bound the for loop to the current length of the channel to ensure that it
+	// will eventually stop, as we only want to ensure that existing messages are
+	// flushed.
+	for i := 0; i < sizeBeforeFlushing; i++ {
+		select {
+		case pendingData := <-p.dataChan:
+			p.internalSend(pendingData)
+
+		default:
+			return
+		}
+	}
+}
+
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
 	var err error
 	var msgID MessageID
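
The new `clearPendingSendRequests` drains only the messages that were already buffered when the flush started: it snapshots `len(p.dataChan)` once and uses `select` with a `default` branch so the loop never blocks if the channel empties early, instead of swapping the channel out as the old code did. Below is a minimal standalone sketch (not part of the patch) of that bounded channel-drain pattern; the names `drainPending` and `item` are hypothetical and exist only for illustration.

```go
package main

import "fmt"

type item struct{ id int }

// drainPending consumes at most the number of items already buffered in ch
// when the call starts. Items enqueued afterwards are left untouched, and the
// select/default ensures the loop never blocks if a concurrent consumer
// empties the channel first.
func drainPending(ch chan *item, handle func(*item)) {
	sizeBefore := len(ch)
	for i := 0; i < sizeBefore; i++ {
		select {
		case it := <-ch:
			handle(it)
		default:
			// Channel drained earlier than expected; stop instead of blocking.
			return
		}
	}
}

func main() {
	ch := make(chan *item, 10)
	for i := 1; i <= 3; i++ {
		ch <- &item{id: i}
	}

	drainPending(ch, func(it *item) { fmt.Println("flushed item", it.id) })
	fmt.Println("remaining in channel:", len(ch))
}
```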