From 5abceb4bbc26a6bfe46f62f6ff23131cf548f5a7 Mon Sep 17 00:00:00 2001
From: Miao Luo
Date: Wed, 2 Aug 2017 16:23:24 -0700
Subject: [PATCH] Shared plugin: refactor kvstore and docker ops

* Create a new kvStore interface, and have etcd as one of the
  implementations of the kvStore interface.
* Move docker host related operations to dockerops package
---
 client_plugin/Makefile                        |   4 +-
 .../drivers/shared/dockerops/dockerops.go     | 154 +++++++++++
 .../shared/{ => kvstore/etcdops}/etcdops.go   | 250 ++++++++----------
 .../drivers/shared/kvstore/kvstore.go         |  89 +++++++
 client_plugin/drivers/shared/shared_driver.go | 173 +++++-------
 esx_service/utils/vsan_info_test.py           |   2 +-
 6 files changed, 415 insertions(+), 257 deletions(-)
 create mode 100644 client_plugin/drivers/shared/dockerops/dockerops.go
 rename client_plugin/drivers/shared/{ => kvstore/etcdops}/etcdops.go (72%)
 create mode 100644 client_plugin/drivers/shared/kvstore/kvstore.go

diff --git a/client_plugin/Makefile b/client_plugin/Makefile
index 7ec49c8a5..8fcefc442 100644
--- a/client_plugin/Makefile
+++ b/client_plugin/Makefile
@@ -106,7 +106,9 @@ COMMON_SRC = utils/log_formatter/log_formatter.go \
 VMDK_PLUGIN_SRC = vmdk_plugin/main.go vmdk_plugin/main_linux.go \
 	drivers/photon/photon_driver.go drivers/vmdk/vmdk_driver.go
 
-SHARED_PLUGIN_SRC = shared_plugin/main.go drivers/shared/shared_driver.go drivers/shared/etcdops.go
+SHARED_PLUGIN_SRC = shared_plugin/main.go drivers/shared/shared_driver.go \
+	drivers/shared/kvstore/kvstore.go drivers/shared/kvstore/etcdops/etcdops.go \
+	drivers/shared/dockerops/dockerops.go
 
 TEST_SRC = ../tests/utils/inputparams/testparams.go
 
diff --git a/client_plugin/drivers/shared/dockerops/dockerops.go b/client_plugin/drivers/shared/dockerops/dockerops.go
new file mode 100644
index 000000000..7a410743a
--- /dev/null
+++ b/client_plugin/drivers/shared/dockerops/dockerops.go
@@ -0,0 +1,154 @@
+// Copyright 2017 VMware, Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Docker host related operations
+//
+// DockerOps struct holds the docker client based on a certain API version
+// and docker socket. All the operations which require a docker client should
+// be executed through this structure, including docker volume create/remove,
+// docker service start/stop, and docker information retrieval.
+
+package dockerops
+
+import (
+    "context"
+    "errors"
+    "fmt"
+
+    log "github.com/Sirupsen/logrus"
+    dockerClient "github.com/docker/engine-api/client"
+    dockerTypes "github.com/docker/engine-api/types"
+    "github.com/docker/engine-api/types/swarm"
+)
+
+const (
+    // dockerAPIVersion: docker engine 1.24 and above support this api version
+    dockerAPIVersion = "v1.24"
+    // dockerUSocket: Unix socket on which Docker engine is listening
+    dockerUSocket = "unix:///var/run/docker.sock"
+)
+
+// DockerOps provides the docker host related operations
+type DockerOps struct {
+    Dockerd *dockerClient.Client
+}
+
+// NewDockerOps - create a DockerOps instance; returns nil if the
+// Docker client cannot be created
+func NewDockerOps() *DockerOps {
+    var d *DockerOps
+
+    client, err := dockerClient.NewClient(dockerUSocket, dockerAPIVersion, nil, nil)
+    if err != nil {
+        log.WithFields(
+            log.Fields{"error": err},
+        ).Error("Failed to create client for Docker ")
+        return nil
+    }
+
+    d = &DockerOps{
+        Dockerd: client,
+    }
+
+    return d
+}
+
+// GetSwarmInfo - returns the node ID and node IP address in swarm cluster,
+// and whether this node is a manager or not
+func (d *DockerOps) GetSwarmInfo() (nodeID string, addr string, isManager bool, err error) {
+    info, err := d.Dockerd.Info(context.Background())
+    if err != nil {
+        return
+    }
+
+    // if node is not in active swarm mode, return error
+    if info.Swarm.LocalNodeState != swarm.LocalNodeStateActive {
+        err = fmt.Errorf("Swarm node state is not active, local node state: %s",
+            string(info.Swarm.LocalNodeState))
+        return
+    }
+
+    // get the swarmID and IP address of current node
+    nodeID = info.Swarm.NodeID
+    addr = info.Swarm.NodeAddr
+    isManager = info.Swarm.ControlAvailable
+
+    return
+}
+
+// GetSwarmManagers - returns all the managers according to local docker info
+func (d *DockerOps) GetSwarmManagers() ([]swarm.Peer, error) {
+    info, err := d.Dockerd.Info(context.Background())
+    if err != nil {
+        return nil, err
+    }
+
+    return info.Swarm.RemoteManagers, nil
+}
+
+// IsSwarmLeader - check if nodeID is the swarm leader or not;
+// this function can only be executed successfully on a swarm manager node
+func (d *DockerOps) IsSwarmLeader(nodeID string) (bool, error) {
+    node, _, err := d.Dockerd.NodeInspectWithRaw(context.Background(), nodeID)
+    if err != nil {
+        return false, err
+    }
+
+    return node.ManagerStatus.Leader, nil
+}
+
+// GetSwarmLeader - return the IP address of the swarm leader;
+// this function can only be executed successfully on a swarm manager node
+func (d *DockerOps) GetSwarmLeader() (string, error) {
+    nodes, err := d.Dockerd.NodeList(context.Background(), dockerTypes.NodeListOptions{})
+    if err != nil {
+        return "", err
+    }
+
+    for _, n := range nodes {
+        if n.ManagerStatus != nil && n.ManagerStatus.Leader {
+            return n.ManagerStatus.Addr, nil
+        }
+    }
+
+    return "", errors.New("Failed to get leader for swarm manager")
+}
+
+// VolumeCreate - create volume from docker host with specific volume driver
+func (d *DockerOps) VolumeCreate(volumeDriver string, volName string, options map[string]string) error {
+    dockerVolOptions := dockerTypes.VolumeCreateRequest{
+        Driver:     volumeDriver,
+        Name:       volName,
+        DriverOpts: options,
+    }
+
+    _, err := d.Dockerd.VolumeCreate(context.Background(), dockerVolOptions)
+
+    return err
+}
+
+// VolumeRemove - remove volume from docker host
+func (d *DockerOps) VolumeRemove(volName string) error {
+    return d.Dockerd.VolumeRemove(context.Background(), volName)
+}
+
+// StartSMBServer - Start SMB server
+func (d *DockerOps) StartSMBServer(volName string) bool {
+    log.Errorf("StartSMBServer to be implemented")
+    return true
+}
+
+// StopSMBServer - Stop SMB server
+func (d *DockerOps) StopSMBServer(volName string) bool {
+    log.Errorf("StopSMBServer to be implemented")
+    return true
+}
diff --git a/client_plugin/drivers/shared/etcdops.go b/client_plugin/drivers/shared/kvstore/etcdops/etcdops.go
similarity index 72%
rename from client_plugin/drivers/shared/etcdops.go
rename to client_plugin/drivers/shared/kvstore/etcdops/etcdops.go
index 0da6fd5c9..840d817a0 100644
--- a/client_plugin/drivers/shared/etcdops.go
+++ b/client_plugin/drivers/shared/kvstore/etcdops/etcdops.go
@@ -12,43 +12,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package shared
+// ETCD implementation for KV Store interface
+
+package etcdops
 
 import (
     "context"
     "errors"
     "fmt"
-    log "github.com/Sirupsen/logrus"
-    etcdClient "github.com/coreos/etcd/clientv3"
-    "github.com/docker/engine-api/types"
-    "github.com/docker/engine-api/types/swarm"
     "os/exec"
     "strings"
     "time"
+
+    log "github.com/Sirupsen/logrus"
+    etcdClient "github.com/coreos/etcd/clientv3"
+    "github.com/vmware/docker-volume-vsphere/client_plugin/drivers/shared/dockerops"
+    "github.com/vmware/docker-volume-vsphere/client_plugin/drivers/shared/kvstore"
 )
 
 /*
-    etcdClientPort: On which port do etcd clients talk to
-    the peers?
-    etcdPeerPort: On which port do etcd peers talk to
-    each other?
+    etcdClientPort: port for etcd clients to talk to the peers
+    etcdPeerPort: port for etcd peers to talk to each other
     etcdClusterToken: ID of the cluster to create/join
-    etcdListenURL: On which interface is etcd listening?
+    etcdListenURL: etcd listening interface
     etcdScheme: Protocol used for communication
     etcdClusterStateNew: Used to indicate the formation of a new
    cluster
     etcdClusterStateExisting: Used to indicate that this node is joining
    an existing etcd cluster
-    etcdPrefixState: Each volume has three metadata keys. Each such
-    key terminates in the name of the volume, but
-    is preceded by a prefix. This is the prefix
-    for "State" key
-    etcdPrefixGRef: The prefix for GRef key (Global refcount)
-    etcdPrefixInfo: The prefix for info key. This key holds all
-    other metadata fields squashed into one
     requestTimeout: After how long should an etcd request timeout
     etcdClientCreateError: Error indicating failure to create etcd client
-    VolumeDoesNotExistError: Error indicating that there is no such volume
     etcdSingleRef: if global refcount 0 -> 1, start SMB server
     etcdNoRef: if global refcount 1 -> 0, shut down SMB server
 */
