Skip to content

Commit

Permalink
perf(client): use transmitResultPool to reduce heap allocs
Browse files Browse the repository at this point in the history
Struct pkg/varlog.(transmitResult) is an element of transmitQueue. It is
allocated in heap memory to be pushed into transmitQueue. This PR introduces
transmitResultPool to reuse transmitResult as possible.
  • Loading branch information
ijsong committed Feb 26, 2024
1 parent e44af3d commit 1dd22e8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
13 changes: 2 additions & 11 deletions pkg/varlog/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"math/rand"
"testing"

"github.com/kakao/varlog/internal/storagenode/client"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func BenchmarkTransmitQueue(b *testing.B) {
Expand Down Expand Up @@ -42,15 +40,8 @@ func BenchmarkTransmitQueue(b *testing.B) {
for range b.N {
tq := tc.generate(size)
for range size {
tr := transmitResult{
result: client.SubscribeResult{
LogEntry: varlogpb.LogEntry{
LogEntryMeta: varlogpb.LogEntryMeta{
GLSN: types.GLSN(rand.Int31()),
},
},
},
}
tr := &transmitResult{}
tr.result.GLSN = types.GLSN(rand.Int31())
tq.Push(tr)
}
}
Expand Down
49 changes: 30 additions & 19 deletions pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,31 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}

var transmitResultPool = sync.Pool{
New: func() any {
return &transmitResult{}
},

Check warning on line 124 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L122-L124

Added lines #L122 - L124 were not covered by tests
}

type transmitResult struct {
logStreamID types.LogStreamID
storageNodeID types.StorageNodeID
result client.SubscribeResult
}

func (t transmitResult) Priority() uint64 {
func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult {
t := transmitResultPool.Get().(*transmitResult)
t.storageNodeID = snid
t.logStreamID = lsid
return t

Check warning on line 137 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L133-L137

Added lines #L133 - L137 were not covered by tests
}

func (t *transmitResult) Release() {
*t = transmitResult{}
transmitResultPool.Put(t)

Check warning on line 142 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L140-L142

Added lines #L140 - L142 were not covered by tests
}

func (t *transmitResult) Priority() uint64 {

Check warning on line 145 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L145

Added line #L145 was not covered by tests
return uint64(t.result.GLSN)
}

Expand All @@ -133,37 +151,33 @@ type transmitQueue struct {
mu sync.Mutex
}

func (tq *transmitQueue) Push(r transmitResult) {
func (tq *transmitQueue) Push(r *transmitResult) {

Check warning on line 154 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L154

Added line #L154 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

heap.Push(tq.pq, r)
}

func (tq *transmitQueue) Pop() (transmitResult, bool) {
func (tq *transmitQueue) Pop() (*transmitResult, bool) {

Check warning on line 161 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L161

Added line #L161 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false

Check warning on line 166 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L166

Added line #L166 was not covered by tests
}

return heap.Pop(tq.pq).(transmitResult), true
return heap.Pop(tq.pq).(*transmitResult), true

Check warning on line 169 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L169

Added line #L169 was not covered by tests
}

func (tq *transmitQueue) Front() (transmitResult, bool) {
func (tq *transmitQueue) Front() (*transmitResult, bool) {

Check warning on line 172 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L172

Added line #L172 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false

Check warning on line 177 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L177

Added line #L177 was not covered by tests
}

return (*tq.pq)[0].(transmitResult), true
return (*tq.pq)[0].(*transmitResult), true

Check warning on line 180 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L180

Added line #L180 was not covered by tests
}

type subscriber struct {
Expand Down Expand Up @@ -229,11 +243,7 @@ func (s *subscriber) subscribe(ctx context.Context) {
case <-ctx.Done():
return
case res, ok := <-s.resultC:
r := transmitResult{
storageNodeID: s.storageNodeID,
logStreamID: s.logStreamID,
}

r := newTransmitResult(s.storageNodeID, s.logStreamID)

Check warning on line 246 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L246

Added line #L246 was not covered by tests
if ok {
r.result = res
} else {
Expand Down Expand Up @@ -387,7 +397,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) {
p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
}

func (p *transmitter) handleError(r transmitResult) error {
func (p *transmitter) handleError(r *transmitResult) error {

Check warning on line 400 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L400

Added line #L400 was not covered by tests
s, ok := p.subscribers[r.logStreamID]
if !ok {
return nil
Expand All @@ -400,7 +410,7 @@ func (p *transmitter) handleError(r transmitResult) error {
return r.result.Error
}

func (p *transmitter) handleResult(r transmitResult) error {
func (p *transmitter) handleResult(r *transmitResult) error {

Check warning on line 413 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L413

Added line #L413 was not covered by tests
var err error

/* NOTE Ignore transmitResult with GLSN less than p.wanted.
Expand Down Expand Up @@ -432,6 +442,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool {
if res.result.GLSN <= p.wanted {
res, _ := p.transmitQ.Pop()
err := p.handleResult(res)
res.Release()

Check warning on line 445 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L445

Added line #L445 was not covered by tests
if p.wanted == p.end ||
errors.Is(err, verrors.ErrTrimmed) {
return false
Expand Down

0 comments on commit 1dd22e8

Please sign in to comment.