Skip to content

Commit

Permalink
refinement to avoid skipping partitions if multiple goroutines increm…
Browse files Browse the repository at this point in the history
…ent the cursor
  • Loading branch information
Daniel Ferstay committed Jan 2, 2022
1 parent 02463d6 commit a659bfb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
33 changes: 19 additions & 14 deletions pulsar/default_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,38 @@ func NewDefaultRouter(
// If there's no key, we do round-robin across partition, sticking with a given
// partition for a certain amount of messages or volume buffered or the max delay to batch is reached so that
// we ensure having a decent amount of batching of the messages.
// Note that it is possible that we skip more than one partition if multiple goroutines increment
// currentPartitionCursor at the same time. If that happens it shouldn't be a problem because we only want to
// spread the data on different partitions but not necessarily in a specific sequence.
var now int64
size := uint32(len(message.Payload))
partitionCursor := atomic.LoadUint32(&state.currentPartitionCursor)
messageCount := atomic.AddUint32(&state.msgCounter, 1)
batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)

messageCountReached := messageCount >= uint32(maxBatchingMessages)
sizeReached := (batchSize >= uint32(maxBatchingSize))
// Note: use greater-than for the threshold check so that we don't route this message to a new partition
// before a batch is complete.
messageCountReached := messageCount > uint32(maxBatchingMessages)
sizeReached := batchSize > uint32(maxBatchingSize)
durationReached := false
if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 {
now = time.Now().UnixNano()
lastBatchTime := atomic.LoadInt64(&state.lastBatchTimestamp)
durationReached = now-lastBatchTime >= maxBatchingDelay.Nanoseconds()
durationReached = now-lastBatchTime > maxBatchingDelay.Nanoseconds()
}
if messageCountReached || sizeReached || durationReached {
cursor := atomic.AddUint32(&state.currentPartitionCursor, 1)
atomic.StoreUint32(&state.cumulativeBatchSize, 0)
atomic.StoreUint32(&state.msgCounter, 0)
if now == 0 {
now = time.Now().UnixNano()
// Note: CAS to ensure that concurrent go-routines can only move the cursor forward by one so that
// partitions are not skipped.
newCursor := partitionCursor + 1
if atomic.CompareAndSwapUint32(&state.currentPartitionCursor, partitionCursor, newCursor) {
atomic.StoreUint32(&state.msgCounter, 0)
atomic.StoreUint32(&state.cumulativeBatchSize, 0)
if now == 0 {
now = time.Now().UnixNano()
}
atomic.StoreInt64(&state.lastBatchTimestamp, now)
}
atomic.StoreInt64(&state.lastBatchTimestamp, now)
return int(cursor % numPartitions)

return int(newCursor % numPartitions)
}

return int(state.currentPartitionCursor % numPartitions)
return int(partitionCursor % numPartitions)
}
}
13 changes: 9 additions & 4 deletions pulsar/default_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,21 @@ func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
}, 3)
}, numPartitions)
assert.LessOrEqual(t, p1, int(numPartitions))

p2 := router(&ProducerMessage{
Payload: []byte("message 2"),
}, numPartitions)
if p1 == int(numPartitions-1) {
assert.Equal(t, 0, p2)
assert.Equal(t, p1, p2)

p3 := router(&ProducerMessage{
Payload: []byte("message 3"),
}, numPartitions)
if p2 == int(numPartitions-1) {
assert.Equal(t, 0, p3)
} else {
assert.Equal(t, p1+1, p2)
assert.Equal(t, p2+1, p3)
}
}

Expand Down

0 comments on commit a659bfb

Please sign in to comment.