From de780f2dbdb9573f5fe1159d80ad15a56022d1a6 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 18 Mar 2020 16:12:22 -0700 Subject: [PATCH] clientv3: embed api version in metadata ref. https://github.com/etcd-io/etcd/pull/11687 Signed-off-by: Gyuho Lee --- clientv3/client.go | 8 ----- clientv3/ctx.go | 62 ++++++++++++++++++++++++++++++++ clientv3/ctx_test.go | 67 +++++++++++++++++++++++++++++++++++ clientv3/retry_interceptor.go | 2 ++ 4 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 clientv3/ctx.go create mode 100644 clientv3/ctx_test.go diff --git a/clientv3/client.go b/clientv3/client.go index 215e05479809..a35ec679a029 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -37,7 +37,6 @@ import ( "google.golang.org/grpc/codes" grpccredentials "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -397,13 +396,6 @@ func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede return creds } -// WithRequireLeader requires client requests to only succeed -// when the cluster has a leader. -func WithRequireLeader(ctx context.Context) context.Context { - md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - return metadata.NewOutgoingContext(ctx, md) -} - func newClient(cfg *Config) (*Client, error) { if cfg == nil { cfg = &Config{} diff --git a/clientv3/ctx.go b/clientv3/ctx.go new file mode 100644 index 000000000000..9f90c8361384 --- /dev/null +++ b/clientv3/ctx.go @@ -0,0 +1,62 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + "strings" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/version" + "google.golang.org/grpc/metadata" +) + +// WithRequireLeader requires client requests to only succeed +// when the cluster has a leader. +func WithRequireLeader(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) + } + // overwrite/add 'hasleader' key/value + metadataSet(md, rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) +} + +// embeds client version +func withVersion(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) + } + // overwrite/add version key/value + metadataSet(md, rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) +} + +func metadataGet(md metadata.MD, k string) []string { + k = strings.ToLower(k) + return md[k] +} + +func metadataSet(md metadata.MD, k string, vals ...string) { + if len(vals) == 0 { + return + } + k = strings.ToLower(k) + md[k] = vals +} diff --git a/clientv3/ctx_test.go b/clientv3/ctx_test.go new file mode 100644 index 000000000000..2bf8d3f4a304 --- /dev/null +++ b/clientv3/ctx_test.go @@ -0,0 +1,67 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + "reflect" + "testing" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/version" + "google.golang.org/grpc/metadata" +) + +func TestMetadataWithRequireLeader(t *testing.T) { + ctx := context.TODO() + md, ok := metadata.FromOutgoingContext(ctx) + if ok { + t.Fatal("expected no outgoing metadata ctx key") + } + + // add a conflicting key with some other value + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, "invalid") + // add a key, and expect not be overwritten + metadataSet(md, "hello", "1", "2") + ctx = metadata.NewOutgoingContext(ctx, md) + + // expect overwrites but still keep other keys + ctx = WithRequireLeader(ctx) + md, ok = metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatal("expected outgoing metadata ctx key") + } + if ss := metadataGet(md, rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss) + } + if ss := metadataGet(md, "hello"); !reflect.DeepEqual(ss, []string{"1", "2"}) { + t.Fatalf("unexpected metadata for 'hello' %v", ss) + } +} + +func TestMetadataWithClientAPIVersion(t *testing.T) { + ctx := withVersion(WithRequireLeader(context.TODO())) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatal("expected outgoing metadata ctx key") + } + if ss := metadataGet(md, rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss) + } + if ss := metadataGet(md, rpctypes.MetadataClientAPIVersionKey); !reflect.DeepEqual(ss, []string{version.APIVersion}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataClientAPIVersionKey, ss) + } +} diff --git a/clientv3/retry_interceptor.go b/clientv3/retry_interceptor.go index aac679ecccb7..2c266e55bec0 100644 --- a/clientv3/retry_interceptor.go +++ b/clientv3/retry_interceptor.go @@ -38,6 +38,7 @@ import ( func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = withVersion(ctx) grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. @@ -103,6 +104,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ctx = withVersion(ctx) grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations.