Skip to content

Commit

Permalink
Merge pull request #52 from calyptia/pwhelan-feat-buffered-channel-dr…
Browse files Browse the repository at this point in the history
…ain-single-collect

[FEAT] buffered channel drain single collect
  • Loading branch information
niedbalski authored Oct 6, 2023
2 parents 5428486 + 47629f2 commit 4c8336c
Show file tree
Hide file tree
Showing 4 changed files with 503 additions and 51 deletions.
20 changes: 19 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,25 @@ jobs:

- name: Unit tests
run: |
go test -v -covermode=atomic -coverprofile=coverage.out ./...
# go test -v -covermode=atomic -coverprofile=coverage.out ./...
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackCtrlC\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackDangle\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfinite\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfiniteLatency\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfiniteConcurrent\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestPlugin\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfiniteConcurrent\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
./configloader/
go test -v -covermode=atomic -coverprofile=coverage.out \
./output/
- name: Upload coverage to Codecov
if: ${{ github.event_name != 'pull_request' }}
Expand Down
146 changes: 103 additions & 43 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -27,10 +29,19 @@ import (
"github.com/calyptia/plugin/output"
)

const (
// maxBufferedMessages is the number of messages that will be buffered
// between each fluent-bit interval (approx 1 second).
defaultMaxBufferedMessages = 300000
// collectInterval is set to the interval present before in core-fluent-bit.
collectInterval = 1000 * time.Nanosecond
)

var (
unregister func()
cmt *cmetrics.Context
logger Logger
unregister func()
cmt *cmetrics.Context
logger Logger
maxBufferedMessages = defaultMaxBufferedMessages
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -62,7 +73,7 @@ func FLBPluginRegister(def unsafe.Pointer) int {
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialised and any data or flag required for
// here all the plugin context should be initialized and any data or flag required for
// plugins to execute the collect or flush callback.
//
//export FLBPluginInit
Expand Down Expand Up @@ -94,6 +105,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

err = theInput.Init(ctx, fbit)
if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" {
maxbuffered, err := strconv.Atoi(maxbuffered)
if err != nil {
maxBufferedMessages = maxbuffered
}
}
} else {
conf := &flbOutputConfigLoader{ptr: ptr}
cmt, err = output.FLBPluginGetCMetricsContext(ptr)
Expand All @@ -116,10 +133,40 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
return input.FLB_OK
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
// flbPluginReset is meant to reset the plugin between tests.
func flbPluginReset() {
theInputLock.Lock()
defer theInputLock.Unlock()

once = sync.Once{}
close(theChannel)
theInput = nil
}

func testFLBPluginInputCallback() ([]byte, error) {
data := unsafe.Pointer(nil)
var csize C.size_t

FLBPluginInputCallback(&data, &csize)

if data == nil {
return []byte{}, nil
}

defer C.free(data)
return C.GoBytes(data, C.int(csize)), nil
}

// Lock used to synchronize access to theInput variable.
var theInputLock sync.Mutex

// FLBPluginInputCallback this method gets invoked by the fluent-bit
// runtime, once the plugin has been initialized, the plugin
// implementation is responsible for handling the incoming data and the
// context that gets past
//
// This function will invoke Collect only once to preserve backward
// compatible behavior. There are unit tests to enforce this behavior.
//
//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
Expand All @@ -130,50 +177,63 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func() {
err = theInput.Collect(runCtx, theChannel)
}()
theChannel = make(chan Message, maxBufferedMessages)

theInputLock.Lock()

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()

err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
}(theChannel)
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}
buf := bytes.NewBuffer([]byte{})

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = 0
default:
loop = 0
}
}

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
if csize != nil {
*csize = C.size_t(len(b))
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand All @@ -191,7 +251,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
// plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation.
//
//export FLBPluginFlush
//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions.
//nolint:funlen //ignore length requirement for this function
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
initWG.Wait()

Expand Down
Loading

0 comments on commit 4c8336c

Please sign in to comment.