Skip to content

Commit

Permalink
perf(client): use transmitResultPool to reduce heap allocs
Browse files Browse the repository at this point in the history
  • Loading branch information
ijsong committed Feb 25, 2024
1 parent d406846 commit b2ae65b
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,31 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}

var transmitResultPool = sync.Pool{
New: func() any {
return &transmitResult{}
},
}

type transmitResult struct {
logStreamID types.LogStreamID
storageNodeID types.StorageNodeID
result client.SubscribeResult
}

func (t transmitResult) Priority() uint64 {
func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult {
t := transmitResultPool.Get().(*transmitResult)
t.storageNodeID = snid
t.logStreamID = lsid
return t
}

func (t *transmitResult) Release() {
*t = transmitResult{}
transmitResultPool.Put(t)
}

func (t *transmitResult) Priority() uint64 {
return uint64(t.result.GLSN)
}

Expand All @@ -133,37 +151,33 @@ type transmitQueue struct {
mu sync.Mutex
}

func (tq *transmitQueue) Push(r transmitResult) {
func (tq *transmitQueue) Push(r *transmitResult) {
tq.mu.Lock()
defer tq.mu.Unlock()

heap.Push(tq.pq, r)
}

func (tq *transmitQueue) Pop() (transmitResult, bool) {
func (tq *transmitQueue) Pop() (*transmitResult, bool) {
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false
}

return heap.Pop(tq.pq).(transmitResult), true
return heap.Pop(tq.pq).(*transmitResult), true
}

func (tq *transmitQueue) Front() (transmitResult, bool) {
func (tq *transmitQueue) Front() (*transmitResult, bool) {
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false
}

return (*tq.pq)[0].(transmitResult), true
return (*tq.pq)[0].(*transmitResult), true
}

type subscriber struct {
Expand Down Expand Up @@ -229,11 +243,7 @@ func (s *subscriber) subscribe(ctx context.Context) {
case <-ctx.Done():
return
case res, ok := <-s.resultC:
r := transmitResult{
storageNodeID: s.storageNodeID,
logStreamID: s.logStreamID,
}

r := newTransmitResult(s.storageNodeID, s.logStreamID)
if ok {
r.result = res
} else {
Expand Down Expand Up @@ -387,7 +397,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) {
p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
}

func (p *transmitter) handleError(r transmitResult) error {
func (p *transmitter) handleError(r *transmitResult) error {
s, ok := p.subscribers[r.logStreamID]
if !ok {
return nil
Expand All @@ -400,7 +410,7 @@ func (p *transmitter) handleError(r transmitResult) error {
return r.result.Error
}

func (p *transmitter) handleResult(r transmitResult) error {
func (p *transmitter) handleResult(r *transmitResult) error {
var err error

/* NOTE Ignore transmitResult with GLSN less than p.wanted.
Expand Down Expand Up @@ -432,6 +442,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool {
if res.result.GLSN <= p.wanted {
res, _ := p.transmitQ.Pop()
err := p.handleResult(res)
res.Release()
if p.wanted == p.end ||
errors.Is(err, verrors.ErrTrimmed) {
return false
Expand Down

0 comments on commit b2ae65b

Please sign in to comment.