Skip to content

Commit

Permalink
perf(client): use transmitResultPool to reduce heap allocs
Browse files Browse the repository at this point in the history
Struct pkg/varlog.(transmitResult) is an element of transmitQueue. It is
allocated in heap memory to be pushed into transmitQueue. This PR introduces
transmitResultPool to reuse transmitResult as possible.
  • Loading branch information
ijsong committed Mar 5, 2024
1 parent 26ed1a6 commit aa40aec
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
13 changes: 2 additions & 11 deletions pkg/varlog/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"math/rand"
"testing"

"github.com/kakao/varlog/internal/storagenode/client"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func BenchmarkTransmitQueue(b *testing.B) {
Expand Down Expand Up @@ -42,15 +40,8 @@ func BenchmarkTransmitQueue(b *testing.B) {
for range b.N {
tq := tc.generate(size)
for range size {
tr := transmitResult{
result: client.SubscribeResult{
LogEntry: varlogpb.LogEntry{
LogEntryMeta: varlogpb.LogEntryMeta{
GLSN: types.GLSN(rand.Int31()),
},
},
},
}
tr := &transmitResult{}
tr.result.GLSN = types.GLSN(rand.Int31())
tq.Push(tr)
}
}
Expand Down
49 changes: 30 additions & 19 deletions pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,31 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}

var transmitResultPool = sync.Pool{
New: func() any {
return &transmitResult{}
},

Check warning on line 128 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L126-L128

Added lines #L126 - L128 were not covered by tests
}

type transmitResult struct {
logStreamID types.LogStreamID
storageNodeID types.StorageNodeID
result client.SubscribeResult
}

func (t transmitResult) Priority() uint64 {
func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult {
t := transmitResultPool.Get().(*transmitResult)
t.storageNodeID = snid
t.logStreamID = lsid
return t

Check warning on line 141 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L137-L141

Added lines #L137 - L141 were not covered by tests
}

func (t *transmitResult) Release() {
*t = transmitResult{}
transmitResultPool.Put(t)

Check warning on line 146 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L144-L146

Added lines #L144 - L146 were not covered by tests
}

func (t *transmitResult) Priority() uint64 {

Check warning on line 149 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L149

Added line #L149 was not covered by tests
return uint64(t.result.GLSN)
}

Expand All @@ -137,37 +155,33 @@ type transmitQueue struct {
mu sync.Mutex
}

func (tq *transmitQueue) Push(r transmitResult) {
func (tq *transmitQueue) Push(r *transmitResult) {

Check warning on line 158 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L158

Added line #L158 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

heap.Push(tq.pq, r)
}

func (tq *transmitQueue) Pop() (transmitResult, bool) {
func (tq *transmitQueue) Pop() (*transmitResult, bool) {

Check warning on line 165 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L165

Added line #L165 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false

Check warning on line 170 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L170

Added line #L170 was not covered by tests
}

return heap.Pop(tq.pq).(transmitResult), true
return heap.Pop(tq.pq).(*transmitResult), true

Check warning on line 173 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L173

Added line #L173 was not covered by tests
}

func (tq *transmitQueue) Front() (transmitResult, bool) {
func (tq *transmitQueue) Front() (*transmitResult, bool) {

Check warning on line 176 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L176

Added line #L176 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false

Check warning on line 181 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L181

Added line #L181 was not covered by tests
}

return (*tq.pq)[0].(transmitResult), true
return (*tq.pq)[0].(*transmitResult), true

Check warning on line 184 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L184

Added line #L184 was not covered by tests
}

type subscriber struct {
Expand Down Expand Up @@ -233,11 +247,7 @@ func (s *subscriber) subscribe(ctx context.Context) {
case <-ctx.Done():
return
case res, ok := <-s.resultC:
r := transmitResult{
storageNodeID: s.storageNodeID,
logStreamID: s.logStreamID,
}

r := newTransmitResult(s.storageNodeID, s.logStreamID)

Check warning on line 250 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L250

Added line #L250 was not covered by tests
if ok {
r.result = res
} else {
Expand Down Expand Up @@ -391,7 +401,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) {
p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
}

func (p *transmitter) handleError(r transmitResult) error {
func (p *transmitter) handleError(r *transmitResult) error {

Check warning on line 404 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L404

Added line #L404 was not covered by tests
s, ok := p.subscribers[r.logStreamID]
if !ok {
return nil
Expand All @@ -404,7 +414,7 @@ func (p *transmitter) handleError(r transmitResult) error {
return r.result.Error
}

func (p *transmitter) handleResult(r transmitResult) error {
func (p *transmitter) handleResult(r *transmitResult) error {

Check warning on line 417 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L417

Added line #L417 was not covered by tests
var err error

/* NOTE Ignore transmitResult with GLSN less than p.wanted.
Expand Down Expand Up @@ -436,6 +446,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool {
if res.result.GLSN <= p.wanted {
res, _ := p.transmitQ.Pop()
err := p.handleResult(res)
res.Release()

Check warning on line 449 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L449

Added line #L449 was not covered by tests
if p.wanted == p.end ||
errors.Is(err, verrors.ErrTrimmed) {
return false
Expand Down

0 comments on commit aa40aec

Please sign in to comment.