From 3a19af036b3ce3f14f49d874b3f5643bd31d71fb Mon Sep 17 00:00:00 2001 From: Jayanth Varavani <1111446+jayanthvn@users.noreply.github.com> Date: Wed, 6 Sep 2023 18:44:21 +0000 Subject: [PATCH 1/2] Fixup poller for events race condition --- pkg/events/events.go | 68 ++++++++++++++++++++++------------------ pkg/events/poll/epoll.go | 1 + 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/pkg/events/events.go b/pkg/events/events.go index ba04ab6..0190c4c 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -38,8 +38,9 @@ var _ Events = &events{} func New() Events { return &events{ - PageSize: os.Getpagesize(), - RingCnt: 0, + PageSize: os.Getpagesize(), + RingCnt: 0, + EventFdCnt: 0, } } @@ -48,6 +49,7 @@ type events struct { RingBuffers []*RingBuffer PageSize int RingCnt int + EventFdCnt int eventsStopChannel chan struct{} wg sync.WaitGroup eventsDataChannel chan []byte @@ -138,13 +140,14 @@ func (ev *events) setupRingBuffer(mapFD int, maxEntries uint32) (chan []byte, er ringbuffer.Data = unsafe.Pointer(uintptr(unsafe.Pointer(&producer[0])) + uintptr(ev.PageSize)) ev.RingBuffers = append(ev.RingBuffers, ringbuffer) - ev.RingCnt++ - err = ev.epoller.AddEpollCtl(mapFD, ev.RingCnt) + err = ev.epoller.AddEpollCtl(mapFD, ev.EventFdCnt) if err != nil { unix.Munmap(producer) return nil, fmt.Errorf("failed to Epoll event: %s", err) } + ev.RingCnt++ + ev.EventFdCnt++ //Start channels read ev.eventsStopChannel = make(chan struct{}) @@ -197,40 +200,43 @@ func (ev *events) reconcileEventsDataChannel() { <-ev.eventsStopChannel } -// Similar to libbpf poll ring +// Similar to libbpf poll and process ring +// Ref: https://github.com/torvalds/linux/blob/744a759492b5c57ff24a6e8aabe47b17ad8ee964/tools/lib/bpf/ringbuf.c#L227 func (ev *events) readRingBuffer(eventRing *RingBuffer) { - readDone := true consPosition := eventRing.getConsumerPosition() - for !readDone { - readDone = ev.parseBuffer(consPosition, eventRing) - } + ev.parseBuffer(consPosition, eventRing) } -func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) bool { - readDone := true - producerPosition := eventRing.getProducerPosition() - for consumerPosition < producerPosition { - - // Get the header - Data points to the DataPage which will be offset by consumerPosition - ringdata := eventRing.ParseRingData(consumerPosition) - - // Check if busy then skip, Might not be committed yet - // There are 2 steps -> reserve and then commit/discard - if ringdata.BusyRecord { - readDone = true - break - } +func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) { + var readDone bool + for { + readDone = true + producerPosition := eventRing.getProducerPosition() + for consumerPosition < producerPosition { + + // Get the header - Data points to the DataPage which will be offset by consumerPosition + ringdata := eventRing.ParseRingData(consumerPosition) + + // Check if busy then skip, Might not be committed yet + // There are 2 steps -> reserve and then commit/discard + if ringdata.BusyRecord { + readDone = true + break + } - readDone = false + readDone = false - // Update the position to the next record irrespective of discard or commit of data - consumerPosition += uint64(ringdata.RecordLen) + // Update the position to the next record irrespective of discard or commit of data + consumerPosition += uint64(ringdata.RecordLen) - //Pick the data only if committed - if !ringdata.DiscardRecord { - ev.eventsDataChannel <- ringdata.parseSample() + //Pick the data only if committed + if !ringdata.DiscardRecord { + ev.eventsDataChannel <- ringdata.parseSample() + } + eventRing.setConsumerPosition(consumerPosition) + } + if readDone { + break } - eventRing.setConsumerPosition(consumerPosition) } - return readDone } diff --git a/pkg/events/poll/epoll.go b/pkg/events/poll/epoll.go index a0ce64d..8cb7288 100644 --- a/pkg/events/poll/epoll.go +++ b/pkg/events/poll/epoll.go @@ -49,6 +49,7 @@ func (e *EventPoller) AddEpollCtl(mapFD, eventFD int) error { return fmt.Errorf("failed to Epoll event: %s", err) } e.epollEvent = append(e.epollEvent, epollEvent) + e.bufferCnt++ return nil } From 560050212b1fac154fd1d6a927485d92760dec28 Mon Sep 17 00:00:00 2001 From: Jayanth Varavani <1111446+jayanthvn@users.noreply.github.com> Date: Wed, 6 Sep 2023 18:56:04 +0000 Subject: [PATCH 2/2] Testing package go tidy --- test/go.mod | 2 +- test/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/go.mod b/test/go.mod index 041da3f..aa91a81 100644 --- a/test/go.mod +++ b/test/go.mod @@ -14,7 +14,7 @@ require ( github.com/vishvananda/netns v0.0.4 // indirect go.uber.org/multierr v1.10.0 // indirect go.uber.org/zap v1.25.0 // indirect - golang.org/x/sys v0.6.0 // indirect + golang.org/x/sys v0.12.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/test/go.sum b/test/go.sum index 2665cf2..2eed2e2 100644 --- a/test/go.sum +++ b/test/go.sum @@ -22,8 +22,8 @@ go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=