Skip to content

Commit

Permalink
Cancel watch if cluster not healthy before or after injecting failpoi…
Browse files Browse the repository at this point in the history
…nts.

Signed-off-by: James Blair <mail@jamesblair.net>
  • Loading branch information
jmhbnz committed Apr 1, 2023
1 parent 6a995d2 commit 3f249c9
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
44 changes: 42 additions & 2 deletions tests/robustness/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.uber.org/zap"

clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -85,30 +87,68 @@ var (
}}
)

func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
func verifyClusterHealth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpointName string) error {
for i := 0; i < len(clus.Procs); i++ {
clusterClient, err := clientv3.New(clientv3.Config{
Endpoints: clus.Procs[i].EndpointsGRPC(),
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
defer clusterClient.Close()

cli := healthpb.NewHealthClient(clusterClient.ActiveConnection())
resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
return fmt.Errorf("Error checking member health: %v", err)
}
if resp.Status != healthpb.HealthCheckResponse_SERVING {
return fmt.Errorf("Member health status expected %s, got %s", healthpb.HealthCheckResponse_SERVING, resp.Status)
}
}
return nil
}

func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) error {
var err error
successes := 0
failures := 0
for _, proc := range clus.Procs {
if !config.failpoint.Available(proc) {
t.Errorf("Failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name)
return
return nil
}
}
for successes < config.count && failures < config.retries {
time.Sleep(config.waitBetweenTriggers)

lg.Info("Verifying cluster health before failpoint\n", zap.String("failpoint", config.failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus, config.failpoint.Name()); err != nil {
t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
return err
}

lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
err = config.failpoint.Trigger(ctx, t, lg, clus)
if err != nil {
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
failures++
continue
}

lg.Info("Verifying cluster health after failpoint\n", zap.String("failpoint", config.failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus, config.failpoint.Name()); err != nil {
t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
return err
}

successes++
}
if successes < config.count || failures >= config.retries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
}

return nil
}

type FailpointConfig struct {
Expand Down
8 changes: 6 additions & 2 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,12 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
g := errgroup.Group{}
trafficCtx, trafficCancel := context.WithCancel(ctx)
watchCtx, watchCancel := context.WithCancel(ctx)
g.Go(func() error {
triggerFailpoints(ctx, t, lg, clus, failpoint)
err := triggerFailpoints(ctx, t, lg, clus, failpoint)
if err != nil {
watchCancel()
}
time.Sleep(time.Second)
trafficCancel()
return nil
Expand All @@ -211,7 +215,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
responses = collectClusterWatchEvents(watchCtx, t, clus, maxRevisionChan)
return nil
})
g.Wait()
Expand Down

0 comments on commit 3f249c9

Please sign in to comment.