asynchronized send timeout checking (#460)
Fixes #458

### Motivation

In the current producer, send timeout checking is only triggered by the periodic batch flush.
If the connection is closed, the producer event loop blocks while reconnecting to the broker, so batch flushing and send timeout checking stop taking effect. The Java client's separate timer keeps working in this situation.

### Modifications

Make send timeout checking asynchronous by running it in an independent goroutine until the producer is closed, without taking a pending queue lock.
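
For illustration, here is a minimal, self-contained sketch of that pattern. The names (`producer`, `pendingItem`, `peekOldest`, `failPending`) are hypothetical stand-ins rather than the client's actual types, and it uses a close channel where the real code checks the producer state; the actual implementation is in the diff below.

```go
package sketch

import "time"

// pendingItem stands in for an entry in the producer's pending queue.
type pendingItem struct {
	sentAt time.Time
}

// producer is a hypothetical stand-in for the partition producer.
type producer struct {
	closeCh     chan struct{}
	sendTimeout time.Duration
	peekOldest  func() *pendingItem // nil when the pending queue is empty
	failPending func()              // fails all currently pending messages
}

// failTimeoutMessages runs in its own goroutine, so a blocked event loop
// (for example, while reconnecting to the broker) no longer stalls the check.
func (p *producer) failTimeoutMessages() {
	t := time.NewTimer(p.sendTimeout)
	defer t.Stop()
	for {
		select {
		case <-p.closeCh:
			return
		case <-t.C:
			oldest := p.peekOldest()
			if oldest == nil {
				// nothing pending, check again after a full timeout
				t.Reset(p.sendTimeout)
				continue
			}
			if remaining := p.sendTimeout - time.Since(oldest.sentAt); remaining > 0 {
				// the oldest item has not expired yet, sleep exactly that long
				t.Reset(remaining)
				continue
			}
			p.failPending()
			t.Reset(p.sendTimeout)
		}
	}
}
```

Resetting the timer to the remaining lifetime of the oldest pending item avoids busy polling while still firing as soon as that item can possibly time out.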

### Verifying this change

- [x] Make sure that the change passes the CI checks.

### Others

Without a pending queue lock, the send timeout checking gets more complicated; I'm not sure the added complexity is worth the performance gain.
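
For context, a hedged usage sketch of how the check is enabled from the application side: the diff guards the new goroutine with `if p.options.SendTimeout > 0`, so a positive `SendTimeout` in `ProducerOptions` turns it on. The broker URL and topic name below are placeholders.

```go
package main

import (
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker address
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A positive SendTimeout enables the background failTimeoutMessages goroutine;
	// pending messages older than the timeout are failed with errSendTimeout.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "my-topic", // placeholder topic
		SendTimeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}
```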
wuYin committed Feb 9, 2021
1 parent 6548277 commit 0e76dc2
Showing 1 changed file with 81 additions and 37 deletions.
118 changes: 81 additions & 37 deletions pulsar/producer_partition.go
@@ -140,6 +140,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.setProducerState(producerReady)

if p.options.SendTimeout > 0 {
go p.failTimeoutMessages()
}
go p.runEventsLoop()

return p, nil
@@ -427,10 +430,6 @@ type pendingItem struct {
}

func (p *partitionProducer) internalFlushCurrentBatch() {
if p.options.SendTimeout > 0 {
p.failTimeoutMessages()
}

batchData, sequenceID, callbacks := p.batchBuilder.Flush()
if batchData == nil {
return
@@ -446,46 +445,91 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}

func (p *partitionProducer) failTimeoutMessages() {
// since Closing/Closed connection couldn't be reopen, load and compare is safe
state := p.getProducerState()
if state == producerClosing || state == producerClosed {
return
diff := func(sentAt time.Time) time.Duration {
return p.options.SendTimeout - time.Since(sentAt)
}

item := p.pendingQueue.Peek()
if item == nil {
// pending queue is empty
return
}
t := time.NewTimer(p.options.SendTimeout)
defer t.Stop()

pi := item.(*pendingItem)
if time.Since(pi.sentAt) < p.options.SendTimeout {
// pending messages not timeout yet
return
}
for range t.C {
state := p.getProducerState()
if state == producerClosing || state == producerClosed {
return
}

item := p.pendingQueue.Peek()
if item == nil {
// pending queue is empty
t.Reset(p.options.SendTimeout)
continue
}
oldestItem := item.(*pendingItem)
if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
// none of these pending messages have timed out, wait and retry
t.Reset(nextWaiting)
continue
}

p.log.Infof("Failing %d messages", p.pendingQueue.Size())
for p.pendingQueue.Size() > 0 {
pi = p.pendingQueue.Poll().(*pendingItem)
pi.Lock()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.publishSemaphore.Release()
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
p.log.WithError(errSendTimeout).
WithField("size", size).
WithField("properties", sr.msg.Properties)
// the pending queue is not thread safe here: there is no global iteration lock
// controlling polls from it, so this goroutine and the connection receipt handler
// may iterate the pending queue at the same time; this may be a performance trade-off
// see https://github.com/apache/pulsar-client-go/pull/301
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
// double check
t.Reset(p.options.SendTimeout)
continue
}
p.log.Infof("Failing %d messages", viewSize)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)

// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.Poll()
if item == nil {
t.Reset(p.options.SendTimeout)
break
}
if sr.callback != nil {
sr.callback(nil, sr.msg, errSendTimeout)

pi := item.(*pendingItem)
pi.Lock()
if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
// current and subsequent items have not timed out yet, stop iterating
t.Reset(nextWaiting)
pi.Unlock()
break
}

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.publishSemaphore.Release()
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
p.log.WithError(errSendTimeout).
WithField("size", size).
WithField("properties", sr.msg.Properties)
}
if sr.callback != nil {
sr.callback(nil, sr.msg, errSendTimeout)
}
}

// mark the send as completed with an error so that a later flush has no effect
pi.completed = true
buffersPool.Put(pi.batchData)
pi.Unlock()

// finally reached the last view item, current iteration ends
if pi == lastViewItem {
t.Reset(p.options.SendTimeout)
break
}
}
buffersPool.Put(pi.batchData)
pi.Unlock()
}
}

