-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
controller.go
500 lines (444 loc) · 15.8 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
package deploy
import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/agent/util"
apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
pkgutil "github.com/k3s-io/k3s/pkg/util"
errors2 "github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/kv"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)
// Constants used by the deploy controller.
const (
// ControllerName is the controller name passed to the event recorder built in start.
ControllerName = "deploy"
// GVKAnnotation is the Addon annotation key under which the GroupVersionKinds of the
// most recently applied manifest are stored, joined by gvkSep.
GVKAnnotation = "addon.k3s.cattle.io/gvks"
// startKey is the synthetic key that WatchFiles enqueues to trigger starting the file watcher.
startKey = "_start_"
// gvkSep separates the individual GVK strings within the GVKAnnotation value.
gvkSep = ";"
)
// WatchFiles builds a watcher for the given base directories, enqueues the synthetic start key, and registers an
// OnChange handler that launches the periodic file-watching goroutine when that key is processed.
// Every other key is ignored by the handler. WatchFiles itself always returns nil.
func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Apply, addons controllersv1.AddonController, disables map[string]bool, bases ...string) error {
	cache := addons.Cache()
	disco := client.Discovery()

	w := &watcher{
		apply:      apply,
		addonCache: cache,
		addons:     addons,
		bases:      bases,
		disables:   disables,
		modTime:    map[string]time.Time{},
		gvkCache:   map[schema.GroupVersionKind]bool{},
		discovery:  disco,
	}

	// The handler never modifies Addons; it only reacts to the start key.
	onStart := func(key string, _ *apisv1.Addon) (*apisv1.Addon, error) {
		if key != startKey {
			return nil, nil
		}
		go w.start(ctx, client)
		return nil, nil
	}

	addons.Enqueue(metav1.NamespaceNone, startKey)
	addons.OnChange(ctx, "addon-start", onStart)
	return nil
}
// watcher holds the state needed to find manifests on disk and apply them to the cluster as Addons.
type watcher struct {
// Embedded mutex; held by serverHasGVK while it reads and populates gvkCache.
sync.Mutex
// apply is used to apply the objects parsed from a manifest, and to apply an empty set on delete.
apply apply.Apply
// addonCache is used by getOrCreateAddon to look up existing Addons.
addonCache controllersv1.AddonCache
// addons is the client used to create, update, and delete Addon resources.
addons controllersv1.AddonClient
// bases lists the paths that listFiles walks for manifests.
bases []string
// disables is consulted by shouldDisableFile; matching manifests are deleted instead of applied.
disables map[string]bool
// modTime records, per path, the modification time seen at the last successful deploy.
modTime map[string]time.Time
// gvkCache is a positive cache of GroupVersionKinds reported by discovery.
gvkCache map[schema.GroupVersionKind]bool
// recorder emits Events against Addons; it is assigned in start.
recorder record.EventRecorder
// discovery is queried by serverHasGVK for the resources of a GroupVersion.
discovery discovery.DiscoveryInterface
}
// start builds the event recorder and then runs listFiles in a loop, pausing 15 seconds between passes,
// until ctx is cancelled. Every pass is forced until one completes without error; after that only files
// whose modification time changed are processed.
func (w *watcher) start(ctx context.Context, client kubernetes.Interface) {
	w.recorder = pkgutil.BuildControllerEventRecorder(client, ControllerName, metav1.NamespaceSystem)

	forceApply := true
	for {
		err := w.listFiles(forceApply)
		if err != nil {
			logrus.Errorf("Failed to process config: %v", err)
		} else {
			forceApply = false
		}

		select {
		case <-time.After(15 * time.Second):
		case <-ctx.Done():
			return
		}
	}
}
// listFiles runs listFilesIn for every configured base path and returns the collected errors
// combined via merr.NewErrors.
func (w *watcher) listFiles(force bool) error {
	var failures []error
	for i := range w.bases {
		err := w.listFilesIn(w.bases[i], force)
		if err == nil {
			continue
		}
		failures = append(failures, err)
	}
	return merr.NewErrors(failures...)
}
// listFilesIn walks base recursively and processes every entry found, in sorted path order.
// Entries matching the disables map are deleted; entries on the skip list (or that are not manifests) are
// ignored; everything else is deployed when force is set or its modification time differs from the one
// recorded at the last successful deploy. Per-file failures are collected and returned together.
func (w *watcher) listFilesIn(base string, force bool) error {
	found := map[string]os.FileInfo{}
	collect := func(path string, info os.FileInfo, err error) error {
		if err == nil {
			found[path] = info
		}
		return err
	}
	if err := filepath.Walk(base, collect); err != nil {
		return err
	}

	// Build the skip list from '.skip' marker files. For example, 'addon.yaml.skip' causes 'addon.yaml'
	// to be ignored completely - unless it is also disabled, since disable processing happens first.
	skips := map[string]bool{}
	paths := make([]string, 0, len(found))
	for path, info := range found {
		if name := info.Name(); strings.HasSuffix(name, ".skip") {
			skips[strings.TrimSuffix(name, ".skip")] = true
		}
		paths = append(paths, path)
	}
	sort.Strings(paths)

	var errs []error
	for _, path := range paths {
		info := found[path]
		switch {
		case shouldDisableFile(base, path, w.disables):
			// Disabled files are not just skipped, but actively deleted from the filesystem.
			if err := w.delete(path); err != nil {
				errs = append(errs, errors2.Wrapf(err, "failed to delete %s", path))
			}
		case shouldSkipFile(info.Name(), skips):
			// Skipped files are just ignored.
		default:
			modTime := info.ModTime()
			if force || !modTime.Equal(w.modTime[path]) {
				if err := w.deploy(path, !force); err != nil {
					errs = append(errs, errors2.Wrapf(err, "failed to process %s", path))
				} else {
					w.modTime[path] = modTime
				}
			}
		}
	}
	return merr.NewErrors(errs...)
}
// deploy reads the manifest at path, makes sure an Addon exists to track it, and applies every resource the
// manifest contains to the cluster. When compareChecksum is set and the manifest's checksum matches the one
// stored on the Addon, nothing is applied. Failures at each stage are reported as warning Events on the Addon
// and returned to the caller.
func (w *watcher) deploy(path string, compareChecksum bool) error {
	addon, err := w.getOrCreateAddon(basename(path))
	if err != nil {
		return err
	}
	addon.Spec.Source = path

	// A freshly built Addon has no UID yet. Create it now so that Events can reference it -
	// Events need the UID and ObjectRevision set to function properly.
	if addon.UID == "" {
		created, err := w.addons.Create(&addon)
		if err != nil {
			return err
		}
		addon = *created
	}

	data, err := os.ReadFile(path)
	if err != nil {
		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
		return err
	}

	sum := checksum(data)
	if compareChecksum && sum == addon.Spec.Checksum {
		logrus.Debugf("Skipping existing deployment of %s, check=%v, checksum %s=%s", path, compareChecksum, sum, addon.Spec.Checksum)
		return nil
	}

	// Parse the YAML/JSON into objects. Failure here means bad file content - not YAML/JSON,
	// YAML/JSON that can't be converted to Kubernetes objects, etc.
	manifest, err := objectSet(data)
	if err != nil {
		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
		return err
	}

	// Combine the GVKs in the manifest with those recorded on the Addon by a previous apply.
	// Annotation entries that do not parse as GVKs are ignored.
	gvks := manifest.GVKs()
	previous := strings.Split(addon.Annotations[GVKAnnotation], gvkSep)
	for _, s := range previous {
		gvk, err := getGVK(s)
		if err != nil {
			continue
		}
		gvks = append(gvks, *gvk)
	}

	// Drop GVKs that the server doesn't have so that we don't try to prune using them.
	// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
	gvks, err = w.validateGVKs(gvks)
	if err != nil {
		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ValidateManifestFailed", "Validate GVKs for manifest at %q failed: %v", path, err)
		return err
	}

	// Apply the changes. Failure here would be due to more complicated issues - invalid changes to
	// existing objects, rejected by validating webhooks, etc.
	// WithGVK is given both current and previously applied GVKs so that objects removed from the file
	// are still found via their owner references and deleted, rather than orphaned.
	w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
	err = w.apply.WithOwner(&addon).WithGVK(gvks...).Apply(manifest)
	if err != nil {
		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
		return err
	}

	// Only after a successful apply: emit the event and persist the new checksum and GVK list.
	w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
	if addon.Annotations == nil {
		addon.Annotations = map[string]string{}
	}
	addon.Spec.Checksum = sum
	addon.Annotations[GVKAnnotation] = getGVKString(manifest.GVKs())
	_, err = w.addons.Update(&addon)
	return err
}
// delete removes a manifest together with the resources it did or would have created. The GVKs to search
// are taken from the Addon's annotation plus, when the file can still be read and parsed, the manifest itself.
// If deleting the Addon succeeds, delete returns nil right away. Only when the Addon is already gone (NotFound)
// does it apply an empty object set and remove the file from disk.
func (w *watcher) delete(path string) error {
	addon, err := w.getOrCreateAddon(basename(path))
	if err != nil {
		return err
	}

	gvks := []schema.GroupVersionKind{}
	for _, s := range strings.Split(addon.Annotations[GVKAnnotation], gvkSep) {
		gvk, err := getGVK(s)
		if err != nil {
			continue
		}
		gvks = append(gvks, *gvk)
	}

	// Read and parse failures are reported as Events but do not abort the delete.
	if content, err := os.ReadFile(path); err != nil {
		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
	} else if parsed, err := objectSet(content); err != nil {
		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
	} else {
		// Search for objects using both GVKs currently listed in the file, as well as GVKs previously applied.
		// This ensures that any conflicts between competing deploy controllers are handled properly.
		gvks = append(gvks, parsed.GVKs()...)
	}

	// Drop GVKs that the server doesn't have so that we don't try to delete using them.
	// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
	gvks, err = w.validateGVKs(gvks)
	if err != nil {
		return err
	}

	// The addon must be completely removed before the objectSet is deleted, otherwise pods may get stuck
	// terminating. So stop here when the delete succeeded (or failed for a reason other than NotFound).
	w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
	switch err := w.addons.Delete(addon.Namespace, addon.Name, &metav1.DeleteOptions{}); {
	case err == nil:
		return nil
	case !apierrors.IsNotFound(err):
		return err
	}

	// Apply an empty set with owner & gvk data to delete the owned objects.
	if err := w.apply.WithOwner(&addon).WithGVK(gvks...).ApplyObjects(); err != nil {
		return err
	}
	return os.Remove(path)
}
// getOrCreateAddon looks up the named Addon in the cache, using the system namespace. When the cache reports
// NotFound, a new in-memory Addon is built instead (it is not sent to the server here). Any other lookup
// error is returned alongside an empty Addon.
// NOTE(review): the result is a value copy of the cached object; reference-typed fields such as Annotations
// are presumably still shared with the cache - confirm before mutating them.
func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
	cached, err := w.addonCache.Get(metav1.NamespaceSystem, name)
	switch {
	case err == nil:
		return *cached, nil
	case apierrors.IsNotFound(err):
		return *apisv1.NewAddon(metav1.NamespaceSystem, name, apisv1.Addon{}), nil
	default:
		return apisv1.Addon{}, err
	}
}
// validateGVKs filters addonGVKs down to those the server supports, preserving order.
// If a lookup fails, the GVKs accepted so far are returned together with the error.
func (w *watcher) validateGVKs(addonGVKs []schema.GroupVersionKind) ([]schema.GroupVersionKind, error) {
	supported := []schema.GroupVersionKind{}
	for i := range addonGVKs {
		ok, err := w.serverHasGVK(addonGVKs[i])
		switch {
		case err != nil:
			return supported, err
		case ok:
			supported = append(supported, addonGVKs[i])
		}
	}
	return supported, nil
}
// serverHasGVK reports whether the server supports gvk, consulting a positive cache of GVKs that the
// cluster is known to have supported at some point in time. Note this may fail to filter out GVKs that
// are removed from the cluster after startup (for example, if CRDs are deleted) - but the Wrangler
// DesiredSet cache has the same issue, so it should be fine.
// A NotFound response from discovery is reported as (false, nil) and is not cached.
func (w *watcher) serverHasGVK(gvk schema.GroupVersionKind) (bool, error) {
	w.Lock()
	defer w.Unlock()

	if cached, known := w.gvkCache[gvk]; known {
		return cached, nil
	}

	gv := gvk.GroupVersion()
	list, err := w.discovery.ServerResourcesForGroupVersion(gv.String())
	switch {
	case err == nil:
	case apierrors.IsNotFound(err):
		return false, nil
	default:
		return false, err
	}

	// Cache all Kinds for this GroupVersion to save on future lookups.
	for i := range list.APIResources {
		res := list.APIResources[i]
		// Resources in the requested GV are returned with empty GroupVersion.
		// Subresources with different GV may also be returned, but we aren't interested in those.
		if res.Group != "" || res.Version != "" {
			continue
		}
		w.gvkCache[gv.WithKind(res.Kind)] = true
	}
	return w.gvkCache[gvk], nil
}
// objectSet parses a chunk of YAML and wraps every resource found in a new ObjectSet.
// A parse failure is returned with a nil set.
func objectSet(content []byte) (*objectset.ObjectSet, error) {
	parsed, err := yamlToObjects(bytes.NewBuffer(content))
	if err == nil {
		return objectset.NewObjectSet(parsed...), nil
	}
	return nil, err
}
// basename returns the final element of path, truncated at its first period.
// A name without a period is returned unchanged.
func basename(path string) string {
	file := filepath.Base(path)
	if dot := strings.IndexByte(file, '.'); dot >= 0 {
		return file[:dot]
	}
	return file
}
// checksum computes the SHA256 digest of a byte slice and returns it as a lowercase hex string.
func checksum(bytes []byte) string {
	h := sha256.New()
	// Write on a hash.Hash is documented to never return an error.
	_, _ = h.Write(bytes)
	return hex.EncodeToString(h.Sum(nil))
}
// isEmptyYaml reports whether a chunk of YAML holds no content: every line, once surrounding whitespace is
// trimmed, must be blank, a '---' document separator, or a '#' comment.
func isEmptyYaml(yaml []byte) bool {
	for _, line := range bytes.Split(yaml, []byte("\n")) {
		trimmed := bytes.TrimSpace(line)
		switch {
		case len(trimmed) == 0:
			// blank line
		case string(trimmed) == "---":
			// document separator
		case bytes.HasPrefix(trimmed, []byte("#")):
			// comment
		default:
			return false
		}
	}
	return true
}
// yamlToObjects reads YAML documents from in one at a time and returns the objects decoded from them.
// Documents that are empty per isEmptyYaml are skipped. The first read or decode error aborts the whole
// operation and is returned with a nil slice.
func yamlToObjects(in io.Reader) ([]runtime.Object, error) {
	var objs []runtime.Object
	docs := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096))
	for {
		doc, err := docs.Read()
		switch {
		case err == io.EOF:
			// No more documents.
			return objs, nil
		case err != nil:
			return nil, err
		case isEmptyYaml(doc):
			continue
		}

		decoded, err := toObjects(doc)
		if err != nil {
			return nil, err
		}
		objs = append(objs, decoded...)
	}
}
// Returns one or more objects from a single YAML document
func toObjects(bytes []byte) ([]runtime.Object, error) {
bytes, err := yamlDecoder.ToJSON(bytes)
if err != nil {
return nil, err
}
obj, _, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
if err != nil {
return nil, err
}
if l, ok := obj.(*unstructured.UnstructuredList); ok {
var result []runtime.Object
for _, obj := range l.Items {
copy := obj
result = append(result, ©)
}
return result, nil
}
return []runtime.Object{obj}, nil
}
// shouldSkipFile reports whether a file should be ignored. A file is skipped when it is a dotfile, when it
// appears in the provided skip map, or when it does not have a json/yaml/yml extension.
func shouldSkipFile(fileName string, skips map[string]bool) bool {
	if strings.HasPrefix(fileName, ".") || skips[fileName] {
		return true
	}
	return !util.HasSuffixI(fileName, ".yaml", ".yml", ".json")
}
// shouldDisableFile reports whether a file is disabled. Any file located under a subdirectory (relative to
// base) that is listed in the disables map is disabled. Otherwise, only json/yaml/yml files are considered,
// and they are disabled when their name without its extension is in the disables map.
func shouldDisableFile(base, fileName string, disables map[string]bool) bool {
	// Check every parent directory prefix of the file, relative to base.
	// If a file is nested several levels deep, checks 'parent1', 'parent1/parent2', 'parent1/parent2/parent3', etc.
	rel := strings.TrimPrefix(fileName, base)
	segments := strings.Split(rel, string(os.PathSeparator))
	for end := 1; end < len(segments); end++ {
		if parent := filepath.Join(segments[:end]...); disables[parent] {
			return true
		}
	}

	if util.HasSuffixI(fileName, ".yaml", ".yml", ".json") {
		// Check the file name, minus its extension, against the disables map.
		file := filepath.Base(fileName)
		return disables[strings.TrimSuffix(file, filepath.Ext(file))]
	}
	return false
}
// getGVK parses a string of the form "<group>/<version>, Kind=<kind>" into a GroupVersionKind.
// It returns an error unless splitting s on ", Kind=" yields exactly two parts.
func getGVK(s string) (*schema.GroupVersionKind, error) {
	parts := strings.Split(s, ", Kind=")
	if len(parts) == 2 {
		group, version := kv.Split(parts[0], "/")
		return &schema.GroupVersionKind{
			Group:   group,
			Version: version,
			Kind:    parts[1],
		}, nil
	}
	return nil, fmt.Errorf("invalid GVK format: %s", s)
}
// getGVKString renders each GVK with its String method and joins the results with gvkSep.
func getGVKString(gvks []schema.GroupVersionKind) string {
	rendered := make([]string, 0, len(gvks))
	for i := range gvks {
		rendered = append(rendered, gvks[i].String())
	}
	return strings.Join(rendered, gvkSep)
}