Skip to content

Commit

Permalink
[enrichment/trace] Handle exception for span events (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Oct 1, 2024
1 parent 6d89cba commit 10e3f6c
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 27 deletions.
24 changes: 20 additions & 4 deletions enrichments/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,18 @@ type SpanEventConfig struct {
// TimestampUs is a temporary attribute to enable higher
// resolution timestamps in Elasticsearch. For more details see:
// https://github.com/elastic/opentelemetry-dev/issues/374.
TimestampUs AttributeConfig `mapstructure:"timestamp_us"`
ProcessorEvent AttributeConfig `mapstructure:"processor_event"`
TimestampUs AttributeConfig `mapstructure:"timestamp_us"`
TransactionSampled AttributeConfig `mapstructure:"transaction_sampled"`
TransactionType AttributeConfig `mapstructure:"transaction_type"`
ProcessorEvent AttributeConfig `mapstructure:"processor_event"`

// For exceptions/errors
ErrorID AttributeConfig `mapstructure:"error_id"`
ErrorExceptionHandled AttributeConfig `mapstructure:"error_exception_handled"`
ErrorGroupingKey AttributeConfig `mapstructure:"error_grouping_key"`

// For span events that are not exceptions/errors
EventKind AttributeConfig `mapstructure:"event_kind"`
}

// AttributeConfig is the configuration options for each attribute.
Expand Down Expand Up @@ -124,8 +134,14 @@ func Enabled() Config {
RepresentativeCount: AttributeConfig{Enabled: true},
},
SpanEvent: SpanEventConfig{
TimestampUs: AttributeConfig{Enabled: true},
ProcessorEvent: AttributeConfig{Enabled: true},
TimestampUs: AttributeConfig{Enabled: true},
TransactionSampled: AttributeConfig{Enabled: true},
TransactionType: AttributeConfig{Enabled: true},
ProcessorEvent: AttributeConfig{Enabled: true},
ErrorID: AttributeConfig{Enabled: true},
ErrorExceptionHandled: AttributeConfig{Enabled: true},
ErrorGroupingKey: AttributeConfig{Enabled: true},
EventKind: AttributeConfig{Enabled: true},
},
}
}
6 changes: 6 additions & 0 deletions enrichments/trace/internal/elastic/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ const (
AttributeSpanDestinationServiceResource = "span.destination.service.resource"
AttributeSpanDurationUs = "span.duration.us"
AttributeSpanRepresentativeCount = "span.representative_count"

// span event attributes
AttributeParentID = "parent.id"
AttributeErrorID = "error.id"
AttributeErrorExceptionHandled = "error.exception.handled"
AttributeErrorGroupingKey = "error.grouping_key"
)
106 changes: 93 additions & 13 deletions enrichments/trace/internal/elastic/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package elastic

import (
"crypto/md5"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -69,6 +73,8 @@ type spanEnrichmentContext struct {

spanStatusCode ptrace.StatusCode

// TODO (lahsivjar): Refactor span enrichment to better utilize isTransaction
isTransaction bool
isMessaging bool
isRPC bool
isHTTP bool
Expand Down Expand Up @@ -149,19 +155,22 @@ func (s *spanEnrichmentContext) Enrich(span ptrace.Span, cfg config.Config) {
return true
})

// Ensure all dependent attributes are handled.
s.normalizeAttributes()

if isElasticTransaction(span) {
s.enrichTransaction(span, cfg.Transaction)
} else {
s.enrichSpan(span, cfg.Span)
}
s.isTransaction = isElasticTransaction(span)
s.enrich(span, cfg)

spanEvents := span.Events()
for i := 0; i < spanEvents.Len(); i++ {
var c spanEventEnrichmentContext
c.enrich(spanEvents.At(i), cfg.SpanEvent)
c.enrich(s, spanEvents.At(i), cfg.SpanEvent)
}
}

// enrich applies either transaction or span enrichment to the given span,
// depending on the isTransaction flag held by the context. Transactions are
// enriched using cfg.Transaction; everything else uses cfg.Span.
func (s *spanEnrichmentContext) enrich(span ptrace.Span, cfg config.Config) {
	// Handle the plain-span case first so the transaction path stays unindented.
	if !s.isTransaction {
		s.enrichSpan(span, cfg.Span)
		return
	}
	s.enrichTransaction(span, cfg.Transaction)
}

