Skip to content

Commit

Permalink
perf(sn): reuse buffer for ReplicateRequest unmarshaling
Browse files Browse the repository at this point in the history
Improve unmarshaling performance by reusing buffers for ReplicateRequest in the
backup replica.

The protobuf message `github.com/kakao/varlog/proto/snpb.(ReplicateRequest)` has
two slice fields—LLSN (`[]uint64`) and Data (`[][]byte`). The backup replica
receives replicated log entries from the primary replica via the gRPC service
`github.com/kakao/varlog/proto/snpb.(ReplicatorServer).Replicate`, which sends
`ReplicateRequest` messages.

Upon receiving a `ReplicateRequest`, the backup replica unmarshals the message,
which involves growing slices for fields such as LLSN and Data. This growth
causes copy overhead whenever the slice capacities need to expand.

To address this, we introduce a new method, `ResetReuse`, for reusing slices
instead of resetting them completely. The `ResetReuse` method shrinks the slice
lengths while preserving their capacities, thus avoiding the overhead of
reallocating memory.

Example implementation:

```go
type Message struct {
    Buffer []byte
    // Other fields
}

func (m *Message) Reset() {
    *m = Message{}
}

func (m *Message) ResetReuse() {
    s := m.Buffer[:0]
    *m = Message{}
    m.Buffer = s
}
```

Risks:

This approach has potential downsides. Since the heap space consumed by the
slices is not reclaimed, the storage node's memory consumption may increase.
Currently, there is no mechanism to shrink the heap usage.

Additionally, this PR changes the generated code. The protobuf compiler can
revert it, which is contrary to our intention. To catch this mistake, this PR
includes a unit test (github.com/kakao/varlog/proto/snpb.TestReplicateRequest)
to verify that the buffer backing the slices is reused.

Resolves: #795

See also: #806
  • Loading branch information
ijsong committed Jun 14, 2024
1 parent 8077cef commit 6feab12
Show file tree
Hide file tree
Showing 7 changed files with 865 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ $(PROTO_PBS): $(PROTO_SRCS)
$(PROTOC) $(PROTO_INCS) \
--gogo_out=plugins=grpc,Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,paths=source_relative:. $$src ; \
done
$(MAKE) fmt
git apply -v proto/patches/*.patch

proto-check:
$(MAKE) proto
Expand Down
43 changes: 43 additions & 0 deletions docker-d2hub-push.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env bash

set -e -o pipefail

scriptdir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" > /dev/null && pwd )"

IMAGE_REGISTRY="idock.daumkakao.io"
IMAGE_NAMESPACE=varlog
DOCKER_CONTEXT="${scriptdir}"
DOCKERFILE="${scriptdir}/build/Dockerfile"
VERSION=$(git describe --tags --abbrev=0)-$(git rev-parse --short HEAD)


if [ "${IMAGE_REGISTRY}" = "" ]; then
echo "no image registry"
exit 1
fi

function build_push() {
local target=$1
local name=$2
local tag=$3

echo "build: target=${target} tag=${tag}"
docker build --target "${target}" --file "${DOCKERFILE}" --tag "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}" "${DOCKER_CONTEXT}"
docker push "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}"
}

function push() {
local target=$1
local name=$2
local tag=$3

echo "push: target=${target} tag=${tag}"
docker push "${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/${name}:${tag}"
}

for name in varlogctl varlogcli varlogmr varlogadm varlogsn; do
target="${name}"
tag="${VERSION}"
build_push ${target} ${name} ${tag}
build_push ${target} ${name} ${tag}
done
16 changes: 6 additions & 10 deletions internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,12 @@ type replicationServerTask struct {
err error
}

func newReplicationServerTask(req snpb.ReplicateRequest, err error) *replicationServerTask {
rst := replicationServerTaskPool.Get().(*replicationServerTask)
rst.req = req
rst.err = err
return rst
func newReplicationServerTask() *replicationServerTask {
return replicationServerTaskPool.Get().(*replicationServerTask)
}

func (rst *replicationServerTask) release() {
rst.req = snpb.ReplicateRequest{}
rst.req.ResetReuse()
rst.err = nil
replicationServerTaskPool.Put(rst)
}
Expand All @@ -113,11 +110,10 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re
go func() {
defer wg.Done()
defer close(c)
req := &snpb.ReplicateRequest{}
for {
req.Reset()
err := stream.RecvMsg(req)
rst := newReplicationServerTask(*req, err)
rst := newReplicationServerTask()
err := stream.RecvMsg(&rst.req)
rst.err = err
select {
case c <- rst:
if err != nil {
Expand Down
Loading

0 comments on commit 6feab12

Please sign in to comment.