From 86dfa71fe01488708e82ae738865237979cdf87a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 29 Apr 2024 09:34:44 -0700 Subject: [PATCH] Log dropped records --- sdk/log/batch.go | 6 ++++++ sdk/log/batch_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index f14160e46518..6c87edf81d0b 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -10,6 +10,8 @@ import ( "sync" "sync/atomic" "time" + + "go.opentelemetry.io/otel/internal/global" ) const ( @@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { return } + if d := b.q.Dropped(); d > 0 { + global.Warn("dropped log records", "dropped", d) + } + qLen := b.q.TryDequeue(buf, func(r []Record) bool { ok := b.exporter.EnqueueExport(r) if ok { diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 8028fef4cd6d..bb836809f7f8 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -4,7 +4,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "bytes" "context" + stdlog "log" "slices" "strconv" "sync" @@ -12,10 +14,12 @@ import ( "time" "unsafe" + "github.com/go-logr/stdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/log" ) @@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) { }) }) + t.Run("DroppedLogs", func(t *testing.T) { + orig := global.GetLogger() + t.Cleanup(func() { global.SetLogger(orig) }) + buf := new(bytes.Buffer) + stdr.SetVerbosity(1) + global.SetLogger(stdr.New(stdlog.New(buf, "", 0))) + + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + b := NewBatchProcessor( + e, + WithMaxQueueSize(1), + WithExportMaxBatchSize(1), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + var r Record + assert.NoError(t, b.OnEmit(ctx, r), "queued") + assert.NoError(t, b.OnEmit(ctx, r), "dropped") + + var n int + require.Eventually(t, func() bool { + n = e.ExportN() + return n > 0 + }, 2*time.Second, time.Microsecond, "blocked export not attempted") + + got := buf.String() + want := `"level"=1 "msg"="dropped log records" "dropped"=1` + assert.Contains(t, got, want) + + close(e.ExportTrigger) + _ = b.Shutdown(ctx) + }) + t.Run("ConcurrentSafe", func(t *testing.T) { const goRoutines = 10