Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixup poller for events race condition #43

Merged
merged 2 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ var _ Events = &events{}

func New() Events {
return &events{
PageSize: os.Getpagesize(),
RingCnt: 0,
PageSize: os.Getpagesize(),
RingCnt: 0,
EventFdCnt: 0,
}

}
Expand All @@ -48,6 +49,7 @@ type events struct {
RingBuffers []*RingBuffer
PageSize int
RingCnt int
EventFdCnt int
eventsStopChannel chan struct{}
wg sync.WaitGroup
eventsDataChannel chan []byte
Expand Down Expand Up @@ -138,13 +140,14 @@ func (ev *events) setupRingBuffer(mapFD int, maxEntries uint32) (chan []byte, er
ringbuffer.Data = unsafe.Pointer(uintptr(unsafe.Pointer(&producer[0])) + uintptr(ev.PageSize))

ev.RingBuffers = append(ev.RingBuffers, ringbuffer)
ev.RingCnt++

err = ev.epoller.AddEpollCtl(mapFD, ev.RingCnt)
err = ev.epoller.AddEpollCtl(mapFD, ev.EventFdCnt)
if err != nil {
unix.Munmap(producer)
return nil, fmt.Errorf("failed to Epoll event: %s", err)
}
ev.RingCnt++
ev.EventFdCnt++

//Start channels read
ev.eventsStopChannel = make(chan struct{})
Expand Down Expand Up @@ -197,40 +200,43 @@ func (ev *events) reconcileEventsDataChannel() {
<-ev.eventsStopChannel
}

// Similar to libbpf poll ring
// Similar to libbpf poll and process ring
// Ref: https://github.com/torvalds/linux/blob/744a759492b5c57ff24a6e8aabe47b17ad8ee964/tools/lib/bpf/ringbuf.c#L227
func (ev *events) readRingBuffer(eventRing *RingBuffer) {
readDone := true
consPosition := eventRing.getConsumerPosition()
for !readDone {
readDone = ev.parseBuffer(consPosition, eventRing)
}
ev.parseBuffer(consPosition, eventRing)
}

func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) bool {
readDone := true
producerPosition := eventRing.getProducerPosition()
for consumerPosition < producerPosition {

// Get the header - Data points to the DataPage which will be offset by consumerPosition
ringdata := eventRing.ParseRingData(consumerPosition)

// Check if busy then skip, Might not be committed yet
// There are 2 steps -> reserve and then commit/discard
if ringdata.BusyRecord {
readDone = true
break
}
func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) {
var readDone bool
for {
readDone = true
producerPosition := eventRing.getProducerPosition()
for consumerPosition < producerPosition {

// Get the header - Data points to the DataPage which will be offset by consumerPosition
ringdata := eventRing.ParseRingData(consumerPosition)

// Check if busy then skip, Might not be committed yet
// There are 2 steps -> reserve and then commit/discard
if ringdata.BusyRecord {
readDone = true
break
}

readDone = false
readDone = false

// Update the position to the next record irrespective of discard or commit of data
consumerPosition += uint64(ringdata.RecordLen)
// Update the position to the next record irrespective of discard or commit of data
consumerPosition += uint64(ringdata.RecordLen)

//Pick the data only if committed
if !ringdata.DiscardRecord {
ev.eventsDataChannel <- ringdata.parseSample()
//Pick the data only if committed
if !ringdata.DiscardRecord {
ev.eventsDataChannel <- ringdata.parseSample()
}
eventRing.setConsumerPosition(consumerPosition)
}
if readDone {
break
}
eventRing.setConsumerPosition(consumerPosition)
}
return readDone
}
1 change: 1 addition & 0 deletions pkg/events/poll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (e *EventPoller) AddEpollCtl(mapFD, eventFD int) error {
return fmt.Errorf("failed to Epoll event: %s", err)
}
e.epollEvent = append(e.epollEvent, epollEvent)
e.bufferCnt++
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/vishvananda/netns v0.0.4 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/sys v0.12.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
)

Expand Down
4 changes: 2 additions & 2 deletions test/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=