diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index b5e24a621..0b0d02013 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -20,6 +20,7 @@ package pulsar
 import (
 	"math"
 	"math/rand"
+	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -30,6 +31,7 @@ type defaultRouter struct {
 	lastChangeTimestamp int64
 	msgCounter          uint32
 	cumulativeBatchSize uint32
+	sync.RWMutex
 }
 
 // NewDefaultRouter set the message routing mode for the partitioned producer.
@@ -75,36 +77,37 @@ func NewDefaultRouter(
 		// If there's no key, we do round-robin across partition, sticking with a given
 		// partition for a certain amount of messages or volume buffered or the max delay to batch is reached so that
 		// we ensure having a decent amount of batching of the messages.
-		// Note that it is possible that we skip more than one partition if multiple goroutines increment
-		// currentPartitionCursor at the same time. If that happens it shouldn't be a problem because we only want to
-		// spread the data on different partitions but not necessarily in a specific sequence.
+		// Use a critical section to protect the group of counters and the currentPartitionCursor increment.
 		var now int64
+		batchReached := false
 		size := uint32(len(message.Payload))
-		previousMessageCount := atomic.LoadUint32(&state.msgCounter)
-		previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize)
-		previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp)
+		state.Lock()
+		defer state.Unlock()
 
-		messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
-		sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
-		durationReached := false
-		if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 {
+		if state.msgCounter >= uint32(maxBatchingMessages-1) {
+			batchReached = true
+		} else if size >= (uint32(maxBatchingSize) - state.cumulativeBatchSize) {
+			batchReached = true
+		} else if readClockAfterNumMessages == 0 || state.msgCounter%readClockAfterNumMessages == 0 {
 			now = time.Now().UnixNano()
-			durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds()
+			batchReached = now-state.lastChangeTimestamp >= maxBatchingDelay.Nanoseconds()
 		}
-		if messageCountReached || sizeReached || durationReached {
-			atomic.AddUint32(&state.currentPartitionCursor, 1)
-			atomic.StoreUint32(&state.msgCounter, 0)
-			atomic.StoreUint32(&state.cumulativeBatchSize, 0)
+		if batchReached {
+			// only advance the current partition cursor when the current batch is ready to flush,
+			// so that we move to another partition for the next batch
+			state.currentPartitionCursor++
+			state.msgCounter = 0
+			state.cumulativeBatchSize = 0
 			if now != 0 {
-				atomic.StoreInt64(&state.lastChangeTimestamp, now)
+				state.lastChangeTimestamp = now
 			}
 			return int(state.currentPartitionCursor % numPartitions)
 		}
 
-		atomic.AddUint32(&state.msgCounter, 1)
-		atomic.AddUint32(&state.cumulativeBatchSize, size)
+		state.msgCounter++
+		state.cumulativeBatchSize += size
 		if now != 0 {
-			atomic.StoreInt64(&state.lastChangeTimestamp, now)
+			state.lastChangeTimestamp = now
 		}
 		return int(state.currentPartitionCursor % numPartitions)
 	}
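
For context, here is a minimal, self-contained sketch of the pattern this change adopts: one mutex-protected critical section that keeps the round-robin cursor and the batch counter consistent under concurrent calls. The names (`roundRobinState`, `pick`, `maxBatchMsgs`) are illustrative only and do not exist in the Pulsar client, and the sketch deliberately omits the cumulative-size and batching-delay checks that the real router also performs.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobinState is a hypothetical, simplified analogue of defaultRouter:
// a partition cursor plus a batch counter, guarded by a single mutex so a
// concurrent caller can never observe (or corrupt) a half-updated batch.
type roundRobinState struct {
	mu           sync.Mutex
	cursor       uint32
	msgCounter   uint32
	maxBatchMsgs uint32
}

// pick returns the partition for the next message, advancing the cursor
// only once the current batch has accumulated maxBatchMsgs messages.
func (s *roundRobinState) pick(numPartitions uint32) int {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.msgCounter >= s.maxBatchMsgs-1 {
		// Batch is full: move to the next partition and start a new batch.
		s.cursor++
		s.msgCounter = 0
		return int(s.cursor % numPartitions)
	}

	s.msgCounter++
	return int(s.cursor % numPartitions)
}

func main() {
	state := &roundRobinState{maxBatchMsgs: 3}
	counts := make([]int, 4)

	var countsMu sync.Mutex
	var wg sync.WaitGroup

	// 12 concurrent sends across 4 partitions with 3-message batches:
	// because the state transitions happen inside one lock, the messages
	// spread evenly (3 per partition) regardless of goroutine scheduling.
	for i := 0; i < 12; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p := state.pick(4)
			countsMu.Lock()
			counts[p]++
			countsMu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println("messages per partition:", counts)
}
```

With all counters behind one lock, a batch can no longer be split or skipped by a concurrent increment, which is the behaviour the removed comment previously tolerated.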