Expand All @@ -173,7 +182,7 @@ func (s *spanEnrichmentContext) enrichTransaction(
span.Attributes().PutInt(AttributeTimestampUs, getTimestampUs(span.StartTimestamp()))
}
if cfg.Sampled.Enabled {
span.Attributes().PutBool(AttributeTransactionSampled, true)
span.Attributes().PutBool(AttributeTransactionSampled, s.getSampled())
}
if cfg.ID.Enabled {
span.Attributes().PutStr(AttributeTransactionID, span.SpanID().String())
Expand All @@ -195,7 +204,7 @@ func (s *spanEnrichmentContext) enrichTransaction(
span.Attributes().PutInt(AttributeTransactionDurationUs, getDurationUs(span))
}
if cfg.Type.Enabled {
s.setTxnType(span)
span.Attributes().PutStr(AttributeTransactionType, s.getTxnType())
}
if cfg.Result.Enabled {
s.setTxnResult(span)
Expand Down Expand Up @@ -247,15 +256,20 @@ func (s *spanEnrichmentContext) normalizeAttributes() {
}
}

func (s *spanEnrichmentContext) setTxnType(span ptrace.Span) {
// getSampled reports the sampled flag to record for a transaction and for
// error span events attached to it. It currently always returns true and
// does not read any state from the context.
func (s *spanEnrichmentContext) getSampled() bool {
// Assumes that the method is called only for transactions; callers are
// expected to check isTransaction before relying on the result.
return true
}

func (s *spanEnrichmentContext) getTxnType() string {
txnType := "unknown"
switch {
case s.isMessaging:
txnType = "messaging"
case s.isRPC, s.isHTTP:
txnType = "request"
}
span.Attributes().PutStr(AttributeTransactionType, txnType)
return txnType
}

func (s *spanEnrichmentContext) setTxnResult(span ptrace.Span) {
Expand Down Expand Up @@ -417,15 +431,33 @@ func (s *spanEnrichmentContext) setDestinationService(span ptrace.Span) {
}

type spanEventEnrichmentContext struct {
exception bool
exceptionType string
exceptionMessage string

exception bool
exceptionEscaped bool
}

func (s *spanEventEnrichmentContext) enrich(
parentCtx *spanEnrichmentContext,
se ptrace.SpanEvent,
cfg config.SpanEventConfig,
) {
// Extract top level span event information.
s.exception = se.Name() == "exception"
if s.exception {
se.Attributes().Range(func(k string, v pcommon.Value) bool {
switch k {
case semconv.AttributeExceptionEscaped:
s.exceptionEscaped = v.Bool()
case semconv.AttributeExceptionType:
s.exceptionType = v.Str()
case semconv.AttributeExceptionMessage:
s.exceptionMessage = v.Str()
}
return true
})
}

// Enrich span event attributes.
if cfg.TimestampUs.Enabled {
Expand All @@ -434,6 +466,41 @@ func (s *spanEventEnrichmentContext) enrich(
if cfg.ProcessorEvent.Enabled && s.exception {
se.Attributes().PutStr(AttributeProcessorEvent, "error")
}
if s.exceptionType == "" && s.exceptionMessage == "" {
// Span event does not represent an exception
return
}

// Span event represents exception
if cfg.ErrorID.Enabled {
if id, err := newUniqueID(); err == nil {
se.Attributes().PutStr(AttributeErrorID, id)
}
}
if cfg.ErrorExceptionHandled.Enabled {
se.Attributes().PutBool(AttributeErrorExceptionHandled, !s.exceptionEscaped)
}
if cfg.ErrorGroupingKey.Enabled {
// See https://github.com/elastic/apm-data/issues/299
hash := md5.New()
// ignoring errors in hashing
if s.exceptionType != "" {
io.WriteString(hash, s.exceptionType)
} else if s.exceptionMessage != "" {
io.WriteString(hash, s.exceptionMessage)
}
se.Attributes().PutStr(AttributeErrorGroupingKey, hex.EncodeToString(hash.Sum(nil)))
}

// Transaction type and sampled are added as span event enrichment only for errors
if parentCtx.isTransaction && s.exception {
if cfg.TransactionSampled.Enabled {
se.Attributes().PutBool(AttributeTransactionSampled, parentCtx.getSampled())
}
if cfg.TransactionType.Enabled {
se.Attributes().PutStr(AttributeTransactionType, parentCtx.getTxnType())
}
}
}

// getRepresentativeCount returns the number of spans represented by an
Expand Down Expand Up @@ -540,3 +607,16 @@ var standardStatusCodeResults = [...]string{
"HTTP 4xx",
"HTTP 5xx",
}

// newUniqueID generates a random 128-bit identifier and returns it encoded
// as a 32-character lowercase hexadecimal string.
//
// The bytes are read from crypto/rand; if the random source cannot supply
// all 16 bytes, an empty string and the underlying error are returned.
func newUniqueID() (string, error) {
	raw := make([]byte, 16)
	if _, readErr := io.ReadFull(rand.Reader, raw); readErr != nil {
		return "", readErr
	}

	// Two hex characters per byte yields the 32-character ID.
	return hex.EncodeToString(raw), nil
}
80 changes: 70 additions & 10 deletions enrichments/trace/internal/elastic/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package elastic

import (
"crypto/md5"
"encoding/hex"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -762,34 +764,85 @@ func TestSpanEventEnrich(t *testing.T) {
ts := pcommon.NewTimestampFromTime(now)
for _, tc := range []struct {
name string
parent ptrace.Span
input ptrace.SpanEvent
config config.SpanEventConfig
errorID bool // indicates if the error ID should be present in the result
enrichedAttrs map[string]any
}{
{
name: "not_exception",
name: "not_exception",
parent: ptrace.NewSpan(),
input: func() ptrace.SpanEvent {
event := ptrace.NewSpanEvent()
event.SetTimestamp(ts)
return event
}(),
config: config.Enabled().SpanEvent,
config: config.Enabled().SpanEvent,
errorID: false, // error ID is only present for exceptions
enrichedAttrs: map[string]any{
AttributeTimestampUs: ts.AsTime().UnixMicro(),
},
},
{
name: "exception",
name: "exception_with_elastic_txn",
parent: func() ptrace.Span {
// No parent, elastic txn
span := ptrace.NewSpan()
return span
}(),
input: func() ptrace.SpanEvent {
event := ptrace.NewSpanEvent()
event.SetName("exception")
event.SetTimestamp(ts)
event.Attributes().PutStr(semconv.AttributeExceptionType, "java.net.ConnectionError")
event.Attributes().PutStr(semconv.AttributeExceptionMessage, "something is wrong")
event.Attributes().PutStr(semconv.AttributeExceptionStacktrace, `Exception in thread "main" java.lang.RuntimeException: Test exception\\n at com.example.GenerateTrace.methodB(GenerateTrace.java:13)\\n at com.example.GenerateTrace.methodA(GenerateTrace.java:9)\\n at com.example.GenerateTrace.main(GenerateTrace.java:5)`)
return event
}(),
config: config.Enabled().SpanEvent,
errorID: true,
enrichedAttrs: map[string]any{
AttributeTimestampUs: ts.AsTime().UnixMicro(),
AttributeProcessorEvent: "error",
AttributeErrorExceptionHandled: true,
AttributeErrorGroupingKey: func() string {
hash := md5.New()
hash.Write([]byte("java.net.ConnectionError"))
return hex.EncodeToString(hash.Sum(nil))
}(),
AttributeTransactionSampled: true,
AttributeTransactionType: "unknown",
},
},
{
name: "exception_with_elastic_span",
parent: func() ptrace.Span {
// Parent, elastic span
span := ptrace.NewSpan()
span.SetParentSpanID([8]byte{8, 9, 10, 11, 12, 13, 14})
return span
}(),
input: func() ptrace.SpanEvent {
event := ptrace.NewSpanEvent()
event.SetName("exception")
event.SetTimestamp(ts)
event.Attributes().PutStr(semconv.AttributeExceptionType, "java.net.ConnectionError")
event.Attributes().PutStr(semconv.AttributeExceptionMessage, "something is wrong")
event.Attributes().PutStr(semconv.AttributeExceptionStacktrace, `Exception in thread "main" java.lang.RuntimeException: Test exception\\n at com.example.GenerateTrace.methodB(GenerateTrace.java:13)\\n at com.example.GenerateTrace.methodA(GenerateTrace.java:9)\\n at com.example.GenerateTrace.main(GenerateTrace.java:5)`)
return event
}(),
config: config.Enabled().SpanEvent,
config: config.Enabled().SpanEvent,
errorID: true,
enrichedAttrs: map[string]any{
AttributeTimestampUs: ts.AsTime().UnixMicro(),
AttributeProcessorEvent: "error",
AttributeTimestampUs: ts.AsTime().UnixMicro(),
AttributeProcessorEvent: "error",
AttributeErrorExceptionHandled: true,
AttributeErrorGroupingKey: func() string {
hash := md5.New()
hash.Write([]byte("java.net.ConnectionError"))
return hex.EncodeToString(hash.Sum(nil))
}(),
},
},
} {
Expand All @@ -801,13 +854,20 @@ func TestSpanEventEnrich(t *testing.T) {
expectedAttrs[k] = v
}

span := ptrace.NewSpan()
tc.input.MoveTo(span.Events().AppendEmpty())
EnrichSpan(span, config.Config{
tc.input.MoveTo(tc.parent.Events().AppendEmpty())
EnrichSpan(tc.parent, config.Config{
SpanEvent: tc.config,
})

assert.Empty(t, cmp.Diff(expectedAttrs, span.Events().At(0).Attributes().AsRaw()))
actual := tc.parent.Events().At(0).Attributes()
errorID, ok := actual.Get(AttributeErrorID)
assert.Equal(t, tc.errorID, ok, "error_id must be present for exception and must not be present for non-exception")
if tc.errorID {
assert.NotEmpty(t, errorID, "error_id must not be empty")
}
// Ignore the error ID in the attribute diff since it is randomly generated
actual.Remove(AttributeErrorID)
assert.Empty(t, cmp.Diff(expectedAttrs, actual.AsRaw()))
})
}
}
Expand Down

0 comments on commit 10e3f6c

Please sign in to comment.