addrConn: resetTransport cleanup (#2673)
menghanl authored Mar 14, 2019
1 parent ff28255 commit 3958fc8
Showing 1 changed file with 22 additions and 60 deletions.
82 changes: 22 additions & 60 deletions clientconn.go
@@ -921,8 +921,6 @@ type addrConn struct {
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity.State

tearDownErr error // The reason this addrConn is torn down.

backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}

@@ -963,28 +961,30 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {

func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
tryNextAddrFromStart := grpcsync.NewEvent()

ac.mu.Lock()
if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOption{})
}

ac.mu.Lock()
addrs := ac.addrs
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)

// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
// We can potentially spend all the time trying the first address, and
// if the server accepts the connection and then hangs, the following
// addresses will never be tried.
//
// The spec doesn't mention what should be done for multiple addresses.
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
connectDeadline := time.Now().Add(dialDuration)
ac.mu.Unlock()

addrLoop:
for _, addr := range addrs {
ac.mu.Lock()

if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
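The deadline logic added above gives each pass over the address list at least the minimum connect timeout, stretched to the current backoff interval once connects keep failing; backoffIdx is reset to 0 on a successful connect, so the deadline shrinks back afterwards. A minimal, runnable sketch of that computation, with stand-ins for the unexported getMinConnectTimeout() and the DialOption-configured backoff strategy (values illustrative):

package main

import (
	"fmt"
	"time"
)

// Stand-in for grpc's unexported getMinConnectTimeout().
const minConnectTimeout = 20 * time.Second

// Toy exponential backoff standing in for the configured strategy.
func backoffFor(retries int) time.Duration {
	d := time.Second << uint(retries)
	if d > 120*time.Second {
		d = 120 * time.Second
	}
	return d
}

// connectDeadline mirrors the hunk above: dial gets at least the minimum
// connect timeout, more when repeated failures have grown the backoff.
func connectDeadline(backoffIdx int) time.Time {
	dialDuration := minConnectTimeout
	if b := backoffFor(backoffIdx); b > dialDuration {
		// Give dial more time as we keep failing to connect.
		dialDuration = b
	}
	return time.Now().Add(dialDuration)
}

func main() {
	fmt.Println(time.Until(connectDeadline(0)).Round(time.Second)) // 20s
	fmt.Println(time.Until(connectDeadline(8)).Round(time.Second)) // 2m0s (capped)
}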
@@ -996,17 +996,10 @@ func (ac *addrConn) resetTransport() {
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()

if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}

copts := ac.dopts.copts
if ac.scopts.CredsBundle != nil {
copts.CredsBundle = ac.scopts.CredsBundle
}
hctx, hcancel := context.WithCancel(ac.ctx)
defer hcancel()
ac.mu.Unlock()

if channelz.IsOn() {
@@ -1017,25 +1010,22 @@ func (ac *addrConn) resetTransport() {
}

reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect)
if err != nil {
ac.cc.blockingpicker.updateConnectionError(err)
hcancel()
if err == errConnClosing {
return
}

if tryNextAddrFromStart.HasFired() {
break addrLoop
}
continue
}

backoffFor = 0
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
newTr.Close()
ac.mu.Unlock()
return
}
ac.curAddr = addr
ac.transport = newTr
ac.backoffIdx = 0
ac.mu.Unlock()

healthCheckConfig := ac.cc.healthCheckConfig()
@@ -1044,6 +1034,7 @@ func (ac *addrConn) resetTransport() {
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
hctx, hcancel := context.WithCancel(ac.ctx)
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
@@ -1061,6 +1052,8 @@ func (ac *addrConn) resetTransport() {
ac.mu.Unlock()
}

// Block until the created transport is down. And when this happens,
// we restart from the top of the addr list.
<-reconnect.Done()
hcancel()

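The loop now parks on reconnect.Done() until the created transport dies, then walks the address list again from the top. reconnect is an Event from the internal grpcsync package; a rough, runnable sketch of the fire-once broadcast semantics the loop relies on (an illustration, not the real implementation):

package main

import "sync"

// Event sketches what internal/grpcsync.Event gives resetTransport: Fire
// is a one-shot broadcast, Done exposes the wait channel, HasFired polls.
type Event struct {
	once sync.Once
	c    chan struct{}
}

func NewEvent() *Event { return &Event{c: make(chan struct{})} }

// Fire closes the channel exactly once; repeat calls are no-ops.
func (e *Event) Fire() { e.once.Do(func() { close(e.c) }) }

// Done returns a channel that is closed once Fire has been called.
func (e *Event) Done() <-chan struct{} { return e.c }

// HasFired reports whether Fire has been called.
func (e *Event) HasFired() bool {
	select {
	case <-e.c:
		return true
	default:
		return false
	}
}

func main() {
	reconnect := NewEvent()
	go reconnect.Fire()           // stands in for the transport's onGoAway/onClose
	<-reconnect.Done()            // resetTransport blocks here until the transport dies
	println(reconnect.HasFired()) // true
}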
@@ -1070,10 +1063,7 @@ func (ac *addrConn) resetTransport() {
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
break
}

// After exhausting all addresses, or after need to reconnect after a
@@ -1088,7 +1078,6 @@ func (ac *addrConn) resetTransport() {
// Backoff.
b := ac.resetBackoff
timer := time.NewTimer(backoffFor)
acctx := ac.ctx
ac.mu.Unlock()

select {
@@ -1098,7 +1087,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Unlock()
case <-b:
timer.Stop()
case <-acctx.Done():
case <-ac.ctx.Done():
timer.Stop()
return
}
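The select above is an interruptible backoff sleep: the timer carries the normal path, the captured b lets ClientConn.ResetConnectBackoff cut the wait short, and ac.ctx.Done() ends the goroutine on teardown. A self-contained sketch of the close-to-wake pattern behind the <-b case (resetNow is a hypothetical helper; clientconn.go does the equivalent inside resetConnectBackoff):

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu           sync.Mutex
	resetBackoff = make(chan struct{})
)

// resetNow wakes every sleeper by closing the current channel, then
// installs a fresh one for the next backoff round.
func resetNow() {
	mu.Lock()
	close(resetBackoff)
	resetBackoff = make(chan struct{})
	mu.Unlock()
}

func main() {
	mu.Lock()
	b := resetBackoff // capture under the lock, as the diff does
	mu.Unlock()

	timer := time.NewTimer(time.Hour)
	go resetNow()

	select {
	case <-timer.C:
		fmt.Println("backoff elapsed")
	case <-b:
		timer.Stop()
		fmt.Println("backoff cut short") // prints almost immediately
	}
}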
@@ -1110,7 +1099,8 @@ func (ac *addrConn) resetTransport() {
// unable to successfully create a transport.
//
// If waitForHandshake is enabled, it blocks until server preface arrives.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event) (transport.ClientTransport, error) {
prefaceReceived := make(chan struct{})
onCloseCalled := make(chan struct{})

target := transport.TargetInfo{
@@ -1144,14 +1134,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()

return nil, errConnClosing
}
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
return nil, err
}
@@ -1171,25 +1153,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
}

// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
return nil, errConnClosing
}
ac.mu.Unlock()

// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
return nil, errConnClosing
}
ac.mu.Unlock()

return newTr, nil
}
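prefaceReceived is now created inside createTransport instead of being threaded in from resetTransport, so the handshake wait is self-contained: with waitForHandshake enabled, the function reports success only once the server preface arrives. A simplified, runnable sketch of that three-way wait (channel names follow the diff; using a context for the connect deadline is an assumption made here for illustration):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForPreface sketches the handshake wait inside createTransport:
// succeed only once the server preface arrives; fail if the transport
// closes or the connect deadline expires first.
func waitForPreface(connectCtx context.Context, prefaceReceived, onCloseCalled <-chan struct{}) error {
	select {
	case <-prefaceReceived:
		return nil // server spoke first; the transport is viable
	case <-onCloseCalled:
		return errors.New("connection closed before server preface")
	case <-connectCtx.Done():
		return errors.New("timed out waiting for server preface")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	preface := make(chan struct{})
	closed := make(chan struct{})
	// No preface arrives in this toy run, so the deadline case fires.
	fmt.Println(waitForPreface(ctx, preface, closed))
}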

@@ -1278,7 +1241,6 @@ func (ac *addrConn) tearDown(err error) {
// between setting the state and logic that waits on context cancelation / etc.
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.tearDownErr = err
ac.curAddr = resolver.Address{}
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when
