Skip to content

Commit

Permalink
Merge pull request #14657 from fuweid/test-fix-TestDowngradeUpgradeCl…
Browse files Browse the repository at this point in the history
…usterOf3

test: deflake TestDowngradeUpgradeClusterOf3 timeout
  • Loading branch information
serathius authored Nov 2, 2022
2 parents e25090f + 3ddcb3d commit 7ed4eda
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 29 deletions.
104 changes: 75 additions & 29 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -27,6 +29,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
"go.uber.org/zap"
)

func TestDowngradeUpgradeClusterOf1(t *testing.T) {
Expand All @@ -43,17 +46,24 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
if !fileutil.Exist(lastReleaseBinary) {
t.Skipf("%q does not exist", lastReleaseBinary)
}

currentVersion := semver.New(version.Version)
lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)

currentVersion.PreRelease = ""
currentVersionStr := currentVersion.String()
lastVersionStr := lastVersion.String()

e2e.BeforeTest(t)

t.Logf("Create cluster with version %s", currentVersionStr)
epc := newCluster(t, currentEtcdBinary, clusterSize)
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
t.Logf("Cluster created")

Expand All @@ -62,36 +72,49 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {

t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: lastVersionStr,
Server: version.Version,
Storage: lastVersionStr,
})
e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
t.Log("Cluster is ready for downgrade")

t.Log("Cluster is ready for downgrade")
t.Logf("Starting downgrade process to %q", lastVersionStr)
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
stopEtcd(t, epc.Procs[i])
startEtcd(t, epc.Procs[i], lastReleaseBinary)
}

t.Log("All members downgraded, validating downgrade")
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: lastVersionStr,
Server: lastVersionStr,
})
}
t.Log("Downgrade complete")

t.Log("Downgrade complete")
t.Logf("Starting upgrade process to %q", currentVersionStr)
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Upgrading member %d", i)
stopEtcd(t, epc.Procs[i])
startEtcd(t, epc.Procs[i], currentEtcdBinary)
if i+1 < len(epc.Procs) {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
// NOTE: The leader has monitor to the cluster version, which will
// update cluster version. We don't need to check the transient
// version just in case that it might be flaky.
}

t.Log("All members upgraded, validating upgrade")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
t.Log("Upgrade complete")
}
Expand Down Expand Up @@ -140,30 +163,23 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
}

func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
// Two separate calls to expect as it doesn't support multiple matches on the same line
var err error
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
for {
if expect.Server != "" {
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
if err != nil {
time.Sleep(time.Second)
continue
}
result, err := getMemberVersionByCurl(cfg, member)
if err != nil {
cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err))
time.Sleep(time.Second)
continue
}
if expect.Cluster != "" {
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
if err != nil {
time.Sleep(time.Second)
continue
}

if err := compareMemberVersion(expect, result); err != nil {
cfg.Logger.Warn("failed to validate and retrying", zap.Error(err))
time.Sleep(time.Second)
continue
}
break
}
})
if err != nil {
t.Fatal(err)
}
}

func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
Expand All @@ -190,3 +206,33 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
t.Fatal("Leader not found")
return nil
}

func compareMemberVersion(expect version.Versions, target version.Versions) error {
if expect.Server != "" && expect.Server != target.Server {
return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
}

if expect.Cluster != "" && expect.Cluster != target.Cluster {
return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
}

if expect.Storage != "" && expect.Storage != target.Storage {
return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
}
return nil
}

func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
args := e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
lines, err := e2e.RunUtilCompletion(args, nil)
if err != nil {
return version.Versions{}, err
}

data := strings.Join(lines, "\n")
result := version.Versions{}
if err := json.Unmarshal([]byte(data), &result); err != nil {
return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
}
return result, nil
}
16 changes: 16 additions & 0 deletions tests/framework/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string
return lines, perr
}

func RunUtilCompletion(args []string, envVars map[string]string) ([]string, error) {
proc, err := SpawnCmd(args, envVars)
if err != nil {
return nil, fmt.Errorf("failed to spawn command: %w", err)
}
defer proc.Stop()

perr := proc.Wait()
// make sure that all the outputs are received
proc.Close()
if perr != nil {
return nil, fmt.Errorf("unexpected error from command %v: %w", args, perr)
}
return proc.Lines(), nil
}

func RandomLeaseID() int64 {
return rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
}
Expand Down

0 comments on commit 7ed4eda

Please sign in to comment.