Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix auto_removal feature so log lines are NOT dropped from old file #452

Merged
merged 8 commits into from
May 16, 2022
6 changes: 4 additions & 2 deletions plugins/inputs/logfile/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {

if _, ok := dests[filename]; ok {
continue
} else if fileconfig.AutoRemoval { // This logic means auto_removal does not work with publish_multi_logs
} else if fileconfig.AutoRemoval {
// This logic means auto_removal does not work with publish_multi_logs
for _, dst := range dests {
dst.tailer.StopAtEOF() // Stop all other tailers in favor of the newly found file
// Stop all other tailers in favor of the newly found file
dst.tailer.StopAtEOF()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this no longer blocks!

}
}

Expand Down
166 changes: 91 additions & 75 deletions plugins/inputs/logfile/logfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,100 +358,116 @@ func TestLogsFileRemove(t *testing.T) {
tt.Stop()
}

//When another file is created for the same file config and the file config has auto_removal as true, the old files will stop at EOF and removed afterwards
func TestLogsFileAutoRemoval(t *testing.T) {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
var wg sync.WaitGroup

multilineWaitPeriod = 10 * time.Millisecond
logEntryString := "anything"
filePrefix := "file_auto_removal"
tmpfile1, err := createTempFile("", filePrefix)
require.NoError(t, err)

_, err = tmpfile1.WriteString(logEntryString + "\n")
require.NoError(t, err)
tmpfile1.Sync()
tmpfile1.Close()

tt := NewLogFile()
tt.Log = TestLogger{t}
tt.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(tmpfile1.Name()), filePrefix+"*"),
func setupLogFileForTest(t *testing.T, file *os.File, prefix string) *LogFile {
logFile := NewLogFile()
logFile.Log = TestLogger{t}
logFile.FileConfig = []FileConfig{{
FilePath: filepath.Join(filepath.Dir(file.Name()), prefix+"*"),
FromBeginning: true,
AutoRemoval: true,
}}
tt.FileConfig[0].init()
tt.started = true
logFile.FileConfig[0].init()
logFile.started = true
return logFile
}

lsrcs := tt.FindLogSrc()
if len(lsrcs) != 1 {
t.Fatalf("%v log src was returned when 1 should be available", len(lsrcs))
}
// makeTempFile creates a temp file whose name starts with prefix (via
// createTempFile) and returns the open handle. The test is failed immediately
// if the file cannot be created.
func makeTempFile(t *testing.T, prefix string) *os.File {
	file, err := createTempFile("", prefix)
	// Check the error before touching file: on failure file may be nil, and
	// calling Name() on a nil *os.File panics instead of reporting the error.
	require.NoError(t, err)
	t.Logf("Created temp file, %s\n", file.Name())
	return file
}

lsrc := lsrcs[0]
// getLogSrc returns a LogSrc from the given LogFile, and the channel for output.
// Verifies 1 and only 1 LogSrc is discovered.
func getLogSrc(t *testing.T, logFile *LogFile) (*logs.LogSrc, chan logs.LogEvent) {
logSources := logFile.FindLogSrc()
require.Equal(t, 1, len(logSources))
logSource := logSources[0]
evts := make(chan logs.LogEvent)
lsrc.SetOutput(func(e logs.LogEvent) {
logSource.SetOutput(func(e logs.LogEvent) {
if e != nil {
evts <- e
}
})
return &logSource, evts
}

var tmpfile2 *os.File
defer func() {
if tmpfile2 != nil {
os.Remove(tmpfile2.Name())
}
}()

wg.Add(1)

go func() {
defer wg.Done()

// create a new file matching configured pattern
tmpfile2, err = createTempFile("", filePrefix)
require.NoError(t, err)

_, err = tmpfile2.WriteString(logEntryString + "\n")
func writeLines(t *testing.T, file *os.File, numLines int, msg string) {
t.Log("Fill temp file with sufficient lines to be read.")
for i := 0; i < numLines; i++ {
_, err := file.WriteString(msg + "\n")
require.NoError(t, err)

}()

e := <-evts
if e.Message() != logEntryString {
t.Errorf("Wrong log found from first file: \n%v\nExpecting:\n%v\n", e.Message(), logEntryString)
}
defer lsrc.Stop()
}

for {
lsrcs = tt.FindLogSrc()
if len(lsrcs) > 0 {
break
// createWriteRead creates a temp file, writes to it, then verifies events
// are received. If isParent is true, it spawns a 2nd goroutine running createWriteRead.
// The given channel is closed when this function returns, whether or not it
// succeeded, so the caller knows it has finished.
func createWriteRead(t *testing.T, prefix string, logFile *LogFile, done chan bool, isParent bool) {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
// Let caller know when the goroutine is done.
defer close(done)
// done2 is only passed to child if this is the parent.
done2 := make(chan bool)
file := makeTempFile(t, prefix)
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
if isParent {
logFile = setupLogFileForTest(t, file, prefix)
defer logFile.Stop()
}
logSrc, evts := getLogSrc(t, logFile)
defer (*logSrc).Stop()
defer close(evts)
const numLines int = 1000
const msg string = "this is the best log line ever written to a file"
writeLines(t, file, numLines, msg)
file.Close()
if !isParent {
// Child creates 2nd temp file which is NOT auto removed.
defer os.Remove(file.Name())
}
t.Log("Verify every line written to the temp file is received.")
for i := 0; i < numLines; i++ {
logEvent := <- evts
require.Equal(t, msg, logEvent.Message())
if i != numLines / 2 {
continue
}
// Halfway through start another goroutine to create another temp file.
if isParent {
go createWriteRead(t, prefix, logFile, done2, false)
}
time.Sleep(1 * time.Second)
}

lsrc = lsrcs[0]
lsrc.SetOutput(func(e logs.LogEvent) {
if e != nil {
evts <- e
// Only wait for child if it was spawned
if isParent {
t.Log("Verify child completed.")
select {
case <-done2:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for child")
}
})

e = <-evts
if e.Message() != logEntryString {
t.Errorf("Wrong log found from 2nd file: \n% x\nExpecting:\n% x\n", e.Message(), logEntryString)
t.Log("Verify 1st temp file was auto deleted.")
_, err := os.Open(file.Name())
assert.True(t, os.IsNotExist(err))
}
}

//Use Wait Group to avoid race condition between opening tmpfile2 to delete tmpfile1 with auto_removal and opening tmpfile1
//to check it exist
wg.Wait()

_, err = os.Open(tmpfile1.Name())
assert.True(t, os.IsNotExist(err))

lsrc.Stop()
tt.Stop()
// TestLogsFileAutoRemoval verifies when a new file matching the configured
// FilePath is discovered, the old file will be automatically deleted after
// being read to the end-of-file.
func TestLogsFileAutoRemoval(t *testing.T) {
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
// Override global in tailersrc.go.
multilineWaitPeriod = 10 * time.Millisecond
prefix := "file_auto_removal"
done := make(chan bool)
createWriteRead(t, prefix, nil, done, true)
t.Log("Verify 1st tmp file created and discovered.")
select {
case <-done:
t.Log("Completed before timeout (as expected)")
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for 2nd temp file.")
}
}

func TestLogsTimestampAsMultilineStarter(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions plugins/inputs/logfile/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (tail *Tail) Stop() error {
}

// StopAtEOF stops tailing as soon as the end of the file is reached.
// Blocks until tailer is dead and returns reason for death.
func (tail *Tail) StopAtEOF() error {
tail.Kill(errStopAtEOF)
return tail.Wait()
Expand Down Expand Up @@ -390,7 +391,7 @@ func (tail *Tail) tailFileSync() {
if errReadLine == nil {
tail.sendLine(line, tail.curOffset)
} else {
break
return
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
}
}
} else if err != ErrStop {
Expand Down Expand Up @@ -465,7 +466,6 @@ func (tail *Tail) waitForChanges() error {
case <-tail.Dying():
return ErrStop
}

}

func (tail *Tail) openReader() {
Expand Down Expand Up @@ -510,8 +510,13 @@ func (tail *Tail) sendLine(line string, offset int64) bool {
select {
case tail.Lines <- &Line{line, now, nil, offset}:
case <-tail.Dying():
tail.dropCnt += len(lines) - i
return true
if tail.Err() == errStopAtEOF {
// Try sending, even if it blocks.
tail.Lines <- &Line{line, now, nil, offset}
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
} else {
khanhntd marked this conversation as resolved.
Show resolved Hide resolved
tail.dropCnt += len(lines) - i
return true
}
}
}

Expand Down
47 changes: 38 additions & 9 deletions plugins/inputs/logfile/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const linesWrittenToFile int = 10

// testLogger holds four string slices named after log levels: debugs, infos,
// warns and errors.
// NOTE(review): presumably each logging method appends its message to the
// matching slice so tests can assert on what the tailer logged — confirm
// against the method bodies, which are not visible in this view.
type testLogger struct {
debugs, infos, warns, errors []string
}
Expand Down Expand Up @@ -56,6 +60,7 @@ func (l *testLogger) Info(args ...interface{}) {

func TestNotTailedCompeletlyLogging(t *testing.T) {
tmpfile, tail, tlog := setup(t)
tmpfile.Close()
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
defer tearDown(tmpfile)

readThreelines(t, tail)
Expand All @@ -71,22 +76,46 @@ func TestNotTailedCompeletlyLogging(t *testing.T) {
verifyTailerExited(t, tail)
}

func TestDroppedLinesWhenStopAtEOFLogging(t *testing.T) {
tmpfile, tail, tlog := setup(t)
func TestStopAtEOF(t *testing.T) {
tmpfile, tail, _ := setup(t)
defer tearDown(tmpfile)

readThreelines(t, tail)

// Ask the tailer to StopAtEOF
tail.StopAtEOF()
// Since StopAtEOF() will block until the EOF is reached, run it in a goroutine.
done := make(chan bool)
go func() {
tail.StopAtEOF()
close(done)
}()

// Verify StopAtEOF() is still blocked after waiting one second.
select {
case <-done:
t.Fatalf("StopAtEOF() completed unexpectedly")
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
case <-time.After(time.Second * 1):
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
fmt.Println("timeout waiting for StopAtEOF() (as expected)")
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

assert.Equal(t, errStopAtEOF, tail.Err())

// Read to EOF
for i := 0; i < linesWrittenToFile - 3; i++ {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
<-tail.Lines
}

// Verify StopAtEOF() has completed.
select {
case <-done:
fmt.Println("StopAtEOF() completed (as expected)")
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
case <- time.After(time.Second * 1):
t.Fatalf("StopAtEOF() has not completed")
}

// Then remove the tmpfile
if err := os.Remove(tmpfile.Name()); err != nil {
t.Fatalf("failed to remove temporary log file %v: %v", tmpfile.Name(), err)
}
// Wait until the tailer should have been terminated
time.Sleep(exitOnDeletionWaitDuration + exitOnDeletionCheckDuration + 1*time.Second)

verifyTailerLogging(t, tlog, "Dropped 7 lines for stopped tail for file "+tmpfile.Name())
verifyTailerExited(t, tail)
}

Expand All @@ -97,7 +126,7 @@ func setup(t *testing.T) (*os.File, *Tail, *testLogger) {
}

// Write the file content
for i := 0; i < 10; i++ {
for i := 0; i < linesWrittenToFile; i++ {
if _, err := fmt.Fprintf(tmpfile, "%v some log line\n", time.Now()); err != nil {
log.Fatal(err)
}
Expand Down
14 changes: 8 additions & 6 deletions plugins/inputs/logfile/tailersrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type tailerSrc struct {
startTailerOnce sync.Once
cleanUpFns []func()
}
// Verify tailerSrc implements LogSrc
var _ logs.LogSrc = (*tailerSrc)(nil)
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved

func NewTailerSrc(
group, stream, destination, stateFilePath string,
Expand Down Expand Up @@ -119,26 +121,26 @@ func (ts *tailerSrc) SetOutput(fn func(logs.LogEvent)) {
ts.startTailerOnce.Do(func() { go ts.runTail() })
}

func (ts tailerSrc) Group() string {
func (ts *tailerSrc) Group() string {
return ts.group
}

func (ts tailerSrc) Stream() string {
func (ts *tailerSrc) Stream() string {
return ts.stream
}

func (ts tailerSrc) Description() string {
func (ts *tailerSrc) Description() string {
return ts.tailer.Filename
}

func (ts tailerSrc) Destination() string {
func (ts *tailerSrc) Destination() string {
return ts.destination
}

func (ts tailerSrc) Retention() int {
func (ts *tailerSrc) Retention() int {
return ts.retentionInDays
}
func (ts tailerSrc) Done(offset fileOffset) {
func (ts *tailerSrc) Done(offset fileOffset) {
// ts.offsetCh will only be blocked when the runSaveState func has exited,
// which only happens when the original file has been removed, which makes
// keeping its offset useless
Expand Down