Skip to content

Commit

Permalink
fix(promtail): Fix bug with Promtail config reloading getting stuck i…
Browse files Browse the repository at this point in the history
…ndefinitely (#12795)

Signed-off-by: Paulin Todev <paulin.todev@gmail.com>
  • Loading branch information
ptodev committed May 9, 2024
1 parent d16a3bf commit 4d761ac
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 6 deletions.
35 changes: 29 additions & 6 deletions clients/pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
FilenameLabel = "filename"
)

var errFileTargetStopped = errors.New("File target is stopped")

// Config describes behavior for Target
type Config struct {
SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"`
Expand Down Expand Up @@ -223,6 +225,11 @@ func (t *FileTarget) run() {
}
case <-ticker.C:
err := t.sync()
if errors.Is(err, errFileTargetStopped) {
// This file target has been stopped.
// This is normal and there is no need to log an error.
return
}
if err != nil {
level.Error(t.logger).Log("msg", "error running sync function", "error", err)
}
Expand Down Expand Up @@ -291,14 +298,20 @@ func (t *FileTarget) sync() error {
t.watchesMutex.Lock()
toStartWatching := missing(t.watches, dirs)
t.watchesMutex.Unlock()
t.startWatching(toStartWatching)
err := t.startWatching(toStartWatching)
if errors.Is(err, errFileTargetStopped) {
return err
}

// Remove any directories which no longer need watching.
t.watchesMutex.Lock()
toStopWatching := missing(dirs, t.watches)
t.watchesMutex.Unlock()

t.stopWatching(toStopWatching)
err = t.stopWatching(toStopWatching)
if errors.Is(err, errFileTargetStopped) {
return err
}

// fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves.
t.watchesMutex.Lock()
Expand All @@ -321,32 +334,42 @@ func (t *FileTarget) sync() error {
return nil
}

func (t *FileTarget) startWatching(dirs map[string]struct{}) {
func (t *FileTarget) startWatching(dirs map[string]struct{}) error {
for dir := range dirs {
if _, ok := t.getWatch(dir); ok {
continue
}

level.Info(t.logger).Log("msg", "watching new directory", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
select {
case <-t.quit:
return errFileTargetStopped
case t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStart,
}:
}
}
return nil
}

func (t *FileTarget) stopWatching(dirs map[string]struct{}) {
func (t *FileTarget) stopWatching(dirs map[string]struct{}) error {
for dir := range dirs {
if _, ok := t.getWatch(dir); !ok {
continue
}

level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
select {
case <-t.quit:
return errFileTargetStopped
case t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStop,
}:
}
}
return nil
}

func (t *FileTarget) startTailing(ps []string) {
Expand Down
87 changes: 87 additions & 0 deletions clients/pkg/promtail/targets/file/filetarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,93 @@ func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
ps.Stop()
}

// TestFileTarget_StopAbruptly makes sure that Stop() doesn't hang if FileTarget
// is waiting on a channel send.
//
// The test hands the target a one-slot event channel, creates more watchable
// directories than the channel can hold, deliberately leaves the channel
// undrained, and then calls Stop().
func TestFileTarget_StopAbruptly(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir1 := filepath.Join(dirName, "log1")
logDir2 := filepath.Join(dirName, "log2")
logDir3 := filepath.Join(dirName, "log3")

logfile1 := filepath.Join(logDir1, "test1.log")
logfile2 := filepath.Join(logDir2, "test1.log")
logfile3 := filepath.Join(logDir3, "test1.log")

ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Millisecond,
PositionsFile: positionsFileName,
})
require.NoError(t, err)

client := fake.New(func() {})
defer client.Stop()

// fakeHandler has to be a buffered channel so that we can call the len() function on it.
// We need to call len() to check if the channel is full.
fakeHandler := make(chan fileTargetEvent, 1)
pathToWatch := filepath.Join(dirName, "**", "*.log")
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Create a directory, still nothing is watched.
err = os.MkdirAll(logDir1, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile1)
assert.NoError(t, err)

// There should be only one WatchStart event in the channel so far.
// Receiving it here empties the one-slot buffer again.
ftEvent := <-fakeHandler
require.Equal(t, fileTargetEventWatchStart, ftEvent.eventType)

requireEventually(t, func() bool {
return target.getReadersLen() == 1
}, "expected 1 tailer to be created")

// With one log file being tailed, the active-files gauge should read 1.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 1
`), "promtail_files_active_total"))

// Create two directories - one more than the buffer of fakeHandler,
// so that the file target hangs until we call Stop().
err = os.MkdirAll(logDir2, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile2)
assert.NoError(t, err)

err = os.MkdirAll(logDir3, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile3)
assert.NoError(t, err)

// Wait until the file target is waiting on a channel send due to a full channel buffer.
requireEventually(t, func() bool {
return len(fakeHandler) == 1
}, "expected an event in the fakeHandler channel")

// If FileTarget works well, then it will stop waiting for
// the blocked fakeHandler and stop cleanly.
// This is why this time we don't drain fakeHandler.
// NOTE(review): detecting a hang here presumably relies on requireEventually
// failing the test when the callback does not return in time - confirm
// against its definition.
requireEventually(t, func() bool {
target.Stop()
ps.Stop()
return true
}, "expected FileTarget not to hang")

// After Stop() the active-files gauge is expected to be back at 0.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 0
`), "promtail_files_active_total"))
}

func TestFileTargetPathExclusion(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down

0 comments on commit 4d761ac

Please sign in to comment.