Shared plugin: add etcd watcher and event handler #1661
a032e77 to 3a83454
Overall looks good, and I do have several comments/questions.
@@ -134,7 +163,7 @@ func (d *VolumeDriver) initEtcd() error {
"manager ID": nodeID},
).Info("Swarm node role: manager. Action: find leader ")
joinEtcdCluster(addr, n.ManagerStatus.Addr, nodeID)
e.joinEtcdCluster(n.ManagerStatus.Addr)
return nil
Why return "nil" as the error here? I think e.startEtcdCluster returns an err, right?
Thanks for catching this... I changed the return logic last time and forgot to update the returned error here!
@@ -255,9 +292,11 @@ func joinEtcdCluster(nodeAddr string, leaderAddr string, nodeID string) error {
"--initial-cluster-state", etcdClusterStateExisting,
}
// start the etcd cluster routine
Why does the comment say "start the etcd cluster routine"? I think it should be "join the etcd cluster".
joinEtcdCluster is called by manager nodes that are not initializing the cluster. To join, they must start their own etcd, hence the comment.
So it really means "start" ... "routine". Will update the comment to avoid confusion :)
// checkLocalEtcd function check if local ETCD endpoint is successfully started or not
// if yes, start the watcher for volume global refcount
func (e *etcdInfo) checkLocalEtcd() error {
ticker := time.NewTicker(checkSleepDuration)
Just curious, how do we decide the right value of "checkSleepDuration" and "requestTimeout"?
These values are mainly taken from other projects. We will need to do more testing and adjust them to be more accurate.
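For readers unfamiliar with the pattern under discussion, here is a minimal sketch of polling with a ticker until a readiness probe succeeds or a timeout expires. It is not the PR's code: `waitUntilReady`, `probe`, and the durations used in `main` are hypothetical stand-ins for checkLocalEtcd, the local etcd health check, checkSleepDuration, and requestTimeout.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntilReady calls probe once per interval until it returns true
// or until timeout has elapsed. All names here are illustrative only.
func waitUntilReady(probe func() bool, interval, timeout time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	deadline := time.After(timeout)

	for {
		select {
		case <-ticker.C:
			if probe() {
				return nil
			}
		case <-deadline:
			return errors.New("endpoint not ready before timeout")
		}
	}
}

func main() {
	attempts := 0
	// Hypothetical probe that succeeds on the third attempt.
	probe := func() bool {
		attempts++
		return attempts >= 3
	}
	err := waitUntilReady(probe, 10*time.Millisecond, time.Second)
	fmt.Println("error:", err, "attempts:", attempts)
}
```

In a sketch like this, the interval bounds how quickly a started endpoint is noticed and the timeout bounds how long a failed start can block the caller, which is why both values are worth tuning against real startup times.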
}
}
func (e *etcdInfo) etcdWatcher() {
Please add comments for this function.
}
}
func (e *etcdInfo) etcdEventHandler(ev *etcdClient.Event) {
Please add comments for this function.
ev.PrevKv != nil &&
string(ev.PrevKv.Value) == etcdNoRef {
// watcher observes global refcount from 0 to 1
// transactional edit state first
Where is the code to make sure the state edit is transactional?
I see. Inside the function "CompareAndPut", you call the transaction interface. But you call "CompareAndPut" 2 or 3 times here. How do we make sure the whole sequence is transactional too?
I am not sure we want the different compare-and-put steps to be in the same transaction. The state of the volume has to be updated first, which then acts as a lock for all other operations. Maybe Miao can shed some light.
We cannot have those CompareAndPut calls in a single transaction :)
As Shivanshu mentioned above, think of the CompareAndPut that changes the state from Ready to Intermediate as the "lock", and the CompareAndPut that changes the state from Intermediate to Mounted as the "unlock". Only when the "lock" succeeds do we go ahead with the corresponding operation (that is, startSMBServer), and "unlock" afterwards.
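The lock/unlock idea in this reply can be sketched roughly as follows. This is an illustration under stated assumptions, not the plugin's implementation: `memStore` is an in-memory stand-in for etcd, the state strings are guesses based on the state names used in this thread, and `startServer` stands in for startSMBServer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical state values, modeled on the states named in the discussion.
const (
	stateReady        = "Ready"
	stateIntermediate = "Intermediate"
	stateMounted      = "Mounted"
	stateError        = "Error"
)

// errStart is an illustrative failure returned by a fake server start.
var errStart = errors.New("server failed to start")

// memStore is an in-memory stand-in for the KV store; it is not etcd.
type memStore struct {
	mu   sync.Mutex
	data map[string]string
}

// compareAndPut writes newVal only if key currently holds oldVal,
// and reports whether the write happened.
func (s *memStore) compareAndPut(key, oldVal, newVal string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[key] != oldVal {
		return false
	}
	s.data[key] = newVal
	return true
}

// mountWithLock treats Ready->Intermediate as the "lock" and
// Intermediate->Mounted as the "unlock". The first result reports
// whether this caller won the lock.
func mountWithLock(s *memStore, key string, startServer func() error) (bool, error) {
	if !s.compareAndPut(key, stateReady, stateIntermediate) {
		return false, nil // someone else holds the "lock"
	}
	if err := startServer(); err != nil {
		// The operation failed, so park the volume in the Error state.
		s.compareAndPut(key, stateIntermediate, stateError)
		return true, err
	}
	if !s.compareAndPut(key, stateIntermediate, stateMounted) {
		return true, errors.New("could not release the lock")
	}
	return true, nil
}

func main() {
	s := &memStore{data: map[string]string{"vol1": stateReady, "vol2": stateReady}}
	ok := func() error { return nil }
	fail := func() error { return errStart }

	won, err := mountWithLock(s, "vol1", ok)
	fmt.Println(won, err, s.data["vol1"])
	won, err = mountWithLock(s, "vol1", ok) // state is no longer Ready
	fmt.Println(won, err, s.data["vol1"])
	won, err = mountWithLock(s, "vol2", fail)
	fmt.Println(won, err, s.data["vol2"])
}
```

Each compare-and-put is atomic on its own, while the server start happens between them. That is why the steps cannot be folded into one transaction: the side effect has to run while the Intermediate state keeps other handlers out.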
return
}
if string(ev.Kv.Value) == etcdNoRef &&
This code snippet is very similar to the code above, which handles the "etcdSingleRef" to "etcdNoRef" transition. Is it possible to create a common util function to reuse the code?
@@ -346,14 +520,14 @@ func (d *VolumeDriver) etcdList() ([]string, error) {
}
// writeVolMetadata - Update or Create volume metadata in KV store
Nit: "writeVolMetadata" -> "WriteVolMetadata" to be consistent with the function name.
@@ -379,13 +553,13 @@ func (d *VolumeDriver) writeVolMetadata(entries []kvPair) error {
}
// readVolMetadata - Read volume metadata in KV store
Same as above.
@@ -145,7 +147,8 @@ func NewVolumeDriver(cfg config.Config, mountDir string) *VolumeDriver {
d.dockerd = cli
// initialize built-in etcd cluster
err = d.initEtcd()
d.etcd.driver = &d
Why are we doing this? Putting etcdInfo obj in VolumeDriver, and also putting VolumeDriver in etcdInfo obj?
I mean, we do have access to d in etcdops.go, right?
Not really :)
For example, from the etcdEventHandler function (which is a method of the etcdInfo struct), we need to call the startSMBServer function (which is a method of the VolumeDriver struct). So from e *etcdInfo we need to get the VolumeDriver associated with it, and the best way is to link the VolumeDriver back inside etcdInfo.
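A small sketch of the back-reference described here, using simplified hypothetical types (`volumeDriver`, `etcdStore`, `handleEvent`, `startServer`) rather than the real VolumeDriver and etcdInfo definitions:

```go
package main

import "fmt"

// volumeDriver and etcdStore are simplified, hypothetical stand-ins
// for VolumeDriver and etcdInfo.
type volumeDriver struct {
	name    string
	etcd    *etcdStore
	started []string // volumes whose server was started, for demonstration
}

type etcdStore struct {
	driver *volumeDriver // back-reference to the owning driver
}

// startServer plays the role of a driver method such as startSMBServer.
func (d *volumeDriver) startServer(vol string) {
	d.started = append(d.started, vol)
}

// handleEvent is a method of etcdStore, so the only way it can reach
// driver methods is through the e.driver back-reference.
func (e *etcdStore) handleEvent(vol string) {
	e.driver.startServer(vol)
}

func newVolumeDriver(name string) *volumeDriver {
	d := &volumeDriver{name: name, etcd: &etcdStore{}}
	d.etcd.driver = d // in this sketch d is already a pointer
	return d
}

func main() {
	d := newVolumeDriver("shared")
	d.etcd.handleEvent("vol1")
	fmt.Println(d.started)
}
```

The cost of this design is a reference cycle between the two structs. A common alternative is to pass the handler a narrow interface or callback instead of the whole driver.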
if e.CompareAndPut(etcdPrefixState+volName,
string(volStateIntermediate),
string(volStateReady)) == false {
// Failed to set state Intermediate->Mounted,
Comment fix: Intermediate -> Ready. The SMB server has stopped, not started.
string(volStateError))
}
} else {
// failed to start SMB server, set to state Error
Same here: it should say "failed to stop SMB server".
Mostly minor comment fixes suggested.
Ques: Why are we putting etcdInfo obj in VolumeDriver and also VolumeDriver in etcdInfo when VolumeDriver obj "d" is accessible in etcdops?
miao: Maybe I missed something, but how do you access the corresponding "d" without linking it inside etcdInfo? Remember that etcdEventHandler is a method of etcdInfo now.
There seems to be a possibility that all watchers get an error when setting the state from XXX -> Intermediate.
No change is needed in this PR, but we will have to handle that in Mount() via a timeout.
Overall looks good. Have a few comments related to coding style.
type kvStore interface {
// Init - initialize the KV store cluster
Init() error
// WriteVolMetadata - Update or Create volume metadata in KV store
Can we add an empty line in between the interfaces so that it looks prettier?
@@ -319,10 +493,10 @@ func addrToEtcdClient(addr string) (*etcdClient.Client, error) {
}
// etcdList function lists all the volume names associated with this KV store
Nit: update the comment
nodeAddr string
client *etcdClient.Client
}
// initEtcd start or join ETCD cluster depending on the role of the node
Nit: comments need to be updated
ListVolumeName() ([]string, error)
}
type etcdInfo struct {
Can we have a better name for this interface? The name "etcdInfo" sounds like a data object.
@@ -72,10 +77,32 @@ type kvPair struct {
value string
}
type kvStore interface {
Is it intended that the interface is not exported, but the functions in it are exported?
I will move the interface code to a following refactor PR :)
log.Fields{"type": ev.Type},
).Infof("Watcher on global refcount returns event ")
if ev.Type == etcdClient.EventTypePut {
Do we plan to add more handlers for other events? If we are only handling the PUT event here, we can return early to avoid deeply nested if conditions.
Yeah we will need to check other events in the future...
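The early-return suggestion in this thread could look roughly like the sketch below. The `event` type, the refcount encodings "0" and "1", and the returned action strings are all assumptions made for illustration; the real handler works on etcd client events and performs state changes rather than returning labels.

```go
package main

import "fmt"

type eventType int

const (
	eventPut eventType = iota
	eventDelete
)

// event is a hypothetical, simplified view of a watch event.
type event struct {
	typ       eventType
	value     string // new refcount value
	prevValue string // previous refcount value
}

// Assumed refcount encodings, for illustration only.
const (
	noRef     = "0"
	singleRef = "1"
)

// classify returns early for anything it does not handle, which keeps
// the interesting transitions at a single indentation level.
func classify(ev event) string {
	if ev.typ != eventPut {
		return "ignored"
	}
	if ev.prevValue == noRef && ev.value == singleRef {
		return "start-server" // refcount went from 0 to 1
	}
	if ev.prevValue == singleRef && ev.value == noRef {
		return "stop-server" // refcount went from 1 to 0
	}
	return "no-op"
}

func main() {
	fmt.Println(classify(event{typ: eventPut, prevValue: noRef, value: singleRef}))
	fmt.Println(classify(event{typ: eventPut, prevValue: singleRef, value: noRef}))
	fmt.Println(classify(event{typ: eventDelete}))
}
```

If handlers for other event types are added later, as the reply anticipates, the first guard would become a switch on the event type with one flat case per type.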
Every docker swarm manager starts a watcher on all the global refcount keys in etcd. All the managers get events when any global refcount key is updated. Only one of the managers is able to get the right to adjust the state of volume and start/stop the file system server.
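The behavior described in this commit message, where every manager sees the event but only one wins the right to act, can be simulated in a few lines. This is only a sketch: goroutines stand in for the managers' watchers, an atomic integer stands in for the volume state stored in etcd, and `race` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	stateReady int32 = iota
	stateIntermediate
)

// race delivers one refcount event to n simulated managers and returns
// how many of them won the right to act on it.
func race(n int) int {
	state := stateReady
	var winners int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every manager reacts to the event, but only one
			// compare-and-swap from Ready can succeed.
			if atomic.CompareAndSwapInt32(&state, stateReady, stateIntermediate) {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return int(winners)
}

func main() {
	fmt.Println("winners:", race(5))
}
```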
Address review comments.
* Add missing error return after joinEtcdCluster
* Add missing function comments
* Address misleading comments
* Make duplicated code block into nested function
* Delay interface of KvStore change to a later refactor PR
3a83454 to d1428f6
nodeID := info.Swarm.NodeID
addr := info.Swarm.NodeAddr
Is info.Swarm guaranteed to be there? Should there be a check that info.Swarm is valid?
Swarm is a prerequisite for this plugin...
But we need a check here too, and should error out. Will update.
nodeAddr: addr,
}
// worker just returns
if info.Swarm.ControlAvailable == false {
log.WithFields(
log.Fields{"nodeID": nodeID},
).Info("Swarm node role: worker. Action: return from InitEtcd ")
Change the function name in the error message. And the error message should really say why it's bailing out.
Thanks for catching the function name. This one is in fact not an error message but an info log for debugging use.
// if leader, proceed to start ETCD cluster
if node.ManagerStatus.Leader {
log.WithFields(
log.Fields{"nodeID": nodeID},
).Info("Swarm node role: leader, start etcd cluster")
err = startEtcdCluster(addr, nodeID)
err = e.startEtcdCluster()
if err != nil {
log.WithFields(log.Fields{"nodeID": nodeID,
"error": err}).Error("Failed to start ETCD Cluster")
Could make all error messages use the same name, either ETCD or etcd; line 113 calls it "etcd".
}
// if manager, first find out who's leader, then proceed to join ETCD cluster
nodes, err := docker.NodeList(ctx, types.NodeListOptions{})
if err != nil {
log.WithFields(log.Fields{"nodeID": nodeID,
"error": err}).Error("Failed to get NodeList from swarm manager")
return err
return nil
Is it OK to leave after starting the etcd cluster above? What happens if this code runs again and calls startEtcdCluster()? Will that error or return OK?
Can the NodeList be fetched first, and startEtcdCluster() called afterwards? This is an error flow that could be handled before creating the cluster. Start the etcd cluster after line 149.
For the first question:
It's OK because only swarm leader will call the startEtcdCluster, and non-leader managers won't call startEtcdCluster.
For the second question:
No. Because swarm leader doesn't need to call NodeList. The workflow is: if this node is a leader, startEtcdCluster and leave; if this node is a non-leader manager, do NodeList to find who's the leader, then joinEtcdCluster and leave.
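The workflow in this answer can be summarized as a small decision function. The sketch below uses hypothetical types (`node`, `action`) and a hypothetical `decide` helper instead of the Docker API structs. It assumes, as in the snippets above, that workers return immediately, the leader starts the cluster, and other managers look up the leader before joining.

```go
package main

import (
	"errors"
	"fmt"
)

type action int

const (
	actionNone  action = iota // worker: nothing to do
	actionStart               // leader: start the etcd cluster
	actionJoin                // non-leader manager: join the leader's cluster
)

// node is a hypothetical, simplified view of a swarm node.
type node struct {
	id        string
	isManager bool
	isLeader  bool
	addr      string
}

// decide returns the action for self and, for actionJoin, the leader's
// address. peers is only consulted for non-leader managers, mirroring
// the point that the leader never needs the node list.
func decide(self node, peers []node) (action, string, error) {
	if !self.isManager {
		return actionNone, "", nil
	}
	if self.isLeader {
		return actionStart, "", nil
	}
	for _, n := range peers {
		if n.isLeader {
			return actionJoin, n.addr, nil
		}
	}
	return actionNone, "", errors.New("no leader found in node list")
}

func main() {
	leader := node{id: "n1", isManager: true, isLeader: true, addr: "10.0.0.1"}
	manager := node{id: "n2", isManager: true, addr: "10.0.0.2"}
	worker := node{id: "n3", addr: "10.0.0.3"}
	peers := []node{leader, manager, worker}

	for _, n := range peers {
		act, leaderAddr, err := decide(n, peers)
		fmt.Println(n.id, act, leaderAddr, err)
	}
}
```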
joinEtcdCluster(addr, n.ManagerStatus.Addr, nodeID)
return nil
e.joinEtcdCluster(n.ManagerStatus.Addr)
if err != nil {
Where is err set?
// set to state Error
e.CompareAndPut(etcdPrefixState+volName,
string(volStateIntermediate),
string(volStateError))
Is this guaranteed to pass? Logs should be included for these failures, because the volume may remain in the intermediate state (creating?) if the state change fails here (it has already errored at line 368). What does the user do in this case? Can the user delete the volume?
Yes, users can still delete the volume with the "--force" flag. And we already have the log message inside the CompareAndPut function.
succeeded := e.CompareAndPut(etcdPrefixState+volName,
string(fromState), string(volStateIntermediate))
if !succeeded {
// this handler doesn't get the right to start server
Is this comment correct? The preceding line sets the state of a volume rather than starting a server.
It sets the state and then starts a server. And the state is set for the purpose of starting the server :)
if err != nil {
log.WithFields(
log.Fields{"Key": key,
"Error": err},
Could log the new and old values
}
func (e *etcdKVS) createEtcdClient() *etcdClient.Client {
dclient := e.driver.dockerd
At line 92 we call this dockerd "docker"; we could use a single variable name in the different functions where e.driver.dockerd is used. dclient sounds correct.
return
}
func (e *etcdKVS) CompareAndPut(key string, oldVal string, newVal string) bool {
Add a comment.
Approving changes and @luomiao will handle any comments as appropriate.
LGTM
Address more comments.
2939b9e to edf2a09
Every docker swarm manager starts a watcher on all the global
refcount keys in etcd. All the managers get events when any global
refcount key is updated. Only one of the managers is able to get
the right to adjust the state of volume and start/stop the file
system server.
This PR also changed some of the etcd functions into kvStore interface:
This PR needs to be rebased after PR #1631 is merged.
Basic e2e test passed on local testbed.