From 0b198cf121e5b32d9fb44bc4978d5d5750d28152 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 14 Apr 2023 16:11:09 +0100 Subject: [PATCH 1/4] raft: fix out-of-order terms This commit adds checks for a couple of invariants whose violations were previously silently ignored, which allowed incorrectly set up tests to pass and incorrect messages to be exchanged. Property 1: for a MsgApp message m, m.Entries[i].Term >= m.LogTerm. Or, more generally, m.Entries is a slice of contiguous entries with a non-decreasing Term, and m.Index+1 == m.Entries[0].Index && m.LogTerm <= m.Entries[0].Term. This property was broken in a few tests. The root cause was that the leader appended out-of-order entries to its own log when it called becomeLeader(). This was a result of incorrectly set up tests: they restored to a snapshot at (index=11,term=11), but became leader at an earlier term 1. The leader then sent the following, obviously incorrect, MsgApp: {Index=11,LogTerm=11,Entries={{Index=12,Term=1}}}. The fix in the tests is either to go through a follower state at the correct term, by calling becomeFollower(term, ...), or to initialize from a correct HardState in storage. Property 2: for a MsgSnap message m, m.Term >= m.Snapshot.Metadata.Term, because the leader doesn't know of any higher term than its own, and hence can't send a message with such an inversion. This was broken in TestRestoreFromSnapMsg, and is now fixed. 
Signed-off-by: Pavel Kalinnikov --- log.go | 10 ++++++-- raft.go | 15 ++++++++++- raft_snap_test.go | 22 ++++++++-------- raft_test.go | 65 +++++++++++++++++------------------------------ 4 files changed, 58 insertions(+), 54 deletions(-) diff --git a/log.go b/log.go index db22740b..5fd5e1bb 100644 --- a/log.go +++ b/log.go @@ -371,11 +371,17 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() } func (l *raftLog) lastTerm() uint64 { - t, err := l.term(l.lastIndex()) + _, term := l.tip() + return term +} + +func (l *raftLog) tip() (index, term uint64) { + index = l.lastIndex() + t, err := l.term(index) if err != nil { l.logger.Panicf("unexpected error when getting the last term (%v)", err) } - return t + return index, t } func (l *raftLog) term(i uint64) (uint64, error) { diff --git a/raft.go b/raft.go index d1048294..9ef30eb0 100644 --- a/raft.go +++ b/raft.go @@ -757,7 +757,10 @@ func (r *raft) reset(term uint64) { } func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { - li := r.raftLog.lastIndex() + li, lt := r.raftLog.tip() + if r.Term < lt { + r.logger.Panicf("%x appending out-of-order term: %d < %d", r.id, r.Term, lt) + } for i := range es { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) @@ -1727,6 +1730,16 @@ func (r *raft) restore(s pb.Snapshot) bool { return false } + // Another defense-in-depth: the follower is seeing a snapshot at a bigger + // term, but hasn't updated its own term. + if s.Metadata.Term > r.Term { + r.logger.Warningf("%x attempted to restore snapshot at term %d while being at earlier term %d; "+ + "should transition to follower at a larger term first", + r.id, s.Metadata.Term, r.Term) + r.becomeFollower(s.Metadata.Term, None) + return false + } + // More defense-in-depth: throw away snapshot if recipient is not in the // config. 
This shouldn't ever happen (at the time of writing) but lots of // code here and there assumes that r.id is in the progress tracker. diff --git a/raft_snap_test.go b/raft_snap_test.go index 6585bd24..77b1f96e 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -17,6 +17,8 @@ package raft import ( "testing" + "github.com/stretchr/testify/require" + pb "go.etcd.io/raft/v3/raftpb" ) @@ -33,8 +35,8 @@ var ( func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) - + sm.becomeFollower(testingSnap.Metadata.Term, 0) + require.True(t, sm.restore(testingSnap)) sm.becomeCandidate() sm.becomeLeader() @@ -51,8 +53,8 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { func TestPendingSnapshotPauseReplication(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) - + sm.becomeFollower(testingSnap.Metadata.Term, 0) + require.True(t, sm.restore(testingSnap)) sm.becomeCandidate() sm.becomeLeader() @@ -68,8 +70,8 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { func TestSnapshotFailure(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) - + sm.becomeFollower(testingSnap.Metadata.Term, 0) + require.True(t, sm.restore(testingSnap)) sm.becomeCandidate() sm.becomeLeader() @@ -91,8 +93,8 @@ func TestSnapshotFailure(t *testing.T) { func TestSnapshotSucceed(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) - + sm.becomeFollower(testingSnap.Metadata.Term, 0) + require.True(t, sm.restore(testingSnap)) sm.becomeCandidate() sm.becomeLeader() @@ -114,8 +116,8 @@ func TestSnapshotSucceed(t *testing.T) { func TestSnapshotAbort(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - 
sm.restore(testingSnap) - + sm.becomeFollower(testingSnap.Metadata.Term, 0) + require.True(t, sm.restore(testingSnap)) sm.becomeCandidate() sm.becomeLeader() diff --git a/raft_test.go b/raft_test.go index 5637c4a4..a9ba7eb1 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1366,11 +1366,11 @@ func TestHandleHeartbeat(t *testing.T) { // TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response. func TestHandleHeartbeatResp(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) + storage.SetHardState(pb.HardState{Term: 3, Vote: 1, Commit: 3}) storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) sm := newTestRaft(1, 5, 1, storage) sm.becomeCandidate() sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) // A heartbeat response from a node that is behind; re-send MsgApp sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) @@ -2916,9 +2916,8 @@ func TestRestore(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - if ok := sm.restore(s); !ok { - t.Fatal("restore fail, want succeed") - } + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, sm.restore(s)) if sm.raftLog.lastIndex() != s.Metadata.Index { t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) @@ -2955,9 +2954,8 @@ func TestRestoreWithLearner(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3)) sm := newTestLearnerRaft(3, 8, 2, storage) - if ok := sm.restore(s); !ok { - t.Error("restore fail, want succeed") - } + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, sm.restore(s)) if sm.raftLog.lastIndex() != s.Metadata.Index { t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) @@ -3001,9 +2999,8 @@ func TestRestoreWithVotersOutgoing(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - if ok := sm.restore(s); !ok { - 
t.Fatal("restore fail, want succeed") - } + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, sm.restore(s)) if sm.raftLog.lastIndex() != s.Metadata.Index { t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) @@ -3047,13 +3044,9 @@ func TestRestoreVoterToLearner(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2, 3)) sm := newTestRaft(3, 10, 1, storage) - - if sm.isLearner { - t.Errorf("%x is learner, want not", sm.id) - } - if ok := sm.restore(s); !ok { - t.Error("restore failed unexpectedly") - } + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, !sm.isLearner) + require.True(t, sm.restore(s)) } // TestRestoreLearnerPromotion checks that a learner can become to a follower after @@ -3069,18 +3062,10 @@ func TestRestoreLearnerPromotion(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3)) sm := newTestLearnerRaft(3, 10, 1, storage) - - if !sm.isLearner { - t.Errorf("%x is not learner, want yes", sm.id) - } - - if ok := sm.restore(s); !ok { - t.Error("restore fail, want succeed") - } - - if sm.isLearner { - t.Errorf("%x is learner, want not", sm.id) - } + require.True(t, sm.isLearner) + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, sm.restore(s)) + require.True(t, !sm.isLearner) } // TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader @@ -3096,13 +3081,13 @@ func TestLearnerReceiveSnapshot(t *testing.T) { store := newTestMemoryStorage(withPeers(1), withLearners(2)) n1 := newTestLearnerRaft(1, 10, 1, store) - n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) - - n1.restore(s) + n1.becomeFollower(s.Metadata.Term, 0) + require.True(t, n1.restore(s)) snap := n1.raftLog.nextUnstableSnapshot() store.ApplySnapshot(*snap) n1.appliedSnap(snap) + n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) nt := newNetwork(n1, n2) setRandomizedElectionTimeout(n1, n1.electionTimeout) @@ 
-3132,6 +3117,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } + sm.becomeFollower(s.Metadata.Term, 0) // ignore snapshot if ok := sm.restore(s); ok { @@ -3162,8 +3148,8 @@ func TestProvideSnap(t *testing.T) { } storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(s) - + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, sm.restore(s)) sm.becomeCandidate() sm.becomeLeader() @@ -3192,8 +3178,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { } storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(s) - + sm.becomeFollower(s.Metadata.Term, 0) + require.True(t, sm.restore(s)) sm.becomeCandidate() sm.becomeLeader() @@ -3218,14 +3204,11 @@ func TestRestoreFromSnapMsg(t *testing.T) { ConfState: pb.ConfState{Voters: []uint64{1, 2}}, }, } - m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} + m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 11, Snapshot: s} sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) sm.Step(m) - - if sm.lead != uint64(1) { - t.Errorf("sm.lead = %d, want 1", sm.lead) - } + require.Equal(t, uint64(1), sm.lead) // TODO(bdarnell): what should this test? } From d5f73f3ecdaec8c93f8bbf32e02a808c33e879fc Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 13 Apr 2023 14:44:28 +0100 Subject: [PATCH 2/4] raft: use a type-safe slice of log entries This commit introduces LogRange - a type-safe slice of contiguous log entries. 
Signed-off-by: Pavel Kalinnikov --- bootstrap.go | 4 +- log.go | 30 ++++++++------- log_test.go | 48 ++++++++++++------------ log_unstable.go | 12 +++--- log_unstable_test.go | 4 +- raft.go | 21 ++++++++--- raft_paper_test.go | 12 +++--- raft_test.go | 8 ++-- rafttest/interaction_env.go | 2 +- range.go | 75 +++++++++++++++++++++++++++++++++++++ rawnode_test.go | 4 +- storage.go | 2 +- util.go | 4 +- util_test.go | 4 +- 14 files changed, 158 insertions(+), 72 deletions(-) create mode 100644 range.go diff --git a/bootstrap.go b/bootstrap.go index 2a61aa23..a5d40ffb 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -48,7 +48,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { // TODO(tbg): remove StartNode and give the application the right tools to // bootstrap the initial membership in a cleaner way. rn.raft.becomeFollower(1, None) - ents := make([]pb.Entry, len(peers)) + ents := make(LogRange, len(peers)) // the LogRange is valid by construction for i, peer := range peers { cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} data, err := cc.Marshal() @@ -58,7 +58,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} } - rn.raft.raftLog.append(ents...) + rn.raft.raftLog.append(ents) // Now apply them, mainly so that the application can call Campaign // immediately after StartNode in tests. Note that these nodes will diff --git a/log.go b/log.go index 5fd5e1bb..2e8a7e83 100644 --- a/log.go +++ b/log.go @@ -106,7 +106,7 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). 
-func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { +func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents LogRange) (lastnewi uint64, ok bool) { if !l.matchTerm(index, logTerm) { return 0, false } @@ -122,13 +122,13 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry if ci-offset > uint64(len(ents)) { l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) } - l.append(ents[ci-offset:]...) + l.append(ents[ci-offset:]) } l.commitTo(min(committed, lastnewi)) return lastnewi, true } -func (l *raftLog) append(ents ...pb.Entry) uint64 { +func (l *raftLog) append(ents LogRange) uint64 { if len(ents) == 0 { return l.lastIndex() } @@ -149,7 +149,7 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { // An entry is considered to be conflicting if it has the same index but // a different term. // The index of the given entries MUST be continuously increasing. -func (l *raftLog) findConflict(ents []pb.Entry) uint64 { +func (l *raftLog) findConflict(ents LogRange) uint64 { for _, ne := range ents { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { @@ -191,7 +191,7 @@ func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) // nextUnstableEnts returns all entries that are available to be written to the // local stable log and are not already in-progress. -func (l *raftLog) nextUnstableEnts() []pb.Entry { +func (l *raftLog) nextUnstableEnts() LogRange { return l.unstable.nextEntries() } @@ -213,7 +213,7 @@ func (l *raftLog) hasNextOrInProgressUnstableEnts() bool { // appended them to the local raft log yet. If allowUnstable is true, committed // entries from the unstable log may be returned; otherwise, only entries known // to reside locally on stable storage will be returned. 
-func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) { +func (l *raftLog) nextCommittedEnts(allowUnstable bool) LogRange { if l.applyingEntsPaused { // Entry application outstanding size limit reached. return nil @@ -412,7 +412,7 @@ func (l *raftLog) term(i uint64) (uint64, error) { panic(err) // TODO(bdarnell) } -func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { +func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) (LogRange, error) { if i > l.lastIndex() { return nil, nil } @@ -420,7 +420,7 @@ func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, erro } // allEntries returns all entries in the log. -func (l *raftLog) allEntries() []pb.Entry { +func (l *raftLog) allEntries() LogRange { ents, err := l.entries(l.firstIndex(), noLimit) if err == nil { return ents @@ -468,7 +468,7 @@ func (l *raftLog) restore(s pb.Snapshot) { } // slice returns a slice of log entries from lo through hi-1, inclusive. -func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) { +func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) (LogRange, error) { err := l.mustCheckOutOfBounds(lo, hi) if err != nil { return nil, err @@ -476,7 +476,7 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e if lo == hi { return nil, nil } - var ents []pb.Entry + var ents LogRange if lo < l.unstable.offset { storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), uint64(maxSize)) if err == ErrCompacted { @@ -492,15 +492,17 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e return storedEnts, nil } + // TODO(pavelkalinnikov): Storage.Entries() returns []pb.Entry which is not + // verified. Verify it before converting to LogRange, or (better) require + // the API to return a verified range. 
ents = storedEnts } if hi > l.unstable.offset { unstable := l.unstable.slice(max(lo, l.unstable.offset), hi) if len(ents) > 0 { - combined := make([]pb.Entry, len(ents)+len(unstable)) - n := copy(combined, ents) - copy(combined[n:], unstable) - ents = combined + combined := make(LogRange, 0, len(ents)+len(unstable)) + combined = append(combined, ents...) + ents = combined.Append(unstable) } else { ents = unstable } diff --git a/log_test.go b/log_test.go index 14f4e9bb..69cf2dba 100644 --- a/log_test.go +++ b/log_test.go @@ -49,7 +49,7 @@ func TestFindConflict(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(previousEnts) require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents)) }) } @@ -101,7 +101,7 @@ func TestFindConflictByTerm(t *testing.T) { Term: tt.ents[0].Term, }}) l := newLog(st, raftLogger) - l.append(tt.ents[1:]...) + l.append(tt.ents[1:]) index, term := l.findConflictByTerm(tt.index, tt.term) require.Equal(t, tt.want, index) @@ -115,7 +115,7 @@ func TestFindConflictByTerm(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) 
+ raftLog.append(previousEnts) tests := []struct { lastIndex uint64 term uint64 @@ -183,10 +183,10 @@ func TestAppend(t *testing.T) { storage := NewMemoryStorage() storage.Append(previousEnts) raftLog := newLog(storage, raftLogger) - require.Equal(t, tt.windex, raftLog.append(tt.ents...)) + require.Equal(t, tt.windex, raftLog.append(tt.ents)) g, err := raftLog.entries(1, noLimit) require.NoError(t, err) - require.Equal(t, tt.wents, g) + require.Equal(t, mustLogRange(t, tt.wents), g) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) } @@ -287,7 +287,7 @@ func TestLogMaybeAppend(t *testing.T) { for i, tt := range tests { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(previousEnts) raftLog.committed = commit t.Run(fmt.Sprint(i), func(t *testing.T) { @@ -296,14 +296,14 @@ func TestLogMaybeAppend(t *testing.T) { require.True(t, tt.wpanic) } }() - glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...) 
+ glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents) require.Equal(t, tt.wlasti, glasti) require.Equal(t, tt.wappend, gappend) require.Equal(t, tt.wcommit, raftLog.committed) if gappend && len(tt.ents) != 0 { gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) require.NoError(t, err) - require.Equal(t, tt.ents, gents) + require.Equal(t, mustLogRange(t, tt.ents), gents) } }) } @@ -323,7 +323,7 @@ func TestCompactionSideEffects(t *testing.T) { } raftLog := newLog(storage, raftLogger) for i = unstableIndex; i < lastIndex; i++ { - raftLog.append(pb.Entry{Term: i + 1, Index: i + 1}) + raftLog.append([]pb.Entry{{Term: i + 1, Index: i + 1}}) } require.True(t, raftLog.maybeCommit(lastIndex, lastTerm)) @@ -346,7 +346,7 @@ func TestCompactionSideEffects(t *testing.T) { require.Equal(t, uint64(751), unstableEnts[0].Index) prev := raftLog.lastIndex() - raftLog.append(pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}) + raftLog.append([]pb.Entry{{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}}) require.Equal(t, prev+1, raftLog.lastIndex()) ents, err := raftLog.entries(raftLog.lastIndex(), noLimit) @@ -396,7 +396,7 @@ func TestHasNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents...) + raftLog.append(ents) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) @@ -454,7 +454,7 @@ func TestNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents...) 
+ raftLog.append(ents) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) @@ -465,7 +465,7 @@ func TestNextCommittedEnts(t *testing.T) { newSnap.Metadata.Index++ raftLog.restore(newSnap) } - require.Equal(t, tt.wents, raftLog.nextCommittedEnts(tt.allowUnstable)) + require.Equal(t, mustLogRange(t, tt.wents), raftLog.nextCommittedEnts(tt.allowUnstable)) }) } } @@ -513,7 +513,7 @@ func TestAcceptApplying(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents...) + raftLog.append(ents) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(3, 0 /* size */) @@ -562,7 +562,7 @@ func TestAppliedTo(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents...) + raftLog.append(ents) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(3, 0 /* size */) @@ -597,13 +597,13 @@ func TestNextUnstableEnts(t *testing.T) { // append unstable entries to raftlog raftLog := newLog(storage, raftLogger) - raftLog.append(previousEnts[tt.unstable-1:]...) + raftLog.append(previousEnts[tt.unstable-1:]) ents := raftLog.nextUnstableEnts() if l := len(ents); l > 0 { raftLog.stableTo(ents[l-1].Index, ents[l-1].Term) } - require.Equal(t, tt.wents, ents) + require.Equal(t, mustLogRange(t, tt.wents), ents) require.Equal(t, previousEnts[len(previousEnts)-1].Index+1, raftLog.unstable.offset) }) } @@ -629,7 +629,7 @@ func TestCommitTo(t *testing.T) { } }() raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) 
+ raftLog.append(previousEnts) raftLog.committed = commit raftLog.commitTo(tt.commit) require.Equal(t, tt.wcommit, raftLog.committed) @@ -651,7 +651,7 @@ func TestStableTo(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) + raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -688,7 +688,7 @@ func TestStableToWithSnap(t *testing.T) { s := NewMemoryStorage() require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})) raftLog := newLog(s, raftLogger) - raftLog.append(tt.newEnts...) + raftLog.append(tt.newEnts) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -761,7 +761,7 @@ func TestIsOutOfBounds(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage, raftLogger) for i := uint64(1); i <= num; i++ { - l.append(pb.Entry{Index: i + offset}) + l.append([]pb.Entry{{Index: i + offset}}) } first := offset + 1 @@ -835,7 +835,7 @@ func TestTerm(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) for i := uint64(1); i < num; i++ { - l.append(pb.Entry{Index: offset + i, Term: i}) + l.append([]pb.Entry{{Index: offset + i, Term: i}}) } for i, tt := range []struct { @@ -904,7 +904,7 @@ func TestSlice(t *testing.T) { } l := newLog(storage, raftLogger) for i = num / 2; i < num; i++ { - l.append(pb.Entry{Index: offset + i, Term: offset + i}) + l.append(mustLogRange(t, []pb.Entry{{Index: offset + i, Term: offset + i}})) } tests := []struct { @@ -943,7 +943,7 @@ func TestSlice(t *testing.T) { g, err := l.slice(tt.from, tt.to, entryEncodingSize(tt.limit)) 
require.False(t, tt.from <= offset && err != ErrCompacted) require.False(t, tt.from > offset && err != nil) - require.Equal(t, tt.w, g) + require.Equal(t, mustLogRange(t, tt.w), g) }) } } diff --git a/log_unstable.go b/log_unstable.go index 16cbdeff..76bec2bd 100644 --- a/log_unstable.go +++ b/log_unstable.go @@ -34,7 +34,7 @@ type unstable struct { // the incoming unstable snapshot, if any. snapshot *pb.Snapshot // all entries that have not yet been written to storage. - entries []pb.Entry + entries LogRange // entries[i] has raft log position i+offset. offset uint64 @@ -93,7 +93,7 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) { // nextEntries returns the unstable entries that are not already in the process // of being written to storage. -func (u *unstable) nextEntries() []pb.Entry { +func (u *unstable) nextEntries() LogRange { inProgress := int(u.offsetInProgress - u.offset) if len(u.entries) == inProgress { return nil @@ -172,7 +172,7 @@ func (u *unstable) shrinkEntriesArray() { if len(u.entries) == 0 { u.entries = nil } else if len(u.entries)*lenMultiple < cap(u.entries) { - newEntries := make([]pb.Entry, len(u.entries)) + newEntries := make(LogRange, len(u.entries)) copy(newEntries, u.entries) u.entries = newEntries } @@ -193,7 +193,7 @@ func (u *unstable) restore(s pb.Snapshot) { u.snapshotInProgress = false } -func (u *unstable) truncateAndAppend(ents []pb.Entry) { +func (u *unstable) truncateAndAppend(ents LogRange) { fromIndex := ents[0].Index switch { case fromIndex == u.offset+uint64(len(u.entries)): @@ -210,7 +210,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { // Truncate to fromIndex (exclusive), and append the new entries. u.logger.Infof("truncate the unstable entries before index %d", fromIndex) keep := u.slice(u.offset, fromIndex) // NB: appending to this slice is safe, - u.entries = append(keep, ents...) 
// and will reallocate/copy it + u.entries = keep.Append(ents) // and will reallocate/copy it // Only in-progress entries before fromIndex are still considered to be // in-progress. u.offsetInProgress = min(u.offsetInProgress, fromIndex) @@ -225,7 +225,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { // TODO(pavelkalinnikov): this, and similar []pb.Entry slices, may bubble up all // the way to the application code through Ready struct. Protect other slices // similarly, and document how the client can use them. -func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { +func (u *unstable) slice(lo uint64, hi uint64) LogRange { u.mustCheckOutOfBounds(lo, hi) // NB: use the full slice expression to limit what the caller can do with the // returned slice. For example, an append will reallocate and copy this slice diff --git a/log_unstable_test.go b/log_unstable_test.go index a0b4f8e3..5a8444f6 100644 --- a/log_unstable_test.go +++ b/log_unstable_test.go @@ -244,7 +244,7 @@ func TestUnstableNextEntries(t *testing.T) { logger: raftLogger, } res := u.nextEntries() - require.Equal(t, tt.wentries, res) + require.Equal(t, mustLogRange(t, tt.wentries), res) }) } } @@ -511,7 +511,7 @@ func TestUnstableTruncateAndAppend(t *testing.T) { woffset uint64 woffsetInProgress uint64 - wentries []pb.Entry + wentries LogRange }{ // append to the end { diff --git a/raft.go b/raft.go index 9ef30eb0..1192ca0e 100644 --- a/raft.go +++ b/raft.go @@ -574,7 +574,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { lastIndex, nextIndex := pr.Next-1, pr.Next lastTerm, errt := r.raftLog.term(lastIndex) - var ents []pb.Entry + var ents LogRange var erre error // In a throttled StateReplicate only send empty MsgApp, to ensure progress. 
// Otherwise, if we had a full Inflights and all inflight messages were in @@ -622,6 +622,9 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { r.logger.Panicf("%x: %v", r.id, err) } // NB: pr has been updated, but we make sure to only use its old values below. + if _, err := VerifyLogRange(lastIndex, lastTerm, ents); err != nil { + r.logger.Panicf("VerifyLogRange: %v", err) + } r.send(pb.Message{ To: to, Type: pb.MsgApp, @@ -765,8 +768,10 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } + ents := LogRange(es) // this is a valid log range by construction + // Track the size of this uncommitted proposal. - if !r.increaseUncommittedSize(es) { + if !r.increaseUncommittedSize(ents) { r.logger.Warningf( "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id, @@ -775,7 +780,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - li = r.raftLog.append(es...) + li = r.raftLog.append(ents) // The leader needs to self-ack the entries just appended once they have // been durably persisted (since it doesn't send an MsgApp to itself). 
This // response message will be added to msgsAfterAppend and delivered back to @@ -1647,11 +1652,15 @@ func stepFollower(r *raft, m pb.Message) error { } func (r *raft) handleAppendEntries(m pb.Message) { + entries, err := VerifyLogRange(m.Index, m.LogTerm, m.Entries) + if err != nil { + r.logger.Panicf("%x: invalid MsgApp [index %d, term %d]: %v", r.id, m.Index, m.LogTerm, err) + } if m.Index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, entries); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1955,7 +1964,7 @@ func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Messa // // Empty payloads are never refused. This is used both for appending an empty // entry at a new leader's term, as well as leaving a joint configuration. 
-func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { +func (r *raft) increaseUncommittedSize(ents LogRange) bool { s := payloadsSize(ents) if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize { // If the uncommitted tail of the Raft log is empty, allow any size @@ -1984,7 +1993,7 @@ func (r *raft) reduceUncommittedSize(s entryPayloadSize) { } } -func numOfPendingConf(ents []pb.Entry) int { +func numOfPendingConf(ents LogRange) int { n := 0 for i := range ents { if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { diff --git a/raft_paper_test.go b/raft_paper_test.go index d7e9949c..f5c18a95 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -415,7 +415,7 @@ func TestLeaderStartReplication(t *testing.T) { } msgs := r.readMessages() sort.Sort(messageSlice(msgs)) - wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}} + wents := mustLogRange(t, []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}) wmsgs := []pb.Message{ {From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li}, {From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li}, @@ -451,7 +451,7 @@ func TestLeaderCommitEntry(t *testing.T) { if g := r.raftLog.committed; g != li+1 { t.Errorf("committed = %d, want %d", g, li+1) } - wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}} + wents := mustLogRange(t, []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}) if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) { t.Errorf("nextCommittedEnts = %+v, want %+v", g, wents) } @@ -536,7 +536,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) { } li := uint64(len(tt)) - wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")}) + wents := mustLogRange(t, append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: 
[]byte("some data")})) if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) { t.Errorf("#%d: ents = %+v, want %+v", i, g, wents) } @@ -588,7 +588,7 @@ func TestFollowerCommitEntry(t *testing.T) { if g := r.raftLog.committed; g != tt.commit { t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit) } - wents := tt.ents[:int(tt.commit)] + wents := mustLogRange(t, tt.ents[:int(tt.commit)]) if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) { t.Errorf("#%d: nextCommittedEnts = %v, want %v", i, g, wents) } @@ -685,10 +685,10 @@ func TestFollowerAppendEntries(t *testing.T) { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) - if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) { + if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, mustLogRange(t, tt.wents)) { t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents) } - if g := r.raftLog.nextUnstableEnts(); !reflect.DeepEqual(g, tt.wunstable) { + if g := r.raftLog.nextUnstableEnts(); !reflect.DeepEqual(g, mustLogRange(t, tt.wunstable)) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable) } } diff --git a/raft_test.go b/raft_test.go index a9ba7eb1..8f046251 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2767,7 +2767,7 @@ func TestLeaderIncreaseNext(t *testing.T) { for i, tt := range tests { sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) - sm.raftLog.append(previousEnts...) + sm.raftLog.append(previousEnts) sm.becomeCandidate() sm.becomeLeader() sm.prs.Progress[2].State = tt.state @@ -3068,7 +3068,7 @@ func TestRestoreLearnerPromotion(t *testing.T) { require.True(t, !sm.isLearner) } -// TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader +// TestLearnerReceiveSnapshot tests that a learner can receive a snapshot from leader. 
func TestLearnerReceiveSnapshot(t *testing.T) { // restore the state machine from a snapshot so it has a compacted log and a snapshot s := pb.Snapshot{ @@ -3107,7 +3107,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { commit := uint64(1) storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.raftLog.append(previousEnts...) + sm.raftLog.append(previousEnts) sm.raftLog.commitTo(commit) s := pb.Snapshot{ @@ -3277,7 +3277,7 @@ func TestStepIgnoreConfig(t *testing.T) { index := r.raftLog.lastIndex() pendingConfIndex := r.pendingConfIndex r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) - wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} + wents := mustLogRange(t, []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}) ents, err := r.raftLog.entries(index+1, noLimit) if err != nil { t.Fatalf("unexpected error %v", err) diff --git a/rafttest/interaction_env.go b/rafttest/interaction_env.go index a7dfc0cf..cf601476 100644 --- a/rafttest/interaction_env.go +++ b/rafttest/interaction_env.go @@ -83,7 +83,7 @@ type Storage interface { SetHardState(state pb.HardState) error ApplySnapshot(pb.Snapshot) error Compact(newFirstIndex uint64) error - Append([]pb.Entry) error + Append(raft.LogRange) error } // raftConfigStub sets up a raft.Config stub with reasonable testing defaults. diff --git a/range.go b/range.go new file mode 100644 index 00000000..d1821119 --- /dev/null +++ b/range.go @@ -0,0 +1,75 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "fmt" + "testing" + + pb "go.etcd.io/raft/v3/raftpb" +) + +// LogRange represents a range of contiguous log entries. This entries slice has +// been verified to have the following properties: +// - entries[i+1].Index == entries[i].Index + 1 +// - entries[i+1].Term >= entries[i].Term +// +// TODO(pavelkalinnikov): make it harder to convert from []pb.Entry to LogRange. +type LogRange []pb.Entry + +// VerifyLogRange checks that the given slice represents a valid range of +// contiguous log entries that can be appended after (index, term). +func VerifyLogRange(index, term uint64, ents []pb.Entry) (LogRange, error) { + for i := range ents { + index++ + if got := ents[i].Index; got != index { + return nil, fmt.Errorf("entry[%d].Index is %d, want %d", i, got, index) + } + curTerm := ents[i].Term + if curTerm < term { + return nil, fmt.Errorf("entry[%d].Term is %d, want at least %d", i, curTerm, term) + } + term = curTerm + } + return ents, nil +} + +// Append appends a valid log range to this one. It is like a regular slice +// append, but verifies that the resulting slice is a valid log range. +// +// TODO(pavelkalinnikov): consider returning error instead of panic. 
+func (r LogRange) Append(other LogRange) LogRange { + if lr := len(r); lr != 0 && len(other) != 0 { + if last, index := r[lr-1].Index, other[0].Index; index != last+1 { + panic(fmt.Sprintf("disjoint ranges: last index %d, next %d, want %d", last, index, last+1)) + } + if last, term := r[lr-1].Term, other[0].Term; term < last { + panic(fmt.Sprintf("appending non-monotonic term: last term %d, next %d", last, term)) + } + } + return append(r, other...) +} + +func mustLogRange(t *testing.T, ents []pb.Entry) LogRange { + t.Helper() + if len(ents) == 0 { + return ents + } + res, err := VerifyLogRange(ents[0].Index-1, ents[0].Term, ents) + if err != nil { + t.Fatalf("VerifyLogRange: %v", err) + } + return res +} diff --git a/rawnode_test.go b/rawnode_test.go index 553d74fd..577154ea 100644 --- a/rawnode_test.go +++ b/rawnode_test.go @@ -665,10 +665,10 @@ func TestRawNodeReadIndex(t *testing.T) { // requires the application to bootstrap the state, i.e. it does not accept peers // and will not create faux configuration change entries. func TestRawNodeStart(t *testing.T) { - entries := []pb.Entry{ + entries := mustLogRange(t, []pb.Entry{ {Term: 1, Index: 2, Data: nil}, // empty entry {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry - } + }) want := Ready{ SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, diff --git a/storage.go b/storage.go index b781dfa8..393538e5 100644 --- a/storage.go +++ b/storage.go @@ -250,7 +250,7 @@ func (ms *MemoryStorage) Compact(compactIndex uint64) error { // Append the new entries to storage. 
// TODO (xiangli): ensure the entries are continuous and // entries[0].Index > ms.entries[0].Index -func (ms *MemoryStorage) Append(entries []pb.Entry) error { +func (ms *MemoryStorage) Append(entries LogRange) error { if len(entries) == 0 { return nil } diff --git a/util.go b/util.go index 6b68fe2d..11be586a 100644 --- a/util.go +++ b/util.go @@ -273,7 +273,7 @@ func entsSize(ents []pb.Entry) entryEncodingSize { return size } -func limitSize(ents []pb.Entry, maxSize entryEncodingSize) []pb.Entry { +func limitSize(ents LogRange, maxSize entryEncodingSize) LogRange { if len(ents) == 0 { return ents } @@ -300,7 +300,7 @@ func payloadSize(e pb.Entry) entryPayloadSize { } // payloadsSize is the size of the payloads of the provided entries. -func payloadsSize(ents []pb.Entry) entryPayloadSize { +func payloadsSize(ents LogRange) entryPayloadSize { var s entryPayloadSize for _, e := range ents { s += payloadSize(e) diff --git a/util_test.go b/util_test.go index 545202ab..03af9c03 100644 --- a/util_test.go +++ b/util_test.go @@ -41,10 +41,10 @@ func TestDescribeEntry(t *testing.T) { } func TestLimitSize(t *testing.T) { - ents := []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}} + ents := mustLogRange(t, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}) tests := []struct { maxsize uint64 - wentries []pb.Entry + wentries LogRange }{ {math.MaxUint64, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 6}}}, // Even if maxsize is zero, the first entry should be returned. 
From 51bade301cbc99342ecc061aac6f180268b2cd6b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 13 Apr 2023 15:36:41 +0100 Subject: [PATCH 3/4] raft: use LogRange for entry slices in Ready Signed-off-by: Pavel Kalinnikov --- node.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node.go b/node.go index 9c53aed2..b1ecd02f 100644 --- a/node.go +++ b/node.go @@ -77,7 +77,7 @@ type Ready struct { // If async storage writes are enabled, this field does not need to be acted // on immediately. It will be reflected in a MsgStorageAppend message in the // Messages slice. - Entries []pb.Entry + Entries LogRange // Snapshot specifies the snapshot to be saved to stable storage. // @@ -93,7 +93,7 @@ type Ready struct { // If async storage writes are enabled, this field does not need to be acted // on immediately. It will be reflected in a MsgStorageApply message in the // Messages slice. - CommittedEntries []pb.Entry + CommittedEntries LogRange // Messages specifies outbound messages. // From f245683197b723370f37f5d10d0e0c7663dc0c03 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 14 Apr 2023 00:49:24 +0100 Subject: [PATCH 4/4] raft: add type-safe log append struct All appends are conditional puts, the condition being that the preceding entry is (index, term). The entries slice together with this (index, term) can be used by the entire log append stack, from handling the MsgApp message all the way down to storage.
Signed-off-by: Pavel Kalinnikov --- bootstrap.go | 8 ++- log.go | 85 ++++++++++++----------- log_test.go | 177 ++++++++++++++++++++++++++++-------------------- log_unstable.go | 2 +- raft.go | 18 ++--- raft_test.go | 4 +- range.go | 47 ++++++++++--- 7 files changed, 202 insertions(+), 139 deletions(-) diff --git a/bootstrap.go b/bootstrap.go index a5d40ffb..9c0f6ed5 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -48,7 +48,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { // TODO(tbg): remove StartNode and give the application the right tools to // bootstrap the initial membership in a cleaner way. rn.raft.becomeFollower(1, None) - ents := make(LogRange, len(peers)) // the LogRange is valid by construction + ents := make([]pb.Entry, len(peers)) for i, peer := range peers { cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} data, err := cc.Marshal() @@ -58,7 +58,11 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} } - rn.raft.raftLog.append(ents) + rn.raft.raftLog.append(logAppend{ + index: 0, + term: 0, + entries: ents, + }) // Now apply them, mainly so that the application can call Campaign // immediately after StartNode in tests. Note that these nodes will diff --git a/log.go b/log.go index 2e8a7e83..6b7d6bbd 100644 --- a/log.go +++ b/log.go @@ -104,62 +104,61 @@ func (l *raftLog) String() string { l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries)) } -// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, -// it returns (last index of new entries, true). 
-func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents LogRange) (lastnewi uint64, ok bool) { - if !l.matchTerm(index, logTerm) { - return 0, false - } - - lastnewi = index + uint64(len(ents)) - ci := l.findConflict(ents) - switch { - case ci == 0: - case ci <= l.committed: - l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) - default: - offset := index + 1 - if ci-offset > uint64(len(ents)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) - } - l.append(ents[ci-offset:]) +// maybeAppend appends the given entries to the log, advances the committed +// index, and returns true. If the requested operation conflicts with the log, +// does nothing and returns false. +func (l *raftLog) maybeAppend(la logAppend, committed uint64) bool { + la, ok := l.findConflict(la) + if !ok { + return false + } + if len(la.entries) != 0 { // there is at least one entry to append + l.append(la) } - l.commitTo(min(committed, lastnewi)) - return lastnewi, true + l.commitTo(min(committed, la.lastIndex())) + return true } -func (l *raftLog) append(ents LogRange) uint64 { - if len(ents) == 0 { - return l.lastIndex() +// append conditionally appends the given range of entries to the log. It does +// so only if term(la.index) == la.term. All entries after la.index, if any, are +// erased and replaced with la.entries, unless len(la.entries) == 0. +// +// TODO(pavelkalinnikov): rename to truncateAndAppend, to match unstable. +func (l *raftLog) append(la logAppend) bool { + if !l.matchTerm(la.index, la.term) { + return false } - if after := ents[0].Index - 1; after < l.committed { + if after := la.index; after < l.committed { // don't erase committed entries l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } - l.unstable.truncateAndAppend(ents) - return l.lastIndex() -} - -// findConflict finds the index of the conflict. 
-// It returns the first pair of conflicting entries between the existing -// entries and the given entries, if there are any. -// If there is no conflicting entries, and the existing entries contains -// all the given entries, zero will be returned. -// If there is no conflicting entries, but the given entries contains new -// entries, the index of the first new entry will be returned. -// An entry is considered to be conflicting if it has the same index but -// a different term. -// The index of the given entries MUST be continuously increasing. -func (l *raftLog) findConflict(ents LogRange) uint64 { - for _, ne := range ents { + if len(la.entries) > 0 { // TODO(pavelkalinnikov): allow len == 0 + l.unstable.truncateAndAppend(la.entries) + } + return true +} + +// findConflict verifies that the given logAppend request can be performed on +// the log, and finds the first entry in it that is not yet present in the log. +// Returns a modified request that only contains missing entries. +// +// Returns false if the request can not proceed because it conflicts with the +// log, i.e. if term(la.index) != la.term or la.index is missing. 
+func (l *raftLog) findConflict(la logAppend) (logAppend, bool) { + if !l.matchTerm(la.index, la.term) { + return logAppend{}, false + } + mismatch := len(la.entries) + for i, ne := range la.entries { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term) } - return ne.Index + mismatch = i + break } } - return 0 + return la.skip(mismatch), true } // findConflictByTerm returns a best guess on where this log ends matching diff --git a/log_test.go b/log_test.go index 69cf2dba..6470c143 100644 --- a/log_test.go +++ b/log_test.go @@ -18,39 +18,60 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" pb "go.etcd.io/raft/v3/raftpb" ) +func newLogAppend(index, term uint64, terms ...uint64) logAppend { + entries := make([]pb.Entry, len(terms)) + for i, t := range terms { + entries[i].Index, entries[i].Term = index+1+uint64(i), t + } + return logAppend{index: index, term: term, entries: entries} +} + func TestFindConflict(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} - tests := []struct { - ents []pb.Entry - wconflict uint64 + for _, tt := range []struct { + req logAppend + wantOk bool + want logAppend }{ - // no conflict, empty ent - {[]pb.Entry{}, 0}, - // no conflict - {[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0}, - {[]pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0}, - {[]pb.Entry{{Index: 3, Term: 3}}, 0}, - // no conflict, but has new entries - {[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - {[]pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - {[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - 
{[]pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - // conflicts with existing entries - {[]pb.Entry{{Index: 1, Term: 4}, {Index: 2, Term: 4}}, 1}, - {[]pb.Entry{{Index: 2, Term: 1}, {Index: 3, Term: 4}, {Index: 4, Term: 4}}, 2}, - {[]pb.Entry{{Index: 3, Term: 1}, {Index: 4, Term: 2}, {Index: 5, Term: 4}, {Index: 6, Term: 4}}, 3}, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { + // Empty appends with no conflict. + {req: newLogAppend(0, 0), wantOk: true, want: newLogAppend(0, 0)}, + {req: newLogAppend(1, 1), wantOk: true, want: newLogAppend(1, 1)}, + {req: newLogAppend(3, 3), wantOk: true, want: newLogAppend(3, 3)}, + // Empty appends with conflict. + {req: newLogAppend(4, 4), wantOk: false}, // past the log size + {req: newLogAppend(0, 10), wantOk: false}, // term mismatch + {req: newLogAppend(1, 10), wantOk: false}, // term mismatch + // No conflict, no new entries. + {req: newLogAppend(0, 0, 1, 2, 3), wantOk: true, want: newLogAppend(3, 3)}, + {req: newLogAppend(0, 0, 1, 2), wantOk: true, want: newLogAppend(2, 2)}, + {req: newLogAppend(0, 0, 1), wantOk: true, want: newLogAppend(1, 1)}, + {req: newLogAppend(1, 1, 2, 3), wantOk: true, want: newLogAppend(3, 3)}, + {req: newLogAppend(1, 1, 2), wantOk: true, want: newLogAppend(2, 2)}, + {req: newLogAppend(2, 2, 3), wantOk: true, want: newLogAppend(3, 3)}, + // No conflict, but has new entries. + {req: newLogAppend(0, 0, 1, 2, 3, 4, 4), wantOk: true, want: newLogAppend(3, 3, 4, 4)}, + {req: newLogAppend(1, 1, 2, 3, 4, 4), wantOk: true, want: newLogAppend(3, 3, 4, 4)}, + {req: newLogAppend(2, 2, 3, 4, 4), wantOk: true, want: newLogAppend(3, 3, 4, 4)}, + {req: newLogAppend(3, 3, 4, 4), wantOk: true, want: newLogAppend(3, 3, 4, 4)}, + // Conflicts with existing entries. 
+ {req: newLogAppend(0, 0, 4, 4), wantOk: true, want: newLogAppend(0, 0, 4, 4)}, + {req: newLogAppend(0, 0, 1, 2, 4, 8), wantOk: true, want: newLogAppend(2, 2, 4, 8)}, + {req: newLogAppend(1, 1, 1, 4, 4), wantOk: true, want: newLogAppend(1, 1, 1, 4, 4)}, + {req: newLogAppend(1, 1, 2, 8, 8), wantOk: true, want: newLogAppend(2, 2, 8, 8)}, + {req: newLogAppend(2, 2, 2, 2, 4), wantOk: true, want: newLogAppend(2, 2, 2, 2, 4)}, + } { + t.Run("", func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts) - require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents)) + require.True(t, raftLog.append(logAppend{entries: previousEnts})) + req, ok := raftLog.findConflict(tt.req) + assert.Equal(t, tt.want, req) + assert.Equal(t, tt.wantOk, ok) }) } } @@ -101,7 +122,11 @@ func TestFindConflictByTerm(t *testing.T) { Term: tt.ents[0].Term, }}) l := newLog(st, raftLogger) - l.append(tt.ents[1:]) + require.True(t, l.append(logAppend{ + index: tt.ents[0].Index, + term: tt.ents[0].Term, + entries: tt.ents[1:], + })) index, term := l.findConflictByTerm(tt.index, tt.term) require.Equal(t, tt.want, index) @@ -115,7 +140,7 @@ func TestFindConflictByTerm(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts) + raftLog.append(logAppend{entries: previousEnts}) tests := []struct { lastIndex uint64 term uint64 @@ -145,33 +170,33 @@ func TestIsUpToDate(t *testing.T) { func TestAppend(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}} tests := []struct { - ents []pb.Entry + req logAppend windex uint64 wents []pb.Entry wunstable uint64 }{ { - []pb.Entry{}, + newLogAppend(0, 0), 2, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 3, }, { - []pb.Entry{{Index: 3, Term: 2}}, + newLogAppend(2, 2, 2), 3, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, 
{Index: 3, Term: 2}}, 3, }, // conflicts with index 1 { - []pb.Entry{{Index: 1, Term: 2}}, + newLogAppend(0, 0, 2), 1, []pb.Entry{{Index: 1, Term: 2}}, 1, }, // conflicts with index 2 { - []pb.Entry{{Index: 2, Term: 3}, {Index: 3, Term: 3}}, + newLogAppend(1, 1, 3, 3), 3, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 3}, {Index: 3, Term: 3}}, 2, @@ -183,7 +208,8 @@ func TestAppend(t *testing.T) { storage := NewMemoryStorage() storage.Append(previousEnts) raftLog := newLog(storage, raftLogger) - require.Equal(t, tt.windex, raftLog.append(tt.ents)) + require.True(t, raftLog.append(tt.req)) + require.Equal(t, tt.windex, raftLog.lastIndex()) g, err := raftLog.entries(1, noLimit) require.NoError(t, err) require.Equal(t, mustLogRange(t, tt.wents), g) @@ -209,10 +235,8 @@ func TestLogMaybeAppend(t *testing.T) { commit := uint64(1) tests := []struct { - logTerm uint64 - index uint64 + req logAppend committed uint64 - ents []pb.Entry wlasti uint64 wappend bool @@ -221,73 +245,73 @@ func TestLogMaybeAppend(t *testing.T) { }{ // not match: term is different { - lastterm - 1, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + newLogAppend(lastindex, lastterm-1, 4), lastindex, 0, false, commit, false, }, // not match: index out of bound { - lastterm, lastindex + 1, lastindex, []pb.Entry{{Index: lastindex + 2, Term: 4}}, + newLogAppend(lastindex+1, lastterm, 4), lastindex, 0, false, commit, false, }, // match with the last existing entry { - lastterm, lastindex, lastindex, nil, + newLogAppend(lastindex, lastterm), lastindex, lastindex, true, lastindex, false, }, { - lastterm, lastindex, lastindex + 1, nil, + newLogAppend(lastindex, lastterm), lastindex + 1, lastindex, true, lastindex, false, // do not increase commit higher than lastnewi }, { - lastterm, lastindex, lastindex - 1, nil, + newLogAppend(lastindex, lastterm), lastindex - 1, lastindex, true, lastindex - 1, false, // commit up to the commit in the message }, { - lastterm, lastindex, 0, nil, + 
newLogAppend(lastindex, lastterm), 0, lastindex, true, commit, false, // commit do not decrease }, { - 0, 0, lastindex, nil, + newLogAppend(0, 0), lastindex, 0, true, commit, false, // commit do not decrease }, { - lastterm, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + newLogAppend(lastindex, lastterm, 4), lastindex, lastindex + 1, true, lastindex, false, }, { - lastterm, lastindex, lastindex + 1, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + newLogAppend(lastindex, lastterm, 4), lastindex + 1, lastindex + 1, true, lastindex + 1, false, }, { - lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + newLogAppend(lastindex, lastterm, 4), lastindex + 2, lastindex + 1, true, lastindex + 1, false, // do not increase commit higher than lastnewi }, { - lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}}, + newLogAppend(lastindex, lastterm, 4, 4), lastindex + 2, lastindex + 2, true, lastindex + 2, false, }, // match with the entry in the middle { - lastterm - 1, lastindex - 1, lastindex, []pb.Entry{{Index: lastindex, Term: 4}}, + newLogAppend(lastindex-1, lastterm-1, 4), lastindex, lastindex, true, lastindex, false, }, { - lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}}, + newLogAppend(lastindex-2, lastterm-2, 4), lastindex, lastindex - 1, true, lastindex - 1, false, }, { - lastterm - 3, lastindex - 3, lastindex, []pb.Entry{{Index: lastindex - 2, Term: 4}}, + newLogAppend(lastindex-3, lastterm-3, 4), lastindex, lastindex - 2, true, lastindex - 2, true, // conflict with existing committed entry }, { - lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}}, + newLogAppend(lastindex-2, lastterm-2, 4, 4), lastindex, lastindex, true, lastindex, false, }, } for i, tt := range tests { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts) + 
require.True(t, raftLog.append(logAppend{entries: previousEnts})) raftLog.committed = commit t.Run(fmt.Sprint(i), func(t *testing.T) { @@ -296,14 +320,16 @@ func TestLogMaybeAppend(t *testing.T) { require.True(t, tt.wpanic) } }() - glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents) - require.Equal(t, tt.wlasti, glasti) + gappend := raftLog.maybeAppend(tt.req, tt.committed) require.Equal(t, tt.wappend, gappend) + if gappend { + require.Equal(t, tt.wlasti, tt.req.lastIndex()) + } require.Equal(t, tt.wcommit, raftLog.committed) - if gappend && len(tt.ents) != 0 { - gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) + if gappend && len(tt.req.entries) != 0 { + gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.req.entries))+1, raftLog.lastIndex()+1, noLimit) require.NoError(t, err) - require.Equal(t, mustLogRange(t, tt.ents), gents) + require.Equal(t, tt.req.entries, gents) } }) } @@ -323,7 +349,7 @@ func TestCompactionSideEffects(t *testing.T) { } raftLog := newLog(storage, raftLogger) for i = unstableIndex; i < lastIndex; i++ { - raftLog.append([]pb.Entry{{Term: i + 1, Index: i + 1}}) + require.True(t, raftLog.append(newLogAppend(i, i, i+1))) } require.True(t, raftLog.maybeCommit(lastIndex, lastTerm)) @@ -346,7 +372,7 @@ func TestCompactionSideEffects(t *testing.T) { require.Equal(t, uint64(751), unstableEnts[0].Index) prev := raftLog.lastIndex() - raftLog.append([]pb.Entry{{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}}) + require.True(t, raftLog.append(newLogAppend(raftLog.lastIndex(), raftLog.lastTerm(), raftLog.lastTerm()+1))) require.Equal(t, prev+1, raftLog.lastIndex()) ents, err := raftLog.entries(raftLog.lastIndex(), noLimit) @@ -396,7 +422,8 @@ func TestHasNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents) + require.True(t, raftLog.append( + logAppend{index: 
snap.Metadata.Index, term: snap.Metadata.Term, entries: ents})) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) @@ -454,7 +481,8 @@ func TestNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents) + require.True(t, raftLog.append( + logAppend{index: snap.Metadata.Index, term: snap.Metadata.Term, entries: ents})) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) @@ -513,7 +541,8 @@ func TestAcceptApplying(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents) + require.True(t, raftLog.append( + logAppend{index: snap.Metadata.Index, term: snap.Metadata.Term, entries: ents})) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(3, 0 /* size */) @@ -562,7 +591,8 @@ func TestAppliedTo(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents) + require.True(t, raftLog.append( + logAppend{index: snap.Metadata.Index, term: snap.Metadata.Term, entries: ents})) raftLog.stableTo(4, 1) raftLog.maybeCommit(5, 1) raftLog.appliedTo(3, 0 /* size */) @@ -582,7 +612,7 @@ func TestAppliedTo(t *testing.T) { func TestNextUnstableEnts(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { - unstable uint64 + unstable int wents []pb.Entry }{ {3, nil}, @@ -597,7 +627,7 @@ func TestNextUnstableEnts(t *testing.T) { // append unstable entries to raftlog raftLog := newLog(storage, raftLogger) - raftLog.append(previousEnts[tt.unstable-1:]) + require.True(t, raftLog.append(logAppend{entries: previousEnts}.skip(tt.unstable-1))) ents := raftLog.nextUnstableEnts() if l := len(ents); l > 0 { @@ -629,7 +659,7 @@ func TestCommitTo(t *testing.T) { } }() raftLog := newLog(NewMemoryStorage(), raftLogger) - 
raftLog.append(previousEnts) + require.True(t, raftLog.append(logAppend{entries: previousEnts})) raftLog.committed = commit raftLog.commitTo(tt.commit) require.Equal(t, tt.wcommit, raftLog.committed) @@ -651,7 +681,7 @@ func TestStableTo(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) + require.True(t, raftLog.append(newLogAppend(0, 0, 1, 2))) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -688,7 +718,7 @@ func TestStableToWithSnap(t *testing.T) { s := NewMemoryStorage() require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})) raftLog := newLog(s, raftLogger) - raftLog.append(tt.newEnts) + require.True(t, raftLog.append(logAppend{index: snapi, term: snapt, entries: tt.newEnts})) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -758,10 +788,10 @@ func TestIsOutOfBounds(t *testing.T) { offset := uint64(100) num := uint64(100) storage := NewMemoryStorage() - storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) + storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) for i := uint64(1); i <= num; i++ { - l.append([]pb.Entry{{Index: i + offset}}) + require.True(t, l.append(newLogAppend(i+offset-1, 1, 1))) } first := offset + 1 @@ -835,7 +865,7 @@ func TestTerm(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) for i := uint64(1); i < num; i++ { - l.append([]pb.Entry{{Index: offset + i, Term: i}}) + require.True(t, l.append(newLogAppend(offset+i-1, i, i+1))) } for i, tt := range []struct { @@ -845,8 +875,8 @@ func TestTerm(t *testing.T) { }{ {idx: offset - 1, err: ErrCompacted}, 
{idx: offset, term: 1}, - {idx: offset + num/2, term: num / 2}, - {idx: offset + num - 1, term: num - 1}, + {idx: offset + num/2, term: num/2 + 1}, // NICE: was a bug + {idx: offset + num - 1, term: num}, {idx: offset + num, err: ErrUnavailable}, } { t.Run(fmt.Sprint(i), func(t *testing.T) { @@ -904,7 +934,7 @@ func TestSlice(t *testing.T) { } l := newLog(storage, raftLogger) for i = num / 2; i < num; i++ { - l.append(mustLogRange(t, []pb.Entry{{Index: offset + i, Term: offset + i}})) + require.True(t, l.append(newLogAppend(offset+i-1, offset+i-1, offset+i))) } tests := []struct { @@ -947,7 +977,6 @@ func TestSlice(t *testing.T) { }) } } - func mustTerm(term uint64, err error) uint64 { if err != nil { panic(err) diff --git a/log_unstable.go b/log_unstable.go index 76bec2bd..7875cd09 100644 --- a/log_unstable.go +++ b/log_unstable.go @@ -198,7 +198,7 @@ func (u *unstable) truncateAndAppend(ents LogRange) { switch { case fromIndex == u.offset+uint64(len(u.entries)): // fromIndex is the next index in the u.entries, so append directly. - u.entries = append(u.entries, ents...) + u.entries = u.entries.Append(ents) case fromIndex <= u.offset: u.logger.Infof("replace the unstable entries from index %d", fromIndex) // The log is being truncated to before our current offset diff --git a/raft.go b/raft.go index 1192ca0e..aae6dc5b 100644 --- a/raft.go +++ b/raft.go @@ -768,10 +768,13 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } - ents := LogRange(es) // this is a valid log range by construction + la, err := verifyLogAppend(li, lt, es) + if err != nil { + r.logger.Panicf("verifyLogAppend: %v", err) + } // Track the size of this uncommitted proposal. 
- if !r.increaseUncommittedSize(ents) { + if !r.increaseUncommittedSize(la.entries) { r.logger.Warningf( "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id, @@ -779,8 +782,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { // Drop the proposal. return false } - // use latest "last" index after truncate/append - li = r.raftLog.append(ents) + r.raftLog.append(la) // The leader needs to self-ack the entries just appended once they have // been durably persisted (since it doesn't send an MsgApp to itself). This // response message will be added to msgsAfterAppend and delivered back to @@ -791,7 +793,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { // if r.maybeCommit() { // r.bcastAppend() // } - r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li}) + r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) return true } @@ -1652,7 +1654,7 @@ func stepFollower(r *raft, m pb.Message) error { } func (r *raft) handleAppendEntries(m pb.Message) { - entries, err := VerifyLogRange(m.Index, m.LogTerm, m.Entries) + app, err := verifyLogAppend(m.Index, m.LogTerm, m.Entries) if err != nil { r.logger.Panicf("%x: invalid MsgApp [index %d, term %d]: %v", r.id, m.Index, m.LogTerm, err) } @@ -1660,8 +1662,8 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, entries); ok { - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) + if r.raftLog.maybeAppend(app, m.Commit) { + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: app.lastIndex()}) return } r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", diff --git a/raft_test.go b/raft_test.go index 8f046251..4939afc5 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2767,7 +2767,7 @@ func 
TestLeaderIncreaseNext(t *testing.T) { for i, tt := range tests { sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) - sm.raftLog.append(previousEnts) + sm.raftLog.append(logAppend{entries: previousEnts}) sm.becomeCandidate() sm.becomeLeader() sm.prs.Progress[2].State = tt.state @@ -3107,7 +3107,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { commit := uint64(1) storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.raftLog.append(previousEnts) + sm.raftLog.append(logAppend{entries: previousEnts}) sm.raftLog.commitTo(commit) s := pb.Snapshot{ diff --git a/range.go b/range.go index d1821119..4ad3c013 100644 --- a/range.go +++ b/range.go @@ -31,19 +31,19 @@ type LogRange []pb.Entry // VerifyLogRange checks that the given slice represents a valid range of // contiguous log entries that can be appended after (index, term). -func VerifyLogRange(index, term uint64, ents []pb.Entry) (LogRange, error) { - for i := range ents { +func VerifyLogRange(index, term uint64, entries []pb.Entry) (LogRange, error) { + for i := range entries { index++ - if got := ents[i].Index; got != index { + if got := entries[i].Index; got != index { return nil, fmt.Errorf("entry[%d].Index is %d, want %d", i, got, index) } - curTerm := ents[i].Term + curTerm := entries[i].Term if curTerm < term { return nil, fmt.Errorf("entry[%d].Term is %d, want at least %d", i, curTerm, term) } term = curTerm } - return ents, nil + return entries, nil } // Append appends a valid log range to this one. It is like a regular slice @@ -62,12 +62,41 @@ func (r LogRange) Append(other LogRange) LogRange { return append(r, other...) 
} -func mustLogRange(t *testing.T, ents []pb.Entry) LogRange { +type logAppend struct { + index uint64 // log index after which entries are appended + term uint64 // term of the entry preceding entries + entries LogRange // entries start from index + 1 +} + +func (la logAppend) skip(count int) logAppend { + if count == 0 { + return la + } + return logAppend{ + index: la.index + uint64(count), + term: la.entries[count-1].Term, + entries: la.entries[count:], + } +} + +func (la logAppend) lastIndex() uint64 { + return la.index + uint64(len(la.entries)) +} + +func verifyLogAppend(index, term uint64, entries []pb.Entry) (logAppend, error) { + rng, err := VerifyLogRange(index, term, entries) + if err != nil { + return logAppend{}, err + } + return logAppend{index: index, term: term, entries: rng}, nil +} + +func mustLogRange(t *testing.T, entries []pb.Entry) LogRange { t.Helper() - if len(ents) == 0 { - return ents + if len(entries) == 0 { + return entries } - res, err := VerifyLogRange(ents[0].Index-1, ents[0].Term, ents) + res, err := VerifyLogRange(entries[0].Index-1, entries[0].Term, entries) if err != nil { t.Fatalf("VerifyLogRange: %v", err) }