nodes.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"sync"

	"github.com/pkg/errors"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8stypes "k8s.io/apimachinery/pkg/types"
	k8serrors "k8s.io/apimachinery/pkg/util/errors"
	k8s "k8s.io/client-go/kubernetes"
)
// nodeManager manages the nodes of a Kubernetes cluster.
type nodeManager struct {
	client k8s.Interface
}
// Node is the per-node control interface.
type Node interface {
	Cordon() error
	Uncordon() error
	IsCordoned() (bool, error)
	Drain() error
}

// NodeManager hands out Node handles by name.
type NodeManager interface {
	Node(name string) Node
}
// NewNodeManager creates a new node manager that connects to the
// cluster using the given client.
func NewNodeManager(client k8s.Interface) (NodeManager, error) {
	return &nodeManager{
		client: client,
	}, nil
}
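
// Example usage (a sketch, not part of this package): build a clientset and
// cordon a node through the manager. The in-cluster config and the node name
// "worker-1" are illustrative assumptions.
//
//	config, err := rest.InClusterConfig() // k8s.io/client-go/rest
//	if err != nil {
//		log.Fatal(err)
//	}
//	client, err := k8s.NewForConfig(config)
//	if err != nil {
//		log.Fatal(err)
//	}
//	nm, _ := NewNodeManager(client)
//	if err := nm.Node("worker-1").Cordon(); err != nil {
//		log.Fatal(err)
//	}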
// nodePatchUnschedulable is the patch body used to toggle a node's
// spec.unschedulable field.
type nodePatchUnschedulable struct {
	Spec struct {
		Unschedulable bool `json:"unschedulable"`
	} `json:"spec,omitempty"`
}

func newNodePatchUnschedulable(state bool) nodePatchUnschedulable {
	var patch nodePatchUnschedulable
	patch.Spec.Unschedulable = state
	return patch
}
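
// For reference, newNodePatchUnschedulable(true) marshals to the strategic
// merge patch body
//
//	{"spec":{"unschedulable":true}}
//
// The `omitempty` on Spec has no effect here: encoding/json never treats a
// struct value as empty, so the spec field is always emitted.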
type node struct {
	manager *nodeManager
	name    string
}

// Node returns the control interface for the given node.
func (nm *nodeManager) Node(name string) Node {
	return &node{
		manager: nm,
		name:    name,
	}
}
func (n *node) Cordon() error {
	return n.manager.CordonNode(n.name)
}

func (n *node) Uncordon() error {
	return n.manager.UncordonNode(n.name)
}

func (n *node) IsCordoned() (bool, error) {
	return n.manager.IsNodeCordoned(n.name)
}

func (n *node) Drain() error {
	return n.manager.DrainNode(n.name)
}
// PatchNodeUnschedulable patches the Spec.Unschedulable field of a node.
func (nm *nodeManager) PatchNodeUnschedulable(name string, state bool) error {
	bytes, err := json.Marshal(newNodePatchUnschedulable(state))
	if err != nil {
		return errors.Wrap(err, "failed to marshal node patch")
	}
	_, err = nm.client.CoreV1().Nodes().Patch(name, k8stypes.StrategicMergePatchType, bytes)
	if err != nil {
		return errors.Wrap(err, "failed to patch node")
	}
	return nil
}
// GetNodePods returns the pods running on the given node, across all
// namespaces.
func (nm *nodeManager) GetNodePods(name string) ([]v1.Pod, error) {
	list, err := nm.client.CoreV1().Pods("").List(metav1.ListOptions{
		FieldSelector: "spec.nodeName=" + name,
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to get pod list")
	}
	return list.Items, nil
}
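
// The field selector above matches what kubectl uses for
//
//	kubectl get pods --all-namespaces --field-selector spec.nodeName=<name>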
// UncordonNode marks the node as schedulable.
func (nm *nodeManager) UncordonNode(name string) error {
	return nm.PatchNodeUnschedulable(name, false)
}

// CordonNode marks the node as unschedulable.
func (nm *nodeManager) CordonNode(name string) error {
	return nm.PatchNodeUnschedulable(name, true)
}
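
// CordonNode and UncordonNode are the programmatic equivalents of
// `kubectl cordon <node>` and `kubectl uncordon <node>`, which toggle the
// same spec.unschedulable field.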
// IsNodeCordoned reports whether the node is currently marked unschedulable.
func (nm *nodeManager) IsNodeCordoned(name string) (bool, error) {
	node, err := nm.client.CoreV1().Nodes().Get(name, metav1.GetOptions{})
	if err != nil {
		return false, errors.Wrap(err, "failed to get node")
	}
	return node.Spec.Unschedulable, nil
}
// DrainNode drains the given node by cordoning it and evicting all pods.
func (nm *nodeManager) DrainNode(name string) error {
	if err := nm.CordonNode(name); err != nil {
		return err
	}
	// Pod eviction is currently disabled here; see the free function
	// drainNode below for a working implementation.
	//
	//	pods, err := nm.GetNodePods(name)
	//	if err != nil {
	//		return err
	//	}
	//	for _, pod := range pods {
	//		if err := EvictPod(nm.client, pod.Namespace, pod.Name); err != nil {
	//			return err
	//		}
	//	}
	return nil
}
func patchNodeUnschedulable(client k8s.Interface, name string, state bool) error {
	bytes, err := json.Marshal(newNodePatchUnschedulable(state))
	if err != nil {
		return errors.Wrap(err, "failed to marshal node patch")
	}
	_, err = client.CoreV1().Nodes().Patch(name, k8stypes.StrategicMergePatchType, bytes)
	if err != nil {
		return errors.Wrap(err, "failed to patch node")
	}
	return nil
}
func cordonNode(client k8s.Interface, name string) error {
	log.Printf("Cordon node %s", name)
	return patchNodeUnschedulable(client, name, true)
}

func uncordonNode(client k8s.Interface, name string) error {
	log.Printf("Uncordon node %s", name)
	return patchNodeUnschedulable(client, name, false)
}
func getNodePods(client k8s.Interface, name string) ([]v1.Pod, error) {
	list, err := client.CoreV1().Pods("").List(metav1.ListOptions{
		FieldSelector: "spec.nodeName=" + name,
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to get pod list")
	}
	return list.Items, nil
}
// drainNode cordons the node and then evicts every pod running on it.
func drainNode(ctx context.Context, client k8s.Interface, name string, options *metav1.DeleteOptions) error {
	if err := cordonNode(client, name); err != nil {
		return errors.Wrap(err, "failed to drain node")
	}
	if err := runForEachPodOnNode(ctx, client, name, func(pod *v1.Pod) error {
		return evictPod(ctx, client, pod.GetName(), pod.GetNamespace(), options)
	}); err != nil {
		return errors.Wrap(err, "failed to drain node")
	}
	return nil
}
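
// Example (a sketch): drain a node with a 30-second grace period. The node
// name and grace period are illustrative; evictPod is defined elsewhere in
// this package.
//
//	grace := int64(30)
//	opts := &metav1.DeleteOptions{GracePeriodSeconds: &grace}
//	if err := drainNode(context.Background(), client, "worker-1", opts); err != nil {
//		log.Fatal(err)
//	}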
// simulateCrashNode cordons the node and then removes its pods by deleting
// them directly instead of evicting them.
func simulateCrashNode(ctx context.Context, client k8s.Interface, name string, options *metav1.DeleteOptions) error {
	if err := cordonNode(client, name); err != nil {
		return errors.Wrap(err, "failed to crash node")
	}
	if err := runForEachPodOnNode(ctx, client, name, func(pod *v1.Pod) error {
		return deletePod(ctx, client, pod.GetNamespace(), pod.GetName(), options)
	}); err != nil {
		return errors.Wrap(err, "failed to crash node")
	}
	return nil
}
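
// Unlike drainNode, simulateCrashNode removes pods with deletePod instead of
// evictPod. Assuming evictPod goes through the Eviction API, drainNode honors
// PodDisruptionBudgets while simulateCrashNode bypasses them, which is what
// makes it a closer approximation of an abrupt node failure.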
// runForEachPodOnNode runs job concurrently for every pod on the given node,
// skipping DaemonSet-managed pods and static (mirror) pods, and returns an
// aggregate of all job errors.
func runForEachPodOnNode(ctx context.Context, client k8s.Interface, name string, job func(*v1.Pod) error) error {
	errorChannel := make(chan error)
	defer close(errorChannel)
	var waitGroup sync.WaitGroup
	var errorList []error
	var jobsDone int
	pods, err := getNodePods(client, name)
	if err != nil {
		return errors.Wrap(err, "failed to run jobs")
	}
	for _, pod := range pods {
		// Skip pods managed by a DaemonSet: the controller would
		// recreate them on the node immediately.
		controller := metav1.GetControllerOf(&pod)
		if controller != nil && controller.Kind == "DaemonSet" {
			continue
		}
		// Skip static (mirror) pods: they cannot be removed through
		// the API server.
		if _, found := pod.ObjectMeta.Annotations[v1.MirrorPodAnnotationKey]; found {
			continue
		}
		jobsDone++
		waitGroup.Add(1)
		// Pass the pod by value so each goroutine gets its own copy of
		// the loop variable.
		go func(pod v1.Pod) {
			defer waitGroup.Done()
			errorChannel <- errors.Wrap(job(&pod), "failed to run job")
		}(pod)
	}
	// Collect exactly one result per launched job.
	for ; jobsDone > 0; jobsDone-- {
		if err := <-errorChannel; err != nil {
			errorList = append(errorList, err)
		}
	}
	// Every goroutine has already sent its result, so this returns
	// immediately.
	waitGroup.Wait()
	if len(errorList) != 0 {
		return k8serrors.NewAggregate(errorList)
	}
	return nil
}
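
// Example (a sketch): runForEachPodOnNode accepts any per-pod action, e.g.
// logging the pods that a drain would touch; the node name is illustrative.
//
//	err := runForEachPodOnNode(context.Background(), client, "worker-1",
//		func(pod *v1.Pod) error {
//			log.Printf("would evict %s/%s", pod.GetNamespace(), pod.GetName())
//			return nil
//		})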