From 4056ea7030e84d684c2bd9f948ed01a78535279e Mon Sep 17 00:00:00 2001 From: Daniel Ferstay Date: Sun, 26 Dec 2021 16:01:40 -0800 Subject: [PATCH] Remove races reading and updating the default router state Previously, we used atomic operations to read and update parts of the default router state. Unfortunately, the reads and updates could race under concurrent calls, which led to unnecessary clock reads and an associated slowdown in performance. Now, we use atomic addition to increment the message count and batch size. This removes the race condition by ensuring that each goroutine will have a unique messageCount, and hence only one will perform the clock read. Signed-off-by: Daniel Ferstay --- pulsar/default_router.go | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/pulsar/default_router.go b/pulsar/default_router.go index b5e24a6214..ea49ba33c2 100644 --- a/pulsar/default_router.go +++ b/pulsar/default_router.go @@ -18,7 +18,6 @@ package pulsar import ( - "math" "math/rand" "sync/atomic" "time" @@ -27,7 +26,7 @@ import ( type defaultRouter struct { currentPartitionCursor uint32 - lastChangeTimestamp int64 + lastBatchTimestamp int64 msgCounter uint32 cumulativeBatchSize uint32 } @@ -45,7 +44,7 @@ func NewDefaultRouter( disableBatching bool) func(*ProducerMessage, uint32) int { state := &defaultRouter{ currentPartitionCursor: rand.Uint32(), - lastChangeTimestamp: math.MinInt64, + lastBatchTimestamp: time.Now().UnixNano(), } readClockAfterNumMessages := uint32(maxBatchingMessages / 10) @@ -80,32 +79,27 @@ // spread the data on different partitions but not necessarily in a specific sequence. 
var now int64 size := uint32(len(message.Payload)) - previousMessageCount := atomic.LoadUint32(&state.msgCounter) - previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize) - previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp) + messageCount := atomic.AddUint32(&state.msgCounter, 1) + batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size) - messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1) - sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize) + messageCountReached := messageCount%uint32(maxBatchingMessages) == 0 + sizeReached := (batchSize >= uint32(maxBatchingSize)) durationReached := false - if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 { + if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 { now = time.Now().UnixNano() - durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds() + previousBatch := atomic.LoadInt64(&state.lastBatchTimestamp) + durationReached = now-previousBatch >= maxBatchingDelay.Nanoseconds() } if messageCountReached || sizeReached || durationReached { - atomic.AddUint32(&state.currentPartitionCursor, 1) - atomic.StoreUint32(&state.msgCounter, 0) + cursor := atomic.AddUint32(&state.currentPartitionCursor, 1) atomic.StoreUint32(&state.cumulativeBatchSize, 0) - if now != 0 { - atomic.StoreInt64(&state.lastChangeTimestamp, now) + if now == 0 { + now = time.Now().UnixNano() } - return int(state.currentPartitionCursor % numPartitions) + atomic.StoreInt64(&state.lastBatchTimestamp, now) + return int(cursor % numPartitions) } - atomic.AddUint32(&state.msgCounter, 1) - atomic.AddUint32(&state.cumulativeBatchSize, size) - if now != 0 { - atomic.StoreInt64(&state.lastChangeTimestamp, now) - } return int(state.currentPartitionCursor % numPartitions) } }