Remove races reading and updating the default router state
Previously, we used atomic operations to read and update parts of the
default router state.  Unfortunately, the reads and updates could race
under concurrent calls, which led to unnecessary clock reads and an
associated slowdown in performance.

Now, we use atomic addition to increment the message count and batch size.
This removes the race condition by ensuring that each goroutine observes
a unique messageCount, and hence only one will perform the clock read.
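
To make the before and after concrete, here is an illustrative sketch (not
code from this commit; numMessages, readClockAfterNumMessages, clockReads,
oldPattern, and newPattern are hypothetical stand-ins for the router's
counters and clock-read sampling interval).  It drives both patterns from
many goroutines and counts how often each one would have read the clock:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const numMessages = 1000
const readClockAfterNumMessages = 100

// oldPattern loads the counter, decides, and increments afterwards. Concurrent
// callers can load the same value before any of them increments it, so several
// of them take the clock-read branch for the same counter value.
func oldPattern() uint32 {
	var msgCounter, clockReads uint32
	var wg sync.WaitGroup
	for i := 0; i < numMessages; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			previous := atomic.LoadUint32(&msgCounter) // read ...
			if previous%readClockAfterNumMessages == 0 {
				atomic.AddUint32(&clockReads, 1) // stands in for time.Now()
			}
			atomic.AddUint32(&msgCounter, 1) // ... and update race with other goroutines
		}()
	}
	wg.Wait()
	return clockReads
}

// newPattern uses the value returned by atomic.AddUint32, which is unique per
// caller, so exactly one of every readClockAfterNumMessages callers samples the clock.
func newPattern() uint32 {
	var msgCounter, clockReads uint32
	var wg sync.WaitGroup
	for i := 0; i < numMessages; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			messageCount := atomic.AddUint32(&msgCounter, 1) // unique per caller
			if messageCount%readClockAfterNumMessages == 0 {
				atomic.AddUint32(&clockReads, 1)
			}
		}()
	}
	wg.Wait()
	return clockReads
}

func main() {
	fmt.Println("old pattern clock reads:", oldPattern()) // can exceed 10 under contention
	fmt.Println("new pattern clock reads:", newPattern()) // always exactly 10
}

Under contention the old pattern can report far more than the intended one
clock read per readClockAfterNumMessages messages, while the new pattern is
exact; that excess is the slowdown this commit removes.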

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
Daniel Ferstay committed Dec 27, 2021
1 parent d5d4903 commit 4056ea7
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions pulsar/default_router.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-	"math"
 	"math/rand"
 	"sync/atomic"
 	"time"
@@ -27,7 +26,7 @@ import (
 type defaultRouter struct {
 	currentPartitionCursor uint32
 
-	lastChangeTimestamp int64
+	lastBatchTimestamp  int64
 	msgCounter          uint32
 	cumulativeBatchSize uint32
 }
@@ -45,7 +44,7 @@ func NewDefaultRouter(
 	disableBatching bool) func(*ProducerMessage, uint32) int {
 	state := &defaultRouter{
 		currentPartitionCursor: rand.Uint32(),
-		lastChangeTimestamp:    math.MinInt64,
+		lastBatchTimestamp:     time.Now().UnixNano(),
 	}
 
 	readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
@@ -80,32 +79,27 @@ func NewDefaultRouter(
 		// spread the data on different partitions but not necessarily in a specific sequence.
 		var now int64
 		size := uint32(len(message.Payload))
-		previousMessageCount := atomic.LoadUint32(&state.msgCounter)
-		previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize)
-		previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp)
+		messageCount := atomic.AddUint32(&state.msgCounter, 1)
+		batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
 
-		messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
-		sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
+		messageCountReached := messageCount%uint32(maxBatchingMessages) == 0
+		sizeReached := (batchSize >= uint32(maxBatchingSize))
 		durationReached := false
-		if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 {
+		if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 {
 			now = time.Now().UnixNano()
-			durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds()
+			previousBatch := atomic.LoadInt64(&state.lastBatchTimestamp)
+			durationReached = now-previousBatch >= maxBatchingDelay.Nanoseconds()
 		}
 		if messageCountReached || sizeReached || durationReached {
-			atomic.AddUint32(&state.currentPartitionCursor, 1)
-			atomic.StoreUint32(&state.msgCounter, 0)
+			cursor := atomic.AddUint32(&state.currentPartitionCursor, 1)
 			atomic.StoreUint32(&state.cumulativeBatchSize, 0)
-			if now != 0 {
-				atomic.StoreInt64(&state.lastChangeTimestamp, now)
+			if now == 0 {
+				now = time.Now().UnixNano()
 			}
-			return int(state.currentPartitionCursor % numPartitions)
+			atomic.StoreInt64(&state.lastBatchTimestamp, now)
+			return int(cursor % numPartitions)
 		}
 
-		atomic.AddUint32(&state.msgCounter, 1)
-		atomic.AddUint32(&state.cumulativeBatchSize, size)
-		if now != 0 {
-			atomic.StoreInt64(&state.lastChangeTimestamp, now)
-		}
 		return int(state.currentPartitionCursor % numPartitions)
 	}
 }
