test: deflake TestDowngradeUpgradeClusterOf3 timeout

In the TestDowngradeUpgradeCluster case, the brand-new cluster uses the
simple-config-changer, which means that entries are committed before leader
election and are applied once etcdserver starts to receive apply requests.
The simple-config-changer marks the `confState` dirty, and the storage
backend's precommit hook then updates the `confState` record.

For the new cluster, the storage version is nil at the beginning. It becomes
v3.5 once the `confState` record has been committed, and >v3.5 once the
`storageVersion` record has been committed.
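
For context, that detection order can be sketched as follows. This is a
simplified illustration of the behavior described above, not etcd's actual
`UnsafeDetectSchemaVersion` implementation; the function and parameter names
are made up for the example:

```go
package sketch

import (
	"errors"

	"github.com/coreos/go-semver/semver"
)

// detectSchemaVersionSketch mirrors the ordering described above: a committed
// storageVersion record wins, a committed confState record alone implies v3.5,
// and neither means the schema version cannot be detected yet.
func detectSchemaVersionSketch(storageVersion *semver.Version, confStateCommitted bool) (*semver.Version, error) {
	if storageVersion != nil {
		// The storageVersion record has been committed, so the schema is >v3.5.
		return storageVersion, nil
	}
	if !confStateCommitted {
		// The error message quoted later in this commit message.
		return nil, errors.New("cannot detect storage schema version: missing confstate information")
	}
	// Only the confState record has been committed, so the schema is v3.5.
	return semver.New("3.5.0"), nil
}
```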

When the new cluster is ready, the leader sets the initial cluster version to
v3.6.x, which triggers `monitorStorageVersion` to update the `storageVersion`
to v3.6.x. If the `confState` record has already been updated before the
cluster version update, we end up with a `storageVersion` record.

If the storage backend doesn't commit in time, `monitorStorageVersion` won't
update the version, failing with `cannot detect storage schema version:
missing confstate information`.

If we then file the downgrade request before the next round of
`monitorStorageVersion` (which runs every 4 seconds), the cluster version will
be v3.5.0, which is equal to the result of `UnsafeDetectSchemaVersion`, and we
never see `The server is ready to downgrade`.
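
To make that race window concrete, here is a rough sketch of a periodic
monitor in the spirit of what is described above: it wakes up on an interval
and simply skips the round when detection fails. The 4-second interval is
taken from this description; the signature and names are illustrative
assumptions, not etcd's actual `monitorStorageVersion` code.

```go
package sketch

import (
	"time"

	"github.com/coreos/go-semver/semver"
)

// monitorStorageVersionSketch periodically tries to detect the storage schema
// version and only updates it on success. A detection error (e.g. the
// confState record is not committed yet) makes it skip the round, leaving the
// old version in place until the next tick.
func monitorStorageVersionSketch(stop <-chan struct{}, detect func() (*semver.Version, error), update func(*semver.Version)) {
	ticker := time.NewTicker(4 * time.Second) // interval taken from the description above
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
		}
		v, err := detect()
		if err != nil {
			continue // e.g. "missing confstate information": retry next round
		}
		update(v)
	}
}
```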

The issue is easy to reproduce by using cpuset or taskset to limit the test
to two CPUs.

So we should wait for the new cluster's storage version to be ready before
issuing the downgrade request.
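
The test change below follows that idea: `validateVersion` now also checks the
storage version reported by each member before the downgrade is enabled. A
simplified, self-contained version of that wait loop might look like the
following, where the `fetchStorage` callback is a stand-in for the
`getMemberVersionByCurl` helper added in the diff:

```go
package sketch

import (
	"fmt"
	"time"
)

// waitForStorageVersion polls until the storage version reported by a member's
// /version endpoint equals want, or the timeout expires. It is a simplified
// stand-in for the validateVersion retry loop in the diff below.
func waitForStorageVersion(fetchStorage func() (string, error), want string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		got, err := fetchStorage()
		if err == nil && got == want {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("storage version did not reach %q in time (last %q, err: %v)", want, got, err)
		}
		time.Sleep(time.Second) // same one-second retry cadence as the test helper
	}
}
```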

Fixes: etcd-io#14540

Signed-off-by: Wei Fu <fuweid89@gmail.com>
fuweid committed Nov 2, 2022
1 parent 2e790d2 commit fc26bc3
Showing 2 changed files with 84 additions and 27 deletions.
95 changes: 68 additions & 27 deletions tests/e2e/cluster_downgrade_test.go
@@ -16,7 +16,9 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

@@ -27,6 +29,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
"go.uber.org/zap"
)

func TestDowngradeUpgradeClusterOf1(t *testing.T) {
@@ -45,15 +48,19 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
}
currentVersion := semver.New(version.Version)
lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)
currentVersionStr := fmt.Sprintf("%d.%d.%d", currentVersion.Major, currentVersion.Minor, currentVersion.Patch)
lastVersionStr := fmt.Sprintf("%d.%d.0", lastVersion.Major, lastVersion.Minor)

e2e.BeforeTest(t)

t.Logf("Create cluster with version %s", currentVersionStr)
epc := newCluster(t, currentEtcdBinary, clusterSize)
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
t.Logf("Cluster created")

@@ -62,36 +69,49 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {

t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: lastVersionStr,
Server: version.Version,
Storage: lastVersionStr,
})
e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
t.Log("Cluster is ready for downgrade")

t.Log("Cluster is ready for downgrade")
t.Logf("Starting downgrade process to %q", lastVersionStr)
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
stopEtcd(t, epc.Procs[i])
startEtcd(t, epc.Procs[i], lastReleaseBinary)
}

t.Log("All members downgraded, validating downgrade")
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: lastVersionStr,
Server: lastVersionStr,
})
}
t.Log("Downgrade complete")

t.Log("Downgrade complete")
t.Logf("Starting upgrade process to %q", currentVersionStr)
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Upgrading member %d", i)
stopEtcd(t, epc.Procs[i])
startEtcd(t, epc.Procs[i], currentEtcdBinary)
if i+1 < len(epc.Procs) {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
// NOTE: The leader runs a monitor for the cluster version and updates
// it asynchronously, so we skip checking the transient version here to
// avoid flakiness.
}

t.Log("All members upgraded, validating upgrade")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
t.Log("Upgrade complete")
}
@@ -140,30 +160,36 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
}

func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
// Two separate calls to expect as it doesn't support multiple matches on the same line
var err error
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
for {
if expect.Server != "" {
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
err := func() error {
result, err := getMemberVersionByCurl(cfg, member)
if err != nil {
time.Sleep(time.Second)
continue
return err
}
}
if expect.Cluster != "" {
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
if err != nil {
time.Sleep(time.Second)
continue

if expect.Server != "" && expect.Server != result.Server {
return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, result.Server)
}

if expect.Cluster != "" && expect.Cluster != result.Cluster {
return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, result.Cluster)
}

if expect.Storage != "" && expect.Storage != result.Storage {
return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, result.Storage)
}
return nil
}()
if err != nil {
cfg.Logger.Warn("failed to verify version and retrying", zap.Error(err))
time.Sleep(time.Second)
continue
}

break
}
})
if err != nil {
t.Fatal(err)
}
}

func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
@@ -190,3 +216,18 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
t.Fatal("Leader not found")
return nil
}

func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (*version.Versions, error) {
args := e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
lines, err := e2e.RunUtilCompletion(args, nil)
if err != nil {
return nil, err
}

data := strings.Join(lines, "\n")
result := &version.Versions{}
if err := json.Unmarshal([]byte(data), result); err != nil {
return nil, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
}
return result, nil
}
16 changes: 16 additions & 0 deletions tests/framework/e2e/util.go
@@ -81,6 +81,22 @@ func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string
return lines, perr
}

func RunUtilCompletion(args []string, envVars map[string]string) ([]string, error) {
proc, err := SpawnCmd(args, envVars)
if err != nil {
return nil, fmt.Errorf("failed to spawn command: %w", err)
}
defer proc.Stop()

perr := proc.Wait()
// make sure that all of the output has been received
proc.Close()
if perr != nil {
return nil, fmt.Errorf("unexpected error from command %v: %w", args, perr)
}
return proc.Lines(), nil
}

func RandomLeaseID() int64 {
return rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
}
