Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix auto_removal feature so log lines are NOT dropped from old file #452

Merged
merged 8 commits into from
May 16, 2022
19 changes: 9 additions & 10 deletions cmd/config-translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package main

import (
"fmt"
"io/ioutil"
"regexp"
"testing"
Expand Down Expand Up @@ -34,13 +33,13 @@ func checkIfSchemaValidateAsExpected(t *testing.T, jsonInputPath string, shouldS
} else {
errorDetails := result.Errors()
for _, errorDetail := range errorDetails {
fmt.Printf("String: %v \n", errorDetail.String())
fmt.Printf("Context: %v \n", errorDetail.Context().String())
fmt.Printf("Description: %v \n", errorDetail.Description())
fmt.Printf("Details: %v \n", errorDetail.Details())
fmt.Printf("Field: %v \n", errorDetail.Field())
fmt.Printf("Type: %v \n", errorDetail.Type())
fmt.Printf("Value: %v \n", errorDetail.Value())
t.Logf("String: %v \n", errorDetail.String())
t.Logf("Context: %v \n", errorDetail.Context().String())
t.Logf("Description: %v \n", errorDetail.Description())
t.Logf("Details: %v \n", errorDetail.Details())
t.Logf("Field: %v \n", errorDetail.Field())
t.Logf("Type: %v \n", errorDetail.Type())
t.Logf("Value: %v \n", errorDetail.Value())
if _, ok := actualErrorMap[errorDetail.Type()]; ok {
actualErrorMap[errorDetail.Type()] += 1
} else {
Expand Down Expand Up @@ -185,9 +184,9 @@ func TestSampleConfigSchema(t *testing.T) {
re := regexp.MustCompile(".json")
for _, file := range files {
if re.MatchString(file.Name()) {
fmt.Printf("Validating ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
t.Logf("Validating ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
checkIfSchemaValidateAsExpected(t, "../../translator/totomlconfig/sampleConfig/"+file.Name(), true, map[string]int{})
fmt.Printf("Validated ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
t.Logf("Validated ../../translator/totomlconfig/sampleConfig/%s\n", file.Name())
}
}
} else {
Expand Down
3 changes: 1 addition & 2 deletions logger/lumberjack_logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package logger

import (
"fmt"
"io"
"io/ioutil"
"log"
Expand Down Expand Up @@ -141,7 +140,7 @@ func TestWriteToFileInRotation(t *testing.T) {

files, _ = ioutil.ReadDir(tempDir)
for _, file := range files {
fmt.Printf("%v/%v, size:%v\n", tempDir, file.Name(), file.Size())
t.Logf("%v/%v, size:%v\n", tempDir, file.Name(), file.Size())
}

assert.Equal(t, 4, len(files))
Expand Down
6 changes: 4 additions & 2 deletions plugins/inputs/logfile/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {

if _, ok := dests[filename]; ok {
continue
} else if fileconfig.AutoRemoval { // This logic means auto_removal does not work with publish_multi_logs
} else if fileconfig.AutoRemoval {
// This logic means auto_removal does not work with publish_multi_logs
for _, dst := range dests {
dst.tailer.StopAtEOF() // Stop all other tailers in favor of the newly found file
// Stop all other tailers in favor of the newly found file
dst.tailer.StopAtEOF()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this no longer blocks!

}
}

Expand Down
168 changes: 93 additions & 75 deletions plugins/inputs/logfile/logfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,100 +358,118 @@ func TestLogsFileRemove(t *testing.T) {
tt.Stop()
}

//When another file is created for the same file config and the file config has auto_removal as true, the old files will stop at EOF and removed afterwards
func TestLogsFileAutoRemoval(t *testing.T) {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
var wg sync.WaitGroup

multilineWaitPeriod = 10 * time.Millisecond
logEntryString := "anything"
filePrefix := "file_auto_removal"
tmpfile1, err := createTempFile("", filePrefix)
require.NoError(t, err)

_, err = tmpfile1.WriteString(logEntryString + "\n")
require.NoError(t, err)
tmpfile1.Sync()
tmpfile1.Close()

tt := NewLogFile()
tt.Log = TestLogger{t}
tt.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(tmpfile1.Name()), filePrefix+"*"),
func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile {
logFile := NewLogFile()
logFile.Log = TestLogger{t}
logFile.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"),
FromBeginning: true,
AutoRemoval: true,
}}
tt.FileConfig[0].init()
tt.started = true
logFile.FileConfig[0].init()
logFile.started = true
return logFile
}

lsrcs := tt.FindLogSrc()
if len(lsrcs) != 1 {
t.Fatalf("%v log src was returned when 1 should be available", len(lsrcs))
}
func makeTempFile(t *testing.T, prefix string) *os.File {
file, err := createTempFile("", prefix)
t.Logf("Created temp file, %s\n", file.Name())
require.NoError(t, err)
return file
}

lsrc := lsrcs[0]
// getLogSrc returns a LogSrc from the given LogFile, and the channel for output.
// Verifies 1 and only 1 LogSrc is discovered.
func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) {
logSources := logFile.FindLogSrc()
require.Equal(t, 1, len(logSources))
logSource := logSources[0]
evts := make(chan logs.LogEvent)
lsrc.SetOutput(func(e logs.LogEvent) {
logSource.SetOutput(func(e logs.LogEvent) {
if e != nil {
evts <- e
}
})
return &logSource, evts
}

var tmpfile2 *os.File
defer func() {
if tmpfile2 != nil {
os.Remove(tmpfile2.Name())
}
}()

wg.Add(1)

go func() {
defer wg.Done()

// create a new file matching configured pattern
tmpfile2, err = createTempFile("", filePrefix)
require.NoError(t, err)

_, err = tmpfile2.WriteString(logEntryString + "\n")
func writeLines(t *testing.T, file *os.File, numLines int, msg string) {
t.Log("Fill temp file with sufficient lines to be read.")
for i := 0; i < numLines; i++ {
_, err := file.WriteString(msg + "\n")
require.NoError(t, err)

}()

e := <-evts
if e.Message() != logEntryString {
t.Errorf("Wrong log found from first file: \n%v\nExpecting:\n%v\n", e.Message(), logEntryString)
}
defer lsrc.Stop()
}

for {
lsrcs = tt.FindLogSrc()
if len(lsrcs) > 0 {
break
// createWriteRead creates a temp file, writes to it, then verifies events
// are received. If isParent is true, then spawn a 2nd goroutine for createWriteRead.
// Close the given channel when complete to let caller know it was successful.
func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
// Let caller know when the goroutine is done.
defer close(done)
// done2 is only passed to child if this is the parent.
done2 := make(chan bool)
file := makeTempFile(t, prefix)
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
if isParent {
logFile = setupLogFileForTest(t, file, prefix)
defer logFile.Stop()
}
logSrc, evts := getLogSrc(t, logFile)
defer (*logSrc).Stop()
defer close(evts)
// Choose a large enough number of lines so that even high-spec hosts will not
// complete receiving logEvents before the 2nd createWriteRead() goroutine begins.
const numLines int = 100000
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
const msg string = "this is the best log line ever written to a file"
writeLines(t, file, numLines, msg)
file.Close()
if !isParent {
// Child creates 2nd temp file which is NOT auto removed.
defer os.Remove(file.Name())
}
t.Log("Verify every line written to the temp file is received.")
for i := 0; i < numLines; i++ {
logEvent := <- evts
require.Equal(t, msg, logEvent.Message())
if i != numLines / 2 {
continue
}
// Halfway through start another goroutine to create another temp file.
if isParent {
go createWriteRead(t, prefix, logFile, done2, false)
}
time.Sleep(1 * time.Second)
}

lsrc = lsrcs[0]
lsrc.SetOutput(func(e logs.LogEvent) {
if e != nil {
evts <- e
// Only wait for child if it was spawned
if isParent {
t.Log("Verify child completed.")
select {
case <-done2:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for child")
}
})

e = <-evts
if e.Message() != logEntryString {
t.Errorf("Wrong log found from 2nd file: \n% x\nExpecting:\n% x\n", e.Message(), logEntryString)
t.Log("Verify 1st temp file was auto deleted.")
_, err := os.Open(file.Name())
assert.True(t, os.IsNotExist(err))
}
}

//Use Wait Group to avoid race condition between opening tmpfile2 to delete tmpfile1 with auto_removal and opening tmpfile1
//to check it exist
wg.Wait()

_, err = os.Open(tmpfile1.Name())
assert.True(t, os.IsNotExist(err))

lsrc.Stop()
tt.Stop()
// TestLogsFileAutoRemoval verifies when a new file matching the configured
// FilePath is discovered, the old file will be automatically deleted after
// being read to the end-of-file.
func TestLogsFileAutoRemoval(t *testing.T) {
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
// Override global in tailersrc.go.
multilineWaitPeriod = 10 * time.Millisecond
prefix := "file_auto_removal"
done := make(chan bool)
createWriteRead(t, prefix, nil, done, true)
t.Log("Verify 1st tmp file created and discovered.")
select {
case <-done:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for 2nd temp file.")
}
}

func TestLogsTimestampAsMultilineStarter(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions plugins/inputs/logfile/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (tail *Tail) Stop() error {
}

// StopAtEOF stops tailing as soon as the end of the file is reached.
// Blocks until tailer is dead and returns reason for death.
func (tail *Tail) StopAtEOF() error {
tail.Kill(errStopAtEOF)
return tail.Wait()
Expand Down Expand Up @@ -390,7 +391,7 @@ func (tail *Tail) tailFileSync() {
if errReadLine == nil {
tail.sendLine(line, tail.curOffset)
} else {
break
return
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
}
}
} else if err != ErrStop {
Expand Down Expand Up @@ -465,7 +466,6 @@ func (tail *Tail) waitForChanges() error {
case <-tail.Dying():
return ErrStop
}

}

func (tail *Tail) openReader() {
Expand Down Expand Up @@ -510,8 +510,13 @@ func (tail *Tail) sendLine(line string, offset int64) bool {
select {
case tail.Lines <- &Line{line, now, nil, offset}:
case <-tail.Dying():
tail.dropCnt += len(lines) - i
return true
if tail.Err() == errStopAtEOF {
// Try sending, even if it blocks.
tail.Lines <- &Line{line, now, nil, offset}
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
} else {
khanhntd marked this conversation as resolved.
Show resolved Hide resolved
tail.dropCnt += len(lines) - i
return true
}
}
}

Expand Down
46 changes: 37 additions & 9 deletions plugins/inputs/logfile/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const linesWrittenToFile int = 10

type testLogger struct {
debugs, infos, warns, errors []string
}
Expand Down Expand Up @@ -71,22 +75,46 @@ func TestNotTailedCompeletlyLogging(t *testing.T) {
verifyTailerExited(t, tail)
}

func TestDroppedLinesWhenStopAtEOFLogging(t *testing.T) {
tmpfile, tail, tlog := setup(t)
func TestStopAtEOF(t *testing.T) {
tmpfile, tail, _ := setup(t)
defer tearDown(tmpfile)

readThreelines(t, tail)

// Ask the tailer to StopAtEOF
tail.StopAtEOF()
// Since StopAtEOF() will block until the EOF is reached, run it in a goroutine.
done := make(chan bool)
go func() {
tail.StopAtEOF()
close(done)
}()

// Verify the goroutine is blocked indefinitely.
select {
case <-done:
t.Fatalf("StopAtEOF() completed unexpectedly")
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
case <-time.After(time.Second * 1):
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
t.Log("timeout waiting for StopAtEOF() (as expected)")
}

assert.Equal(t, errStopAtEOF, tail.Err())

// Read to EOF
for i := 0; i < linesWrittenToFile - 3; i++ {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
<-tail.Lines
}

// Verify StopAtEOF() has completed.
select {
case <-done:
t.Log("StopAtEOF() completed (as expected)")
case <- time.After(time.Second * 1):
t.Fatalf("StopAtEOF() has not completed")
}

// Then remove the tmpfile
if err := os.Remove(tmpfile.Name()); err != nil {
t.Fatalf("failed to remove temporary log file %v: %v", tmpfile.Name(), err)
}
// Wait until the tailer should have been terminated
time.Sleep(exitOnDeletionWaitDuration + exitOnDeletionCheckDuration + 1*time.Second)

verifyTailerLogging(t, tlog, "Dropped 7 lines for stopped tail for file "+tmpfile.Name())
verifyTailerExited(t, tail)
}

Expand All @@ -97,7 +125,7 @@ func setup(t *testing.T) (*os.File, *Tail, *testLogger) {
}

// Write the file content
for i := 0; i < 10; i++ {
for i := 0; i < linesWrittenToFile; i++ {
if _, err := fmt.Fprintf(tmpfile, "%v some log line\n", time.Now()); err != nil {
log.Fatal(err)
}
Expand Down
Loading