diff --git a/internal/envconfig/envconfig.go b/internal/envconfig/envconfig.go index 5ba9d94d49c2..80fd5c7d2a4f 100644 --- a/internal/envconfig/envconfig.go +++ b/internal/envconfig/envconfig.go @@ -36,6 +36,10 @@ var ( // "GRPC_RING_HASH_CAP". This does not override the default bounds // checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M). RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024) + // PickFirstLBConfig is set if we should support configuration of the + // pick_first LB policy, which can be enabled by setting the environment + // variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true". + PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false) ) func boolFromEnv(envVar string, def bool) bool { diff --git a/pickfirst.go b/pickfirst.go index 611bef7995cd..abe266b021d2 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/serviceconfig" ) @@ -112,7 +113,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState b.cfg = cfg } - if b.cfg != nil && b.cfg.ShuffleAddressList { + if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList { grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) } if b.subConn != nil { diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 62310d4d330e..7f36ed400c8b 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" @@ -382,6 +383,8 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) { } func (s) TestPickFirst_ShuffleAddressList(t *testing.T) { + defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig) + envconfig.PickFirstLBConfig = true const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}` // Install a shuffler that always reverses two entries. @@ -431,3 +434,58 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) { t.Fatal(err) } } + +func (s) TestPickFirst_ShuffleAddressListDisabled(t *testing.T) { + defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig) + envconfig.PickFirstLBConfig = false + const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}` + + // Install a shuffler that always reverses two entries. + origShuf := grpcrand.Shuffle + defer func() { grpcrand.Shuffle = origShuf }() + grpcrand.Shuffle = func(n int, f func(int, int)) { + if n != 2 { + t.Errorf("Shuffle called with n=%v; want 2", n) + } + f(0, 1) // reverse the two addresses + } + + // Set up our backends. + cc, r, backends := setupPickFirst(t, 2) + addrs := stubBackendsToResolverAddrs(backends) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Push an update with both addresses and shuffling disabled. We should + // connect to backend 0. + r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}}) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + // Send a config with shuffling enabled. This will reverse the addresses, + // but the channel should still be connected to backend 0. + shufState := resolver.State{ + ServiceConfig: parseServiceConfig(t, r, serviceConfig), + Addresses: []resolver.Address{addrs[0], addrs[1]}, + } + r.UpdateState(shufState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + // Send a resolver update with no addresses. This should push the channel + // into TransientFailure. + r.UpdateState(resolver.State{}) + awaitState(ctx, t, cc, connectivity.TransientFailure) + + // Send the same config as last time with shuffling enabled. Since we are + // not connected to backend 0, we should connect to backend 1 if shuffling + // is supported. However with it disabled at the start of the test, we + // will connect to backend 0 instead. + r.UpdateState(shufState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/xdsclient/xdslbregistry/converter/converter.go b/xds/internal/xdsclient/xdslbregistry/converter/converter.go index 63323f5c2a04..6bafa647794b 100644 --- a/xds/internal/xdsclient/xdslbregistry/converter/converter.go +++ b/xds/internal/xdsclient/xdslbregistry/converter/converter.go @@ -97,6 +97,9 @@ type pfConfig struct { } func convertPickFirstProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) { + if !envconfig.PickFirstLBConfig { + return nil, nil + } pfProto := &v3pickfirstpb.PickFirst{} if err := proto.Unmarshal(rawProto, pfProto); err != nil { return nil, fmt.Errorf("failed to unmarshal resource: %v", err) diff --git a/xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go b/xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go index c14e2fcb715d..9a7f5be53108 100644 --- a/xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go +++ b/xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go @@ -86,6 +86,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) { policy *v3clusterpb.LoadBalancingPolicy wantConfig string // JSON config rhDisabled bool + pfDisabled bool }{ { name: "ring_hash", @@ -177,6 +178,27 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) { wantConfig: `[{"round_robin": {}}]`, rhDisabled: true, }, + { + name: "pick_first_disabled_pf_rr_use_first_supported", + policy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: testutils.MarshalAny(&v3pickfirstpb.PickFirst{ + ShuffleAddressList: true, + }), + }, + }, + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: testutils.MarshalAny(&v3roundrobinpb.RoundRobin{}), + }, + }, + }, + }, + wantConfig: `[{"round_robin": {}}]`, + pfDisabled: true, + }, { name: "custom_lb_type_v3_struct", policy: &v3clusterpb.LoadBalancingPolicy{ @@ -268,11 +290,12 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.rhDisabled { - oldRingHashSupport := envconfig.XDSRingHash + defer func(old bool) { envconfig.XDSRingHash = old }(envconfig.XDSRingHash) envconfig.XDSRingHash = false - defer func() { - envconfig.XDSRingHash = oldRingHashSupport - }() + } + if !test.pfDisabled { + defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig) + envconfig.PickFirstLBConfig = true } rawJSON, err := xdslbregistry.ConvertToServiceConfig(test.policy, 0) if err != nil {