Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: use a type-safe slice of log entries #46

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {

ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
rn.raft.raftLog.append(ents...)
rn.raft.raftLog.append(logAppend{
index: 0,
term: 0,
entries: ents,
})

// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
Expand Down
117 changes: 62 additions & 55 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,62 +104,61 @@ func (l *raftLog) String() string {
l.committed, l.applied, l.applying, l.unstable.offset, l.unstable.offsetInProgress, len(l.unstable.entries))
}

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if !l.matchTerm(index, logTerm) {
return 0, false
}

lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.append(ents[ci-offset:]...)
// maybeAppend appends the given entries to the log, advances the committed
// index, and returns true. If the requested operation conflicts with the log,
// does nothing and returns false.
func (l *raftLog) maybeAppend(la logAppend, committed uint64) bool {
la, ok := l.findConflict(la)
if !ok {
return false
}
if len(la.entries) != 0 { // there is at least one entry to append
l.append(la)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
l.commitTo(min(committed, la.lastIndex()))
return true
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
// append conditionally appends the given range of entries to the log. It does
// so only if term(la.index) == la.term. All entries after la.index, if any, are
// erased and replaced with la.entries, unless len(la.entries) == 0.
//
// TODO(pavelkalinnikov): rename to truncateAndAppend, to match unstable.
func (l *raftLog) append(la logAppend) bool {
if !l.matchTerm(la.index, la.term) {
return false
}
if after := ents[0].Index - 1; after < l.committed {
if after := la.index; after < l.committed { // don't erase committed entries
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}

// findConflict finds the index of the conflict.
// It returns the first pair of conflicting entries between the existing
// entries and the given entries, if there are any.
// If there is no conflicting entries, and the existing entries contains
// all the given entries, zero will be returned.
// If there is no conflicting entries, but the given entries contains new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for _, ne := range ents {
if len(la.entries) > 0 { // TODO(pavelkalinnikov): allow len == 0
l.unstable.truncateAndAppend(la.entries)
}
return true
}

// findConflict verifies that the given logAppend request can be performed on
// the log, and finds the first entry in it that is not yet present in the log.
// Returns a modified request that only contains missing entries.
//
// Returns false if the request can not proceed because it conflicts with the
// log, i.e. if term(la.index) != la.term or la.index is missing.
func (l *raftLog) findConflict(la logAppend) (logAppend, bool) {
if !l.matchTerm(la.index, la.term) {
return logAppend{}, false
}
mismatch := len(la.entries)
for i, ne := range la.entries {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term)
}
return ne.Index
mismatch = i
break
}
}
return 0
return la.skip(mismatch), true
}

// findConflictByTerm returns a best guess on where this log ends matching
Expand Down Expand Up @@ -191,7 +190,7 @@ func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64)

// nextUnstableEnts returns all entries that are available to be written to the
// local stable log and are not already in-progress.
func (l *raftLog) nextUnstableEnts() []pb.Entry {
func (l *raftLog) nextUnstableEnts() LogRange {
return l.unstable.nextEntries()
}

Expand All @@ -213,7 +212,7 @@ func (l *raftLog) hasNextOrInProgressUnstableEnts() bool {
// appended them to the local raft log yet. If allowUnstable is true, committed
// entries from the unstable log may be returned; otherwise, only entries known
// to reside locally on stable storage will be returned.
func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
func (l *raftLog) nextCommittedEnts(allowUnstable bool) LogRange {
if l.applyingEntsPaused {
// Entry application outstanding size limit reached.
return nil
Expand Down Expand Up @@ -371,11 +370,17 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() }

func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
_, term := l.tip()
return term
}

func (l *raftLog) tip() (index, term uint64) {
index = l.lastIndex()
t, err := l.term(index)
if err != nil {
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
}
return t
return index, t
}

func (l *raftLog) term(i uint64) (uint64, error) {
Expand Down Expand Up @@ -406,15 +411,15 @@ func (l *raftLog) term(i uint64) (uint64, error) {
panic(err) // TODO(bdarnell)
}

func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) (LogRange, error) {
if i > l.lastIndex() {
return nil, nil
}
return l.slice(i, l.lastIndex()+1, maxSize)
}

// allEntries returns all entries in the log.
func (l *raftLog) allEntries() []pb.Entry {
func (l *raftLog) allEntries() LogRange {
ents, err := l.entries(l.firstIndex(), noLimit)
if err == nil {
return ents
Expand Down Expand Up @@ -462,15 +467,15 @@ func (l *raftLog) restore(s pb.Snapshot) {
}

// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) (LogRange, error) {
err := l.mustCheckOutOfBounds(lo, hi)
if err != nil {
return nil, err
}
if lo == hi {
return nil, nil
}
var ents []pb.Entry
var ents LogRange
if lo < l.unstable.offset {
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), uint64(maxSize))
if err == ErrCompacted {
Expand All @@ -486,15 +491,17 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e
return storedEnts, nil
}

// TODO(pavelkalinnikov): Storage.Entries() returns []pb.Entry which is not
// verified. Verify it before converting to LogRange, or (better) require
// the API to return a verified range.
ents = storedEnts
}
if hi > l.unstable.offset {
unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
if len(ents) > 0 {
combined := make([]pb.Entry, len(ents)+len(unstable))
n := copy(combined, ents)
copy(combined[n:], unstable)
ents = combined
combined := make(LogRange, 0, len(ents)+len(unstable))
combined = append(combined, ents...)
ents = combined.Append(unstable)
} else {
ents = unstable
}
Expand Down
Loading