tests: deflake TestV3WatchRestoreSnapshotUnsync #15667

Merged
125 changes: 121 additions & 4 deletions tests/integration/v3_watch_restore_test.go
@@ -17,10 +17,14 @@ package integration
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
@@ -54,7 +58,9 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *integration.Member, metric s
func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
logMonitor := newTestingLogfMonitor(t)

clus := integration.NewCluster(logMonitor, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
@@ -81,14 +87,31 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}

clus.Members[0].InjectPartition(t, clus.Members[1:]...)
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:])
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) + 1
t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberId())
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
t.Logf("sleeping for 2 seconds DONE")

kvc := integration.ToGRPC(clus.Client(1)).KV

// NOTE: When starting a new cluster with 3 members, each member will
// apply 3 ConfChange directly at the beginning before a leader is
// elected. The leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
// changes. So member 0 has index 8 in its raft log before the network
// partition. We need to trigger EtcdServer.snapshot() at least twice.
//
// SnapshotCount: 10, SnapshotCatchUpEntries: 5
//
// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
//
// Since there is no way to confirm the server has compacted the log, we
// use the log monitor to watch for the expected "compacted Raft logs" message.
logSubID := "compacted"
logSub := newLineCountExpecter("compacted Raft logs", 4) // two members, two compactions each
logMonitor.addSubscriber(logSubID, logSub)

// to trigger snapshot from the leader to the stopped follower
for i := 0; i < 15; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
}
}

// trigger snapshot send from leader to this slow follower
// which then calls watchable store Restore
// ensure two members have compacted the log twice.
if err := logSub.wait(5 * time.Second); err != nil {
t.Fatal("Failed to ensure that members compacted Raft log in 5 seconds")
}
logMonitor.delSubscriber(logSubID)
t.Logf("two members have compacted raft logs")

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
// L(compacted-index:17).
clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
// We don't expect leadership change here, just recompute the leader's index
// within clus.Members list.
@@ -154,3 +185,89 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
}
}
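
As a quick cross-check of the index arithmetic in the NOTE comment above, here is a small standalone sketch (not part of the diff). It only restates the NOTE's assumption that compacted-index equals snapshot-index minus SnapshotCatchUpEntries, with the T1/T2 values taken from that comment:

package main

import "fmt"

func main() {
	const (
		snapshotCatchUpEntries = 5
		followerIndex          = 8 // F_m0's raft index before the partition, per the NOTE
	)

	// T1 and T2 snapshot indexes from the NOTE.
	for _, snapshotIndex := range []int{11, 22} {
		compactedIndex := snapshotIndex - snapshotCatchUpEntries
		// Entries up to compactedIndex are gone; if the follower's next needed
		// entry (followerIndex+1) has already been compacted away, only a
		// snapshot can bring it back up to date.
		needsSnapshot := followerIndex < compactedIndex
		fmt.Printf("snapshot-index=%d compacted-index=%d needs-snapshot=%v\n",
			snapshotIndex, compactedIndex, needsSnapshot)
	}
}

At T1 the follower (index 8) is still ahead of compacted-index 6, so plain log replication suffices; at T2 it is behind compacted-index 17, which is exactly the "out of date" case the test waits for.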

type lineCountExpecter struct {
doneOnce sync.Once
doneCh chan struct{}

content string
count int64
seen int64
}

func newLineCountExpecter(expectedContent string, expectedCount int64) *lineCountExpecter {
return &lineCountExpecter{
doneCh: make(chan struct{}),
content: expectedContent,
count: expectedCount,
}
}

func (le *lineCountExpecter) Notify(log string) {
if !strings.Contains(log, le.content) {
return
}

if atomic.AddInt64(&le.seen, 1) >= le.count {
le.doneOnce.Do(func() {
close(le.doneCh)
})
}
}

func (le *lineCountExpecter) wait(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()

select {
case <-le.doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

type testingLogfSubscriber interface {
Notify(log string)
}

// testingLogfMonitor monitors t.Logf output.
type testingLogfMonitor struct {
testutil.TB

mu sync.RWMutex
subscribers map[string]testingLogfSubscriber
}

Review comment:
Nice implementation, didn't know it's possible to get to all logs in integration (there is AssertProcessLogs for e2e).

It seems like this is implemented to be reused in other tests. Should this be moved to util_test.go? cc @serathius

Member Author:
> func memberLogger(t testutil.TB, name string) *zap.Logger {

I think we can extend the zaptest.Core with a tee function. It can be aligned with AssertProcessLogs for the process expecter~
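
For illustration only, a minimal sketch of the direction the author mentions, not code from this PR: wrap the zaptest core used by memberLogger so every zap entry is also fanned out to subscribers. The names teeLogger and notifier are hypothetical; the sketch assumes go.uber.org/zap's zaptest.NewLogger and zapcore.RegisterHooks.

package sketch

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"go.uber.org/zap/zaptest"
)

// notifier mirrors testingLogfSubscriber, but receives zap entry messages.
type notifier interface {
	Notify(log string)
}

// teeLogger returns a test logger whose entries are also delivered to subs.
// *testing.T satisfies zaptest.TestingT.
func teeLogger(t zaptest.TestingT, subs ...notifier) *zap.Logger {
	base := zaptest.NewLogger(t)
	core := zapcore.RegisterHooks(base.Core(), func(e zapcore.Entry) error {
		for _, s := range subs {
			s.Notify(e.Message)
		}
		return nil
	})
	return zap.New(core)
}

That would let an expecter watch the member loggers' output directly, closer to AssertProcessLogs in e2e, instead of relying on everything being routed through t.Logf.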

func newTestingLogfMonitor(tb testutil.TB) *testingLogfMonitor {
return &testingLogfMonitor{
TB: tb,
subscribers: make(map[string]testingLogfSubscriber),
}
}

func (m *testingLogfMonitor) addSubscriber(id string, sub testingLogfSubscriber) {
m.mu.Lock()
defer m.mu.Unlock()
m.subscribers[id] = sub
}

Member:
minor comment: change addSubscriber to register, and delSubscriber to deregister

func (m *testingLogfMonitor) delSubscriber(id string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.subscribers, id)
}

func (m *testingLogfMonitor) Logf(format string, args ...interface{}) {
m.mu.RLock()
if len(m.subscribers) > 0 {
log := fmt.Sprintf(format, args...)

for _, sub := range m.subscribers {
sub.Notify(log)
}
}
m.mu.RUnlock()

m.TB.Logf(format, args...)
}

Member:
As a generic solution, probably we should implement a Log as well.
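
A possible shape for that follow-up (hypothetical, not part of this PR): a Log override mirroring Logf, assuming testutil.TB exposes Log(args ...interface{}) the same way testing.TB does.

// Hypothetical sketch of the suggested method, mirroring Logf above.
func (m *testingLogfMonitor) Log(args ...interface{}) {
	m.mu.RLock()
	if len(m.subscribers) > 0 {
		log := fmt.Sprint(args...)
		for _, sub := range m.subscribers {
			sub.Notify(log)
		}
	}
	m.mu.RUnlock()

	m.TB.Log(args...)
}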