From 47cff4dfe5d10c3ae39bc591f772a5948827cbe8 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 16:17:17 -0700 Subject: [PATCH 1/4] etcdserver/api/rafthttp: rename to "pipelineProber" Preliminary work to add prober to "streamRt" Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/transport.go | 14 +++++----- etcdserver/api/rafthttp/transport_test.go | 34 +++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index e52acfceb93..78b15956520 100644 --- a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -130,7 +130,7 @@ type Transport struct { remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - prober probing.Prober + pipelineProber probing.Prober } func (t *Transport) Start() error { @@ -145,7 +145,7 @@ func (t *Transport) Start() error { } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.prober = probing.NewProber(t.pipelineRt) + t.pipelineProber = probing.NewProber(t.pipelineRt) // If client didn't provide dial retry frequency, use the default // (100ms backoff between attempts to create a new stream), @@ -221,7 +221,7 @@ func (t *Transport) Stop() { for _, p := range t.peers { p.stop() } - t.prober.RemoveAll() + t.pipelineProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -317,7 +317,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.Logger, t.prober, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) if t.Logger != nil { t.Logger.Info( @@ -358,7 +358,7 @@ func (t *Transport) removePeer(id types.ID) { } delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) - t.prober.Remove(id.String()) + 
t.pipelineProber.Remove(id.String()) if t.Logger != nil { t.Logger.Info( @@ -388,8 +388,8 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { } t.peers[id].update(urls) - t.prober.Remove(id.String()) - addPeerToProber(t.Logger, t.prober, id.String(), us) + t.pipelineProber.Remove(id.String()) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) if t.Logger != nil { t.Logger.Info( diff --git a/etcdserver/api/rafthttp/transport_test.go b/etcdserver/api/rafthttp/transport_test.go index 9a58ce74fee..ef208ef8ed9 100644 --- a/etcdserver/api/rafthttp/transport_test.go +++ b/etcdserver/api/rafthttp/transport_test.go @@ -97,10 +97,10 @@ func TestTransportCutMend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") tr := &Transport{ - LeaderStats: ls, - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: ls, + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -125,10 +125,10 @@ func TestTransportAdd(t *testing.T) { func TestTransportRemove(t *testing.T) { tr := &Transport{ - LeaderStats: stats.NewLeaderStats(""), - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -142,8 +142,8 @@ func TestTransportRemove(t *testing.T) { func TestTransportUpdate(t *testing.T) { peer := newFakePeer() tr := &Transport{ - peers: map[types.ID]Peer{types.ID(1): peer}, - prober: probing.NewProber(nil), + peers: map[types.ID]Peer{types.ID(1): peer}, + pipelineProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -156,13 +156,13 @@ func 
TestTransportUpdate(t *testing.T) { func TestTransportErrorc(t *testing.T) { errorc := make(chan error, 1) tr := &Transport{ - Raft: &fakeRaft{}, - LeaderStats: stats.NewLeaderStats(""), - ErrorC: errorc, - streamRt: newRespRoundTripper(http.StatusForbidden, nil), - pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + Raft: &fakeRaft{}, + LeaderStats: stats.NewLeaderStats(""), + ErrorC: errorc, + streamRt: newRespRoundTripper(http.StatusForbidden, nil), + pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop() From 4a239070c8beaa0abcc86f7f301e9dd849f080b7 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 16:22:00 -0700 Subject: [PATCH 2/4] etcdserver/api/rafthttp: display roundtripper name in warnings Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/probing_status.go | 16 ++++++++++++---- etcdserver/api/rafthttp/transport.go | 4 ++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/etcdserver/api/rafthttp/probing_status.go b/etcdserver/api/rafthttp/probing_status.go index a8199dfdfad..d97590730c2 100644 --- a/etcdserver/api/rafthttp/probing_status.go +++ b/etcdserver/api/rafthttp/probing_status.go @@ -17,10 +17,16 @@ package rafthttp import ( "time" + "github.com/prometheus/client_golang/prometheus" "github.com/xiang90/probing" "go.uber.org/zap" ) +const ( + // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message. + RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" +) + var ( // proberInterval must be shorter than read timeout. // Or the connection will time-out. 
@@ -29,7 +35,7 @@ var ( statusErrorInterval = 5 * time.Second ) -func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { +func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { hus := make([]string, len(us)) for i := range us { hus[i] = us[i] + ProbingPrefix @@ -47,10 +53,10 @@ func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { return } - go monitorProbingStatus(lg, s, id) + go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm) } -func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { +func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { // set the first interval short to log error early. interval := statusErrorInterval for { @@ -60,6 +66,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { if lg != nil { lg.Warn( "prober detected unhealthy status", + zap.String("round-tripper-name", roundTripperName), zap.String("remote-peer-id", id), zap.Duration("rtt", s.SRTT()), zap.Error(s.Err()), @@ -75,6 +82,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { if lg != nil { lg.Warn( "prober found high clock drift", + zap.String("round-tripper-name", roundTripperName), zap.String("remote-peer-id", id), zap.Duration("clock-drift", s.SRTT()), zap.Duration("rtt", s.ClockDiff()), @@ -84,7 +92,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) } } - rttSec.WithLabelValues(id).Observe(s.SRTT().Seconds()) + rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index 78b15956520..30642ced648 100644 --- 
a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -317,7 +317,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) if t.Logger != nil { t.Logger.Info( @@ -389,7 +389,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { t.peers[id].update(urls) t.pipelineProber.Remove(id.String()) - addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) if t.Logger != nil { t.Logger.Info( From 7b1ef370544f13a9e374c58cf91e0f5483ffb171 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Sun, 7 Oct 2018 03:28:54 -0700 Subject: [PATCH 3/4] etcdserver/api/rafthttp: probe all Raft messages' RTT This PR adds another probing routine to monitor the connection for Raft message transports. Previously, we only monitored snapshot transports. In our production cluster, we found that one TCP connection had >8-sec latencies to a remote peer, but the "etcd_network_peer_round_trip_time_seconds" metric showed a <1-sec latency distribution, which means the etcd server was not sampling enough when such latency spikes happened outside of the snapshot pipeline connection.
Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/metrics.go | 2 +- etcdserver/api/rafthttp/probing_status.go | 3 +++ etcdserver/api/rafthttp/transport.go | 7 +++++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/etcdserver/api/rafthttp/metrics.go b/etcdserver/api/rafthttp/metrics.go index 5f862e9decc..ce51248d887 100644 --- a/etcdserver/api/rafthttp/metrics.go +++ b/etcdserver/api/rafthttp/metrics.go @@ -137,7 +137,7 @@ var ( Namespace: "etcd", Subsystem: "network", Name: "peer_round_trip_time_seconds", - Help: "Round-Trip-Time histogram between peers.", + Help: "Round-Trip-Time histogram between peers", // lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2 // highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec diff --git a/etcdserver/api/rafthttp/probing_status.go b/etcdserver/api/rafthttp/probing_status.go index d97590730c2..4d10ec87d06 100644 --- a/etcdserver/api/rafthttp/probing_status.go +++ b/etcdserver/api/rafthttp/probing_status.go @@ -23,6 +23,9 @@ import ( ) const ( + // RoundTripperNameRaftMessage is the name of round-tripper that sends + // all other Raft messages, other than "snap.Message". + RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE" // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message. 
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" ) diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index 30642ced648..d6e55091c33 100644 --- a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -131,6 +131,7 @@ type Transport struct { peers map[types.ID]Peer // peers map pipelineProber probing.Prober + streamProber probing.Prober } func (t *Transport) Start() error { @@ -146,6 +147,7 @@ func (t *Transport) Start() error { t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) t.pipelineProber = probing.NewProber(t.pipelineRt) + t.streamProber = probing.NewProber(t.streamRt) // If client didn't provide dial retry frequency, use the default // (100ms backoff between attempts to create a new stream), @@ -222,6 +224,7 @@ func (t *Transport) Stop() { p.stop() } t.pipelineProber.RemoveAll() + t.streamProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -318,6 +321,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) + addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) if t.Logger != nil { t.Logger.Info( @@ -359,6 +363,7 @@ func (t *Transport) removePeer(id types.ID) { delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) t.pipelineProber.Remove(id.String()) + t.streamProber.Remove(id.String()) if t.Logger != nil { t.Logger.Info( @@ -390,6 +395,8 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { t.pipelineProber.Remove(id.String()) addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) + t.streamProber.Remove(id.String()) + addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) if t.Logger != nil { 
t.Logger.Info( From 884a8bd36b47238649c46fc835fcc2c421b71988 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Sun, 7 Oct 2018 03:32:05 -0700 Subject: [PATCH 4/4] etcdserver/api/rafthttp: configure "streamProber" in tests Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/transport_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/etcdserver/api/rafthttp/transport_test.go b/etcdserver/api/rafthttp/transport_test.go index ef208ef8ed9..7c8641fec4f 100644 --- a/etcdserver/api/rafthttp/transport_test.go +++ b/etcdserver/api/rafthttp/transport_test.go @@ -101,6 +101,7 @@ func TestTransportAdd(t *testing.T) { streamRt: &roundTripperRecorder{}, peers: make(map[types.ID]Peer), pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -129,6 +130,7 @@ func TestTransportRemove(t *testing.T) { streamRt: &roundTripperRecorder{}, peers: make(map[types.ID]Peer), pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -144,6 +146,7 @@ func TestTransportUpdate(t *testing.T) { tr := &Transport{ peers: map[types.ID]Peer{types.ID(1): peer}, pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -163,6 +166,7 @@ func TestTransportErrorc(t *testing.T) { pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), peers: make(map[types.ID]Peer), pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop()