Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Introduce CommittedEntries pagination #9982

Merged
merged 2 commits into from
Aug 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,27 @@ type raftLog struct {
applied uint64

logger Logger

maxMsgSize uint64
}

// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
// newLog returns log using the given storage and default options. It
// recovers the log to the state that it just commits and applies the
// latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
return newLogWithSize(storage, logger, noLimit)
}

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
storage: storage,
logger: logger,
maxMsgSize: maxMsgSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, noLimit)
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
9 changes: 9 additions & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
// If we hit a size limit when loadaing CommittedEntries, clamp
// our HardState.Commit to what we're actually returning. This is
// also used as our cursor to resume for the next Ready batch.
if len(rd.CommittedEntries) > 0 {
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
if rd.HardState.Commit > lastCommit.Index {
rd.HardState.Commit = lastCommit.Index
}
}
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
Expand Down
95 changes: 95 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)

// readyWithTimeout selects from n.Ready() with a 1-second timeout. It
// panics on timeout, which is better than the indefinite wait that
// would occur if this channel were read without being wrapped in a
// select.
func readyWithTimeout(n Node) Ready {
select {
case rd := <-n.Ready():
return rd
case <-time.After(time.Second):
panic("timed out waiting for ready")
}
}

// TestNodeStep ensures that node.Step sends msgProp to propc chan
// and other kinds of messages to recvc chan.
func TestNodeStep(t *testing.T) {
Expand Down Expand Up @@ -831,3 +844,85 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
close(stop)
<-done
}

func TestAppendPagination(t *testing.T) {
const maxSizePerMsg = 2048
n := newNetworkWithConfig(func(c *Config) {
c.MaxSizePerMsg = maxSizePerMsg
}, nil, nil, nil)

seenFullMessage := false
// Inspect all messages to see that we never exceed the limit, but
// we do see messages of larger than half the limit.
n.msgHook = func(m raftpb.Message) bool {
if m.Type == raftpb.MsgApp {
size := 0
for _, e := range m.Entries {
size += len(e.Data)
}
if size > maxSizePerMsg {
t.Errorf("sent MsgApp that is too large: %d bytes", size)
}
if size > maxSizePerMsg/2 {
seenFullMessage = true
}
}
return true
}

n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})

// Partition the network while we make our proposals. This forces
// the entries to be batched into larger messages.
n.isolate(1)
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 5; i++ {
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
}
n.recover()

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
if !seenFullMessage {
t.Error("didn't see any messages more than half the max size; something is wrong with this test")
}
}

func TestCommitPagination(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxSizePerMsg = 2048
r := newRaft(cfg)
n := newNode()
go n.run(r)
n.Campaign(context.TODO())

rd := readyWithTimeout(&n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()

blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 3; i++ {
if err := n.Propose(context.TODO(), blob); err != nil {
t.Fatal(err)
}
}

// The 3 proposals will commit in two batches.
rd = readyWithTimeout(&n)
if len(rd.CommittedEntries) != 2 {
t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
rd = readyWithTimeout(&n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
}
2 changes: 1 addition & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLog(c.Storage, c.Logger)
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand Down
9 changes: 9 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3986,6 +3986,10 @@ type network struct {
storage map[uint64]*MemoryStorage
dropm map[connem]float64
ignorem map[pb.MessageType]bool

// msgHook is called for each message sent. It may inspect the
// message and return true to send it or false to drop it.
msgHook func(pb.Message) bool
}

// newNetwork initializes a network from peers.
Expand Down Expand Up @@ -4104,6 +4108,11 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message {
continue
}
}
if nw.msgHook != nil {
if !nw.msgHook(m) {
continue
}
}
mm = append(mm, m)
}
return mm
Expand Down