From 07504344d27aa2df483bf381c5ef0e02b52a72d5 Mon Sep 17 00:00:00 2001 From: zengguan <916028390@qq.com> Date: Wed, 14 Jun 2023 17:12:48 +0800 Subject: [PATCH 1/2] fix: split sendRequest and make reconnectToBroker and other operate in the same coroutine --- pulsar/producer_partition.go | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index c4a460eed..381360efd 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -84,7 +84,8 @@ type partitionProducer struct { compressionProvider compression.Provider // Channel where app is posting messages to be published - eventsChan chan interface{} + dataChan chan *sendRequest + cmdChan chan interface{} closeCh chan struct{} connectClosedCh chan connectionClosed @@ -150,7 +151,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions log: logger, options: options, producerID: client.rpcClient.NewProducerID(), - eventsChan: make(chan interface{}, maxPendingMessages), + dataChan: make(chan *sendRequest, maxPendingMessages), + cmdChan: make(chan interface{}, 10), connectClosedCh: make(chan connectionClosed, 10), closeCh: make(chan struct{}), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), @@ -438,31 +440,21 @@ func (p *partitionProducer) reconnectToBroker() { } func (p *partitionProducer) runEventsLoop() { - go func() { - for { - select { - case <-p.closeCh: - p.log.Info("close producer, exit reconnect") - return - case <-p.connectClosedCh: - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() - } - } - }() - for { select { - case i := <-p.eventsChan: + case data := <-p.dataChan: + p.internalSend(data) + case i := <-p.cmdChan: switch v := i.(type) { - case *sendRequest: - p.internalSend(v) case *flushRequest: p.internalFlush(v) case *closeProducer: p.internalClose(v) return } + case <-p.connectClosedCh: + p.log.Info("runEventsLoop will reconnect in producer") + p.reconnectToBroker() case <-p.batchFlushTicker.C: p.internalFlushCurrentBatch() } @@ -1165,7 +1157,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer } p.options.Interceptors.BeforeSend(p, msg) - p.eventsChan <- sr + p.dataChan <- sr if !p.options.DisableBlockIfQueueFull { // block if queue full @@ -1317,7 +1309,7 @@ func (p *partitionProducer) Flush() error { doneCh: make(chan struct{}), err: nil, } - p.eventsChan <- flushReq + p.cmdChan <- flushReq // wait for the flush request to complete <-flushReq.doneCh @@ -1345,7 +1337,7 @@ func (p *partitionProducer) Close() { } cp := &closeProducer{doneCh: make(chan struct{})} - p.eventsChan <- cp + p.cmdChan <- cp // wait for close producer request to complete <-cp.doneCh From 8902e21842fae7836b01b77a976816e31743fd4b Mon Sep 17 00:00:00 2001 From: zengguan <916028390@qq.com> Date: Thu, 15 Jun 2023 11:57:34 +0800 Subject: [PATCH 2/2] remove closeCh after not used --- pulsar/producer_partition.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 381360efd..6bd90818e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -86,7 +86,6 @@ type partitionProducer struct { // Channel where app is posting messages to be published dataChan chan *sendRequest cmdChan chan interface{} - closeCh chan struct{} connectClosedCh chan connectionClosed publishSemaphore internal.Semaphore @@ -154,7 +153,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions dataChan: make(chan *sendRequest, maxPendingMessages), cmdChan: make(chan interface{}, 10), connectClosedCh: make(chan connectionClosed, 10), - closeCh: make(chan struct{}), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), @@ -1296,8 +1294,6 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.setProducerState(producerClosed) p._getConn().UnregisterListener(p.producerID) p.batchFlushTicker.Stop() - - close(p.closeCh) } func (p *partitionProducer) LastSequenceID() int64 {