tests: deflake TestV3WatchRestoreSnapshotUnsync
The TestV3WatchRestoreSnapshotUnsync test sets up a three-member cluster.
After the leader is elected, and before any update requests from the client
are served, each member has a raft log at index 8: 3 x ConfChange +
3 x ClusterMemberAttrSet + 1 x ClusterVersionSet.

Based on the config (SnapshotCount: 10, SnapshotCatchUpEntries: 5), we need
to issue enough update requests to trigger a snapshot at least twice.

T1: L(snapshot-index: 11, compacted-index:  6) F_m0(index: 8)
T2: L(snapshot-index: 22, compacted-index: 17) F_m0(index: 8, out of date)
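
The two snapshot points above follow from the trigger rule: a snapshot is
taken once the applied index exceeds the last snapshot index by more than
SnapshotCount, and the log is then compacted up to the snapshot index minus
the catch-up entry count. A minimal Go sketch of that arithmetic
(illustrative only, not etcd code; the constant names are made up here):

```go
package main

import "fmt"

func main() {
	const (
		snapshotCount  = 10 // ClusterConfig.SnapshotCount
		catchUpEntries = 5  // ClusterConfig.SnapshotCatchUpEntries
		baselineIndex  = 8  // raft log index right after leader election (see above)
		updateRequests = 15 // Put requests issued by the test
	)

	snapshotIndex := uint64(0)
	for applied := uint64(baselineIndex + 1); applied <= baselineIndex+updateRequests; applied++ {
		if applied-snapshotIndex > snapshotCount {
			snapshotIndex = applied
			fmt.Printf("snapshot-index: %d, compacted-index: %d\n",
				snapshotIndex, snapshotIndex-catchUpEntries)
		}
	}
	// Output:
	// snapshot-index: 11, compacted-index: 6
	// snapshot-index: 22, compacted-index: 17
}
```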

After member 0 recovers from the network partition, it rejects the leader's
append request and returns a hint (index: 8, term: x). If this happens after
the second snapshot, the leader finds that index 8 is out of date (already
compacted) and is forced to transfer a snapshot.

However, the client only issues 15 update requests, and the leader does not
always finish the second snapshot in time. As long as the latest compacted
index is still 6, the leader can still replicate index 9 to member 0 instead
of sending a snapshot.
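
For illustration, here is a simplified sketch (not the actual etcd/raft code;
the function and parameter names are made up) of the choice the leader faces
after receiving the rejection hint: it can keep replicating entries only if
the follower's next index has not been compacted away, otherwise it must fall
back to sending a snapshot.

```go
package main

import "fmt"

// needsSnapshot reports whether the leader must send a snapshot to a follower
// whose log ends at hintIndex, given the first index the leader still retains.
func needsSnapshot(hintIndex, firstRetainedIndex uint64) bool {
	return hintIndex+1 < firstRetainedIndex
}

func main() {
	// T1: compacted-index 6 means entries from index 7 onward are retained,
	// so the leader can still replicate index 9 to member 0 (the flaky case).
	fmt.Println(needsSnapshot(8, 7)) // false

	// T2: compacted-index 17 means index 9 is gone; a snapshot is forced.
	fmt.Println(needsSnapshot(8, 18)) // true
}
```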

```bash
cd tests/integration
CLUSTER_DEBUG=true go test -v -count=1 -run TestV3WatchRestoreSnapshotUnsync ./
...

INFO    m2.raft 3da8ba707f1a21a4 became leader at term 2        {"member": "m2"}
...
INFO    m2      triggering snapshot     {"member": "m2", "local-member-id": "3da8ba707f1a21a4", "local-member-applied-index": 22, "local-member-snapshot-index": 11, "local-member-snapshot-count": 10, "snapshot-forced": false}
...

cluster.go:1359: network partition between: 99626fe5001fde8b <-> 1c964119da6db036
cluster.go:1359: network partition between: 99626fe5001fde8b <-> 3da8ba707f1a21a4
cluster.go:416: WaitMembersForLeader

INFO    m0.raft 99626fe5001fde8b became follower at term 2      {"member": "m0"}
INFO    m0.raft raft.node: 99626fe5001fde8b elected leader 3da8ba707f1a21a4 at term 2   {"member": "m0"}
DEBUG   m2.raft 3da8ba707f1a21a4 received MsgAppResp(rejected, hint: (index 8, term 2)) from 99626fe5001fde8b for index 23      {"member": "m2"}
DEBUG   m2.raft 3da8ba707f1a21a4 decreased progress of 99626fe5001fde8b to [StateReplicate match=8 next=9 inflight=15]  {"member": "m2"}

DEBUG   m0      Applying entries        {"member": "m0", "num-entries": 15}
DEBUG   m0      Applying entry  {"member": "m0", "index": 9, "term": 2, "type": "EntryNormal"}

....

INFO    m2      saved snapshot  {"member": "m2", "snapshot-index": 22}
INFO    m2      compacted Raft logs     {"member": "m2", "compact-index": 17}
```

To fix this issue, the patch uses a log monitor to watch for the "compacted
Raft logs" message and waits until the two connected members have each
compacted their log twice.

Fixes: #15545

Signed-off-by: Wei Fu <fuweid89@gmail.com>
fuweid committed Apr 8, 2023
1 parent caa775d commit 4632d29
Showing 1 changed file with 122 additions and 4 deletions.
126 changes: 122 additions & 4 deletions tests/integration/v3_watch_restore_test.go
@@ -17,10 +17,14 @@ package integration
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
@@ -54,7 +58,9 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *integration.Member, metric s
func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
logMonitor := newTestingLogfMonitor(t)

clus := integration.NewCluster(logMonitor, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
@@ -81,14 +87,30 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}

clus.Members[0].InjectPartition(t, clus.Members[1:]...)
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:])
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) + 1
t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberId())
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
t.Logf("sleeping for 2 seconds DONE")

kvc := integration.ToGRPC(clus.Client(1)).KV

// NOTE: In 3 members cluster, after initial lead has been elected,
// there are 3xConfChange + 3xMemberAttrSet + 1xClusterVersionSet logs.
// So member 0 has index 8 in raft log before network partition. We
// need to trigger snapshot at least twice.
//
// SnapshotCount: 10, SnapshotCatchUpEntries: 5
//
// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
logTopic := "compacted"
logSub := newlineCountExpecter("compacted Raft logs", 4) // two members
logMonitor.addSubscriber(logTopic, logSub)

// to trigger snapshot from the leader to the stopped follower
for i := 0; i < 15; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
@@ -97,12 +119,22 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
}

// ensure two members have compacted the log twice.
if err := logSub.wait(5 * time.Second); err != nil {
t.Fatal("Failed to ensure that members compacted Raft log in 5 seconds")
}
logMonitor.delSubscriber(logTopic)
t.Logf("two members have compacted raft logs")

// trigger snapshot send from leader to this slow follower
// which then calls watchable store Restore
clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
// We don't expect leadership change here, just recompute the leader'Server index
// within clus.Members list.
// We don't expect leadership change here, recompute the leader'Server
// index within clus.Members list.
lead := clus.WaitLeader(t)
if lead != initialLead {
t.Fatalf("expected leader index (%v), but got (%v)", initialLead, lead)
}

// Sending is scheduled on fifo 'sched' within EtcdServer::run,
// so it can start delayed after recovery.
@@ -154,3 +186,89 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
}
}

type lineCountExpecter struct {
doneOnce sync.Once
doneCh chan struct{}

content string
count int64
seen int64
}

func newlineCountExpecter(interestingContent string, expectedCount int64) *lineCountExpecter {
return &lineCountExpecter{
doneCh: make(chan struct{}),
content: interestingContent,
count: expectedCount,
}
}

func (le *lineCountExpecter) Notify(log string) {
if !strings.Contains(log, le.content) {
return
}

if atomic.AddInt64(&le.seen, 1) >= le.count {
le.doneOnce.Do(func() {
close(le.doneCh)
})
}
}

func (le *lineCountExpecter) wait(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()

select {
case <-le.doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

type testingLogfSubscriber interface {
Notify(log string)
}

// testingLogfMonitor is to monitor t.Logf output.
type testingLogfMonitor struct {
testutil.TB

mu sync.RWMutex
subscribers map[string]testingLogfSubscriber
}

func newTestingLogfMonitor(tb testutil.TB) *testingLogfMonitor {
return &testingLogfMonitor{
TB: tb,
subscribers: make(map[string]testingLogfSubscriber),
}
}

func (m *testingLogfMonitor) addSubscriber(topic string, sub testingLogfSubscriber) {
m.mu.Lock()
defer m.mu.Unlock()
m.subscribers[topic] = sub
}

func (m *testingLogfMonitor) delSubscriber(topic string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.subscribers, topic)
}

func (m *testingLogfMonitor) Logf(format string, args ...interface{}) {
m.mu.RLock()
if len(m.subscribers) > 0 {
log := fmt.Sprintf(format, args...)

for _, sub := range m.subscribers {
sub.Notify(log)
}
}
m.mu.RUnlock()

m.TB.Logf(format, args...)
}
