diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index 1f0b86a005b..51b512ef69c 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -26,8 +26,8 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" - "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/traffic" + "go.etcd.io/etcd/tests/v3/robustness/validate" ) func TestRobustness(t *testing.T) { @@ -65,10 +65,7 @@ func TestRobustness(t *testing.T) { name: traffic.Name + "ClusterOfSize3", failpoint: RandomFailpoint, traffic: traffic, - watch: watchConfig{ - expectUniqueRevision: traffic.Traffic.ExpectUniqueRevision(), - }, - cluster: *e2e.NewConfig(clusterOfSize3Options...), + cluster: *e2e.NewConfig(clusterOfSize3Options...), }) } scenarios = append(scenarios, testScenario{ @@ -160,7 +157,8 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0 validateGotAtLeastOneProgressNotify(t, r.clientReports, s.watch.requestProgress || watchProgressNotifyEnabled) - r.visualizeHistory = validateCorrectness(t, lg, s.watch, r.clientReports) + validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.Traffic.ExpectUniqueRevision()} + r.visualizeHistory = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.clientReports) panicked = false } @@ -213,9 +211,3 @@ func forcestopCluster(clus *e2e.EtcdProcessCluster) error { } return clus.ConcurrentStop() } - -func validateCorrectness(t *testing.T, lg *zap.Logger, cfg watchConfig, reports []traffic.ClientReport) (visualize func(basepath string)) { - validateWatchCorrectness(t, cfg, reports) - operations := operationsFromClientReports(reports) - return model.ValidateOperationHistoryAndReturnVisualize(t, lg, operations) -} diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 4f5d52c9874..8436601212e 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -16,11 +16,9 @@ package model import ( "fmt" - "testing" "time" "github.com/anishathalye/porcupine" - "go.uber.org/zap" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" @@ -28,24 +26,6 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/identity" ) -// ValidateOperationHistoryAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private. -func ValidateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) { - linearizable, info := porcupine.CheckOperationsVerbose(NonDeterministicModel, operations, 5*time.Minute) - if linearizable == porcupine.Illegal { - t.Error("Model is not linearizable") - } - if linearizable == porcupine.Unknown { - t.Error("Linearization timed out") - } - return func(path string) { - lg.Info("Saving visualization", zap.String("path", path)) - err := porcupine.VisualizePath(NonDeterministicModel, info, path) - if err != nil { - t.Errorf("Failed to visualize, err: %v", err) - } - } -} - // AppendableHistory allows to collect history of sequential operations. // // Ensures that operation history is compatible with porcupine library, by preventing concurrent requests sharing the diff --git a/tests/robustness/validate/operations.go b/tests/robustness/validate/operations.go new file mode 100644 index 00000000000..9189af12675 --- /dev/null +++ b/tests/robustness/validate/operations.go @@ -0,0 +1,42 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "testing" + "time" + + "github.com/anishathalye/porcupine" + "go.uber.org/zap" + + "go.etcd.io/etcd/tests/v3/robustness/model" +) + +func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) { + linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, 5*time.Minute) + if linearizable == porcupine.Illegal { + t.Error("Model is not linearizable") + } + if linearizable == porcupine.Unknown { + t.Error("Linearization timed out") + } + return func(path string) { + lg.Info("Saving visualization", zap.String("path", path)) + err := porcupine.VisualizePath(model.NonDeterministicModel, info, path) + if err != nil { + t.Errorf("Failed to visualize, err: %v", err) + } + } +} diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go new file mode 100644 index 00000000000..13f8aaa4e0b --- /dev/null +++ b/tests/robustness/validate/patch_history.go @@ -0,0 +1,107 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "github.com/anishathalye/porcupine" + + "go.etcd.io/etcd/tests/v3/robustness/model" + "go.etcd.io/etcd/tests/v3/robustness/traffic" +) + +func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) []porcupine.Operation { + newOperations := make([]porcupine.Operation, 0, len(operations)) + lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents) + + for _, op := range operations { + request := op.Input.(model.EtcdRequest) + resp := op.Output.(model.EtcdNonDeterministicResponse) + if resp.Err == nil || op.Call > lastObservedOperation.Call || request.Type != model.Txn { + // Cannot patch those requests. + newOperations = append(newOperations, op) + continue + } + event := matchWatchEvent(request.Txn, watchEvents) + if event != nil { + // Set revision and time based on watchEvent. + op.Return = event.Time.Nanoseconds() + op.Output = model.EtcdNonDeterministicResponse{ + EtcdResponse: model.EtcdResponse{Revision: event.Revision}, + ResultUnknown: true, + } + newOperations = append(newOperations, op) + continue + } + if hasNonUniqueWriteOperation(request.Txn) && !hasUniqueWriteOperation(request.Txn) { + // Leave operation as it is as we cannot match non-unique operations to watch events. + newOperations = append(newOperations, op) + continue + } + // Remove non persisted operations + } + return newOperations +} + +func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) porcupine.Operation { + var maxCallTime int64 + var lastOperation porcupine.Operation + for _, op := range operations { + request := op.Input.(model.EtcdRequest) + if request.Type != model.Txn { + continue + } + event := matchWatchEvent(request.Txn, watchEvents) + if event != nil && op.Call > maxCallTime { + maxCallTime = op.Call + lastOperation = op + } + } + return lastOperation +} + +func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) *traffic.TimedWatchEvent { + for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) { + if etcdOp.Type == model.Put { + // Remove LeaseID which is not exposed in watch. + event, ok := watchEvents[model.EtcdOperation{ + Type: etcdOp.Type, + Key: etcdOp.Key, + Value: etcdOp.Value, + }] + if ok { + return &event + } + } + } + return nil +} + +func hasNonUniqueWriteOperation(request *model.TxnRequest) bool { + for _, etcdOp := range request.OperationsOnSuccess { + if etcdOp.Type == model.Put || etcdOp.Type == model.Delete { + return true + } + } + return false +} + +func hasUniqueWriteOperation(request *model.TxnRequest) bool { + for _, etcdOp := range request.OperationsOnSuccess { + if etcdOp.Type == model.Put { + return true + } + } + return false +} diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go new file mode 100644 index 00000000000..7096dcde2bd --- /dev/null +++ b/tests/robustness/validate/validate.go @@ -0,0 +1,58 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "testing" + + "github.com/anishathalye/porcupine" + "go.uber.org/zap" + + "go.etcd.io/etcd/tests/v3/robustness/model" + "go.etcd.io/etcd/tests/v3/robustness/traffic" +) + +// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private. +func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) { + validateWatch(t, cfg, reports) + allOperations := operations(reports) + watchEvents := uniqueWatchEvents(reports) + newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents) + return validateOperationHistoryAndReturnVisualize(t, lg, newOperations) +} + +func operations(reports []traffic.ClientReport) []porcupine.Operation { + var ops []porcupine.Operation + for _, r := range reports { + ops = append(ops, r.OperationHistory.Operations()...) + } + return ops +} + +func uniqueWatchEvents(reports []traffic.ClientReport) map[model.EtcdOperation]traffic.TimedWatchEvent { + persisted := map[model.EtcdOperation]traffic.TimedWatchEvent{} + for _, r := range reports { + for _, resp := range r.Watch { + for _, event := range resp.Events { + persisted[event.Op] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event} + } + } + } + return persisted +} + +type Config struct { + ExpectRevisionUnique bool +} diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go new file mode 100644 index 00000000000..7b3d98c2409 --- /dev/null +++ b/tests/robustness/validate/watch.go @@ -0,0 +1,160 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + + "go.etcd.io/etcd/tests/v3/robustness/traffic" +) + +func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) { + // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis + for _, r := range reports { + validateOrdered(t, r) + validateUnique(t, cfg.ExpectRevisionUnique, r) + validateAtomic(t, r) + // TODO: Validate Resumable + validateBookmarkable(t, r) + } + validateEventsMatch(t, reports) + // Expects that longest history encompasses all events. + // TODO: Use combined events from all histories instead of the longest history. + // TODO: Validate that each watch report is reliable, not only the longest one. + validateReliable(t, longestEventHistory(reports)) +} + +func validateBookmarkable(t *testing.T, report traffic.ClientReport) { + var lastProgressNotifyRevision int64 = 0 + for _, resp := range report.Watch { + for _, event := range resp.Events { + if event.Revision <= lastProgressNotifyRevision { + t.Errorf("Broke watch guarantee: Bookmarkable - Progress notification events guarantee that all events up to a revision have been already delivered, eventRevision: %d, progressNotifyRevision: %d", event.Revision, lastProgressNotifyRevision) + } + } + if resp.IsProgressNotify { + lastProgressNotifyRevision = resp.Revision + } + } +} + +func validateOrdered(t *testing.T, report traffic.ClientReport) { + var lastEventRevision int64 = 1 + for _, resp := range report.Watch { + for _, event := range resp.Events { + if event.Revision < lastEventRevision { + t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientId) + } + lastEventRevision = event.Revision + } + } +} + +func validateUnique(t *testing.T, expectUniqueRevision bool, report traffic.ClientReport) { + uniqueOperations := map[interface{}]struct{}{} + + for _, resp := range report.Watch { + for _, event := range resp.Events { + var key interface{} + if expectUniqueRevision { + key = event.Revision + } else { + key = struct { + revision int64 + key string + }{event.Revision, event.Op.Key} + } + + if _, found := uniqueOperations[key]; found { + t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Op.Key, event.Revision, report.ClientId) + } + uniqueOperations[key] = struct{}{} + } + } +} + +func validateAtomic(t *testing.T, report traffic.ClientReport) { + var lastEventRevision int64 = 1 + for _, resp := range report.Watch { + if len(resp.Events) > 0 { + if resp.Events[0].Revision == lastEventRevision { + t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientId) + } + lastEventRevision = resp.Events[len(resp.Events)-1].Revision + } + } +} + +func validateReliable(t *testing.T, events []traffic.TimedWatchEvent) { + var lastEventRevision int64 = 1 + for _, event := range events { + if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { + t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, missing revisions from range: %d-%d", lastEventRevision, event.Revision) + } + lastEventRevision = event.Revision + } +} + +func toWatchEvents(responses []traffic.WatchResponse) (events []traffic.TimedWatchEvent) { + for _, resp := range responses { + for _, event := range resp.Events { + events = append(events, traffic.TimedWatchEvent{ + Time: resp.Time, + WatchEvent: event, + }) + } + } + return events +} + +func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) { + type revisionKey struct { + revision int64 + key string + } + type eventClientId struct { + traffic.WatchEvent + ClientId int + } + revisionKeyToEvent := map[revisionKey]eventClientId{} + for _, r := range reports { + for _, resp := range r.Watch { + for _, event := range resp.Events { + rk := revisionKey{key: event.Op.Key, revision: event.Revision} + if prev, found := revisionKeyToEvent[rk]; found { + if prev.WatchEvent != event { + t.Errorf("Events between clients %d and %d don't match, key: %q, revision: %d, diff: %s", prev.ClientId, r.ClientId, rk.key, rk.revision, cmp.Diff(prev, event)) + } + } + revisionKeyToEvent[rk] = eventClientId{ClientId: r.ClientId, WatchEvent: event} + } + } + } +} + +func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEvent { + longestIndex := 0 + longestEventCount := 0 + for i, r := range report { + rEventCount := r.WatchEventCount() + if rEventCount > longestEventCount { + longestIndex = i + longestEventCount = rEventCount + } + } + return toWatchEvents(report[longestIndex].Watch) +} diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index e2e865e9e34..a08ee6a5940 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -20,12 +20,8 @@ import ( "testing" "time" - "github.com/anishathalye/porcupine" - "github.com/google/go-cmp/cmp" - "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" - "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/traffic" ) @@ -64,8 +60,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd } type watchConfig struct { - requestProgress bool - expectUniqueRevision bool + requestProgress bool } // watchUntilRevision watches all changes until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. @@ -114,34 +109,6 @@ func watchUntilRevision(ctx context.Context, t *testing.T, c *traffic.RecordingC } } -func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 { - var maxRevision int64 - for _, response := range responses { - for _, event := range response.Events { - if event.Revision > maxRevision { - maxRevision = event.Revision - } - } - } - return maxRevision -} - -func validateWatchCorrectness(t *testing.T, cfg watchConfig, reports []traffic.ClientReport) { - // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis - for _, r := range reports { - validateOrdered(t, r) - validateUnique(t, cfg.expectUniqueRevision, r) - validateAtomic(t, r) - // TODO: Validate Resumable - validateBookmarkable(t, r) - } - validateEventsMatch(t, reports) - // Expects that longest history encompasses all events. - // TODO: Use combined events from all histories instead of the longest history. - // TODO: Validate that each watch report is reliable, not only the longest one. - validateReliable(t, longestEventHistory(reports)) -} - func validateGotAtLeastOneProgressNotify(t *testing.T, reports []traffic.ClientReport, expectProgressNotify bool) { var gotProgressNotify = false for _, r := range reports { @@ -161,219 +128,3 @@ func validateGotAtLeastOneProgressNotify(t *testing.T, reports []traffic.ClientR t.Errorf("Progress notify does not match, expect: %v, got: %v", expectProgressNotify, gotProgressNotify) } } - -func validateBookmarkable(t *testing.T, report traffic.ClientReport) { - var lastProgressNotifyRevision int64 = 0 - for _, resp := range report.Watch { - for _, event := range resp.Events { - if event.Revision <= lastProgressNotifyRevision { - t.Errorf("Broke watch guarantee: Renewable - watch can renewed using revision in last progress notification; Progress notification guarantees that previous events have been already delivered, eventRevision: %d, progressNotifyRevision: %d", event.Revision, lastProgressNotifyRevision) - } - } - if resp.IsProgressNotify { - lastProgressNotifyRevision = resp.Revision - } - } -} - -func validateOrdered(t *testing.T, report traffic.ClientReport) { - var lastEventRevision int64 = 1 - for _, resp := range report.Watch { - for _, event := range resp.Events { - if event.Revision < lastEventRevision { - t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientId) - } - lastEventRevision = event.Revision - } - } -} - -func validateUnique(t *testing.T, expectUniqueRevision bool, report traffic.ClientReport) { - uniqueOperations := map[interface{}]struct{}{} - - for _, resp := range report.Watch { - for _, event := range resp.Events { - var key interface{} - if expectUniqueRevision { - key = event.Revision - } else { - key = struct { - revision int64 - key string - }{event.Revision, event.Op.Key} - } - - if _, found := uniqueOperations[key]; found { - t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Op.Key, event.Revision, report.ClientId) - } - uniqueOperations[key] = struct{}{} - } - } -} - -func validateAtomic(t *testing.T, report traffic.ClientReport) { - var lastEventRevision int64 = 1 - for _, resp := range report.Watch { - if len(resp.Events) > 0 { - if resp.Events[0].Revision == lastEventRevision { - t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientId) - } - lastEventRevision = resp.Events[len(resp.Events)-1].Revision - } - } -} - -func validateReliable(t *testing.T, events []traffic.TimedWatchEvent) { - var lastEventRevision int64 = 1 - for _, event := range events { - if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { - t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, missing revisions from range: %d-%d", lastEventRevision, event.Revision) - } - lastEventRevision = event.Revision - } -} - -func toWatchEvents(responses []traffic.WatchResponse) (events []traffic.TimedWatchEvent) { - for _, resp := range responses { - for _, event := range resp.Events { - events = append(events, traffic.TimedWatchEvent{ - Time: resp.Time, - WatchEvent: event, - }) - } - } - return events -} - -func operationsFromClientReports(reports []traffic.ClientReport) []porcupine.Operation { - operations := []porcupine.Operation{} - persisted := map[model.EtcdOperation]traffic.TimedWatchEvent{} - for _, r := range reports { - operations = append(operations, r.OperationHistory.Operations()...) - for _, resp := range r.Watch { - for _, event := range resp.Events { - persisted[event.Op] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event} - } - } - } - newOperations := make([]porcupine.Operation, 0, len(operations)) - lastObservedOperation := lastOperationObservedInWatch(operations, persisted) - - for _, op := range operations { - request := op.Input.(model.EtcdRequest) - resp := op.Output.(model.EtcdNonDeterministicResponse) - if resp.Err == nil || op.Call > lastObservedOperation.Call || request.Type != model.Txn { - // Cannot patch those requests. - newOperations = append(newOperations, op) - continue - } - event := matchWatchEvent(request.Txn, persisted) - if event != nil { - // Set revision and time based on watchEvent. - op.Return = event.Time.Nanoseconds() - op.Output = model.EtcdNonDeterministicResponse{ - EtcdResponse: model.EtcdResponse{Revision: event.Revision}, - ResultUnknown: true, - } - newOperations = append(newOperations, op) - continue - } - if hasNonUniqueWriteOperation(request.Txn) && !hasUniqueWriteOperation(request.Txn) { - // Leave operation as it is as we cannot match non-unique operations to watch events. - newOperations = append(newOperations, op) - continue - } - // Remove non persisted operations - } - return newOperations -} - -func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) porcupine.Operation { - var maxCallTime int64 - var lastOperation porcupine.Operation - for _, op := range operations { - request := op.Input.(model.EtcdRequest) - if request.Type != model.Txn { - continue - } - event := matchWatchEvent(request.Txn, watchEvents) - if event != nil && op.Call > maxCallTime { - maxCallTime = op.Call - lastOperation = op - } - } - return lastOperation -} - -func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) *traffic.TimedWatchEvent { - for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) { - if etcdOp.Type == model.Put { - // Remove LeaseID which is not exposed in watch. - event, ok := watchEvents[model.EtcdOperation{ - Type: etcdOp.Type, - Key: etcdOp.Key, - Value: etcdOp.Value, - }] - if ok { - return &event - } - } - } - return nil -} - -func hasNonUniqueWriteOperation(request *model.TxnRequest) bool { - for _, etcdOp := range request.OperationsOnSuccess { - if etcdOp.Type == model.Put || etcdOp.Type == model.Delete { - return true - } - } - return false -} - -func hasUniqueWriteOperation(request *model.TxnRequest) bool { - for _, etcdOp := range request.OperationsOnSuccess { - if etcdOp.Type == model.Put { - return true - } - } - return false -} - -func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) { - type revisionKey struct { - revision int64 - key string - } - type eventClientId struct { - traffic.WatchEvent - ClientId int - } - revisionKeyToEvent := map[revisionKey]eventClientId{} - for _, r := range reports { - for _, resp := range r.Watch { - for _, event := range resp.Events { - rk := revisionKey{key: event.Op.Key, revision: event.Revision} - if prev, found := revisionKeyToEvent[rk]; found { - if prev.WatchEvent != event { - t.Errorf("Events between clients %d and %d don't match, key: %q, revision: %d, diff: %s", prev.ClientId, r.ClientId, rk.key, rk.revision, cmp.Diff(prev, event)) - } - } - revisionKeyToEvent[rk] = eventClientId{ClientId: r.ClientId, WatchEvent: event} - } - } - } -} - -func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEvent { - longestIndex := 0 - longestEventCount := 0 - for i, r := range report { - rEventCount := r.WatchEventCount() - if rEventCount > longestEventCount { - longestIndex = i - longestEventCount = rEventCount - } - } - return toWatchEvents(report[longestIndex].Watch) -}