Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: expose Leadership Transfer API to clients #8153

Merged
merged 10 commits into from
Jul 6, 2017
17 changes: 17 additions & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ This is a generated documentation. Please read the proto files for more.
| Defragment | DefragmentRequest | DefragmentResponse | Defragment defragments a member's backend database to recover storage space. |
| Hash | HashRequest | HashResponse | Hash returns the hash of the local KV state for consistency checking purpose. This is designed for testing; do not use this in production when there are ongoing transactions. |
| Snapshot | SnapshotRequest | SnapshotResponse | Snapshot sends a snapshot of the entire backend from a member over a stream to a client. |
| MoveLeader | MoveLeaderRequest | MoveLeaderResponse | MoveLeader requests current leader node to transfer its leadership to transferee. |



Expand Down Expand Up @@ -608,6 +609,22 @@ Empty field.



##### message `MoveLeaderRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| targetID | targetID is the node ID for the new leader. | uint64 |



##### message `MoveLeaderResponse` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |



##### message `PutRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
Expand Down
47 changes: 46 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,33 @@
}
}
},
"/v3alpha/maintenance/transfer-leadership": {
"post": {
"tags": [
"Maintenance"
],
"summary": "MoveLeader requests current leader node to transfer its leadership to transferee.",
"operationId": "MoveLeader",
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbMoveLeaderRequest"
}
}
],
"responses": {
"200": {
"description": "(empty)",
"schema": {
"$ref": "#/definitions/etcdserverpbMoveLeaderResponse"
}
}
}
}
},
"/v3alpha/watch": {
"post": {
"tags": [
Expand Down Expand Up @@ -1803,6 +1830,24 @@
}
}
},
"etcdserverpbMoveLeaderRequest": {
"type": "object",
"properties": {
"targetID": {
"description": "targetID is the node ID for the new leader.",
"type": "string",
"format": "uint64"
}
}
},
"etcdserverpbMoveLeaderResponse": {
"type": "object",
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
}
}
},
"etcdserverpbPutRequest": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2177,7 +2222,7 @@
"format": "boolean"
},
"compact_revision": {
"description": "compact_revision is set to the minimum index if a watcher tries to watch\nat a compacted index.\n\nThis happens when creating a watcher at a compacted revision or the watcher cannot\ncatch up with the progress of the key-value store. \n\nThe client should treat the watcher as canceled and should not try to create any\nwatcher with the same start_revision again.",
"description": "compact_revision is set to the minimum index if a watcher tries to watch\nat a compacted index.\n\nThis happens when creating a watcher at a compacted revision or the watcher cannot\ncatch up with the progress of the key-value store.\n\nThe client should treat the watcher as canceled and should not try to create any\nwatcher with the same start_revision again.",
"type": "string",
"format": "int64"
},
Expand Down
53 changes: 53 additions & 0 deletions clientv3/integration/maintenance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"testing"

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

func TestMaintenanceMoveLeader(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

oldLeadIdx := clus.WaitLeader(t)
targetIdx := (oldLeadIdx + 1) % 3
target := uint64(clus.Members[targetIdx].ID())

cli := clus.Client(targetIdx)
_, err := cli.MoveLeader(context.Background(), target)
if err != rpctypes.ErrNotLeader {
t.Fatalf("error expected %v, got %v", rpctypes.ErrNotLeader, err)
}

cli = clus.Client(oldLeadIdx)
_, err = cli.MoveLeader(context.Background(), target)
if err != nil {
t.Fatal(err)
}

leadIdx := clus.WaitLeader(t)
lead := uint64(clus.Members[leadIdx].ID())
if target != lead {
t.Fatalf("new leader expected %d, got %d", target, lead)
}
}
10 changes: 10 additions & 0 deletions clientv3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type (
AlarmResponse pb.AlarmResponse
AlarmMember pb.AlarmMember
StatusResponse pb.StatusResponse
MoveLeaderResponse pb.MoveLeaderResponse
)

type Maintenance interface {
Expand All @@ -51,6 +52,10 @@ type Maintenance interface {

// Snapshot provides a reader for a snapshot of a backend.
Snapshot(ctx context.Context) (io.ReadCloser, error)

// MoveLeader requests current leader to transfer its leadership to the transferee.
// Request must be made to the leader.
MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
}

type maintenance struct {
Expand Down Expand Up @@ -180,3 +185,8 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
}()
return pr, nil
}

