From fd0898a6d6c236a1b8f0c2d7ca5b900319c6273d Mon Sep 17 00:00:00 2001 From: Injun Song Date: Sat, 24 Feb 2024 01:54:47 +0900 Subject: [PATCH] perf(client): use transmitResultPool to reduce heap allocs Struct pkg/varlog.(transmitResult) is an element of transmitQueue. It is allocated in heap memory to be pushed into transmitQueue. This PR introduces transmitResultPool to reuse transmitResult as possible. --- pkg/varlog/benchmark_test.go | 13 ++-------- pkg/varlog/subscribe.go | 49 ++++++++++++++++++++++-------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/pkg/varlog/benchmark_test.go b/pkg/varlog/benchmark_test.go index 2e3cd6745..4a9bfceef 100644 --- a/pkg/varlog/benchmark_test.go +++ b/pkg/varlog/benchmark_test.go @@ -5,9 +5,7 @@ import ( "math/rand" "testing" - "github.com/kakao/varlog/internal/storagenode/client" "github.com/kakao/varlog/pkg/types" - "github.com/kakao/varlog/proto/varlogpb" ) func BenchmarkTransmitQueue(b *testing.B) { @@ -42,15 +40,8 @@ func BenchmarkTransmitQueue(b *testing.B) { for range b.N { tq := tc.generate(size) for range size { - tr := transmitResult{ - result: client.SubscribeResult{ - LogEntry: varlogpb.LogEntry{ - LogEntryMeta: varlogpb.LogEntryMeta{ - GLSN: types.GLSN(rand.Int31()), - }, - }, - }, - } + tr := &transmitResult{} + tr.result.GLSN = types.GLSN(rand.Int31()) tq.Push(tr) } } diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index 5e4576932..4884dc50a 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -121,13 +121,31 @@ func (pq *PriorityQueue) Pop() interface{} { return item } +var transmitResultPool = sync.Pool{ + New: func() any { + return &transmitResult{} + }, +} + type transmitResult struct { logStreamID types.LogStreamID storageNodeID types.StorageNodeID result client.SubscribeResult } -func (t transmitResult) Priority() uint64 { +func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult { + t := transmitResultPool.Get().(*transmitResult) + t.storageNodeID = snid + t.logStreamID = lsid + return t +} + +func (t *transmitResult) Release() { + *t = transmitResult{} + transmitResultPool.Put(t) +} + +func (t *transmitResult) Priority() uint64 { return uint64(t.result.GLSN) } @@ -136,37 +154,33 @@ type transmitQueue struct { mu sync.Mutex } -func (tq *transmitQueue) Push(r transmitResult) { +func (tq *transmitQueue) Push(r *transmitResult) { tq.mu.Lock() defer tq.mu.Unlock() heap.Push(tq.pq, r) } -func (tq *transmitQueue) Pop() (transmitResult, bool) { +func (tq *transmitQueue) Pop() (*transmitResult, bool) { tq.mu.Lock() defer tq.mu.Unlock() if tq.pq.Len() == 0 { - return transmitResult{ - result: client.InvalidSubscribeResult, - }, false + return nil, false } - return heap.Pop(tq.pq).(transmitResult), true + return heap.Pop(tq.pq).(*transmitResult), true } -func (tq *transmitQueue) Front() (transmitResult, bool) { +func (tq *transmitQueue) Front() (*transmitResult, bool) { tq.mu.Lock() defer tq.mu.Unlock() if tq.pq.Len() == 0 { - return transmitResult{ - result: client.InvalidSubscribeResult, - }, false + return nil, false } - return (*tq.pq)[0].(transmitResult), true + return (*tq.pq)[0].(*transmitResult), true } type subscriber struct { @@ -232,11 +246,7 @@ func (s *subscriber) subscribe(ctx context.Context) { case <-ctx.Done(): return case res, ok := <-s.resultC: - r := transmitResult{ - storageNodeID: s.storageNodeID, - logStreamID: s.logStreamID, - } - + r := newTransmitResult(s.storageNodeID, s.logStreamID) if ok { r.result = res } else { @@ -390,7 +400,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) { p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. } -func (p *transmitter) handleError(r transmitResult) error { +func (p *transmitter) handleError(r *transmitResult) error { s, ok := p.subscribers[r.logStreamID] if !ok { return nil @@ -403,7 +413,7 @@ func (p *transmitter) handleError(r transmitResult) error { return r.result.Error } -func (p *transmitter) handleResult(r transmitResult) error { +func (p *transmitter) handleResult(r *transmitResult) error { var err error /* NOTE Ignore transmitResult with GLSN less than p.wanted. @@ -435,6 +445,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool { if res.result.GLSN <= p.wanted { res, _ := p.transmitQ.Pop() err := p.handleResult(res) + res.Release() if p.wanted == p.end || errors.Is(err, verrors.ErrTrimmed) { return false