Skip to content

Commit

Permalink
Add otel tracing for client req and endpoint related kinds
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcel Ludwig committed Sep 2, 2021
1 parent f4712d6 commit 8e2b363
Show file tree
Hide file tree
Showing 18 changed files with 419 additions and 94 deletions.
16 changes: 9 additions & 7 deletions command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func NewRun(ctx context.Context) *Run {
set.Var(&AcceptForwardedValue{settings: &settings}, "accept-forwarded-url", "-accept-forwarded-url [proto][,host][,port]")
set.Var(&settings.TLSDevProxy, "https-dev-proxy", "-https-dev-proxy 8443:8080,9443:9000")
set.BoolVar(&settings.NoProxyFromEnv, "no-proxy-from-env", settings.NoProxyFromEnv, "-no-proxy-from-env")
set.BoolVar(&settings.TelemetryTraces, "telemetry-traces", settings.TelemetryTraces, "-telemetry-traces")
set.BoolVar(&settings.TelemetryMetrics, "telemetry-metrics", settings.TelemetryMetrics, "-telemetry-metrics")
set.StringVar(&settings.RequestIDFormat, "request-id-format", settings.RequestIDFormat, "-request-id-format uuid4")
set.StringVar(&settings.RequestIDAcceptFromHeader, "request-id-accept-from-header", settings.RequestIDAcceptFromHeader, "-request-id-accept-from-header X-UID")
set.StringVar(&settings.RequestIDBackendHeader, "request-id-backend-header", settings.RequestIDBackendHeader, "-request-id-backend-header Couper-Request-ID")
Expand Down Expand Up @@ -153,11 +155,13 @@ func (r *Run) Execute(args Args, config *config.Couper, logEntry *logrus.Entry)
}
}

metrics, err := telemetry.NewMetrics(nil, logEntry)
if err != nil {
return err
}
go metrics.ListenAndServe()
telog := logEntry.WithField("type", "couper_telemetry")
telemetry.InitExporter(r.context, &telemetry.Options{
Exporter: "otlp",
Metrics: r.settings.TelemetryMetrics,
ServiceName: "couper",
Traces: r.settings.TelemetryTraces,
}, telog)

listenCmdShutdown()

Expand All @@ -166,8 +170,6 @@ func (r *Run) Execute(args Args, config *config.Couper, logEntry *logrus.Entry)
logEntry.Infof("Server closed: %s", s.Addr)
}

_ = metrics.Close()

return nil
}

