Skip to content

Commit

Permalink
tests: make log monitor as common helper
Browse files Browse the repository at this point in the history
It's followup of etcd-io#15667.

This patch is to use zaptest/observer as base to provide a similar
function to pkg/expect.Expect.

The test env

```bash
11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
mkdir /sys/fs/cgroup/etcd-followup-15667
echo 0-2 | tee /sys/fs/cgroup/etcd-followup-15667/cpuset.cpus # three cores
```

Before change:

* memory.peak: ~ 681 MiB
* Elapsed (wall clock) time (h:mm:ss or m:ss): 6:14.04

After change:

* memory.peak: ~ 671 MiB
* Elapsed (wall clock) time (h:mm:ss or m:ss): 6:13.07

Based on the test result, I think it's safe to be enabled by default.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Apr 17, 2023
1 parent 22f3e50 commit e902546
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 109 deletions.
23 changes: 18 additions & 5 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ type Member struct {
Closed bool

GrpcServerRecorder *grpc_testing.GrpcRecorder

LogObserver *testutils.LogObserver
}

func (m *Member) GRPCURL() string { return m.GrpcURL }
Expand Down Expand Up @@ -730,7 +732,9 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
}
m.V2Deprecation = config.V2_DEPR_DEFAULT
m.GrpcServerRecorder = &grpc_testing.GrpcRecorder{}
m.Logger = memberLogger(t, mcfg.Name)

m.Logger, m.LogObserver = memberLogger(t, mcfg.Name)

m.StrictReconfigCheck = !mcfg.DisableStrictReconfigCheck
if err := m.listenGRPC(); err != nil {
t.Fatalf("listenGRPC FAILED: %v", err)
Expand All @@ -743,14 +747,23 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
return m
}

func memberLogger(t testutil.TB, name string) *zap.Logger {
func memberLogger(t testutil.TB, name string) (*zap.Logger, *testutils.LogObserver) {
level := zapcore.InfoLevel
if os.Getenv("CLUSTER_DEBUG") != "" {
level = zapcore.DebugLevel
}

options := zaptest.WrapOptions(zap.Fields(zap.String("member", name)))
return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name)
obCore, logOb := testutils.NewLogObserver(level)

options := zaptest.WrapOptions(
zap.Fields(zap.String("member", name)),

// copy logged entities to log observer
zap.WrapCore(func(oldCore zapcore.Core) zapcore.Core {
return zapcore.NewTee(oldCore, obCore)
}),
)
return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name), logOb
}

// listenGRPC starts a grpc server over a unix domain socket on the member
Expand Down Expand Up @@ -934,7 +947,7 @@ func (m *Member) Clone(t testutil.TB) *Member {
mm.ElectionTicks = m.ElectionTicks
mm.PeerTLSInfo = m.PeerTLSInfo
mm.ClientTLSInfo = m.ClientTLSInfo
mm.Logger = memberLogger(t, mm.Name+"c")
mm.Logger, mm.LogObserver = memberLogger(t, mm.Name+"c")
return mm
}

Expand Down
101 changes: 101 additions & 0 deletions tests/framework/testutils/log_observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package testutils

import (
"context"
"fmt"
"strings"
"sync"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
zapobserver "go.uber.org/zap/zaptest/observer"
)

type LogObserver struct {
ob *zapobserver.ObservedLogs
enc zapcore.Encoder

mu sync.Mutex
// entries stores all the logged entries after syncLogs.
entries []zapobserver.LoggedEntry
}

func NewLogObserver(level zapcore.LevelEnabler) (zapcore.Core, *LogObserver) {
// align with zaptest
enc := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig())

co, ob := zapobserver.New(level)
return co, &LogObserver{
ob: ob,
enc: enc,
}
}

// Expect returns the first N lines containing the given string.
func (logOb *LogObserver) Expect(ctx context.Context, s string, count int) ([]string, error) {
return logOb.ExpectFunc(ctx, func(log string) bool { return strings.Contains(log, s) }, count)
}

// ExpectFunc returns the first N line satisfying the function f.
func (logOb *LogObserver) ExpectFunc(ctx context.Context, filter func(string) bool, count int) ([]string, error) {
i := 0
res := make([]string, 0, count)

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

entries := logOb.syncLogs()

// The order of entries won't be changed because of append-only.
// It's safe to skip scanned entries by reusing `i`.
for ; i < len(entries); i++ {
buf, err := logOb.enc.EncodeEntry(entries[i].Entry, entries[i].Context)
if err != nil {
return nil, fmt.Errorf("failed to encode entry: %v", err)
}

logInStr := buf.String()
if filter(logInStr) {
res = append(res, logInStr)
}

if len(res) >= count {
break
}
}

if len(res) >= count {
return res, nil
}
time.Sleep(10 * time.Millisecond)
}
}

// syncLogs is to take all the existing logged entries from zapobserver and
// truncate zapobserver's entries slice.
func (logOb *LogObserver) syncLogs() []zapobserver.LoggedEntry {
logOb.mu.Lock()
defer logOb.mu.Unlock()

logOb.entries = append(logOb.entries, logOb.ob.TakeAll()...)
return logOb.entries
}
83 changes: 83 additions & 0 deletions tests/framework/testutils/log_observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package testutils

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestLogObserver_Timeout(t *testing.T) {
logCore, logOb := NewLogObserver(zap.InfoLevel)

logger := zap.New(logCore)
logger.Info(t.Name())

ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
_, err := logOb.Expect(ctx, "unknown", 1)
cancel()
assert.Equal(t, true, errors.Is(err, context.DeadlineExceeded))

assert.Equal(t, 1, len(logOb.entries))
}

func TestLogObserver_Expect(t *testing.T) {
logCore, logOb := NewLogObserver(zap.InfoLevel)

logger := zap.New(logCore)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

resCh := make(chan []string, 1)
go func() {
defer close(resCh)

res, err := logOb.Expect(ctx, t.Name(), 2)
require.NoError(t, err)
resCh <- res
}()

msgs := []string{"Hello " + t.Name(), t.Name() + ", World"}
for _, msg := range msgs {
logger.Info(msg)
time.Sleep(40 * time.Millisecond)
}

res := <-resCh
assert.Equal(t, 2, len(res))

// The logged message should be like
//
// 2023-04-16T11:46:19.367+0800 INFO Hello TestLogObserver_Expect
// 2023-04-16T11:46:19.408+0800 INFO TestLogObserver_Expect, World
//
// The prefix timestamp is unpredictable so we should assert the suffix
// only.
for idx := range msgs {
expected := fmt.Sprintf("\tINFO\t%s\n", msgs[idx])
assert.Equal(t, true, strings.HasSuffix(res[idx], expected))
}

assert.Equal(t, 2, len(logOb.entries))
}
Loading

0 comments on commit e902546

Please sign in to comment.