Skip to content

Commit

Permalink
Add fswatch library to watch and batch filesystem events, use in allo…
Browse files Browse the repository at this point in the history
…cator

This pull refactors the fsnotify code in allocator/main out to a
shared library, and in that shared library implements a batched
notification processor.

Closes #1816: This takes a slightly different approach than specified
in the issue, instead choosing to just delay processing until after a
batch processing period. I chose 1s - it's far longer than necessary,
but still much shorter than it takes for the secret changes to
propagate to the container anyways.

I considered the approach in #1816 of trying to parse the actual
events, but it's too fiddly to get exactly right: e.g. maybe you only
refresh on "write", but then "chmod" could make the file readable
whereas it wasn't before, "rename" could expose a file that wasn't
there before, etc.
  • Loading branch information
zmerlynn committed Nov 8, 2022
1 parent 0dbfa4e commit 437d4fe
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 59 deletions.
86 changes: 27 additions & 59 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
gw_runtime "github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -49,7 +50,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"gopkg.in/fsnotify.v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -224,70 +224,38 @@ func main() {
h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)

if !h.tlsDisabled {
watcherTLS, err := fsnotify.NewWatcher()
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
tlsCert, err := readTLSCert()
if err != nil {
logger.WithError(err).Error("could not load TLS certs; keeping old one")
} else {
h.tlsMutex.Lock()
h.tlsCert = tlsCert
h.tlsMutex.Unlock()
}
logger.Info("TLS certs updated")
})
if err != nil {
logger.WithError(err).Fatal("could not create watcher for tls certs")
}
defer watcherTLS.Close() // nolint: errcheck
if err := watcherTLS.Add(tlsDir); err != nil {
logger.WithError(err).Fatalf("cannot watch folder %s for secret changes", tlsDir)
logger.WithError(err).Fatal("could not create watcher for TLS certs")
}

// Watching for the events in certificate directory for updating certificates, when there is a change
go func() {
for {
select {
// watch for events
case event := <-watcherTLS.Events:
tlsCert, err := readTLSCert()
if err != nil {
logger.WithError(err).Error("could not load TLS cert; keeping old one")
} else {
h.tlsMutex.Lock()
h.tlsCert = tlsCert
h.tlsMutex.Unlock()
}
logger.Infof("Tls directory change event %v", event)

// watch for errors
case err := <-watcherTLS.Errors:
logger.WithError(err).Error("error watching for TLS directory")
}
}
}()
defer cancelTLS()

if !h.mTLSDisabled {
// creates a new file watcher for client certificate folder
watcher, err := fsnotify.NewWatcher()
cancelCert, err := fswatch.Watch(logger, certDir, time.Second, func() {
h.certMutex.Lock()
caCertPool, err := getCACertPool(certDir)
if err != nil {
logger.WithError(err).Error("could not load CA certs; keeping old ones")
} else {
h.caCertPool = caCertPool
}
logger.Info("CA certs updated")
h.certMutex.Unlock()
})
if err != nil {
logger.WithError(err).Fatal("could not create watcher for client certs")
}
defer watcher.Close() // nolint: errcheck
if err := watcher.Add(certDir); err != nil {
logger.WithError(err).Fatalf("cannot watch folder %s for secret changes", certDir)
logger.WithError(err).Fatal("could not create watcher for CA certs")
}

go func() {
for {
select {
// watch for events
case event := <-watcher.Events:
h.certMutex.Lock()
caCertPool, err := getCACertPool(certDir)
if err != nil {
logger.WithError(err).Error("could not load CA certs; keeping old ones")
} else {
h.caCertPool = caCertPool
}
logger.Infof("Certificate directory change event %v", event)
h.certMutex.Unlock()

// watch for errors
case err := <-watcher.Errors:
logger.WithError(err).Error("error watching for certificate directory")
}
}
}()
defer cancelCert()
}
}

Expand Down
81 changes: 81 additions & 0 deletions pkg/util/fswatch/fswatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2022 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package fswatch provies Watch(), a utility function to watch a filesystem path.
package fswatch

import (
"time"

"github.com/sirupsen/logrus"
"gopkg.in/fsnotify.v1"
)

// Watch watched the filesystem path `path`. When anything changes, changes are
// batched for the period `batchFor`, then `processEvent` is called.
//
// Returns a cancel() function to terminate the watch.
func Watch(logger *logrus.Entry, path string, batchFor time.Duration, processEvent func()) (func(), error) {
logger = logger.WithField("path", path)
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
cancelChan := make(chan struct{})
cancel := func() {
close(cancelChan)
_ = watcher.Close()
}
if err := watcher.Add(path); err != nil {
cancel()
return nil, err
}

go batchWatch(batchFor, watcher.Events, watcher.Errors, cancelChan, processEvent, func(error) {
logger.WithError(err).Errorf("error watching path")
})
return cancel, nil
}

// batchWatch: watch for events; when an event occurs, keep draining events for duration `batchFor`, then call processEvent().
// Intended for batching of rapid-fire events where we want to process the batch once, like filesystem update notifications.
func batchWatch(batchFor time.Duration, events chan fsnotify.Event, errors chan error, cancelChan chan struct{}, processEvent func(), onError func(error)) {
// Pattern shamelessly stolen from https://blog.gopheracademy.com/advent-2013/day-24-channel-buffering-patterns/
timer := time.NewTimer(0)
var timerCh <-chan time.Time

for {
select {
// start a timer when an event occurs, otherwise ignore event
case <-events:
if timerCh == nil {
timer.Reset(batchFor)
timerCh = timer.C
}

// on timer, run the batch; nil channels are silently ignored
case <-timerCh:
processEvent()
timerCh = nil

// handle errors
case err := <-errors:
onError(err)

// on cancel, abort
case <-cancelChan:
return
}
}
}
66 changes: 66 additions & 0 deletions pkg/util/fswatch/fswatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fswatch

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"gopkg.in/fsnotify.v1"
)

func TestBatchWatch(t *testing.T) {
eventChan := make(chan fsnotify.Event)
errorChan := make(chan error)
cancelChan := make(chan struct{})
defer close(cancelChan)

eventOut := make(chan struct{}, 1) // only allow one event
errorCount := 0

go batchWatch(time.Second, eventChan, errorChan, cancelChan, func() {
select {
case eventOut <- struct{}{}:
// capacity
default:
assert.FailNow(t, "second event written - did not want")
}
}, func(error) {
errorCount++
})

drainEventAndErrors := func(wantErrors int) {
timeout := time.NewTimer(2 * time.Second)
select {
case <-eventOut:
case <-timeout.C:
assert.FailNow(t, "no event in 2s")
}
assert.Equal(t, wantErrors, errorCount)
}

for i := 0; i < 10; i++ {
eventChan <- fsnotify.Event{}
}
drainEventAndErrors(0)

for i := 0; i < 10; i++ {
errorChan <- errors.New("some error")
eventChan <- fsnotify.Event{}
}
drainEventAndErrors(10)
}

0 comments on commit 437d4fe

Please sign in to comment.