Skip to content

Commit

Permalink
[fix] Avoid a data race when flushing with load (#1261)
Browse files Browse the repository at this point in the history
Fixes #1258

### Motivation

While flushing, the data channel is swapped for a newly allocated one, which can cause messages to be lost: the length of the old channel can read zero, which stops the draining procedure, while at the same time a new message can still be sent to the channel.

### Modifications

Instead of allocating a new channel, it drains the existing one, receiving at most as many requests as were queued in the channel when the flush started, before proceeding with the flush.

(cherry picked from commit 8dd4ed1)
  • Loading branch information
Gilthoniel authored and RobertIndie committed Jul 31, 2024
1 parent 53fc938 commit e44bd04
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
p.schemaCache.Put(p.schemaInfo, schemaVersion)
}

if err != nil {
return err
}

if !p.options.DisableBatching && p.batchBuilder == nil {
provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
if err != nil {
Expand Down Expand Up @@ -1022,15 +1018,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

func (p *partitionProducer) internalFlush(fr *flushRequest) {
// clear all the messages which have sent to dataChan before flush
if len(p.dataChan) != 0 {
oldDataChan := p.dataChan
p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages)
for len(oldDataChan) != 0 {
pendingData := <-oldDataChan
p.internalSend(pendingData)
}
}
p.clearPendingSendRequests()

if !p.options.DisableBatching {
p.internalFlushCurrentBatch()
Expand Down Expand Up @@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
}
}

// clearPendingSendRequests drains the send requests currently queued in
// p.dataChan and hands each one to internalSend.
//
// At most as many requests are received as were buffered in the channel when
// the call started; the cap guarantees the loop terminates, since the goal is
// only to push forward the requests that were already pending.
func (p *partitionProducer) clearPendingSendRequests() {
	for remaining := len(p.dataChan); remaining > 0; remaining-- {
		select {
		case req := <-p.dataChan:
			p.internalSend(req)
		default:
			// Nothing left to receive right now: stop instead of blocking.
			return
		}
	}
}

func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
var err error
var msgID MessageID
Expand Down

0 comments on commit e44bd04

Please sign in to comment.