Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new list compaction-summary command to tempo-cli #588

Merged
merged 2 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
* [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups.
This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557)
* [ENHANCEMENT] Add list compaction-summary command to tempo-cli [#588](https://github.com/grafana/tempo/pull/588)
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
Expand Down
76 changes: 1 addition & 75 deletions cmd/tempo-cli/cmd-list-blocks.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package main

import (
"context"
"fmt"
"os"
"sort"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
tempodb_backend "github.com/grafana/tempo/tempodb/backend"
"github.com/olekukonko/tablewriter"
)

type listBlocksCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
IncludeCompacted bool `help:"include compacted blocks"`

backendOptions
}

Expand All @@ -36,76 +30,8 @@ func (l *listBlocksCmd) Run(ctx *globalOptions) error {
}

displayResults(results, windowDuration, l.IncludeCompacted)
return nil
}

type blockStats struct {
unifiedBlockMeta
}

func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, includeCompacted bool) ([]blockStats, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}

fmt.Println("total blocks: ", len(blockIDs))

// Load in parallel
wg := boundedwaitgroup.New(10)
resultsCh := make(chan blockStats, len(blockIDs))

for _, id := range blockIDs {
wg.Add(1)

go func(id2 uuid.UUID) {
defer wg.Done()

b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted)
if err != nil {
fmt.Println("Error loading block:", id2, err)
return
}

if b != nil {
resultsCh <- *b
}
}(id)
}

wg.Wait()
close(resultsCh)

results := make([]blockStats, 0)
for b := range resultsCh {
results = append(results, b)
}

sort.Slice(results, func(i, j int) bool {
return results[i].end.Before(results[j].end)
})

return results, nil
}

func loadBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) {
fmt.Print(".")

meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted {
return nil, nil
} else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

compactedMeta, err := c.CompactedBlockMeta(id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

return &blockStats{
unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange),
}, nil
return nil
}

func displayResults(results []blockStats, windowDuration time.Duration, includeCompacted bool) {
Expand Down
124 changes: 124 additions & 0 deletions cmd/tempo-cli/cmd-list-compactionsummary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"fmt"
"os"
"sort"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/olekukonko/tablewriter"
)

type listCompactionSummaryCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
backendOptions
}

func (l *listCompactionSummaryCmd) Run(ctx *globalOptions) error {
r, c, err := loadBackend(&l.backendOptions, ctx)
if err != nil {
return err
}

windowDuration := time.Hour

results, err := loadBucket(r, c, l.TenantID, windowDuration, false)
if err != nil {
return err
}

displayCompactionSummary(results)

return nil
}

func displayCompactionSummary(results []blockStats) {
fmt.Println()
fmt.Println("Stats by compaction level:")
resultsByLevel := make(map[int][]blockStats)
var levels []int
for _, r := range results {
l := int(r.compactionLevel)

s, ok := resultsByLevel[l]
if !ok {
s = make([]blockStats, 0)
levels = append(levels, l)
}

s = append(s, r)
resultsByLevel[l] = s
}

sort.Ints(levels)

columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest"}

out := make([][]string, 0)

for _, l := range levels {
sizeSum := uint64(0)
sizeMin := uint64(0)
sizeMax := uint64(0)
countSum := 0
countMin := 0
countMax := 0
var newest time.Time
var oldest time.Time
for _, r := range resultsByLevel[l] {
sizeSum += r.size
countSum += r.objects

if r.size < sizeMin || sizeMin == 0 {
sizeMin = r.size
}
if r.size > sizeMax {
sizeMax = r.size
}
if r.objects < countMin || countMin == 0 {
countMin = r.objects
}
if r.objects > countMax {
countMax = r.objects
}
if r.start.Before(oldest) || oldest.IsZero() {
oldest = r.start
}
if r.end.After(newest) {
newest = r.end
}
}

line := make([]string, 0)

for _, c := range columns {
s := ""
switch c {
case "lvl":
s = strconv.Itoa(l)
case "blocks":
s = fmt.Sprintf("%d (%d %%)", len(resultsByLevel[l]), len(resultsByLevel[l])*100/len(results))
case "total":
s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countSum)), humanize.Bytes(sizeSum))
case "smallest block":
s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countMin)), humanize.Bytes(sizeMin))
case "largest block":
s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countMax)), humanize.Bytes(sizeMax))
case "earliest":
s = fmt.Sprint(time.Since(oldest).Round(time.Second), " ago")
case "latest":
s = fmt.Sprint(time.Since(newest).Round(time.Second), " ago")
}
line = append(line, s)
}
out = append(out, line)
}

fmt.Println()
w := tablewriter.NewWriter(os.Stdout)
w.SetHeader(columns)
w.AppendBulk(out)
w.Render()
}
67 changes: 6 additions & 61 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"flag"
"fmt"
"io/ioutil"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/cmd/tempo/app"
"github.com/grafana/tempo/tempodb/backend"
tempodb_backend "github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"gopkg.in/yaml.v2"

Expand All @@ -36,8 +33,9 @@ var cli struct {
globalOptions

List struct {
Block listBlockCmd `cmd:"" help:"List information about a block"`
Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"`
Block listBlockCmd `cmd:"" help:"List information about a block"`
Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"`
CompactionSummary listCompactionSummaryCmd `cmd:"" help:"List summary of data by compaction level"`
} `cmd:""`

Query queryCmd `cmd:"" help:"query tempo api"`
Expand All @@ -54,7 +52,7 @@ func main() {
ctx.FatalIfErrorf(err)
}

func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, tempodb_backend.Compactor, error) {
func loadBackend(b *backendOptions, g *globalOptions) (backend.Reader, backend.Compactor, error) {
// Defaults
cfg := app.Config{}
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
Expand Down Expand Up @@ -89,8 +87,8 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t
}

var err error
var r tempodb_backend.Reader
var c tempodb_backend.Compactor
var r backend.Reader
var c backend.Compactor

switch cfg.StorageConfig.Trace.Backend {
case "local":
Expand All @@ -111,56 +109,3 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t

return r, c, nil
}

type unifiedBlockMeta struct {
id uuid.UUID
compactionLevel uint8
objects int
size uint64
window int64
start time.Time
end time.Time
compacted bool
version string
encoding string
}

func getMeta(meta *backend.BlockMeta, compactedMeta *backend.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta {
if meta != nil {
return unifiedBlockMeta{
id: meta.BlockID,
compactionLevel: meta.CompactionLevel,
objects: meta.TotalObjects,
size: meta.Size,
window: meta.EndTime.Unix() / int64(windowRange/time.Second),
start: meta.StartTime,
end: meta.EndTime,
compacted: false,
version: meta.Version,
encoding: meta.Encoding.String(),
}
}
if compactedMeta != nil {
return unifiedBlockMeta{
id: compactedMeta.BlockID,
compactionLevel: compactedMeta.CompactionLevel,
objects: compactedMeta.TotalObjects,
size: compactedMeta.Size,
window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second),
start: compactedMeta.StartTime,
end: compactedMeta.EndTime,
compacted: true,
version: compactedMeta.Version,
encoding: compactedMeta.Encoding.String(),
}
}
return unifiedBlockMeta{
id: uuid.UUID{},
compactionLevel: 0,
objects: -1,
window: -1,
start: time.Unix(0, 0),
end: time.Unix(0, 0),
compacted: false,
}
}
Loading