diff --git a/raft/node.go b/raft/node.go index 7c5f329e45f..b4e2e535545 100644 --- a/raft/node.go +++ b/raft/node.go @@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool { len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 } +// appliedCursor extracts from the Ready the highest index the client has +// applied (once the Ready is confirmed via Advance). If no information is +// contained in the Ready, returns zero. +func (rd Ready) appliedCursor() uint64 { + if n := len(rd.CommittedEntries); n > 0 { + return rd.CommittedEntries[n-1].Index + } + if index := rd.Snapshot.Metadata.Index; index > 0 { + return index + } + return 0 +} + // Node represents a node in a raft cluster. type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election @@ -282,6 +295,7 @@ func (n *node) run(r *raft) { var prevLastUnstablei, prevLastUnstablet uint64 var havePrevLastUnstablei bool var prevSnapi uint64 + var applyingToI uint64 var rd Ready lead := None @@ -381,13 +395,17 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index } + if index := rd.appliedCursor(); index != 0 { + applyingToI = index + } r.msgs = nil r.readStates = nil advancec = n.advancec case <-advancec: - if prevHardSt.Commit != 0 { - r.raftLog.appliedTo(prevHardSt.Commit) + if applyingToI != 0 { + r.raftLog.appliedTo(applyingToI) + applyingToI = 0 } if havePrevLastUnstablei { r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) @@ -559,15 +577,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt - // If we hit a size limit when loadaing CommittedEntries, clamp - // our HardState.Commit to what we're actually returning. This is - // also used as our cursor to resume for the next Ready batch. - if len(rd.CommittedEntries) > 0 { - lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1] - if rd.HardState.Commit > lastCommit.Index { - rd.HardState.Commit = lastCommit.Index - } - } } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot diff --git a/raft/node_test.go b/raft/node_test.go index 1a6501cb8eb..b067aaad4d1 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -17,6 +17,8 @@ package raft import ( "bytes" "context" + "fmt" + "math" "reflect" "strings" "testing" @@ -926,3 +928,72 @@ func TestCommitPagination(t *testing.T) { s.Append(rd.Entries) n.Advance() } + +type ignoreSizeHintMemStorage struct { + *MemoryStorage +} + +func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) { + return s.MemoryStorage.Entries(lo, hi, math.MaxUint64) +} + +// TestNodeCommitPaginationAfterRestart regression tests a scenario in which the +// Storage's Entries size limitation is slightly more permissive than Raft's +// internal one. The original bug was the following: +// +// - node learns that index 11 (or 100, doesn't matter) is committed +// - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However, +// index 10 already exceeds maxBytes, due to a user-provided impl of Entries. +// - Commit index gets bumped to 10 +// - the node persists the HardState, but crashes before applying the entries +// - upon restart, the storage returns the same entries, but `slice` takes a different code path +// (since it is now called with an upper bound of 10) and removes the last entry. +// - Raft emits a HardState with a regressing commit index. +// +// A simpler version of this test would have the storage return a lot less entries than dictated +// by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression. +// This wouldn't need to exploit anything about Raft-internal code paths to fail. +func TestNodeCommitPaginationAfterRestart(t *testing.T) { + s := &ignoreSizeHintMemStorage{ + MemoryStorage: NewMemoryStorage(), + } + persistedHardState := raftpb.HardState{ + Term: 1, + Vote: 1, + Commit: 10, + } + + s.hardState = persistedHardState + s.ents = make([]raftpb.Entry, 10) + var size uint64 + for i := range s.ents { + ent := raftpb.Entry{ + Term: 1, + Index: uint64(i + 1), + Type: raftpb.EntryNormal, + Data: []byte("a"), + } + + s.ents[i] = ent + size += uint64(ent.Size()) + } + + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should + // not be included in the initial rd.CommittedEntries. However, our storage will ignore + // this and *will* return it (which is how the Commit index ended up being 10 initially). + cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 + + r := newRaft(cfg) + n := newNode() + go n.run(r) + defer n.Stop() + + rd := readyWithTimeout(&n) + if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit { + t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v", + persistedHardState.Commit, rd.HardState.Commit, + DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }), + ) + } +} diff --git a/raft/rawnode.go b/raft/rawnode.go index a4cecfc8f7f..5f8a116dd63 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - if rn.prevHardSt.Commit != 0 { - // In most cases, prevHardSt and rd.HardState will be the same - // because when there are new entries to apply we just sent a - // HardState with an updated Commit value. However, on initial - // startup the two are different because we don't send a HardState - // until something changes, but we do send any un-applied but - // committed entries (and previously-committed entries may be - // incorporated into the snapshot, even if rd.CommittedEntries is - // empty). Therefore we mark all committed entries as applied - // whether they were included in rd.HardState or not. - rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit) + + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + rn.raft.raftLog.appliedTo(index) } + if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] rn.raft.raftLog.stableTo(e.Index, e.Term) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 8b8ccf5d3cb..7e3841695cc 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -401,3 +401,82 @@ func TestRawNodeStatus(t *testing.T) { t.Errorf("expected status struct, got nil") } } + +// TestRawNodeCommitPaginationAfterRestart is the RawNode version of +// TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the +// Raft group would forget to apply entries: +// +// - node learns that index 11 is committed +// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already +// exceeds maxBytes), which isn't noticed internally by Raft +// - Commit index gets bumped to 10 +// - the node persists the HardState, but crashes before applying the entries +// - upon restart, the storage returns the same entries, but `slice` takes a +// different code path and removes the last entry. +// - Raft does not emit a HardState, but when the app calls Advance(), it bumps +// its internal applied index cursor to 10 (when it should be 9) +// - the next Ready asks the app to apply index 11 (omitting index 10), losing a +// write. +func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { + s := &ignoreSizeHintMemStorage{ + MemoryStorage: NewMemoryStorage(), + } + persistedHardState := raftpb.HardState{ + Term: 1, + Vote: 1, + Commit: 10, + } + + s.hardState = persistedHardState + s.ents = make([]raftpb.Entry, 10) + var size uint64 + for i := range s.ents { + ent := raftpb.Entry{ + Term: 1, + Index: uint64(i + 1), + Type: raftpb.EntryNormal, + Data: []byte("a"), + } + + s.ents[i] = ent + size += uint64(ent.Size()) + } + + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should + // not be included in the initial rd.CommittedEntries. However, our storage will ignore + // this and *will* return it (which is how the Commit index ended up being 10 initially). + cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 + + s.ents = append(s.ents, raftpb.Entry{ + Term: 1, + Index: uint64(11), + Type: raftpb.EntryNormal, + Data: []byte("boom"), + }) + + rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + if err != nil { + t.Fatal(err) + } + + for highestApplied := uint64(0); highestApplied != 11; { + rd := rawNode.Ready() + n := len(rd.CommittedEntries) + if n == 0 { + t.Fatalf("stopped applying entries at index %d", highestApplied) + } + if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next { + t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied) + } + highestApplied = rd.CommittedEntries[n-1].Index + rawNode.Advance(rd) + rawNode.Step(raftpb.Message{ + Type: raftpb.MsgHeartbeat, + To: 1, + From: 1, // illegal, but we get away with it + Term: 1, + Commit: 11, + }) + } +} diff --git a/raft/util.go b/raft/util.go index d744927c4a7..1a7a1e9ac3a 100644 --- a/raft/util.go +++ b/raft/util.go @@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string { return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) } +// DescribeEntries calls DescribeEntry for each Entry, adding a newline to +// each. +func DescribeEntries(ents []pb.Entry, f EntryFormatter) string { + var buf bytes.Buffer + for _, e := range ents { + _, _ = buf.WriteString(DescribeEntry(e, f) + "\n") + } + return buf.String() +} + func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { if len(ents) == 0 { return ents