Skip to content

Commit

Permalink
tests/robustness: Expect revions to be unique for Kubernetes Traffic
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed May 23, 2023
1 parent f3c9db9 commit 40146fa
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 22 deletions.
11 changes: 7 additions & 4 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func TestRobustness(t *testing.T) {
name: "ClusterOfSize3/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: traffic,
cluster: *e2e.NewConfig(clusterOfSize3Options...),
watch: watchConfig{
expectUniqueRevision: traffic.Traffic.ExpectUniqueRevision(),
},
cluster: *e2e.NewConfig(clusterOfSize3Options...),
})
}
scenarios = append(scenarios, testScenario{
Expand Down Expand Up @@ -157,7 +160,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce

watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.clientReports, s.watch.requestProgress || watchProgressNotifyEnabled)
r.visualizeHistory = validateCorrectness(t, lg, r.clientReports)
r.visualizeHistory = validateCorrectness(t, lg, s.watch, r.clientReports)

panicked = false
}
Expand Down Expand Up @@ -211,8 +214,8 @@ func forcestopCluster(clus *e2e.EtcdProcessCluster) error {
return clus.ConcurrentStop()
}

func validateCorrectness(t *testing.T, lg *zap.Logger, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatchCorrectness(t, reports)
func validateCorrectness(t *testing.T, lg *zap.Logger, cfg watchConfig, reports []traffic.ClientReport) (visualize func(basepath string)) {
validateWatchCorrectness(t, cfg, reports)
operations := operationsFromClientReports(reports)
return model.ValidateOperationHistoryAndReturnVisualize(t, lg, operations)
}
8 changes: 6 additions & 2 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: etcdTraffic{
Traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
Expand All @@ -53,7 +53,7 @@ var (
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: etcdTraffic{
Traffic: etcdTraffic{
keyCount: 10,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
Expand All @@ -73,6 +73,10 @@ type etcdTraffic struct {
largePutSize int
}

func (t etcdTraffic) ExpectUniqueRevision() bool {
return false
}

type etcdRequestType string

const (
Expand Down
6 changes: 5 additions & 1 deletion tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: kubernetesTraffic{
Traffic: kubernetesTraffic{
averageKeyCount: 5,
resource: "pods",
namespace: "default",
Expand All @@ -56,6 +56,10 @@ type kubernetesTraffic struct {
writeChoices []choiceWeight[KubernetesRequestType]
}

func (t kubernetesTraffic) ExpectUniqueRevision() bool {
return true
}

func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
kc := &kubernetesClient{client: c}
s := newStorage()
Expand Down
5 changes: 3 additions & 2 deletions tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done()
defer c.Close()

config.traffic.Run(ctx, c, limiter, ids, lm, finish)
config.Traffic.Run(ctx, c, limiter, ids, lm, finish)
mux.Lock()
reports = append(reports, c.Report())
mux.Unlock()
Expand Down Expand Up @@ -95,9 +95,10 @@ type Config struct {
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
Traffic Traffic
}

type Traffic interface {
Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{})
ExpectUniqueRevision() bool
}
33 changes: 20 additions & 13 deletions tests/robustness/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
}

type watchConfig struct {
requestProgress bool
requestProgress bool
expectUniqueRevision bool
}

// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
Expand Down Expand Up @@ -140,11 +141,11 @@ func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 {
return maxRevision
}

func validateWatchCorrectness(t *testing.T, reports []traffic.ClientReport) {
func validateWatchCorrectness(t *testing.T, cfg watchConfig, reports []traffic.ClientReport) {
// Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis
for _, r := range reports {
validateOrdered(t, r)
validateUnique(t, r)
validateUnique(t, cfg.expectUniqueRevision, r)
validateAtomic(t, r)
validateBookmarkable(t, r)
// TODO: Validate presumable
Expand Down Expand Up @@ -202,19 +203,25 @@ func validateOrdered(t *testing.T, report traffic.ClientReport) {
}
}

func validateUnique(t *testing.T, report traffic.ClientReport) {
type revisionKey struct {
revision int64
key string
}
uniqueOperations := map[revisionKey]struct{}{}
func validateUnique(t *testing.T, expectUniqueRevision bool, report traffic.ClientReport) {
uniqueOperations := map[interface{}]struct{}{}

for _, resp := range report.Watch {
for _, event := range resp.Events {
rk := revisionKey{key: event.Op.Key, revision: event.Revision}
if _, found := uniqueOperations[rk]; found {
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", rk.key, rk.revision, report.ClientId)
var key interface{}
if expectUniqueRevision {
key = event.Revision
} else {
key = struct {
revision int64
key string
}{event.Revision, event.Op.Key}
}

if _, found := uniqueOperations[key]; found {
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Op.Key, event.Revision, report.ClientId)
}
uniqueOperations[rk] = struct{}{}
uniqueOperations[key] = struct{}{}
}
}
}
Expand Down

0 comments on commit 40146fa

Please sign in to comment.