Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing Windows events by changing getRecords(). #333

Merged
merged 11 commits into from
Jan 12, 2022
1 change: 1 addition & 0 deletions plugins/inputs/windows_event_log/wineventlog/sys_call.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build windows
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
// +build windows

// Portions Licensed under the Apache License, Version 2.0, Copyright (c) 2012–2017 Elastic <http://www.elastic.co>
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/windows_event_log/wineventlog/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

//go:build windows
// +build windows

package wineventlog
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/windows_event_log/wineventlog/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

//go:build windows
// +build windows

package wineventlog
Expand Down
77 changes: 39 additions & 38 deletions plugins/inputs/windows_event_log/wineventlog/wineventlog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

//go:build windows
// +build windows

package wineventlog
Expand Down Expand Up @@ -104,32 +105,19 @@ func (l *windowsEventLog) Stop() {
}

func (l *windowsEventLog) run() {
recordNumber := l.eventOffset
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
records, err := l.read()
if err == RPC_S_INVALID_BOUND {
log.Printf("E! [windows_event_log] Due to corrupted/large event, skipping event log with record number %d, and log group name %s", recordNumber, l.logGroupName)
recordNumber = recordNumber + 1 // Advance to the next event to avoid being stuck
continue
}
if err != nil {
log.Printf("E! [windows_event_log] Failed to read Windows event logs for log group name %s. Details: %v\n", l.logGroupName, err)
recordNumber = recordNumber + 1
continue
}

Copy link
Contributor Author

@adam-mateen adam-mateen Dec 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code to track recordNumber is unnecessary it only used once and it already gets set before use.
It gets parsed below and set in the LogEvent struct.
I moved printing the error inside the read() function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what if l.read() happens to return NO valid records. Then we would never set recordNumber below and would get stuck on the bad record.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if I am crazy, but in the current master branch there is only 1 place where the recordNumber variable is used:
https://github.com/aws/amazon-cloudwatch-agent/blob/master/plugins/inputs/windows_event_log/wineventlog/wineventlog.go#L136

And just before it is used it is set:
https://github.com/aws/amazon-cloudwatch-agent/blob/master/plugins/inputs/windows_event_log/wineventlog/wineventlog.go#L131

There is no point in setting a local variable or incrementing it if it is not used.
I was confused by this code too :)

Copy link
Contributor Author

@adam-mateen adam-mateen Jan 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EvtNext() is a stateful call, meaning each call will return a different list of EvtHandles, or nothing.
Since the windowsEventLog.read() method calls EvtNext() it is also stateful.

MSDN:
https://docs.microsoft.com/en-us/windows/win32/wes/querying-for-events#reading-events-from-the-result-set

The events in the result set are not static; new events that are written to the channel will be included in the result set until ERROR_NO_MORE_ITEMS is set.

Copy link
Contributor

@straussb straussb Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. This almost seems like a bug then, if the following happened:

  1. Read corrupted/large event from event log, get RPC_S_INVALID_BOUND error. No more events available in the log yet.
  2. Agent stops. No updated offset is persisted in the state file.
  3. Agent starts again, starts reading at the bad event again.
  4. Repeat.

I guess that's an edge case though and would be resolved as soon as a new valid event is available in the log. (Although it could be fixed by sending recordNumber on l.offsetCh.)

