diff --git a/pulsar/default_router.go b/pulsar/default_router.go index 82be98207..0e1a35482 100644 --- a/pulsar/default_router.go +++ b/pulsar/default_router.go @@ -48,6 +48,7 @@ func NewDefaultRouter( lastChangeTimestamp: math.MinInt64, } + readClockAfterNumMessages := uint32(maxBatchingMessages / 10) return func(message *ProducerMessage, numPartitions uint32) int { if numPartitions == 1 { // When there are no partitions, don't even bother @@ -72,23 +73,34 @@ func NewDefaultRouter( // Note that it is possible that we skip more than one partition if multiple goroutines increment // currentPartitionCursor at the same time. If that happens it shouldn't be a problem because we only want to // spread the data on different partitions but not necessarily in a specific sequence. + var now int64 size := uint32(len(message.Payload)) previousMessageCount := atomic.LoadUint32(&state.msgCounter) previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize) previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp) - if (previousMessageCount >= uint32(maxBatchingMessages-1)) || - (size >= uint32(maxBatchingSize)-previousBatchingMaxSize) || - (time.Now().UnixNano()-previousLastChange >= maxBatchingDelay.Nanoseconds()) { + + messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1) + sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize) + durationReached := false + if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 { + now = time.Now().UnixNano() + durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds() + } + if messageCountReached || sizeReached || durationReached { atomic.AddUint32(&state.currentPartitionCursor, 1) - atomic.StoreInt64(&state.lastChangeTimestamp, time.Now().UnixNano()) - atomic.StoreUint32(&state.cumulativeBatchSize, 0) atomic.StoreUint32(&state.msgCounter, 0) + atomic.StoreUint32(&state.cumulativeBatchSize, 0) + if now != 0 { + atomic.StoreInt64(&state.lastChangeTimestamp, now) + } return int(state.currentPartitionCursor % numPartitions) } - atomic.StoreInt64(&state.lastChangeTimestamp, time.Now().UnixNano()) atomic.AddUint32(&state.msgCounter, 1) atomic.AddUint32(&state.cumulativeBatchSize, size) + if now != 0 { + atomic.StoreInt64(&state.lastChangeTimestamp, now) + } return int(state.currentPartitionCursor % numPartitions) } } diff --git a/pulsar/default_router_bench_test.go b/pulsar/default_router_bench_test.go new file mode 100644 index 000000000..d7ec17562 --- /dev/null +++ b/pulsar/default_router_bench_test.go @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar/internal" +) + +var ( + targetPartition int +) + +func BenchmarkDefaultRouter(b *testing.B) { + const ( + numPartitions = uint32(200) + maxBatchingMessages = 2000 + maxBatchingSize = 524288 + maxBatchingDelay = 100 * time.Millisecond + ) + msg := &ProducerMessage{ + Payload: []byte("message 1"), + } + router := NewDefaultRouter(internal.JavaStringHash, maxBatchingMessages, maxBatchingSize, maxBatchingDelay, false) + for i := 0; i < b.N; i++ { + targetPartition = router(msg, numPartitions) + } +} diff --git a/pulsar/default_router_test.go b/pulsar/default_router_test.go index 60e10c706..31b27aff9 100644 --- a/pulsar/default_router_test.go +++ b/pulsar/default_router_test.go @@ -47,7 +47,7 @@ func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) { func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) { maxPublishDelay := time.Nanosecond * 10 - router := NewDefaultRouter(internal.JavaStringHash, 20, 100, maxPublishDelay, false) + router := NewDefaultRouter(internal.JavaStringHash, 10, 100, maxPublishDelay, false) const numPartitions = uint32(3) p1 := router(&ProducerMessage{ Payload: []byte("message 1"),