Fragment Watch Response Messages #8371

Closed
161 changes: 161 additions & 0 deletions clientv3/integration/fragment_test.go
@@ -0,0 +1,161 @@
// Copyright 2017 The etcd Authors
Contributor:
this should be part of clientv3/integration/watch_test.go

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/proxy/grpcproxy"
"github.com/coreos/etcd/proxy/grpcproxy/adapter"
)

func TestResponseWithoutFragmenting(t *testing.T) {
Contributor:
TestWatchFragmentCombine?

It's not clear to me what these two tests are trying to do. It seems like both should have a big delete; one test checks it combines correctly and another checks it returns fragments if requested.

defer testutil.AfterTest(t)
// MaxResponseBytes will overflow to 1000 once the grpcOverheadBytes,
Contributor:
can this be done in a way that doesn't rely on overflow?

Contributor Author (@mangoslicer, Sep 11, 2017):
I think another way would be to turn MaxResponseBytes into an int instead of a uint, so that I can set its value to a negative number. I'm not a fan of this method, but that's the only way I can think of, because adding the grpcOverheadBytes doesn't seem to be straightforward to toggle.
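
For reference, a minimal sketch of the wraparound arithmetic the test relies on, assuming a 64-bit uint and that grpcOverheadBytes is the 512 * 1024 constant mentioned in the comment:

```go
package main

import "fmt"

func main() {
	const grpcOverheadBytes = 512 * 1024 // assumed server-side overhead constant
	// The test picks MaxResponseBytes so that adding grpcOverheadBytes
	// wraps the unsigned value around to exactly 1000.
	maxResponseBytes := ^uint(0) - (512*1024 - 1 - 1000)
	fmt.Println(maxResponseBytes + grpcOverheadBytes) // prints 1000 after overflow
}
```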

// which have a value of 512 * 1024, are added to MaxResponseBytes.
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000)})
Contributor:
Size: 1 since this isn't testing any clustering features
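
That is, roughly (a sketch keeping the same MaxResponseBytes trick):

```go
// Single-member cluster; nothing here exercises clustering behavior.
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
	Size:             1,
	MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000),
})
```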

defer clus.Terminate(t)

cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
},
}
cli, err := clientv3.New(cfg)
Contributor:
don't make a special client if it's not needed; just use clus.Client(0)


cli.SetEndpoints(clus.Members[0].GRPCAddr())
firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
if err != nil {
t.Error(err)
}

// Create and register watch proxy.
wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
Contributor:
no need to create a proxy here; the tag cluster_proxy will automatically have the integration package put a proxy between the server and the client
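
A minimal sketch of that simplification, assuming the same watch parameters as below; under the cluster_proxy build tag the integration package wires the proxy in for you:

```go
// No explicit grpcproxy.NewWatchProxy / adapter wiring needed here:
// clientv3.Client embeds Watcher, so the integration client can watch directly.
wChannel := clus.Client(0).Watch(context.TODO(), "foo", clientv3.WithRange("z"))
```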

wc := adapter.WatchServerToWatchClient(wp)
w := clientv3.NewWatchFromWatchClient(wc)

kv := clus.Client(0)
for i := 0; i < 10; i++ {
_, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
if err != nil {
t.Error(err)
}
}

// Does not include the clientv3.WithFragmentedResponse option.
wChannel := w.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Contributor:
instead of using a lease to bulk delete, `Delete(context.TODO(), "foo", clientv3.WithPrefix())` would be clearer / less code
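
A sketch of the suggested replacement, assuming the puts above stay keyed on the "foo" prefix:

```go
// Delete every "foo"-prefixed key in one call instead of revoking a lease;
// the watcher still observes the resulting DELETE events.
if _, err := clus.Client(0).Delete(context.TODO(), "foo", clientv3.WithPrefix()); err != nil {
	t.Fatal(err)
}
```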

if err != nil {
t.Error(err)
}

r, ok := <-wChannel
if !ok {
t.Error()
}
keyDigitSum := 0
responseSum := 0
if len(r.Events) != 10 {
t.Errorf("Expected 10 events, got %d\n", len(r.Events))
}
for i := 0; i < 10; i++ {
if r.Events[i].Type != mvccpb.DELETE {
t.Errorf("Expected DELETE event, got %d", r.Events[i].Type)
}
keyDigitSum += i
digit, err := strconv.Atoi((string(r.Events[i].Kv.Key)[3:]))
if err != nil {
t.Error("Failed to convert %s to int", (string(r.Events[i].Kv.Key)[3:]))
}
responseSum += digit
}
if keyDigitSum != responseSum {
t.Errorf("Expected digits of keys received in the response to sum to %d, but got %d\n", keyDigitSum, responseSum)
}
}

func TestFragmenting(t *testing.T) {
Contributor:
TestWatchFragments

defer testutil.AfterTest(t)
// MaxResponseBytes will overflow to 1000 once the grpcOverheadBytes,
// which have a value of 512 * 1024, are added to MaxResponseBytes.
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000)})
Contributor:
Size: 1

defer clus.Terminate(t)

cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
},
}
cli, err := clientv3.New(cfg)
Contributor:
just use clus.Client(0)?


cli.SetEndpoints(clus.Members[0].GRPCAddr())
firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
if err != nil {
t.Error(err)
}

// Create and register watch proxy
wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
wc := adapter.WatchServerToWatchClient(wp)
w := clientv3.NewWatchFromWatchClient(wc)
Contributor:
can remove proxy stuff since picked up by cluster_proxy


kv := clus.Client(0)
for i := 0; i < 100; i++ {
Contributor:
probably do this concurrently; 100 puts is a little pricey

Contributor Author:
Where is the flag to concurrently put?
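
Presumably the suggestion is to issue the puts from goroutines rather than via a flag; a minimal sketch, assuming a sync import and the same kv and lease as above:

```go
// Issue the 100 puts concurrently instead of sequentially.
var wg sync.WaitGroup
errc := make(chan error, 100)
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		_, err := kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
		errc <- err
	}(i)
}
wg.Wait()
close(errc)
for err := range errc {
	if err != nil {
		t.Error(err)
	}
}
```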

_, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
if err != nil {
t.Error(err)
}
}
wChannel := w.Watch(context.TODO(), "foo", clientv3.WithRange("z"), clientv3.WithFragmentedResponse())
_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Contributor:
Delete(context.TODO(), "foo", clientv3.WithPrefix())

if err != nil {
t.Error(err)
}

r, ok := <-wChannel
if !ok {
t.Error()
}
keyDigitSum := 0
responseSum := 0
if len(r.Events) != 100 {
t.Errorf("Expected 100 events, got %d\n", len(r.Events))
}
for i := 0; i < 100; i++ {
if r.Events[i].Type != mvccpb.DELETE {
t.Errorf("Expected DELETE event, got %d", r.Events[i].Type)
}
keyDigitSum += i
digit, err := strconv.Atoi((string(r.Events[i].Kv.Key)[3:]))
if err != nil {
t.Error("Failed to convert %s to int", (string(r.Events[i].Kv.Key)[3:]))
}
responseSum += digit
}
if keyDigitSum != responseSum {
t.Errorf("Expected digits of keys received in the response to sum to %d, but got %d\n", keyDigitSum, responseSum)
}
}
25 changes: 25 additions & 0 deletions clientv3/op.go
@@ -73,6 +73,15 @@ type Op struct {
cmps []Cmp
thenOps []Op
elseOps []Op

// fragmentResponse allows watch clients to toggle whether to send
Contributor:
put these below filterDelete with the other watch flags

// watch responses that are too large to send over a rpc stream in fragments.
// Sending in fragments is an opt-in feature in order to preserve compatibility
// with older clients that can not handle responses being split into fragments.
fragmentResponse bool
// combineFragments indicates whether watch clients should combine
// fragments or relay the watch response in fragmented form.
combineFragments bool
}

// accesors / mutators
@@ -415,6 +424,22 @@ func WithCreatedNotify() OpOption {
}
}

// WithFragmentedResponse makes the watch server send watch responses
// that are too large to send over the rpc stream in fragments.
// The fragmenting feature is an opt-in feature in order to maintain
// compatibility with older versions of clients.
func WithFragmentedResponse() OpOption {
Contributor:
Can remove this since the client here can default to fragmenting. If it's an old server, it will ignore the flag and default to non-fragmenting. For backwards compatibility, it only needs to be optional on the server side.

Contributor Author:
I'm a little confused, what's the point of the fragment flag if I remove the code to toggle the flag? Won't it always be true?
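
One reading of the suggestion, sketched under the assumption that the client always requests fragmenting and an older server simply ignores the unknown field:

```go
// toPB would always set FragmentResponse; no per-watch option needed on the client.
req := &pb.WatchCreateRequest{
	StartRevision:    wr.rev,
	Key:              []byte(wr.key),
	RangeEnd:         []byte(wr.end),
	ProgressNotify:   wr.progressNotify,
	FragmentResponse: true, // servers that predate the field ignore it
	Filters:          wr.filters,
	PrevKv:           wr.prevKV,
}
```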

return func(op *Op) { op.fragmentResponse = true }
}

// WithFragments allows watch clients to passively relay their received
// fragmented watch responses.
// This option is handy for watch clients belonging to watch proxies,
// because these watch clients can be set to passively send along fragments
// rather than reassembling and then fragmenting them such that they can be
// sent to the user's watch client.
func WithFragments() OpOption { return func(op *Op) { op.combineFragments = true } }
Contributor:
WithFragments should disable combining fragments and return the raw watch responses

Contributor Author:
What would the use case be for having the client return raw watch responses to the user? I was under the impression that WithFragments is sort of an internal option to prevent the proxy's client from combining and then re-fragmenting before sending off.


// WithFilterPut discards PUT events from the watcher.
func WithFilterPut() OpOption {
return func(op *Op) { op.filterPut = true }
68 changes: 49 additions & 19 deletions clientv3/watch.go
@@ -68,6 +68,10 @@ type WatchResponse struct {

// cancelReason is a reason of canceling watch
cancelReason string

// MoreFragments indicates that more fragments composing one large
// watch response are expected.
MoreFragments bool
Contributor:
just Fragment?

}

// IsCreate returns true if the event tells that the key is newly created.
@@ -145,6 +149,10 @@ type watchGrpcStream struct {
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error

// combineFragments indicates whether watch clients should combine
// fragments or relay the watch response in fragmented form.
combineFragments bool
}

// watchRequest is issued by the subscriber to start a new watcher
@@ -157,6 +165,9 @@ type watchRequest struct {
createdNotify bool
// progressNotify is for progress updates
progressNotify bool
// fragmentResponse allows watch clients to toggle whether to send
// watch responses that are too large to send over a rpc stream in fragments.
fragmentResponse bool
// filters is the list of events to filter out
filters []pb.WatchCreateRequest_FilterType
// get the previous key-value pair before the event happens
@@ -241,15 +252,16 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}

wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
fragmentResponse: ow.fragmentResponse,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}

ok := false
@@ -267,6 +279,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
wgs.combineFragments = ow.combineFragments
w.streams[ctxKey] = wgs
}
donec := wgs.donec
@@ -424,7 +437,7 @@ func (w *watchGrpcStream) run() {
}

cancelSet := make(map[int64]struct{})

var fragmentsToSend *pb.WatchResponse
Contributor:
fragmentBuffer?

for {
select {
// Watch() requested
@@ -450,12 +463,26 @@ }
}
// New events from the watch client
case pbresp := <-w.respc:
if w.combineFragments {
fragmentsToSend = pbresp
} else {
if fragmentsToSend == nil || fragmentsToSend.WatchId != pbresp.WatchId {
fragmentsToSend = pbresp
} else {
fragmentsToSend.Events = append(fragmentsToSend.Events, pbresp.Events...)
fragmentsToSend.MoreFragments = pbresp.MoreFragments
}
if fragmentsToSend.MoreFragments {
break
}
}
switch {
case pbresp.Created:
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.addSubstream(fragmentsToSend, ws)
w.dispatchEvent(fragmentsToSend)
fragmentsToSend = nil
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
Expand All @@ -470,7 +497,8 @@ func (w *watchGrpcStream) run() {
}
default:
// dispatch to appropriate watch stream
if ok := w.dispatchEvent(pbresp); ok {
if ok := w.dispatchEvent(fragmentsToSend); ok {
fragmentsToSend = nil
break
}
// watch response on unexpected watch id; cancel id
@@ -537,6 +565,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
Created: pbresp.Created,
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
MoreFragments: pbresp.MoreFragments,
}
ws, ok := w.substreams[pbresp.WatchId]
if !ok {
@@ -784,12 +813,13 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
// toPB converts an internal watch request structure to its protobuf message
func (wr *watchRequest) toPB() *pb.WatchRequest {
req := &pb.WatchCreateRequest{
StartRevision: wr.rev,
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
Filters: wr.filters,
PrevKv: wr.prevKV,
StartRevision: wr.rev,
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
FragmentResponse: wr.fragmentResponse,
Filters: wr.filters,
PrevKv: wr.prevKV,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
11 changes: 6 additions & 5 deletions embed/config.go
@@ -37,11 +37,12 @@ const (
ClusterStateFlagNew = "new"
ClusterStateFlagExisting = "existing"

DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxResponseBytes = 1.5 * 1024 * 1024

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
3 changes: 2 additions & 1 deletion etcdserver/api/v3rpc/grpc.go
@@ -41,7 +41,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
opts = append(opts, grpc.MaxMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(int(s.Cfg.MaxResponseBytes+grpcOverheadBytes)))
grpcServer := grpc.NewServer(opts...)

pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))