@@ -60,123 +53,98 @@ const (
     etcdScheme = "http://"
     etcdClusterStateNew = "new"
     etcdClusterStateExisting = "existing"
-    etcdPrefixState = "SVOLS_stat_"
-    etcdPrefixGRef = "SVOLS_gref_"
-    etcdPrefixInfo = "SVOLS_info_"
     requestTimeout = 5 * time.Second
     checkSleepDuration = time.Second
     etcdClientCreateError = "Failed to create etcd client"
-    VolumeDoesNotExistError = "No such volume"
     etcdSingleRef = "1"
     etcdNoRef = "0"
 )
 
-// kvPair : Key Value pair holder
-type kvPair struct {
-    key string
-    value string
-}
-
-type etcdKVS struct {
-    driver *VolumeDriver
-    nodeID string
-    nodeAddr string
-    client *etcdClient.Client
+// EtcdKVS is the ETCD implementation of the KvStore interface
+type EtcdKVS struct {
+    dockerOps *dockerops.DockerOps
+    nodeID string
+    nodeAddr string
+    client *etcdClient.Client
 }
 
 // NewKvStore function: start or join ETCD cluster depending on the role of the node
-func NewKvStore(driver *VolumeDriver) *etcdKVS {
-    var e *etcdKVS
-
-    ctx := context.Background()
-    dclient := driver.dockerd
+func NewKvStore(dockerOps *dockerops.DockerOps) *EtcdKVS {
+    var e *EtcdKVS
 
-    // get NodeID from docker client
-    info, err := dclient.Info(ctx)
+    // get swarm info from docker client
+    nodeID, addr, isManager, err := dockerOps.GetSwarmInfo()
     if err != nil {
         log.WithFields(
             log.Fields{"error": err},
-        ).Error("Failed to get Info from docker client ")
+        ).Error("Failed to get swarm info from docker client ")
         return nil
     }
 
-    // get the swarmID and IP address of current node
-    if info.Swarm.LocalNodeState != swarm.LocalNodeStateActive {
-        log.WithFields(
-            log.Fields{"LocalNodeState": string(info.Swarm.LocalNodeState)},
-        ).Errorf("Swarm node state is not active ")
-        return nil
-    }
-
-    nodeID := info.Swarm.NodeID
-    addr := info.Swarm.NodeAddr
-
-    e = &etcdKVS{
-        driver: driver,
-        nodeID: nodeID,
-        nodeAddr: addr,
+    e = &EtcdKVS{
+        dockerOps: dockerOps,
+        nodeID: nodeID,
+        nodeAddr: addr,
     }
 
-    // worker just returns
-    if info.Swarm.ControlAvailable == false {
+    if !isManager {
         log.WithFields(
             log.Fields{"nodeID": nodeID},
-        ).Info("Swarm node role: worker. No further action needed, return from NewKvStore ")
+        ).Info("Swarm node role: worker. Return from NewKvStore ")
         return e
     }
 
     // check my local role
-    node, _, err := dclient.NodeInspectWithRaw(ctx, nodeID)
+    isLeader, err := dockerOps.IsSwarmLeader(nodeID)
     if err != nil {
-        log.WithFields(log.Fields{"nodeID": nodeID,
-            "error": err}).Error("Failed to inspect node ")
+        log.WithFields(
+            log.Fields{
+                "nodeID": nodeID,
+                "error": err},
+        ).Error("Failed to check swarm leader status from docker client ")
         return nil
     }
 
     // if leader, proceed to start ETCD cluster
-    if node.ManagerStatus.Leader {
+    if isLeader {
         log.WithFields(
             log.Fields{"nodeID": nodeID},
-        ).Info("Swarm node role: leader, start ETCD cluster")
+        ).Info("Swarm node role: leader, start ETCD cluster ")
         err = e.startEtcdCluster()
         if err != nil {
-            log.WithFields(log.Fields{"nodeID": nodeID,
-                "error": err}).Error("Failed to start ETCD Cluster")
+            log.WithFields(
+                log.Fields{
+                    "nodeID": nodeID,
+                    "error": err},
+            ).Error("Failed to start ETCD Cluster ")
             return nil
         }
         return e
     }
 
     // if manager, first find out who's leader, then proceed to join ETCD cluster
-    nodes, err := dclient.NodeList(ctx, types.NodeListOptions{})
+    leaderAddr, err := dockerOps.GetSwarmLeader()
     if err != nil {
-        log.WithFields(log.Fields{"nodeID": nodeID,
-            "error": err}).Error("Failed to get NodeList from swarm manager")
+        log.WithFields(
+            log.Fields{
+                "nodeID": nodeID,
+                "error": err},
+        ).Error("Failed to get swarm leader address ")
         return nil
     }
 
-    for _, n := range nodes {
-        if n.ManagerStatus != nil && n.ManagerStatus.Leader == true {
-            log.WithFields(
-                log.Fields{"leader ID": n.ID,
-                    "manager ID": nodeID},
-            ).Info("Swarm node role: manager. Action: find leader ")
-            err = e.joinEtcdCluster(n.ManagerStatus.Addr)
-            if err != nil {
-                log.WithFields(log.Fields{"nodeID": nodeID,
-                    "error": err}).Error("Failed to join ETCD Cluster")
-                return nil
-            }
-            return e
-        }
+    err = e.joinEtcdCluster(leaderAddr)
+    if err != nil {
+        log.WithFields(log.Fields{
+            "nodeID": nodeID,
+            "error": err},
+        ).Error("Failed to join ETCD Cluster")
+        return nil
     }
-
-    log.Errorf("Failed to get leader for swarm manager %s", nodeID)
-    return nil
+    return e
 }
 
 // startEtcdCluster function is called by swarm leader to start an ETCD cluster
-func (e *etcdKVS) startEtcdCluster() error {
+func (e *EtcdKVS) startEtcdCluster() error {
     nodeID := e.nodeID
     nodeAddr := e.nodeAddr
     lines := []string{
@@ -198,7 +166,7 @@ func (e *etcdKVS) startEtcdCluster() error {
 }
 
 // joinEtcdCluster function is called by a non-leader swarm manager to join an ETCD cluster
-func (e *etcdKVS) joinEtcdCluster(leaderAddr string) error {
+func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
     nodeAddr := e.nodeAddr
     nodeID := e.nodeID
 
@@ -314,7 +282,7 @@ func etcdService(cmd []string) {
 
 // checkLocalEtcd function checks if local ETCD endpoint is successfully started or not
 // if yes, start the watcher for volume global refcount
-func (e *etcdKVS) checkLocalEtcd() error {
+func (e *EtcdKVS) checkLocalEtcd() error {
     ticker := time.NewTicker(checkSleepDuration)
     defer ticker.Stop()
     timer := time.NewTimer(requestTimeout)
@@ -342,9 +310,9 @@
 }
 
 // etcdWatcher function sets up a watcher to monitor all the changes to global refcounts in the KV store
-func (e *etcdKVS) etcdWatcher() {
+func (e *EtcdKVS) etcdWatcher() {
     // TODO: when the manager is demoted to worker, the watcher should be cancelled
-    watchCh := e.client.Watch(context.Background(), etcdPrefixGRef,
+    watchCh := e.client.Watch(context.Background(), kvstore.VolPrefixGRef,
         etcdClient.WithPrefix(), etcdClient.WithPrevKV())
     for wresp := range watchCh {
         for _, ev := range wresp.Events {
@@ -354,18 +322,18 @@
 }
 
 // etcdEventHandler function handles the returned event from etcd watcher of global refcount changes
-func (e *etcdKVS) etcdEventHandler(ev *etcdClient.Event) {
+func (e *EtcdKVS) etcdEventHandler(ev *etcdClient.Event) {
     log.WithFields(
         log.Fields{"type": ev.Type},
     ).Infof("Watcher on global refcount returns event ")
 
-    nested := func(key string, fromState volStatus,
-        toState volStatus, interimState volStatus,
+    nested := func(key string, fromState kvstore.VolStatus,
+        toState kvstore.VolStatus, interimState kvstore.VolStatus,
         fn func(string) bool) {
 
         // watcher observes global refcount critical change
         // transactional edit state first
-        volName := strings.TrimPrefix(key, etcdPrefixGRef)
-        succeeded := e.CompareAndPutStateOrBusywait(etcdPrefixState+volName,
+        volName := strings.TrimPrefix(key, kvstore.VolPrefixGRef)
+        succeeded := e.CompareAndPutStateOrBusywait(kvstore.VolPrefixState+volName,
             string(fromState), string(interimState))
         if !succeeded {
             // this handler doesn't get the right to start server
@@ -374,20 +342,20 @@
 
         if fn(volName) {
             // server start/stop succeed, set desired state
-            if e.CompareAndPut(etcdPrefixState+volName,
+            if e.CompareAndPut(kvstore.VolPrefixState+volName,
                 string(interimState),
                 string(toState)) == false {
                 // Failed to set state to desired state
                 // set to state Error
-                e.CompareAndPut(etcdPrefixState+volName,
+                e.CompareAndPut(kvstore.VolPrefixState+volName,
                     string(interimState),
-                    string(volStateError))
+                    string(kvstore.VolStateError))
             }
         } else {
             // failed to start/stop server, set to state Error
-            e.CompareAndPut(etcdPrefixState+volName,
+            e.CompareAndPut(kvstore.VolPrefixState+volName,
                 string(interimState),
-                string(volStateError))
+                string(kvstore.VolStateError))
         }
         return
     }
@@ -399,16 +367,16 @@
             ev.PrevKv != nil &&
             string(ev.PrevKv.Value) == etcdNoRef {
             // Refcount went 0 -> 1
-            nested(string(ev.Kv.Key), volStateReady,
-                volStateMounted, volStateMounting,
-                e.driver.startSMBServer)
+            nested(string(ev.Kv.Key), kvstore.VolStateReady,
+                kvstore.VolStateMounted, kvstore.VolStateMounting,
+                e.dockerOps.StartSMBServer)
         } else if string(ev.Kv.Value) == etcdNoRef &&
             ev.PrevKv != nil &&
             string(ev.PrevKv.Value) == etcdSingleRef {
             // Refcount went 1 -> 0
-            nested(string(ev.Kv.Key), volStateMounted,
-                volStateReady, volStateUnmounting,
-                e.driver.stopSMBServer)
+            nested(string(ev.Kv.Key), kvstore.VolStateMounted,
+                kvstore.VolStateReady, kvstore.VolStateUnmounting,
+                e.dockerOps.StopSMBServer)
         }
     }
     return
@@ -416,7 +384,7 @@ func (e *etcdKVS) etcdEventHandler(ev *etcdClient.Event) {
 
 // CompareAndPut function: compare the value of the key with oldVal
 // if equal, replace with newVal and return true; or else, return false.
-func (e *etcdKVS) CompareAndPut(key string, oldVal string, newVal string) bool {
+func (e *EtcdKVS) CompareAndPut(key string, oldVal string, newVal string) bool {
     txresp, err := e.client.Txn(context.TODO()).If(
         etcdClient.Compare(etcdClient.Value(key), "=", oldVal),
     ).Then(
@@ -437,7 +405,7 @@
 }
 
 //CompareAndPutOrFetch - Compare and put or get the current value of the key
-func (e *etcdKVS) CompareAndPutOrFetch(key string,
+func (e *EtcdKVS) CompareAndPutOrFetch(key string,
     oldVal string,
     newVal string) (*etcdClient.TxnResponse, error) {
 
     txresp, err := e.client.Txn(context.TODO()).If(
@@ -463,7 +431,7 @@ func (e *etcdKVS) CompareAndPutOrFetch(key string,
 
 // CompareAndPutStateOrBusywait function: compare the volume state with oldVal
 // if equal, replace with newVal and return true; or else, return false;
 // waits if volume is in a state from where it can reach the ready state
-func (e *etcdKVS) CompareAndPutStateOrBusywait(key string, oldVal string, newVal string) bool {
+func (e *EtcdKVS) CompareAndPutStateOrBusywait(key string, oldVal string, newVal string) bool {
     var txresp *etcdClient.TxnResponse
     var err error
 
@@ -483,8 +451,8 @@ func (e *etcdKVS) CompareAndPutStateOrBusywait(key string, oldVal string, newVal
         if txresp.Succeeded == false {
             resp := txresp.Responses[0].GetResponseRange()
             // Did we encounter states other than Unmounting or Creating?
-            if (string(resp.Kvs[0].Value) != string(volStateUnmounting)) &&
-                (string(resp.Kvs[0].Value) != string(volStateCreating)) {
+            if (string(resp.Kvs[0].Value) != string(kvstore.VolStateUnmounting)) &&
+                (string(resp.Kvs[0].Value) != string(kvstore.VolStateCreating)) {
                 log.Infof("Volume not in proper state for the operation: %s",
                     string(resp.Kvs[0].Value))
                 return false
@@ -501,18 +469,17 @@ func (e *etcdKVS) CompareAndPutStateOrBusywait(key string, oldVal string, newVal
     }
 }
 
-func (e *etcdKVS) createEtcdClient() *etcdClient.Client {
-    dclient := e.driver.dockerd
-
-    info, err := dclient.Info(context.Background())
+// createEtcdClient function creates an ETCD client according to swarm manager info
+func (e *EtcdKVS) createEtcdClient() *etcdClient.Client {
+    managers, err := e.dockerOps.GetSwarmManagers()
     if err != nil {
         log.WithFields(
             log.Fields{"error": err},
-        ).Error("Failed to get Info from docker client ")
+        ).Error("Failed to get swarm managers ")
         return nil
     }
 
-    for _, manager := range info.Swarm.RemoteManagers {
+    for _, manager := range managers {
         etcd, err := addrToEtcdClient(manager.Addr)
         if err == nil {
             return etcd
@@ -520,8 +487,8 @@ func (e *etcdKVS) createEtcdClient() *etcdClient.Client {
     }
 
     log.WithFields(
-        log.Fields{"Swarm ID": info.Swarm.NodeID,
-            "IP Addr": info.Swarm.NodeAddr},
+        log.Fields{"Swarm ID": e.nodeID,
+            "IP Addr": e.nodeAddr},
     ).Error("Failed to create ETCD client according to manager info ")
     return nil
 }
@@ -550,9 +517,9 @@
     return etcd, nil
 }
 
-// ListVolumeName function lists all the volume names associated with this KV store
-func (e *etcdKVS) ListVolumeName() ([]string, error) {
-    var volumes []string
+// List function lists all keys with the given prefix; the prefix is
+// stripped from the returned keys
+func (e *EtcdKVS) List(prefix string) ([]string, error) {
+    var keys []string
 
     etcd := e.createEtcdClient()
     if etcd == nil {
@@ -560,25 +527,26 @@
 
     ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-    resp, err := etcd.Get(ctx, etcdPrefixState, etcdClient.WithPrefix(),
+    resp, err := etcd.Get(ctx, prefix, etcdClient.WithPrefix(),
         etcdClient.WithSort(etcdClient.SortByKey, etcdClient.SortDescend))
     cancel()
     if err != nil {
         log.WithFields(
-            log.Fields{"error": err},
-        ).Error("Failed to call ETCD Get for listing all volumes ")
+            log.Fields{"error": err,
+                "prefix": prefix},
+        ).Error("Failed to call ETCD Get for listing all keys with prefix ")
         return nil, err
     }
 
     for _, ev := range resp.Kvs {
-        volumes = append(volumes, strings.TrimPrefix(string(ev.Key), etcdPrefixState))
+        keys = append(keys, strings.TrimPrefix(string(ev.Key), prefix))
     }
 
-    return volumes, nil
+    return keys, nil
 }
 
-// WriteVolMetadata - Update or Create volume metadata in KV store
-func (e *etcdKVS) WriteVolMetadata(entries []kvPair) error {
+// WriteMetaData - Update or Create metadata in KV store
+func (e *EtcdKVS) WriteMetaData(entries []kvstore.KvPair) error {
 
     var ops []etcdClient.Op
     var msg string
@@ -591,7 +559,7 @@
     // ops contain multiple operations that will be done to etcd
     // in a single revision
     for _, elem := range entries {
-        ops = append(ops, etcdClient.OpPut(elem.key, elem.value))
+        ops = append(ops, etcdClient.OpPut(elem.Key, elem.Value))
     }
 
     // Lets write the metadata in a single transaction
@@ -610,9 +578,9 @@
     return nil
 }
 
-// ReadVolMetadata - Read volume metadata in KV store
-func (e *etcdKVS) ReadVolMetadata(keys []string) ([]kvPair, error) {
-    var entries []kvPair
+// ReadMetaData - Read metadata in KV store
+func (e *EtcdKVS) ReadMetaData(keys []string) ([]kvstore.KvPair, error) {
+    var entries []kvstore.KvPair
     var ops []etcdClient.Op
     var missedCount int
 
@@ -645,13 +613,13 @@
             missedCount++
             continue
         }
-        entry := kvPair{key: elem, value: string(resp.Kvs[0].Value)}
+        entry := kvstore.KvPair{Key: elem, Value: string(resp.Kvs[0].Value)}
         entries = append(entries, entry)
     }
 
     if missedCount == len(keys) {
         // Volume does not exist
-        return nil, errors.New(VolumeDoesNotExistError)
+        return nil, errors.New(kvstore.VolumeDoesNotExistError)
     } else if missedCount > 0 {
         // This should not happen
         // There is a volume but we couldn't read all its keys
@@ -662,8 +630,8 @@
     return entries, nil
 }
 
-// DeleteVolMetadata - Delete volume metadata in KV store
-func (e *etcdKVS) DeleteVolMetadata(name string) error {
+// DeleteMetaData - Delete volume metadata in KV store
+func (e *EtcdKVS) DeleteMetaData(name string) error {
 
     var msg string
     var err error
@@ -675,9 +643,9 @@
     // ops hold multiple operations that will be done to etcd
     // in a single revision. Add all keys for this volname.
     ops := []etcdClient.Op{
-        etcdClient.OpDelete(etcdPrefixState + name),
-        etcdClient.OpDelete(etcdPrefixGRef + name),
-        etcdClient.OpDelete(etcdPrefixInfo + name),
+        etcdClient.OpDelete(kvstore.VolPrefixState + name),
+        etcdClient.OpDelete(kvstore.VolPrefixGRef + name),
+        etcdClient.OpDelete(kvstore.VolPrefixInfo + name),
     }
 
     // Delete the metadata in a single transaction
diff --git a/client_plugin/drivers/shared/kvstore/kvstore.go b/client_plugin/drivers/shared/kvstore/kvstore.go
new file mode 100644
index 000000000..97d738d91
--- /dev/null
+++ b/client_plugin/drivers/shared/kvstore/kvstore.go
@@ -0,0 +1,89 @@
+// Copyright 2017 VMware, Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// KV Store interface
+//
+// Provide functionalities of Key-Value store for volume driver usage.
+
+package kvstore
+
+// VolStatus: Datatype for keeping status of a shared volume
+type VolStatus string
+
+/*
+    Constants:
+    VolStateCreating: Shared volume is being created. Not ready to be mounted.
+    VolStateReady: Shared volume is ready to be mounted but
+    no Samba service running right now.
+    VolStateMounted: Samba service already running. Volume mounted
+    on at least one host VM.
+    VolStateMounting: Volume is being mounted, File sharing service is being
+    started for this volume.
+    VolStateUnmounting: Volume is being unmounted, File sharing service is being
+    stopped for this volume.
+    VolStateDeleting: Shared volume is being deleted. Not ready to be mounted.
+    VolStateError: Shared volume in error state.
+
+    VolPrefixState: Each volume has three metadata keys. Each such
+    key terminates in the name of the volume, but
+    is preceded by a prefix. This is the prefix
+    for "State" key
+    VolPrefixGRef: The prefix for GRef key (Global refcount)
+    VolPrefixInfo: The prefix for info key. This key holds all
+    other metadata fields squashed into one
+
+    VolumeDoesNotExistError: Error indicating that there is no such volume
+*/
+const (
+    VolStateCreating VolStatus = "Creating"
+    VolStateReady VolStatus = "Ready"
+    VolStateMounted VolStatus = "Mounted"
+    VolStateMounting VolStatus = "Mounting"
+    VolStateUnmounting VolStatus = "Unmounting"
+    VolStateDeleting VolStatus = "Deleting"
+    VolStateError VolStatus = "Error"
+    VolPrefixState = "SVOLS_stat_"
+    VolPrefixGRef = "SVOLS_gref_"
+    VolPrefixInfo = "SVOLS_info_"
+    VolumeDoesNotExistError = "No such volume"
+)
+
+// KvPair : Key Value pair holder
+type KvPair struct {
+    Key string
+    Value string
+}
+
+// KvStore is the interface for VolumeDriver to access a plugin-level KV store
+type KvStore interface {
+    // WriteMetaData - Update or Create volume metadata in KV store
+    WriteMetaData(entries []KvPair) error
+
+    // ReadMetaData - Read volume metadata in KV store
+    ReadMetaData(keys []string) ([]KvPair, error)
+
+    // DeleteMetaData - Delete volume metadata in KV store
+    DeleteMetaData(name string) error
+
+    // CompareAndPut - Compare the value of key with oldVal; if equal, replace with newVal
+    CompareAndPut(key string, oldVal string, newVal string) bool
+
+    // CompareAndPutStateOrBusywait - Compare the volume state with oldVal;
+    // if equal, replace with newVal and return true; or else, return false;
+    // waits if volume is in a state from where it can reach the ready state
+    CompareAndPutStateOrBusywait(key string, oldVal string, newVal string) bool
+
+    // List - List all keys with a given prefix; the prefix is stripped
+    // from the returned keys
+    List(prefix string) ([]string, error)
+}
diff --git a/client_plugin/drivers/shared/shared_driver.go b/client_plugin/drivers/shared/shared_driver.go
index aa5dc55a9..e27b288fd 100644
--- a/client_plugin/drivers/shared/shared_driver.go
+++ b/client_plugin/drivers/shared/shared_driver.go
@@ -23,15 +23,15 @@ package shared
 ///
 
 import (
-    "context"
     "encoding/json"
     "errors"
     "flag"
     "fmt"
     log "github.com/Sirupsen/logrus"
-    dockerClient "github.com/docker/engine-api/client"
-    dockerTypes "github.com/docker/engine-api/types"
     "github.com/docker/go-plugins-helpers/volume"
+    "github.com/vmware/docker-volume-vsphere/client_plugin/drivers/shared/dockerops"
+    "github.com/vmware/docker-volume-vsphere/client_plugin/drivers/shared/kvstore"
+    "github.com/vmware/docker-volume-vsphere/client_plugin/drivers/shared/kvstore/etcdops"
     "github.com/vmware/docker-volume-vsphere/client_plugin/drivers/utils"
    "github.com/vmware/docker-volume-vsphere/client_plugin/utils/config"
     "github.com/vmware/docker-volume-vsphere/client_plugin/utils/plugin_utils"
@@ -40,49 +40,28 @@ import (
     "strings"
 )
 
-// volStatus: Datatype for keeping status of a shared volume
-type volStatus string
-
 /*
     Constants
     version: Version of the shared plugin driver
-    dockerAPIVersion: docker engine 1.24 and above support this api version
-    dockerUSocket: Unix socket on which Docker engine is listening
     internalVolumePrefix: Prefix for the names of internal volumes. These
     volumes are the actual vmdk backing the shared volumes.
     sambaImageName: Name of the docker hub image we pull as samba server
     sambaUsername: Default username for all accessing Samba servers
     sambaPassword: Default password for accessing all Samba servers
-    volStateCreating: Shared volume is being created. Not ready to be mounted.
-    volStateReady: Shared volume is ready to be mounted but
-    no Samba service running right now.
-    volStateMounted: Samba service already running. Volume mounted
-    on at least one host VM.
-    volStateMounting: Volume is being mounted, File sharing service is being
-    started for this volume.
-    volStateUnmounting: Volume is being unmounted, File sharing service is being
-    stopped for this volume.
-    volStateDeleting: Shared volume is being deleted. Not ready to be mounted.
+    stateIdx: Index for volume state in key-value pair array
+    gRefIdx: Index for global refcount in key-value pair array
+    infoIdx: Index for metadata information in key-value pair array
 */
 const (
     version = "vSphere Shared Volume Driver v0.2"
-    dockerAPIVersion = "v1.24"
-    dockerUSocket = "unix:///var/run/docker.sock"
     internalVolumePrefix = "InternalVol"
     // TODO Replace with our own samba later
-    sambaImageName = "dperson/samba"
-    sambaUsername = "root"
-    sambaPassword = "badpass"
-    volStateCreating volStatus = "Creating"
-    volStateReady volStatus = "Ready"
-    volStateMounting volStatus = "Mounting"
-    volStateMounted volStatus = "Mounted"
-    volStateError volStatus = "Error"
-    volStateUnmounting volStatus = "Unmounting"
-    volStateDeleting volStatus = "Deleting"
-    stateIdx = 0
-    GRefIdx = 1
-    InfoIdx = 2
+    sambaImageName = "dperson/samba"
+    sambaUsername = "root"
+    sambaPassword = "badpass"
+    stateIdx = 0
+    gRefIdx = 1
+    infoIdx = 2
 )
 
 /* VolumeMetadata structure contains all the
@@ -101,28 +80,28 @@ const (
 
 // VolumeMetadata - Contains metadata of shared volumes
 type VolumeMetadata struct {
-    Status volStatus `json:"-"` // Field won't be marshalled
-    GlobalRefcount int `json:"-"` // Field won't be marshalled
-    Port int `json:"port,omitempty"`
-    ServiceName string `json:"serviceName,omitempty"`
-    Username string `json:"username,omitempty"`
-    Password string `json:"password,omitempty"`
-    ClientList []string `json:"clientList,omitempty"`
+    Status kvstore.VolStatus `json:"-"` // Field won't be marshalled
+    GlobalRefcount int `json:"-"` // Field won't be marshalled
+    Port int `json:"port,omitempty"`
+    ServiceName string `json:"serviceName,omitempty"`
+    Username string `json:"username,omitempty"`
+    Password string `json:"password,omitempty"`
+    ClientList []string `json:"clientList,omitempty"`
 }
 
 /* VolumeDriver - vsphere shared plugin volume driver struct
-    dockerd: Client used for talking to Docker
+    dockerOps: Docker related methods and information
     internalVolumeDriver: Name of the plugin used by shared volume
     plugin to create internal volumes
-    etcd: Object of the KV store interface
+    kvStore: Key-value store related methods and information
 */
 
 // VolumeDriver - Contains vars specific to this driver
 type VolumeDriver struct {
     utils.PluginDriver
-    dockerd *dockerClient.Client
+    dockerOps *dockerops.DockerOps
     internalVolumeDriver string
-    etcd *etcdKVS
+    kvStore kvstore.KvStore
 }
 
 // NewVolumeDriver creates driver instance
@@ -144,20 +123,17 @@ func NewVolumeDriver(cfg config.Config, mountDir string) *VolumeDriver {
         d.internalVolumeDriver = *internalVolumeParam
     }
 
-    // create new docker client
-    cli, err := dockerClient.NewClient(dockerUSocket, dockerAPIVersion, nil, nil)
-    if err != nil {
-        log.WithFields(
-            log.Fields{"error": err},
-        ).Error("Failed to create client for Docker ")
+    // create new docker operation client
+    d.dockerOps = dockerops.NewDockerOps()
+    if d.dockerOps == nil {
+        log.Errorf("Failed to create new DockerOps")
         return nil
     }
-    d.dockerd = cli
 
     // initialize built-in etcd cluster
-    d.etcd = NewKvStore(&d)
-    if d.etcd == nil {
-        log.Errorf("Failed to create new etcd KV store")
+    // check the concrete pointer before storing it in the interface
+    // field: a nil *EtcdKVS inside the interface would not compare
+    // equal to nil
+    kvStore := etcdops.NewKvStore(d.dockerOps)
+    if kvStore == nil {
+        log.Errorf("Failed to create new KV store")
         return nil
     }
+    d.kvStore = kvStore
 
@@ -183,7 +159,7 @@ func (d *VolumeDriver) Get(r volume.Request) volume.Response {
 
 // List volumes known to the driver
 func (d *VolumeDriver) List(r volume.Request) volume.Response {
-    volumes, err := d.etcd.ListVolumeName()
+    volumes, err := d.kvStore.List(kvstore.VolPrefixState)
     if err != nil {
         return volume.Response{Err: err.Error()}
     }
@@ -207,15 +183,15 @@ func (d *VolumeDriver) GetVolume(name string) (map[string]interface{}, error) {
 
     // The kv pairs we want from the KV store
     keys := []string{
-        etcdPrefixState + name,
-        etcdPrefixGRef + name,
-        etcdPrefixInfo + name,
+        kvstore.VolPrefixState + name,
+        kvstore.VolPrefixGRef + name,
+        kvstore.VolPrefixInfo + name,
     }
 
     // KV pairs will be returned in same order in which they were requested
-    entries, err := d.etcd.ReadVolMetadata(keys)
+    entries, err := d.kvStore.ReadMetaData(keys)
     if err != nil {
-        if err.Error() == VolumeDoesNotExistError {
+        if err.Error() == kvstore.VolumeDoesNotExistError {
             log.Infof("Volume not found: %s", name)
             return statusMap, err
         }
@@ -225,10 +201,10 @@
         return statusMap, errors.New(msg)
     }
 
-    statusMap["Volume Status"] = entries[stateIdx].value
-    statusMap["Global Refcount"], _ = strconv.Atoi(entries[GRefIdx].value)
+    statusMap["Volume Status"] = entries[stateIdx].Value
+    statusMap["Global Refcount"], _ = strconv.Atoi(entries[gRefIdx].Value)
     // Unmarshal Info key
-    err = json.Unmarshal([]byte(entries[InfoIdx].value), &volRecord)
+    err = json.Unmarshal([]byte(entries[infoIdx].Value), &volRecord)
     if err != nil {
         msg := fmt.Sprintf("Failed to unmarshal data. %v", err)
         log.Warningf(msg)
@@ -258,33 +234,19 @@ func (d *VolumeDriver) UnmountVolume(name string) error {
 func (d *VolumeDriver) Create(r volume.Request) volume.Response {
     log.Infof("VolumeDriver Create: %s", r.Name)
     var msg string
-    var entries []kvPair
-
-    // Ensure that the node running this command is a manager
-    info, err := d.dockerd.Info(context.Background())
-    if err != nil {
Reason: %v", err) - log.Warningf(msg) - return volume.Response{Err: msg} - } - if info.Swarm.ControlAvailable == false { - msg = fmt.Sprintf("This node is not a swarm manager.") - msg += fmt.Sprintf(" Shared Volume creation is only possible from swarm manager nodes.") - log.Warningf(msg) - return volume.Response{Err: msg} - } + var entries []kvstore.KvPair // Initialize volume metadata in KV store volRecord := VolumeMetadata{ - Status: volStateCreating, + Status: kvstore.VolStateCreating, GlobalRefcount: 0, Username: sambaUsername, Password: sambaPassword, } // Append global refcount and status to kv pairs that will be written - entries = append(entries, kvPair{key: etcdPrefixGRef + r.Name, value: strconv.Itoa(volRecord.GlobalRefcount)}) - entries = append(entries, kvPair{key: etcdPrefixState + r.Name, value: string(volRecord.Status)}) + entries = append(entries, kvstore.KvPair{Key: kvstore.VolPrefixGRef + r.Name, Value: strconv.Itoa(volRecord.GlobalRefcount)}) + entries = append(entries, kvstore.KvPair{Key: kvstore.VolPrefixState + r.Name, Value: string(volRecord.Status)}) // Append the rest of the metadata as one KV pair where the data is jsonified byteRecord, err := json.Marshal(volRecord) if err != nil { @@ -292,10 +254,10 @@ func (d *VolumeDriver) Create(r volume.Request) volume.Response { log.Warningf(msg) return volume.Response{Err: msg} } - entries = append(entries, kvPair{key: etcdPrefixInfo + r.Name, value: string(byteRecord)}) + entries = append(entries, kvstore.KvPair{Key: kvstore.VolPrefixInfo + r.Name, Value: string(byteRecord)}) log.Infof("Attempting to write initial metadata entry for %s", r.Name) - err = d.etcd.WriteVolMetadata(entries) + err = d.kvStore.WriteMetaData(entries) if err != nil { msg = fmt.Sprintf("Failed to create volume %s. Reason: %v", r.Name, err) @@ -306,19 +268,14 @@ func (d *VolumeDriver) Create(r volume.Request) volume.Response { // Create traditional volume as backend to shared volume log.Infof("Attempting to create internal volume for %s", r.Name) internalVolname := internalVolumePrefix + r.Name - dockerVolOptions := dockerTypes.VolumeCreateRequest{ - Driver: d.internalVolumeDriver, - Name: internalVolname, - DriverOpts: r.Options, - } - _, err = d.dockerd.VolumeCreate(context.Background(), dockerVolOptions) + err = d.dockerOps.VolumeCreate(d.internalVolumeDriver, internalVolname, r.Options) if err != nil { msg = fmt.Sprintf("Failed to create internal volume %s. Reason: %v", r.Name, err) msg += fmt.Sprintf(" Check the status of the volumes belonging to driver %s", d.internalVolumeDriver) log.Warningf(msg) // If failed, attempt to delete the metadata for this volume - err = d.etcd.DeleteVolMetadata(r.Name) + err = d.kvStore.DeleteMetaData(r.Name) if err != nil { log.Warningf("Failed to remove metadata entry for volume: %s. Reason: %v", r.Name, err) } @@ -328,15 +285,15 @@ func (d *VolumeDriver) Create(r volume.Request) volume.Response { // Update metadata to indicate successful volume creation log.Infof("Attempting to update volume state to ready for volume: %s", r.Name) entries = nil - entries = append(entries, kvPair{key: etcdPrefixState + r.Name, value: string(volStateReady)}) - err = d.etcd.WriteVolMetadata(entries) + entries = append(entries, kvstore.KvPair{Key: kvstore.VolPrefixState + r.Name, Value: string(kvstore.VolStateReady)}) + err = d.kvStore.WriteMetaData(entries) if err != nil { outerMessage := fmt.Sprintf("Failed to set status of volume %s to ready. 
Reason: %v", r.Name, err) log.Warningf(outerMessage) // If failed, attempt to remove the backing trad volume log.Infof("Attempting to delete internal volume") - err = d.dockerd.VolumeRemove(context.Background(), internalVolname) + err = d.dockerOps.VolumeRemove(internalVolname) if err != nil { msg = fmt.Sprintf(" Failed to remove internal volume. Reason %v.", err) msg += fmt.Sprintf(" Please remove the volume manually. Volume: %s", internalVolname) @@ -345,7 +302,7 @@ func (d *VolumeDriver) Create(r volume.Request) volume.Response { } // Attempt to delete the metadata for this volume - err = d.etcd.DeleteVolMetadata(r.Name) + err = d.kvStore.DeleteMetaData(r.Name) if err != nil { log.Warningf("Failed to remove metadata entry for volume: %s. Reason: %v", r.Name, err) } @@ -382,21 +339,21 @@ func (d *VolumeDriver) Remove(r volume.Request) volume.Response { } // Test and set status to Deleting - if !d.etcd.CompareAndPutStateOrBusywait(etcdPrefixState+r.Name, - string(volStateReady), - string(volStateDeleting)) { + if !d.kvStore.CompareAndPutStateOrBusywait(kvstore.VolPrefixState+r.Name, + string(kvstore.VolStateReady), + string(kvstore.VolStateDeleting)) { clientFetchSucceeded := true // Get a list of host VMs using this volume, if any keys := []string{ - etcdPrefixInfo + r.Name, + kvstore.VolPrefixInfo + r.Name, } - entries, err := d.etcd.ReadVolMetadata(keys) + entries, err := d.kvStore.ReadMetaData(keys) if err != nil { clientFetchSucceeded = false log.Warningf("Failed to check which host VMs are using volume %s", r.Name) } // Unmarshal Info key - err = json.Unmarshal([]byte(entries[0].value), &volRecord) + err = json.Unmarshal([]byte(entries[0].Value), &volRecord) if err != nil { clientFetchSucceeded = false log.Warningf("Failed to unmarshal data. %v", err) @@ -416,7 +373,7 @@ func (d *VolumeDriver) Remove(r volume.Request) volume.Response { // Delete internal volume log.Infof("Attempting to delete internal volume for %s", r.Name) internalVolname := internalVolumePrefix + r.Name - err := d.dockerd.VolumeRemove(context.Background(), internalVolname) + err := d.dockerOps.VolumeRemove(internalVolname) if err != nil { msg = fmt.Sprintf("Failed to remove internal volume %s. Reason: %v", r.Name, err) msg += fmt.Sprintf(" Check the status of the volumes belonging to driver %s", d.internalVolumeDriver) @@ -426,7 +383,7 @@ func (d *VolumeDriver) Remove(r volume.Request) volume.Response { // Delete metadata associated with this volume log.Infof("Attempting to delete volume metadata for %s", r.Name) - err = d.etcd.DeleteVolMetadata(r.Name) + err = d.kvStore.DeleteMetaData(r.Name) if err != nil { msg = fmt.Sprintf("Failed to delete volume metadata for %s. 
Reason: %v", r.Name, err) return volume.Response{Err: msg} @@ -467,15 +424,3 @@ func (d *VolumeDriver) Unmount(r volume.UnmountRequest) volume.Response { func (d *VolumeDriver) Capabilities(r volume.Request) volume.Response { return volume.Response{Capabilities: volume.Capability{Scope: "global"}} } - -// startSMBServer - Start SMB server -func (d *VolumeDriver) startSMBServer(volName string) bool { - log.Errorf("startSMBServer to be implemented") - return true -} - -// stopSMBServer - Stop SMB server -func (d *VolumeDriver) stopSMBServer(volName string) bool { - log.Errorf("stopSMBServer to be implemented") - return true -} diff --git a/esx_service/utils/vsan_info_test.py b/esx_service/utils/vsan_info_test.py index dd563bbe8..96ea04751 100644 --- a/esx_service/utils/vsan_info_test.py +++ b/esx_service/utils/vsan_info_test.py @@ -32,7 +32,7 @@ class TestVsanInfo(unittest.TestCase): """ Test VSAN Info API """ VM_NAME = "test-vm" - VSAN_DS = "/vmfs/volumes/vsanDatastore" + VSAN_DS = "/vmfs/volumes/vsandatastore" TEST_DIR = os.path.join(VSAN_DS, "vsan_info_test") TEST_VOL = "test_policy_vol" VMDK_PATH = os.path.join(TEST_DIR, TEST_VOL + ".vmdk")