diff --git a/pkg/varlog/benchmark_test.go b/pkg/varlog/benchmark_test.go index 6a63d97ef..e63744891 100644 --- a/pkg/varlog/benchmark_test.go +++ b/pkg/varlog/benchmark_test.go @@ -5,9 +5,7 @@ import ( "math/rand" "testing" - "github.com/kakao/varlog/internal/storagenode/client" "github.com/kakao/varlog/pkg/types" - "github.com/kakao/varlog/proto/varlogpb" ) func BenchmarkTransmitQueue(b *testing.B) { @@ -42,15 +40,8 @@ func BenchmarkTransmitQueue(b *testing.B) { for range b.N { tq := tc.generate(size) for range size { - tr := transmitResult{ - result: client.SubscribeResult{ - LogEntry: varlogpb.LogEntry{ - LogEntryMeta: varlogpb.LogEntryMeta{ - GLSN: types.GLSN(rand.Int31()), - }, - }, - }, - } + tr := &transmitResult{} + tr.result.GLSN = types.GLSN(rand.Int31()) tq.Push(tr) } } diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index 533981018..fa2e4096d 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -122,13 +122,31 @@ func (pq *PriorityQueue) Pop() interface{} { return item } +var transmitResultPool = sync.Pool{ + New: func() any { + return &transmitResult{} + }, +} + type transmitResult struct { logStreamID types.LogStreamID storageNodeID types.StorageNodeID result client.SubscribeResult } -func (t transmitResult) Priority() uint64 { +func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult { + t := transmitResultPool.Get().(*transmitResult) + t.storageNodeID = snid + t.logStreamID = lsid + return t +} + +func (t *transmitResult) Release() { + *t = transmitResult{} + transmitResultPool.Put(t) +} + +func (t *transmitResult) Priority() uint64 { return uint64(t.result.GLSN) } @@ -137,37 +155,33 @@ type transmitQueue struct { mu sync.Mutex } -func (tq *transmitQueue) Push(r transmitResult) { +func (tq *transmitQueue) Push(r *transmitResult) { tq.mu.Lock() defer tq.mu.Unlock() heap.Push(tq.pq, r) } -func (tq *transmitQueue) Pop() (transmitResult, bool) { +func (tq *transmitQueue) Pop() (*transmitResult, bool) { tq.mu.Lock() defer tq.mu.Unlock() if tq.pq.Len() == 0 { - return transmitResult{ - result: client.InvalidSubscribeResult, - }, false + return nil, false } - return heap.Pop(tq.pq).(transmitResult), true + return heap.Pop(tq.pq).(*transmitResult), true } -func (tq *transmitQueue) Front() (transmitResult, bool) { +func (tq *transmitQueue) Front() (*transmitResult, bool) { tq.mu.Lock() defer tq.mu.Unlock() if tq.pq.Len() == 0 { - return transmitResult{ - result: client.InvalidSubscribeResult, - }, false + return nil, false } - return (*tq.pq)[0].(transmitResult), true + return (*tq.pq)[0].(*transmitResult), true } type subscriber struct { @@ -233,11 +247,7 @@ func (s *subscriber) subscribe(ctx context.Context) { case <-ctx.Done(): return case res, ok := <-s.resultC: - r := transmitResult{ - storageNodeID: s.storageNodeID, - logStreamID: s.logStreamID, - } - + r := newTransmitResult(s.storageNodeID, s.logStreamID) if ok { r.result = res } else { @@ -391,7 +401,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) { p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. } -func (p *transmitter) handleError(r transmitResult) error { +func (p *transmitter) handleError(r *transmitResult) error { s, ok := p.subscribers[r.logStreamID] if !ok { return nil @@ -404,7 +414,7 @@ func (p *transmitter) handleError(r transmitResult) error { return r.result.Error } -func (p *transmitter) handleResult(r transmitResult) error { +func (p *transmitter) handleResult(r *transmitResult) error { var err error /* NOTE Ignore transmitResult with GLSN less than p.wanted. @@ -436,6 +446,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool { if res.result.GLSN <= p.wanted { res, _ := p.transmitQ.Pop() err := p.handleResult(res) + res.Release() if p.wanted == p.end || errors.Is(err, verrors.ErrTrimmed) { return false