Skip to content

Commit

Permalink
Fix auto_removal feature so log lines are NOT dropped from old file (#…
Browse files Browse the repository at this point in the history
…452)

* Fix Tail.sendLine() to NOT drop log lines when the tailer is dying because of errStopAtEOF.
* Simplify TestLogsFileAutoRemoval().
  • Loading branch information
adam-mateen authored May 16, 2022
1 parent dd62a46 commit 8262fbe
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 117 deletions.
19 changes: 9 additions & 10 deletions cmd/config-translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package main

import (
"fmt"
"io/ioutil"
"regexp"
"testing"
Expand Down Expand Up @@ -34,13 +33,13 @@ func checkIfSchemaValidateAsExpected(t *testing.T, jsonInputPath string, shouldS
} else {
errorDetails := result.Errors()
for _, errorDetail := range errorDetails {
fmt.Printf("String: %v \n", errorDetail.String())
fmt.Printf("Context: %v \n", errorDetail.Context().String())
fmt.Printf("Description: %v \n", errorDetail.Description())
fmt.Printf("Details: %v \n", errorDetail.Details())
fmt.Printf("Field: %v \n", errorDetail.Field())
fmt.Printf("Type: %v \n", errorDetail.Type())
fmt.Printf("Value: %v \n", errorDetail.Value())
t.Logf("String: %v \n", errorDetail.String())
t.Logf("Context: %v \n", errorDetail.Context().String())
t.Logf("Description: %v \n", errorDetail.Description())
t.Logf("Details: %v \n", errorDetail.Details())
t.Logf("Field: %v \n", errorDetail.Field())
t.Logf("Type: %v \n", errorDetail.Type())
t.Logf("Value: %v \n", errorDetail.Value())
if _, ok := actualErrorMap[errorDetail.Type()]; ok {
actualErrorMap[errorDetail.Type()] += 1
} else {
Expand Down Expand Up @@ -185,9 +184,9 @@ func TestSampleConfigSchema(t *testing.T) {
re := regexp.MustCompile(".json")
for _, file := range files {
if re.MatchString(file.Name()) {
fmt.Printf("Validating ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
t.Logf("Validating ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
checkIfSchemaValidateAsExpected(t, "../../translator/totomlconfig/sampleConfig/"+file.Name(), true, map[string]int{})
fmt.Printf("Validated ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
t.Logf("Validated ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
}
}
} else {
Expand Down
3 changes: 1 addition & 2 deletions logger/lumberjack_logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package logger

import (
"fmt"
"io"
"io/ioutil"
"log"
Expand Down Expand Up @@ -141,7 +140,7 @@ func TestWriteToFileInRotation(t *testing.T) {

files, _ = ioutil.ReadDir(tempDir)
for _, file := range files {
fmt.Printf("%v/%v, size:%v\n", tempDir, file.Name(), file.Size())
t.Logf("%v/%v, size:%v\n", tempDir, file.Name(), file.Size())
}

assert.Equal(t, 4, len(files))
Expand Down
6 changes: 4 additions & 2 deletions plugins/inputs/logfile/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {

if _, ok := dests[filename]; ok {
continue
} else if fileconfig.AutoRemoval { // This logic means auto_removal does not work with publish_multi_logs
} else if fileconfig.AutoRemoval {
// This logic means auto_removal does not work with publish_multi_logs
for _, dst := range dests {
dst.tailer.StopAtEOF() // Stop all other tailers in favor of the newly found file
// Stop all other tailers in favor of the newly found file
dst.tailer.StopAtEOF()
}
}

Expand Down
168 changes: 93 additions & 75 deletions plugins/inputs/logfile/logfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,100 +358,118 @@ func TestLogsFileRemove(t *testing.T) {
tt.Stop()
}

//When another file is created for the same file config and the file config has auto_removal as true, the old files will stop at EOF and removed afterwards
func TestLogsFileAutoRemoval(t *testing.T) {
var wg sync.WaitGroup

multilineWaitPeriod = 10 * time.Millisecond
logEntryString := "anything"
filePrefix := "file_auto_removal"
tmpfile1, err := createTempFile("", filePrefix)
require.NoError(t, err)

_, err = tmpfile1.WriteString(logEntryString + "\n")
require.NoError(t, err)
tmpfile1.Sync()
tmpfile1.Close()

tt := NewLogFile()
tt.Log = TestLogger{t}
tt.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(tmpfile1.Name()), filePrefix+"*"),
func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile {
logFile := NewLogFile()
logFile.Log = TestLogger{t}
logFile.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"),
FromBeginning: true,
AutoRemoval: true,
}}
tt.FileConfig[0].init()
tt.started = true
logFile.FileConfig[0].init()
logFile.started = true
return logFile
}

lsrcs := tt.FindLogSrc()
if len(lsrcs) != 1 {
t.Fatalf("%v log src was returned when 1 should be available", len(lsrcs))
}
func makeTempFile(t *testing.T, prefix string) *os.File {
file, err := createTempFile("", prefix)
t.Logf("Created temp file, %s\n", file.Name())
require.NoError(t, err)
return file
}

lsrc := lsrcs[0]
// getLogSrc returns a LogSrc from the given LogFile, and the channel for output.
// Verifies 1 and only 1 LogSrc is discovered.
func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) {
logSources := logFile.FindLogSrc()
require.Equal(t, 1, len(logSources))
logSource := logSources[0]
evts := make(chan logs.LogEvent)
lsrc.SetOutput(func(e logs.LogEvent) {
logSource.SetOutput(func(e logs.LogEvent) {
if e != nil {
evts <- e
}
})
return &logSource, evts
}

var tmpfile2 *os.File
defer func() {
if tmpfile2 != nil {
os.Remove(tmpfile2.Name())
}
}()

wg.Add(1)

go func() {
defer wg.Done()

// create a new file matching configured pattern
tmpfile2, err = createTempFile("", filePrefix)
require.NoError(t, err)

_, err = tmpfile2.WriteString(logEntryString + "\n")
func writeLines(t *testing.T, file *os.File, numLines int, msg string) {
t.Log("Fill temp file with sufficient lines to be read.")
for i := 0; i < numLines; i++ {
_, err := file.WriteString(msg + "\n")
require.NoError(t, err)

}()

e := <-evts
if e.Message() != logEntryString {
t.Errorf("Wrong log found from first file: \n%v\nExpecting:\n%v\n", e.Message(), logEntryString)
}
defer lsrc.Stop()
}

for {
lsrcs = tt.FindLogSrc()
if len(lsrcs) > 0 {
break
// createWriteRead creates a temp file, writes to it, then verifies events
// are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead.
// Close the given channel when complete to let caller know it was successful.
func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) {
// Let caller know when the goroutine is done.
defer close(done)
// done2 is only passed to child if this is the parent.
done2 := make(chan bool)
file := makeTempFile(t, prefix)
if isParent {
logFile = setupLogFileForTest(t, file, prefix)
defer logFile.Stop()
}
logSrc, evts := getLogSrc(t, logFile)
defer (*logSrc).Stop()
defer close(evts)
// Choose a large enough number of lines so that even high-spec hosts will not
// complete receiving logEvents before the 2nd createWriteRead() goroutine begins.
const numLines int = 100000
const msg string = "this is the best log line ever written to a file"
writeLines(t, file, numLines, msg)
file.Close()
if !isParent {
// Child creates 2nd temp file which is NOT auto removed.
defer os.Remove(file.Name())
}
t.Log("Verify every line written to the temp file is received.")
for i := 0; i < numLines; i++ {
logEvent := <- evts
require.Equal(t, msg, logEvent.Message())
if i != numLines / 2 {
continue
}
// Halfway through start another goroutine to create another temp file.
if isParent {
go createWriteRead(t, prefix, logFile, done2, false)
}
time.Sleep(1 * time.Second)
}

lsrc = lsrcs[0]
lsrc.SetOutput(func(e logs.LogEvent) {
if e != nil {
evts <- e
// Only wait for child if it was spawned
if isParent {
t.Log("Verify child completed.")
select {
case <-done2:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for child")
}
})

e = <-evts
if e.Message() != logEntryString {
t.Errorf("Wrong log found from 2nd file: \n% x\nExpecting:\n% x\n", e.Message(), logEntryString)
t.Log("Verify 1st temp file was auto deleted.")
_, err := os.Open(file.Name())
assert.True(t, os.IsNotExist(err))
}
}

//Use Wait Group to avoid race condition between opening tmpfile2 to delete tmpfile1 with auto_removal and opening tmpfile1
//to check it exist
wg.Wait()

_, err = os.Open(tmpfile1.Name())
assert.True(t, os.IsNotExist(err))

lsrc.Stop()
tt.Stop()
// TestLogsFileAutoRemoval verifies when a new file matching the configured
// FilePath is discovered, the old file will be automatically deleted after
// being read to the end-of-file.
func TestLogsFileAutoRemoval(t *testing.T) {
// Override global in tailersrc.go.
multilineWaitPeriod = 10 * time.Millisecond
prefix := "file_auto_removal"
done := make(chan bool)
createWriteRead(t, prefix, nil, done, true)
t.Log("Verify 1st tmp file created and discovered.")
select {
case <-done:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for 2nd temp file.")
}
}

func TestLogsTimestampAsMultilineStarter(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions plugins/inputs/logfile/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (tail *Tail) Stop() error {
}

// StopAtEOF stops tailing as soon as the end of the file is reached.
// Blocks until tailer is dead and returns reason for death.
func (tail *Tail) StopAtEOF() error {
tail.Kill(errStopAtEOF)
return tail.Wait()
Expand Down Expand Up @@ -394,7 +395,7 @@ func (tail *Tail) tailFileSync() {
if errReadLine == nil {
tail.sendLine(line, tail.curOffset)
} else {
break
return
}
}
} else if err != ErrStop {
Expand Down Expand Up @@ -469,7 +470,6 @@ func (tail *Tail) waitForChanges() error {
case <-tail.Dying():
return ErrStop
}

}

func (tail *Tail) openReader() {
Expand Down Expand Up @@ -514,8 +514,13 @@ func (tail *Tail) sendLine(line string, offset int64) bool {
select {
case tail.Lines <- &Line{line, now, nil, offset}:
case <-tail.Dying():
tail.dropCnt += len(lines) - i
return true
if tail.Err() == errStopAtEOF {
// Try sending, even if it blocks.
tail.Lines <- &Line{line, now, nil, offset}
} else {
tail.dropCnt += len(lines) - i
return true
}
}
}

Expand Down
46 changes: 37 additions & 9 deletions plugins/inputs/logfile/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const linesWrittenToFile int = 10

type testLogger struct {
debugs, infos, warns, errors []string
}
Expand Down Expand Up @@ -71,22 +75,46 @@ func TestNotTailedCompeletlyLogging(t *testing.T) {
verifyTailerExited(t, tail)
}

func TestDroppedLinesWhenStopAtEOFLogging(t *testing.T) {
tmpfile, tail, tlog := setup(t)
func TestStopAtEOF(t *testing.T) {
tmpfile, tail, _ := setup(t)
defer tearDown(tmpfile)

readThreelines(t, tail)

// Ask the tailer to StopAtEOF
tail.StopAtEOF()
// Since StopAtEOF() will block until the EOF is reached, run it in a goroutine.
done := make(chan bool)
go func() {
tail.StopAtEOF()
close(done)
}()

// Verify the goroutine is blocked indefinitely.
select {
case <-done:
t.Fatalf("StopAtEOF() completed unexpectedly")
case <-time.After(time.Second * 1):
t.Log("timeout waiting for StopAtEOF() (as expected)")
}

assert.Equal(t, errStopAtEOF, tail.Err())

// Read to EOF
for i := 0; i < linesWrittenToFile - 3; i++ {
<-tail.Lines
}

// Verify StopAtEOF() has completed.
select {
case <-done:
t.Log("StopAtEOF() completed (as expected)")
case <- time.After(time.Second * 1):
t.Fatalf("StopAtEOF() has not completed")
}

// Then remove the tmpfile
if err := os.Remove(tmpfile.Name()); err != nil {
t.Fatalf("failed to remove temporary log file %v: %v", tmpfile.Name(), err)
}
// Wait until the tailer should have been terminated
time.Sleep(exitOnDeletionWaitDuration + exitOnDeletionCheckDuration + 1*time.Second)

verifyTailerLogging(t, tlog, "Dropped 7 lines for stopped tail for file "+tmpfile.Name())
verifyTailerExited(t, tail)
}

Expand All @@ -97,7 +125,7 @@ func setup(t *testing.T) (*os.File, *Tail, *testLogger) {
}

// Write the file content
for i := 0; i < 10; i++ {
for i := 0; i < linesWrittenToFile; i++ {
if _, err := fmt.Fprintf(tmpfile, "%v some log line\n", time.Now()); err != nil {
log.Fatal(err)
}
Expand Down
Loading

0 comments on commit 8262fbe

Please sign in to comment.