Skip to content

Commit

Permalink
Cancel scenario if cluster not healthy when injecting failpoints.
Browse files Browse the repository at this point in the history
Signed-off-by: James Blair <mail@jamesblair.net>
  • Loading branch information
jmhbnz committed Apr 1, 2023
1 parent 6a995d2 commit e4286cf
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
63 changes: 61 additions & 2 deletions tests/robustness/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package robustness

import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -85,30 +88,86 @@ var (
}}
)

func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
// verifyClusterHealth checks that every member of the cluster is reachable and
// reports itself healthy, via both the gRPC Status API and the HTTP /health
// endpoint. It returns an error describing the first unhealthy member found,
// or nil when all members pass both checks.
//
// t and failpointName are currently unused; they are kept so the signature
// stays stable for callers and future diagnostics.
func verifyClusterHealth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpointName string) error {
	for i := 0; i < len(clus.Procs); i++ {
		// Delegate to a per-member helper so the client and the HTTP response
		// body are released at the end of each iteration; a defer placed
		// directly in this loop would only run when the whole function returns.
		if err := verifyMemberHealth(ctx, clus.Procs[i].EndpointsGRPC(), i); err != nil {
			return err
		}
	}
	return nil
}

// verifyMemberHealth verifies a single cluster member, identified by its
// client endpoints and index, via the gRPC Status API and the HTTP /health
// endpoint.
func verifyMemberHealth(ctx context.Context, endpoints []string, member int) error {
	cc, err := clientv3.New(clientv3.Config{
		Endpoints: endpoints,
		Logger:    zap.NewNop(),
		// Aggressive keepalives so an unresponsive member fails fast.
		DialKeepAliveTime:    1 * time.Millisecond,
		DialKeepAliveTimeout: 5 * time.Millisecond,
	})
	// Fix: the original discarded this error and would nil-deref cc below.
	if err != nil {
		return fmt.Errorf("failed to create client for cluster member %d, err: %v", member, err)
	}
	// Fix: the original never closed the client, leaking a connection per member.
	defer cc.Close()

	// Attempt to check status for the member.
	statusResp, err := cc.Status(ctx, cc.Endpoints()[0])
	if err != nil {
		return fmt.Errorf("failed to get cluster member status, err: %v", err)
	}
	// Report the first error carried in the status response, if any.
	if len(statusResp.Errors) > 0 {
		return fmt.Errorf("cluster member status contains error, err: %v", statusResp.Errors[0])
	}

	// Also check member /health endpoint. Fix: give the request a timeout so a
	// hung member cannot stall failpoint injection indefinitely.
	httpClient := &http.Client{Timeout: 5 * time.Second}
	healthResp, err := httpClient.Get(fmt.Sprintf("%s/health", cc.Endpoints()[0]))
	if err != nil {
		return fmt.Errorf("failed to get cluster member /health endpoint, err: %v", err)
	}
	defer healthResp.Body.Close()
	if healthResp.StatusCode != http.StatusOK {
		return fmt.Errorf("cluster member %d health check failed with status code: %d", member, healthResp.StatusCode)
	}
	var healthRespBodyBuf bytes.Buffer
	if _, err := io.Copy(&healthRespBodyBuf, healthResp.Body); err != nil {
		return fmt.Errorf("failed to read response body of cluster member /health endpoint, err: %v", err)
	}
	// /health responds with JSON containing "health":"true" on a healthy member.
	if !strings.Contains(healthRespBodyBuf.String(), "true") {
		return fmt.Errorf("cluster member is not healthy: %v", healthRespBodyBuf.String())
	}
	return nil
}

// triggerFailpoints repeatedly injects the configured failpoint into the
// cluster until it has fired config.count times, tolerating up to
// config.retries failed attempts. Cluster health is verified before and after
// every injection; an unhealthy cluster aborts the run and returns the health
// error. An unavailable failpoint or exhausted retries are reported through t.
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) error {
	// The failpoint must be available on every member; otherwise report and bail out.
	for _, member := range clus.Procs {
		if config.failpoint.Available(member) {
			continue
		}
		t.Errorf("Failpoint %q not available on %s", config.failpoint.Name(), member.Config().Name)
		return nil
	}

	var (
		err       error // last failure observed, surfaced in the final report
		successes int
		failures  int
	)
	for successes < config.count && failures < config.retries {
		time.Sleep(config.waitBetweenTriggers)

		lg.Info("Verifying cluster health before failpoint\n", zap.String("failpoint", config.failpoint.Name()))
		if err = verifyClusterHealth(ctx, t, clus, config.failpoint.Name()); err != nil {
			t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
			return err
		}

		lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
		if err = config.failpoint.Trigger(ctx, t, lg, clus); err != nil {
			lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
			failures++
			continue
		}

		lg.Info("Verifying cluster health after failpoint\n", zap.String("failpoint", config.failpoint.Name()))
		if err = verifyClusterHealth(ctx, t, clus, config.failpoint.Name()); err != nil {
			t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
			return err
		}

		successes++
	}
	if successes < config.count || failures >= config.retries {
		t.Errorf("failed to trigger failpoints enough times, err: %v", err)
	}

	return nil
}

type FailpointConfig struct {
Expand Down
8 changes: 6 additions & 2 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,12 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
g := errgroup.Group{}
trafficCtx, trafficCancel := context.WithCancel(ctx)
watchCtx, watchCancel := context.WithCancel(ctx)
g.Go(func() error {
triggerFailpoints(ctx, t, lg, clus, failpoint)
err := triggerFailpoints(ctx, t, lg, clus, failpoint)
if err != nil {
watchCancel()
}
time.Sleep(time.Second)
trafficCancel()
return nil
Expand All @@ -211,7 +215,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
responses = collectClusterWatchEvents(watchCtx, t, clus, maxRevisionChan)
return nil
})
g.Wait()
Expand Down

0 comments on commit e4286cf

Please sign in to comment.