Fixed pooled buffer lifecycle (#300)
merlimat committed Jun 26, 2020
1 parent f7ca912 commit ba79ab0
Showing 3 changed files with 26 additions and 39 deletions.
19 changes: 6 additions & 13 deletions pulsar/internal/batch_builder.go
@@ -35,8 +35,8 @@ const (
 	DefaultMaxMessagesPerBatch = 1000
 )
 
-type ConnectionHolder interface {
-	GetConnection() Connection
+type BuffersPool interface {
+	GetBuffer() Buffer
 }
 
 // BatchBuilder wraps the objects needed to build a batch.
@@ -62,13 +62,13 @@ type BatchBuilder struct {
 	callbacks []interface{}
 
 	compressionProvider compression.Provider
-	cnxHolder           ConnectionHolder
+	buffersPool         BuffersPool
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	cnxHolder ConnectionHolder) (*BatchBuilder, error) {
+	bufferPool BuffersPool) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
@@ -91,7 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
 		},
 		callbacks:           []interface{}{},
 		compressionProvider: getCompressionProvider(compressionType, level),
-		cnxHolder:           cnxHolder,
+		buffersPool:         bufferPool,
 	}
 
 	if compressionType != pb.CompressionType_NONE {
@@ -169,14 +169,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
 	uncompressedSize := bb.buffer.ReadableBytes()
 	bb.msgMetadata.UncompressedSize = &uncompressedSize
 
-	cnx := bb.cnxHolder.GetConnection()
-	var buffer Buffer
-	if cnx == nil {
-		buffer = NewBuffer(int(uncompressedSize))
-	} else {
-		buffer = cnx.GetBufferFromPool()
-	}
-
+	buffer := bb.buffersPool.GetBuffer()
 	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
 	callbacks = bb.callbacks
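
The batch builder is now decoupled from the connection: instead of asking a ConnectionHolder for a Connection and falling back to a fresh allocation when none is available, Flush obtains its output buffer from a minimal BuffersPool, and the caller decides how that pool is backed. A rough, self-contained sketch of the shape of this dependency (using *bytes.Buffer as a stand-in for the library's Buffer type, not the actual implementation):

package main

import (
	"bytes"
	"fmt"
)

// BuffersPool is the narrow dependency Flush now uses to get an output
// buffer; *bytes.Buffer stands in for the library's Buffer type here.
type BuffersPool interface {
	GetBuffer() *bytes.Buffer
}

// freshPool is a deliberately trivial implementation that just allocates;
// the partition producer (later in this diff) backs the same interface
// with a sync.Pool so buffers get recycled.
type freshPool struct{}

func (freshPool) GetBuffer() *bytes.Buffer { return &bytes.Buffer{} }

// flush mimics the new BatchBuilder.Flush: it needs only a BuffersPool,
// not a Connection, to produce the serialized batch.
func flush(bp BuffersPool, payload []byte) *bytes.Buffer {
	buf := bp.GetBuffer()
	buf.Write(payload) // serializeBatch(...) in the real code
	return buf
}

func main() {
	fmt.Println(flush(freshPool{}, []byte("batched messages")).String())
}
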
16 changes: 0 additions & 16 deletions pulsar/internal/connection.go
@@ -72,7 +72,6 @@ type Connection interface {
 	DeleteConsumeHandler(id uint64)
 	ID() string
 	GetMaxMessageSize() int32
-	GetBufferFromPool() Buffer
 	Close()
 }
 
@@ -161,8 +160,6 @@ type connection struct {
 	auth auth.Provider
 
 	maxMessageSize int32
-
-	buffersPool sync.Pool
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -193,11 +190,6 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
 		writeRequestsCh:  make(chan Buffer, 256),
 		listeners:        make(map[uint64]ConnectionListener),
 		consumerHandlers: make(map[uint64]ConsumerHandler),
-		buffersPool: sync.Pool{
-			New: func() interface{} {
-				return NewBuffer(1024)
-			},
-		},
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -352,8 +344,6 @@ func (c *connection) run() {
 				return
 			}
 			c.internalWriteData(data)
-			// Return buffer to the pool since we're now done using it
-			c.buffersPool.Put(data)
 
 		case <-c.pingTicker.C:
 			c.sendPing()
@@ -778,9 +768,3 @@ func (c *connection) ID() string {
 func (c *connection) GetMaxMessageSize() int32 {
 	return c.maxMessageSize
 }
-
-func (c *connection) GetBufferFromPool() Buffer {
-	b := c.buffersPool.Get().(Buffer)
-	b.Clear()
-	return b
-}
30 changes: 20 additions & 10 deletions pulsar/producer_partition.go
@@ -62,7 +62,8 @@ type partitionProducer struct {
 	batchFlushTicker *time.Ticker
 
 	// Channel where app is posting messages to be published
-	eventsChan chan interface{}
+	eventsChan  chan interface{}
+	buffersPool sync.Pool
 
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -88,13 +89,18 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 
 	p := &partitionProducer{
-		state:            producerInit,
-		log:              log.WithField("topic", topic),
-		client:           client,
-		topic:            topic,
-		options:          options,
-		producerID:       client.rpcClient.NewProducerID(),
-		eventsChan:       make(chan interface{}, maxPendingMessages),
+		state:      producerInit,
+		log:        log.WithField("topic", topic),
+		client:     client,
+		topic:      topic,
+		options:    options,
+		producerID: client.rpcClient.NewProducerID(),
+		eventsChan: make(chan interface{}, maxPendingMessages),
+		buffersPool: sync.Pool{
+			New: func() interface{} {
+				return internal.NewBuffer(1024)
+			},
+		},
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -181,8 +187,10 @@ func (p *partitionProducer) grabCnx() error {
 
 type connectionClosed struct{}
 
-func (p *partitionProducer) GetConnection() internal.Connection {
-	return p.cnx
+func (p *partitionProducer) GetBuffer() internal.Buffer {
+	b := p.buffersPool.Get().(internal.Buffer)
+	b.Clear()
+	return b
 }
 
 func (p *partitionProducer) ConnectionClosed() {
@@ -442,6 +450,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
 	// Mark this pending item as done
 	pi.completed = true
+	// Return buffer to the pool since we're now done using it
+	p.buffersPool.Put(pi.batchData)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
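
Taken together, the three files move ownership of the buffer pool from the connection to the partition producer, which is the lifecycle fix: the connection used to return each buffer to its pool immediately after writing it to the socket, even though the producer still referenced that same buffer from its pending queue until the broker acknowledged the send; now the producer hands out a cleared buffer at flush time and returns it to the pool only in ReceivedSendReceipt, once the pending item is completed. A minimal, self-contained sketch of this ownership pattern (hypothetical names and simplified types, not the client's actual ones):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// pendingItem mirrors the producer's pending-queue entry: the batch buffer
// must stay alive until the broker's receipt arrives.
type pendingItem struct {
	batchData *bytes.Buffer
	completed bool
}

type producer struct {
	buffersPool sync.Pool
	pending     []*pendingItem // stand-in for internal.BlockingQueue
}

func newProducer() *producer {
	return &producer{
		buffersPool: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }},
	}
}

// GetBuffer is what the batch builder calls at flush time.
func (p *producer) GetBuffer() *bytes.Buffer {
	b := p.buffersPool.Get().(*bytes.Buffer)
	b.Reset() // Clear() on the real Buffer type
	return b
}

// send serializes a batch into a pooled buffer and keeps it pending;
// the buffer is NOT returned to the pool here, even after the bytes
// have been handed to the connection for writing.
func (p *producer) send(payload []byte) *pendingItem {
	buf := p.GetBuffer()
	buf.Write(payload)
	pi := &pendingItem{batchData: buf}
	p.pending = append(p.pending, pi)
	// ... write buf to the connection ...
	return pi
}

// receivedSendReceipt is the only place the buffer goes back to the pool,
// mirroring ReceivedSendReceipt in producer_partition.go.
func (p *producer) receivedSendReceipt(pi *pendingItem) {
	pi.completed = true
	p.buffersPool.Put(pi.batchData)
}

func main() {
	p := newProducer()
	pi := p.send([]byte("batch #1"))
	fmt.Println("in flight:", pi.batchData.String())
	p.receivedSendReceipt(pi) // safe to recycle only once the receipt is in
}

In this reading, the sync.Pool only bounds allocation churn; correctness comes from deferring Put until nothing can still read the buffer.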
