tests/robustness: Validate all etcd watches opened to etcd
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
serathius committed May 15, 2023
1 parent 4675e5c commit 2ef3492
Showing 10 changed files with 279 additions and 187 deletions.
32 changes: 16 additions & 16 deletions tests/robustness/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# etcd Robustness Testing

The purpose of etcd robustness tests is to validate that etcd upholds
[API guarantees] and [watch guarantees] under any condition or failure.
[KV API guarantees] and [watch API guarantees] under any condition or failure.

Robustness tests achieve that by comparing etcd cluster behavior against a simplified model.
Multiple tests cover different etcd cluster setups, client traffic types, and failures experienced by the cluster.
During a single test we create a cluster and inject failures while sending and recording client traffic.
Correctness is validated by running the collected history of client operations against the etcd model and a set of validators.
Upon failure, tests generate a report that can be used to determine whether the failure was caused by a bug in etcd or in the test framework.

[API guarantees]: https://etcd.io/docs/latest/learning/api_guarantees/
[watch guarantees]: https://etcd.io/docs/latest/learning/api/#watch-streams
[KV API guarantees]: https://etcd.io/docs/v3.6/learning/api_guarantees/#kv-apis
[watch API guarantees]: https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis

## Running locally

@@ -41,15 +41,15 @@ is included in test logs. One of the log lines should look like:
logger.go:130: 2023-03-18T12:18:03.244+0100 INFO Saving member data dir {"member": "TestRobustnessIssue14370-test-0", "path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0"}
logger.go:130: 2023-03-18T12:18:03.244+0100 INFO Saving watch responses {"path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0/responses.json"}
logger.go:130: 2023-03-18T12:18:03.247+0100 INFO Saving watch events {"path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0/events.json"}
logger.go:130: 2023-03-18T12:18:03.248+0100 INFO Saving operation history {"path": "/tmp/TestRobustness_Issue14370/full-history.json"}
logger.go:130: 2023-03-18T12:18:03.252+0100 INFO Saving operation history {"path": "/tmp/TestRobustness_Issue14370/patched-history.json"}
logger.go:130: 2023-05-15T17:42:37.792+0200 INFO Saving operation history {"path": "/tmp/TestRobustness_ClusterOfSize3_Kubernetes/client-1/operations.json"}
logger.go:130: 2023-05-15T17:42:37.793+0200 INFO Saving watch responses {"path": "/tmp/TestRobustness_ClusterOfSize3_Kubernetes/client-2/watch.json"}
logger.go:130: 2023-03-18T12:18:03.256+0100 INFO Saving visualization {"path": "/tmp/TestRobustness_Issue14370/history.html"}
```

The report includes multiple types of files:
* Member db files, which can be used to verify disk/memory corruption.
* Watch responses saved as json, which can be used to validate [watch guarantees].
* Operation history saved as both an html visualization and a json file, which can be used to validate [API guarantees].
* Watch responses saved as json, which can be used to validate [watch API guarantees].
* Operation history saved as both an html visualization and a json file, which can be used to validate [KV API guarantees].

### Example analysis of linearization issue

@@ -78,22 +78,22 @@ To reproduce the issue by yourself run `make test-robustness-issue15271`.
After a couple of tries, robustness tests should fail with the log `Broke watch guarantee` and save a report locally.

Watch issues are easiest to analyse by reading the recorded watch history.
Watch history is recorded for each member in a separate subdirectory under `/tmp/TestRobustness_Issue15271/`.
Open `responses.json` for the member mentioned in the `Broke watch guarantee` log.
For example, for member `TestRobustnessIssue15271-test-1` open `/tmp/TestRobustness_Issue15271/TestRobustnessIssue15271-test-1/responses.json`.
Watch history is recorded for each client in a separate subdirectory under `/tmp/TestRobustness_Issue15271/`.
Open `watch.json` for the client mentioned in the `Broke watch guarantee` log.
For example, for client `14` open `/tmp/TestRobustness_Issue15271/client-14/watch.json`.

Each line consists of a json blob corresponding to a single watch response observed by the client.
Look for lines with `mod_revision` equal to the revision mentioned in the first `Broke watch guarantee` log.
You should see two lines where the `mod_revision` decreases, like the ones below:
```
{"Header":{"cluster_id":12951239930360520062,"member_id":16914881897345358027,"revision":2574,"raft_term":2},"Events":[{"kv":{"key":"Ng==","create_revision":2303,"mod_revision":2574,"version":46,"value":"Mjg5OA=="}}],"CompactRevision":0,"Canceled":false,"Created":false}
{"Header":{"cluster_id":12951239930360520062,"member_id":16914881897345358027,"revision":7708,"raft_term":2},"Events":[{"kv":{"key":"NQ==","create_revision":5,"mod_revision":91,"version":10,"value":"MTAy"}}, ... }
{"Events":[{"Op":{"Type":"put","Key":"5","WithPrefix":false,"Limit":0,"Value":{"Value":"357","Hash":0},"LeaseID":0},"Revision":335}],"IsProgressNotify":false,"Revision":335,"Time":1050415777}
{"Events":[{"Op":{"Type":"put","Key":"1","WithPrefix":false,"Limit":0,"Value":{"Value":"24","Hash":0},"LeaseID":0},"Revision":24}, ...
```
Up to the first line, the `mod_revision` of events within responses only increased, up to a value of `2574`.
However, the following line includes an event with `mod_revision` equal to `91`.
If you follow the `mod_revision` throughout the file you should notice that the watch replayed revisions a second time.
This is incorrect and breaks the `Ordered` and `Unique` [watch guarantees].
Up to the first line, the `revision` of events within responses only increased, up to a value of `335`.
However, the following line includes an event with `revision` equal to `24`.
If you follow the `revision` throughout the file you should notice that the watch replayed revisions a second time.
This is incorrect and breaks the `Ordered` and `Unique` [watch API guarantees].
This is consistent with the root cause of [#14370], where a member reconnecting to the cluster incorrectly resends revisions.
[#15271]: https://github.com/etcd-io/etcd/issues/15271
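The `Ordered` and `Unique` checks described above can be verified mechanically over a watch stream. Below is a minimal sketch of such a validator; the `Event` type and `brokenGuarantee` function are illustrative only and are not the robustness framework's actual types.

```go
package main

import "fmt"

// Event is a simplified watch event carrying only the fields needed to
// check the guarantees: the key and the revision it was delivered at.
type Event struct {
	Key      string
	Revision int64
}

// brokenGuarantee returns the name of the first broken guarantee, or "".
// Ordered: revisions never decrease across a watch stream.
// Unique: the same (key, revision) pair is never delivered twice.
func brokenGuarantee(events []Event) string {
	seen := map[Event]bool{}
	var last int64
	for _, e := range events {
		if e.Revision < last {
			return "Ordered"
		}
		last = e.Revision
		if seen[e] {
			return "Unique"
		}
		seen[e] = true
	}
	return ""
}

func main() {
	// Replayed revisions, as in the report above: 335 followed by 24.
	replayed := []Event{{"5", 335}, {"1", 24}}
	fmt.Println(brokenGuarantee(replayed)) // Ordered
}
```

Running this over the events parsed out of `watch.json` would flag the same line pair the manual analysis found.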
7 changes: 7 additions & 0 deletions tests/robustness/identity/id.go
@@ -21,6 +21,8 @@ type Provider interface {
NewStreamId() int
// NewRequestId returns unique identification used to make write requests unique.
NewRequestId() int
// NewClientId returns unique identification for a client and its reports.
NewClientId() int
}

func NewIdProvider() Provider {
@@ -30,6 +32,7 @@ func NewIdProvider() Provider {
type atomicProvider struct {
streamId atomic.Int64
requestId atomic.Int64
clientId atomic.Int64
}

func (id *atomicProvider) NewStreamId() int {
@@ -39,3 +42,7 @@ func (id *atomicProvider) NewStreamId() int {
func (id *atomicProvider) NewRequestId() int {
return int(id.requestId.Add(1))
}

func (id *atomicProvider) NewClientId() int {
return int(id.clientId.Add(1))
}
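The provider added here is a thin wrapper around `sync/atomic` counters: `Add(1)` is a single atomic read-modify-write, so IDs handed out from many goroutines are always distinct. A standalone sketch (illustrative, not the framework code) demonstrating that property:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicProvider mirrors the pattern from identity/id.go: each ID family
// gets its own atomic counter, so concurrent callers never see duplicates.
type atomicProvider struct {
	clientId atomic.Int64
}

func (p *atomicProvider) NewClientId() int {
	return int(p.clientId.Add(1))
}

// uniqueClientIds requests n IDs from n concurrent goroutines and reports
// how many distinct values were produced; with atomics this is always n.
func uniqueClientIds(n int) int {
	p := &atomicProvider{}
	ids := make(chan int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ids <- p.NewClientId()
		}()
	}
	wg.Wait()
	close(ids)
	seen := map[int]bool{}
	for id := range ids {
		seen[id] = true
	}
	return len(seen)
}

func main() {
	fmt.Println(uniqueClientIds(100)) // 100
}
```

A plain `int64` with `++` here would race and could hand out duplicate client IDs, which would corrupt the per-client report directories.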
37 changes: 20 additions & 17 deletions tests/robustness/linearizability_test.go
@@ -19,13 +19,13 @@ import (
"testing"
"time"

"github.com/anishathalye/porcupine"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
@@ -152,27 +152,24 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
defer func() {
r.Report(t, panicked)
}()
r.operations, r.responses = s.run(ctx, t, lg, r.clus)
r.clientReports = s.run(ctx, t, lg, r.clus)
forcestopCluster(r.clus)

watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.clus, r.responses, s.watch.requestProgress || watchProgressNotifyEnabled)

r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events)

r.patchedOperations = patchOperationBasedOnWatchEvents(r.operations, longestHistory(r.events))
r.visualizeHistory = model.ValidateOperationHistoryAndReturnVisualize(t, lg, r.patchedOperations)
validateGotAtLeastOneProgressNotify(t, r.clientReports, s.watch.requestProgress || watchProgressNotifyEnabled)
r.visualizeHistory = validateCorrectness(t, lg, r.clientReports)

panicked = false
}

func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]traffic.WatchResponse) {
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (reports []traffic.ClientReport) {
g := errgroup.Group{}
var operationReport, watchReport []traffic.ClientReport
finishTraffic := make(chan struct{})

// baseTime is used to get monotonic clock reading when recording operations/watch events
baseTime := time.Now()
ids := identity.NewIdProvider()
g.Go(func() error {
defer close(finishTraffic)
injectFailpoints(ctx, t, lg, clus, s.failpoint)
@@ -182,22 +179,22 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime)
maxRevisionChan <- operationsMaxRevision(operations)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime, ids)
maxRevisionChan <- operationsMaxRevision(operationReport)
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime)
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
return nil
})
g.Wait()
return operations, responses
return append(operationReport, watchReport...)
}

func operationsMaxRevision(operations []porcupine.Operation) int64 {
func operationsMaxRevision(reports []traffic.ClientReport) int64 {
var maxRevision int64
for _, op := range operations {
revision := op.Output.(model.EtcdNonDeterministicResponse).Revision
for _, r := range reports {
revision := r.OperationHistory.MaxRevision()
if revision > maxRevision {
maxRevision = revision
}
@@ -212,3 +209,9 @@ func forcestopCluster(clus *e2e.EtcdProcessCluster) error {
}
return clus.ConcurrentStop()
}

func validateCorrectness(t *testing.T, lg *zap.Logger, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatchCorrectness(t, reports)
operations := operationsFromClientReports(reports)
return model.ValidateOperationHistoryAndReturnVisualize(t, lg, operations)
}
15 changes: 15 additions & 0 deletions tests/robustness/model/history.go
@@ -508,6 +508,10 @@ func (h History) Merge(h2 History) History {
return result
}

func (h History) Len() int {
return len(h.successful) + len(h.failed)
}

func (h History) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
var maxTime int64
@@ -530,3 +534,14 @@ func (h History) Operations() []porcupine.Operation {
}
return operations
}

func (h History) MaxRevision() int64 {
var maxRevision int64
for _, op := range h.successful {
revision := op.Output.(EtcdNonDeterministicResponse).Revision
if revision > maxRevision {
maxRevision = revision
}
}
return maxRevision
}
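The new `MaxRevision` method is a plain fold over the revisions carried by successful operation responses; `operationsMaxRevision` in the test then takes the maximum of these per-client maxima. A minimal sketch of that fold, decoupled from the porcupine and model types (names are illustrative):

```go
package main

import "fmt"

// maxRevision mirrors History.MaxRevision: scan the revisions and keep the
// largest one seen; zero means no successful operation carried a revision.
func maxRevision(revisions []int64) int64 {
	var max int64
	for _, r := range revisions {
		if r > max {
			max = r
		}
	}
	return max
}

func main() {
	fmt.Println(maxRevision([]int64{2574, 91, 7708})) // 7708
}
```

The maximum revision matters because the watch collectors keep streaming until they have observed events up to it, so every recorded write can be cross-checked against the watch history.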
48 changes: 27 additions & 21 deletions tests/robustness/report.go
Expand Up @@ -16,8 +16,10 @@ package robustness

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"testing"

@@ -29,13 +31,10 @@
)

type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
responses [][]traffic.WatchResponse
events [][]watchEvent
operations []porcupine.Operation
patchedOperations []porcupine.Operation
visualizeHistory func(path string)
lg *zap.Logger
clus *e2e.EtcdProcessCluster
clientReports []traffic.ClientReport
visualizeHistory func(path string)
}

func testResultsDirectory(t *testing.T) string {
@@ -65,21 +64,28 @@
func (r *report) Report(t *testing.T, force bool) {
path := testResultsDirectory(t)
if t.Failed() || force {
for i, member := range r.clus.Procs {
memberDataDir := filepath.Join(path, member.Config().Name)
for _, member := range r.clus.Procs {
memberDataDir := filepath.Join(path, fmt.Sprintf("server-%s", member.Config().Name))
persistMemberDataDir(t, r.lg, member, memberDataDir)
if r.responses != nil {
persistWatchResponses(t, r.lg, filepath.Join(memberDataDir, "responses.json"), r.responses[i])
}
if r.events != nil {
persistWatchEvents(t, r.lg, filepath.Join(memberDataDir, "events.json"), r.events[i])
}
}
if r.operations != nil {
persistOperationHistory(t, r.lg, filepath.Join(path, "full-history.json"), r.operations)
}
if r.patchedOperations != nil {
persistOperationHistory(t, r.lg, filepath.Join(path, "patched-history.json"), r.patchedOperations)
if r.clientReports != nil {
sort.Slice(r.clientReports, func(i, j int) bool {
return r.clientReports[i].ClientId < r.clientReports[j].ClientId
})
for _, report := range r.clientReports {
clientDir := filepath.Join(path, fmt.Sprintf("client-%d", report.ClientId))
err := os.MkdirAll(clientDir, 0700)
if err != nil {
t.Fatal(err)
}
if len(report.Watch) != 0 {
persistWatchResponses(t, r.lg, filepath.Join(clientDir, "watch.json"), report.Watch)
}
operations := report.OperationHistory.Operations()
if len(operations) != 0 {
persistOperationHistory(t, r.lg, filepath.Join(clientDir, "operations.json"), operations)
}
}
}
}
if r.visualizeHistory != nil {
@@ -112,7 +118,7 @@ func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses
}
}

func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []watchEvent) {
func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []traffic.TimedWatchEvent) {
lg.Info("Saving watch events", zap.String("path", path))
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {