Skip to content

Commit

Permalink
Merge pull request #10155 from gyuho/metrics-messages
Browse files Browse the repository at this point in the history
rafthttp: probe all raft transports
  • Loading branch information
gyuho authored Oct 9, 2018
2 parents ba606bf + 884a8bd commit d2a0f17
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 29 deletions.
2 changes: 1 addition & 1 deletion etcdserver/api/rafthttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var (
Namespace: "etcd",
Subsystem: "network",
Name: "peer_round_trip_time_seconds",
Help: "Round-Trip-Time histogram between peers.",
Help: "Round-Trip-Time histogram between peers",

// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
// highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec
Expand Down
19 changes: 15 additions & 4 deletions etcdserver/api/rafthttp/probing_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@ package rafthttp
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
"go.uber.org/zap"
)

const (
// RoundTripperNameRaftMessage is the name of round-tripper that sends
// all other Raft messages, other than "snap.Message".
RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)

var (
// proberInterval must be shorter than read timeout.
// Or the connection will time-out.
Expand All @@ -29,7 +38,7 @@ var (
statusErrorInterval = 5 * time.Second
)

func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) {
func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
Expand All @@ -47,10 +56,10 @@ func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) {
return
}

go monitorProbingStatus(lg, s, id)
go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm)
}

func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
Expand All @@ -60,6 +69,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
if lg != nil {
lg.Warn(
"prober detected unhealthy status",
zap.String("round-tripper-name", roundTripperName),
zap.String("remote-peer-id", id),
zap.Duration("rtt", s.SRTT()),
zap.Error(s.Err()),
Expand All @@ -75,6 +85,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
if lg != nil {
lg.Warn(
"prober found high clock drift",
zap.String("round-tripper-name", roundTripperName),
zap.String("remote-peer-id", id),
zap.Duration("clock-drift", s.SRTT()),
zap.Duration("rtt", s.ClockDiff()),
Expand All @@ -84,7 +95,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
}
}
rttSec.WithLabelValues(id).Observe(s.SRTT().Seconds())
rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())

case <-s.StopNotify():
return
Expand Down
21 changes: 14 additions & 7 deletions etcdserver/api/rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map

prober probing.Prober
pipelineProber probing.Prober
streamProber probing.Prober
}

func (t *Transport) Start() error {
Expand All @@ -145,7 +146,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)

// If client didn't provide dial retry frequency, use the default
// (100ms backoff between attempts to create a new stream),
Expand Down Expand Up @@ -221,7 +223,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
Expand Down Expand Up @@ -317,7 +320,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.Logger, t.prober, id.String(), us)
addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)

if t.Logger != nil {
t.Logger.Info(
Expand Down Expand Up @@ -358,7 +362,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
t.streamProber.Remove(id.String())

if t.Logger != nil {
t.Logger.Info(
Expand Down Expand Up @@ -388,8 +393,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)

t.prober.Remove(id.String())
addPeerToProber(t.Logger, t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
t.streamProber.Remove(id.String())
addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)

if t.Logger != nil {
t.Logger.Info(
Expand Down
38 changes: 21 additions & 17 deletions etcdserver/api/rafthttp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})

Expand All @@ -125,10 +126,11 @@ func TestTransportAdd(t *testing.T) {

func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))
Expand All @@ -142,8 +144,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})
Expand All @@ -156,13 +159,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()
Expand Down

0 comments on commit d2a0f17

Please sign in to comment.