records := l.read()
for _, record := range records {
value, err := record.Value()
if err != nil {
log.Printf("E! [windows_event_log] Error happened when collecting windows events : %v", err)
log.Printf("E! [wineventlog] Error happened when collecting windows events : %v", err)
continue
}
recordNumber, _ = strconv.ParseUint(record.System.EventRecordID, 10, 64)
// TODO: Create and send log event to output fn
recordNumber, _ := strconv.ParseUint(record.System.EventRecordID, 10, 64)
evt := &LogEvent{
msg: value,
t: record.System.TimeCreated.SystemTime,
Expand Down Expand Up @@ -171,7 +159,8 @@ func (l *windowsEventLog) Open() error {
// Subscribe for events
eventHandle, err := EvtSubscribe(0, uintptr(signalEvent), channelPath, query, bookmark, 0, 0, EvtSubscribeStartAfterBookmark)
if err != nil {
fmt.Errorf("error when subscribing for events. Details: %v", err)
log.Printf("E! [wineventlog] EvtSubscribe(), name %v, err %v", l.name, err)
return err
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

l.eventHandle = eventHandle
Expand Down Expand Up @@ -221,14 +210,14 @@ func (l *windowsEventLog) runSaveState() {
}
err := l.saveState(offset)
if err != nil {
log.Printf("E! [windows_event_log] Error happened when saving file state %s to file state folder %s: %v", l.logGroupName, l.stateFilePath, err)
log.Printf("E! [wineventlog] Error happened when saving file state %s to file state folder %s: %v", l.logGroupName, l.stateFilePath, err)
continue
}
lastSavedOffset = offset
case <-l.done:
err := l.saveState(offset)
if err != nil {
log.Printf("E! [windows_event_log] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", l.logGroupName, l.stateFilePath, err)
log.Printf("E! [wineventlog] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", l.logGroupName, l.stateFilePath, err)
}
break
}
Expand All @@ -244,7 +233,7 @@ func (l *windowsEventLog) saveState(offset uint64) error {
return ioutil.WriteFile(l.stateFilePath, content, 0644)
}

func (l *windowsEventLog) read() ([]*windowsEventLogRecord, error) {
func (l *windowsEventLog) read() []*windowsEventLogRecord {
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
maxToRead := l.maxToRead
var eventHandles []EvtHandle
defer func() {
Expand All @@ -258,14 +247,13 @@ func (l *windowsEventLog) read() ([]*windowsEventLogRecord, error) {
eventHandles = make([]EvtHandle, maxToRead)
err := EvtNext(l.eventHandle, uint32(len(eventHandles)),
&eventHandles[0], 0, 0, &numRead)

// Handle special case when events size is too large - retry with smaller size
if err == RPC_S_INVALID_BOUND {
if maxToRead == 1 {
log.Printf("E! [windows_event_log] Out of bounds error due to large events size. Will skip the event as we cannot process it. Details: %v\n", err)
return nil, err
log.Printf("E! [wineventlog] Out of bounds error due to large events size. Will skip the event as we cannot process it. Details: %v\n", err)
return nil
}
log.Printf("W! [windows_event_log] Out of bounds error due to large events size. Retrying with half of the read batch size (%d). Details: %v\n", maxToRead/2, err)
log.Printf("W! [wineventlog] Out of bounds error due to large events size. Retrying with half of the read batch size (%d). Details: %v\n", maxToRead/2, err)
maxToRead /= 2
for _, h := range eventHandles {
EvtClose(h)
Expand All @@ -277,10 +265,14 @@ func (l *windowsEventLog) read() ([]*windowsEventLogRecord, error) {
}
// Decode the events into objects
if numRead == 0 {
return nil, nil
return nil
}

return l.getRecords(eventHandles[:numRead])
records := l.getRecords(eventHandles[:numRead])
if len(records) != int(numRead) {
log.Printf("D! [wineventlog] requested events (%d) != returned events (%d)\n", numRead, len(records))
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}
return records
}

type LogEvent struct {
Expand All @@ -302,9 +294,12 @@ func (le LogEvent) Done() {
le.src.Done(le.offset)
}

func (l *windowsEventLog) getRecords(handles []EvtHandle) (records []*windowsEventLogRecord, err error) {
// getRecords attempts to render and format each of the given handles.
// if one handle has an error, continue on because something is better than nothing.
func (l *windowsEventLog) getRecords(handles []EvtHandle) (records []*windowsEventLogRecord) {
//Windows event message supports 31839 characters. https://msdn.microsoft.com/EN-US/library/windows/desktop/aa363679.aspx
bufferSize := 1 << 17
var err error
for _, evtHandle := range handles {
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
renderBuf := make([]byte, bufferSize)
var outputBuf []byte
Expand All @@ -314,7 +309,8 @@ func (l *windowsEventLog) getRecords(handles []EvtHandle) (records []*windowsEve
// for rendering the event and getting a readable XML format that contains the log message.
// - We can later do more research on comparing other methods to get the publisher details such as EvtCreateRenderContext
if outputBuf, err = RenderEventXML(evtHandle, renderBuf); err != nil {
return nil, err
log.Printf("I! [wineventlog] RenderEventXML() err %v", err)
continue
}

newRecord := newEventLogRecord(l)
Expand All @@ -324,19 +320,22 @@ func (l *windowsEventLog) getRecords(handles []EvtHandle) (records []*windowsEve

var publisherMetadataEvtHandle EvtHandle
if publisherMetadataEvtHandle, err = EvtOpenPublisherMetadata(0, publisher, nil, 0, 0); err != nil {
return nil, err
log.Printf("I! [wineventlog] EvtOpenPublisherMetadata() err %v, publisher %v", err, newRecord.System.Provider.Name)
continue
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

var bufferUsed uint32
if err = EvtFormatMessage(publisherMetadataEvtHandle, evtHandle, 0, 0, 0, EvtFormatMessageXml, uint32(bufferSize), &renderBuf[0], &bufferUsed); err != nil {
EvtClose(publisherMetadataEvtHandle)
return nil, err
log.Printf("I! [wineventlog] EvtFormatMessage() err %v, publisher %v", err, newRecord.System.Provider.Name)
continue
}
EvtClose(publisherMetadataEvtHandle)

var descriptionBytes []byte
if descriptionBytes, err = UTF16ToUTF8Bytes(renderBuf, bufferUsed); err != nil {
return nil, err
log.Printf("I! [wineventlog] UTF16ToUTF8Bytes() err %v", err)
continue
}

switch l.renderFormat {
Expand All @@ -347,38 +346,40 @@ func (l *windowsEventLog) getRecords(handles []EvtHandle) (records []*windowsEve
//old SSM agent Windows format
var recordMessage eventMessage
if err = xml.Unmarshal(descriptionBytes, &recordMessage); err != nil {
return nil, err
log.Printf("I! [wineventlog] Unmarshal() err %v", err)
continue
}

newRecord.System.Description = recordMessage.Message
default:
return nil, fmt.Errorf("format %s is not recognized", l.renderFormat)
log.Printf("I! [wineventlog] l.renderFormat is not recognized, %s", l.renderFormat)
continue
}

//add record to array
records = append(records, newRecord)
}
return records, err
return records
}

func (l *windowsEventLog) loadState() {
if _, err := os.Stat(l.stateFilePath); err != nil {
log.Printf("I! [windows_event_log] The state file for %s does not exist: %v", l.stateFilePath, err)
log.Printf("I! [wineventlog] The state file for %s does not exist: %v", l.stateFilePath, err)
return
}

byteArray, err := ioutil.ReadFile(l.stateFilePath)
if err != nil {
log.Printf("W! [windows_event_log] Issue encountered when reading offset from file %s: %v", l.stateFilePath, err)
log.Printf("W! [wineventlog] Issue encountered when reading offset from file %s: %v", l.stateFilePath, err)
return
}

offset, err := strconv.ParseInt(strings.Split(string(byteArray), "\n")[0], 10, 64)
if err != nil {
log.Printf("W! [windows_event_log] Issue encountered when parsing offset value %v: %v", byteArray, err)
log.Printf("W! [wineventlog] Issue encountered when parsing offset value %v: %v", byteArray, err)
return
}

log.Printf("I! [windows_event_log] Reading from offset %v in %s", offset, l.stateFilePath)
log.Printf("I! [wineventlog] Reading from offset %v in %s", offset, l.stateFilePath)
l.eventOffset = uint64(offset)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

//go:build !windows
// +build !windows

package wineventlog
Loading