diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index e63cc0ce98a7..c023528047b8 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -581,6 +581,8 @@ type Member struct { Closed bool GrpcServerRecorder *grpc_testing.GrpcRecorder + + LogObserver *testutils.LogObserver } func (m *Member) GRPCURL() string { return m.GrpcURL } @@ -730,7 +732,9 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { } m.V2Deprecation = config.V2_DEPR_DEFAULT m.GrpcServerRecorder = &grpc_testing.GrpcRecorder{} - m.Logger = memberLogger(t, mcfg.Name) + + m.Logger, m.LogObserver = memberLogger(t, mcfg.Name) + m.StrictReconfigCheck = !mcfg.DisableStrictReconfigCheck if err := m.listenGRPC(); err != nil { t.Fatalf("listenGRPC FAILED: %v", err) @@ -743,14 +747,23 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { return m } -func memberLogger(t testutil.TB, name string) *zap.Logger { +func memberLogger(t testutil.TB, name string) (*zap.Logger, *testutils.LogObserver) { level := zapcore.InfoLevel if os.Getenv("CLUSTER_DEBUG") != "" { level = zapcore.DebugLevel } - options := zaptest.WrapOptions(zap.Fields(zap.String("member", name))) - return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name) + obCore, logOb := testutils.NewLogObserver(level) + + options := zaptest.WrapOptions( + zap.Fields(zap.String("member", name)), + + // copy logged entities to log observer + zap.WrapCore(func(oldCore zapcore.Core) zapcore.Core { + return zapcore.NewTee(oldCore, obCore) + }), + ) + return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name), logOb } // listenGRPC starts a grpc server over a unix domain socket on the member @@ -934,7 +947,7 @@ func (m *Member) Clone(t testutil.TB) *Member { mm.ElectionTicks = m.ElectionTicks mm.PeerTLSInfo = m.PeerTLSInfo mm.ClientTLSInfo = m.ClientTLSInfo - mm.Logger = memberLogger(t, mm.Name+"c") + mm.Logger, mm.LogObserver = memberLogger(t, mm.Name+"c") return mm } diff --git a/tests/framework/testutils/log_observer.go b/tests/framework/testutils/log_observer.go new file mode 100644 index 000000000000..1fffadb636ae --- /dev/null +++ b/tests/framework/testutils/log_observer.go @@ -0,0 +1,101 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + zapobserver "go.uber.org/zap/zaptest/observer" +) + +type LogObserver struct { + ob *zapobserver.ObservedLogs + enc zapcore.Encoder + + mu sync.Mutex + // entries stores all the logged entries after syncLogs. + entries []zapobserver.LoggedEntry +} + +func NewLogObserver(level zapcore.LevelEnabler) (zapcore.Core, *LogObserver) { + // align with zaptest + enc := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) + + co, ob := zapobserver.New(level) + return co, &LogObserver{ + ob: ob, + enc: enc, + } +} + +// Expect returns the first N lines containing the given string. +func (logOb *LogObserver) Expect(ctx context.Context, s string, count int) ([]string, error) { + return logOb.ExpectFunc(ctx, func(log string) bool { return strings.Contains(log, s) }, count) +} + +// ExpectFunc returns the first N line satisfying the function f. +func (logOb *LogObserver) ExpectFunc(ctx context.Context, filter func(string) bool, count int) ([]string, error) { + i := 0 + res := make([]string, 0, count) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + entries := logOb.syncLogs() + + // The order of entries won't be changed because of append-only. + // It's safe to skip scanned entries by reusing `i`. + for ; i < len(entries); i++ { + buf, err := logOb.enc.EncodeEntry(entries[i].Entry, entries[i].Context) + if err != nil { + return nil, fmt.Errorf("failed to encode entry: %v", err) + } + + logInStr := buf.String() + if filter(logInStr) { + res = append(res, logInStr) + } + + if len(res) >= count { + break + } + } + + if len(res) >= count { + return res, nil + } + time.Sleep(10 * time.Millisecond) + } +} + +// syncLogs is to take all the existing logged entries from zapobserver and +// truncate zapobserver's entries slice. +func (logOb *LogObserver) syncLogs() []zapobserver.LoggedEntry { + logOb.mu.Lock() + defer logOb.mu.Unlock() + + logOb.entries = append(logOb.entries, logOb.ob.TakeAll()...) + return logOb.entries +} diff --git a/tests/framework/testutils/log_observer_test.go b/tests/framework/testutils/log_observer_test.go new file mode 100644 index 000000000000..bddd407ebcd4 --- /dev/null +++ b/tests/framework/testutils/log_observer_test.go @@ -0,0 +1,83 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestLogObserver_Timeout(t *testing.T) { + logCore, logOb := NewLogObserver(zap.InfoLevel) + + logger := zap.New(logCore) + logger.Info(t.Name()) + + ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) + _, err := logOb.Expect(ctx, "unknown", 1) + cancel() + assert.Equal(t, true, errors.Is(err, context.DeadlineExceeded)) + + assert.Equal(t, 1, len(logOb.entries)) +} + +func TestLogObserver_Expect(t *testing.T) { + logCore, logOb := NewLogObserver(zap.InfoLevel) + + logger := zap.New(logCore) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + resCh := make(chan []string, 1) + go func() { + defer close(resCh) + + res, err := logOb.Expect(ctx, t.Name(), 2) + require.NoError(t, err) + resCh <- res + }() + + msgs := []string{"Hello " + t.Name(), t.Name() + ", World"} + for _, msg := range msgs { + logger.Info(msg) + time.Sleep(40 * time.Millisecond) + } + + res := <-resCh + assert.Equal(t, 2, len(res)) + + // The logged message should be like + // + // 2023-04-16T11:46:19.367+0800 INFO Hello TestLogObserver_Expect + // 2023-04-16T11:46:19.408+0800 INFO TestLogObserver_Expect, World + // + // The prefix timestamp is unpredictable so we should assert the suffix + // only. + for idx := range msgs { + expected := fmt.Sprintf("\tINFO\t%s\n", msgs[idx]) + assert.Equal(t, true, strings.HasSuffix(res[idx], expected)) + } + + assert.Equal(t, 2, len(logOb.entries)) +} diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index 9178d95f64b8..40a42ecad1c1 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -17,14 +17,10 @@ package integration import ( "context" "fmt" - "strings" - "sync" - "sync/atomic" "testing" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/integration" ) @@ -58,9 +54,7 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *integration.Member, metric s func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { integration.BeforeTest(t) - logMonitor := newTestingLogfMonitor(t) - - clus := integration.NewCluster(logMonitor, &integration.ClusterConfig{ + clus := integration.NewCluster(t, &integration.ClusterConfig{ Size: 3, SnapshotCount: 10, SnapshotCatchUpEntries: 5, @@ -95,6 +89,14 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { kvc := integration.ToGRPC(clus.Client(1)).KV + // to trigger snapshot from the leader to the stopped follower + for i := 0; i < 15; i++ { + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Errorf("#%d: couldn't put key (%v)", i, err) + } + } + // NOTE: When starting a new cluster with 3 members, each member will // apply 3 ConfChange directly at the beginning before a leader is // elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet @@ -108,24 +110,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { // // Since there is no way to confirm server has compacted the log, we // use log monitor to watch and expect "compacted Raft logs" content. - logSubID := "compacted" - logSub := newLineCountExpecter("compacted Raft logs", 4) // two members - logMonitor.addSubscriber(logSubID, logSub) - - // to trigger snapshot from the leader to the stopped follower - for i := 0; i < 15; i++ { - _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) - if err != nil { - t.Errorf("#%d: couldn't put key (%v)", i, err) - } - } - - // ensure two members has compacted the log twice. - if err := logSub.wait(5 * time.Second); err != nil { - t.Fatal("Failed to ensure that members compacted Raft log in 5 seconds") - } - logMonitor.delSubscriber(logSubID) - t.Logf("two members have compacted raft logs") + expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2) // After RecoverPartition, leader L will send snapshot to slow F_m0 // follower, because F_m0(index:8) is 'out of date' compared to @@ -154,6 +139,8 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives) } + expectMemberLog(t, clus.Members[0], 5*time.Second, "received and saved database snapshot", 1) + t.Logf("sleeping for 2 seconds") time.Sleep(2 * time.Second) t.Logf("sleeping for 2 seconds DONE") @@ -186,88 +173,15 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { } } -type lineCountExpecter struct { - doneOnce sync.Once - doneCh chan struct{} - - content string - count int64 - seen int64 -} - -func newLineCountExpecter(expectedContent string, expectedCount int64) *lineCountExpecter { - return &lineCountExpecter{ - doneCh: make(chan struct{}), - content: expectedContent, - count: expectedCount, - } -} - -func (le *lineCountExpecter) Notify(log string) { - if !strings.Contains(log, le.content) { - return - } - - if atomic.AddInt64(&le.seen, 1) >= le.count { - le.doneOnce.Do(func() { - close(le.doneCh) - }) - } -} - -func (le *lineCountExpecter) wait(timeout time.Duration) error { +func expectMemberLog(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() - select { - case <-le.doneCh: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -type testingLogfSubscriber interface { - Notify(log string) -} - -// testingLogfMonitor is to monitor t.Logf output. -type testingLogfMonitor struct { - testutil.TB - - mu sync.RWMutex - subscribers map[string]testingLogfSubscriber -} - -func newTestingLogfMonitor(tb testutil.TB) *testingLogfMonitor { - return &testingLogfMonitor{ - TB: tb, - subscribers: make(map[string]testingLogfSubscriber), + lines, err := m.LogObserver.Expect(ctx, s, count) + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", s, count, err) } -} - -func (m *testingLogfMonitor) addSubscriber(id string, sub testingLogfSubscriber) { - m.mu.Lock() - defer m.mu.Unlock() - m.subscribers[id] = sub -} - -func (m *testingLogfMonitor) delSubscriber(id string) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.subscribers, id) -} - -func (m *testingLogfMonitor) Logf(format string, args ...interface{}) { - m.mu.RLock() - if len(m.subscribers) > 0 { - log := fmt.Sprintf(format, args...) - - for _, sub := range m.subscribers { - sub.Notify(log) - } + for _, line := range lines { + t.Logf("[expected line]: %v", line) } - m.mu.RUnlock() - - m.TB.Logf(format, args...) }