Skip to content

Commit

Permalink
Update picker less frequently
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 9, 2024
1 parent 34da793 commit f0796a7
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 4 deletions.
13 changes: 9 additions & 4 deletions balancer/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type pickfirstBalancer struct {
subConns *resolver.AddressMap // scData for active subonns mapped by address.
addressList addressList
firstPass bool
numTF int
}

func (b *pickfirstBalancer) ResolverError(err error) {
Expand Down Expand Up @@ -529,11 +530,14 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// We have finished the first pass, keep re-connecting failing subconns.
switch newState.ConnectivityState {
case connectivity.TransientFailure:
b.numTF++
sd.lastErr = newState.ConnectionError
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
}
// We don't need to request re-resolution since the subconn already does
// that before reporting TRANSIENT_FAILURE.
// TODO: #7534 - Move re-resolution requests from subconn into pick_first.
Expand All @@ -545,6 +549,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
// Only executed in the context of a serializer callback.
func (b *pickfirstBalancer) endFirstPass(lastErr error) {
b.firstPass = false
b.numTF = 0
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
Expand Down
74 changes: 74 additions & 0 deletions balancer/pickfirstleaf/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@
package pickfirstleaf

import (
"fmt"
"testing"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
)

const (
// Default timeout for tests in this package.
defaultTestTimeout = 10 * time.Second

Check failure on line 36 in balancer/pickfirstleaf/pickfirstleaf_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const defaultTestTimeout is unused (U1000)
// Default short timeout, to be used when waiting for events which are not
// expected to happen.
defaultTestShortTimeout = 100 * time.Millisecond
)

type s struct {
grpctest.Tester
}
Expand Down Expand Up @@ -191,3 +204,64 @@ func (s) TestAddressList_SeekTo(t *testing.T) {
t.Errorf("addressList.increment() = %t, want %t", got, want)
}
}

// TestPickFirstLeaf_TFPickerUpdate sends TRANSIENT_FAILURE subconn state updates
// for each subconn managed by a pickfirst balancer. It verifies that the picker
// is updated with the expected frequency.
func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
bal.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
},
},
})

// PF should report TRANSIENT_FAILURE only once all the sunbconns have failed
// once.
tfErr := fmt.Errorf("test err: connection refused")
sc0 := <-cc.NewSubConnCh
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr})

<-time.After(defaultTestShortTimeout)
p := <-cc.NewPickerCh
_, err := p.Pick(balancer.PickInfo{})
if want, got := balancer.ErrNoSubConnAvailable, err; got != want {
t.Fatalf("picker.Pick() = %v, want %v", got, want)
}

sc1 := <-cc.NewSubConnCh
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr})

<-time.After(defaultTestShortTimeout)
p = <-cc.NewPickerCh
_, err = p.Pick(balancer.PickInfo{})
if want, got := tfErr, err; got != want {
t.Fatalf("picker.Pick() = %v, want %v", got, want)
}

// Subsequent TRANSIENT_FAILUREs should be reported only after seeing "# of subconns"
// TRANSIENT_FAILUREs.
newTfErr := fmt.Errorf("test err: unreachable")
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: newTfErr})
select {
case <-time.After(defaultTestShortTimeout):
case p = <-cc.NewPickerCh:
sc, err := p.Pick(balancer.PickInfo{})
t.Fatalf("unexpected picker update: %v, %v", sc, err)
}

sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: newTfErr})
<-time.After(defaultTestShortTimeout)
p = <-cc.NewPickerCh
_, err = p.Pick(balancer.PickInfo{})
if want, got := newTfErr, err; got != want {
t.Fatalf("picker.Pick() = %v, want %v", got, want)
}
}

0 comments on commit f0796a7

Please sign in to comment.