Add support for vulture sending long running traces #951

Merged: 5 commits, Sep 22, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -40,6 +40,7 @@
* [ENHANCEMENT] Compression updates: Added s2, improved snappy performance [#961](https://github.com/grafana/tempo/pull/961) (@joe-elliott)
* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio)
* [ENHANCEMENT] Add search block headers for wal blocks [#963](https://github.com/grafana/tempo/pull/963) (@mdisibio)
* [ENHANCEMENT] Add support for vulture sending long running traces [#951](https://github.com/grafana/tempo/pull/951) (@zalegrala)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
252 changes: 166 additions & 86 deletions cmd/tempo-vulture/main.go
@@ -34,16 +34,25 @@ var (
prometheusListenAddress string
prometheusPath string

tempoQueryURL string
tempoPushURL string
tempoOrgID string
tempoWriteBackoffDuration time.Duration
tempoReadBackoffDuration time.Duration
tempoSearchBackoffDuration time.Duration
tempoRetentionDuration time.Duration
tempoSearchRetentionDuration time.Duration
tempoQueryURL string
tempoPushURL string
tempoOrgID string
tempoWriteBackoffDuration time.Duration
tempoLongWriteBackoffDuration time.Duration
tempoReadBackoffDuration time.Duration
tempoSearchBackoffDuration time.Duration
tempoRetentionDuration time.Duration
tempoSearchRetentionDuration time.Duration

logger *zap.Logger

// maxBatchesPerWrite is used when writing and reading, and needs to match so
// that we get the expected number of batches on a trace. A value larger
// than 25 here results in vulture writing traces that exceed the maximum
// trace size.
maxBatchesPerWrite int64 = 25

maxLongWritesPerTrace int64 = 3
)

type traceMetrics struct {
@@ -56,6 +65,38 @@ type traceMetrics struct {
notFoundSearchAttribute int
}

type traceInfo struct {
timestamp time.Time
r *rand.Rand
traceIDHigh int64
traceIDLow int64
longWritesRemaining int64
}

func newTraceInfo(timestamp time.Time) *traceInfo {
r := newRand(timestamp)

return &traceInfo{
timestamp: timestamp,
r: r,
traceIDHigh: r.Int63(),
traceIDLow: r.Int63(),
longWritesRemaining: r.Int63n(maxLongWritesPerTrace),
}
}
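
Everything in traceInfo is derived from a deterministic RNG seeded by the rounded timestamp, which is what lets the read path rebuild exactly the trace the write path sent without any shared state. A minimal sketch of that property, not part of this diff, assuming newRand (defined elsewhere in this file) seeds math/rand from the timestamp:

// Sketch only: two traceInfo values built from the same seed agree on every
// field, so a reader can reconstruct the writer's trace from the seed alone.
seed := time.Unix(1632146180, 0)
a, b := newTraceInfo(seed), newTraceInfo(seed)
fmt.Println(a.traceIDHigh == b.traceIDHigh)                 // true
fmt.Println(a.longWritesRemaining == b.longWritesRemaining) // true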

func (t *traceInfo) ready(now time.Time) bool {
// Don't use the last time interval to allow the write loop to finish before we try to read it.
if t.timestamp.After(now.Add(-tempoWriteBackoffDuration)) {
return false
}

// We are not ready if not all writes have had a chance to send.
totalWrites := newTraceInfo(t.timestamp).longWritesRemaining
lastWrite := t.timestamp.Add(time.Duration(totalWrites) * tempoLongWriteBackoffDuration)
return !now.Before(lastWrite.Add(tempoLongWriteBackoffDuration))
}
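
In other words, a trace seeded at T whose seed yields N long writes has its final batch scheduled at T + N*tempoLongWriteBackoffDuration, and ready only turns true one further long-write backoff after that. A minimal sketch, not part of this diff, assuming the globals hold the parsed flag values (defaults of 15s and 1m, per init below):

// Sketch only.
seed := time.Now().Round(tempoWriteBackoffDuration)
info := newTraceInfo(seed)
n := info.longWritesRemaining // N, re-derivable from the seed alone
_ = info.ready(seed.Add(time.Duration(n) * tempoLongWriteBackoffDuration))   // false: the last long write was only just queued
_ = info.ready(seed.Add(time.Duration(n+1) * tempoLongWriteBackoffDuration)) // true: every scheduled write has had a full backoff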

func init() {
flag.StringVar(&prometheusPath, "prometheus-path", "/metrics", "The path to publish Prometheus metrics to.")
flag.StringVar(&prometheusListenAddress, "prometheus-listen-address", ":80", "The address to listen on for Prometheus scrapes.")
@@ -64,6 +105,7 @@ func init() {
flag.StringVar(&tempoPushURL, "tempo-push-url", "", "The URL (scheme://hostname:port) at which to push traces to Tempo.")
flag.StringVar(&tempoOrgID, "tempo-org-id", "", "The orgID to query in Tempo")
flag.DurationVar(&tempoWriteBackoffDuration, "tempo-write-backoff-duration", 15*time.Second, "The amount of time to pause between write Tempo calls")
flag.DurationVar(&tempoLongWriteBackoffDuration, "tempo-long-write-backoff-duration", 1*time.Minute, "The amount of time to pause between long write Tempo calls")
flag.DurationVar(&tempoReadBackoffDuration, "tempo-read-backoff-duration", 30*time.Second, "The amount of time to pause between read Tempo calls")
flag.DurationVar(&tempoSearchBackoffDuration, "tempo-search-backoff-duration", 60*time.Second, "The amount of time to pause between search Tempo calls")
flag.DurationVar(&tempoRetentionDuration, "tempo-retention-duration", 336*time.Hour, "The block retention that Tempo is using")
@@ -89,42 +131,29 @@ func main() {
tickerSearch := time.NewTicker(tempoSearchBackoffDuration)
interval := tempoWriteBackoffDuration

ready := func(info *traceInfo, now time.Time) bool {
// Don't attempt to read on the first iteration if we can't reasonably
// expect the write loop to have fired yet. Double the duration here to
// avoid a race.
if info.timestamp.Before(actualStartTime.Add(2 * tempoWriteBackoffDuration)) {
return false
}

return info.ready(now)
}

// Write
go func() {
c, err := newJaegerGRPCClient(tempoPushURL)
client, err := newJaegerGRPCClient(tempoPushURL)
if err != nil {
panic(err)
}

for now := range tickerWrite.C {
timestamp := now.Round(interval)
r := newRand(timestamp)

traceIDHigh := r.Int63()
traceIDLow := r.Int63()

log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", traceIDHigh, traceIDLow)),
zap.Int64("seed", timestamp.Unix()),
)
log.Info("sending trace")

for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
ctx := user.InjectOrgID(context.Background(), tempoOrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
log.Error("error injecting org id", zap.Error(err))
metricErrorTotal.Inc()
continue
}
err = c.EmitBatch(ctx, makeThriftBatch(traceIDHigh, traceIDLow, r, timestamp))
if err != nil {
log.Error("error pushing batch to Tempo", zap.Error(err))
metricErrorTotal.Inc()
continue
}
}
info := newTraceInfo(timestamp)
emitBatches(client, info)
queueFutureBatches(client, info)
}
}()

@@ -134,23 +163,18 @@ func main() {
var seed time.Time
startTime, seed = selectPastTimestamp(startTime, now, interval, tempoRetentionDuration)

// Don't attempt to read on the first iteration if we can't reasonably
// expect the write loop to have fired yet. Double the duration here to
// avoid a race.
if seed.Before(actualStartTime.Add(2 * tempoWriteBackoffDuration)) {
continue
}

// Don't use the last time interval to allow the write loop to finish
// before we try to read it.
if seed.After(now.Add(-tempoWriteBackoffDuration)) {
continue
}

log := logger.With(
zap.String("org_id", tempoOrgID),
zap.Int64("seed", seed.Unix()),
)

info := newTraceInfo(seed)

// Don't query for a trace we don't expect to be complete
if !ready(info, now) {
continue
}

client := util.NewClient(tempoQueryURL, tempoOrgID)

// query the trace
@@ -169,24 +193,17 @@ func main() {
go func() {
for now := range tickerSearch.C {
_, seed := selectPastTimestamp(startTime, now, interval, tempoSearchRetentionDuration)
log := logger.With(
zap.String("org_id", tempoOrgID),
zap.Int64("seed", seed.Unix()),
)

// Don't attempt to read on the first iteration if we can't reasonably
// expect the write loop to have fired yet. Double the duration here to
// avoid a race.
if seed.Before(startTime.Add(2 * tempoWriteBackoffDuration)) {
continue
}
info := newTraceInfo(seed)

// Don't use the last time interval to allow the write loop to finish
// before we try to read it.
if seed.After(now.Add(-tempoWriteBackoffDuration)) {
if !ready(info, now) {
continue
}

log := logger.With(
zap.String("org_id", tempoOrgID),
)

client := util.NewClient(tempoQueryURL, tempoOrgID)

// query a tag we expect the trace to be found within
@@ -205,6 +222,54 @@ func main() {
log.Fatal(http.ListenAndServe(prometheusListenAddress, nil))
}

func emitBatches(c *jaeger_grpc.Reporter, t *traceInfo) {
log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", t.traceIDHigh, t.traceIDLow)),
zap.Int64("seed", t.timestamp.Unix()),
)

log.Info("sending trace")

for i := int64(0); i < generateRandomInt(1, maxBatchesPerWrite, t.r); i++ {
ctx := user.InjectOrgID(context.Background(), tempoOrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
log.Error("error injecting org id", zap.Error(err))
metricErrorTotal.Inc()
continue
}
err = c.EmitBatch(ctx, makeThriftBatch(t.traceIDHigh, t.traceIDLow, t.r, t.timestamp))
if err != nil {
log.Error("error pushing batch to Tempo", zap.Error(err))
metricErrorTotal.Inc()
continue
}
}

}

func queueFutureBatches(client *jaeger_grpc.Reporter, info *traceInfo) {
if info.longWritesRemaining == 0 {
return
}
log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", info.traceIDHigh, info.traceIDLow)),
zap.Int64("seed", info.timestamp.Unix()),
zap.Int64("longWritesRemaining", info.longWritesRemaining),
)

info.longWritesRemaining--

log.Info("queueing future batches")
go func() {
time.Sleep(tempoLongWriteBackoffDuration)
emitBatches(client, info)
queueFutureBatches(client, info)
}()
}
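
Because each call re-queues itself after tempoLongWriteBackoffDuration until longWritesRemaining reaches zero, a single trace keeps accruing batches over several backoff windows instead of being written all at once. A rough upper bound on trace size, not part of this diff, assuming generateRandomInt never exceeds its upper argument:

// Sketch only: Int63n(maxLongWritesPerTrace) yields at most
// maxLongWritesPerTrace-1 extra rounds on top of the initial emitBatches
// call, and each round emits up to maxBatchesPerWrite batches.
maxRounds := maxLongWritesPerTrace           // 3 rounds in the worst case
maxBatches := maxRounds * maxBatchesPerWrite // 3 * 25 = 75 batches per trace
fmt.Println(maxBatches)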

func pushMetrics(metrics traceMetrics) {
metricTracesInspected.Add(float64(metrics.requested))
metricTracesErrors.WithLabelValues("incorrectresult").Add(float64(metrics.incorrectResult))
@@ -507,40 +572,55 @@ func hasMissingSpans(t *tempopb.Trace) bool {

func constructTraceFromEpoch(epoch time.Time) *tempopb.Trace {
r := newRand(epoch)
traceIDHigh := r.Int63()
traceIDLow := r.Int63()

info := &traceInfo{
timestamp: epoch,
r: r,
traceIDHigh: r.Int63(),
traceIDLow: r.Int63(),
longWritesRemaining: r.Int63n(maxLongWritesPerTrace),
}

trace := &tempopb.Trace{}

for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
batch := makeThriftBatch(traceIDHigh, traceIDLow, r, epoch)
internalTrace := jaegerTrans.ThriftBatchToInternalTraces(batch)
conv, err := internalTrace.ToOtlpProtoBytes()
if err != nil {
logger.Error(err.Error())
}
addBatches := func(t *traceInfo, trace *tempopb.Trace) {
for i := int64(0); i < generateRandomInt(1, maxBatchesPerWrite, r); i++ {
batch := makeThriftBatch(t.traceIDHigh, t.traceIDLow, r, epoch)
internalTrace := jaegerTrans.ThriftBatchToInternalTraces(batch)
conv, err := internalTrace.ToOtlpProtoBytes()
if err != nil {
logger.Error(err.Error())
}

t := tempopb.Trace{}
err = t.Unmarshal(conv)
if err != nil {
logger.Error(err.Error())
}
t := tempopb.Trace{}
err = t.Unmarshal(conv)
if err != nil {
logger.Error(err.Error())
}

// Due to the several transforms above, some manual mangling is required to
// get the parentSpanID to match. In the case of an empty []byte in place
// of the ParentSpanId, we set it to nil here to ensure that the final result
// matches the json.Unmarshal value when tempo is queried.
for _, b := range t.Batches {
for _, l := range b.InstrumentationLibrarySpans {
for _, s := range l.Spans {
if len(s.GetParentSpanId()) == 0 {
s.ParentSpanId = nil
// Due to the several transforms above, some manual mangling is required to
// get the parentSpanID to match. In the case of an empty []byte in place
// of the ParentSpanId, we set it to nil here to ensure that the final result
// matches the json.Unmarshal value when tempo is queried.
for _, b := range t.Batches {
for _, l := range b.InstrumentationLibrarySpans {
for _, s := range l.Spans {
if len(s.GetParentSpanId()) == 0 {
s.ParentSpanId = nil
}
}
}
}

trace.Batches = append(trace.Batches, t.Batches...)
}
}

addBatches(info, trace)

trace.Batches = append(trace.Batches, t.Batches...)
for info.longWritesRemaining > 0 {
info.longWritesRemaining--
addBatches(info, trace)
}

return trace
11 changes: 10 additions & 1 deletion cmd/tempo-vulture/main_test.go
@@ -169,7 +169,7 @@ func TestResponseFixture(t *testing.T) {
err = jsonpb.Unmarshal(f, response)
require.NoError(t, err)

seed := time.Unix(1630624480, 0)
seed := time.Unix(1632146180, 0)
expected := constructTraceFromEpoch(seed)

assert.True(t, equalTraces(expected, response))
@@ -188,6 +188,15 @@ func TestEqualTraces(t *testing.T) {
require.True(t, equalTraces(a, b))
}

func TestTraceInfo(t *testing.T) {
seed := time.Unix(1632146180, 0)
info := newTraceInfo(seed)
assert.False(t, info.ready(seed))
assert.False(t, info.ready(seed.Add(tempoLongWriteBackoffDuration)))
assert.False(t, info.ready(seed.Add(tempoLongWriteBackoffDuration).Add(1*time.Second)))
assert.True(t, info.ready(seed.Add(2*tempoLongWriteBackoffDuration)))
}

func assertStandardVultureKey(t *testing.T, tag *thrift.Tag) {
if !strings.HasPrefix(tag.Key, "vulture-") {
t.Errorf("prefix vulture- is wanted, have: %s", tag.Key)
2 changes: 1 addition & 1 deletion cmd/tempo-vulture/testdata/trace.json

Large diffs are not rendered by default.