Expand Down
9 changes: 6 additions & 3 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,24 @@ var DefaultSettings = Settings{

// Settings represents the <Settings> object.
type Settings struct {
AcceptForwarded *AcceptForwarded

AcceptForwardedURL []string `hcl:"accept_forwarded_url,optional"`
DefaultPort int `hcl:"default_port,optional"`
HealthPath string `hcl:"health_path,optional"`
LogFormat string `hcl:"log_format,optional"`
LogLevel string `hcl:"log_level,optional"`
LogPretty bool `hcl:"log_pretty,optional"`
NoProxyFromEnv bool `hcl:"no_proxy_from_env,optional"`
RequestIDFormat string `hcl:"request_id_format,optional"`
RequestIDAcceptFromHeader string `hcl:"request_id_accept_from_header,optional"`
RequestIDBackendHeader string `hcl:"request_id_backend_header,optional"`
RequestIDClientHeader string `hcl:"request_id_client_header,optional"`
RequestIDFormat string `hcl:"request_id_format,optional"`
SecureCookies string `hcl:"secure_cookies,optional"`
TLSDevProxy List `hcl:"https_dev_proxy,optional"`
TelemetryMetrics bool `hcl:"telemetry_metrics,optional"`
TelemetryTraces bool `hcl:"telemetry_traces,optional"`
XForwardedHost bool `hcl:"xfh,optional"`
AcceptForwardedURL []string `hcl:"accept_forwarded_url,optional"`
AcceptForwarded *AcceptForwarded
}

var _ flag.Value = &List{}
Expand Down
4 changes: 2 additions & 2 deletions eval/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ func ApplyResponseContext(ctx context.Context, body hcl.Body, beresp *http.Respo
return nil
}

content, _, _ := body.PartialContent(config.BackendInlineSchema)
if attr, ok := content.Attributes["set_response_status"]; ok {
bodyContent, _, _ := body.PartialContent(config.BackendInlineSchema)
if attr, ok := bodyContent.Attributes["set_response_status"]; ok {
_, err := ApplyResponseStatus(ctx, attr, beresp)
return err
}
Expand Down
8 changes: 8 additions & 0 deletions handler/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/hashicorp/hcl/v2"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"

"github.com/avenga/couper/config"
"github.com/avenga/couper/config/request"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/avenga/couper/eval"
"github.com/avenga/couper/handler/producer"
"github.com/avenga/couper/server/writer"
"github.com/avenga/couper/telemetry"
)

var _ http.Handler = &Endpoint{}
Expand Down Expand Up @@ -70,6 +72,10 @@ func (e *Endpoint) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
reqCtx := context.WithValue(req.Context(), request.Endpoint, e.opts.LogPattern)
reqCtx = context.WithValue(reqCtx, request.EndpointKind, e.opts.LogHandlerKind)
*req = *req.WithContext(reqCtx)
if e.opts.LogPattern != "" {
span := trace.SpanFromContext(reqCtx)
span.SetAttributes(telemetry.KeyEndpoint.String(e.opts.LogPattern))
}

defer func() {
rc := recover()
Expand Down Expand Up @@ -135,6 +141,8 @@ func (e *Endpoint) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}

if err == nil {
_, span := telemetry.NewSpanFromContext(subCtx, "response", trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
clientres, err = producer.NewResponse(req, e.opts.Response.Context, evalContext, http.StatusOK)
}
} else {
Expand Down
12 changes: 11 additions & 1 deletion handler/producer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"runtime/debug"
"sync"

"go.opentelemetry.io/otel/trace"

"github.com/avenga/couper/config/request"
"github.com/avenga/couper/telemetry"
)

type Proxy struct {
Expand All @@ -21,6 +24,9 @@ func (pr Proxies) Produce(ctx context.Context, clientReq *http.Request, results
var currentName string // at least pre roundtrip
wg := &sync.WaitGroup{}

producerCtx, rootSpan := telemetry.NewSpanFromContext(ctx, "proxies", trace.WithSpanKind(trace.SpanKindProducer))
defer rootSpan.End()

defer func() {
if rp := recover(); rp != nil {
sendResult(ctx, results, &Result{
Expand All @@ -35,8 +41,12 @@ func (pr Proxies) Produce(ctx context.Context, clientReq *http.Request, results

for _, proxy := range pr {
currentName = proxy.Name
outCtx := withRoundTripName(ctx, proxy.Name)
outCtx := withRoundTripName(producerCtx, proxy.Name)
outCtx = context.WithValue(outCtx, request.RoundTripProxy, true)

// span end by result reader
outCtx, _ = telemetry.NewSpanFromContext(outCtx, proxy.Name, trace.WithSpanKind(trace.SpanKindServer))

// since proxy and backend may work on the "same" outReq this must be cloned.
outReq := clientReq.Clone(outCtx)

Expand Down
23 changes: 16 additions & 7 deletions handler/producer/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"sync"

"github.com/hashicorp/hcl/v2"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"

"github.com/avenga/couper/config"
"github.com/avenga/couper/config/request"
"github.com/avenga/couper/eval"
"github.com/avenga/couper/eval/content"
"github.com/avenga/couper/telemetry"
)

// Request represents the producer <Request> object.
Expand All @@ -30,6 +33,9 @@ func (r Requests) Produce(ctx context.Context, req *http.Request, results chan<-
var currentName string // at least pre roundtrip
wg := &sync.WaitGroup{}

producerCtx, rootSpan := telemetry.NewSpanFromContext(ctx, "requests", trace.WithSpanKind(trace.SpanKindProducer))
defer rootSpan.End()

defer func() {
if rp := recover(); rp != nil {
sendResult(ctx, results, &Result{
Expand All @@ -46,29 +52,30 @@ func (r Requests) Produce(ctx context.Context, req *http.Request, results chan<-
updated := evalctx.WithClientRequest(req)

for _, or := range r {
outCtx := withRoundTripName(ctx, or.Name)
// span end by result reader
outCtx, span := telemetry.NewSpanFromContext(withRoundTripName(producerCtx, or.Name), or.Name, trace.WithSpanKind(trace.SpanKindClient))

bodyContent, _, diags := or.Context.PartialContent(config.Request{Remain: or.Context}.Schema(true))
if diags.HasErrors() {
sendResult(ctx, results, &Result{Err: diags})
sendResult(outCtx, results, &Result{Err: diags})
continue
}

method, err := content.GetAttribute(updated.HCLContext(), bodyContent, "method")
if err != nil {
sendResult(ctx, results, &Result{Err: err})
sendResult(outCtx, results, &Result{Err: err})
continue
}

body, defaultContentType, err := eval.GetBody(updated.HCLContext(), bodyContent)
if err != nil {
sendResult(ctx, results, &Result{Err: err})
sendResult(outCtx, results, &Result{Err: err})
continue
}

url, err := content.GetAttribute(updated.HCLContext(), bodyContent, "url")
if err != nil {
sendResult(ctx, results, &Result{Err: err})
sendResult(outCtx, results, &Result{Err: err})
continue
}

Expand All @@ -88,7 +95,7 @@ func (r Requests) Produce(ctx context.Context, req *http.Request, results chan<-
// see <go roundtrip()> at the end of current for-loop.
outreq, err := http.NewRequest(strings.ToUpper(method), "", nil)
if err != nil {
sendResult(ctx, results, &Result{Err: err})
sendResult(outCtx, results, &Result{Err: err})
continue
}

Expand All @@ -101,10 +108,12 @@ func (r Requests) Produce(ctx context.Context, req *http.Request, results chan<-
*outreq = *outreq.WithContext(outCtx)
err = eval.ApplyRequestContext(outCtx, or.Context, outreq)
if err != nil {
sendResult(ctx, results, &Result{Err: err})
sendResult(outCtx, results, &Result{Err: err})
continue
}

span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(outreq)...)

wg.Add(1)
go roundtrip(or.Backend, outreq, results, wg)
}
Expand Down
5 changes: 5 additions & 0 deletions handler/producer/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"runtime/debug"
"sync"

"go.opentelemetry.io/otel/trace"

"github.com/avenga/couper/config/request"
"github.com/avenga/couper/errors"
)
Expand Down Expand Up @@ -44,6 +46,7 @@ func roundtrip(rt http.RoundTripper, req *http.Request, results chan<- *Result,
defer wg.Done()

rtn := req.Context().Value(request.RoundTripName).(string)
span := trace.SpanFromContext(req.Context())

defer func() {
if rp := recover(); rp != nil {
Expand All @@ -56,6 +59,7 @@ func roundtrip(rt http.RoundTripper, req *http.Request, results chan<- *Result,
stack: debug.Stack(),
})
}
span.End()
sendResult(req.Context(), results, &Result{
Err: err,
RoundTripName: rtn,
Expand All @@ -64,6 +68,7 @@ func roundtrip(rt http.RoundTripper, req *http.Request, results chan<- *Result,
}()

beresp, err := rt.RoundTrip(req)
span.End()
sendResult(req.Context(), results, &Result{
Beresp: beresp,
Err: err,
Expand Down
23 changes: 20 additions & 3 deletions handler/transport/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/hashicorp/hcl/v2"
"github.com/sirupsen/logrus"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"

"github.com/avenga/couper/config"
"github.com/avenga/couper/config/request"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/avenga/couper/handler/validation"
"github.com/avenga/couper/logging"
"github.com/avenga/couper/server/writer"
"github.com/avenga/couper/telemetry"
"github.com/avenga/couper/utils"
)

Expand Down Expand Up @@ -65,9 +68,20 @@ func NewBackend(ctx hcl.Body, tc *Config, opts *BackendOptions, log *logrus.Entr

// RoundTrip implements the <http.RoundTripper> interface.
func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
traceOpts := []trace.SpanStartOption{
trace.WithAttributes(semconv.NetAttributesFromHTTPRequest("tcp", req)...),
trace.WithAttributes(telemetry.KeyBackend.String(b.name)),
}
spanName := "backend"
if b.name != "" {
spanName += "." + b.name
}
ctx, span := telemetry.NewSpanFromContext(req.Context(), spanName, traceOpts...)
defer span.End()

// Execute before <b.evalTransport()> due to right
// handling of query-params in the URL attribute.
if err := eval.ApplyRequestContext(req.Context(), b.context, req); err != nil {
if err := eval.ApplyRequestContext(ctx, b.context, req); err != nil {
return nil, err
}

Expand All @@ -82,6 +96,9 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
req.URL.Scheme = tc.Scheme
req.Host = tc.Hostname

span.SetAttributes(telemetry.KeyOrigin.String(tc.Origin))
span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...)

// handler.Proxy marks proxy roundtrips since we should not handle headers twice.
_, isProxyReq := req.Context().Value(request.RoundTripProxy).(bool)

Expand Down Expand Up @@ -137,11 +154,11 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
// Backend response context creates the beresp variables in first place and applies this context
// to the current beresp obj. Downstream response context evals reading their beresp variable values
// from this result. Fallback case is for testing purposes.
if evalCtx, ok := req.Context().Value(request.ContextType).(*eval.Context); ok {
if evalCtx, ok := ctx.Value(request.ContextType).(*eval.Context); ok {
evalCtx = evalCtx.WithBeresps(beresp)
err = eval.ApplyResponseContext(evalCtx, b.context, beresp)
} else {
err = eval.ApplyResponseContext(req.Context(), b.context, beresp)
err = eval.ApplyResponseContext(ctx, b.context, beresp)
}

return beresp, err
Expand Down
7 changes: 7 additions & 0 deletions telemetry/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package telemetry

import "go.opentelemetry.io/otel/attribute"

var KeyEndpoint = attribute.Key("couper.endpoint")
var KeyBackend = attribute.Key("couper.backend")
var KeyOrigin = attribute.Key("couper.origin")
60 changes: 60 additions & 0 deletions telemetry/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: "2"
services:

# Jaeger
jaeger-all-in-one:
hostname: jaeger-all-in-one
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268"
- "14250"

# Zipkin
zipkin-all-in-one:
hostname: zipkin-all-in-one
image: openzipkin/zipkin:latest
ports:
- "9411:9411"

# Collector
otel-collector:
image: 'otel/opentelemetry-collector-contrib-dev:latest'
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- $PWD/telemetry/otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "1888:1888" # pprof extension
- "8888:8888" # Prometheus metrics exposed by the collector
- "8889:8889" # Prometheus exporter metrics
- "13133:13133" # health_check extension
- "4317" # OTLP gRPC receiver
- "55670:55679" # zpages extension
depends_on:
- jaeger-all-in-one
- zipkin-all-in-one

couper:
build:
dockerfile: $PWD/Dockerfile
context: $PWD
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317
# - OTA_EXPORTER_JAEGER_ENDPOINT
# - OTA_EXPORTER_JAEGER_SERVICE_NAME
# - OTA_EXPORTER_ZIPKIN_ENDPOINT
# - OTA_EXPORTER_ZIPKIN_SERVICE_NAME
- USER=dev-docker # fromEnv filled attrs
- COUPER_WATCH=true
ports:
- "8080:8080"
depends_on:
- otel-collector

prometheus:
container_name: prometheus
image: prom/prometheus:latest
volumes:
- $PWD/telemetry/prometheus.yaml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
Loading

0 comments on commit 8e2b363

Please sign in to comment.