func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, grpc.FailFast(false))
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
}
92 changes: 92 additions & 0 deletions e2e/ctl_v3_move_leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
)

func TestCtlV3MoveLeader(t *testing.T) {
defer testutil.AfterTest(t)

epc := setupEtcdctlTest(t, &configNoTLS, true)
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

var leadIdx int
var leaderID uint64
var transferee uint64
for i, ep := range epc.grpcEndpoints() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 3 * time.Second,
})
if err != nil {
t.Fatal(err)
}
resp, err := cli.Status(context.Background(), ep)
if err != nil {
t.Fatal(err)
}
cli.Close()

if resp.Header.GetMemberId() == resp.Leader {
leadIdx = i
leaderID = resp.Leader
} else {
transferee = resp.Header.GetMemberId()
}
}

os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")
cx := ctlCtx{
t: t,
cfg: configNoTLS,
dialTimeout: 7 * time.Second,
epc: epc,
}

tests := []struct {
prefixes []string
expect string
}{
{ // request to non-leader
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}),
"no leader endpoint given at ",
},
{ // request to leader
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}),
fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
},
}
for i, tc := range tests {
cmdArgs := append(tc.prefixes, "move-leader", types.ID(transferee).String())
if err := spawnWithExpect(cmdArgs, tc.expect); err != nil {
t.Fatalf("#%d: %v", i, err)
}
}
}
23 changes: 23 additions & 0 deletions etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,29 @@ Prints a line of JSON encoding the database hash, revision, total keys, and size
+----------+----------+------------+------------+
```

### MOVE-LEADER \<hexadecimal-transferee-id\>

MOVE-LEADER transfers leadership from the leader to another member in the cluster.

#### Example

```bash
# to choose transferee
transferee_id=$(./etcdctl \
--endpoints localhost:12379,localhost:22379,localhost:32379 \
endpoint status | grep -m 1 "false" | awk -F', ' '{print $2}')
echo ${transferee_id}
# c89feb932daef420

# endpoints should include leader node
./etcdctl --endpoints ${transferee_ep} move-leader ${transferee_id}
# Error: no leader endpoint given at [localhost:22379 localhost:32379]

# request to leader with target node ID
./etcdctl --endpoints ${leader_ep} move-leader ${transferee_id}
# Leadership transferred from 45ddc0e800e20b93 to c89feb932daef420
```

## Concurrency commands

### LOCK \<lockname\> [command arg1 arg2 ...]
Expand Down
87 changes: 87 additions & 0 deletions etcdctl/ctlv3/command/move_leader_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"fmt"
"strconv"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
)

// NewMoveLeaderCommand returns the cobra command for "move-leader".
func NewMoveLeaderCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "move-leader <transferee-member-id>",
Short: "Transfers leadership to another etcd cluster member.",
Run: transferLeadershipCommandFunc,
}
return cmd
}

// transferLeadershipCommandFunc executes the "compaction" command.
func transferLeadershipCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("move-leader command needs 1 argument"))
}
target, err := strconv.ParseUint(args[0], 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, err)
}

c := mustClientFromCmd(cmd)
eps := c.Endpoints()
c.Close()

ctx, cancel := commandCtx(cmd)

// find current leader
var leaderCli *clientv3.Client
var leaderID uint64
for _, ep := range eps {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 3 * time.Second,
})
if err != nil {
ExitWithError(ExitError, err)
}
resp, err := cli.Status(ctx, ep)
if err != nil {
ExitWithError(ExitError, err)
}

if resp.Header.GetMemberId() == resp.Leader {
leaderCli = cli
leaderID = resp.Leader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break?

break
}
cli.Close()
}
if leaderCli == nil {
ExitWithError(ExitBadArgs, fmt.Errorf("no leader endpoint given at %v", eps))
}

var resp *clientv3.MoveLeaderResponse
resp, err = leaderCli.MoveLeader(ctx, target)
cancel()
if err != nil {
ExitWithError(ExitError, err)
}

display.MoveLeader(leaderID, target, *resp)
}
Loading