vFile: handle swarm node promotion and demotion (vmware-archive#1868)
When a node is promoted from worker to manager, the helper thread
joins the ETCD cluster based on the swarm information. Conversely,
when a node is demoted from manager to worker, the helper thread
stops the watcher, removes itself from the ETCD member list, and
cleans up the ETCD data directory.
Miao Luo authored and shuklanirdesh82 committed Nov 1, 2017
1 parent 087b0d7 commit 4431771
Showing 6 changed files with 437 additions and 34 deletions.
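At a glance, the helper thread introduced by this commit reduces to a small state machine: compare the swarm role reported by Docker with the cached role, and join or leave the ETCD cluster on a transition. Before the diffs, here is a condensed, self-contained Go sketch of that flow, for illustration only; EtcdKVS below is a stand-in for the real struct defined in etcdops.go.

package main

import "fmt"

// EtcdKVS is a stand-in for the real struct in etcdops.go.
type EtcdKVS struct {
	isManager bool
}

// roleCheck mirrors the commit's etcdRoleCheck: on promotion, join the ETCD
// cluster; on demotion, stop the watcher, remove this member from the ETCD
// member list, and wipe the data directory (represented here by log lines).
func (e *EtcdKVS) roleCheck(swarmSaysManager bool) {
	switch {
	case swarmSaysManager && !e.isManager:
		fmt.Println("promoted: join ETCD cluster and start watcher")
		e.isManager = true
	case !swarmSaysManager && e.isManager:
		fmt.Println("demoted: stop watcher, remove self from ETCD members, remove data dir")
		e.isManager = false
	}
}

func main() {
	e := &EtcdKVS{}
	e.roleCheck(true)  // worker -> manager
	e.roleCheck(false) // manager -> worker
}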
212 changes: 183 additions & 29 deletions client_plugin/drivers/vfile/kvstore/etcdops/etcdops.go
@@ -21,6 +21,7 @@ import (
    "encoding/json"
    "errors"
    "fmt"
+   "os"
    "os/exec"
    "strconv"
    "strings"
@@ -38,6 +39,7 @@ import (
    etcdClusterToken: ID of the cluster to create/join
    etcdListenURL: etcd listening interface
    etcdScheme: Protocol used for communication
+   etcdDataDir: Data directory for ETCD
    etcdClusterStateNew: Used to indicate the formation of a new
                         cluster
    etcdClusterStateExisting: Used to indicate that this node is joining
@@ -58,12 +60,13 @@ const (
    etcdClusterToken         = "vfile-etcd-cluster"
    etcdListenURL            = "0.0.0.0"
    etcdScheme               = "http://"
+   etcdDataDir              = "/etcd-data"
    etcdClusterStateNew      = "new"
    etcdClusterStateExisting = "existing"
    etcdRequestTimeout       = 2 * time.Second
    etcdUpdateTimeout        = 10 * time.Second
    checkSleepDuration       = time.Second
-   gcTicker                 = 30 * time.Second
+   gcTicker                 = 15 * time.Second
    etcdClientCreateError    = "Failed to create etcd client"
    swarmUnhealthyErrorMsg   = "Swarm cluster maybe unhealthy"
    etcdSingleRef            = "1"
@@ -74,6 +77,13 @@ type EtcdKVS struct {
    dockerOps *dockerops.DockerOps
    nodeID    string
    nodeAddr  string
+   // isManager indicates whether the current node is a manager or not
+   isManager bool
+   // etcdCMD records the cmd struct for the ETCD process;
+   // it's used for stopping the ETCD process when the node is demoted
+   etcdCMD *exec.Cmd
+   // watcher is used for killing the watch request when the node is demoted
+   watcher *etcdClient.Client
}

// VFileVolConnectivityData - Contains metadata of vFile volumes
@@ -101,12 +111,15 @@ func NewKvStore(dockerOps *dockerops.DockerOps) *EtcdKVS {
        dockerOps: dockerOps,
        nodeID:    nodeID,
        nodeAddr:  addr,
+       isManager: isManager,
    }

    if !isManager {
        log.WithFields(
            log.Fields{"nodeID": nodeID},
        ).Info("Swarm node role: worker. Return from NewKvStore ")
+       // start helper before return
+       go e.etcdHelper()
        return e
    }

@@ -135,28 +148,22 @@ func NewKvStore(dockerOps *dockerops.DockerOps) *EtcdKVS {
            ).Error("Failed to start ETCD Cluster ")
            return nil
        }
+       // start helper before return
+       go e.etcdHelper()
        return e
    }

-   // if manager, first find out who's leader, then proceed to join ETCD cluster
-   leaderAddr, err := dockerOps.GetSwarmLeader()
-   if err != nil {
-       log.WithFields(
-           log.Fields{
-               "nodeID": nodeID,
-               "error":  err},
-       ).Error("Failed to get swarm leader address ")
-       return nil
-   }
-
-   err = e.joinEtcdCluster(leaderAddr)
+   // if manager, join ETCD cluster
+   err = e.joinEtcdCluster()
    if err != nil {
        log.WithFields(log.Fields{
            "nodeID": nodeID,
            "error":  err},
        ).Error("Failed to join ETCD Cluster")
        return nil
    }
+   // start helper before return
+   go e.etcdHelper()
    return e
}

@@ -167,6 +174,7 @@ func (e *EtcdKVS) startEtcdCluster() error {
    log.Infof("startEtcdCluster on node with nodeID %s and nodeAddr %s", nodeID, nodeAddr)
    lines := []string{
        "--name", nodeID,
+       "--data-dir", etcdDataDir,
        "--advertise-client-urls", etcdScheme + nodeAddr + etcdClientPort,
        "--initial-advertise-peer-urls", etcdScheme + nodeAddr + etcdPeerPort,
        "--listen-client-urls", etcdScheme + etcdListenURL + etcdClientPort,
@@ -177,17 +185,27 @@
    }

    // start the routine to create an etcd cluster
-   go etcdService(lines)
+   e.etcdStartService(lines)

    // check if etcd cluster is successfully started, then start the watcher
    return e.checkLocalEtcd()
}

// joinEtcdCluster function is called by a non-leader swarm manager to join an ETCD cluster
-func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
+func (e *EtcdKVS) joinEtcdCluster() error {
    nodeAddr := e.nodeAddr
    nodeID := e.nodeID
-   log.Infof("joinEtcdCluster on node with nodeID %s and nodeAddr %s leaderAddr %s", nodeID, nodeAddr, leaderAddr)
+   log.Infof("joinEtcdCluster on node with nodeID %s and nodeAddr %s", nodeID, nodeAddr)
+
+   leaderAddr, err := e.dockerOps.GetSwarmLeader()
+   if err != nil {
+       log.WithFields(
+           log.Fields{
+               "nodeID": nodeID,
+               "error":  err},
+       ).Error("Failed to get swarm leader address ")
+       return err
+   }

    etcd, err := addrToEtcdClient(leaderAddr)
    if err != nil {
@@ -196,6 +214,7 @@ func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
            "leaderAddr": leaderAddr,
            "nodeID":     nodeID},
        ).Error("Failed to join ETCD cluster on manager ")
+       return err
    }
    defer etcd.Close()

@@ -280,6 +299,7 @@ func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {

    lines := []string{
        "--name", nodeID,
+       "--data-dir", etcdDataDir,
        "--advertise-client-urls", etcdScheme + nodeAddr + etcdClientPort,
        "--initial-advertise-peer-urls", etcdScheme + nodeAddr + etcdPeerPort,
        "--listen-client-urls", etcdScheme + etcdListenURL + etcdClientPort,
@@ -290,20 +310,104 @@ func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
    }

    // start the routine for joining an etcd cluster
-   go etcdService(lines)
+   e.etcdStartService(lines)

    // check if successfully joined the etcd cluster, then start the watcher
    return e.checkLocalEtcd()
}

-// etcdService function starts a routine of etcd
-func etcdService(cmd []string) {
-   _, err := exec.Command("/bin/etcd", cmd...).Output()
+// leaveEtcdCluster function is called when a manager is demoted
+func (e *EtcdKVS) leaveEtcdCluster() error {
+   etcd, err := addrToEtcdClient(e.nodeAddr)
+   if err != nil {
+       log.WithFields(
+           log.Fields{"nodeAddr": e.nodeAddr,
+               "nodeID": e.nodeID},
+       ).Error("Failed to create ETCD client from own address")
+       return err
+   }
+   defer etcd.Close()
+
+   // list all current ETCD members
+   ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
+   lresp, err := etcd.MemberList(ctx)
+   cancel()
+   if err != nil {
+       log.WithFields(
+           log.Fields{"nodeAddr": e.nodeAddr,
+               "error": err},
+       ).Error("Failed to list member for ETCD")
+       return err
+   }
+
+   // create the peer URL for filtering ETCD member information;
+   // each ETCD member has a unique peer URL
+   peerAddr := etcdScheme + e.nodeAddr + etcdPeerPort
+   for _, member := range lresp.Members {
+       // loop over all current etcd members to find whether there is already a member with the same peerAddr
+       if member.PeerURLs[0] == peerAddr {
+           log.WithFields(
+               log.Fields{"nodeID": e.nodeID,
+                   "peerAddr": peerAddr},
+           ).Info("Remove self from ETCD member due to demotion. ")
+
+           ctx, cancel = context.WithTimeout(context.Background(), etcdRequestTimeout)
+           _, err = etcd.MemberRemove(ctx, member.ID)
+           cancel()
+           if err != nil {
+               log.WithFields(
+                   log.Fields{"peerAddr": peerAddr,
+                       "member.ID": member.ID},
+               ).Error("Failed to remove this node from ETCD ")
+               return err
+           }
+
+           // the same peerAddr can only join once; no need to continue
+           log.WithFields(
+               log.Fields{"peerAddr": peerAddr,
+                   "member.ID": member.ID},
+           ).Info("Successfully removed self from ETCD ")
+           break
+       }
+   }
+
+   e.etcdStopService()
+   return nil
+}
+
+// etcdStartService function starts an ETCD process
+func (e *EtcdKVS) etcdStartService(lines []string) {
+   cmd := exec.Command("/bin/etcd", lines...)
+   err := cmd.Start()
+   if err != nil {
+       log.WithFields(
+           log.Fields{"error": err, "cmd": cmd},
+       ).Error("Failed to start ETCD command ")
+       return
+   }
+
+   e.etcdCMD = cmd
+}
+
+// etcdStopService function stops the ETCD process
+func (e *EtcdKVS) etcdStopService() {
+   // stop watcher
+   e.watcher.Close()
+
+   // stop ETCD process
+   if err := e.etcdCMD.Process.Kill(); err != nil {
+       log.Errorf("Failed to stop ETCD process. Error: %v", err)
+       return
+   }
+
+   // clean up ETCD data
+   if err := os.RemoveAll(etcdDataDir); err != nil {
+       log.Errorf("Failed to remove ETCD data directory. Error: %v", err)
+       return
+   }
+
+   log.Infof("Stopped ETCD service due to demotion")
+   e.etcdCMD = nil
+}

// checkLocalEtcd function checks whether the local ETCD endpoint has started successfully or not
@@ -325,8 +429,9 @@ func (e *EtcdKVS) checkLocalEtcd() error {
                        "error": err},
                ).Warningf("Failed to get ETCD client, retry before timeout ")
            } else {
+               log.Infof("Local ETCD client is up successfully, start watcher")
+               e.watcher = cli
                go e.etcdWatcher(cli)
-               go e.serviceAndVolumeGC(cli)
                return nil
            }
        case <-timer.C:
@@ -337,7 +442,6 @@ func (e *EtcdKVS) checkLocalEtcd() error {

// etcdWatcher function sets up a watcher to monitor all the changes to global refcounts in the KV store
func (e *EtcdKVS) etcdWatcher(cli *etcdClient.Client) {
-   // TODO: when the manager is demoted to worker, the watcher should be cancelled
    watchCh := cli.Watch(context.Background(), kvstore.VolPrefixGRef,
        etcdClient.WithPrefix(), etcdClient.WithPrevKV())
    for wresp := range watchCh {
@@ -347,20 +451,30 @@ func (e *EtcdKVS) etcdWatcher(cli *etcdClient.Client) {
    }
}

-// serviceAndVolumeGC: garbage collector for orphan services or volumes
-func (e *EtcdKVS) serviceAndVolumeGC(cli *etcdClient.Client) {
+// etcdHelper: a helper thread which performs the following tasks at each time interval:
+// 1. clean up orphan services or orphan internal volumes
+// 2. monitor the role of the node, start/shutdown the etcd service accordingly
+func (e *EtcdKVS) etcdHelper() {
    ticker := time.NewTicker(gcTicker)
    quit := make(chan struct{})

    for {
        select {
        case <-ticker.C:
-           // find all the vFile volume services
-           volumesToVerify, err := e.dockerOps.ListVolumesFromServices()
+           // check the role of this node
+           err := e.etcdRoleCheck()
            if err != nil {
-               log.Warningf("Failed to get vFile volumes according to docker services")
-           } else {
-               e.cleanOrphanService(volumesToVerify)
+               log.Warningf("Failed to do role check")
            }
+
+           if e.isManager {
+               // find all the vFile volume services
+               volumesToVerify, err := e.dockerOps.ListVolumesFromServices()
+               if err != nil {
+                   log.Warningf("Failed to get vFile volumes according to docker services")
+               } else {
+                   e.cleanOrphanService(volumesToVerify)
+               }
+           }
        case <-quit:
            ticker.Stop()
@@ -369,6 +483,46 @@ func (e *EtcdKVS) serviceAndVolumeGC(cli *etcdClient.Client) {
    }
}

+func (e *EtcdKVS) etcdRoleCheck() error {
+   nodeID, _, isManager, err := e.dockerOps.GetSwarmInfo()
+   if err != nil {
+       log.WithFields(
+           log.Fields{"error": err},
+       ).Error("Failed to get swarm Info from docker client ")
+       return err
+   }
+
+   if isManager {
+       if !e.isManager {
+           log.Infof("Node is promoted to manager, prepare to join ETCD cluster")
+           err = e.joinEtcdCluster()
+           if err != nil {
+               log.WithFields(log.Fields{
+                   "nodeID": nodeID,
+                   "error":  err},
+               ).Error("Failed to join ETCD Cluster in etcdRoleCheck")
+               return err
+           }
+           e.isManager = true
+       }
+   } else {
+       if e.isManager {
+           log.Infof("Node is demoted from manager to worker, prepare to leave ETCD cluster")
+           err = e.leaveEtcdCluster()
+           if err != nil {
+               log.WithFields(log.Fields{
+                   "nodeID": nodeID,
+                   "error":  err},
+               ).Error("Failed to leave ETCD Cluster in etcdRoleCheck")
+               return err
+           }
+           e.isManager = false
+       }
+   }
+
+   return nil
+}
+
// cleanOrphanService: stop orphan services
func (e *EtcdKVS) cleanOrphanService(volumesToVerify []string) {
    volStates, err := e.KvMapFromPrefix(string(kvstore.VolPrefixState))
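A note on the switch from "go etcdService(lines)" to "e.etcdStartService(lines)": the old code ran exec.Command(...).Output(), which blocks until the process exits and discards the handle, so a demoted manager had no way to stop ETCD later. cmd.Start() returns immediately and retains the *exec.Cmd, which etcdStopService can later Kill(). A minimal standalone illustration of that lifecycle, using sleep as a stand-in for /bin/etcd:

package main

import (
	"fmt"
	"os/exec"
	"time"
)

func main() {
	// Start() is non-blocking and keeps the process handle, as in etcdStartService.
	cmd := exec.Command("sleep", "60") // stand-in for /bin/etcd
	if err := cmd.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}

	time.Sleep(time.Second) // the process runs in the background meanwhile

	// On demotion, the retained handle lets us stop the process, as in etcdStopService.
	if err := cmd.Process.Kill(); err != nil {
		fmt.Println("kill failed:", err)
	}
}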
2 changes: 1 addition & 1 deletion misc/scripts/build.sh
@@ -123,7 +123,7 @@ then
    $DOCKER run --rm -v $PWD/..:$dir -w $dir $pylint_container $MAKE_ESX pylint
else
    docker_socket=/var/run/docker.sock
-   if [ -z $SSH_KEY_OPT ]
+   if [ -z '$SSH_KEY_OPT' ]
    then
        SSH_KEY_OPT="-i /root/.ssh/id_rsa"
    fi
9 changes: 9 additions & 0 deletions tests/constants/dockercli/cmd.go
@@ -88,6 +88,15 @@ const (
    // ListNodes list all docker swarm nodes
    ListNodes = dockerNode + "ls "

+   // InspectNode inspects a swarm node
+   InspectNode = dockerNode + "inspect "
+
+   // PromoteNode promotes a swarm worker to manager
+   PromoteNode = dockerNode + "promote "
+
+   // DemoteNode demotes a swarm manager to worker
+   DemoteNode = dockerNode + "demote "
+
    // CreateService create a docker service
    CreateService = dockerService + "create "

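The new constants above follow the file's existing pattern of command fragments that tests complete with a node name. A hypothetical usage sketch follows; runDockerCLI is illustrative only and not part of this commit, and the repo's actual test utilities may differ:

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

const (
	dockerNode  = "docker node "
	PromoteNode = dockerNode + "promote " // as added in cmd.go
	DemoteNode  = dockerNode + "demote "  // as added in cmd.go
)

// runDockerCLI splits a composed command string and runs it locally.
// Hypothetical helper for illustration only.
func runDockerCLI(cmdStr, nodeName string) (string, error) {
	fields := strings.Fields(cmdStr + nodeName)
	out, err := exec.Command(fields[0], fields[1:]...).CombinedOutput()
	return string(out), err
}

func main() {
	// Promoting a worker should make its vFile helper thread join ETCD on
	// the next role check; demoting it reverses the process.
	if out, err := runDockerCLI(PromoteNode, "worker1"); err != nil {
		fmt.Println("promote failed:", err, out)
	}
}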
